17 #ifndef __NETWORK_HPP__ 18 #define __NETWORK_HPP__ 65 explicit Network(
const std::set<process::UPID>&
pids);
75 void set(
const std::set<process::UPID>&
pids);
87 template <
typename Req,
typename Res>
91 const std::set<process::UPID>&
filter = std::set<process::UPID>())
const;
98 const std::set<process::UPID>&
filter = std::set<process::UPID>())
const;
113 const std::string& servers,
115 const std::string& znode,
117 const std::set<process::UPID>&
base = std::set<process::UPID>());
127 void watch(
const std::set<zookeeper::Group::Membership>& expected);
130 void watched(
const process::Future<std::set<zookeeper::Group::Membership>>&);
140 std::set<process::UPID>
base;
178 link(pid, RemoteConnection::RECONNECT);
195 void set(
const std::set<process::UPID>& _pids)
208 if (satisfied(size, mode)) {
212 Watch*
watch =
new Watch(size, mode);
213 watches.push_back(watch);
217 return watch->promise.future();
222 template <
typename Req,
typename Res>
226 const std::set<process::UPID>&
filter)
228 std::set<process::Future<Res>> futures;
229 typename std::set<process::UPID>::const_iterator iterator;
230 for (iterator =
pids.begin(); iterator !=
pids.end(); ++iterator) {
232 if (filter.count(pid) == 0) {
233 futures.insert(protocol(pid, req));
240 template <
typename M>
243 const std::set<process::UPID>&
filter)
245 std::set<process::UPID>::const_iterator iterator;
246 for (iterator =
pids.begin(); iterator !=
pids.end(); ++iterator) {
248 if (filter.count(pid) == 0) {
261 foreach (Watch*
watch, watches) {
262 watch->promise.fail(
"Network is being terminated");
286 const size_t size = watches.size();
287 for (
size_t i = 0; i <
size; i++) {
288 Watch*
watch = watches.front();
291 if (satisfied(watch->size, watch->mode)) {
292 watch->promise.set(
pids.size());
295 watches.push_back(watch);
318 LOG(FATAL) <<
"Invalid watch mode";
323 std::set<process::UPID>
pids;
324 std::deque<Watch*> watches;
375 template <
typename Req,
typename Res>
379 const std::set<process::UPID>&
filter)
const 382 protocol, req, filter);
386 template <
typename M>
389 const std::set<process::UPID>&
filter)
const 393 = &NetworkProcess::broadcast<M>;
400 const std::string& servers,
402 const std::string& znode,
404 const std::set<process::UPID>& _base)
405 :
group(servers, timeout, znode, auth),
411 watch(std::set<zookeeper::Group::Membership>());
415 inline void ZooKeeperNetwork::watch(
416 const std::set<zookeeper::Group::Membership>& expected)
418 memberships =
group.watch(expected);
424 inline void ZooKeeperNetwork::watched(
432 LOG(FATAL) <<
"Failed to watch ZooKeeper group: " << memberships.
failure();
437 LOG(INFO) <<
"ZooKeeper group memberships changed";
440 std::vector<process::Future<Option<std::string>>> futures;
443 futures.push_back(
group.data(membership));
458 inline void ZooKeeperNetwork::collected(
461 if (datas.isFailed()) {
462 LOG(WARNING) <<
"Failed to get data for ZooKeeper group members: " 467 watch(std::set<zookeeper::Group::Membership>());
473 std::set<process::UPID>
pids;
480 CHECK(pid) <<
"Failed to parse '" << data.
get() <<
"'";
485 LOG(INFO) <<
"ZooKeeper group PIDs: " <<
stringify(pids);
491 watch(memberships.
get());
494 #endif // __NETWORK_HPP__ std::string generate(const std::string &prefix="")
Returns 'prefix(N)' where N represents the number of instances where the same prefix (wrt...
NetworkProcess()
Definition: network.hpp:152
void remove(const process::UPID &pid)
Definition: network.hpp:186
Definition: nothing.hpp:16
Try< bool > update(const std::string &link, const Handle &parent, uint16_t protocol, const action::Mirror &mirror)
Try< Bytes > size(const std::string &path, const FollowSymlink follow=FollowSymlink::FOLLOW_SYMLINK)
Definition: stat.hpp:130
const T & get() const
Definition: future.hpp:1294
ZooKeeperNetwork(const std::string &servers, const Duration &timeout, const std::string &znode, const Option< zookeeper::Authentication > &auth, const std::set< process::UPID > &base=std::set< process::UPID >())
Definition: network.hpp:399
void add(const process::UPID &pid)
Definition: network.hpp:350
Definition: future.hpp:668
NetworkProcess(const std::set< process::UPID > &pids)
Definition: network.hpp:154
Definition: network.hpp:59
void set(const std::set< process::UPID > &pids)
Definition: network.hpp:362
UPID spawn(ProcessBase *process, bool manage=false)
Spawn a new process.
Definition: duration.hpp:32
void terminate(const UPID &pid, bool inject=true)
Sends a TerminateEvent to the given process.
Definition: network.hpp:56
Future< std::vector< T > > collect(const std::vector< Future< T >> &futures)
Definition: collect.hpp:274
bool isSome() const
Definition: option.hpp:116
Definition: network.hpp:51
std::set< process::Future< Res > > broadcast(const Protocol< Req, Res > &protocol, const Req &req, const std::set< process::UPID > &filter)
Definition: network.hpp:223
process::Future< size_t > watch(size_t size, WatchMode mode=NOT_EQUAL_TO) const
Definition: network.hpp:368
_Deferred< F > defer(F &&f)
Definition: executor.hpp:54
void dispatch(const PID< T > &pid, void(T::*method)())
Definition: dispatch.hpp:174
Network()
Definition: network.hpp:328
Definition: network.hpp:149
Definition: network.hpp:58
An "untyped" PID, used to encapsulate the process ID for lower-layer abstractions (eg...
Definition: pid.hpp:39
const Future< T > & onAny(AnyCallback &&callback) const
Definition: future.hpp:1442
Definition: duration.hpp:207
virtual ~Network()
Definition: network.hpp:342
Definition: network.hpp:60
const T & get() const &
Definition: option.hpp:119
Protocol< PromiseRequest, PromiseResponse > promise
process::Future< std::set< process::Future< Res > > > broadcast(const Protocol< Req, Res > &protocol, const Req &req, const std::set< process::UPID > &filter=std::set< process::UPID >()) const
Definition: network.hpp:376
Definition: network.hpp:57
Definition: protobuf.hpp:108
Nothing broadcast(const M &m, const std::set< process::UPID > &filter)
Definition: network.hpp:241
void remove(const process::UPID &pid)
Definition: network.hpp:356
bool wait(const UPID &pid, const Duration &duration=Seconds(-1))
Wait for the process to exit for no more than the specified seconds.
Definition: protobuf.hpp:459
void finalize() override
Invoked when a process is terminated.
Definition: network.hpp:259
#define UNREACHABLE()
Definition: unreachable.hpp:22
#define CHECK_READY(expression)
Definition: check.hpp:29
void add(const process::UPID &pid)
Definition: network.hpp:160
Definition: network.hpp:61
Definition: executor.hpp:48
WatchMode
Definition: network.hpp:54
Future< T > after(const Duration &duration, lambda::CallableOnce< Future< T >(const Future< T > &)> f) const
Definition: future.hpp:1692
Try< mode_t > mode(const std::string &path, const FollowSymlink follow=FollowSymlink::FOLLOW_SYMLINK)
Definition: stat.hpp:168
Try< Nothing > bind(int_fd s, const Address &address)
Definition: network.hpp:46
Definition: network.hpp:109
const std::string & failure() const
Definition: future.hpp:1320
std::string stringify(int flags)
Definition: executor.hpp:29
void set(const std::set< process::UPID > &_pids)
Definition: network.hpp:195
Try< std::set< pid_t > > pids()
Definition: freebsd.hpp:62
process::Future< size_t > watch(size_t size, Network::WatchMode mode)
Definition: network.hpp:206
void filter(Filter *filter)
bool isFailed() const
Definition: future.hpp:1229
Future< size_t > send(const int_fd &fd, const void *buf, size_t size)