Apache Mesos
network.hpp
Go to the documentation of this file.
1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements. See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership. The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License. You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 
17 #ifndef __NETWORK_HPP__
18 #define __NETWORK_HPP__
19 
20 // TODO(benh): Eventually move and associate this code with the
21 // libprocess protobuf code rather than keep it here.
22 
23 #include <list>
24 #include <set>
25 #include <string>
26 
28 
29 #include <process/collect.hpp>
30 #include <process/executor.hpp>
31 #include <process/id.hpp>
32 #include <process/protobuf.hpp>
33 
34 #include <stout/duration.hpp>
35 #include <stout/foreach.hpp>
36 #include <stout/lambda.hpp>
37 #include <stout/nothing.hpp>
38 #include <stout/set.hpp>
39 #include <stout/unreachable.hpp>
40 
41 #include "logging/logging.hpp"
42 
43 // Forward declaration.
44 class NetworkProcess;
45 
46 // A "network" is a collection of protobuf processes (may be local
47 // and/or remote). A network abstracts away the details of maintaining
48 // which processes are waiting to receive messages and requests in the
49 // presence of failures and dynamic reconfiguration.
50 class Network
51 {
52 public:
 // Comparison applied between the network's size and the 'size'
 // argument of watch().
 // NOTE(review): the enumerators (listing lines 55-60) are absent from
 // this extract. EQUAL_TO, NOT_EQUAL_TO and LESS_THAN are referenced by
 // code elsewhere in this file; the full list should be confirmed
 // against the original header.
53  enum WatchMode
54  {
61  };
62 
 // Both constructors allocate a NetworkProcess and spawn it; the
 // destructor terminates that process, waits for it to exit and
 // deletes it.
63  Network();
64  explicit Network(const std::set<process::UPID>& pids);
65  virtual ~Network();
66 
67  // Adds a PID to this network. The request is dispatched to the
 // underlying NetworkProcess.
68  void add(const process::UPID& pid);
69 
70  // Removes a PID from this network.
71  void remove(const process::UPID& pid);
72 
73  // Set the PIDs that are part of this network. The request is
 // dispatched to the underlying NetworkProcess, which clears its
 // current set and re-adds every PID in 'pids'.
74  void set(const std::set<process::UPID>& pids);
75 
76  // Returns a future which gets set when the network size satisfies
77  // the constraint specified by 'size' and 'mode'. For example, if
78  // 'size' is 2 and 'mode' is GREATER_THAN, then the returned future
79  // will get set when the size of the network is greater than 2.
 // The value the future is set to is the network size at that moment.
 // NOTE(review): the first line of this declaration (listing line 80,
 // carrying the return type and the name) is absent from this extract.
81  size_t size,
82  WatchMode mode = NOT_EQUAL_TO) const;
83 
84  // Sends a request to each member of the network and returns a set
85  // of futures that represent their responses. PIDs contained in
 // 'filter' are skipped.
 // NOTE(review): listing line 87 (return type and name) is absent
 // from this extract.
86  template <typename Req, typename Res>
88  const Protocol<Req, Res>& protocol,
89  const Req& req,
90  const std::set<process::UPID>& filter = std::set<process::UPID>()) const;
91 
92  // Sends a message to each member of the network. The returned
93  // future is set once the message has been broadcast. PIDs contained
 // in 'filter' are skipped.
 // NOTE(review): listing line 95 (return type and name) is absent
 // from this extract.
94  template <typename M>
96  const M& m,
97  const std::set<process::UPID>& filter = std::set<process::UPID>()) const;
98 
99 private:
100  // Not copyable, not assignable.
101  Network(const Network&);
102  Network& operator=(const Network&);
103 
 // The process that holds the member set and the pending watches.
 // Created in the constructors and deleted in the destructor.
104  NetworkProcess* process;
105 };
106 
107 
// A Network whose member set follows a ZooKeeper group: each time the
// group's memberships change, the members' data is read, parsed into
// PIDs, and the network is reset to those PIDs plus the fixed 'base'
// PIDs.
108 class ZooKeeperNetwork : public Network
109 {
110 public:
 // The definition passes 'servers', 'timeout', 'znode' and an 'auth'
 // argument to the group, installs the 'base' PIDs immediately and
 // starts the first watch on the group.
 // NOTE(review): listing lines 111 (constructor name) and 115 are
 // absent from this extract; line 115 presumably declares the 'auth'
 // parameter used by the definition - confirm.
112  const std::string& servers,
113  const Duration& timeout,
114  const std::string& znode,
116  const std::set<process::UPID>& base = std::set<process::UPID>());
117 
118 private:
 // Shorthand used when binding member functions as callbacks.
119  typedef ZooKeeperNetwork This;
120 
121  // Not copyable, not assignable.
 // NOTE(review): listing line 122 (presumably the copy constructor
 // declaration) is absent from this extract.
123  ZooKeeperNetwork& operator=(const ZooKeeperNetwork&);
124 
125  // Helper that sets up a watch on the group. 'expected' is passed to
 // the group as the memberships the caller already knows about.
126  void watch(const std::set<zookeeper::Group::Membership>& expected);
127 
128  // Invoked when the group memberships have changed.
129  void watched(const process::Future<std::set<zookeeper::Group::Membership>>&);
130 
131  // Invoked when group members data has been collected.
132  void collected(
134 
 // NOTE(review): listing lines 133, 135 and 136 are absent from this
 // extract. The member functions use 'group' and 'memberships', which
 // are not declared in any visible line, so their declarations
 // presumably live on lines 135-136 - confirm.
137 
138  // The set of PIDs that are always in the network.
139  std::set<process::UPID> base;
140 
141  // NOTE: The declaration order here is important. We want to delete
142  // the 'executor' before we delete the 'group' so that we don't get
143  // spurious fatal errors when the 'group' is being deleted.
144  process::Executor executor;
145 };
146 
147 
// The process behind Network. It holds the set of member PIDs and the
// list of pending size watches; Network's public methods are forwarded
// here with process::dispatch.
148 class NetworkProcess : public ProtobufProcess<NetworkProcess>
149 {
150 public:
 // Starts with no members. The process ID is generated from the
 // "log-network" prefix.
151  NetworkProcess() : ProcessBase(process::ID::generate("log-network")) {}
152 
 // Starts with 'pids' as members (installed through set()).
153  explicit NetworkProcess(const std::set<process::UPID>& pids)
154  : ProcessBase(process::ID::generate("log-network"))
155  {
156  set(pids);
157  }
158 
 // Inserts 'pid' into the member set and re-evaluates pending watches.
159  void add(const process::UPID& pid)
160  {
161  // Link in order to keep a socket open (more efficient).
162  //
163  // We force a reconnect to avoid sending on a "stale" socket. In
164  // general when linking to a remote process, the underlying TCP
165  // connection may become "stale". RFC 793 refers to this as a
166  // "half-open" connection: the RST is not sent upon the death
167  // of the peer and a RST will only be received once further
168  // data is sent on the socket.
169  //
170  // "Half-open" (aka "stale") connections are typically addressed
171  // via keep-alives (see RFC 1122 4.2.3.6) to periodically probe
172  // the connection. In this case, we can rely on the (re-)addition
173  // of the network member to create a new connection.
174  //
175  // See MESOS-5576 for a scenario where reconnecting helps avoid
176  // dropped messages.
 // NOTE(review): listing line 177 - presumably the link call that the
 // comment above describes - is absent from this extract.
178 
179  pids.insert(pid);
180 
181  // Update any pending watches.
182  update();
183  }
184 
 // Erases 'pid' from the member set and re-evaluates pending watches.
185  void remove(const process::UPID& pid)
186  {
187  // TODO(benh): unlink(pid);
188  pids.erase(pid);
189 
190  // Update any pending watches.
191  update();
192  }
193 
 // Replaces the member set with '_pids'. Because each add() call runs
 // update(), pending watches are evaluated against every intermediate
 // size while the set is being rebuilt, not only against the final one.
194  void set(const std::set<process::UPID>& _pids)
195  {
196  pids.clear();
197  foreach (const process::UPID& pid, _pids) {
198  add(pid); // Also does a link.
199  }
200 
201  // Update any pending watches.
202  update();
203  }
204 
 // Size watch: if the constraint already holds, the current size is
 // returned right away; otherwise a Watch is queued and its promise's
 // future is returned.
 // NOTE(review): the signature (listing line 205) is absent from this
 // extract.
206  {
207  if (satisfied(size, mode)) {
208  return pids.size();
209  }
210 
 // The Watch is heap-allocated; it is deleted in update() once it is
 // satisfied, or in finalize().
211  Watch* watch = new Watch(size, mode);
212  watches.push_back(watch);
213 
214  // TODO(jieyu): Consider deleting 'watch' if the returned future
215  // is discarded by the user.
216  return watch->promise.future();
217  }
218 
219  // Sends a request to each of the group members and returns a set
220  // of futures that represent their responses. Members whose PID is in
 // 'filter' are skipped.
221  template <typename Req, typename Res>
222  std::set<process::Future<Res>> broadcast(
223  const Protocol<Req, Res>& protocol,
224  const Req& req,
225  const std::set<process::UPID>& filter)
226  {
227  std::set<process::Future<Res>> futures;
228  typename std::set<process::UPID>::const_iterator iterator;
229  for (iterator = pids.begin(); iterator != pids.end(); ++iterator) {
230  const process::UPID& pid = *iterator;
231  if (filter.count(pid) == 0) {
 // Invoke the protocol for this member and keep the response future.
232  futures.insert(protocol(pid, req));
233  }
234  }
235  return futures;
236  }
237 
238  // Sends a request to each of the group members without expecting responses.
 // Members whose PID is in 'filter' are skipped.
 // NOTE(review): listing line 240 (return type and name) is absent
 // from this extract.
239  template <typename M>
241  const M& m,
242  const std::set<process::UPID>& filter)
243  {
244  std::set<process::UPID>::const_iterator iterator;
245  for (iterator = pids.begin(); iterator != pids.end(); ++iterator) {
246  const process::UPID& pid = *iterator;
247  if (filter.count(pid) == 0) {
248  // NOTE: Just send this message as the network process itself
249  // since we don't need to deliver responses back to the caller.
250  // Incoming messages addressed to the network are simply dropped.
251  send(pid, m);
252  }
253  }
254  return Nothing();
255  }
256 
257 protected:
 // Fails the promise of every still-pending watch, frees the watches
 // and empties the list.
258  virtual void finalize()
259  {
260  foreach (Watch* watch, watches) {
261  watch->promise.fail("Network is being terminated");
262  delete watch;
263  }
264  watches.clear();
265  }
266 
267 private:
 // One pending watch() request: the size constraint plus the promise
 // that is completed when the constraint is met.
 // NOTE(review): listing lines 274-275 are absent from this extract;
 // the 'mode' and 'promise' members used throughout this class are
 // presumably declared there - confirm.
268  struct Watch
269  {
270  Watch(size_t _size, Network::WatchMode _mode)
271  : size(_size), mode(_mode) {}
272 
273  size_t size;
276  };
277 
278  // Not copyable, not assignable.
 // NOTE(review): listing line 279 (presumably the copy constructor
 // declaration) is absent from this extract.
280  NetworkProcess& operator=(const NetworkProcess&);
281 
282  // Notifies the change of the network.
 // Visits each watch that was pending on entry exactly once: it is
 // popped from the front and either completed with the current size
 // and freed, or pushed to the back again. Unsatisfied watches keep
 // their relative order.
283  void update()
284  {
285  const size_t size = watches.size();
286  for (size_t i = 0; i < size; i++) {
287  Watch* watch = watches.front();
288  watches.pop_front();
289 
290  if (satisfied(watch->size, watch->mode)) {
291  watch->promise.set(pids.size());
292  delete watch;
293  } else {
294  watches.push_back(watch);
295  }
296  }
297  }
298 
299  // Returns true if the current size of the network satisfies the
300  // constraint specified by 'size' and 'mode'.
 // NOTE(review): four 'case' labels (listing lines 306, 310, 312 and
 // 314) are absent from this extract; each 'return' below that has no
 // visible label belongs to the label that preceded it in the original.
301  bool satisfied(size_t size, Network::WatchMode mode)
302  {
303  switch (mode) {
304  case Network::EQUAL_TO:
305  return pids.size() == size;
307  return pids.size() != size;
308  case Network::LESS_THAN:
309  return pids.size() < size;
311  return pids.size() <= size;
313  return pids.size() > size;
315  return pids.size() >= size;
316  default:
317  LOG(FATAL) << "Invalid watch mode";
318  UNREACHABLE();
319  }
320  }
321 
 // Current members of the network.
322  std::set<process::UPID> pids;
 // Watches that have not been satisfied yet.
323  std::list<Watch*> watches;
324 };
325 
326 
// Body of the default Network constructor: creates a NetworkProcess
// with no initial PIDs and spawns it.
// NOTE(review): the signature (listing line 327) is absent from this
// extract.
328 {
329  process = new NetworkProcess();
330  process::spawn(process);
331 }
332 
333 
// Constructs a network that starts out containing 'pids': they are
// handed to the NetworkProcess constructor and the process is then
// spawned.
334 inline Network::Network(const std::set<process::UPID>& pids)
335 {
336  process = new NetworkProcess(pids);
337  process::spawn(process);
338 }
339 
340 
// Body of the Network destructor: asks the backing process to
// terminate, waits for it to exit, then frees it.
// NOTE(review): the signature (listing line 341) is absent from this
// extract.
342 {
343  process::terminate(process);
344  process::wait(process);
345  delete process;
346 }
347 
348 
// Forwards the request to the backing process with dispatch; the
// insertion itself is performed by NetworkProcess::add.
349 inline void Network::add(const process::UPID& pid)
350 {
351  process::dispatch(process, &NetworkProcess::add, pid);
352 }
353 
354 
// Removes 'pid' from the network.
// NOTE(review): the single body statement (listing line 357) is absent
// from this extract; by analogy with add() and set() it presumably
// dispatches NetworkProcess::remove - confirm.
355 inline void Network::remove(const process::UPID& pid)
356 {
358 }
359 
360 
// Forwards the request to the backing process with dispatch; the
// member set is replaced by NetworkProcess::set.
361 inline void Network::set(const std::set<process::UPID>& pids)
362 {
363  process::dispatch(process, &NetworkProcess::set, pids);
364 }
365 
366 
// Network::watch: dispatches to NetworkProcess::watch and returns the
// future produced by that dispatch.
// NOTE(review): the first signature line (listing line 367) is absent
// from this extract.
368  size_t size, Network::WatchMode mode) const
369 {
370  return process::dispatch(process, &NetworkProcess::watch, size, mode);
371 }
372 
373 
// Request/response broadcast: dispatches to the
// NetworkProcess::broadcast<Req, Res> member template, passing
// 'protocol', 'req' and 'filter' through unchanged.
// NOTE(review): listing line 375 (return type and qualified name) is
// absent from this extract.
374 template <typename Req, typename Res>
376  const Protocol<Req, Res>& protocol,
377  const Req& req,
378  const std::set<process::UPID>& filter) const
379 {
380  return process::dispatch(process, &NetworkProcess::broadcast<Req, Res>,
381  protocol, req, filter);
382 }
383 
384 
// One-way message broadcast: dispatches to the message-sending
// NetworkProcess::broadcast<M> overload.
// NOTE(review): listing line 386 (return type and qualified name) is
// absent from this extract.
385 template <typename M>
387  const M& m,
388  const std::set<process::UPID>& filter) const
389 {
390  // Need to disambiguate overloaded function. NetworkProcess has two
 // member templates named 'broadcast'; a pointer-to-member with an
 // explicit signature selects the one that takes (message, filter).
391  Nothing (NetworkProcess::*broadcast)(const M&, const std::set<process::UPID>&)
392  = &NetworkProcess::broadcast<M>;
393 
394  return process::dispatch(process, broadcast, m, filter);
395 }
396 
397 
// ZooKeeperNetwork constructor: initializes the group from the
// connection arguments, remembers the base PIDs, installs them as the
// initial members and starts watching the group.
// NOTE(review): listing lines 398 (qualified constructor name) and 402
// (presumably the 'auth' parameter used in the initializer list) are
// absent from this extract.
399  const std::string& servers,
400  const Duration& timeout,
401  const std::string& znode,
403  const std::set<process::UPID>& _base)
404  : group(servers, timeout, znode, auth),
405  base(_base)
406 {
407  // PIDs from the base set are in the network from beginning.
408  set(base);
409 
 // Start with an empty set of expected memberships.
410  watch(std::set<zookeeper::Group::Membership>());
411 }
412 
413 
// Asks the group to watch relative to the 'expected' memberships,
// stores the resulting future in 'memberships', and registers
// watched() (deferred through 'executor') to be called on any
// completion of that future.
414 inline void ZooKeeperNetwork::watch(
415  const std::set<zookeeper::Group::Membership>& expected)
416 {
417  memberships = group.watch(expected);
418  memberships
419  .onAny(executor.defer(lambda::bind(&This::watched, this, lambda::_1)));
420 }
421 
422 
// Callback for the group watch. The future parameter is unnamed and
// unused; the 'memberships' member (assigned in watch()) is inspected
// instead. On success it requests the data of every membership and
// hands the collected result to collected().
423 inline void ZooKeeperNetwork::watched(
424  const process::Future<std::set<zookeeper::Group::Membership>>&)
425 {
426  if (memberships.isFailed()) {
427  // We can't do much here, we could try creating another Group but
428  // that might just continue indefinitely, so we fail early
429  // instead. Note that Group handles all retryable/recoverable
430  // ZooKeeper errors internally.
431  LOG(FATAL) << "Failed to watch ZooKeeper group: " << memberships.failure();
432  }
433 
434  CHECK_READY(memberships); // Not expecting Group to discard futures.
435 
436  LOG(INFO) << "ZooKeeper group memberships changed";
437 
438  // Get data for each membership in order to convert them to PIDs.
439  std::list<process::Future<Option<std::string>>> futures;
440 
441  foreach (const zookeeper::Group::Membership& membership, memberships.get()) {
442  futures.push_back(group.data(membership));
443  }
444 
 // Wait for all data futures, but give up after 5 seconds.
 // NOTE(review): listing line 447 (the opening of the timeout callback,
 // which introduces 'datas') is absent from this extract.
445  process::collect(futures)
446  .after(Seconds(5),
448  // Handling time outs when collecting membership
449  // data. For now, a timeout is treated as a failure.
450  datas.discard();
451  return process::Failure("Timed out");
452  })
453  .onAny(executor.defer(lambda::bind(&This::collected, this, lambda::_1)));
454 }
455 
456 
// Callback for the collected membership data ('datas'). On failure it
// logs a warning and restarts the watch with an empty expected set; on
// success it parses the data into PIDs, resets the network to those
// PIDs plus 'base', and watches again from the current memberships.
// NOTE(review): the parameter line (listing line 458, declaring
// 'datas') is absent from this extract.
457 inline void ZooKeeperNetwork::collected(
459 {
460  if (datas.isFailed()) {
461  LOG(WARNING) << "Failed to get data for ZooKeeper group members: "
462  << datas.failure();
463 
464  // Try again later assuming empty group. Note that this does not
465  // remove any of the current group members.
466  watch(std::set<zookeeper::Group::Membership>());
467  return;
468  }
469 
470  CHECK_READY(datas); // Not expecting collect to discard futures.
471 
472  std::set<process::UPID> pids;
473 
474  foreach (const Option<std::string>& data, datas.get()) {
475  // Data could be None if the membership is gone before its
476  // content can be read.
477  if (data.isSome()) {
478  process::UPID pid(data.get());
 // Member data that does not parse as a PID fails this check.
479  CHECK(pid) << "Failed to parse '" << data.get() << "'";
480  pids.insert(pid);
481  }
482  }
483 
484  LOG(INFO) << "ZooKeeper group PIDs: " << stringify(pids);
485 
486  // Update the network. We make sure that the PIDs from the base set
487  // are always in the network.
488  set(pids | base);
489 
 // Wait for the next change relative to the memberships just handled.
490  watch(memberships.get());
491 }
492 
493 #endif // __NETWORK_HPP__
std::string generate(const std::string &prefix="")
Returns 'prefix(N)' where N represents the number of instances where the same prefix (wrt...
NetworkProcess()
Definition: network.hpp:151
void remove(const process::UPID &pid)
Definition: network.hpp:185
Definition: nothing.hpp:16
Try< Bytes > size(const std::string &path, const FollowSymlink follow=FollowSymlink::FOLLOW_SYMLINK)
Definition: stat.hpp:100
ProcessBase(const std::string &id="")
const T & get() const
Definition: future.hpp:1310
ZooKeeperNetwork(const std::string &servers, const Duration &timeout, const std::string &znode, const Option< zookeeper::Authentication > &auth, const std::set< process::UPID > &base=std::set< process::UPID >())
Definition: network.hpp:398
void add(const process::UPID &pid)
Definition: network.hpp:349
Definition: future.hpp:664
NetworkProcess(const std::set< process::UPID > &pids)
Definition: network.hpp:153
Definition: network.hpp:58
event_base * base
void set(const std::set< process::UPID > &pids)
Definition: network.hpp:361
UPID spawn(ProcessBase *process, bool manage=false)
Spawn a new process.
Definition: duration.hpp:32
void terminate(const UPID &pid, bool inject=true)
Sends a TerminateEvent to the given process.
Definition: network.hpp:55
void send(const process::UPID &to, const google::protobuf::Message &message)
Definition: protobuf.hpp:118
bool isSome() const
Definition: option.hpp:115
Definition: network.hpp:50
std::set< process::Future< Res > > broadcast(const Protocol< Req, Res > &protocol, const Req &req, const std::set< process::UPID > &filter)
Definition: network.hpp:222
process::Future< size_t > watch(size_t size, WatchMode mode=NOT_EQUAL_TO) const
Definition: network.hpp:367
_Deferred< F > defer(F &&f)
Definition: executor.hpp:54
void dispatch(const PID< T > &pid, void(T::*method)())
Definition: dispatch.hpp:174
Network()
Definition: network.hpp:327
Definition: network.hpp:148
Definition: network.hpp:57
An "untyped" PID, used to encapsulate the process ID for lower-layer abstractions (eg...
Definition: pid.hpp:39
const Future< T > & onAny(AnyCallback &&callback) const
Definition: future.hpp:1458
Definition: duration.hpp:259
virtual ~Network()
Definition: network.hpp:341
Definition: network.hpp:59
const T & get() const &
Definition: option.hpp:118
Protocol< PromiseRequest, PromiseResponse > promise
process::Future< std::set< process::Future< Res > > > broadcast(const Protocol< Req, Res > &protocol, const Req &req, const std::set< process::UPID > &filter=std::set< process::UPID >()) const
Definition: network.hpp:375
Definition: network.hpp:56
UPID link(const UPID &pid, const RemoteConnection remote=RemoteConnection::REUSE)
Links with the specified UPID.
Definition: protobuf.hpp:100
If a persistent socket to the target pid does not exist, a new link is created.
Nothing broadcast(const M &m, const std::set< process::UPID > &filter)
Definition: network.hpp:240
void remove(const process::UPID &pid)
Definition: network.hpp:355
bool wait(const UPID &pid, const Duration &duration=Seconds(-1))
Wait for the process to exit for no more than the specified seconds.
Definition: protobuf.hpp:453
#define UNREACHABLE()
Definition: unreachable.hpp:22
Try< std::vector< Entry > > list(const std::string &hierarchy, const std::string &cgroup)
#define CHECK_READY(expression)
Definition: check.hpp:29
Result< Process > process(pid_t pid)
Definition: freebsd.hpp:30
Definition: grp.hpp:26
void add(const process::UPID &pid)
Definition: network.hpp:159
Definition: group.hpp:49
Definition: network.hpp:60
virtual void finalize()
Invoked when a process is terminated.
Definition: network.hpp:258
WatchMode
Definition: network.hpp:53
Future< T > after(const Duration &duration, lambda::CallableOnce< Future< T >(const Future< T > &)> f) const
Definition: future.hpp:1708
Try< mode_t > mode(const std::string &path, const FollowSymlink follow=FollowSymlink::FOLLOW_SYMLINK)
Definition: stat.hpp:126
Try< Nothing > bind(int_fd s, const Address &address)
Definition: network.hpp:46
Definition: network.hpp:108
const std::string & failure() const
Definition: future.hpp:1336
std::string stringify(int flags)
Definition: executor.hpp:29
void set(const std::set< process::UPID > &_pids)
Definition: network.hpp:194
Try< std::set< pid_t > > pids()
Definition: freebsd.hpp:62
Definition: group.hpp:57
process::Future< size_t > watch(size_t size, Network::WatchMode mode)
Definition: network.hpp:205
void filter(Filter *filter)
bool isFailed() const
Definition: future.hpp:1245
Future< std::list< T > > collect(const std::list< Future< T >> &futures)
Definition: collect.hpp:270