Apache Mesos
group.hpp
Go to the documentation of this file.
1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements. See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership. The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License. You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License
16 
17 #ifndef __MESOS_ZOOKEEPER_GROUP_HPP__
18 #define __MESOS_ZOOKEEPER_GROUP_HPP__
19 
20 #include <map>
21 #include <set>
22 #include <string>
23 
25 #include <mesos/zookeeper/url.hpp>
26 
27 #include <process/future.hpp>
28 #include <process/process.hpp>
29 #include <process/timer.hpp>
30 
31 #include <stout/check.hpp>
32 #include <stout/duration.hpp>
33 #include <stout/none.hpp>
34 #include <stout/option.hpp>
35 #include <stout/try.hpp>
36 
37 // Forward declarations.
   // Both types are declared in the global namespace (outside `zookeeper`)
   // and are referred to in this header only through the pointer members
   // `Watcher* watcher` and `ZooKeeper* zk` of GroupProcess, so an
   // incomplete type is sufficient here.
38 class Watcher;
39 class ZooKeeper;
40 
41 namespace zookeeper {
42 
43 // Forward declaration.
   // GroupProcess is defined further down in this header, after Group;
   // Group::Membership names it as a friend before that definition.
44 class GroupProcess;
45 
46 // Represents a distributed group managed by ZooKeeper. A group is
47 // associated with a specific ZooKeeper path, and members are
48 // represented by ephemeral sequential nodes.
49 class Group
50 {
51 public:
52  // Represents a group membership. Note that we order memberships by
53  // membership id (that is, an older membership is ordered before a
54  // younger membership). In addition, we do not use the "cancelled"
55  // future to compare memberships so that two memberships created
56  // from different Group instances will still be considered the same.
57  struct Membership
58  {
59  bool operator==(const Membership& that) const
60  {
61  return sequence == that.sequence;
62  }
63 
64  bool operator!=(const Membership& that) const
65  {
66  return sequence != that.sequence;
67  }
68 
69  bool operator<(const Membership& that) const
70  {
71  return sequence < that.sequence;
72  }
73 
74  bool operator<=(const Membership& that) const
75  {
76  return sequence <= that.sequence;
77  }
78 
79  bool operator>(const Membership& that) const
80  {
81  return sequence > that.sequence;
82  }
83 
84  bool operator>=(const Membership& that) const
85  {
86  return sequence >= that.sequence;
87  }
88 
89  int32_t id() const
90  {
91  return sequence;
92  }
93 
95  {
96  return label_;
97  }
98 
99  // Returns a future that is only satisfied once this membership
100  // has been cancelled. In which case, the value of the future is
101  // true if you own this membership and cancelled it by invoking
102  // Group::cancel. Otherwise, the value of the future is false (and
103  // could signify cancellation due to a session expiration or
104  // operator error).
106  {
107  return cancelled_;
108  }
109 
110  private:
111  friend class GroupProcess; // Creates and manages memberships.
112 
113  Membership(int32_t _sequence,
114  const Option<std::string>& _label,
116  : sequence(_sequence), label_(_label), cancelled_(cancelled) {}
117 
118  const int32_t sequence;
119  const Option<std::string> label_;
120  process::Future<bool> cancelled_;
121  };
122 
123  // Constructs this group using the specified ZooKeeper servers (list
124  // of host:port) with the given session timeout at the specified znode.
125  Group(const std::string& servers,
126  const Duration& sessionTimeout,
127  const std::string& znode,
128  const Option<Authentication>& auth = None());
129  Group(const URL& url,
130  const Duration& sessionTimeout);
131 
132  ~Group();
133 
134  // Returns the result of trying to join a "group" in ZooKeeper.
135  // If "label" is provided the newly created znode contains "label_"
136  // as the prefix. If join is successful, an "owned" membership will
137  // be returned whose retrievable data will be a copy of the
138  // specified parameter. A membership is not "renewed" in the event
139  // of a ZooKeeper session expiration. Instead, a client should watch
140  // the group memberships and rejoin the group as appropriate.
142  const std::string& data,
143  const Option<std::string>& label = None());
144 
145  // Returns the result of trying to cancel a membership. Note that
146  // only memberships that are "owned" (see join) can be canceled.
147  process::Future<bool> cancel(const Membership& membership);
148 
149  // Returns the result of trying to fetch the data associated with a
150  // group membership.
151  // A None is returned if the specified membership doesn't exist,
152  // e.g., it can be removed before this call can read it content.
154 
155  // Returns a future that gets set when the group memberships differ
156  // from the "expected" memberships specified.
158  const std::set<Membership>& expected = std::set<Membership>());
159 
160  // Returns the current ZooKeeper session associated with this group,
161  // or none if no session currently exists.
163 
164  // Made public for testing purposes.
166 };
167 
168 
// Process type (derives from process::Process<GroupProcess>) that
// Group::Membership befriends as the class that "creates and manages
// memberships". It declares the ZooKeeper event handlers, the connection
// state and the queues of pending operations for a group.
//
// NOTE(review): several declaration lines are absent from this listing
// (the embedded line numbers skip 191, 194-195, 197, 199, 213, 256,
// 296-297, 305, 313, 321 and 342). Each gap is marked below; restore the
// lines from the upstream header before relying on this text.
169 class GroupProcess : public process::Process<GroupProcess>
170 {
171 public:
    // Same parameter list as the first Group constructor, except that
    // `auth` has no default value here.
172  GroupProcess(const std::string& servers,
173  const Duration& sessionTimeout,
174  const std::string& znode,
175  const Option<Authentication>& auth);
176 
    // Same parameter list as the URL-based Group constructor.
177  GroupProcess(const URL& url,
178  const Duration& sessionTimeout);
179 
180  ~GroupProcess() override;
181 
    // Overrides a virtual of the process base class.
182  void initialize() override;
183 
    // NOTE(review): presumably the delay handed to retry() -- the value
    // and its use are defined outside this header; confirm there.
184  static const Duration RETRY_INTERVAL;
185 
186  // Helper function that returns the basename of the znode of
187  // the membership.
188  static std::string zkBasename(const Group::Membership& membership);
189 
190  // Group implementation.
    // NOTE(review): the lines carrying the return types and function names
    // of the next declarations are missing from this listing; only their
    // parameter lists survive. Judging by those parameters they look like
    // the counterparts of Group::join, Group::cancel / Group::data and
    // Group::watch (and possibly Group::session) -- confirm upstream.
192  const std::string& data,
193  const Option<std::string>& label);
196  const Group::Membership& membership);
198  const std::set<Group::Membership>& expected);
200 
201  // ZooKeeper events.
202  // Note that events from previous sessions are dropped.
    // Every handler receives the id of the session the event belongs to.
203  void connected(int64_t sessionId, bool reconnect);
204  void reconnecting(int64_t sessionId);
205  void expired(int64_t sessionId);
206  void updated(int64_t sessionId, const std::string& path);
207  void created(int64_t sessionId, const std::string& path);
208  void deleted(int64_t sessionId, const std::string& path);
209 
210 private:
211  void startConnection();
212 
    // NOTE(review): the first line of the declaration below (return type
    // and name) is missing from this listing; by its parameters and the
    // two siblings that follow it is presumably the "do" helper for join.
214  const std::string& data,
215  const Option<std::string>& label);
216  Result<bool> doCancel(const Group::Membership& membership);
217  Result<Option<std::string>> doData(const Group::Membership& membership);
218 
219  // Returns true if authentication is successful, false if the
220  // failure is retryable and Error otherwise.
221  Try<bool> authenticate();
222 
223  // Creates the group (which means creating its base path) on ZK.
224  // Returns true if successful, false if the failure is retryable
225  // and Error otherwise.
226  Try<bool> create();
227 
228  // Attempts to cache the current set of memberships.
229  // Returns true if successful, false if the failure is retryable
230  // and Error otherwise.
231  Try<bool> cache();
232 
233  // Synchronizes pending operations with ZooKeeper and also attempts
234  // to cache the current set of memberships if necessary.
235  // Returns true if successful, false if the failure is retryable
236  // and Error otherwise.
237  Try<bool> sync();
238 
239  // Updates any pending watches.
240  void update();
241 
242  // Generic retry method. This mechanism is "generic" in the sense
243  // that it is not specific to any particular operation, but rather
244  // attempts to perform all pending operations (including caching
245  // memberships if necessary).
246  void retry(const Duration& duration);
247 
    // NOTE(review): presumably invoked when `connectTimer` (declared at
    // the end of this class) fires for the given session -- confirm in
    // the implementation file.
248  void timedout(int64_t sessionId);
249 
250  // Aborts the group instance and fails all pending operations.
251  // The group then enters an error state and all subsequent
252  // operations will fail as well.
253  void abort(const std::string& message);
254 
255  // Potential non-retryable error set by abort().
    // NOTE(review): the member declaration this comment describes is
    // missing from this listing.
257 
258  const std::string servers;
259 
260  // The session timeout requested by the client.
261  const Duration sessionTimeout;
262 
263  const std::string znode;
264 
265  Option<Authentication> auth; // ZooKeeper authentication.
266 
267  const ACL_vector acl; // Default ACL to use.
268 
    // Raw pointers to the types forward-declared at the top of this header.
    // NOTE(review): ownership and lifetime are not visible here; check the
    // implementation file.
269  Watcher* watcher;
270  ZooKeeper* zk;
271 
272  // Group connection state.
273  // Normal state transitions:
274  // DISCONNECTED -> CONNECTING -> CONNECTED -> AUTHENTICATED
275  // -> READY.
276  // Reconnection does not change the current state and the state is
277  // only reset to DISCONNECTED after session expiration. Therefore
278  // the client's "progress" in setting up the group is preserved
279  // across reconnections. This means authenticate() and create() are
280  // only successfully executed once in one ZooKeeper session.
281  enum State
282  {
283  DISCONNECTED, // The initial state.
284  CONNECTING, // ZooKeeper connecting.
285  CONNECTED, // ZooKeeper connected but before group setup.
286  AUTHENTICATED, // ZooKeeper connected and authenticated.
287  READY, // ZooKeeper connected, session authenticated and
288  // base path for the group created.
289  } state;
290 
    // The four structs below each capture the arguments of one kind of
    // requested operation; pointers to them are queued in `pending`.
    // NOTE(review): each struct is missing one or two member lines in this
    // listing. In Join, the constructor initializes a `label` member whose
    // declaration is among the missing lines.
291  struct Join
292  {
293  Join(const std::string& _data, const Option<std::string>& _label)
294  : data(_data), label(_label) {}
295  std::string data;
298  };
299 
300  struct Cancel
301  {
302  explicit Cancel(const Group::Membership& _membership)
303  : membership(_membership) {}
304  Group::Membership membership;
306  };
307 
308  struct Data
309  {
310  explicit Data(const Group::Membership& _membership)
311  : membership(_membership) {}
312  Group::Membership membership;
314  };
315 
316  struct Watch
317  {
318  explicit Watch(const std::set<Group::Membership>& _expected)
319  : expected(_expected) {}
320  std::set<Group::Membership> expected;
322  };
323 
    // One FIFO queue per operation kind, holding raw pointers to the
    // request structs above.
    // NOTE(review): who allocates and frees the queued objects is not
    // visible in this header.
324  struct {
325  std::queue<Join*> joins;
326  std::queue<Cancel*> cancels;
327  std::queue<Data*> datas;
328  std::queue<Watch*> watches;
329  } pending;
330 
331  // Indicates there is a pending delayed retry.
332  bool retrying;
333 
334  // Expected ZooKeeper sequence numbers (either owned/created by this
335  // group instance or not) and the promise we associate with their
336  // "cancellation" (i.e., no longer part of the group).
337  std::map<int32_t, process::Promise<bool>*> owned;
338  std::map<int32_t, process::Promise<bool>*> unowned;
339 
340  // Cache of owned + unowned, where 'None' represents an invalid
341  // cache and 'Some' represents a valid cache.
    // NOTE(review): the member declaration this comment describes is
    // missing from this listing.
343 
344  // A timer that controls when we should give up on waiting for the
345  // current connection attempt to succeed and try to reconnect.
346  Option<process::Timer> connectTimer;
347 };
348 
349 } // namespace zookeeper {
350 
351 #endif // __MESOS_ZOOKEEPER_GROUP_HPP__
Definition: path.hpp:29
Definition: zookeeper.hpp:115
Try< bool > update(const std::string &link, const Handle &parent, uint16_t protocol, const action::Mirror &mirror)
bool operator==(const Membership &that) const
Definition: group.hpp:59
Option< std::string > label() const
Definition: group.hpp:94
process::Future< Option< std::string > > data(const Membership &membership)
bool pending(int signal)
Definition: signals.hpp:50
Definition: check.hpp:33
process::Future< Option< int64_t > > session()
static const Duration RETRY_INTERVAL
Definition: group.hpp:184
void expired(const std::shared_ptr< lambda::CallableOnce< Future< T >(const Future< T > &)>> &f, const std::shared_ptr< Latch > &latch, const std::shared_ptr< Promise< T >> &promise, const std::shared_ptr< Option< Timer >> &timer, const Future< T > &future)
Definition: future.hpp:1521
Definition: duration.hpp:32
Definition: check.hpp:30
Group(const std::string &servers, const Duration &sessionTimeout, const std::string &znode, const Option< Authentication > &auth=None())
int32_t id() const
Definition: group.hpp:89
This interface specifies the public interface an event handler class must implement.
Definition: zookeeper.hpp:59
process::Future< bool > cancelled() const
Definition: group.hpp:105
Definition: authentication.hpp:33
Try< Nothing > initialize(const Flags &flags)
Initialized state for support of systemd functions in this file.
friend class GroupProcess
Definition: group.hpp:111
Definition: future.hpp:74
std::queue< Join * > joins
Definition: group.hpp:325
Protocol< PromiseRequest, PromiseResponse > promise
bool operator!=(const Membership &that) const
Definition: group.hpp:64
process::Future< std::set< Membership > > watch(const std::set< Membership > &expected=std::set< Membership >())
bool operator<=(const Membership &that) const
Definition: group.hpp:74
bool operator<(const Membership &that) const
Definition: group.hpp:69
process::Future< Membership > join(const std::string &data, const Option< std::string > &label=None())
std::queue< Watch * > watches
Definition: group.hpp:328
Definition: none.hpp:27
Definition: group.hpp:49
bool operator>=(const Membership &that) const
Definition: group.hpp:84
std::string error(const std::string &msg, uint32_t code)
std::queue< Data * > datas
Definition: group.hpp:327
process::Future< bool > cancel(const Membership &membership)
Definition: url.hpp:46
bool operator>(const Membership &that) const
Definition: group.hpp:79
Try< Nothing > create(const std::string &hierarchy, const std::string &cgroup, bool recursive=false)
Definition: process.hpp:505
GroupProcess * process
Definition: group.hpp:165
std::queue< Cancel * > cancels
Definition: group.hpp:326
Definition: group.hpp:57
Definition: group.hpp:169