Apache Mesos
loop.hpp
Go to the documentation of this file.
1 // Licensed under the Apache License, Version 2.0 (the "License");
2 // you may not use this file except in compliance with the License.
3 // You may obtain a copy of the License at
4 //
5 // http://www.apache.org/licenses/LICENSE-2.0
6 //
7 // Unless required by applicable law or agreed to in writing, software
8 // distributed under the License is distributed on an "AS IS" BASIS,
9 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10 // See the License for the specific language governing permissions and
11 // limitations under the License
12 
13 #ifndef __PROCESS_LOOP_HPP__
14 #define __PROCESS_LOOP_HPP__
15 
16 #include <mutex>
17 
18 #include <process/defer.hpp>
19 #include <process/dispatch.hpp>
20 #include <process/future.hpp>
21 #include <process/owned.hpp>
22 #include <process/pid.hpp>
23 #include <process/process.hpp>
24 
25 namespace process {
26 
27 // Provides an asynchronous "loop" abstraction. This abstraction is
28 // helpful for code that would have synchronously been written as a
29 // loop but asynchronously ends up being a recursive set of functions
30 // which depending on the compiler may result in a stack overflow
31 // (i.e., a compiler that can't do sufficient tail call optimization
32 // may add stack frames for each recursive call).
33 //
34 // The loop abstraction takes an optional PID `pid` and uses it as the
35 // execution context to run the loop. The implementation does a
36 // `defer` on this `pid` to "pop" the stack when it needs to
37 // asynchronously recurse. This also lets callers synchronize
38 // execution with other code dispatching and deferring using `pid`. If
39 // `None` is passed for `pid` then no `defer` is done and the stack
40 // will still "pop" but be restarted from the execution context
41 // wherever the blocked future is completed. This is usually very safe
42 // when that blocked future will be completed by the IO thread, but
43 // should not be used if it's completed by another process (because
44 // you'll block that process until the next time the loop blocks).
45 //
46 // The two functions passed to the loop represent the loop "iterate"
47 // step and the loop "body" step respectively. Each invocation of
48 // "iterate" returns the next value and the "body" returns whether or
49 // not to continue looping (as well as any other processing necessary
50 // of course). You can think of this synchronously as:
51 //
52 // bool condition = true;
53 // do {
54 // condition = body(iterate());
55 // } while (condition);
56 //
57 // Asynchronously using recursion this might look like:
58 //
59 // Future<Nothing> loop()
60 // {
61 // return iterate()
62 // .then([](T t) {
63 // return body(t)
64 // .then([](bool condition) {
65 // if (condition) {
66 // return loop();
67 // } else {
68 // return Nothing();
69 // }
70 // });
71 // });
72 // }
73 //
74 // And asynchronously using `pid` as the execution context:
75 //
76 // Future<Nothing> loop()
77 // {
78 // return iterate()
79 // .then(defer(pid, [](T t) {
80 // return body(t)
81 // .then(defer(pid, [](bool condition) {
82 // if (condition) {
83 // return loop();
84 // } else {
85 // return Nothing();
86 // }
87 // }));
88 // }));
89 // }
90 //
91 // And now what this looks like using `loop`:
92 //
93 // loop(pid,
94 // []() {
95 // return iterate();
96 // },
97 // [](T t) {
98 // return body(t);
99 // });
100 //
101 // One difference between the `loop` version of the "body" versus the
102 // other non-loop examples above is the return value is not `bool` or
103 // `Future<bool>` but rather `ControlFlow<V>` or
104 // `Future<ControlFlow<V>>`. This enables you to return values out of
105 // the loop via a `Break(...)`, for example:
106 //
107 // loop(pid,
108 // []() {
109 // return iterate();
110 // },
111 // [](T t) {
112 // if (finished(t)) {
113 // return Break(SomeValue());
114 // }
115 // return Continue();
116 // });
117 template <typename Iterate,
118  typename Body,
119  typename T = typename internal::unwrap<typename result_of<Iterate()>::type>::type, // NOLINT(whitespace/line_length)
120  typename CF = typename internal::unwrap<typename result_of<Body(T)>::type>::type, // NOLINT(whitespace/line_length)
121  typename V = typename CF::ValueType>
122 Future<V> loop(const Option<UPID>& pid, Iterate&& iterate, Body&& body);
123 
124 
125 // A helper for `loop` which creates a Process for us to provide an
126 // execution context for running the loop.
127 template <typename Iterate,
128  typename Body,
129  typename T = typename internal::unwrap<typename result_of<Iterate()>::type>::type, // NOLINT(whitespace/line_length)
130  typename CF = typename internal::unwrap<typename result_of<Body(T)>::type>::type, // NOLINT(whitespace/line_length)
131  typename V = typename CF::ValueType>
132 Future<V> loop(Iterate&& iterate, Body&& body)
133 {
134  // Have libprocess own and free the new `ProcessBase`.
135  UPID process = spawn(new ProcessBase(), true);
136 
137  return loop<Iterate, Body, T, CF, V>(
138  process,
139  std::forward<Iterate>(iterate),
140  std::forward<Body>(body))
141  .onAny([=]() {
142  terminate(process);
143  // NOTE: no need to `wait` or `delete` since the `spawn` above
144  // put libprocess in control of garbage collection.
145  });
146 }
147 
148 
149 // Generic "control flow" construct that is leveraged by
150 // implementations such as `loop`. At a high-level a `ControlFlow`
151 // represents some control flow statement such as `continue` or
152 // `break`, however, these statements can both have values or be
153 // value-less (i.e., these are meant to be composed "functionally" so
154 // the representation of `break` captures a value that "exits the
155 // current function" but the representation of `continue` does not).
156 //
157 // The pattern here is to define the type/representation of control
158 // flow statements within the `ControlFlow` class (e.g.,
159 // `ControlFlow::Continue` and `ControlFlow::Break`) but also provide
160 // "syntactic sugar" to make it easier to use at the call site (e.g.,
161 // the functions `Continue()` and `Break(...)`).
162 template <typename T>
164 {
165 public:
166  using ValueType = T;
167 
168  enum class Statement
169  {
170  CONTINUE,
171  BREAK
172  };
173 
174  class Continue
175  {
176  public:
177  Continue() = default;
178 
179  template <typename U>
180  operator ControlFlow<U>() const
181  {
183  }
184  };
185 
186  class Break
187  {
188  public:
189  Break(T t) : t(std::move(t)) {}
190 
191  template <typename U>
192  operator ControlFlow<U>() const &
193  {
195  }
196 
197  template <typename U>
198  operator ControlFlow<U>() &&
199  {
200  return ControlFlow<U>(ControlFlow<U>::Statement::BREAK, std::move(t));
201  }
202 
203  private:
204  T t;
205  };
206 
207  ControlFlow(Statement s, Option<T> t) : s(s), t(std::move(t)) {}
208 
209  Statement statement() const { return s; }
210 
211  T& value() & { return t.get(); }
212  const T& value() const & { return t.get(); }
213  T&& value() && { return t.get(); }
214  const T&& value() const && { return t.get(); }
215 
216 private:
217  Statement s;
218  Option<T> t;
219 };
220 
221 
222 // Provides "syntactic sugar" for creating a `ControlFlow::Continue`.
223 struct Continue
224 {
225  Continue() = default;
226 
227  template <typename T>
228  operator ControlFlow<T>() const
229  {
230  return typename ControlFlow<T>::Continue();
231  }
232 };
233 
234 
235 // Provides "syntactic sugar" for creating a `ControlFlow::Break`.
236 template <typename T>
238 {
240  std::forward<T>(t));
241 }
242 
243 
245 {
247 }
248 
249 
250 namespace internal {
251 
252 template <typename Iterate, typename Body, typename T, typename R>
253 class Loop : public std::enable_shared_from_this<Loop<Iterate, Body, T, R>>
254 {
255 public:
256  template <typename Iterate_, typename Body_>
257  static std::shared_ptr<Loop> create(
258  const Option<UPID>& pid,
259  Iterate_&& iterate,
260  Body_&& body)
261  {
262  return std::shared_ptr<Loop>(
263  new Loop(
264  pid,
265  std::forward<Iterate_>(iterate),
266  std::forward<Body_>(body)));
267  }
268 
269  std::shared_ptr<Loop> shared()
270  {
271  // Must fully specify `shared_from_this` because we're templated.
272  return std::enable_shared_from_this<Loop>::shared_from_this();
273  }
274 
275  std::weak_ptr<Loop> weak()
276  {
277  return std::weak_ptr<Loop>(shared());
278  }
279 
281  {
282  auto self = shared();
283  auto weak_self = weak();
284 
285  // Propagating discards:
286  //
287  // When the caller does a discard we need to propagate it to
288  // either the future returned from `iterate` or the future
289  // returned from `body`. One easy way to do this would be to add
290  // an `onAny` callback for every future returned from `iterate`
291  // and `body`, but that would be a slow memory leak that would
292  // grow over time, especially if the loop was actually
293  // infinite. Instead, we capture the current future that needs to
294  // be discarded within a `discard` function that we'll invoke when
295  // we get a discard. Because there is a race setting the `discard`
296  // function and reading it out to invoke we have to synchronize
297  // access using a mutex. An alternative strategy would be to use
298  // something like `atomic_load` and `atomic_store` with
299  // `shared_ptr` so that we can swap the current future(s)
300  // atomically.
301  promise.future().onDiscard([weak_self]() {
302  auto self = weak_self.lock();
303  if (self) {
304  // We need to make a copy of the current `discard` function so
305  // that we can invoke it outside of the `synchronized` block
306  // in the event that discarding invokes causes the `onAny`
307  // callbacks that we have added in `run` to execute which may
308  // deadlock attempting to re-acquire `mutex`!
309  std::function<void()> f = []() {};
310  synchronized (self->mutex) {
311  f = self->discard;
312  }
313  f();
314  }
315  });
316 
317  if (pid.isSome()) {
318  // Start the loop using `pid` as the execution context.
319  dispatch(pid.get(), [self]() {
320  self->run(self->iterate());
321  });
322 
323  // TODO(benh): Link with `pid` so that we can discard or abandon
324  // the promise in the event `pid` terminates and didn't discard
325  // us so that we can avoid any leaks (memory or otherwise).
326  } else {
327  run(iterate());
328  }
329 
330  return promise.future();
331  }
332 
333  void run(Future<T> next)
334  {
335  auto self = shared();
336 
337  // Reset `discard` so that we're not delaying cleanup of any
338  // captured futures longer than necessary.
339  //
340  // TODO(benh): Use `WeakFuture` in `discard` functions instead.
341  synchronized (mutex) {
342  discard = []() {};
343  }
344 
345  while (next.isReady()) {
346  Future<ControlFlow<R>> flow = body(next.get());
347  if (flow.isReady()) {
348  switch (flow->statement()) {
350  next = iterate();
351  continue;
352  }
354  promise.set(flow->value());
355  return;
356  }
357  }
358  } else {
359  auto continuation = [self](const Future<ControlFlow<R>>& flow) {
360  if (flow.isReady()) {
361  switch (flow->statement()) {
363  self->run(self->iterate());
364  break;
365  }
367  self->promise.set(flow->value());
368  break;
369  }
370  }
371  } else if (flow.isFailed()) {
372  self->promise.fail(flow.failure());
373  } else if (flow.isDiscarded()) {
374  self->promise.discard();
375  }
376  };
377 
378  if (pid.isSome()) {
379  flow.onAny(defer(pid.get(), continuation));
380  } else {
381  flow.onAny(continuation);
382  }
383 
384  if (!promise.future().hasDiscard()) {
385  synchronized (mutex) {
386  self->discard = [=]() mutable { flow.discard(); };
387  }
388  }
389 
390  // There's a race between when a discard occurs and the
391  // `discard` function gets invoked and therefore we must
392  // explicitly always do a discard. In addition, after a
393  // discard occurs we'll need to explicitly do discards for
394  // each new future that blocks.
395  if (promise.future().hasDiscard()) {
396  flow.discard();
397  }
398 
399  return;
400  }
401  }
402 
403  auto continuation = [self](const Future<T>& next) {
404  if (next.isReady()) {
405  self->run(next);
406  } else if (next.isFailed()) {
407  self->promise.fail(next.failure());
408  } else if (next.isDiscarded()) {
409  self->promise.discard();
410  }
411  };
412 
413  if (pid.isSome()) {
414  next.onAny(defer(pid.get(), continuation));
415  } else {
416  next.onAny(continuation);
417  }
418 
419  if (!promise.future().hasDiscard()) {
420  synchronized (mutex) {
421  discard = [=]() mutable { next.discard(); };
422  }
423  }
424 
425  // See comment above as to why we need to explicitly discard
426  // regardless of the path the if statement took above.
427  if (promise.future().hasDiscard()) {
428  next.discard();
429  }
430  }
431 
432 protected:
433  Loop(const Option<UPID>& pid, const Iterate& iterate, const Body& body)
434  : pid(pid), iterate(iterate), body(body) {}
435 
436  Loop(const Option<UPID>& pid, Iterate&& iterate, Body&& body)
437  : pid(pid), iterate(std::move(iterate)), body(std::move(body)) {}
438 
439 private:
440  const Option<UPID> pid;
441  Iterate iterate;
442  Body body;
443  Promise<R> promise;
444 
445  // In order to discard the loop safely we capture the future that
446  // needs to be discarded within the `discard` function and reading
447  // and writing that function with a mutex.
448  std::mutex mutex;
449  std::function<void()> discard = []() {};
450 };
451 
452 } // namespace internal {
453 
454 
455 template <typename Iterate, typename Body, typename T, typename CF, typename V>
456 Future<V> loop(const Option<UPID>& pid, Iterate&& iterate, Body&& body)
457 {
458  using Loop = internal::Loop<
459  typename std::decay<Iterate>::type,
460  typename std::decay<Body>::type,
461  T,
462  V>;
463 
464  std::shared_ptr<Loop> loop = Loop::create(
465  pid,
466  std::forward<Iterate>(iterate),
467  std::forward<Body>(body));
468 
469  return loop->start();
470 }
471 
472 } // namespace process {
473 
474 #endif // __PROCESS_LOOP_HPP__
ControlFlow(Statement s, Option< T > t)
Definition: loop.hpp:207
bool isReady() const
Definition: future.hpp:1231
Definition: loop.hpp:253
Definition: nothing.hpp:16
Definition: option.hpp:28
F && f
Definition: defer.hpp:270
static std::shared_ptr< Loop > create(const Option< UPID > &pid, Iterate_ &&iterate, Body_ &&body)
Definition: loop.hpp:257
const T & get() const
Definition: future.hpp:1310
bool set(const T &_t)
Definition: future.hpp:826
ControlFlow< typename std::decay< T >::type >::Break Break(T &&t)
Definition: loop.hpp:237
Continue()=default
Statement statement() const
Definition: loop.hpp:209
const Future< T > & onDiscard(DiscardCallback &&callback) const
Definition: future.hpp:1370
std::weak_ptr< Loop > weak()
Definition: loop.hpp:275
Statement
Definition: loop.hpp:168
Definition: process.hpp:72
bool discard()
Definition: future.hpp:1173
T ValueType
Definition: loop.hpp:166
UPID spawn(ProcessBase *process, bool manage=false)
Spawn a new process.
void terminate(const UPID &pid, bool inject=true)
Sends a TerminateEvent to the given process.
bool isDiscarded() const
Definition: future.hpp:1238
An &quot;untyped&quot; PID, used to encapsulate the process ID for lower-layer abstractions (eg...
Definition: pid.hpp:39
const Future< T > & onAny(AnyCallback &&callback) const
Definition: future.hpp:1458
const T & value() const &
Definition: loop.hpp:212
Loop(const Option< UPID > &pid, Iterate &&iterate, Body &&body)
Definition: loop.hpp:436
struct ev_loop * loop
Definition: loop.hpp:456
void run(Future< T > next)
Definition: loop.hpp:333
void dispatch(const UPID &pid, std::unique_ptr< lambda::CallableOnce< void(ProcessBase *)>> f, const Option< const std::type_info * > &functionType=None())
Definition: loop.hpp:186
Result< Process > process(pid_t pid)
Definition: freebsd.hpp:30
Definition: loop.hpp:174
Definition: none.hpp:27
Definition: loop.hpp:223
Future< T > future() const
Definition: future.hpp:912
const T && value() const &&
Definition: loop.hpp:214
Try< uint32_t > type(const std::string &path)
T && value()&&
Definition: loop.hpp:213
Loop(const Option< UPID > &pid, const Iterate &iterate, const Body &body)
Definition: loop.hpp:433
Try< Nothing > create(const std::string &hierarchy, const std::string &cgroup, bool recursive=false)
std::shared_ptr< Loop > shared()
Definition: loop.hpp:269
const std::string & failure() const
Definition: future.hpp:1336
Break(T t)
Definition: loop.hpp:189
T & value()&
Definition: loop.hpp:211
Deferred< void()> defer(const PID< T > &pid, void(T::*method)())
Definition: defer.hpp:35
bool hasDiscard() const
Definition: future.hpp:1259
Definition: loop.hpp:163
Future< R > start()
Definition: loop.hpp:280
bool isFailed() const
Definition: future.hpp:1245
Definition: future.hpp:57