Apache Mesos
watcher.hpp
Go to the documentation of this file.
1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements. See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership. The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License. You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License
16 
17 #ifndef __MESOS_ZOOKEEPER_WATCHER_HPP__
18 #define __MESOS_ZOOKEEPER_WATCHER_HPP__
19 
20 #include <stdint.h>
21 
22 #include <glog/logging.h>
23 
25 
26 #include <process/dispatch.hpp>
27 
28 
29 // A watcher which dispatches events to a process. Note that it is
30 // only "safe" to reuse an instance across ZooKeeper instances after a
31 // session expiration. TODO(benh): Add a 'reset/initialize' to the
32 // Watcher so that a single instance can be reused.
33 // NOTE: By the time the dispatched events are processed by 'pid',
34 // its session ID may have changed! Therefore, we pass the session ID
35 // for the event to allow the 'pid' Process to check for staleness.
36 template <typename T>
37 class ProcessWatcher : public Watcher
38 {
39 public:
40  explicit ProcessWatcher(const process::PID<T>& _pid)
41  : pid(_pid), reconnect(false) {}
42 
43  void process(
44  int type,
45  int state,
46  int64_t sessionId,
47  const std::string& path) override
48  {
49  if (type == ZOO_SESSION_EVENT) {
50  if (state == ZOO_CONNECTED_STATE) {
51  // Connected (initial or reconnect).
52  process::dispatch(pid, &T::connected, sessionId, reconnect);
53  // If this watcher gets reused then the next connected
54  // event shouldn't be perceived as a reconnect.
55  reconnect = false;
56  } else if (state == ZOO_CONNECTING_STATE) {
57  // The client library automatically reconnects, taking
58  // into account failed servers in the connection string,
59  // appropriately handling the "herd effect", etc.
60  process::dispatch(pid, &T::reconnecting, sessionId);
61  // TODO(benh): If this watcher gets reused then the next
62  // connected event will be perceived as a reconnect, but it
63  // should not.
64  reconnect = true;
65  } else if (state == ZOO_EXPIRED_SESSION_STATE) {
66  process::dispatch(pid, &T::expired, sessionId);
67  // If this watcher gets reused then the next connected
68  // event shouldn't be perceived as a reconnect.
69  reconnect = false;
70  } else {
71  LOG(FATAL) << "Unhandled ZooKeeper state (" << state << ")"
72  << " for ZOO_SESSION_EVENT";
73  }
74  } else if (type == ZOO_CHILD_EVENT) {
75  process::dispatch(pid, &T::updated, sessionId, path);
76  } else if (type == ZOO_CHANGED_EVENT) {
77  process::dispatch(pid, &T::updated, sessionId, path);
78  } else if (type == ZOO_CREATED_EVENT) {
79  process::dispatch(pid, &T::created, sessionId, path);
80  } else if (type == ZOO_DELETED_EVENT) {
81  process::dispatch(pid, &T::deleted, sessionId, path);
82  } else {
83  LOG(FATAL) << "Unhandled ZooKeeper event (" << type << ")"
84  << " in state (" << state << ")";
85  }
86  }
87 
88 private:
89  const process::PID<T> pid;
90  bool reconnect;
91 };
92 
93 #endif // __MESOS_ZOOKEEPER_WATCHER_HPP__
Definition: path.hpp:29
void process(int type, int state, int64_t sessionId, const std::string &path) override
Definition: watcher.hpp:43
void expired(const std::shared_ptr< lambda::CallableOnce< Future< T >(const Future< T > &)>> &f, const std::shared_ptr< Latch > &latch, const std::shared_ptr< Promise< T >> &promise, const std::shared_ptr< Option< Timer >> &timer, const Future< T > &future)
Definition: future.hpp:1521
void dispatch(const PID< T > &pid, void(T::*method)())
Definition: dispatch.hpp:174
This interface specifies the public interface an event handler class must implement.
Definition: zookeeper.hpp:59
ProcessWatcher(const process::PID< T > &_pid)
Definition: watcher.hpp:40
A "process identifier" used to uniquely identify a process when dispatching messages.
Definition: pid.hpp:289
Try< uint32_t > type(const std::string &path)
Definition: watcher.hpp:37