Apache Mesos
sequence.hpp
Go to the documentation of this file.
1 // Licensed under the Apache License, Version 2.0 (the "License");
2 // you may not use this file except in compliance with the License.
3 // You may obtain a copy of the License at
4 //
5 // http://www.apache.org/licenses/LICENSE-2.0
6 //
7 // Unless required by applicable law or agreed to in writing, software
8 // distributed under the License is distributed on an "AS IS" BASIS,
9 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10 // See the License for the specific language governing permissions and
11 // limitations under the License.
12 
13 #ifndef __PROCESS_SEQUENCE_HPP__
14 #define __PROCESS_SEQUENCE_HPP__
15 
16 #include <glog/logging.h>
17 
18 #include <process/future.hpp>
19 #include <process/id.hpp>
20 #include <process/owned.hpp>
21 #include <process/process.hpp>
22 
23 #include <stout/lambda.hpp>
24 #include <stout/nothing.hpp>
25 
26 namespace process {
27 
28 // Forward declaration.
29 class SequenceProcess;
30 
31 // Provides an abstraction that serializes the execution of a sequence
32 // of callbacks.
33 class Sequence
34 {
35 public:
36  Sequence(const std::string& id = "sequence");
37  ~Sequence();
38 
39  // Registers a callback that will be invoked when all the futures
40  // returned by the previously registered callbacks are in
41  // non-pending status (i.e., ready, discarded or failed). Due to
42  // these semantics, we should avoid registering a callback which
43  // returns a future that could be in non-pending status for a long
44  // time because it will prevent all the subsequent callbacks from
45  // being invoked. This is analogous to the requirement that we want
46  // to avoid invoking blocking function in a libprocess handler. A
47  // user is allowed to cancel a registered callback by discarding the
48  // returned future. Other callbacks in this sequence will NOT be
49  // affected. The subsequent callbacks will not be invoked until the
50  // future is actually DISCARDED.
51  template <typename T>
52  Future<T> add(const lambda::function<Future<T>()>& callback);
53 
54 private:
55  // Not copyable, not assignable.
56  Sequence(const Sequence&);
57  Sequence& operator=(const Sequence&);
58 
59  SequenceProcess* process;
60 };
61 
62 
63 class SequenceProcess : public Process<SequenceProcess>
64 {
65 public:
66  SequenceProcess(const std::string& id)
67  : ProcessBase(ID::generate(id)),
68  last(Nothing()) {}
69 
70  template <typename T>
71  Future<T> add(const lambda::function<Future<T>()>& callback)
72  {
73  // This is the future that is used to notify the next callback
74  // (denoted by 'N' in the following graph).
76 
77  // This is the future that will be returned to the user (denoted
78  // by 'F' in the following graph).
80 
81  // We use a graph to show how we hook these futures. Each box in
82  // the graph represents a future. As mentioned above, 'F' denotes
83  // a future that will be returned to the user, and 'N' denotes a
84  // future that is used for notifying the next callback. Each arrow
85  // represents a "notification" relation. We will explain in detail
86  // what "notification" means in the following.
87  //
88  // 'last' 'last' 'last'
89  // | | |
90  // v v v
91  // +---+ +---+ +---+ +---+ +---+ +---+
92  // | N | | N |--+ | N | | N |--+ | N |--+ | N |
93  // +---+ +---+ | +---+ +---+ | +---+ | +---+
94  // ==> | ^ ==> | ^ | ^
95  // | | | | | |
96  // | +---+ | +---+ | +---+
97  // +-->| F | +-->| F | +-->| F |
98  // +---+ +---+ +---+
99  //
100  // Initial => Added one callback => Added two callbacks.
101 
102  // Setup the "notification" from 'F' to 'N' so that when a
103  // callback is done, signal the notifier ('N').
104  promise->future().onAny(lambda::bind(&completed, notifier));
105 
106  // Setup the "notification" from previous 'N' to 'F' so that when
107  // a notifier ('N') is set (indicating the previous callback has
108  // completed), invoke the next callback ('F') in the sequence.
109  last.onAny(lambda::bind(&notified<T>, promise, callback));
110 
111  // In the following, we setup the hooks so that if this sequence
112  // process is being terminated, all pending callbacks will be
113  // discarded. We use weak futures here to avoid cyclic dependencies.
114 
115  // Discard the future associated with this notifier.
116  notifier->future().onDiscard(
117  lambda::bind(
118  &internal::discard<T>,
119  WeakFuture<T>(promise->future())));
120 
121  // Discard the notifier associated with the previous future.
122  notifier->future().onDiscard(
123  lambda::bind(
124  &internal::discard<Nothing>,
125  WeakFuture<Nothing>(last)));
126 
127  // Update the 'last'.
128  last = notifier->future();
129 
130  return promise->future();
131  }
132 
133 protected:
134  virtual void finalize()
135  {
136  last.discard();
137 
138  // TODO(jieyu): Do we need to wait for the future of the last
139  // callback to be in DISCARDED state?
140  }
141 
142 private:
143  // Invoked when a callback is done.
144  static void completed(Owned<Promise<Nothing>> notifier)
145  {
146  notifier->set(Nothing());
147  }
148 
149  // Invoked when a notifier is set.
150  template <typename T>
151  static void notified(
152  Owned<Promise<T>> promise,
153  const lambda::function<Future<T>()>& callback)
154  {
155  if (promise->future().hasDiscard()) {
156  // The user has shown the intention to discard this callback
157  // (i.e., by calling future.discard()). As a result, we will
158  // just skip this callback.
159  promise->discard();
160  } else {
161  promise->associate(callback());
162  }
163  }
164 
165  Future<Nothing> last;
166 };
167 
168 
169 inline Sequence::Sequence(const std::string& id)
170 {
171  process = new SequenceProcess(id);
172  process::spawn(process);
173 }
174 
175 
177 {
178  process::terminate(process);
179  process::wait(process);
180  delete process;
181 }
182 
183 
184 template <typename T>
185 Future<T> Sequence::add(const lambda::function<Future<T>()>& callback)
186 {
187  return dispatch(process, &SequenceProcess::add<T>, callback);
188 }
189 
190 } // namespace process {
191 
192 #endif // __PROCESS_SEQUENCE_HPP__
std::string generate(const std::string &prefix="")
Returns 'prefix(N)' where N represents the number of instances where the same prefix (wrt...
Definition: nothing.hpp:16
Definition: sequence.hpp:63
Future< T > add(const lambda::function< Future< T >()> &callback)
Definition: sequence.hpp:71
Future< T > add(const lambda::function< Future< T >()> &callback)
Definition: sequence.hpp:185
Definition: process.hpp:72
bool discard()
Definition: future.hpp:1173
~Sequence()
Definition: sequence.hpp:176
UPID spawn(ProcessBase *process, bool manage=false)
Spawn a new process.
void terminate(const UPID &pid, bool inject=true)
Sends a TerminateEvent to the given process.
Definition: sequence.hpp:33
void dispatch(const PID< T > &pid, void(T::*method)())
Definition: dispatch.hpp:174
virtual void finalize()
Invoked when a process is terminated.
Definition: sequence.hpp:134
const Future< T > & onAny(AnyCallback &&callback) const
Definition: future.hpp:1458
Protocol< PromiseRequest, PromiseResponse > promise
Definition: future.hpp:78
bool wait(const UPID &pid, const Duration &duration=Seconds(-1))
Wait for the process to exit for no more than the specified seconds.
Result< Process > process(pid_t pid)
Definition: freebsd.hpp:30
SequenceProcess(const std::string &id)
Definition: sequence.hpp:66
Try< Nothing > bind(int_fd s, const Address &address)
Definition: network.hpp:46
Definition: owned.hpp:35
Definition: process.hpp:493
Sequence(const std::string &id="sequence")
Definition: sequence.hpp:169
Definition: future.hpp:57