Apache Mesos
future.hpp
Go to the documentation of this file.
1 // Licensed under the Apache License, Version 2.0 (the "License");
2 // you may not use this file except in compliance with the License.
3 // You may obtain a copy of the License at
4 //
5 // http://www.apache.org/licenses/LICENSE-2.0
6 //
7 // Unless required by applicable law or agreed to in writing, software
8 // distributed under the License is distributed on an "AS IS" BASIS,
9 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10 // See the License for the specific language governing permissions and
11 // limitations under the License
12 
13 #ifndef __PROCESS_FUTURE_HPP__
14 #define __PROCESS_FUTURE_HPP__
15 
16 #include <assert.h>
17 
18 #include <atomic>
19 #include <list>
20 #include <memory> // TODO(benh): Replace shared_ptr with unique_ptr.
21 #include <ostream>
22 #include <set>
23 #include <type_traits>
24 #include <utility>
25 #include <vector>
26 
27 #include <glog/logging.h>
28 
29 #include <process/clock.hpp>
30 #include <process/latch.hpp>
31 #include <process/owned.hpp>
32 #include <process/pid.hpp>
33 #include <process/timer.hpp>
34 
35 #include <stout/abort.hpp>
36 #include <stout/check.hpp>
37 #include <stout/duration.hpp>
38 #include <stout/error.hpp>
39 #include <stout/lambda.hpp>
40 #include <stout/none.hpp>
41 #include <stout/option.hpp>
42 #include <stout/preprocessor.hpp>
43 #include <stout/result.hpp>
44 #include <stout/result_of.hpp>
45 #include <stout/synchronized.hpp>
46 #include <stout/try.hpp>
47 
48 #include <stout/os/strerror.hpp>
49 
50 namespace process {
51 
52 // Forward declarations (instead of include to break circular dependency).
53 template <typename G>
54 struct _Deferred;
55 
56 template <typename T>
57 class Future;
58 
59 
60 namespace internal {
61 
62 template <typename T>
63 struct wrap;
64 
65 template <typename T>
66 struct unwrap;
67 
68 } // namespace internal {
69 
70 
71 // Forward declaration of Promise.
72 template <typename T>
73 class Promise;
74 
75 
76 // Forward declaration of WeakFuture.
77 template <typename T>
78 class WeakFuture;
79 
80 // Forward declaration of Failure.
81 struct Failure;
82 struct ErrnoFailure;
83 
84 
85 // Definition of a "shared" future. A future can hold any
86 // copy-constructible value. A future is considered "shared" because
87 // by default a future can be accessed concurrently.
88 template <typename T>
89 class Future
90 {
91 public:
92  // Constructs a failed future.
93  static Future<T> failed(const std::string& message);
94 
95  Future();
96 
97  /*implicit*/ Future(const T& _t);
98 
99  template <typename U>
100  /*implicit*/ Future(const U& u);
101 
102  /*implicit*/ Future(const Failure& failure);
103 
104  /*implicit*/ Future(const ErrnoFailure& failure);
105 
106  /*implicit*/ Future(const Future<T>& that);
107 
108  /*implicit*/ Future(Future<T>&& that);
109 
110  /*implicit*/ Future(const Try<T>& t);
111 
112  /*implicit*/ Future(const Try<Future<T>>& t);
113 
114  ~Future() = default;
115 
116  // Futures are assignable (and copyable). This results in the
117  // reference to the previous future data being decremented and a
118  // reference to 'that' being incremented.
119  Future<T>& operator=(const Future<T>& that);
120 
121  // Comparison operators useful for using futures in collections.
122  bool operator==(const Future<T>& that) const;
123  bool operator!=(const Future<T>& that) const;
124  bool operator<(const Future<T>& that) const;
125 
126  // Helpers to get the current state of this future.
127  bool isPending() const;
128  bool isReady() const;
129  bool isDiscarded() const;
130  bool isFailed() const;
131  bool isAbandoned() const;
132  bool hasDiscard() const;
133 
134  // Discards this future. Returns false if discard has already been
135  // called or the future has already completed, i.e., is ready,
136  // failed, or discarded. Note that a discard does not terminate any
137  // computation but rather acts as a suggestion or indication that
138  // the caller no longer cares about the result of some
139  // computation. The callee can decide whether or not to continue the
140  // computation on their own (where best practices are to attempt to
141  // stop the computation if possible). The callee can discard the
142  // computation via Promise::discard which completes a future, at
143  // which point, Future::isDiscarded is true (and the
144  // Future::onDiscarded callbacks are executed). Before that point,
145  // but after calling Future::discard, only Future::hasDiscard will
146  // return true and the Future::onDiscard callbacks will be invoked.
147  bool discard();
148 
149  // Waits for this future to become ready, discarded, or failed.
150  bool await(const Duration& duration = Seconds(-1)) const;
151 
152  // Return the value associated with this future, waits indefinitely
153  // until a value gets associated or until the future is discarded.
154  const T& get() const;
155  const T* operator->() const;
156 
157  // Returns the failure message associated with this future.
158  const std::string& failure() const;
159 
160  // Type of the callback functions that can get invoked when the
161  // future gets set, fails, or is discarded.
168 
169  // Installs callbacks for the specified events and returns a const
170  // reference to 'this' in order to easily support chaining.
171  const Future<T>& onAbandoned(AbandonedCallback&& callback) const;
172  const Future<T>& onDiscard(DiscardCallback&& callback) const;
173  const Future<T>& onReady(ReadyCallback&& callback) const;
174  const Future<T>& onFailed(FailedCallback&& callback) const;
175  const Future<T>& onDiscarded(DiscardedCallback&& callback) const;
176  const Future<T>& onAny(AnyCallback&& callback) const;
177 
178  // TODO(benh): Add onReady, onFailed, onAny for _Deferred<F> where F
179  // is not expected.
180 
181  template <typename F>
182  const Future<T>& onAbandoned(_Deferred<F>&& deferred) const
183  {
184  return onAbandoned(
185  std::move(deferred).operator lambda::CallableOnce<void()>());
186  }
187 
188  template <typename F>
189  const Future<T>& onDiscard(_Deferred<F>&& deferred) const
190  {
191  return onDiscard(
192  std::move(deferred).operator lambda::CallableOnce<void()>());
193  }
194 
195  template <typename F>
196  const Future<T>& onReady(_Deferred<F>&& deferred) const
197  {
198  return onReady(
199  std::move(deferred).operator lambda::CallableOnce<void(const T&)>());
200  }
201 
202  template <typename F>
203  const Future<T>& onFailed(_Deferred<F>&& deferred) const
204  {
205  return onFailed(std::move(deferred)
206  .operator lambda::CallableOnce<void(const std::string&)>());
207  }
208 
209  template <typename F>
210  const Future<T>& onDiscarded(_Deferred<F>&& deferred) const
211  {
212  return onDiscarded(
213  std::move(deferred).operator lambda::CallableOnce<void()>());
214  }
215 
216  template <typename F>
217  const Future<T>& onAny(_Deferred<F>&& deferred) const
218  {
219  return onAny(std::move(deferred)
220  .operator lambda::CallableOnce<void(const Future<T>&)>());
221  }
222 
223 private:
224  // We use the 'Prefer' and 'LessPrefer' structs as a way to prefer
225  // one function over the other when doing SFINAE for the 'onReady',
226  // 'onFailed', 'onAny', and 'then' functions. In each of these cases
227  // we prefer calling the version of the functor that takes in an
228  // argument (i.e., 'const T&' for 'onReady' and 'then' and 'const
229  // std::string&' for 'onFailed'), but we allow functors that don't
230  // care about the argument. We don't need to do this for
231  // 'onDiscard', 'onDiscarded' or 'onAbandoned' because they don't
232  // take an argument.
233  struct LessPrefer {};
234  struct Prefer : LessPrefer {};
235 
237  const Future<T>& onReady(F&& f, Prefer) const
238  {
239  return onReady(lambda::CallableOnce<void(const T&)>(
241  [](typename std::decay<F>::type&& f, const T& t) {
242  std::move(f)(t);
243  },
244  std::forward<F>(f),
245  lambda::_1)));
246  }
247 
248  // This is the less preferred `onReady`, we prefer the `onReady` method which
249  // has `f` taking a `const T&` parameter. Unfortunately, to complicate
250  // matters, if `F` is the result of a `std::bind` expression we need to SFINAE
251  // out this version of `onReady` and force the use of the preferred `onReady`
252  // (which works because `std::bind` will just ignore the `const T&` argument).
253  // This is necessary because Visual Studio 2015 doesn't support using the
254  // `std::bind` call operator with `result_of` as it's technically not a
255  // requirement by the C++ standard.
256  template <
257  typename F,
258  typename = typename result_of<typename std::enable_if<
260  F>::type()>::type>
261  const Future<T>& onReady(F&& f, LessPrefer) const
262  {
263  return onReady(lambda::CallableOnce<void(const T&)>(
265  [](typename std::decay<F>::type&& f, const T&) {
266  std::move(f)();
267  },
268  std::forward<F>(f),
269  lambda::_1)));
270  }
271 
273  const Future<T>& onFailed(F&& f, Prefer) const
274  {
275  return onFailed(lambda::CallableOnce<void(const std::string&)>(
277  [](typename std::decay<F>::type&& f, const std::string& message) {
278  std::move(f)(message);
279  },
280  std::forward<F>(f),
281  lambda::_1)));
282  }
283 
284  // Refer to the less preferred version of `onReady` for why these SFINAE
285  // conditions are necessary.
286  template <
287  typename F,
288  typename = typename result_of<typename std::enable_if<
290  F>::type()>::type>
291  const Future<T>& onFailed(F&& f, LessPrefer) const
292  {
293  return onFailed(lambda::CallableOnce<void(const std::string&)>(
295  [](typename std::decay<F>::type&& f, const std::string&) mutable {
296  std::move(f)();
297  },
298  std::forward<F>(f),
299  lambda::_1)));
300  }
301 
302  template <typename F, typename = typename result_of<F(const Future<T>&)>::type> // NOLINT(whitespace/line_length)
303  const Future<T>& onAny(F&& f, Prefer) const
304  {
305  return onAny(lambda::CallableOnce<void(const Future<T>&)>(
307  [](typename std::decay<F>::type&& f, const Future<T>& future) {
308  std::move(f)(future);
309  },
310  std::forward<F>(f),
311  lambda::_1)));
312  }
313 
314  // Refer to the less preferred version of `onReady` for why these SFINAE
315  // conditions are necessary.
316  template <
317  typename F,
318  typename = typename result_of<typename std::enable_if<
320  F>::type()>::type>
321  const Future<T>& onAny(F&& f, LessPrefer) const
322  {
323  return onAny(lambda::CallableOnce<void(const Future<T>&)>(
325  [](typename std::decay<F>::type&& f, const Future<T>&) {
326  std::move(f)();
327  },
328  std::forward<F>(f),
329  lambda::_1)));
330  }
331 
332 public:
333  template <typename F>
334  const Future<T>& onAbandoned(F&& f) const
335  {
336  return onAbandoned(lambda::CallableOnce<void()>(
338  [](typename std::decay<F>::type&& f) {
339  std::move(f)();
340  },
341  std::forward<F>(f))));
342  }
343 
344  template <typename F>
345  const Future<T>& onDiscard(F&& f) const
346  {
347  return onDiscard(lambda::CallableOnce<void()>(
349  [](typename std::decay<F>::type&& f) {
350  std::move(f)();
351  },
352  std::forward<F>(f))));
353  }
354 
355  template <typename F>
356  const Future<T>& onReady(F&& f) const
357  {
358  return onReady(std::forward<F>(f), Prefer());
359  }
360 
361  template <typename F>
362  const Future<T>& onFailed(F&& f) const
363  {
364  return onFailed(std::forward<F>(f), Prefer());
365  }
366 
367  template <typename F>
368  const Future<T>& onDiscarded(F&& f) const
369  {
370  return onDiscarded(lambda::CallableOnce<void()>(
372  [](typename std::decay<F>::type&& f) {
373  std::move(f)();
374  },
375  std::forward<F>(f))));
376  }
377 
378  template <typename F>
379  const Future<T>& onAny(F&& f) const
380  {
381  return onAny(std::forward<F>(f), Prefer());
382  }
383 
384  // Installs callbacks that get executed when this future is ready
385  // and associates the result of the callback with the future that is
386  // returned to the caller (which may be of a different type).
387  template <typename X>
388  Future<X> then(lambda::CallableOnce<Future<X>(const T&)> f) const;
389 
390  template <typename X>
391  Future<X> then(lambda::CallableOnce<X(const T&)> f) const;
392 
393  template <typename X>
395  {
396  return then(lambda::CallableOnce<Future<X>(const T&)>(
397  lambda::partial(std::move(f))));
398  }
399 
400  template <typename X>
402  {
403  return then(lambda::CallableOnce<X(const T&)>(
404  lambda::partial(std::move(f))));
405  }
406 
407 private:
408  template <
409  typename F,
410  typename X =
412  Future<X> then(_Deferred<F>&& f, Prefer) const
413  {
414  // note the then<X> is necessary to not have an infinite loop with
415  // then(F&& f)
416  return then<X>(
417  std::move(f).operator lambda::CallableOnce<Future<X>(const T&)>());
418  }
419 
420  // Refer to the less preferred version of `onReady` for why these SFINAE
421  // conditions are necessary.
422  template <
423  typename F,
424  typename X = typename internal::unwrap<
425  typename result_of<typename std::enable_if<
427  F>::type()>::type>::type>
428  Future<X> then(_Deferred<F>&& f, LessPrefer) const
429  {
430  return then<X>(std::move(f).operator lambda::CallableOnce<Future<X>()>());
431  }
432 
434  Future<X> then(F&& f, Prefer) const
435  {
436  return then<X>(
437  lambda::CallableOnce<Future<X>(const T&)>(std::forward<F>(f)));
438  }
439 
440  // Refer to the less preferred version of `onReady` for why these SFINAE
441  // conditions are necessary.
442  template <
443  typename F,
444  typename X = typename internal::unwrap<
445  typename result_of<typename std::enable_if<
447  F>::type()>::type>::type>
448  Future<X> then(F&& f, LessPrefer) const
449  {
450  return then<X>(lambda::CallableOnce<Future<X>()>(std::forward<F>(f)));
451  }
452 
453 public:
454  // NOTE: There are two bugs we're dealing with here.
455  // (1) GCC bug where the explicit use of `this->` is required in the
456  // trailing return type: gcc.gnu.org/bugzilla/show_bug.cgi?id=57543
457  // (2) VS 2017 RC bug where the explicit use of `this->` is disallowed.
458  //
459  // Since VS 2015 and 2017 RC both implement C++14's deduced return type for
460  // functions, we simply choose to use that on Windows.
461  //
462  // TODO(mpark): Remove the trailing return type once we get to C++14.
463  template <typename F>
464  auto then(F&& f) const
465 #ifndef __WINDOWS__
466  -> decltype(this->then(std::forward<F>(f), Prefer()))
467 #endif // __WINDOWS__
468  {
469  return then(std::forward<F>(f), Prefer());
470  }
471 
472  // Installs callbacks that get executed if this future is abandoned,
473  // is discarded, or failed.
474  template <typename F>
475  Future<T> recover(F&& f) const;
476 
477  template <typename F>
478  Future<T> recover(_Deferred<F>&& deferred) const
479  {
480  return recover(
481  std::move(deferred)
482  .operator lambda::CallableOnce<Future<T>(const Future<T>&)>());
483  }
484 
485  // TODO(benh): Considering adding a `rescue` function for rescuing
486  // abandoned futures.
487 
488  // Installs callbacks that get executed if this future completes
489  // because it failed.
491  lambda::CallableOnce<Future<T>(const Future<T>&)> f) const;
492 
493  // TODO(benh): Add overloads of 'repair' that don't require passing
494  // in a function that takes the 'const Future<T>&' parameter and use
495  // Prefer/LessPrefer to disambiguate.
496 
497  // Invokes the specified function after some duration if this future
498  // has not been completed (set, failed, or discarded). Note that
499  // this function is agnostic of discard semantics and while it will
500  // propagate discarding "up the chain" it will still invoke the
501  // specified callback after the specified duration even if 'discard'
502  // was called on the returned future.
504  const Duration& duration,
505  lambda::CallableOnce<Future<T>(const Future<T>&)> f) const;
506 
507  // TODO(benh): Add overloads of 'after' that don't require passing
508  // in a function that takes the 'const Future<T>&' parameter and use
509  // Prefer/LessPrefer to disambiguate.
510 
511 private:
512  template <typename U>
513  friend class Future;
514  friend class Promise<T>;
515  friend class WeakFuture<T>;
516  template <typename U>
517  friend std::ostream& operator<<(std::ostream&, const Future<U>&);
518 
519  enum State
520  {
521  PENDING,
522  READY,
523  FAILED,
524  DISCARDED,
525  };
526 
527  struct Data
528  {
529  Data();
530  ~Data() = default;
531 
532  void clearAllCallbacks();
533 
534  std::atomic_flag lock = ATOMIC_FLAG_INIT;
535  State state;
536  bool discard;
537  bool associated;
538  bool abandoned;
539 
540  // One of:
541  // 1. None, the state is PENDING or DISCARDED.
542  // 2. Some, the state is READY.
543  // 3. Error, the state is FAILED; 'error()' stores the message.
544  Result<T> result;
545 
546  std::vector<AbandonedCallback> onAbandonedCallbacks;
547  std::vector<DiscardCallback> onDiscardCallbacks;
548  std::vector<ReadyCallback> onReadyCallbacks;
549  std::vector<FailedCallback> onFailedCallbacks;
550  std::vector<DiscardedCallback> onDiscardedCallbacks;
551  std::vector<AnyCallback> onAnyCallbacks;
552  };
553 
554  // Abandons this future. Returns false if the future is already
555  // associated or no longer pending. Otherwise returns true and any
556  // Future::onAbandoned callbacks will be run.
557  //
558  // If `propagating` is true then we'll abandon this future even if
559  // it has already been associated. This is important because
560  // `~Promise()` will try and abandon and we need to ignore that if
561  // the future has been associated since the promise will no longer
562  // be setting the future anyway (and is likely the reason it's being
563  // destructed, because it's useless). When the future that we've
564  // associated with gets abandoned, however, then we need to actually
565  // abandon this future too. Here's an example of this:
566  //
567  // 1: Owned<Promise<int>> promise1(new Promise<int>());
568  // 2: Owned<Promise<int>> promise2(new Promise<int>());
569  // 3:
570  // 4: Future<int> future1 = promise1->future();
571  // 5: Future<int> future2 = promise2->future();
572  // 6:
573  // 7: promise1->associate(future2);
574  // 8:
575  // 9: promise1.reset();
576  // 10:
577  // 11: assert(!future1.isAbandoned());
578  // 12:
579  // 13: promise2.reset();
580  // 14:
581  // 15: assert(future2.isAbandoned());
582  // 16: assert(future1.isAbandoned());
583  //
584  // At line 9 `~Promise()` will attempt to abandon the future by
585  // calling `abandon()` but since it's been associated we won't do
586  // anything. On line 13 the `onAbandoned()` callback will call
587  // `abandon(true)` and now we'll actually abandon the future
588  // because we're _propagating_ the abandon from the associated
589  // future.
590  //
591  // NOTE: this is an _INTERNAL_ function and should never be exposed
592  // or used outside of the implementation.
593  bool abandon(bool propagating = false);
594 
595  // Sets the value for this future, unless the future is already set,
596  // failed, or discarded, in which case it returns false.
597  bool set(const T& _t);
598  bool set(T&& _t);
599 
600  template <typename U>
601  bool _set(U&& _u);
602 
603  // Sets this future as failed, unless the future is already set,
604  // failed, or discarded, in which case it returns false.
605  bool fail(const std::string& _message);
606 
607  std::shared_ptr<Data> data;
608 };
609 
610 
611 namespace internal {
612 
// Invokes each registered callback, in registration order, passing
// every one of them the same forwarded arguments.
//
// TODO(*): Invoke callbacks in another execution context.
template <typename C, typename... Arguments>
void run(std::vector<C>&& callbacks, Arguments&&... arguments)
{
  // The size is deliberately re-read on every iteration rather than
  // cached up front.
  size_t index = 0;
  while (index < callbacks.size()) {
    C& callback = callbacks[index];

    // Callbacks are invoked as rvalues since each is run at most once.
    std::move(callback)(std::forward<Arguments>(arguments)...);

    ++index;
  }
}
623 
624 } // namespace internal {
625 
626 
627 // Represents a weak reference to a future. This class is used to
628 // break cyclic dependencies between futures.
629 template <typename T>
630 class WeakFuture
631 {
632 public:
633  explicit WeakFuture(const Future<T>& future);
634 
635  // Converts this weak reference to a concrete future. Returns none
636  // if the conversion is not successful.
637  Option<Future<T>> get() const;
638 
639 private:
640  std::weak_ptr<typename Future<T>::Data> data;
641 };
642 
643 
644 template <typename T>
646  : data(future.data) {}
647 
648 
649 template <typename T>
651 {
652  Future<T> future;
653  future.data = data.lock();
654 
655  if (future.data) {
656  return future;
657  }
658 
659  return None();
660 }
661 
662 
663 // Helper for creating failed futures.
664 struct Failure
665 {
666  explicit Failure(const std::string& _message) : message(_message) {}
667  explicit Failure(const Error& error) : message(error.message) {}
668 
669  const std::string message;
670 };
671 
672 
673 struct ErrnoFailure : public Failure
674 {
676 
677  explicit ErrnoFailure(int _code)
678  : Failure(os::strerror(_code)), code(_code) {}
679 
680  explicit ErrnoFailure(const std::string& message)
681  : ErrnoFailure(errno, message) {}
682 
683  ErrnoFailure(int _code, const std::string& message)
684  : Failure(message + ": " + os::strerror(_code)), code(_code) {}
685 
686  const int code;
687 };
688 
689 
690 // Forward declaration to use as friend below.
691 namespace internal {
692 template <typename U>
693 void discarded(Future<U> future);
694 } // namespace internal {
695 
696 
697 // TODO(benh): Make Promise a subclass of Future?
698 template <typename T>
699 class Promise
700 {
701 public:
702  Promise();
703  explicit Promise(const T& t);
704  virtual ~Promise();
705 
706  Promise(Promise<T>&& that);
707 
708  bool discard();
709  bool set(const T& _t);
710  bool set(T&& _t);
711  bool set(const Future<T>& future); // Alias for associate.
712  bool associate(const Future<T>& future);
713  bool fail(const std::string& message);
714 
715  // Returns a copy of the future associated with this promise.
716  Future<T> future() const;
717 
718 private:
719  template <typename U>
720  friend class Future;
721  template <typename U>
722  friend void internal::discarded(Future<U> future);
723 
724  template <typename U>
725  bool _set(U&& u);
726 
727  // Not copyable, not assignable.
728  Promise(const Promise<T>&);
729  Promise<T>& operator=(const Promise<T>&);
730 
731  // Helper for doing the work of actually discarding a future (called
732  // from Promise::discard as well as internal::discarded).
733  static bool discard(Future<T> future);
734 
735  Future<T> f;
736 };
737 
738 
739 template <>
740 class Promise<void>;
741 
742 
743 template <typename T>
744 class Promise<T&>;
745 
746 
747 namespace internal {
748 
749 // Discards a weak future. If the weak future is invalid (i.e., the
750 // future it references to has already been destroyed), this operation
751 // is treated as a no-op.
752 template <typename T>
753 void discard(WeakFuture<T> reference)
754 {
755  Option<Future<T>> future = reference.get();
756  if (future.isSome()) {
757  Future<T> future_ = future.get();
758  future_.discard();
759  }
760 }
761 
762 
763 // Helper for invoking Promise::discard in an onDiscarded callback
764 // (since the onDiscarded callback requires returning void we can't
765 // bind with Promise::discard).
766 template <typename T>
767 void discarded(Future<T> future)
768 {
769  Promise<T>::discard(future);
770 }
771 
772 } // namespace internal {
773 
774 
775 template <typename T>
777 {
778  // Need to "unset" `abandoned` since it gets set in the empty
779  // constructor for `Future`.
780  f.data->abandoned = false;
781 }
782 
783 
784 template <typename T>
786  : f(t) {}
787 
788 
789 template <typename T>
791 {
792  // Note that we don't discard the promise as we don't want to give
793  // the illusion that any computation hasn't started (or possibly
794  // finished) in the event that computation is "visible" by other
795  // means. However, we try and abandon the future if it hasn't been
796  // associated or set (or moved, i.e., `f.data` is true).
797  if (f.data) {
798  f.abandon();
799  }
800 }
801 
802 
803 template <typename T>
805  : f(std::move(that.f)) {}
806 
807 
808 template <typename T>
810 {
811  if (!f.data->associated) {
812  return discard(f);
813  }
814  return false;
815 }
816 
817 
818 template <typename T>
819 bool Promise<T>::set(T&& t)
820 {
821  return _set(std::move(t));
822 }
823 
824 
825 template <typename T>
826 bool Promise<T>::set(const T& t)
827 {
828  return _set(t);
829 }
830 
831 
832 template <typename T>
833 template <typename U>
834 bool Promise<T>::_set(U&& u)
835 {
836  if (!f.data->associated) {
837  return f.set(std::forward<U>(u));
838  }
839  return false;
840 }
841 
842 
843 template <typename T>
844 bool Promise<T>::set(const Future<T>& future)
845 {
846  return associate(future);
847 }
848 
849 
850 template <typename T>
851 bool Promise<T>::associate(const Future<T>& future)
852 {
853  bool associated = false;
854 
855  synchronized (f.data->lock) {
856  // Don't associate if this promise has completed. Note that this
857  // does not include if Future::discard was called on this future
858  // since in that case that would still leave the future PENDING
859  // (note that we cover that case below).
860  if (f.data->state == Future<T>::PENDING && !f.data->associated) {
861  associated = f.data->associated = true;
862 
863  // After this point we don't allow 'f' to be completed via the
864  // promise since we've set 'associated' but Future::discard on
865  // 'f' might get called which will get propagated via the
866  // 'f.onDiscard' below. Note that we currently don't propagate a
867  // discard from 'future.onDiscard' but these semantics might
868  // change if/when we make 'f' and 'future' true aliases of one
869  // another.
870  }
871  }
872 
873  // Note that we do the actual associating after releasing the lock
874  // above to avoid deadlocking by attempting to require the lock
875  // within from invoking 'f.onDiscard' and/or 'f.set/fail' via the
876  // bind statements from doing 'future.onReady/onFailed'.
877  if (associated) {
878  // TODO(jieyu): Make 'f' a true alias of 'future'. Currently, only
879  // 'discard' is associated in both directions. In other words, if
880  // a future gets discarded, the other future will also get
881  // discarded. For 'set' and 'fail', they are associated only in
882  // one direction. In other words, calling 'set' or 'fail' on this
883  // promise will not affect the result of the future that we
884  // associated.
885  f.onDiscard(lambda::bind(&internal::discard<T>, WeakFuture<T>(future)));
886 
887  // Need to disambiguate for the compiler.
888  bool (Future<T>::*set)(const T&) = &Future<T>::set;
889 
890  future
891  .onReady(lambda::bind(set, f, lambda::_1))
892  .onFailed(lambda::bind(&Future<T>::fail, f, lambda::_1))
893  .onDiscarded(lambda::bind(&internal::discarded<T>, f))
894  .onAbandoned(lambda::bind(&Future<T>::abandon, f, true));
895  }
896 
897  return associated;
898 }
899 
900 
901 template <typename T>
902 bool Promise<T>::fail(const std::string& message)
903 {
904  if (!f.data->associated) {
905  return f.fail(message);
906  }
907  return false;
908 }
909 
910 
911 template <typename T>
913 {
914  return f;
915 }
916 
917 
918 // Internal helper utilities.
919 namespace internal {
920 
921 template <typename T>
922 struct wrap
923 {
924  typedef Future<T> type;
925 };
926 
927 
928 template <typename X>
929 struct wrap<Future<X>>
930 {
931  typedef Future<X> type;
932 };
933 
934 
// Maps a type `T` to itself.
template <typename T>
struct unwrap
{
  using type = T;
};
940 
941 
942 template <typename X>
943 struct unwrap<Future<X>>
944 {
945  typedef X type;
946 };
947 
948 
949 template <typename T>
950 void select(
951  const Future<T>& future,
952  std::shared_ptr<Promise<Future<T>>> promise)
953 {
954  // We never fail the future associated with our promise.
955  assert(!promise->future().isFailed());
956 
957  if (promise->future().isPending()) { // No-op if it's discarded.
958  if (future.isReady()) { // We only set the promise if a future is ready.
959  promise->set(future);
960  }
961  }
962 }
963 
964 } // namespace internal {
965 
966 
967 // TODO(benh): Move select and discard into 'futures' namespace.
968 
969 // Returns a future that captures any ready future in a set. Note that
970 // select DOES NOT capture a future that has failed or been discarded.
971 template <typename T>
973 {
974  std::shared_ptr<Promise<Future<T>>> promise(new Promise<Future<T>>());
975 
976  promise->future().onDiscard(
977  lambda::bind(&internal::discarded<Future<T>>, promise->future()));
978 
979  foreach (const Future<T>& future, futures) {
980  future.onAny([=](const Future<T>& f) {
981  internal::select(f, promise);
982  });
983  }
984 
985  return promise->future();
986 }
987 
988 
989 template <typename T>
990 void discard(const std::set<Future<T>>& futures)
991 {
992  foreach (Future<T> future, futures) { // Need a non-const copy to discard.
993  future.discard();
994  }
995 }
996 
997 
998 template <typename T>
999 void discard(const std::list<Future<T>>& futures)
1000 {
1001  foreach (Future<T> future, futures) { // Need a non-const copy to discard.
1002  future.discard();
1003  }
1004 }
1005 
1006 
1007 template <typename T>
1008 bool Promise<T>::discard(Future<T> future)
1009 {
1010  bool result = false;
1011 
1012  synchronized (future.data->lock) {
1013  if (future.data->state == Future<T>::PENDING) {
1014  future.data->state = Future<T>::DISCARDED;
1015  result = true;
1016  }
1017  }
1018 
1019  // Invoke all callbacks associated with this future being
1020  // DISCARDED. We don't need a lock because the state is now in
1021  // DISCARDED so there should not be any concurrent modifications to
1022  // the callbacks.
1023  if (result) {
1024  // NOTE: we rely on the fact that we have `future` to protect
1025  // ourselves from one of the callbacks erroneously deleting the
1026  // future. In `Future::_set()` and `Future::fail()` we have to
1027  // explicitly take a copy to protect ourselves.
1028  internal::run(std::move(future.data->onDiscardedCallbacks));
1029  internal::run(std::move(future.data->onAnyCallbacks), future);
1030 
1031  future.data->clearAllCallbacks();
1032  }
1033 
1034  return result;
1035 }
1036 
1037 
1038 template <typename T>
1039 Future<T> Future<T>::failed(const std::string& message)
1040 {
1041  Future<T> future;
1042  future.fail(message);
1043  return future;
1044 }
1045 
1046 
1047 template <typename T>
1049  : state(PENDING),
1050  discard(false),
1051  associated(false),
1052  abandoned(false),
1053  result(None()) {}
1054 
1055 
1056 template <typename T>
1057 void Future<T>::Data::clearAllCallbacks()
1058 {
1059  onAbandonedCallbacks.clear();
1060  onAnyCallbacks.clear();
1061  onDiscardCallbacks.clear();
1062  onDiscardedCallbacks.clear();
1063  onFailedCallbacks.clear();
1064  onReadyCallbacks.clear();
1065 }
1066 
1067 
1068 template <typename T>
1070  : data(new Data())
1071 {
1072  data->abandoned = true;
1073 }
1074 
1075 
1076 template <typename T>
1077 Future<T>::Future(const T& _t)
1078  : data(new Data())
1079 {
1080  set(_t);
1081 }
1082 
1083 
1084 template <typename T>
1085 template <typename U>
1087  : data(new Data())
1088 {
1089  set(u);
1090 }
1091 
1092 
1093 template <typename T>
1095  : data(new Data())
1096 {
1097  fail(failure.message);
1098 }
1099 
1100 
1101 template <typename T>
1103  : data(new Data())
1104 {
1105  fail(failure.message);
1106 }
1107 
1108 
1109 template <typename T>
1111  : data(that.data) {}
1112 
1113 
1114 template <typename T>
1116  : data(std::move(that.data)) {}
1117 
1118 
1119 template <typename T>
1121  : data(new Data())
1122 {
1123  if (t.isSome()){
1124  set(t.get());
1125  } else {
1126  fail(t.error());
1127  }
1128 }
1129 
1130 
1131 template <typename T>
1133  : data(t.isSome() ? t->data : std::shared_ptr<Data>(new Data()))
1134 {
1135  if (!t.isSome()) {
1136  fail(t.error());
1137  }
1138 }
1139 
1140 
1141 template <typename T>
1143 {
1144  if (this != &that) {
1145  data = that.data;
1146  }
1147  return *this;
1148 }
1149 
1150 
1151 template <typename T>
1152 bool Future<T>::operator==(const Future<T>& that) const
1153 {
1154  return data == that.data;
1155 }
1156 
1157 
1158 template <typename T>
1159 bool Future<T>::operator!=(const Future<T>& that) const
1160 {
1161  return !(*this == that);
1162 }
1163 
1164 
1165 template <typename T>
1166 bool Future<T>::operator<(const Future<T>& that) const
1167 {
1168  return data < that.data;
1169 }
1170 
1171 
1172 template <typename T>
1174 {
1175  bool result = false;
1176 
1177  std::vector<DiscardCallback> callbacks;
1178  synchronized (data->lock) {
1179  if (!data->discard && data->state == PENDING) {
1180  result = data->discard = true;
1181 
1182  callbacks.swap(data->onDiscardCallbacks);
1183  }
1184  }
1185 
1186  // Invoke all callbacks associated with doing a discard on this
1187  // future. The callbacks get destroyed when we exit from the
1188  // function.
1189  if (result) {
1190  internal::run(std::move(callbacks));
1191  }
1192 
1193  return result;
1194 }
1195 
1196 
1197 template <typename T>
1198 bool Future<T>::abandon(bool propagating)
1199 {
1200  bool result = false;
1201 
1202  std::vector<AbandonedCallback> callbacks;
1203  synchronized (data->lock) {
1204  if (!data->abandoned &&
1205  data->state == PENDING &&
1206  (!data->associated || propagating)) {
1207  result = data->abandoned = true;
1208 
1209  callbacks.swap(data->onAbandonedCallbacks);
1210  }
1211  }
1212 
1213  // Invoke all callbacks. The callbacks get destroyed when we exit
1214  // from the function.
1215  if (result) {
1216  internal::run(std::move(callbacks));
1217  }
1218 
1219  return result;
1220 }
1221 
1222 
1223 template <typename T>
1225 {
1226  return data->state == PENDING;
1227 }
1228 
1229 
1230 template <typename T>
1232 {
1233  return data->state == READY;
1234 }
1235 
1236 
1237 template <typename T>
1239 {
1240  return data->state == DISCARDED;
1241 }
1242 
1243 
1244 template <typename T>
1246 {
1247  return data->state == FAILED;
1248 }
1249 
1250 
1251 template <typename T>
1253 {
1254  return data->abandoned;
1255 }
1256 
1257 
1258 template <typename T>
1260 {
1261  return data->discard;
1262 }
1263 
1264 
1265 namespace internal {
1266 
// Callback used by `Future::await()`: triggers `latch` so that the
// waiter blocked on it can proceed.
inline void awaited(Owned<Latch> latch)
{
  latch->trigger();
}
1271 
1272 } // namespace internal {
1273 
1274 
1275 template <typename T>
1276 bool Future<T>::await(const Duration& duration) const
1277 {
1278  // NOTE: We need to preemptively allocate the Latch on the stack
1279  // instead of lazily create it in the critical section below because
1280  // instantiating a Latch requires creating a new process (at the
1281  // time of writing this comment) which might need to do some
1282  // synchronization in libprocess which might deadlock if some other
1283  // code in libprocess is already holding a lock and then attempts to
1284  // do Promise::set (or something similar) that attempts to acquire
1285  // the lock that we acquire here. This is an artifact of using
1286  // Future/Promise within the implementation of libprocess.
1287  //
1288  // We mostly only call 'await' in tests so this should not be a
1289  // performance concern.
1290  Owned<Latch> latch(new Latch());
1291 
1292  bool pending = false;
1293 
1294  synchronized (data->lock) {
1295  if (data->state == PENDING) {
1296  pending = true;
1297  data->onAnyCallbacks.push_back(lambda::bind(&internal::awaited, latch));
1298  }
1299  }
1300 
1301  if (pending) {
1302  return latch->await(duration);
1303  }
1304 
1305  return true;
1306 }
1307 
1308 
1309 template <typename T>
1310 const T& Future<T>::get() const
1311 {
1312  if (!isReady()) {
1313  await();
1314  }
1315 
1316  CHECK(!isPending()) << "Future was in PENDING after await()";
1317  // We can't use CHECK_READY here due to check.hpp depending on future.hpp.
1318  if (!isReady()) {
1319  CHECK(!isFailed()) << "Future::get() but state == FAILED: " << failure();
1320  CHECK(!isDiscarded()) << "Future::get() but state == DISCARDED";
1321  }
1322 
1323  assert(data->result.isSome());
1324  return data->result.get();
1325 }
1326 
1327 
1328 template <typename T>
1329 const T* Future<T>::operator->() const
1330 {
1331  return &get();
1332 }
1333 
1334 
1335 template <typename T>
1336 const std::string& Future<T>::failure() const
1337 {
1338  if (data->state != FAILED) {
1339  ABORT("Future::failure() but state != FAILED");
1340  }
1341 
1342  CHECK_ERROR(data->result);
1343  return data->result.error();
1344 }
1345 
1346 
1347 template <typename T>
1349 {
1350  bool run = false;
1351 
1352  synchronized (data->lock) {
1353  if (data->abandoned) {
1354  run = true;
1355  } else if (data->state == PENDING) {
1356  data->onAbandonedCallbacks.emplace_back(std::move(callback));
1357  }
1358  }
1359 
1360  // TODO(*): Invoke callback in another execution context.
1361  if (run) {
1362  std::move(callback)(); // NOLINT(misc-use-after-move)
1363  }
1364 
1365  return *this;
1366 }
1367 
1368 
1369 template <typename T>
1371 {
1372  bool run = false;
1373 
1374  synchronized (data->lock) {
1375  if (data->discard) {
1376  run = true;
1377  } else if (data->state == PENDING) {
1378  data->onDiscardCallbacks.emplace_back(std::move(callback));
1379  }
1380  }
1381 
1382  // TODO(*): Invoke callback in another execution context.
1383  if (run) {
1384  std::move(callback)(); // NOLINT(misc-use-after-move)
1385  }
1386 
1387  return *this;
1388 }
1389 
1390 
1391 template <typename T>
1393 {
1394  bool run = false;
1395 
1396  synchronized (data->lock) {
1397  if (data->state == READY) {
1398  run = true;
1399  } else if (data->state == PENDING) {
1400  data->onReadyCallbacks.emplace_back(std::move(callback));
1401  }
1402  }
1403 
1404  // TODO(*): Invoke callback in another execution context.
1405  if (run) {
1406  std::move(callback)(data->result.get()); // NOLINT(misc-use-after-move)
1407  }
1408 
1409  return *this;
1410 }
1411 
1412 
1413 template <typename T>
1415 {
1416  bool run = false;
1417 
1418  synchronized (data->lock) {
1419  if (data->state == FAILED) {
1420  run = true;
1421  } else if (data->state == PENDING) {
1422  data->onFailedCallbacks.emplace_back(std::move(callback));
1423  }
1424  }
1425 
1426  // TODO(*): Invoke callback in another execution context.
1427  if (run) {
1428  std::move(callback)(data->result.error()); // NOLINT(misc-use-after-move)
1429  }
1430 
1431  return *this;
1432 }
1433 
1434 
1435 template <typename T>
1437 {
1438  bool run = false;
1439 
1440  synchronized (data->lock) {
1441  if (data->state == DISCARDED) {
1442  run = true;
1443  } else if (data->state == PENDING) {
1444  data->onDiscardedCallbacks.emplace_back(std::move(callback));
1445  }
1446  }
1447 
1448  // TODO(*): Invoke callback in another execution context.
1449  if (run) {
1450  std::move(callback)(); // NOLINT(misc-use-after-move)
1451  }
1452 
1453  return *this;
1454 }
1455 
1456 
1457 template <typename T>
1458 const Future<T>& Future<T>::onAny(AnyCallback&& callback) const
1459 {
1460  bool run = false;
1461 
1462  synchronized (data->lock) {
1463  if (data->state == PENDING) {
1464  data->onAnyCallbacks.emplace_back(std::move(callback));
1465  } else {
1466  run = true;
1467  }
1468  }
1469 
1470  // TODO(*): Invoke callback in another execution context.
1471  if (run) {
1472  std::move(callback)(*this); // NOLINT(misc-use-after-move)
1473  }
1474 
1475  return *this;
1476 }
1477 
1478 namespace internal {
1479 
1480 // NOTE: We need to name this 'thenf' versus 'then' to distinguish it
1481 // from the function 'then' whose parameter 'f' doesn't return a
1482 // Future since the compiler can't properly infer otherwise.
1483 template <typename T, typename X>
1485  std::unique_ptr<Promise<X>> promise,
1486  const Future<T>& future)
1487 {
1488  if (future.isReady()) {
1489  if (future.hasDiscard()) {
1490  promise->discard();
1491  } else {
1492  promise->associate(std::move(f)(future.get()));
1493  }
1494  } else if (future.isFailed()) {
1495  promise->fail(future.failure());
1496  } else if (future.isDiscarded()) {
1497  promise->discard();
1498  }
1499 }
1500 
1501 
1502 template <typename T, typename X>
1503 void then(lambda::CallableOnce<X(const T&)>&& f,
1504  std::unique_ptr<Promise<X>> promise,
1505  const Future<T>& future)
1506 {
1507  if (future.isReady()) {
1508  if (future.hasDiscard()) {
1509  promise->discard();
1510  } else {
1511  promise->set(std::move(f)(future.get()));
1512  }
1513  } else if (future.isFailed()) {
1514  promise->fail(future.failure());
1515  } else if (future.isDiscarded()) {
1516  promise->discard();
1517  }
1518 }
1519 
1520 
1521 template <typename T>
1522 void repair(
1524  std::unique_ptr<Promise<T>> promise,
1525  const Future<T>& future)
1526 {
1527  CHECK(!future.isPending());
1528  if (future.isFailed()) {
1529  promise->associate(std::move(f)(future));
1530  } else {
1531  promise->associate(future);
1532  }
1533 }
1534 
1535 
1536 template <typename T>
1537 void expired(
1538  const std::shared_ptr<lambda::CallableOnce<Future<T>(const Future<T>&)>>& f,
1539  const std::shared_ptr<Latch>& latch,
1540  const std::shared_ptr<Promise<T>>& promise,
1541  const std::shared_ptr<Option<Timer>>& timer,
1542  const Future<T>& future)
1543 {
1544  if (latch->trigger()) {
1545  // If this callback executed first (i.e., we triggered the latch)
1546  // then we want to clear out the timer so that we don't hold a
1547  // circular reference to `future` in it's own `onAny`
1548  // callbacks. See the comment in `Future::after`.
1549  *timer = None();
1550 
1551  // Note that we don't bother checking if 'future' has been
1552  // discarded (i.e., 'future.isDiscarded()' returns true) since
1553  // there is a race between when we make that check and when we
1554  // would invoke 'f(future)' so the callee 'f' should ALWAYS check
1555  // if the future has been discarded and rather than hiding a
1556  // non-deterministic bug we always call 'f' if the timer has
1557  // expired.
1558  promise->associate(std::move(*f)(future));
1559  }
1560 }
1561 
1562 
1563 template <typename T>
1564 void after(
1565  const std::shared_ptr<Latch>& latch,
1566  const std::shared_ptr<Promise<T>>& promise,
1567  const std::shared_ptr<Option<Timer>>& timer,
1568  const Future<T>& future)
1569 {
1570  CHECK(!future.isPending());
1571  if (latch->trigger()) {
1572  // If this callback executes first (i.e., we triggered the latch)
1573  // it must be the case that `timer` is still some and we can try
1574  // and cancel the timer.
1575  CHECK_SOME(*timer);
1576  Clock::cancel(timer->get());
1577 
1578  // We also force the timer to get deallocated so that there isn't
1579  // a cicular reference of the timer with itself which keeps around
1580  // a reference to the original future.
1581  *timer = None();
1582 
1583  promise->associate(future);
1584  }
1585 }
1586 
1587 } // namespace internal {
1588 
1589 
1590 template <typename T>
1591 template <typename X>
1593 {
1594  std::unique_ptr<Promise<X>> promise(new Promise<X>());
1595  Future<X> future = promise->future();
1596 
1598  &internal::thenf<T, X>, std::move(f), std::move(promise), lambda::_1);
1599 
1600  onAny(std::move(thenf));
1601 
1602  onAbandoned([=]() mutable {
1603  future.abandon();
1604  });
1605 
1606  // Propagate discarding up the chain. To avoid cyclic dependencies,
1607  // we keep a weak future in the callback.
1608  future.onDiscard(lambda::bind(&internal::discard<T>, WeakFuture<T>(*this)));
1609 
1610  return future;
1611 }
1612 
1613 
1614 template <typename T>
1615 template <typename X>
1617 {
1618  std::unique_ptr<Promise<X>> promise(new Promise<X>());
1619  Future<X> future = promise->future();
1620 
1622  &internal::then<T, X>, std::move(f), std::move(promise), lambda::_1);
1623 
1624  onAny(std::move(then));
1625 
1626  onAbandoned([=]() mutable {
1627  future.abandon();
1628  });
1629 
1630  // Propagate discarding up the chain. To avoid cyclic dependencies,
1631  // we keep a weak future in the callback.
1632  future.onDiscard(lambda::bind(&internal::discard<T>, WeakFuture<T>(*this)));
1633 
1634  return future;
1635 }
1636 
1637 
1638 template <typename T>
1639 template <typename F>
1641 {
1642  std::shared_ptr<Promise<T>> promise(new Promise<T>());
1643 
1644  const Future<T> future = *this;
1645 
1646  typedef decltype(std::move(f)(future)) R;
1647 
1648  std::shared_ptr<lambda::CallableOnce<R(const Future<T>&)>> callable(
1649  new lambda::CallableOnce<R(const Future<T>&)>(std::move(f)));
1650 
1651  onAny([=]() {
1652  if (future.isDiscarded() || future.isFailed()) {
1653  // We reset `discard` so that if a future gets returned from
1654  // `f(future)` we won't immediately discard it! We still want to
1655  // let the future get discarded later, however, hence if it gets
1656  // set again in the future it'll propagate to the returned
1657  // future.
1658  synchronized (promise->f.data->lock) {
1659  promise->f.data->discard = false;
1660  }
1661 
1662  promise->set(std::move(*callable)(future));
1663  } else {
1664  promise->associate(future);
1665  }
1666  });
1667 
1668  onAbandoned([=]() {
1669  // See comment above for why we reset `discard` here.
1670  synchronized (promise->f.data->lock) {
1671  promise->f.data->discard = false;
1672  }
1673  promise->set(std::move(*callable)(future));
1674  });
1675 
1676  // Propagate discarding up the chain. To avoid cyclic dependencies,
1677  // we keep a weak future in the callback.
1678  promise->future().onDiscard(
1679  lambda::bind(&internal::discard<T>, WeakFuture<T>(*this)));
1680 
1681  return promise->future();
1682 }
1683 
1684 
1685 template <typename T>
1687  lambda::CallableOnce<Future<T>(const Future<T>&)> f) const
1688 {
1689  std::unique_ptr<Promise<T>> promise(new Promise<T>());
1690  Future<T> future = promise->future();
1691 
1692  onAny(lambda::partial(
1693  &internal::repair<T>, std::move(f), std::move(promise), lambda::_1));
1694 
1695  onAbandoned([=]() mutable {
1696  future.abandon();
1697  });
1698 
1699  // Propagate discarding up the chain. To avoid cyclic dependencies,
1700  // we keep a weak future in the callback.
1701  future.onDiscard(lambda::bind(&internal::discard<T>, WeakFuture<T>(*this)));
1702 
1703  return future;
1704 }
1705 
1706 
1707 template <typename T>
1709  const Duration& duration,
1710  lambda::CallableOnce<Future<T>(const Future<T>&)> f) const
1711 {
1712  // TODO(benh): Using a Latch here but Once might be cleaner.
1713  // Unfortunately, Once depends on Future so we can't easily use it
1714  // from here.
1715  std::shared_ptr<Latch> latch(new Latch());
1716  std::shared_ptr<Promise<T>> promise(new Promise<T>());
1717 
1718  // We need to control the lifetime of the timer we create below so
1719  // that we can force the timer to get deallocated after it
1720  // expires. The reason we want to force the timer to get deallocated
1721  // after it expires is because the timer's lambda has a copy of
1722  // `this` (i.e., a Future) and it's stored in the `onAny` callbacks
1723  // of `this` thus creating a circular reference. By storing a
1724  // `shared_ptr<Option<Timer>>` we're able to set the option to none
1725  // after the timer expires which will deallocate our copy of the
1726  // timer and leave the `Option<Timer>` stored in the lambda of the
1727  // `onAny` callback as none. Note that this is safe because the
1728  // `Latch` makes sure that only one of the callbacks will manipulate
1729  // the `shared_ptr<Option<Timer>>` so there isn't any concurrency
1730  // issues we have to worry about.
1731  std::shared_ptr<Option<Timer>> timer(new Option<Timer>());
1732 
1733  typedef lambda::CallableOnce<Future<T>(const Future<T>&)> F;
1734  std::shared_ptr<F> callable(new F(std::move(f)));
1735 
1736  // Set up a timer to invoke the callback if this future has not
1737  // completed. Note that we do not pass a weak reference for this
1738  // future as we don't want the future to get cleaned up and then
1739  // have the timer expire because then we wouldn't have a valid
1740  // future that we could pass to `f`! The reference to `this` that is
1741  // captured in the timer will get removed by setting the
1742  // `Option<Timer>` to none (see comment above) either if the timer
1743  // expires or if `this` completes and we cancel the timer (see
1744  // `internal::expired` and `internal::after` callbacks for where we
1745  // force the deallocation of our copy of the timer).
1746  *timer = Clock::timer(
1747  duration,
1748  lambda::bind(&internal::expired<T>, callable, latch, promise, timer,
1749  *this));
1750 
1751  onAny(lambda::bind(&internal::after<T>, latch, promise, timer, lambda::_1));
1752 
1753  onAbandoned([=]() {
1754  promise->future().abandon();
1755  });
1756 
1757  // Propagate discarding up the chain. To avoid cyclic dependencies,
1758  // we keep a weak future in the callback.
1759  promise->future().onDiscard(
1760  lambda::bind(&internal::discard<T>, WeakFuture<T>(*this)));
1761 
1762  return promise->future();
1763 }
1764 
1765 
1766 template <typename T>
1767 bool Future<T>::set(T&& t)
1768 {
1769  return _set(std::move(t));
1770 }
1771 
1772 
1773 template <typename T>
1774 bool Future<T>::set(const T& t)
1775 {
1776  return _set(t);
1777 }
1778 
1779 
1780 template <typename T>
1781 template <typename U>
1782 bool Future<T>::_set(U&& u)
1783 {
1784  bool result = false;
1785 
1786  synchronized (data->lock) {
1787  if (data->state == PENDING) {
1788  data->result = std::forward<U>(u);
1789  data->state = READY;
1790  result = true;
1791  }
1792  }
1793 
1794  // Invoke all callbacks associated with this future being READY. We
1795  // don't need a lock because the state is now in READY so there
1796  // should not be any concurrent modifications to the callbacks.
1797  if (result) {
1798  // Grab a copy of `data` just in case invoking the callbacks
1799  // erroneously attempts to delete this future.
1800  std::shared_ptr<typename Future<T>::Data> copy = data;
1801  internal::run(std::move(copy->onReadyCallbacks), copy->result.get());
1802  internal::run(std::move(copy->onAnyCallbacks), *this);
1803 
1804  copy->clearAllCallbacks();
1805  }
1806 
1807  return result;
1808 }
1809 
1810 
1811 template <typename T>
1812 bool Future<T>::fail(const std::string& _message)
1813 {
1814  bool result = false;
1815 
1816  synchronized (data->lock) {
1817  if (data->state == PENDING) {
1818  data->result = Result<T>(Error(_message));
1819  data->state = FAILED;
1820  result = true;
1821  }
1822  }
1823 
1824  // Invoke all callbacks associated with this future being FAILED. We
1825  // don't need a lock because the state is now in FAILED so there
1826  // should not be any concurrent modifications to the callbacks.
1827  if (result) {
1828  // Grab a copy of `data` just in case invoking the callbacks
1829  // erroneously attempts to delete this future.
1830  std::shared_ptr<typename Future<T>::Data> copy = data;
1831  internal::run(std::move(copy->onFailedCallbacks), copy->result.error());
1832  internal::run(std::move(copy->onAnyCallbacks), *this);
1833 
1834  copy->clearAllCallbacks();
1835  }
1836 
1837  return result;
1838 }
1839 
1840 
1841 template <typename T>
1842 std::ostream& operator<<(std::ostream& stream, const Future<T>& future)
1843 {
1844  const std::string suffix = future.data->discard ? " (with discard)" : "";
1845 
1846  switch (future.data->state) {
1847  case Future<T>::PENDING:
1848  if (future.data->abandoned) {
1849  return stream << "Abandoned" << suffix;
1850  }
1851  return stream << "Pending" << suffix;
1852 
1853  case Future<T>::READY:
1854  // TODO(benh): Stringify `Future<T>::get()` if it can be
1855  // stringified (will need to be SFINAE'ed appropriately).
1856  return stream << "Ready" << suffix;
1857 
1858  case Future<T>::FAILED:
1859  return stream << "Failed" << suffix << ": " << future.failure();
1860 
1861  case Future<T>::DISCARDED:
1862  return stream << "Discarded" << suffix;
1863  }
1864 
1865  return stream;
1866 }
1867 
1868 
1869 // Helper for setting a set of Promises.
1870 template <typename T>
1871 void setPromises(std::set<Promise<T>*>* promises, const T& t)
1872 {
1873  foreach (Promise<T>* promise, *promises) {
1874  promise->set(t);
1875  delete promise;
1876  }
1877  promises->clear();
1878 }
1879 
1880 
1881 // Helper for failing a set of Promises.
1882 template <typename T>
1883 void failPromises(std::set<Promise<T>*>* promises, const std::string& failure)
1884 {
1885  foreach (Promise<T>* promise, *promises) {
1886  promise->fail(failure);
1887  delete promise;
1888  }
1889  promises->clear();
1890 }
1891 
1892 
1893 // Helper for discarding a set of Promises.
1894 template <typename T>
1896 {
1897  foreach (Promise<T>* promise, *promises) {
1898  promise->discard();
1899  delete promise;
1900  }
1901  promises->clear();
1902 }
1903 
1904 
1905 // Helper for discarding an individual promise in the set.
1906 template <typename T>
1907 void discardPromises(std::set<Promise<T>*>* promises, const Future<T>& future)
1908 {
1909  foreach (Promise<T>* promise, *promises) {
1910  if (promise->future() == future) {
1911  promise->discard();
1912  promises->erase(promise);
1913  delete promise;
1914  return;
1915  }
1916  }
1917 }
1918 
1919 
1920 // Returns a future that will not propagate a discard through to the
1921 // future passed in as an argument. This can be very valuable if you
1922 // want to block some future from getting discarded.
1923 //
1924 // Example:
1925 //
1926 // Promise<int> promise;
1927 // Future<int> future = undiscardable(promise.future());
1928 // future.discard();
1929 // assert(!promise.future().hasDiscard());
1930 //
1931 // Or another example, when chaining futures:
1932 //
1933 // Future<int> future = undiscardable(
1934 // foo()
1935 // .then([]() { ...; })
1936 // .then([]() { ...; }));
1937 //
1938 // This will guarantee that a discard _will not_ propagate to `foo()`
1939 // or any of the futures returned from the invocations of `.then()`.
1940 template <typename T>
1942 {
1943  std::unique_ptr<Promise<T>> promise(new Promise<T>());
1944  Future<T> future_ = promise->future();
1945  future.onAny(lambda::partial(
1946  [](std::unique_ptr<Promise<T>> promise, const Future<T>& future) {
1947  promise->associate(future);
1948  },
1949  std::move(promise),
1950  lambda::_1));
1951  return future_;
1952 }
1953 
1954 
1955 // Decorator that for some callable `f` invokes
1956 // `undiscardable(f(args))` for some `args`. This is used by the
1957 // overload of `undiscardable()` that takes callables instead of a
1958 // specialization of `Future`.
1959 //
1960 // TODO(benh): Factor out a generic decorator pattern to be used in
1961 // other circumstances, e.g., to replace `_Deferred`.
1962 template <typename F>
1964 {
1965  template <
1966  typename G,
1967  typename std::enable_if<
1968  std::is_constructible<F, G>::value, int>::type = 0>
1969  UndiscardableDecorator(G&& g) : f(std::forward<G>(g)) {}
1970 
1971  template <typename... Args>
1972  auto operator()(Args&&... args)
1973  -> decltype(std::declval<F&>()(std::forward<Args>(args)...))
1974  {
1975  using Result =
1976  typename std::decay<decltype(f(std::forward<Args>(args)...))>::type;
1977 
1978  static_assert(
1980  "Expecting Future<T> to be returned from undiscarded(...)");
1981 
1982  return undiscardable(f(std::forward<Args>(args)...));
1983  }
1984 
1985  F f;
1986 };
1987 
1988 
1989 // An overload of `undiscardable()` above that takes and returns a
1990 // callable. The returned callable has decorated the provided callable
1991 // `f` such that when the returned callable is invoked it will in turn
1992 // invoke `undiscardable(f(args))` for some `args`. See
1993 // `UndiscardableDecorator` above for more details.
1994 //
1995 // Example:
1996 //
1997 // Future<int> future = foo()
1998 // .then(undiscardable([]() { ...; }));
1999 //
2000 // This guarantees that even if `future` is discarded the discard will
2001 // not propagate into the lambda passed into `.then()`.
2002 template <
2003  typename F,
2004  typename std::enable_if<
2006  int>::type = 0>
2008 {
2009  return UndiscardableDecorator<
2010  typename std::decay<F>::type>(std::forward<F>(f));
2011 }
2012 
2013 } // namespace process {
2014 
2015 #endif // __PROCESS_FUTURE_HPP__
void select(const Future< T > &future, std::shared_ptr< Promise< Future< T >>> promise)
Definition: future.hpp:950
bool isReady() const
Definition: future.hpp:1231
WeakFuture(const Future< T > &future)
Definition: future.hpp:645
std::string strerror(int errno_)
A thread-safe version of strerror.
Definition: strerror.hpp:30
Definition: errorbase.hpp:35
Definition: option.hpp:28
#define ABORT(...)
Definition: abort.hpp:40
F && f
Definition: defer.hpp:270
const Future< T > & onAny(F &&f) const
Definition: future.hpp:379
lambda::CallableOnce< void()> DiscardedCallback
Definition: future.hpp:166
const T & get() const
Definition: future.hpp:1310
bool set(const T &_t)
Definition: future.hpp:826
T type
Definition: future.hpp:938
Failure(const std::string &_message)
Definition: future.hpp:666
bool pending(int signal)
Definition: signals.hpp:50
Definition: try.hpp:34
Future< T > type
Definition: future.hpp:924
bool fail(const std::string &message)
Definition: future.hpp:902
Definition: future.hpp:664
void awaited(Owned< Latch > latch)
Definition: future.hpp:1267
const Future< T > & onDiscarded(F &&f) const
Definition: future.hpp:368
ErrnoFailure(const std::string &message)
Definition: future.hpp:680
lambda::CallableOnce< void()> DiscardCallback
Definition: future.hpp:163
static bool cancel(const Timer &timer)
const Future< T > & onDiscard(DiscardCallback &&callback) const
Definition: future.hpp:1370
internal::Partial< typename std::decay< F >::type, typename std::decay< Args >::type...> partial(F &&f, Args &&...args)
Definition: lambda.hpp:291
bool await(const Duration &duration=Seconds(-1)) const
Definition: future.hpp:1276
X type
Definition: future.hpp:945
const Future< T > & onFailed(FailedCallback &&callback) const
Definition: future.hpp:1414
const Future< T > & onFailed(F &&f) const
Definition: future.hpp:362
#define CHECK_ERROR(expression)
Definition: check.hpp:52
bool operator!=(const Future< T > &that) const
Definition: future.hpp:1159
void thenf(lambda::CallableOnce< Future< X >(const T &)> &&f, std::unique_ptr< Promise< X >> promise, const Future< T > &future)
Definition: future.hpp:1484
bool discard()
Definition: future.hpp:1173
UndiscardableDecorator(G &&g)
Definition: future.hpp:1969
void expired(const std::shared_ptr< lambda::CallableOnce< Future< T >(const Future< T > &)>> &f, const std::shared_ptr< Latch > &latch, const std::shared_ptr< Promise< T >> &promise, const std::shared_ptr< Option< Timer >> &timer, const Future< T > &future)
Definition: future.hpp:1537
void failPromises(std::set< Promise< T > * > *promises, const std::string &failure)
Definition: future.hpp:1883
Future< T > & operator=(const Future< T > &that)
Definition: future.hpp:1142
Future< T > recover(_Deferred< F > &&deferred) const
Definition: future.hpp:478
Definition: duration.hpp:32
Definition: result.hpp:40
bool isPending() const
Definition: future.hpp:1224
Definition: future.hpp:66
bool isSome() const
Definition: option.hpp:115
bool operator==(const Future< T > &that) const
Definition: future.hpp:1152
Definition: future.hpp:63
void after(const std::shared_ptr< Latch > &latch, const std::shared_ptr< Promise< T >> &promise, const std::shared_ptr< Option< Timer >> &timer, const Future< T > &future)
Definition: future.hpp:1564
#define CHECK_SOME(expression)
Definition: check.hpp:44
bool isDiscarded() const
Definition: future.hpp:1238
ErrnoFailure()
Definition: future.hpp:675
void discardPromises(std::set< Promise< T > * > *promises)
Definition: future.hpp:1895
const Future< T > & onDiscarded(DiscardedCallback &&callback) const
Definition: future.hpp:1436
const Future< T > & onFailed(_Deferred< F > &&deferred) const
Definition: future.hpp:203
const Future< T > & onDiscard(_Deferred< F > &&deferred) const
Definition: future.hpp:189
F f
Definition: future.hpp:1985
const Future< T > & onAny(AnyCallback &&callback) const
Definition: future.hpp:1458
Definition: deferred.hpp:64
lambda::CallableOnce< void(const T &)> ReadyCallback
Definition: future.hpp:164
Definition: duration.hpp:259
bool associate(const Future< T > &future)
Definition: future.hpp:851
void discard(WeakFuture< T > reference)
Definition: future.hpp:753
Future< Future< T > > select(const std::set< Future< T >> &futures)
Definition: future.hpp:972
Definition: traits.hpp:17
Definition: future.hpp:73
Future< X > then(lambda::CallableOnce< X()> f) const
Definition: future.hpp:401
const Future< T > & onReady(F &&f) const
Definition: future.hpp:356
const Future< T > & onAbandoned(AbandonedCallback &&callback) const
Definition: future.hpp:1348
void repair(lambda::CallableOnce< Future< T >(const Future< T > &)> &&f, std::unique_ptr< Promise< T >> promise, const Future< T > &future)
Definition: future.hpp:1522
bool isSome() const
Definition: try.hpp:70
Failure(const Error &error)
Definition: future.hpp:667
const Future< T > & onDiscard(F &&f) const
Definition: future.hpp:345
Definition: future.hpp:673
const T & get() const &
Definition: option.hpp:118
Protocol< PromiseRequest, PromiseResponse > promise
Future< X > type
Definition: future.hpp:931
Future< T > repair(lambda::CallableOnce< Future< T >(const Future< T > &)> f) const
Definition: future.hpp:1686
Definition: future.hpp:78
void discard(const std::set< Future< T >> &futures)
Definition: future.hpp:990
static Try error(const E &e)
Definition: try.hpp:42
Future< R > run(R(*method)())
Definition: run.hpp:55
Try< std::vector< Entry > > list(const std::string &hierarchy, const std::string &cgroup)
Future< X > then(lambda::CallableOnce< Future< X >(const T &)> f) const
Definition: future.hpp:1592
Future< T > undiscardable(const Future< T > &future)
Definition: future.hpp:1941
void run(std::vector< C > &&callbacks, Arguments &&...arguments)
Definition: future.hpp:617
Future< std::list< Future< T > > > await(const std::list< Future< T >> &futures)
Definition: collect.hpp:305
lambda::CallableOnce< void()> AbandonedCallback
Definition: future.hpp:162
bool operator<(const Future< T > &that) const
Definition: future.hpp:1166
Future()
Definition: future.hpp:1069
Result< Process > process(pid_t pid)
Definition: freebsd.hpp:30
Definition: none.hpp:27
Future< X > then(lambda::CallableOnce< Future< X >()> f) const
Definition: future.hpp:394
std::string error(const std::string &msg, uint32_t code)
lambda::CallableOnce< void(const std::string &)> FailedCallback
Definition: future.hpp:165
void then(lambda::CallableOnce< X(const T &)> &&f, std::unique_ptr< Promise< X >> promise, const Future< T > &future)
Definition: future.hpp:1503
const int code
Definition: future.hpp:686
Promise()
Definition: future.hpp:776
Future< T > future() const
Definition: future.hpp:912
ErrnoFailure(int _code, const std::string &message)
Definition: future.hpp:683
const std::string message
Definition: future.hpp:669
bool await(const Duration &duration=Seconds(-1))
auto then(F &&f) const -> decltype(this->then(std::forward< F >(f), Prefer()))
Definition: future.hpp:464
bool discard()
Definition: future.hpp:809
Try< uint32_t > type(const std::string &path)
Future< T > after(const Duration &duration, lambda::CallableOnce< Future< T >(const Future< T > &)> f) const
Definition: future.hpp:1708
const Future< T > & onAbandoned(F &&f) const
Definition: future.hpp:334
bool isAbandoned() const
Definition: future.hpp:1252
ErrnoFailure(int _code)
Definition: future.hpp:677
void discarded(Future< U > future)
const Future< T > & onAny(_Deferred< F > &&deferred) const
Definition: future.hpp:217
static Timer timer(const Duration &duration, const lambda::function< void()> &thunk)
const Future< T > & onDiscarded(_Deferred< F > &&deferred) const
Definition: future.hpp:210
Try< Nothing > bind(int_fd s, const Address &address)
Definition: network.hpp:46
const std::string & failure() const
Definition: future.hpp:1336
Option< Future< T > > get() const
Definition: future.hpp:650
~Future()=default
const Future< T > & onReady(_Deferred< F > &&deferred) const
Definition: future.hpp:196
Definition: owned.hpp:35
Definition: future.hpp:1963
Definition: latch.hpp:24
static Future< T > failed(const std::string &message)
Definition: future.hpp:1039
bool hasDiscard() const
Definition: future.hpp:1259
T copy(const T &t)
Definition: utils.hpp:21
const Future< T > & onReady(ReadyCallback &&callback) const
Definition: future.hpp:1392
const T & get() const
Definition: try.hpp:73
virtual ~Promise()
Definition: future.hpp:790
const Future< T > & onAbandoned(_Deferred< F > &&deferred) const
Definition: future.hpp:182
Definition: lambda.hpp:341
auto operator()(Args &&...args) -> decltype(std::declval< F & >()(std::forward< Args >(args)...))
Definition: future.hpp:1972
void setPromises(std::set< Promise< T > * > *promises, const T &t)
Definition: future.hpp:1871
const T * operator->() const
Definition: future.hpp:1329
bool isFailed() const
Definition: future.hpp:1245
Definition: future.hpp:57
Future< T > recover(F &&f) const
Definition: future.hpp:1640