Apache Mesos
sequence.hpp
Go to the documentation of this file.
1 // Licensed under the Apache License, Version 2.0 (the "License");
2 // you may not use this file except in compliance with the License.
3 // You may obtain a copy of the License at
4 //
5 // http://www.apache.org/licenses/LICENSE-2.0
6 //
7 // Unless required by applicable law or agreed to in writing, software
8 // distributed under the License is distributed on an "AS IS" BASIS,
9 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10 // See the License for the specific language governing permissions and
11 // limitations under the License
12 
13 #ifndef __PROCESS_SEQUENCE_HPP__
14 #define __PROCESS_SEQUENCE_HPP__
15 
16 #include <glog/logging.h>
17 
18 #include <process/future.hpp>
19 #include <process/id.hpp>
20 #include <process/owned.hpp>
21 #include <process/process.hpp>
22 
23 #include <stout/lambda.hpp>
24 #include <stout/nothing.hpp>
25 
26 namespace process {
27 
28 // Forward declaration.
29 class SequenceProcess;
30 
31 // Provides an abstraction that serializes the execution of a sequence
32 // of callbacks.
33 class Sequence
34 {
35 public:
36  Sequence(const std::string& id = "sequence");
37  ~Sequence();
38 
39  // Registers a callback that will be invoked when all the futures
40  // returned by the previously registered callbacks are in
41  // non-pending status (i.e., ready, discarded or failed). Due to
42  // these semantics, we should avoid registering a callback which
43  // returns a future that could be in non-pending status for a long
44  // time because it will prevent all the subsequent callbacks from
45  // being invoked. This is analogous to the requirement that we want
46  // to avoid invoking blocking function in a libprocess handler. A
47  // user is allowed to cancel a registered callback by discarding the
48  // returned future. Other callbacks in this sequence will NOT be
49  // affected. The subsequent callbacks will not be invoked until the
50  // future is actually DISCARDED.
51  template <typename T>
52  Future<T> add(const lambda::function<Future<T>()>& callback);
53 
54 private:
55  // Not copyable, not assignable.
56  Sequence(const Sequence&);
57  Sequence& operator=(const Sequence&);
58 
60 };
61 
62 
63 class SequenceProcess : public Process<SequenceProcess>
64 {
65 public:
66  SequenceProcess(const std::string& id)
67  : ProcessBase(ID::generate(id)),
68  last(Nothing()) {}
69 
70  template <typename T>
71  Future<T> add(const lambda::function<Future<T>()>& callback)
72  {
73  // This is the future that is used to notify the next callback
74  // (denoted by 'N' in the following graph).
76 
77  // This is the future that will be returned to the user (denoted
78  // by 'F' in the following graph).
80 
81  // We use a graph to show how we hook these futures. Each box in
82  // the graph represents a future. As mentioned above, 'F' denotes
83  // a future that will be returned to the user, and 'N' denotes a
84  // future that is used for notifying the next callback. Each arrow
85  // represents a "notification" relation. We will explain in detail
86  // what "notification" means in the following.
87  //
88  // 'last' 'last' 'last'
89  // | | |
90  // v v v
91  // +---+ +---+ +---+ +---+ +---+ +---+
92  // | N | | N |--+ | N | | N |--+ | N |--+ | N |
93  // +---+ +---+ | +---+ +---+ | +---+ | +---+
94  // ==> | ^ ==> | ^ | ^
95  // | | | | | |
96  // | +---+ | +---+ | +---+
97  // +-->| F | +-->| F | +-->| F |
98  // +---+ +---+ +---+
99  //
100  // Initial => Added one callback => Added two callbacks.
101 
102  // Setup the "notification" from 'F' to 'N' so that when a
103  // callback is done, signal the notifier ('N').
104  promise->future().onAny(lambda::bind(&completed, notifier));
105 
106  // Setup the "notification" from previous 'N' to 'F' so that when
107  // a notifier ('N') is set (indicating the previous callback has
108  // completed), invoke the next callback ('F') in the sequence.
109  last.onAny(lambda::bind(&notified<T>, promise, callback));
110 
111  // In the following, we setup the hooks so that if this sequence
112  // process is being terminated, all pending callbacks will be
113  // discarded. We use weak futures here to avoid cyclic dependencies.
114 
115  // Discard the future associated with this notifier.
116  //
117  // NOTE: When we discard the notifier future, any `onDiscard()` callbacks
118  // registered on `promise->future` will be invoked, but `onDiscard`
119  // callbacks registered on the future returned by `add()` will NOT be
120  // invoked. This is because currently discards do not propagate through
121  // `dispatch()`. In other words, users should be careful when registering
122  // `onDiscard` callbacks on the returned future.
123  //
124  // TODO(*): Propagate `onDiscard` through `dispatch`.
125  notifier->future().onDiscard(
126  lambda::bind(
127  &internal::discard<T>,
128  WeakFuture<T>(promise->future())));
129 
130  // Discard the notifier associated with the previous future.
131  notifier->future().onDiscard(
132  lambda::bind(
133  &internal::discard<Nothing>,
134  WeakFuture<Nothing>(last)));
135 
136  // Update the 'last'.
137  last = notifier->future();
138 
139  return promise->future();
140  }
141 
142 protected:
143  void finalize() override
144  {
145  last.discard();
146 
147  // TODO(jieyu): Do we need to wait for the future of the last
148  // callback to be in DISCARDED state?
149  }
150 
151 private:
152  // Invoked when a callback is done.
153  static void completed(Owned<Promise<Nothing>> notifier)
154  {
155  notifier->set(Nothing());
156  }
157 
158  // Invoked when a notifier is set.
159  template <typename T>
160  static void notified(
162  const lambda::function<Future<T>()>& callback)
163  {
164  if (promise->future().hasDiscard()) {
165  // The user has shown the intention to discard this callback
166  // (i.e., by calling future.discard()). As a result, we will
167  // just skip this callback.
168  promise->discard();
169  } else {
170  promise->associate(callback());
171  }
172  }
173 
174  Future<Nothing> last;
175 };
176 
177 
178 inline Sequence::Sequence(const std::string& id)
179 {
180  process = new SequenceProcess(id);
182 }
183 
184 
186 {
187  // We set `inject` to false so that the terminate message is added to the
188  // end of the sequence actor message queue. This guarantees that all `add()`
189  // calls which happened before the sequence destruction are processed.
190  // See MESOS-8741.
191  process::terminate(process, false);
193  delete process;
194 }
195 
196 
197 template <typename T>
198 Future<T> Sequence::add(const lambda::function<Future<T>()>& callback)
199 {
200  return dispatch(process, &SequenceProcess::add<T>, callback);
201 }
202 
203 } // namespace process {
204 
205 #endif // __PROCESS_SEQUENCE_HPP__
std::string generate(const std::string &prefix="")
Returns 'prefix(N)' where N represents the number of instances where the same prefix (wrt...
Definition: nothing.hpp:16
Definition: sequence.hpp:63
Future< T > add(const lambda::function< Future< T >()> &callback)
Definition: sequence.hpp:71
Future< T > add(const lambda::function< Future< T >()> &callback)
Definition: sequence.hpp:198
Definition: process.hpp:72
~Sequence()
Definition: sequence.hpp:185
UPID spawn(ProcessBase *process, bool manage=false)
Spawn a new process.
void terminate(const UPID &pid, bool inject=true)
Sends a TerminateEvent to the given process.
Definition: sequence.hpp:33
void dispatch(const PID< T > &pid, void(T::*method)())
Definition: dispatch.hpp:174
Protocol< PromiseRequest, PromiseResponse > promise
Definition: future.hpp:79
bool wait(const UPID &pid, const Duration &duration=Seconds(-1))
Wait for the process to exit for no more than the specified seconds.
Definition: executor.hpp:48
SequenceProcess(const std::string &id)
Definition: sequence.hpp:66
void finalize() override
Invoked when a process is terminated.
Definition: sequence.hpp:143
Try< Nothing > bind(int_fd s, const Address &address)
Definition: network.hpp:46
Definition: owned.hpp:36
Definition: process.hpp:505
Sequence(const std::string &id="sequence")
Definition: sequence.hpp:178
Definition: future.hpp:58