Apache Mesos
future.hpp
Go to the documentation of this file.
1 // Licensed under the Apache License, Version 2.0 (the "License");
2 // you may not use this file except in compliance with the License.
3 // You may obtain a copy of the License at
4 //
5 // http://www.apache.org/licenses/LICENSE-2.0
6 //
7 // Unless required by applicable law or agreed to in writing, software
8 // distributed under the License is distributed on an "AS IS" BASIS,
9 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10 // See the License for the specific language governing permissions and
11 // limitations under the License
12 
13 #ifndef __PROCESS_FUTURE_HPP__
14 #define __PROCESS_FUTURE_HPP__
15 
16 #include <assert.h>
17 
18 #include <atomic>
19 #include <list>
20 #include <memory> // TODO(benh): Replace shared_ptr with unique_ptr.
21 #include <ostream>
22 #include <set>
23 #include <type_traits>
24 #include <utility>
25 #include <vector>
26 
27 #include <glog/logging.h>
28 
29 #include <process/clock.hpp>
30 #include <process/latch.hpp>
31 #include <process/owned.hpp>
32 #include <process/pid.hpp>
33 #include <process/timer.hpp>
34 
35 #include <stout/abort.hpp>
36 #include <stout/check.hpp>
37 #include <stout/duration.hpp>
38 #include <stout/error.hpp>
39 #include <stout/lambda.hpp>
40 #include <stout/none.hpp>
41 #include <stout/option.hpp>
42 #include <stout/preprocessor.hpp>
43 #include <stout/result.hpp>
44 #include <stout/result_of.hpp>
45 #include <stout/synchronized.hpp>
46 #include <stout/try.hpp>
47 
48 #include <stout/os/strerror.hpp>
49 
50 namespace process {
51 
52 // Forward declarations (instead of include to break circular dependency).
53 template <typename G>
54 struct _Deferred;
55 
56 template <typename T>
57 class Future;
58 
59 
60 namespace internal {
61 
62 template <typename T>
63 struct wrap;
64 
65 template <typename T>
66 struct unwrap;
67 
68 } // namespace internal {
69 
70 
71 // Forward declaration of Promise.
72 template <typename T>
73 class Promise;
74 
75 
76 // Forward declaration of WeakFuture.
77 template <typename T>
78 class WeakFuture;
79 
80 // Forward declaration of Failure.
81 struct Failure;
82 struct ErrnoFailure;
83 
84 
85 // Definition of a "shared" future. A future can hold any
86 // copy-constructible value. A future is considered "shared" because
87 // by default a future can be accessed concurrently.
88 template <typename T>
89 class Future
90 {
91 public:
92  // Constructs a failed future.
93  static Future<T> failed(const std::string& message);
94 
95  Future();
96 
97  /*implicit*/ Future(const T& _t);
98  /*implicit*/ Future(T&& _t);
99 
100  template <typename U>
101  /*implicit*/ Future(const U& u);
102 
103  /*implicit*/ Future(const Failure& failure);
104 
105  /*implicit*/ Future(const ErrnoFailure& failure);
106 
107  /*implicit*/ Future(const Future<T>& that) = default;
108  /*implicit*/ Future(Future<T>&& that) = default;
109 
110  /*implicit*/ Future(const Try<T>& t);
111 
112  /*implicit*/ Future(const Try<Future<T>>& t);
113 
114  ~Future() = default;
115 
116  // Futures are assignable (and copyable). This results in the
117  // reference to the previous future data being decremented and a
118  // reference to 'that' being incremented.
119  Future<T>& operator=(const Future<T>& that) = default;
120  Future<T>& operator=(Future<T>&& that) = default;
121 
122  // Comparison operators useful for using futures in collections.
123  bool operator==(const Future<T>& that) const;
124  bool operator!=(const Future<T>& that) const;
125  bool operator<(const Future<T>& that) const;
126 
127  // Helpers to get the current state of this future.
128  bool isPending() const;
129  bool isReady() const;
130  bool isDiscarded() const;
131  bool isFailed() const;
132  bool isAbandoned() const;
133  bool hasDiscard() const;
134 
135  // Discards this future. Returns false if discard has already been
136  // called or the future has already completed, i.e., is ready,
137  // failed, or discarded. Note that a discard does not terminate any
138  // computation but rather acts as a suggestion or indication that
139  // the caller no longer cares about the result of some
140  // computation. The callee can decide whether or not to continue the
141  // computation on their own (where best practices are to attempt to
142  // stop the computation if possible). The callee can discard the
143  // computation via Promise::discard which completes a future, at
144  // which point, Future::isDiscarded is true (and the
145  // Future::onDiscarded callbacks are executed). Before that point,
146  // but after calling Future::discard, only Future::hasDiscard will
147  // return true and the Future::onDiscard callbacks will be invoked.
148  bool discard();
149 
150  // Waits for this future to become ready, discarded, or failed.
151  bool await(const Duration& duration = Seconds(-1)) const;
152 
153  // Return the value associated with this future, waits indefinitely
154  // until a value gets associated or until the future is discarded.
155  const T& get() const;
156  const T* operator->() const;
157 
158  // Returns the failure message associated with this future.
159  const std::string& failure() const;
160 
161  // Type of the callback functions that can get invoked when the
162  // future gets set, fails, or is discarded.
169 
170  // Installs callbacks for the specified events and returns a const
171  // reference to 'this' in order to easily support chaining.
172  const Future<T>& onAbandoned(AbandonedCallback&& callback) const;
173  const Future<T>& onDiscard(DiscardCallback&& callback) const;
174  const Future<T>& onReady(ReadyCallback&& callback) const;
175  const Future<T>& onFailed(FailedCallback&& callback) const;
176  const Future<T>& onDiscarded(DiscardedCallback&& callback) const;
177  const Future<T>& onAny(AnyCallback&& callback) const;
178 
179  // TODO(benh): Add onReady, onFailed, onAny for _Deferred<F> where F
180  // is not expected.
181 
182  template <typename F>
183  const Future<T>& onAbandoned(_Deferred<F>&& deferred) const
184  {
185  return onAbandoned(
186  std::move(deferred).operator lambda::CallableOnce<void()>());
187  }
188 
189  template <typename F>
190  const Future<T>& onDiscard(_Deferred<F>&& deferred) const
191  {
192  return onDiscard(
193  std::move(deferred).operator lambda::CallableOnce<void()>());
194  }
195 
196  template <typename F>
197  const Future<T>& onReady(_Deferred<F>&& deferred) const
198  {
199  return onReady(
200  std::move(deferred).operator lambda::CallableOnce<void(const T&)>());
201  }
202 
203  template <typename F>
204  const Future<T>& onFailed(_Deferred<F>&& deferred) const
205  {
206  return onFailed(std::move(deferred)
207  .operator lambda::CallableOnce<void(const std::string&)>());
208  }
209 
210  template <typename F>
211  const Future<T>& onDiscarded(_Deferred<F>&& deferred) const
212  {
213  return onDiscarded(
214  std::move(deferred).operator lambda::CallableOnce<void()>());
215  }
216 
217  template <typename F>
218  const Future<T>& onAny(_Deferred<F>&& deferred) const
219  {
220  return onAny(std::move(deferred)
221  .operator lambda::CallableOnce<void(const Future<T>&)>());
222  }
223 
224 private:
225  // We use the 'Prefer' and 'LessPrefer' structs as a way to prefer
226  // one function over the other when doing SFINAE for the 'onReady',
227  // 'onFailed', 'onAny', and 'then' functions. In each of these cases
228  // we prefer calling the version of the functor that takes in an
229  // argument (i.e., 'const T&' for 'onReady' and 'then' and 'const
230  // std::string&' for 'onFailed'), but we allow functors that don't
231  // care about the argument. We don't need to do this for
232  // 'onDiscard', 'onDiscarded' or 'onAbandoned' because they don't
233  // take an argument.
234  struct LessPrefer {};
235  struct Prefer : LessPrefer {};
236 
238  const Future<T>& onReady(F&& f, Prefer) const
239  {
240  return onReady(lambda::CallableOnce<void(const T&)>(
242  [](typename std::decay<F>::type&& f, const T& t) {
243  std::move(f)(t);
244  },
245  std::forward<F>(f),
246  lambda::_1)));
247  }
248 
249  // This is the less preferred `onReady`, we prefer the `onReady` method which
250  // has `f` taking a `const T&` parameter. Unfortunately, to complicate
251  // matters, if `F` is the result of a `std::bind` expression we need to SFINAE
252  // out this version of `onReady` and force the use of the preferred `onReady`
253  // (which works because `std::bind` will just ignore the `const T&` argument).
254  // This is necessary because Visual Studio 2015 doesn't support using the
255  // `std::bind` call operator with `result_of` as it's technically not a
256  // requirement by the C++ standard.
257  template <
258  typename F,
259  typename = typename result_of<typename std::enable_if<
261  F>::type()>::type>
262  const Future<T>& onReady(F&& f, LessPrefer) const
263  {
264  return onReady(lambda::CallableOnce<void(const T&)>(
266  [](typename std::decay<F>::type&& f, const T&) {
267  std::move(f)();
268  },
269  std::forward<F>(f),
270  lambda::_1)));
271  }
272 
274  const Future<T>& onFailed(F&& f, Prefer) const
275  {
276  return onFailed(lambda::CallableOnce<void(const std::string&)>(
278  [](typename std::decay<F>::type&& f, const std::string& message) {
279  std::move(f)(message);
280  },
281  std::forward<F>(f),
282  lambda::_1)));
283  }
284 
285  // Refer to the less preferred version of `onReady` for why these SFINAE
286  // conditions are necessary.
287  template <
288  typename F,
289  typename = typename result_of<typename std::enable_if<
291  F>::type()>::type>
292  const Future<T>& onFailed(F&& f, LessPrefer) const
293  {
294  return onFailed(lambda::CallableOnce<void(const std::string&)>(
296  [](typename std::decay<F>::type&& f, const std::string&) mutable {
297  std::move(f)();
298  },
299  std::forward<F>(f),
300  lambda::_1)));
301  }
302 
303  template <typename F, typename = typename result_of<F(const Future<T>&)>::type> // NOLINT(whitespace/line_length)
304  const Future<T>& onAny(F&& f, Prefer) const
305  {
306  return onAny(lambda::CallableOnce<void(const Future<T>&)>(
308  [](typename std::decay<F>::type&& f, const Future<T>& future) {
309  std::move(f)(future);
310  },
311  std::forward<F>(f),
312  lambda::_1)));
313  }
314 
315  // Refer to the less preferred version of `onReady` for why these SFINAE
316  // conditions are necessary.
317  template <
318  typename F,
319  typename = typename result_of<typename std::enable_if<
321  F>::type()>::type>
322  const Future<T>& onAny(F&& f, LessPrefer) const
323  {
324  return onAny(lambda::CallableOnce<void(const Future<T>&)>(
326  [](typename std::decay<F>::type&& f, const Future<T>&) {
327  std::move(f)();
328  },
329  std::forward<F>(f),
330  lambda::_1)));
331  }
332 
333 public:
334  template <typename F>
335  const Future<T>& onAbandoned(F&& f) const
336  {
337  return onAbandoned(lambda::CallableOnce<void()>(
339  [](typename std::decay<F>::type&& f) {
340  std::move(f)();
341  },
342  std::forward<F>(f))));
343  }
344 
345  template <typename F>
346  const Future<T>& onDiscard(F&& f) const
347  {
348  return onDiscard(lambda::CallableOnce<void()>(
350  [](typename std::decay<F>::type&& f) {
351  std::move(f)();
352  },
353  std::forward<F>(f))));
354  }
355 
356  template <typename F>
357  const Future<T>& onReady(F&& f) const
358  {
359  return onReady(std::forward<F>(f), Prefer());
360  }
361 
362  template <typename F>
363  const Future<T>& onFailed(F&& f) const
364  {
365  return onFailed(std::forward<F>(f), Prefer());
366  }
367 
368  template <typename F>
369  const Future<T>& onDiscarded(F&& f) const
370  {
371  return onDiscarded(lambda::CallableOnce<void()>(
373  [](typename std::decay<F>::type&& f) {
374  std::move(f)();
375  },
376  std::forward<F>(f))));
377  }
378 
379  template <typename F>
380  const Future<T>& onAny(F&& f) const
381  {
382  return onAny(std::forward<F>(f), Prefer());
383  }
384 
385  // Installs callbacks that get executed when this future is ready
386  // and associates the result of the callback with the future that is
387  // returned to the caller (which may be of a different type).
388  template <typename X>
389  Future<X> then(lambda::CallableOnce<Future<X>(const T&)> f) const;
390 
391  template <typename X>
392  Future<X> then(lambda::CallableOnce<X(const T&)> f) const;
393 
394  template <typename X>
396  {
397  return then(lambda::CallableOnce<Future<X>(const T&)>(
398  lambda::partial(std::move(f))));
399  }
400 
401  template <typename X>
403  {
404  return then(lambda::CallableOnce<X(const T&)>(
405  lambda::partial(std::move(f))));
406  }
407 
408 private:
409  template <
410  typename F,
411  typename X =
413  Future<X> then(_Deferred<F>&& f, Prefer) const
414  {
415  // note the then<X> is necessary to not have an infinite loop with
416  // then(F&& f)
417  return then<X>(
418  std::move(f).operator lambda::CallableOnce<Future<X>(const T&)>());
419  }
420 
421  // Refer to the less preferred version of `onReady` for why these SFINAE
422  // conditions are necessary.
423  template <
424  typename F,
425  typename X = typename internal::unwrap<
426  typename result_of<typename std::enable_if<
428  F>::type()>::type>::type>
429  Future<X> then(_Deferred<F>&& f, LessPrefer) const
430  {
431  return then<X>(std::move(f).operator lambda::CallableOnce<Future<X>()>());
432  }
433 
435  Future<X> then(F&& f, Prefer) const
436  {
437  return then<X>(
438  lambda::CallableOnce<Future<X>(const T&)>(std::forward<F>(f)));
439  }
440 
441  // Refer to the less preferred version of `onReady` for why these SFINAE
442  // conditions are necessary.
443  template <
444  typename F,
445  typename X = typename internal::unwrap<
446  typename result_of<typename std::enable_if<
448  F>::type()>::type>::type>
449  Future<X> then(F&& f, LessPrefer) const
450  {
451  return then<X>(lambda::CallableOnce<Future<X>()>(std::forward<F>(f)));
452  }
453 
454 public:
455  // NOTE: There are two bugs we're dealing with here.
456  // (1) GCC bug where the explicit use of `this->` is required in the
457  // trailing return type: gcc.gnu.org/bugzilla/show_bug.cgi?id=57543
458  // (2) VS 2017 RC bug where the explicit use of `this->` is disallowed.
459  //
460  // Since VS 2015 and 2017 RC both implement C++14's deduced return type for
461  // functions, we simply choose to use that on Windows.
462  //
463  // TODO(mpark): Remove the trailing return type once we get to C++14.
464  template <typename F>
465  auto then(F&& f) const
466 #ifndef __WINDOWS__
467  -> decltype(this->then(std::forward<F>(f), Prefer()))
468 #endif // __WINDOWS__
469  {
470  return then(std::forward<F>(f), Prefer());
471  }
472 
473  // Installs callbacks that get executed if this future is abandoned,
474  // is discarded, or failed.
475  template <typename F>
476  Future<T> recover(F&& f) const;
477 
478  template <typename F>
479  Future<T> recover(_Deferred<F>&& deferred) const
480  {
481  return recover(
482  std::move(deferred)
483  .operator lambda::CallableOnce<Future<T>(const Future<T>&)>());
484  }
485 
486  // TODO(benh): Considering adding a `rescue` function for rescuing
487  // abandoned futures.
488 
489  // Installs callbacks that get executed if this future completes
490  // because it failed.
492  lambda::CallableOnce<Future<T>(const Future<T>&)> f) const;
493 
494  // TODO(benh): Add overloads of 'repair' that don't require passing
495  // in a function that takes the 'const Future<T>&' parameter and use
496  // Prefer/LessPrefer to disambiguate.
497 
498  // Invokes the specified function after some duration if this future
499  // has not been completed (set, failed, or discarded). Note that
500  // this function is agnostic of discard semantics and while it will
501  // propagate discarding "up the chain" it will still invoke the
502  // specified callback after the specified duration even if 'discard'
503  // was called on the returned future.
505  const Duration& duration,
506  lambda::CallableOnce<Future<T>(const Future<T>&)> f) const;
507 
508  // TODO(benh): Add overloads of 'after' that don't require passing
509  // in a function that takes the 'const Future<T>&' parameter and use
510  // Prefer/LessPrefer to disambiguate.
511 
512 private:
513  template <typename U>
514  friend class Future;
515  friend class Promise<T>;
516  friend class WeakFuture<T>;
517  template <typename U>
518  friend std::ostream& operator<<(std::ostream&, const Future<U>&);
519 
520  enum State
521  {
522  PENDING,
523  READY,
524  FAILED,
525  DISCARDED,
526  };
527 
528  struct Data
529  {
530  Data();
531  ~Data() = default;
532 
533  void clearAllCallbacks();
534 
535  std::atomic_flag lock = ATOMIC_FLAG_INIT;
536  State state;
537  bool discard;
538  bool associated;
539  bool abandoned;
540 
541  // One of:
542  // 1. None, the state is PENDING or DISCARDED.
543  // 2. Some, the state is READY.
544  // 3. Error, the state is FAILED; 'error()' stores the message.
545  Result<T> result;
546 
547  std::vector<AbandonedCallback> onAbandonedCallbacks;
548  std::vector<DiscardCallback> onDiscardCallbacks;
549  std::vector<ReadyCallback> onReadyCallbacks;
550  std::vector<FailedCallback> onFailedCallbacks;
551  std::vector<DiscardedCallback> onDiscardedCallbacks;
552  std::vector<AnyCallback> onAnyCallbacks;
553  };
554 
555  // Abandons this future. Returns false if the future is already
556  // associated or no longer pending. Otherwise returns true and any
557  // Future::onAbandoned callbacks wil be run.
558  //
559  // If `propagating` is true then we'll abandon this future even if
560  // it has already been associated. This is important because
561  // `~Promise()` will try and abandon and we need to ignore that if
562  // the future has been associated since the promise will no longer
563  // be setting the future anyway (and is likely the reason it's being
564  // destructed, because it's useless). When the future that we've
565  // associated with gets abandoned, however, then we need to actually
566  // abandon this future too. Here's an example of this:
567  //
568  // 1: Owned<Promise<int>> promise1(new Promise<int>());
569  // 2: Owned<Promise<int>> promise2(new Promise<int>());
570  // 3:
571  // 4: Future<int> future1 = promise1->future();
572  // 5: Future<int> future2 = promise2->future();
573  // 6:
574  // 7: promise1->associate(future2);
575  // 8:
576  // 9: promise1.reset();
577  // 10:
578  // 11: assert(!future1.isAbandoned());
579  // 12:
580  // 13: promise2.reset();
581  // 14:
582  // 15: assert(future2.isAbandoned());
583  // 16: assert(future3.isAbandoned());
584  //
585  // At line 9 `~Promise()` will attempt to abandon the future by
586  // calling `abandon()` but since it's been associated we won't do
587  // anything. On line 13 the `onAbandoned()` callback will call
588  // `abandon(true)` and know we'll actually abandon the future
589  // because we're _propagating_ the abandon from the associated
590  // future.
591  //
592  // NOTE: this is an _INTERNAL_ function and should never be exposed
593  // or used outside of the implementation.
594  bool abandon(bool propagating = false);
595 
596  // Sets the value for this future, unless the future is already set,
597  // failed, or discarded, in which case it returns false.
598  bool set(const T& _t);
599  bool set(T&& _t);
600 
601  template <typename U>
602  bool _set(U&& _u);
603 
604  // Sets this future as failed, unless the future is already set,
605  // failed, or discarded, in which case it returns false.
606  bool fail(const std::string& _message);
607 
608  std::shared_ptr<Data> data;
609 };
610 
611 
612 namespace internal {
613 
614 // Helper for executing callbacks that have been registered.
615 //
616 // TODO(*): Invoke callbacks in another execution context.
617 template <typename C, typename... Arguments>
618 void run(std::vector<C>&& callbacks, Arguments&&... arguments)
619 {
620  for (size_t i = 0; i < callbacks.size(); ++i) {
621  std::move(callbacks[i])(std::forward<Arguments>(arguments)...);
622  }
623 }
624 
625 } // namespace internal {
626 
627 
628 // Represents a weak reference to a future. This class is used to
629 // break cyclic dependencies between futures.
630 template <typename T>
631 class WeakFuture
632 {
633 public:
634  explicit WeakFuture(const Future<T>& future);
635 
636  // Converts this weak reference to a concrete future. Returns none
637  // if the conversion is not successful.
638  Option<Future<T>> get() const;
639 
640 private:
641  std::weak_ptr<typename Future<T>::Data> data;
642 };
643 
644 
645 template <typename T>
647  : data(future.data) {}
648 
649 
650 template <typename T>
652 {
653  Future<T> future;
654  future.data = data.lock();
655 
656  if (future.data) {
657  return future;
658  }
659 
660  return None();
661 }
662 
663 
664 // Helper for creating failed futures.
665 struct Failure
666 {
667  explicit Failure(const std::string& _message) : message(_message) {}
668  explicit Failure(const Error& error) : message(error.message) {}
669 
670  const std::string message;
671 };
672 
673 
674 struct ErrnoFailure : public Failure
675 {
677 
678  explicit ErrnoFailure(int _code)
679  : Failure(os::strerror(_code)), code(_code) {}
680 
681  explicit ErrnoFailure(const std::string& message)
682  : ErrnoFailure(errno, message) {}
683 
684  ErrnoFailure(int _code, const std::string& message)
685  : Failure(message + ": " + os::strerror(_code)), code(_code) {}
686 
687  const int code;
688 };
689 
690 
691 // Forward declaration to use as friend below.
692 namespace internal {
693 template <typename U>
694 void discarded(Future<U> future);
695 } // namespace internal {
696 
697 
698 // TODO(benh): Make Promise a subclass of Future?
699 template <typename T>
700 class Promise
701 {
702 public:
703  Promise();
704  explicit Promise(const T& t);
705  virtual ~Promise();
706 
707  Promise(Promise<T>&& that);
708 
709  bool discard();
710  bool set(const T& _t);
711  bool set(T&& _t);
712  bool set(const Future<T>& future); // Alias for associate.
713  bool associate(const Future<T>& future);
714  bool fail(const std::string& message);
715 
716  // Returns a copy of the future associated with this promise.
717  Future<T> future() const;
718 
719 private:
720  template <typename U>
721  friend class Future;
722  template <typename U>
723  friend void internal::discarded(Future<U> future);
724 
725  template <typename U>
726  bool _set(U&& u);
727 
728  // Not copyable, not assignable.
729  Promise(const Promise<T>&);
730  Promise<T>& operator=(const Promise<T>&);
731 
732  // Helper for doing the work of actually discarding a future (called
733  // from Promise::discard as well as internal::discarded).
734  static bool discard(Future<T> future);
735 
736  Future<T> f;
737 };
738 
739 
740 template <>
741 class Promise<void>;
742 
743 
744 template <typename T>
745 class Promise<T&>;
746 
747 
748 namespace internal {
749 
750 // Discards a weak future. If the weak future is invalid (i.e., the
751 // future it references to has already been destroyed), this operation
752 // is treated as a no-op.
753 template <typename T>
754 void discard(WeakFuture<T> reference)
755 {
756  Option<Future<T>> future = reference.get();
757  if (future.isSome()) {
758  Future<T> future_ = future.get();
759  future_.discard();
760  }
761 }
762 
763 
764 // Helper for invoking Promise::discard in an onDiscarded callback
765 // (since the onDiscarded callback requires returning void we can't
766 // bind with Promise::discard).
767 template <typename T>
768 void discarded(Future<T> future)
769 {
770  Promise<T>::discard(future);
771 }
772 
773 } // namespace internal {
774 
775 
776 template <typename T>
778 {
779  // Need to "unset" `abandoned` since it gets set in the empty
780  // constructor for `Future`.
781  f.data->abandoned = false;
782 }
783 
784 
785 template <typename T>
787  : f(t) {}
788 
789 
790 template <typename T>
792 {
793  // Note that we don't discard the promise as we don't want to give
794  // the illusion that any computation hasn't started (or possibly
795  // finished) in the event that computation is "visible" by other
796  // means. However, we try and abandon the future if it hasn't been
797  // associated or set (or moved, i.e., `f.data` is true).
798  if (f.data) {
799  f.abandon();
800  }
801 }
802 
803 
804 template <typename T>
806  : f(std::move(that.f)) {}
807 
808 
809 template <typename T>
811 {
812  if (!f.data->associated) {
813  return discard(f);
814  }
815  return false;
816 }
817 
818 
819 template <typename T>
820 bool Promise<T>::set(T&& t)
821 {
822  return _set(std::move(t));
823 }
824 
825 
826 template <typename T>
827 bool Promise<T>::set(const T& t)
828 {
829  return _set(t);
830 }
831 
832 
833 template <typename T>
834 template <typename U>
835 bool Promise<T>::_set(U&& u)
836 {
837  if (!f.data->associated) {
838  return f.set(std::forward<U>(u));
839  }
840  return false;
841 }
842 
843 
844 template <typename T>
846 {
847  return associate(future);
848 }
849 
850 
851 template <typename T>
853 {
854  bool associated = false;
855 
856  synchronized (f.data->lock) {
857  // Don't associate if this promise has completed. Note that this
858  // does not include if Future::discard was called on this future
859  // since in that case that would still leave the future PENDING
860  // (note that we cover that case below).
861  if (f.data->state == Future<T>::PENDING && !f.data->associated) {
862  associated = f.data->associated = true;
863 
864  // After this point we don't allow 'f' to be completed via the
865  // promise since we've set 'associated' but Future::discard on
866  // 'f' might get called which will get propagated via the
867  // 'f.onDiscard' below. Note that we currently don't propagate a
868  // discard from 'future.onDiscard' but these semantics might
869  // change if/when we make 'f' and 'future' true aliases of one
870  // another.
871  }
872  }
873 
874  // Note that we do the actual associating after releasing the lock
875  // above to avoid deadlocking by attempting to require the lock
876  // within from invoking 'f.onDiscard' and/or 'f.set/fail' via the
877  // bind statements from doing 'future.onReady/onFailed'.
878  if (associated) {
879  // TODO(jieyu): Make 'f' a true alias of 'future'. Currently, only
880  // 'discard' is associated in both directions. In other words, if
881  // a future gets discarded, the other future will also get
882  // discarded. For 'set' and 'fail', they are associated only in
883  // one direction. In other words, calling 'set' or 'fail' on this
884  // promise will not affect the result of the future that we
885  // associated.
886  f.onDiscard(lambda::bind(&internal::discard<T>, WeakFuture<T>(future)));
887 
888  // Need to disambiguate for the compiler.
889  bool (Future<T>::*set)(const T&) = &Future<T>::set;
890 
891  future
892  .onReady(lambda::bind(set, f, lambda::_1))
893  .onFailed(lambda::bind(&Future<T>::fail, f, lambda::_1))
894  .onDiscarded(lambda::bind(&internal::discarded<T>, f))
895  .onAbandoned(lambda::bind(&Future<T>::abandon, f, true));
896  }
897 
898  return associated;
899 }
900 
901 
902 template <typename T>
903 bool Promise<T>::fail(const std::string& message)
904 {
905  if (!f.data->associated) {
906  return f.fail(message);
907  }
908  return false;
909 }
910 
911 
912 template <typename T>
914 {
915  return f;
916 }
917 
918 
919 // Internal helper utilities.
920 namespace internal {
921 
922 template <typename T>
923 struct wrap
924 {
925  typedef Future<T> type;
926 };
927 
928 
929 template <typename X>
930 struct wrap<Future<X>>
931 {
932  typedef Future<X> type;
933 };
934 
935 
936 template <typename T>
937 struct unwrap
938 {
939  typedef T type;
940 };
941 
942 
943 template <typename X>
944 struct unwrap<Future<X>>
945 {
946  typedef X type;
947 };
948 
949 
950 template <typename T>
951 void select(
952  const Future<T>& future,
953  std::shared_ptr<Promise<Future<T>>> promise)
954 {
955  // We never fail the future associated with our promise.
956  assert(!promise->future().isFailed());
957 
958  if (promise->future().isPending()) { // No-op if it's discarded.
959  if (future.isReady()) { // We only set the promise if a future is ready.
960  promise->set(future);
961  }
962  }
963 }
964 
965 } // namespace internal {
966 
967 
968 // TODO(benh): Move select and discard into 'futures' namespace.
969 
970 // Returns a future that captures any ready future in a set. Note that
971 // select DOES NOT capture a future that has failed or been discarded.
972 template <typename T>
974 {
975  std::shared_ptr<Promise<Future<T>>> promise(new Promise<Future<T>>());
976 
977  promise->future().onDiscard(
978  lambda::bind(&internal::discarded<Future<T>>, promise->future()));
979 
980  foreach (const Future<T>& future, futures) {
981  future.onAny([=](const Future<T>& f) {
982  internal::select(f, promise);
983  });
984  }
985 
986  return promise->future();
987 }
988 
989 
990 template <typename T>
991 void discard(const std::set<Future<T>>& futures)
992 {
993  foreach (Future<T> future, futures) { // Need a non-const copy to discard.
994  future.discard();
995  }
996 }
997 
998 
999 template <typename T>
1000 void discard(const std::list<Future<T>>& futures)
1001 {
1002  foreach (Future<T> future, futures) { // Need a non-const copy to discard.
1003  future.discard();
1004  }
1005 }
1006 
1007 
1008 template <typename T>
1010 {
1011  bool result = false;
1012 
1013  synchronized (future.data->lock) {
1014  if (future.data->state == Future<T>::PENDING) {
1015  future.data->state = Future<T>::DISCARDED;
1016  result = true;
1017  }
1018  }
1019 
1020  // Invoke all callbacks associated with this future being
1021  // DISCARDED. We don't need a lock because the state is now in
1022  // DISCARDED so there should not be any concurrent modifications to
1023  // the callbacks.
1024  if (result) {
1025  // NOTE: we rely on the fact that we have `future` to protect
1026  // ourselves from one of the callbacks erroneously deleting the
1027  // future. In `Future::_set()` and `Future::fail()` we have to
1028  // explicitly take a copy to protect ourselves.
1029  internal::run(std::move(future.data->onDiscardedCallbacks));
1030  internal::run(std::move(future.data->onAnyCallbacks), future);
1031 
1032  future.data->clearAllCallbacks();
1033  }
1034 
1035  return result;
1036 }
1037 
1038 
1039 template <typename T>
1040 Future<T> Future<T>::failed(const std::string& message)
1041 {
1042  Future<T> future;
1043  future.fail(message);
1044  return future;
1045 }
1046 
1047 
1048 template <typename T>
1050  : state(PENDING),
1051  discard(false),
1052  associated(false),
1053  abandoned(false),
1054  result(None()) {}
1055 
1056 
1057 template <typename T>
1059 {
1060  onAbandonedCallbacks.clear();
1061  onAnyCallbacks.clear();
1062  onDiscardCallbacks.clear();
1063  onDiscardedCallbacks.clear();
1064  onFailedCallbacks.clear();
1065  onReadyCallbacks.clear();
1066 }
1067 
1068 
1069 template <typename T>
1071  : data(new Data())
1072 {
1073  data->abandoned = true;
1074 }
1075 
1076 
1077 template <typename T>
1078 Future<T>::Future(const T& _t)
1079  : data(new Data())
1080 {
1081  set(_t);
1082 }
1083 
1084 
1085 template <typename T>
1087  : data(new Data())
1088 {
1089  set(std::move(_t));
1090 }
1091 
1092 
1093 template <typename T>
1094 template <typename U>
1096  : data(new Data())
1097 {
1098  set(u);
1099 }
1100 
1101 
1102 template <typename T>
1104  : data(new Data())
1105 {
1106  fail(failure.message);
1107 }
1108 
1109 
1110 template <typename T>
1112  : data(new Data())
1113 {
1114  fail(failure.message);
1115 }
1116 
1117 
1118 template <typename T>
1120  : data(new Data())
1121 {
1122  if (t.isSome()){
1123  set(t.get());
1124  } else {
1125  fail(t.error());
1126  }
1127 }
1128 
1129 
1130 template <typename T>
1132  : data(t.isSome() ? t->data : std::shared_ptr<Data>(new Data()))
1133 {
1134  if (!t.isSome()) {
1135  fail(t.error());
1136  }
1137 }
1138 
1139 
1140 template <typename T>
1141 bool Future<T>::operator==(const Future<T>& that) const
1142 {
1143  return data == that.data;
1144 }
1145 
1146 
1147 template <typename T>
1148 bool Future<T>::operator!=(const Future<T>& that) const
1149 {
1150  return !(*this == that);
1151 }
1152 
1153 
1154 template <typename T>
1155 bool Future<T>::operator<(const Future<T>& that) const
1156 {
1157  return data < that.data;
1158 }
1159 
1160 
1161 template <typename T>
1163 {
1164  bool result = false;
1165 
1166  std::vector<DiscardCallback> callbacks;
1167  synchronized (data->lock) {
1168  if (!data->discard && data->state == PENDING) {
1169  result = data->discard = true;
1170 
1171  callbacks.swap(data->onDiscardCallbacks);
1172  }
1173  }
1174 
1175  // Invoke all callbacks associated with doing a discard on this
1176  // future. The callbacks get destroyed when we exit from the
1177  // function.
1178  if (result) {
1179  internal::run(std::move(callbacks));
1180  }
1181 
1182  return result;
1183 }
1184 
1185 
1186 template <typename T>
1187 bool Future<T>::abandon(bool propagating)
1188 {
1189  bool result = false;
1190 
1191  std::vector<AbandonedCallback> callbacks;
1192  synchronized (data->lock) {
1193  if (!data->abandoned &&
1194  data->state == PENDING &&
1195  (!data->associated || propagating)) {
1196  result = data->abandoned = true;
1197 
1198  callbacks.swap(data->onAbandonedCallbacks);
1199  }
1200  }
1201 
1202  // Invoke all callbacks. The callbacks get destroyed when we exit
1203  // from the function.
1204  if (result) {
1205  internal::run(std::move(callbacks));
1206  }
1207 
1208  return result;
1209 }
1210 
1211 
1212 template <typename T>
1214 {
1215  return data->state == PENDING;
1216 }
1217 
1218 
1219 template <typename T>
1221 {
1222  return data->state == READY;
1223 }
1224 
1225 
1226 template <typename T>
1228 {
1229  return data->state == DISCARDED;
1230 }
1231 
1232 
1233 template <typename T>
1235 {
1236  return data->state == FAILED;
1237 }
1238 
1239 
1240 template <typename T>
1242 {
1243  return data->abandoned;
1244 }
1245 
1246 
1247 template <typename T>
1249 {
1250  return data->discard;
1251 }
1252 
1253 
1254 namespace internal {
1255 
1256 inline void awaited(Owned<Latch> latch)
1257 {
1258  latch->trigger();
1259 }
1260 
1261 } // namespace internal {
1262 
1263 
1264 template <typename T>
1265 bool Future<T>::await(const Duration& duration) const
1266 {
1267  // NOTE: We need to preemptively allocate the Latch on the stack
1268  // instead of lazily create it in the critical section below because
1269  // instantiating a Latch requires creating a new process (at the
1270  // time of writing this comment) which might need to do some
1271  // synchronization in libprocess which might deadlock if some other
1272  // code in libprocess is already holding a lock and then attempts to
1273  // do Promise::set (or something similar) that attempts to acquire
1274  // the lock that we acquire here. This is an artifact of using
1275  // Future/Promise within the implementation of libprocess.
1276  //
1277  // We mostly only call 'await' in tests so this should not be a
1278  // performance concern.
1279  Owned<Latch> latch(new Latch());
1280 
1281  bool pending = false;
1282 
1283  synchronized (data->lock) {
1284  if (data->state == PENDING) {
1285  pending = true;
1286  data->onAnyCallbacks.push_back(lambda::bind(&internal::awaited, latch));
1287  }
1288  }
1289 
1290  if (pending) {
1291  return latch->await(duration);
1292  }
1293 
1294  return true;
1295 }
1296 
1297 
1298 template <typename T>
1299 const T& Future<T>::get() const
1300 {
1301  if (!isReady()) {
1302  await();
1303  }
1304 
1305  CHECK(!isPending()) << "Future was in PENDING after await()";
1306  // We can't use CHECK_READY here due to check.hpp depending on future.hpp.
1307  if (!isReady()) {
1308  CHECK(!isFailed()) << "Future::get() but state == FAILED: " << failure();
1309  CHECK(!isDiscarded()) << "Future::get() but state == DISCARDED";
1310  }
1311 
1312  assert(data->result.isSome());
1313  return data->result.get();
1314 }
1315 
1316 
1317 template <typename T>
1318 const T* Future<T>::operator->() const
1319 {
1320  return &get();
1321 }
1322 
1323 
1324 template <typename T>
1325 const std::string& Future<T>::failure() const
1326 {
1327  if (data->state != FAILED) {
1328  ABORT("Future::failure() but state != FAILED");
1329  }
1330 
1331  CHECK_ERROR(data->result);
1332  return data->result.error();
1333 }
1334 
1335 
1336 template <typename T>
1337 const Future<T>& Future<T>::onAbandoned(AbandonedCallback&& callback) const
1338 {
1339  bool run = false;
1340 
1341  synchronized (data->lock) {
1342  if (data->abandoned) {
1343  run = true;
1344  } else if (data->state == PENDING) {
1345  data->onAbandonedCallbacks.emplace_back(std::move(callback));
1346  }
1347  }
1348 
1349  // TODO(*): Invoke callback in another execution context.
1350  if (run) {
1351  std::move(callback)(); // NOLINT(misc-use-after-move)
1352  }
1353 
1354  return *this;
1355 }
1356 
1357 
1358 template <typename T>
1359 const Future<T>& Future<T>::onDiscard(DiscardCallback&& callback) const
1360 {
1361  bool run = false;
1362 
1363  synchronized (data->lock) {
1364  if (data->discard) {
1365  run = true;
1366  } else if (data->state == PENDING) {
1367  data->onDiscardCallbacks.emplace_back(std::move(callback));
1368  }
1369  }
1370 
1371  // TODO(*): Invoke callback in another execution context.
1372  if (run) {
1373  std::move(callback)(); // NOLINT(misc-use-after-move)
1374  }
1375 
1376  return *this;
1377 }
1378 
1379 
1380 template <typename T>
1381 const Future<T>& Future<T>::onReady(ReadyCallback&& callback) const
1382 {
1383  bool run = false;
1384 
1385  synchronized (data->lock) {
1386  if (data->state == READY) {
1387  run = true;
1388  } else if (data->state == PENDING) {
1389  data->onReadyCallbacks.emplace_back(std::move(callback));
1390  }
1391  }
1392 
1393  // TODO(*): Invoke callback in another execution context.
1394  if (run) {
1395  std::move(callback)(data->result.get()); // NOLINT(misc-use-after-move)
1396  }
1397 
1398  return *this;
1399 }
1400 
1401 
1402 template <typename T>
1403 const Future<T>& Future<T>::onFailed(FailedCallback&& callback) const
1404 {
1405  bool run = false;
1406 
1407  synchronized (data->lock) {
1408  if (data->state == FAILED) {
1409  run = true;
1410  } else if (data->state == PENDING) {
1411  data->onFailedCallbacks.emplace_back(std::move(callback));
1412  }
1413  }
1414 
1415  // TODO(*): Invoke callback in another execution context.
1416  if (run) {
1417  std::move(callback)(data->result.error()); // NOLINT(misc-use-after-move)
1418  }
1419 
1420  return *this;
1421 }
1422 
1423 
1424 template <typename T>
1425 const Future<T>& Future<T>::onDiscarded(DiscardedCallback&& callback) const
1426 {
1427  bool run = false;
1428 
1429  synchronized (data->lock) {
1430  if (data->state == DISCARDED) {
1431  run = true;
1432  } else if (data->state == PENDING) {
1433  data->onDiscardedCallbacks.emplace_back(std::move(callback));
1434  }
1435  }
1436 
1437  // TODO(*): Invoke callback in another execution context.
1438  if (run) {
1439  std::move(callback)(); // NOLINT(misc-use-after-move)
1440  }
1441 
1442  return *this;
1443 }
1444 
1445 
1446 template <typename T>
1447 const Future<T>& Future<T>::onAny(AnyCallback&& callback) const
1448 {
1449  bool run = false;
1450 
1451  synchronized (data->lock) {
1452  if (data->state == PENDING) {
1453  data->onAnyCallbacks.emplace_back(std::move(callback));
1454  } else {
1455  run = true;
1456  }
1457  }
1458 
1459  // TODO(*): Invoke callback in another execution context.
1460  if (run) {
1461  std::move(callback)(*this); // NOLINT(misc-use-after-move)
1462  }
1463 
1464  return *this;
1465 }
1466 
1467 namespace internal {
1468 
1469 // NOTE: We need to name this 'thenf' versus 'then' to distinguish it
1470 // from the function 'then' whose parameter 'f' doesn't return a
1471 // Future since the compiler can't properly infer otherwise.
1472 template <typename T, typename X>
1474  std::unique_ptr<Promise<X>> promise,
1475  const Future<T>& future)
1476 {
1477  if (future.isReady()) {
1478  if (future.hasDiscard()) {
1479  promise->discard();
1480  } else {
1481  promise->associate(std::move(f)(future.get()));
1482  }
1483  } else if (future.isFailed()) {
1484  promise->fail(future.failure());
1485  } else if (future.isDiscarded()) {
1486  promise->discard();
1487  }
1488 }
1489 
1490 
1491 template <typename T, typename X>
1492 void then(lambda::CallableOnce<X(const T&)>&& f,
1493  std::unique_ptr<Promise<X>> promise,
1494  const Future<T>& future)
1495 {
1496  if (future.isReady()) {
1497  if (future.hasDiscard()) {
1498  promise->discard();
1499  } else {
1500  promise->set(std::move(f)(future.get()));
1501  }
1502  } else if (future.isFailed()) {
1503  promise->fail(future.failure());
1504  } else if (future.isDiscarded()) {
1505  promise->discard();
1506  }
1507 }
1508 
1509 
1510 template <typename T>
1511 void repair(
1513  std::unique_ptr<Promise<T>> promise,
1514  const Future<T>& future)
1515 {
1516  CHECK(!future.isPending());
1517  if (future.isFailed()) {
1518  promise->associate(std::move(f)(future));
1519  } else {
1520  promise->associate(future);
1521  }
1522 }
1523 
1524 
1525 template <typename T>
1526 void expired(
1527  const std::shared_ptr<lambda::CallableOnce<Future<T>(const Future<T>&)>>& f,
1528  const std::shared_ptr<Latch>& latch,
1529  const std::shared_ptr<Promise<T>>& promise,
1530  const std::shared_ptr<Option<Timer>>& timer,
1531  const Future<T>& future)
1532 {
1533  if (latch->trigger()) {
1534  // If this callback executed first (i.e., we triggered the latch)
1535  // then we want to clear out the timer so that we don't hold a
1536  // circular reference to `future` in it's own `onAny`
1537  // callbacks. See the comment in `Future::after`.
1538  *timer = None();
1539 
1540  // Note that we don't bother checking if 'future' has been
1541  // discarded (i.e., 'future.isDiscarded()' returns true) since
1542  // there is a race between when we make that check and when we
1543  // would invoke 'f(future)' so the callee 'f' should ALWAYS check
1544  // if the future has been discarded and rather than hiding a
1545  // non-deterministic bug we always call 'f' if the timer has
1546  // expired.
1547  promise->associate(std::move(*f)(future));
1548  }
1549 }
1550 
1551 
1552 template <typename T>
1553 void after(
1554  const std::shared_ptr<Latch>& latch,
1555  const std::shared_ptr<Promise<T>>& promise,
1556  const std::shared_ptr<Option<Timer>>& timer,
1557  const Future<T>& future)
1558 {
1559  CHECK(!future.isPending());
1560  if (latch->trigger()) {
1561  // If this callback executes first (i.e., we triggered the latch)
1562  // it must be the case that `timer` is still some and we can try
1563  // and cancel the timer.
1564  CHECK_SOME(*timer);
1565  Clock::cancel(timer->get());
1566 
1567  // We also force the timer to get deallocated so that there isn't
1568  // a cicular reference of the timer with itself which keeps around
1569  // a reference to the original future.
1570  *timer = None();
1571 
1572  promise->associate(future);
1573  }
1574 }
1575 
1576 } // namespace internal {
1577 
1578 
1579 template <typename T>
1580 template <typename X>
1582 {
1583  std::unique_ptr<Promise<X>> promise(new Promise<X>());
1584  Future<X> future = promise->future();
1585 
1587  &internal::thenf<T, X>, std::move(f), std::move(promise), lambda::_1);
1588 
1589  onAny(std::move(thenf));
1590 
1591  onAbandoned([=]() mutable {
1592  future.abandon();
1593  });
1594 
1595  // Propagate discarding up the chain. To avoid cyclic dependencies,
1596  // we keep a weak future in the callback.
1597  future.onDiscard(lambda::bind(&internal::discard<T>, WeakFuture<T>(*this)));
1598 
1599  return future;
1600 }
1601 
1602 
1603 template <typename T>
1604 template <typename X>
1606 {
1607  std::unique_ptr<Promise<X>> promise(new Promise<X>());
1608  Future<X> future = promise->future();
1609 
1611  &internal::then<T, X>, std::move(f), std::move(promise), lambda::_1);
1612 
1613  onAny(std::move(then));
1614 
1615  onAbandoned([=]() mutable {
1616  future.abandon();
1617  });
1618 
1619  // Propagate discarding up the chain. To avoid cyclic dependencies,
1620  // we keep a weak future in the callback.
1621  future.onDiscard(lambda::bind(&internal::discard<T>, WeakFuture<T>(*this)));
1622 
1623  return future;
1624 }
1625 
1626 
1627 template <typename T>
1628 template <typename F>
1630 {
1631  std::shared_ptr<Promise<T>> promise(new Promise<T>());
1632 
1633  const Future<T> future = *this;
1634 
1635  typedef decltype(std::move(f)(future)) R;
1636 
1637  std::shared_ptr<lambda::CallableOnce<R(const Future<T>&)>> callable(
1638  new lambda::CallableOnce<R(const Future<T>&)>(std::move(f)));
1639 
1640  onAny([=]() {
1641  if (future.isDiscarded() || future.isFailed()) {
1642  // We reset `discard` so that if a future gets returned from
1643  // `f(future)` we won't immediately discard it! We still want to
1644  // let the future get discarded later, however, hence if it gets
1645  // set again in the future it'll propagate to the returned
1646  // future.
1647  synchronized (promise->f.data->lock) {
1648  promise->f.data->discard = false;
1649  }
1650 
1651  promise->set(std::move(*callable)(future));
1652  } else {
1653  promise->associate(future);
1654  }
1655  });
1656 
1657  onAbandoned([=]() {
1658  // See comment above for why we reset `discard` here.
1659  synchronized (promise->f.data->lock) {
1660  promise->f.data->discard = false;
1661  }
1662  promise->set(std::move(*callable)(future));
1663  });
1664 
1665  // Propagate discarding up the chain. To avoid cyclic dependencies,
1666  // we keep a weak future in the callback.
1667  promise->future().onDiscard(
1668  lambda::bind(&internal::discard<T>, WeakFuture<T>(*this)));
1669 
1670  return promise->future();
1671 }
1672 
1673 
1674 template <typename T>
1676  lambda::CallableOnce<Future<T>(const Future<T>&)> f) const
1677 {
1678  std::unique_ptr<Promise<T>> promise(new Promise<T>());
1679  Future<T> future = promise->future();
1680 
1682  &internal::repair<T>, std::move(f), std::move(promise), lambda::_1));
1683 
1684  onAbandoned([=]() mutable {
1685  future.abandon();
1686  });
1687 
1688  // Propagate discarding up the chain. To avoid cyclic dependencies,
1689  // we keep a weak future in the callback.
1690  future.onDiscard(lambda::bind(&internal::discard<T>, WeakFuture<T>(*this)));
1691 
1692  return future;
1693 }
1694 
1695 
1696 template <typename T>
1698  const Duration& duration,
1699  lambda::CallableOnce<Future<T>(const Future<T>&)> f) const
1700 {
1701  // TODO(benh): Using a Latch here but Once might be cleaner.
1702  // Unfortunately, Once depends on Future so we can't easily use it
1703  // from here.
1704  std::shared_ptr<Latch> latch(new Latch());
1705  std::shared_ptr<Promise<T>> promise(new Promise<T>());
1706 
1707  // We need to control the lifetime of the timer we create below so
1708  // that we can force the timer to get deallocated after it
1709  // expires. The reason we want to force the timer to get deallocated
1710  // after it expires is because the timer's lambda has a copy of
1711  // `this` (i.e., a Future) and it's stored in the `onAny` callbacks
1712  // of `this` thus creating a circular reference. By storing a
1713  // `shared_ptr<Option<Timer>>` we're able to set the option to none
1714  // after the timer expires which will deallocate our copy of the
1715  // timer and leave the `Option<Timer>` stored in the lambda of the
1716  // `onAny` callback as none. Note that this is safe because the
1717  // `Latch` makes sure that only one of the callbacks will manipulate
1718  // the `shared_ptr<Option<Timer>>` so there isn't any concurrency
1719  // issues we have to worry about.
1720  std::shared_ptr<Option<Timer>> timer(new Option<Timer>());
1721 
1722  typedef lambda::CallableOnce<Future<T>(const Future<T>&)> F;
1723  std::shared_ptr<F> callable(new F(std::move(f)));
1724 
1725  // Set up a timer to invoke the callback if this future has not
1726  // completed. Note that we do not pass a weak reference for this
1727  // future as we don't want the future to get cleaned up and then
1728  // have the timer expire because then we wouldn't have a valid
1729  // future that we could pass to `f`! The reference to `this` that is
1730  // captured in the timer will get removed by setting the
1731  // `Option<Timer>` to none (see comment above) either if the timer
1732  // expires or if `this` completes and we cancel the timer (see
1733  // `internal::expired` and `internal::after` callbacks for where we
1734  // force the deallocation of our copy of the timer).
1735  *timer = Clock::timer(
1736  duration,
1737  lambda::bind(&internal::expired<T>, callable, latch, promise, timer,
1738  *this));
1739 
1740  onAny(lambda::bind(&internal::after<T>, latch, promise, timer, lambda::_1));
1741 
1742  onAbandoned([=]() {
1743  promise->future().abandon();
1744  });
1745 
1746  // Propagate discarding up the chain. To avoid cyclic dependencies,
1747  // we keep a weak future in the callback.
1748  promise->future().onDiscard(
1749  lambda::bind(&internal::discard<T>, WeakFuture<T>(*this)));
1750 
1751  return promise->future();
1752 }
1753 
1754 
1755 template <typename T>
1756 bool Future<T>::set(T&& t)
1757 {
1758  return _set(std::move(t));
1759 }
1760 
1761 
1762 template <typename T>
1763 bool Future<T>::set(const T& t)
1764 {
1765  return _set(t);
1766 }
1767 
1768 
1769 template <typename T>
1770 template <typename U>
1771 bool Future<T>::_set(U&& u)
1772 {
1773  bool result = false;
1774 
1775  synchronized (data->lock) {
1776  if (data->state == PENDING) {
1777  data->result = std::forward<U>(u);
1778  data->state = READY;
1779  result = true;
1780  }
1781  }
1782 
1783  // Invoke all callbacks associated with this future being READY. We
1784  // don't need a lock because the state is now in READY so there
1785  // should not be any concurrent modifications to the callbacks.
1786  if (result) {
1787  // Grab a copy of `data` just in case invoking the callbacks
1788  // erroneously attempts to delete this future.
1789  std::shared_ptr<typename Future<T>::Data> copy = data;
1790  internal::run(std::move(copy->onReadyCallbacks), copy->result.get());
1791  internal::run(std::move(copy->onAnyCallbacks), *this);
1792 
1793  copy->clearAllCallbacks();
1794  }
1795 
1796  return result;
1797 }
1798 
1799 
1800 template <typename T>
1801 bool Future<T>::fail(const std::string& _message)
1802 {
1803  bool result = false;
1804 
1805  synchronized (data->lock) {
1806  if (data->state == PENDING) {
1807  data->result = Result<T>(Error(_message));
1808  data->state = FAILED;
1809  result = true;
1810  }
1811  }
1812 
1813  // Invoke all callbacks associated with this future being FAILED. We
1814  // don't need a lock because the state is now in FAILED so there
1815  // should not be any concurrent modifications to the callbacks.
1816  if (result) {
1817  // Grab a copy of `data` just in case invoking the callbacks
1818  // erroneously attempts to delete this future.
1819  std::shared_ptr<typename Future<T>::Data> copy = data;
1820  internal::run(std::move(copy->onFailedCallbacks), copy->result.error());
1821  internal::run(std::move(copy->onAnyCallbacks), *this);
1822 
1823  copy->clearAllCallbacks();
1824  }
1825 
1826  return result;
1827 }
1828 
1829 
1830 template <typename T>
1831 std::ostream& operator<<(std::ostream& stream, const Future<T>& future)
1832 {
1833  const std::string suffix = future.data->discard ? " (with discard)" : "";
1834 
1835  switch (future.data->state) {
1836  case Future<T>::PENDING:
1837  if (future.data->abandoned) {
1838  return stream << "Abandoned" << suffix;
1839  }
1840  return stream << "Pending" << suffix;
1841 
1842  case Future<T>::READY:
1843  // TODO(benh): Stringify `Future<T>::get()` if it can be
1844  // stringified (will need to be SFINAE'ed appropriately).
1845  return stream << "Ready" << suffix;
1846 
1847  case Future<T>::FAILED:
1848  return stream << "Failed" << suffix << ": " << future.failure();
1849 
1850  case Future<T>::DISCARDED:
1851  return stream << "Discarded" << suffix;
1852  }
1853 
1854  return stream;
1855 }
1856 
1857 
1858 // Helper for setting a set of Promises.
1859 template <typename T>
1860 void setPromises(std::set<Promise<T>*>* promises, const T& t)
1861 {
1862  foreach (Promise<T>* promise, *promises) {
1863  promise->set(t);
1864  delete promise;
1865  }
1866  promises->clear();
1867 }
1868 
1869 
1870 // Helper for failing a set of Promises.
1871 template <typename T>
1872 void failPromises(std::set<Promise<T>*>* promises, const std::string& failure)
1873 {
1874  foreach (Promise<T>* promise, *promises) {
1875  promise->fail(failure);
1876  delete promise;
1877  }
1878  promises->clear();
1879 }
1880 
1881 
1882 // Helper for discarding a set of Promises.
1883 template <typename T>
1885 {
1886  foreach (Promise<T>* promise, *promises) {
1887  promise->discard();
1888  delete promise;
1889  }
1890  promises->clear();
1891 }
1892 
1893 
1894 // Helper for discarding an individual promise in the set.
1895 template <typename T>
1896 void discardPromises(std::set<Promise<T>*>* promises, const Future<T>& future)
1897 {
1898  foreach (Promise<T>* promise, *promises) {
1899  if (promise->future() == future) {
1900  promise->discard();
1901  promises->erase(promise);
1902  delete promise;
1903  return;
1904  }
1905  }
1906 }
1907 
1908 
1909 // Returns a future that will not propagate a discard through to the
1910 // future passed in as an argument. This can be very valuable if you
1911 // want to block some future from getting discarded.
1912 //
1913 // Example:
1914 //
1915 // Promise<int> promise;
1916 // Future<int> future = undiscardable(promise.future());
1917 // future.discard();
1918 // assert(!promise.future().hasDiscard());
1919 //
1920 // Or another example, when chaining futures:
1921 //
1922 // Future<int> future = undiscardable(
1923 // foo()
1924 // .then([]() { ...; })
1925 // .then([]() { ...; }));
1926 //
1927 // This will guarantee that a discard _will not_ propagate to `foo()`
1928 // or any of the futures returned from the invocations of `.then()`.
1929 template <typename T>
1931 {
1932  std::unique_ptr<Promise<T>> promise(new Promise<T>());
1933  Future<T> future_ = promise->future();
1934  future.onAny(lambda::partial(
1935  [](std::unique_ptr<Promise<T>> promise, const Future<T>& future) {
1936  promise->associate(future);
1937  },
1938  std::move(promise),
1939  lambda::_1));
1940  return future_;
1941 }
1942 
1943 
1944 // Decorator that for some callable `f` invokes
1945 // `undiscardable(f(args))` for some `args`. This is used by the
1946 // overload of `undiscardable()` that takes callables instead of a
1947 // specialization of `Future`.
1948 //
1949 // TODO(benh): Factor out a generic decorator pattern to be used in
1950 // other circumstances, e.g., to replace `_Deferred`.
1951 template <typename F>
1953 {
1954  template <
1955  typename G,
1956  typename std::enable_if<
1957  std::is_constructible<F, G>::value, int>::type = 0>
1958  UndiscardableDecorator(G&& g) : f(std::forward<G>(g)) {}
1959 
1960  template <typename... Args>
1961  auto operator()(Args&&... args)
1962  -> decltype(std::declval<F&>()(std::forward<Args>(args)...))
1963  {
1964  using Result =
1965  typename std::decay<decltype(f(std::forward<Args>(args)...))>::type;
1966 
1967  static_assert(
1969  "Expecting Future<T> to be returned from undiscarded(...)");
1970 
1971  return undiscardable(f(std::forward<Args>(args)...));
1972  }
1973 
1974  F f;
1975 };
1976 
1977 
1978 // An overload of `undiscardable()` above that takes and returns a
1979 // callable. The returned callable has decorated the provided callable
1980 // `f` such that when the returned callable is invoked it will in turn
1981 // invoke `undiscardable(f(args))` for some `args`. See
1982 // `UndiscardableDecorator` above for more details.
1983 //
1984 // Example:
1985 //
1986 // Future<int> future = foo()
1987 // .then(undiscardable([]() { ...; }));
1988 //
1989 // This guarantees that even if `future` is discarded the discard will
1990 // not propagate into the lambda passed into `.then()`.
1991 template <
1992  typename F,
1993  typename std::enable_if<
1995  int>::type = 0>
1997 {
1998  return UndiscardableDecorator<
1999  typename std::decay<F>::type>(std::forward<F>(f));
2000 }
2001 
2002 } // namespace process {
2003 
2004 #endif // __PROCESS_FUTURE_HPP__
void select(const Future< T > &future, std::shared_ptr< Promise< Future< T >>> promise)
Definition: future.hpp:951
Protocol< RecoverRequest, RecoverResponse > recover
bool isReady() const
Definition: future.hpp:1220
WeakFuture(const Future< T > &future)
Definition: future.hpp:646
std::string strerror(int errno_)
A thread-safe version of strerror.
Definition: strerror.hpp:30
Definition: errorbase.hpp:35
Definition: option.hpp:28
#define ABORT(...)
Definition: abort.hpp:40
F && f
Definition: defer.hpp:270
const Future< T > & onAny(F &&f) const
Definition: future.hpp:380
lambda::CallableOnce< void()> DiscardedCallback
Definition: future.hpp:167
T & get()&
Definition: try.hpp:73
const T & get() const
Definition: future.hpp:1299
bool operator==(const std::string &s, const UPID::ID &id)
Definition: pid.hpp:216
bool set(const T &_t)
Definition: future.hpp:827
T type
Definition: future.hpp:939
Failure(const std::string &_message)
Definition: future.hpp:667
bool pending(int signal)
Definition: signals.hpp:50
Definition: check.hpp:33
Future< T > type
Definition: future.hpp:925
void discarded(Future< T > future)
Definition: future.hpp:768
bool fail(const std::string &message)
Definition: future.hpp:903
Definition: future.hpp:665
void awaited(Owned< Latch > latch)
Definition: future.hpp:1256
const Future< T > & onDiscarded(F &&f) const
Definition: future.hpp:369
ErrnoFailure(const std::string &message)
Definition: future.hpp:681
lambda::CallableOnce< void()> DiscardCallback
Definition: future.hpp:164
static bool cancel(const Timer &timer)
const Future< T > & onDiscard(DiscardCallback &&callback) const
Definition: future.hpp:1359
internal::Partial< typename std::decay< F >::type, typename std::decay< Args >::type... > partial(F &&f, Args &&...args)
Definition: lambda.hpp:364
bool await(const Duration &duration=Seconds(-1)) const
Definition: future.hpp:1265
X type
Definition: future.hpp:946
bool await(const process::Future< T > &future, const Duration &duration)
Definition: gtest.hpp:67
const Future< T > & onFailed(FailedCallback &&callback) const
Definition: future.hpp:1403
Definition: type_utils.hpp:510
const Future< T > & onFailed(F &&f) const
Definition: future.hpp:363
#define CHECK_ERROR(expression)
Definition: check.hpp:58
bool operator!=(const Future< T > &that) const
Definition: future.hpp:1148
void thenf(lambda::CallableOnce< Future< X >(const T &)> &&f, std::unique_ptr< Promise< X >> promise, const Future< T > &future)
Definition: future.hpp:1473
Definition: posix_signalhandler.hpp:23
bool discard()
Definition: future.hpp:1162
UndiscardableDecorator(G &&g)
Definition: future.hpp:1958
void expired(const std::shared_ptr< lambda::CallableOnce< Future< T >(const Future< T > &)>> &f, const std::shared_ptr< Latch > &latch, const std::shared_ptr< Promise< T >> &promise, const std::shared_ptr< Option< Timer >> &timer, const Future< T > &future)
Definition: future.hpp:1526
void failPromises(std::set< Promise< T > * > *promises, const std::string &failure)
Definition: future.hpp:1872
Future< T > recover(_Deferred< F > &&deferred) const
Definition: future.hpp:479
Definition: duration.hpp:32
Definition: check.hpp:30
bool isPending() const
Definition: future.hpp:1213
Definition: future.hpp:66
bool isSome() const
Definition: option.hpp:115
bool operator==(const Future< T > &that) const
Definition: future.hpp:1141
Definition: future.hpp:63
void after(const std::shared_ptr< Latch > &latch, const std::shared_ptr< Promise< T >> &promise, const std::shared_ptr< Option< Timer >> &timer, const Future< T > &future)
Definition: future.hpp:1553
#define CHECK_SOME(expression)
Definition: check.hpp:50
bool isDiscarded() const
Definition: future.hpp:1227
ErrnoFailure()
Definition: future.hpp:676
void discardPromises(std::set< Promise< T > * > *promises)
Definition: future.hpp:1884
const Future< T > & onDiscarded(DiscardedCallback &&callback) const
Definition: future.hpp:1425
const Future< T > & onFailed(_Deferred< F > &&deferred) const
Definition: future.hpp:204
const Future< T > & onDiscard(_Deferred< F > &&deferred) const
Definition: future.hpp:190
F f
Definition: future.hpp:1974
const Future< T > & onAny(AnyCallback &&callback) const
Definition: future.hpp:1447
Definition: deferred.hpp:64
lambda::CallableOnce< void(const T &)> ReadyCallback
Definition: future.hpp:165
Definition: duration.hpp:207
bool associate(const Future< T > &future)
Definition: future.hpp:852
void discard(WeakFuture< T > reference)
Definition: future.hpp:754
Future< Future< T > > select(const std::set< Future< T >> &futures)
Definition: future.hpp:973
Definition: traits.hpp:17
Definition: future.hpp:73
Future< X > then(lambda::CallableOnce< X()> f) const
Definition: future.hpp:402
const Future< T > & onReady(F &&f) const
Definition: future.hpp:357
const Future< T > & onAbandoned(AbandonedCallback &&callback) const
Definition: future.hpp:1337
void repair(lambda::CallableOnce< Future< T >(const Future< T > &)> &&f, std::unique_ptr< Promise< T >> promise, const Future< T > &future)
Definition: future.hpp:1511
bool isSome() const
Definition: try.hpp:70
Failure(const Error &error)
Definition: future.hpp:668
const Future< T > & onDiscard(F &&f) const
Definition: future.hpp:346
Definition: future.hpp:674
const T & get() const &
Definition: option.hpp:118
Protocol< PromiseRequest, PromiseResponse > promise
Future< X > type
Definition: future.hpp:932
Future< T > repair(lambda::CallableOnce< Future< T >(const Future< T > &)> f) const
Definition: future.hpp:1675
Definition: future.hpp:78
void discard(const std::set< Future< T >> &futures)
Definition: future.hpp:991
static Try error(const E &e)
Definition: try.hpp:42
Future< R > run(R(*method)())
Definition: run.hpp:55
Try< std::vector< Entry > > list(const std::string &hierarchy, const std::string &cgroup)
Future< X > then(lambda::CallableOnce< Future< X >(const T &)> f) const
Definition: future.hpp:1581
Future< T > undiscardable(const Future< T > &future)
Definition: future.hpp:1930
void run(std::vector< C > &&callbacks, Arguments &&...arguments)
Definition: future.hpp:618
lambda::CallableOnce< void()> AbandonedCallback
Definition: future.hpp:163
bool operator<(const Future< T > &that) const
Definition: future.hpp:1155
Future()
Definition: future.hpp:1070
Definition: none.hpp:27
Definition: attributes.hpp:24
bool operator!=(const std::string &s, const UPID::ID &id)
Definition: pid.hpp:222
Future< X > then(lambda::CallableOnce< Future< X >()> f) const
Definition: future.hpp:395
std::string error(const std::string &msg, uint32_t code)
lambda::CallableOnce< void(const std::string &)> FailedCallback
Definition: future.hpp:166
Definition: executor.hpp:47
void then(lambda::CallableOnce< X(const T &)> &&f, std::unique_ptr< Promise< X >> promise, const Future< T > &future)
Definition: future.hpp:1492
const int code
Definition: future.hpp:687
Promise()
Definition: future.hpp:777
Future< T > future() const
Definition: future.hpp:913
ErrnoFailure(int _code, const std::string &message)
Definition: future.hpp:684
const std::string message
Definition: future.hpp:670
auto then(F &&f) const -> decltype(this->then(std::forward< F >(f), Prefer()))
Definition: future.hpp:465
bool discard()
Definition: future.hpp:810
Try< uint32_t > type(const std::string &path)
Future< T > after(const Duration &duration, lambda::CallableOnce< Future< T >(const Future< T > &)> f) const
Definition: future.hpp:1697
const Future< T > & onAbandoned(F &&f) const
Definition: future.hpp:335
bool isAbandoned() const
Definition: future.hpp:1241
ErrnoFailure(int _code)
Definition: future.hpp:678
void discarded(Future< U > future)
const Future< T > & onAny(_Deferred< F > &&deferred) const
Definition: future.hpp:218
static Timer timer(const Duration &duration, const lambda::function< void()> &thunk)
const Future< T > & onDiscarded(_Deferred< F > &&deferred) const
Definition: future.hpp:211
Try< Nothing > bind(int_fd s, const Address &address)
Definition: network.hpp:46
const std::string & failure() const
Definition: future.hpp:1325
Option< Future< T > > get() const
Definition: future.hpp:651
const Future< T > & onReady(_Deferred< F > &&deferred) const
Definition: future.hpp:197
Definition: owned.hpp:36
Definition: future.hpp:1952
Definition: latch.hpp:24
static Future< T > failed(const std::string &message)
Definition: future.hpp:1040
bool hasDiscard() const
Definition: future.hpp:1248
T copy(const T &t)
Definition: utils.hpp:21
const Future< T > & onReady(ReadyCallback &&callback) const
Definition: future.hpp:1381
virtual ~Promise()
Definition: future.hpp:791
const Future< T > & onAbandoned(_Deferred< F > &&deferred) const
Definition: future.hpp:183
Definition: lambda.hpp:414
auto operator()(Args &&...args) -> decltype(std::declval< F & >()(std::forward< Args >(args)...))
Definition: future.hpp:1961
void setPromises(std::set< Promise< T > * > *promises, const T &t)
Definition: future.hpp:1860
const T * operator->() const
Definition: future.hpp:1318
bool isFailed() const
Definition: future.hpp:1234
Definition: future.hpp:57
Future< T > recover(F &&f) const
Definition: future.hpp:1629