Apache Mesos
network.hpp
Go to the documentation of this file.
1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements. See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership. The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License. You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 
17 #ifndef __NETWORK_HPP__
18 #define __NETWORK_HPP__
19 
20 // TODO(benh): Eventually move and associate this code with the
21 // libprocess protobuf code rather than keep it here.
22 
23 #include <deque>
24 #include <set>
25 #include <string>
26 #include <vector>
27 
29 
30 #include <process/collect.hpp>
31 #include <process/executor.hpp>
32 #include <process/id.hpp>
33 #include <process/protobuf.hpp>
34 
35 #include <stout/duration.hpp>
36 #include <stout/foreach.hpp>
37 #include <stout/lambda.hpp>
38 #include <stout/nothing.hpp>
39 #include <stout/set.hpp>
40 #include <stout/unreachable.hpp>
41 
42 #include "logging/logging.hpp"
43 
44 // Forward declaration.
45 class NetworkProcess;
46 
47 // A "network" is a collection of protobuf processes (may be local
48 // and/or remote). A network abstracts away the details of maintaining
49 // which processes are waiting to receive messages and requests in the
50 // presence of failures and dynamic reconfiguration.
51 class Network
52 {
53 public:
54  enum WatchMode
55  {
62  };
63 
64  Network();
65  explicit Network(const std::set<process::UPID>& pids);
66  virtual ~Network();
67 
68  // Adds a PID to this network.
69  void add(const process::UPID& pid);
70 
71  // Removes a PID from this network.
72  void remove(const process::UPID& pid);
73 
74  // Set the PIDs that are part of this network.
75  void set(const std::set<process::UPID>& pids);
76 
77  // Returns a future which gets set when the network size satisfies
78  // the constraint specified by 'size' and 'mode'. For example, if
79  // 'size' is 2 and 'mode' is GREATER_THAN, then the returned future
80  // will get set when the size of the network is greater than 2.
82  size_t size,
83  WatchMode mode = NOT_EQUAL_TO) const;
84 
85  // Sends a request to each member of the network and returns a set
86  // of futures that represent their responses.
87  template <typename Req, typename Res>
89  const Protocol<Req, Res>& protocol,
90  const Req& req,
91  const std::set<process::UPID>& filter = std::set<process::UPID>()) const;
92 
93  // Sends a message to each member of the network. The returned
94  // future is set when the message is broadcasted.
95  template <typename M>
97  const M& m,
98  const std::set<process::UPID>& filter = std::set<process::UPID>()) const;
99 
100 private:
101  // Not copyable, not assignable.
102  Network(const Network&);
103  Network& operator=(const Network&);
104 
106 };
107 
108 
109 class ZooKeeperNetwork : public Network
110 {
111 public:
113  const std::string& servers,
114  const Duration& timeout,
115  const std::string& znode,
117  const std::set<process::UPID>& base = std::set<process::UPID>());
118 
119 private:
120  typedef ZooKeeperNetwork This;
121 
122  // Not copyable, not assignable.
124  ZooKeeperNetwork& operator=(const ZooKeeperNetwork&);
125 
126  // Helper that sets up a watch on the group.
127  void watch(const std::set<zookeeper::Group::Membership>& expected);
128 
129  // Invoked when the group memberships have changed.
130  void watched(const process::Future<std::set<zookeeper::Group::Membership>>&);
131 
132  // Invoked when group members data has been collected.
133  void collected(
134  const process::Future<std::vector<Option<std::string>>>& datas);
135 
138 
139  // The set of PIDs that are always in the network.
140  std::set<process::UPID> base;
141 
142  // NOTE: The declaration order here is important. We want to delete
143  // the 'executor' before we delete the 'group' so that we don't get
144  // spurious fatal errors when the 'group' is being deleted.
145  process::Executor executor;
146 };
147 
148 
149 class NetworkProcess : public ProtobufProcess<NetworkProcess>
150 {
151 public:
152  NetworkProcess() : ProcessBase(process::ID::generate("log-network")) {}
153 
154  explicit NetworkProcess(const std::set<process::UPID>& pids)
155  : ProcessBase(process::ID::generate("log-network"))
156  {
157  set(pids);
158  }
159 
160  void add(const process::UPID& pid)
161  {
162  // Link in order to keep a socket open (more efficient).
163  //
164  // We force a reconnect to avoid sending on a "stale" socket. In
165  // general when linking to a remote process, the underlying TCP
166  // connection may become "stale". RFC 793 refers to this as a
167  // "half-open" connection: the RST is not sent upon the death
168  // of the peer and a RST will only be received once further
169  // data is sent on the socket.
170  //
171  // "Half-open" (aka "stale") connections are typically addressed
172  // via keep-alives (see RFC 1122 4.2.3.6) to periodically probe
173  // the connection. In this case, we can rely on the (re-)addition
174  // of the network member to create a new connection.
175  //
176  // See MESOS-5576 for a scenario where reconnecting helps avoid
177  // dropped messages.
178  link(pid, RemoteConnection::RECONNECT);
179 
180  pids.insert(pid);
181 
182  // Update any pending watches.
183  update();
184  }
185 
186  void remove(const process::UPID& pid)
187  {
188  // TODO(benh): unlink(pid);
189  pids.erase(pid);
190 
191  // Update any pending watches.
192  update();
193  }
194 
195  void set(const std::set<process::UPID>& _pids)
196  {
197  pids.clear();
198  foreach (const process::UPID& pid, _pids) {
199  add(pid); // Also does a link.
200  }
201 
202  // Update any pending watches.
203  update();
204  }
205 
207  {
208  if (satisfied(size, mode)) {
209  return pids.size();
210  }
211 
212  Watch* watch = new Watch(size, mode);
213  watches.push_back(watch);
214 
215  // TODO(jieyu): Consider deleting 'watch' if the returned future
216  // is discarded by the user.
217  return watch->promise.future();
218  }
219 
220  // Sends a request to each of the group members and returns a set
221  // of futures that represent their responses.
222  template <typename Req, typename Res>
223  std::set<process::Future<Res>> broadcast(
224  const Protocol<Req, Res>& protocol,
225  const Req& req,
226  const std::set<process::UPID>& filter)
227  {
228  std::set<process::Future<Res>> futures;
229  typename std::set<process::UPID>::const_iterator iterator;
230  for (iterator = pids.begin(); iterator != pids.end(); ++iterator) {
231  const process::UPID& pid = *iterator;
232  if (filter.count(pid) == 0) {
233  futures.insert(protocol(pid, req));
234  }
235  }
236  return futures;
237  }
238 
239  // Sends a request to each of the group members without expecting responses.
240  template <typename M>
242  const M& m,
243  const std::set<process::UPID>& filter)
244  {
245  std::set<process::UPID>::const_iterator iterator;
246  for (iterator = pids.begin(); iterator != pids.end(); ++iterator) {
247  const process::UPID& pid = *iterator;
248  if (filter.count(pid) == 0) {
249  // NOTE: Just send this message as the network process itself
250  // since we don't need to deliver responses back to the caller.
251  // Incoming messages addressed to the network are simply dropped.
252  send(pid, m);
253  }
254  }
255  return Nothing();
256  }
257 
258 protected:
259  void finalize() override
260  {
261  foreach (Watch* watch, watches) {
262  watch->promise.fail("Network is being terminated");
263  delete watch;
264  }
265  watches.clear();
266  }
267 
268 private:
269  struct Watch
270  {
271  Watch(size_t _size, Network::WatchMode _mode)
272  : size(_size), mode(_mode) {}
273 
274  size_t size;
277  };
278 
279  // Not copyable, not assignable.
281  NetworkProcess& operator=(const NetworkProcess&);
282 
283  // Notifies the change of the network.
284  void update()
285  {
286  const size_t size = watches.size();
287  for (size_t i = 0; i < size; i++) {
288  Watch* watch = watches.front();
289  watches.pop_front();
290 
291  if (satisfied(watch->size, watch->mode)) {
292  watch->promise.set(pids.size());
293  delete watch;
294  } else {
295  watches.push_back(watch);
296  }
297  }
298  }
299 
300  // Returns true if the current size of the network satisfies the
301  // constraint specified by 'size' and 'mode'.
302  bool satisfied(size_t size, Network::WatchMode mode)
303  {
304  switch (mode) {
305  case Network::EQUAL_TO:
306  return pids.size() == size;
308  return pids.size() != size;
309  case Network::LESS_THAN:
310  return pids.size() < size;
312  return pids.size() <= size;
314  return pids.size() > size;
316  return pids.size() >= size;
317  default:
318  LOG(FATAL) << "Invalid watch mode";
319  UNREACHABLE();
320  }
321  }
322 
323  std::set<process::UPID> pids;
324  std::deque<Watch*> watches;
325 };
326 
327 
329 {
330  process = new NetworkProcess();
332 }
333 
334 
335 inline Network::Network(const std::set<process::UPID>& pids)
336 {
337  process = new NetworkProcess(pids);
339 }
340 
341 
343 {
346  delete process;
347 }
348 
349 
350 inline void Network::add(const process::UPID& pid)
351 {
353 }
354 
355 
356 inline void Network::remove(const process::UPID& pid)
357 {
359 }
360 
361 
362 inline void Network::set(const std::set<process::UPID>& pids)
363 {
365 }
366 
367 
369  size_t size, Network::WatchMode mode) const
370 {
371  return process::dispatch(process, &NetworkProcess::watch, size, mode);
372 }
373 
374 
375 template <typename Req, typename Res>
377  const Protocol<Req, Res>& protocol,
378  const Req& req,
379  const std::set<process::UPID>& filter) const
380 {
381  return process::dispatch(process, &NetworkProcess::broadcast<Req, Res>,
382  protocol, req, filter);
383 }
384 
385 
386 template <typename M>
388  const M& m,
389  const std::set<process::UPID>& filter) const
390 {
391  // Need to disambiguate overloaded function.
392  Nothing (NetworkProcess::*broadcast)(const M&, const std::set<process::UPID>&)
393  = &NetworkProcess::broadcast<M>;
394 
395  return process::dispatch(process, broadcast, m, filter);
396 }
397 
398 
400  const std::string& servers,
401  const Duration& timeout,
402  const std::string& znode,
404  const std::set<process::UPID>& _base)
405  : group(servers, timeout, znode, auth),
406  base(_base)
407 {
408  // PIDs from the base set are in the network from beginning.
409  set(base);
410 
411  watch(std::set<zookeeper::Group::Membership>());
412 }
413 
414 
415 inline void ZooKeeperNetwork::watch(
416  const std::set<zookeeper::Group::Membership>& expected)
417 {
418  memberships = group.watch(expected);
419  memberships
420  .onAny(executor.defer(lambda::bind(&This::watched, this, lambda::_1)));
421 }
422 
423 
424 inline void ZooKeeperNetwork::watched(
425  const process::Future<std::set<zookeeper::Group::Membership>>&)
426 {
427  if (memberships.isFailed()) {
428  // We can't do much here, we could try creating another Group but
429  // that might just continue indefinitely, so we fail early
430  // instead. Note that Group handles all retryable/recoverable
431  // ZooKeeper errors internally.
432  LOG(FATAL) << "Failed to watch ZooKeeper group: " << memberships.failure();
433  }
434 
435  CHECK_READY(memberships); // Not expecting Group to discard futures.
436 
437  LOG(INFO) << "ZooKeeper group memberships changed";
438 
439  // Get data for each membership in order to convert them to PIDs.
440  std::vector<process::Future<Option<std::string>>> futures;
441 
442  foreach (const zookeeper::Group::Membership& membership, memberships.get()) {
443  futures.push_back(group.data(membership));
444  }
445 
446  process::collect(futures)
447  .after(Seconds(5),
448  [](process::Future<std::vector<Option<std::string>>> datas) {
449  // Handling time outs when collecting membership
450  // data. For now, a timeout is treated as a failure.
451  datas.discard();
452  return process::Failure("Timed out");
453  })
454  .onAny(executor.defer(lambda::bind(&This::collected, this, lambda::_1)));
455 }
456 
457 
458 inline void ZooKeeperNetwork::collected(
459  const process::Future<std::vector<Option<std::string>>>& datas)
460 {
461  if (datas.isFailed()) {
462  LOG(WARNING) << "Failed to get data for ZooKeeper group members: "
463  << datas.failure();
464 
465  // Try again later assuming empty group. Note that this does not
466  // remove any of the current group members.
467  watch(std::set<zookeeper::Group::Membership>());
468  return;
469  }
470 
471  CHECK_READY(datas); // Not expecting collect to discard futures.
472 
473  std::set<process::UPID> pids;
474 
475  foreach (const Option<std::string>& data, datas.get()) {
476  // Data could be None if the membership is gone before its
477  // content can be read.
478  if (data.isSome()) {
479  process::UPID pid(data.get());
480  CHECK(pid) << "Failed to parse '" << data.get() << "'";
481  pids.insert(pid);
482  }
483  }
484 
485  LOG(INFO) << "ZooKeeper group PIDs: " << stringify(pids);
486 
487  // Update the network. We make sure that the PIDs from the base set
488  // are always in the network.
489  set(pids | base);
490 
491  watch(memberships.get());
492 }
493 
494 #endif // __NETWORK_HPP__
std::string generate(const std::string &prefix="")
Returns 'prefix(N)' where N represents the number of instances where the same prefix (wrt...
NetworkProcess()
Definition: network.hpp:152
void remove(const process::UPID &pid)
Definition: network.hpp:186
Definition: nothing.hpp:16
Try< bool > update(const std::string &link, const Handle &parent, uint16_t protocol, const action::Mirror &mirror)
Try< Bytes > size(const std::string &path, const FollowSymlink follow=FollowSymlink::FOLLOW_SYMLINK)
Definition: stat.hpp:130
const T & get() const
Definition: future.hpp:1294
ZooKeeperNetwork(const std::string &servers, const Duration &timeout, const std::string &znode, const Option< zookeeper::Authentication > &auth, const std::set< process::UPID > &base=std::set< process::UPID >())
Definition: network.hpp:399
void add(const process::UPID &pid)
Definition: network.hpp:350
Definition: future.hpp:668
NetworkProcess(const std::set< process::UPID > &pids)
Definition: network.hpp:154
Definition: network.hpp:59
event_base * base
void set(const std::set< process::UPID > &pids)
Definition: network.hpp:362
UPID spawn(ProcessBase *process, bool manage=false)
Spawn a new process.
Definition: duration.hpp:32
void terminate(const UPID &pid, bool inject=true)
Sends a TerminateEvent to the given process.
Definition: network.hpp:56
Future< std::vector< T > > collect(const std::vector< Future< T >> &futures)
Definition: collect.hpp:274
bool isSome() const
Definition: option.hpp:116
Definition: network.hpp:51
std::set< process::Future< Res > > broadcast(const Protocol< Req, Res > &protocol, const Req &req, const std::set< process::UPID > &filter)
Definition: network.hpp:223
process::Future< size_t > watch(size_t size, WatchMode mode=NOT_EQUAL_TO) const
Definition: network.hpp:368
_Deferred< F > defer(F &&f)
Definition: executor.hpp:54
void dispatch(const PID< T > &pid, void(T::*method)())
Definition: dispatch.hpp:174
Network()
Definition: network.hpp:328
Definition: network.hpp:149
Definition: network.hpp:58
An "untyped" PID, used to encapsulate the process ID for lower-layer abstractions (eg...
Definition: pid.hpp:39
const Future< T > & onAny(AnyCallback &&callback) const
Definition: future.hpp:1442
Definition: duration.hpp:207
virtual ~Network()
Definition: network.hpp:342
Definition: network.hpp:60
const T & get() const &
Definition: option.hpp:119
Protocol< PromiseRequest, PromiseResponse > promise
process::Future< std::set< process::Future< Res > > > broadcast(const Protocol< Req, Res > &protocol, const Req &req, const std::set< process::UPID > &filter=std::set< process::UPID >()) const
Definition: network.hpp:376
Definition: network.hpp:57
Definition: protobuf.hpp:108
Nothing broadcast(const M &m, const std::set< process::UPID > &filter)
Definition: network.hpp:241
void remove(const process::UPID &pid)
Definition: network.hpp:356
bool wait(const UPID &pid, const Duration &duration=Seconds(-1))
Wait for the process to exit for no more than the specified seconds.
Definition: protobuf.hpp:459
void finalize() override
Invoked when a process is terminated.
Definition: network.hpp:259
#define UNREACHABLE()
Definition: unreachable.hpp:22
#define CHECK_READY(expression)
Definition: check.hpp:29
Definition: grp.hpp:26
void add(const process::UPID &pid)
Definition: network.hpp:160
Definition: group.hpp:49
Definition: network.hpp:61
Definition: executor.hpp:48
WatchMode
Definition: network.hpp:54
Future< T > after(const Duration &duration, lambda::CallableOnce< Future< T >(const Future< T > &)> f) const
Definition: future.hpp:1692
Try< mode_t > mode(const std::string &path, const FollowSymlink follow=FollowSymlink::FOLLOW_SYMLINK)
Definition: stat.hpp:168
Try< Nothing > bind(int_fd s, const Address &address)
Definition: network.hpp:46
Definition: network.hpp:109
const std::string & failure() const
Definition: future.hpp:1320
std::string stringify(int flags)
Definition: executor.hpp:29
void set(const std::set< process::UPID > &_pids)
Definition: network.hpp:195
Try< std::set< pid_t > > pids()
Definition: freebsd.hpp:62
Definition: group.hpp:57
process::Future< size_t > watch(size_t size, Network::WatchMode mode)
Definition: network.hpp:206
void filter(Filter *filter)
bool isFailed() const
Definition: future.hpp:1229
Future< size_t > send(const int_fd &fd, const void *buf, size_t size)