Apache Mesos
future.hpp
Go to the documentation of this file.
1 // Licensed under the Apache License, Version 2.0 (the "License");
2 // you may not use this file except in compliance with the License.
3 // You may obtain a copy of the License at
4 //
5 // http://www.apache.org/licenses/LICENSE-2.0
6 //
7 // Unless required by applicable law or agreed to in writing, software
8 // distributed under the License is distributed on an "AS IS" BASIS,
9 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10 // See the License for the specific language governing permissions and
11 // limitations under the License
12 
13 #ifndef __PROCESS_FUTURE_HPP__
14 #define __PROCESS_FUTURE_HPP__
15 
16 #include <assert.h>
17 
18 #include <atomic>
19 #include <list>
20 #include <memory> // TODO(benh): Replace shared_ptr with unique_ptr.
21 #include <ostream>
22 #include <set>
23 #include <type_traits>
24 #include <utility>
25 #include <vector>
26 
27 #include <glog/logging.h>
28 
29 #include <process/clock.hpp>
30 #include <process/latch.hpp>
31 #include <process/owned.hpp>
32 #include <process/pid.hpp>
33 #include <process/timer.hpp>
34 
35 #include <stout/abort.hpp>
36 #include <stout/check.hpp>
37 #include <stout/duration.hpp>
38 #include <stout/error.hpp>
39 #include <stout/lambda.hpp>
40 #include <stout/none.hpp>
41 #include <stout/option.hpp>
42 #include <stout/preprocessor.hpp>
43 #include <stout/result.hpp>
44 #include <stout/result_of.hpp>
45 #include <stout/stringify.hpp>
46 #include <stout/synchronized.hpp>
47 #include <stout/try.hpp>
48 
49 #include <stout/os/strerror.hpp>
50 
51 namespace process {
52 
53 // Forward declarations (instead of include to break circular dependency).
54 template <typename G>
55 struct _Deferred;
56 
57 template <typename T>
58 class Future;
59 
60 
61 namespace internal {
62 
63 template <typename T>
64 struct wrap;
65 
66 template <typename T>
67 struct unwrap;
68 
69 } // namespace internal {
70 
71 
72 // Forward declaration of Promise.
73 template <typename T>
74 class Promise;
75 
76 
77 // Forward declaration of WeakFuture.
78 template <typename T>
79 class WeakFuture;
80 
81 // Forward declaration of Failure.
82 struct Failure;
83 struct ErrnoFailure;
84 
85 
86 // Definition of a "shared" future. A future can hold any
87 // copy-constructible value. A future is considered "shared" because
88 // by default a future can be accessed concurrently.
89 template <typename T>
90 class Future
91 {
92 public:
93  // Constructs a failed future.
94  static Future<T> failed(const std::string& message);
95 
96  Future();
97 
98  /*implicit*/ Future(const T& _t);
99  /*implicit*/ Future(T&& _t);
100 
101  template <typename U>
102  /*implicit*/ Future(const U& u);
103 
104  /*implicit*/ Future(const Failure& failure);
105 
106  /*implicit*/ Future(const ErrnoFailure& failure);
107 
108  /*implicit*/ Future(const Future<T>& that) = default;
109  /*implicit*/ Future(Future<T>&& that) = default;
110 
111  template <typename E>
112  /*implicit*/ Future(const Try<T, E>& t);
113 
114  template <typename E>
115  /*implicit*/ Future(const Try<Future<T>, E>& t);
116 
117  ~Future() = default;
118 
119  // Futures are assignable (and copyable). This results in the
120  // reference to the previous future data being decremented and a
121  // reference to 'that' being incremented.
122  Future<T>& operator=(const Future<T>& that) = default;
123  Future<T>& operator=(Future<T>&& that) = default;
124 
125  // Comparison operators useful for using futures in collections.
126  bool operator==(const Future<T>& that) const;
127  bool operator!=(const Future<T>& that) const;
128  bool operator<(const Future<T>& that) const;
129 
130  // Helpers to get the current state of this future.
131  bool isPending() const;
132  bool isReady() const;
133  bool isDiscarded() const;
134  bool isFailed() const;
135  bool isAbandoned() const;
136  bool hasDiscard() const;
137 
138  // Discards this future. Returns false if discard has already been
139  // called or the future has already completed, i.e., is ready,
140  // failed, or discarded. Note that a discard does not terminate any
141  // computation but rather acts as a suggestion or indication that
142  // the caller no longer cares about the result of some
143  // computation. The callee can decide whether or not to continue the
144  // computation on their own (where best practices are to attempt to
145  // stop the computation if possible). The callee can discard the
146  // computation via Promise::discard which completes a future, at
147  // which point, Future::isDiscarded is true (and the
148  // Future::onDiscarded callbacks are executed). Before that point,
149  // but after calling Future::discard, only Future::hasDiscard will
150  // return true and the Future::onDiscard callbacks will be invoked.
151  bool discard();
152 
153  // Waits for this future to become ready, discarded, or failed.
154  bool await(const Duration& duration = Seconds(-1)) const;
155 
156  // Return the value associated with this future, waits indefinitely
157  // until a value gets associated or until the future is discarded.
158  const T& get() const;
159  const T* operator->() const;
160 
161  // Returns the failure message associated with this future.
162  const std::string& failure() const;
163 
164  // Type of the callback functions that can get invoked when the
165  // future gets set, fails, or is discarded.
172 
173  // Installs callbacks for the specified events and returns a const
174  // reference to 'this' in order to easily support chaining.
175  const Future<T>& onAbandoned(AbandonedCallback&& callback) const;
176  const Future<T>& onDiscard(DiscardCallback&& callback) const;
177  const Future<T>& onReady(ReadyCallback&& callback) const;
178  const Future<T>& onFailed(FailedCallback&& callback) const;
179  const Future<T>& onDiscarded(DiscardedCallback&& callback) const;
180  const Future<T>& onAny(AnyCallback&& callback) const;
181 
182  // TODO(benh): Add onReady, onFailed, onAny for _Deferred<F> where F
183  // is not expected.
184 
185  template <typename F>
186  const Future<T>& onAbandoned(_Deferred<F>&& deferred) const
187  {
188  return onAbandoned(
189  std::move(deferred).operator lambda::CallableOnce<void()>());
190  }
191 
192  template <typename F>
193  const Future<T>& onDiscard(_Deferred<F>&& deferred) const
194  {
195  return onDiscard(
196  std::move(deferred).operator lambda::CallableOnce<void()>());
197  }
198 
199  template <typename F>
200  const Future<T>& onReady(_Deferred<F>&& deferred) const
201  {
202  return onReady(
203  std::move(deferred).operator lambda::CallableOnce<void(const T&)>());
204  }
205 
206  template <typename F>
207  const Future<T>& onFailed(_Deferred<F>&& deferred) const
208  {
209  return onFailed(std::move(deferred)
210  .operator lambda::CallableOnce<void(const std::string&)>());
211  }
212 
213  template <typename F>
214  const Future<T>& onDiscarded(_Deferred<F>&& deferred) const
215  {
216  return onDiscarded(
217  std::move(deferred).operator lambda::CallableOnce<void()>());
218  }
219 
220  template <typename F>
221  const Future<T>& onAny(_Deferred<F>&& deferred) const
222  {
223  return onAny(std::move(deferred)
224  .operator lambda::CallableOnce<void(const Future<T>&)>());
225  }
226 
227 private:
228  // We use the 'Prefer' and 'LessPrefer' structs as a way to prefer
229  // one function over the other when doing SFINAE for the 'onReady',
230  // 'onFailed', 'onAny', and 'then' functions. In each of these cases
231  // we prefer calling the version of the functor that takes in an
232  // argument (i.e., 'const T&' for 'onReady' and 'then' and 'const
233  // std::string&' for 'onFailed'), but we allow functors that don't
234  // care about the argument. We don't need to do this for
235  // 'onDiscard', 'onDiscarded' or 'onAbandoned' because they don't
236  // take an argument.
237  struct LessPrefer {};
238  struct Prefer : LessPrefer {};
239 
241  const Future<T>& onReady(F&& f, Prefer) const
242  {
243  return onReady(lambda::CallableOnce<void(const T&)>(
245  [](typename std::decay<F>::type&& f, const T& t) {
246  std::move(f)(t);
247  },
248  std::forward<F>(f),
249  lambda::_1)));
250  }
251 
252  // This is the less preferred `onReady`, we prefer the `onReady` method which
253  // has `f` taking a `const T&` parameter. Unfortunately, to complicate
254  // matters, if `F` is the result of a `std::bind` expression we need to SFINAE
255  // out this version of `onReady` and force the use of the preferred `onReady`
256  // (which works because `std::bind` will just ignore the `const T&` argument).
257  // This is necessary because Visual Studio 2015 doesn't support using the
258  // `std::bind` call operator with `result_of` as it's technically not a
259  // requirement by the C++ standard.
260  template <
261  typename F,
262  typename = typename result_of<typename std::enable_if<
264  F>::type()>::type>
265  const Future<T>& onReady(F&& f, LessPrefer) const
266  {
267  return onReady(lambda::CallableOnce<void(const T&)>(
269  [](typename std::decay<F>::type&& f, const T&) {
270  std::move(f)();
271  },
272  std::forward<F>(f),
273  lambda::_1)));
274  }
275 
277  const Future<T>& onFailed(F&& f, Prefer) const
278  {
279  return onFailed(lambda::CallableOnce<void(const std::string&)>(
281  [](typename std::decay<F>::type&& f, const std::string& message) {
282  std::move(f)(message);
283  },
284  std::forward<F>(f),
285  lambda::_1)));
286  }
287 
288  // Refer to the less preferred version of `onReady` for why these SFINAE
289  // conditions are necessary.
290  template <
291  typename F,
292  typename = typename result_of<typename std::enable_if<
294  F>::type()>::type>
295  const Future<T>& onFailed(F&& f, LessPrefer) const
296  {
297  return onFailed(lambda::CallableOnce<void(const std::string&)>(
299  [](typename std::decay<F>::type&& f, const std::string&) mutable {
300  std::move(f)();
301  },
302  std::forward<F>(f),
303  lambda::_1)));
304  }
305 
306  template <typename F, typename = typename result_of<F(const Future<T>&)>::type> // NOLINT(whitespace/line_length)
307  const Future<T>& onAny(F&& f, Prefer) const
308  {
309  return onAny(lambda::CallableOnce<void(const Future<T>&)>(
311  [](typename std::decay<F>::type&& f, const Future<T>& future) {
312  std::move(f)(future);
313  },
314  std::forward<F>(f),
315  lambda::_1)));
316  }
317 
318  // Refer to the less preferred version of `onReady` for why these SFINAE
319  // conditions are necessary.
320  template <
321  typename F,
322  typename = typename result_of<typename std::enable_if<
324  F>::type()>::type>
325  const Future<T>& onAny(F&& f, LessPrefer) const
326  {
327  return onAny(lambda::CallableOnce<void(const Future<T>&)>(
329  [](typename std::decay<F>::type&& f, const Future<T>&) {
330  std::move(f)();
331  },
332  std::forward<F>(f),
333  lambda::_1)));
334  }
335 
336 public:
337  template <typename F>
338  const Future<T>& onAbandoned(F&& f) const
339  {
340  return onAbandoned(lambda::CallableOnce<void()>(
342  [](typename std::decay<F>::type&& f) {
343  std::move(f)();
344  },
345  std::forward<F>(f))));
346  }
347 
348  template <typename F>
349  const Future<T>& onDiscard(F&& f) const
350  {
351  return onDiscard(lambda::CallableOnce<void()>(
353  [](typename std::decay<F>::type&& f) {
354  std::move(f)();
355  },
356  std::forward<F>(f))));
357  }
358 
359  template <typename F>
360  const Future<T>& onReady(F&& f) const
361  {
362  return onReady(std::forward<F>(f), Prefer());
363  }
364 
365  template <typename F>
366  const Future<T>& onFailed(F&& f) const
367  {
368  return onFailed(std::forward<F>(f), Prefer());
369  }
370 
371  template <typename F>
372  const Future<T>& onDiscarded(F&& f) const
373  {
374  return onDiscarded(lambda::CallableOnce<void()>(
376  [](typename std::decay<F>::type&& f) {
377  std::move(f)();
378  },
379  std::forward<F>(f))));
380  }
381 
382  template <typename F>
383  const Future<T>& onAny(F&& f) const
384  {
385  return onAny(std::forward<F>(f), Prefer());
386  }
387 
388  // Installs callbacks that get executed when this future is ready
389  // and associates the result of the callback with the future that is
390  // returned to the caller (which may be of a different type).
391  template <typename X>
392  Future<X> then(lambda::CallableOnce<Future<X>(const T&)> f) const;
393 
394  template <typename X>
395  Future<X> then(lambda::CallableOnce<X(const T&)> f) const;
396 
397  template <typename X>
399  {
400  return then(lambda::CallableOnce<Future<X>(const T&)>(
401  lambda::partial(std::move(f))));
402  }
403 
404  template <typename X>
406  {
407  return then(lambda::CallableOnce<X(const T&)>(
408  lambda::partial(std::move(f))));
409  }
410 
411 private:
412  template <
413  typename F,
414  typename X =
416  Future<X> then(_Deferred<F>&& f, Prefer) const
417  {
418  // note the then<X> is necessary to not have an infinite loop with
419  // then(F&& f)
420  return then<X>(
421  std::move(f).operator lambda::CallableOnce<Future<X>(const T&)>());
422  }
423 
424  // Refer to the less preferred version of `onReady` for why these SFINAE
425  // conditions are necessary.
426  template <
427  typename F,
428  typename X = typename internal::unwrap<
429  typename result_of<typename std::enable_if<
431  F>::type()>::type>::type>
432  Future<X> then(_Deferred<F>&& f, LessPrefer) const
433  {
434  return then<X>(std::move(f).operator lambda::CallableOnce<Future<X>()>());
435  }
436 
438  Future<X> then(F&& f, Prefer) const
439  {
440  return then<X>(
441  lambda::CallableOnce<Future<X>(const T&)>(std::forward<F>(f)));
442  }
443 
444  // Refer to the less preferred version of `onReady` for why these SFINAE
445  // conditions are necessary.
446  template <
447  typename F,
448  typename X = typename internal::unwrap<
449  typename result_of<typename std::enable_if<
451  F>::type()>::type>::type>
452  Future<X> then(F&& f, LessPrefer) const
453  {
454  return then<X>(lambda::CallableOnce<Future<X>()>(std::forward<F>(f)));
455  }
456 
457 public:
458  // NOTE: There are two bugs we're dealing with here.
459  // (1) GCC bug where the explicit use of `this->` is required in the
460  // trailing return type: gcc.gnu.org/bugzilla/show_bug.cgi?id=57543
461  // (2) VS 2017 RC bug where the explicit use of `this->` is disallowed.
462  //
463  // Since VS 2015 and 2017 RC both implement C++14's deduced return type for
464  // functions, we simply choose to use that on Windows.
465  //
466  // TODO(mpark): Remove the trailing return type once we get to C++14.
467  template <typename F>
468  auto then(F&& f) const
469 #ifndef __WINDOWS__
470  -> decltype(this->then(std::forward<F>(f), Prefer()))
471 #endif // __WINDOWS__
472  {
473  return then(std::forward<F>(f), Prefer());
474  }
475 
476  // Installs callbacks that get executed if this future is abandoned,
477  // is discarded, or failed.
478  template <typename F>
479  Future<T> recover(F&& f) const;
480 
481  template <typename F>
482  Future<T> recover(_Deferred<F>&& deferred) const
483  {
484  return recover(
485  std::move(deferred)
486  .operator lambda::CallableOnce<Future<T>(const Future<T>&)>());
487  }
488 
489  // TODO(benh): Considering adding a `rescue` function for rescuing
490  // abandoned futures.
491 
492  // Installs callbacks that get executed if this future completes
493  // because it failed.
495  lambda::CallableOnce<Future<T>(const Future<T>&)> f) const;
496 
497  // TODO(benh): Add overloads of 'repair' that don't require passing
498  // in a function that takes the 'const Future<T>&' parameter and use
499  // Prefer/LessPrefer to disambiguate.
500 
501  // Invokes the specified function after some duration if this future
502  // has not been completed (set, failed, or discarded). Note that
503  // this function is agnostic of discard semantics and while it will
504  // propagate discarding "up the chain" it will still invoke the
505  // specified callback after the specified duration even if 'discard'
506  // was called on the returned future.
508  const Duration& duration,
509  lambda::CallableOnce<Future<T>(const Future<T>&)> f) const;
510 
511  // TODO(benh): Add overloads of 'after' that don't require passing
512  // in a function that takes the 'const Future<T>&' parameter and use
513  // Prefer/LessPrefer to disambiguate.
514 
515 private:
516  template <typename U>
517  friend class Future;
518  friend class Promise<T>;
519  friend class WeakFuture<T>;
520  template <typename U>
521  friend std::ostream& operator<<(std::ostream&, const Future<U>&);
522 
523  enum State
524  {
525  PENDING,
526  READY,
527  FAILED,
528  DISCARDED,
529  };
530 
531  struct Data
532  {
533  Data();
534  ~Data() = default;
535 
536  void clearAllCallbacks();
537 
538  std::atomic_flag lock = ATOMIC_FLAG_INIT;
539  State state;
540  bool discard;
541  bool associated;
542  bool abandoned;
543 
544  // One of:
545  // 1. None, the state is PENDING or DISCARDED.
546  // 2. Some, the state is READY.
547  // 3. Error, the state is FAILED; 'error()' stores the message.
548  Result<T> result;
549 
550  std::vector<AbandonedCallback> onAbandonedCallbacks;
551  std::vector<DiscardCallback> onDiscardCallbacks;
552  std::vector<ReadyCallback> onReadyCallbacks;
553  std::vector<FailedCallback> onFailedCallbacks;
554  std::vector<DiscardedCallback> onDiscardedCallbacks;
555  std::vector<AnyCallback> onAnyCallbacks;
556  };
557 
558  // Abandons this future. Returns false if the future is already
559  // associated or no longer pending. Otherwise returns true and any
560 // Future::onAbandoned callbacks will be run.
561  //
562  // If `propagating` is true then we'll abandon this future even if
563  // it has already been associated. This is important because
564  // `~Promise()` will try and abandon and we need to ignore that if
565  // the future has been associated since the promise will no longer
566  // be setting the future anyway (and is likely the reason it's being
567  // destructed, because it's useless). When the future that we've
568  // associated with gets abandoned, however, then we need to actually
569  // abandon this future too. Here's an example of this:
570  //
571  // 1: Owned<Promise<int>> promise1(new Promise<int>());
572  // 2: Owned<Promise<int>> promise2(new Promise<int>());
573  // 3:
574  // 4: Future<int> future1 = promise1->future();
575  // 5: Future<int> future2 = promise2->future();
576  // 6:
577  // 7: promise1->associate(future2);
578  // 8:
579  // 9: promise1.reset();
580  // 10:
581  // 11: assert(!future1.isAbandoned());
582  // 12:
583  // 13: promise2.reset();
584  // 14:
585  // 15: assert(future2.isAbandoned());
586 // 16: assert(future1.isAbandoned());
587  //
588  // At line 9 `~Promise()` will attempt to abandon the future by
589  // calling `abandon()` but since it's been associated we won't do
590  // anything. On line 13 the `onAbandoned()` callback will call
591 // `abandon(true)` and now we'll actually abandon the future
592  // because we're _propagating_ the abandon from the associated
593  // future.
594  //
595  // NOTE: this is an _INTERNAL_ function and should never be exposed
596  // or used outside of the implementation.
597  bool abandon(bool propagating = false);
598 
599  // Sets the value for this future, unless the future is already set,
600  // failed, or discarded, in which case it returns false.
601  bool set(const T& _t);
602  bool set(T&& _t);
603 
604  template <typename U>
605  bool _set(U&& _u);
606 
607  // Sets this future as failed, unless the future is already set,
608  // failed, or discarded, in which case it returns false.
609  bool fail(const std::string& _message);
610 
611  std::shared_ptr<Data> data;
612 };
613 
614 
615 namespace internal {
616 
// Invokes every registered callback once, in registration order,
// passing each of them the same arguments.
//
// The callbacks are taken by rvalue reference because each one is
// invoked as an rvalue (i.e., it may be consumed by the call).
//
// NOTE: the arguments are forwarded afresh on every iteration, so an
// rvalue argument could already be moved-from when the second
// callback runs.
//
// TODO(*): Invoke callbacks in another execution context.
template <typename C, typename... Arguments>
void run(std::vector<C>&& callbacks, Arguments&&... arguments)
{
  // Deliberately index based: `size()` is re-read on every pass.
  size_t index = 0;
  while (index < callbacks.size()) {
    C& callback = callbacks[index];
    std::move(callback)(std::forward<Arguments>(arguments)...);
    ++index;
  }
}
627 
628 } // namespace internal {
629 
630 
631 // Represents a weak reference to a future. This class is used to
632 // break cyclic dependencies between futures.
633 template <typename T>
634 class WeakFuture
635 {
636 public:
637  explicit WeakFuture(const Future<T>& future);
638 
639  // Converts this weak reference to a concrete future. Returns none
640  // if the conversion is not successful.
641  Option<Future<T>> get() const;
642 
643 private:
644  std::weak_ptr<typename Future<T>::Data> data;
645 };
646 
647 
648 template <typename T>
650  : data(future.data) {}
651 
652 
653 template <typename T>
655 {
656  Future<T> future;
657  future.data = data.lock();
658 
659  if (future.data) {
660  return future;
661  }
662 
663  return None();
664 }
665 
666 
667 // Helper for creating failed futures.
668 struct Failure
669 {
670  explicit Failure(const std::string& _message) : message(_message) {}
671  explicit Failure(const Error& error) : message(error.message) {}
672 
673  const std::string message;
674 };
675 
676 
677 struct ErrnoFailure : public Failure
678 {
680 
681  explicit ErrnoFailure(int _code)
682  : Failure(os::strerror(_code)), code(_code) {}
683 
684  explicit ErrnoFailure(const std::string& message)
685  : ErrnoFailure(errno, message) {}
686 
687  ErrnoFailure(int _code, const std::string& message)
688  : Failure(message + ": " + os::strerror(_code)), code(_code) {}
689 
690  const int code;
691 };
692 
693 
694 // Forward declaration to use as friend below.
695 namespace internal {
696 template <typename U>
697 void discarded(Future<U> future);
698 } // namespace internal {
699 
700 
701 // TODO(benh): Make Promise a subclass of Future?
702 template <typename T>
703 class Promise
704 {
705 public:
706  Promise();
707  virtual ~Promise();
708 
709  explicit Promise(const T& t);
710 
711  Promise(Promise&& that) = default;
712  Promise& operator=(Promise&&) = default;
713 
714  // Not copyable, not assignable.
715  Promise(const Promise& that) = delete;
716  Promise& operator=(const Promise&) = delete;
717 
718  bool discard();
719  bool set(const T& _t);
720  bool set(T&& _t);
721  bool set(const Future<T>& future); // Alias for associate.
722  bool associate(const Future<T>& future);
723  bool fail(const std::string& message);
724 
725  // Returns a copy of the future associated with this promise.
726  Future<T> future() const;
727 
728 private:
729  template <typename U>
730  friend class Future;
731  template <typename U>
732  friend void internal::discarded(Future<U> future);
733 
734  template <typename U>
735  bool _set(U&& u);
736 
737  // Helper for doing the work of actually discarding a future (called
738  // from Promise::discard as well as internal::discarded).
739  static bool discard(Future<T> future);
740 
741  Future<T> f;
742 };
743 
744 
745 template <>
746 class Promise<void>;
747 
748 
749 template <typename T>
750 class Promise<T&>;
751 
752 
753 namespace internal {
754 
755 // Discards a weak future. If the weak future is invalid (i.e., the
756 // future it references to has already been destroyed), this operation
757 // is treated as a no-op.
758 template <typename T>
759 void discard(WeakFuture<T> reference)
760 {
761  Option<Future<T>> future = reference.get();
762  if (future.isSome()) {
763  Future<T> future_ = future.get();
764  future_.discard();
765  }
766 }
767 
768 
769 // Helper for invoking Promise::discard in an onDiscarded callback
770 // (since the onDiscarded callback requires returning void we can't
771 // bind with Promise::discard).
772 template <typename T>
773 void discarded(Future<T> future)
774 {
775  Promise<T>::discard(future);
776 }
777 
778 } // namespace internal {
779 
780 
781 template <typename T>
783 {
784  // Need to "unset" `abandoned` since it gets set in the empty
785  // constructor for `Future`.
786  f.data->abandoned = false;
787 }
788 
789 
790 template <typename T>
792  : f(t) {}
793 
794 
795 template <typename T>
797 {
798  // Note that we don't discard the promise as we don't want to give
799  // the illusion that any computation hasn't started (or possibly
800  // finished) in the event that computation is "visible" by other
801  // means. However, we try and abandon the future if it hasn't been
802  // associated or set (or moved, i.e., `f.data` is true).
803  if (f.data) {
804  f.abandon();
805  }
806 }
807 
808 
809 template <typename T>
811 {
812  if (!f.data->associated) {
813  return discard(f);
814  }
815  return false;
816 }
817 
818 
819 template <typename T>
820 bool Promise<T>::set(T&& t)
821 {
822  return _set(std::move(t));
823 }
824 
825 
826 template <typename T>
827 bool Promise<T>::set(const T& t)
828 {
829  return _set(t);
830 }
831 
832 
833 template <typename T>
834 template <typename U>
835 bool Promise<T>::_set(U&& u)
836 {
837  if (!f.data->associated) {
838  return f.set(std::forward<U>(u));
839  }
840  return false;
841 }
842 
843 
844 template <typename T>
846 {
847  return associate(future);
848 }
849 
850 
851 template <typename T>
853 {
854  bool associated = false;
855 
856  synchronized (f.data->lock) {
857  // Don't associate if this promise has completed. Note that this
858  // does not include if Future::discard was called on this future
859  // since in that case that would still leave the future PENDING
860  // (note that we cover that case below).
861  if (f.data->state == Future<T>::PENDING && !f.data->associated) {
862  associated = f.data->associated = true;
863 
864  // After this point we don't allow 'f' to be completed via the
865  // promise since we've set 'associated' but Future::discard on
866  // 'f' might get called which will get propagated via the
867  // 'f.onDiscard' below. Note that we currently don't propagate a
868  // discard from 'future.onDiscard' but these semantics might
869  // change if/when we make 'f' and 'future' true aliases of one
870  // another.
871  }
872  }
873 
874  // Note that we do the actual associating after releasing the lock
875  // above to avoid deadlocking by attempting to require the lock
876  // within from invoking 'f.onDiscard' and/or 'f.set/fail' via the
877  // bind statements from doing 'future.onReady/onFailed'.
878  if (associated) {
879  // TODO(jieyu): Make 'f' a true alias of 'future'. Currently, only
880  // 'discard' is associated in both directions. In other words, if
881  // a future gets discarded, the other future will also get
882  // discarded. For 'set' and 'fail', they are associated only in
883  // one direction. In other words, calling 'set' or 'fail' on this
884  // promise will not affect the result of the future that we
885  // associated.
886  f.onDiscard(lambda::bind(&internal::discard<T>, WeakFuture<T>(future)));
887 
888  // Need to disambiguate for the compiler.
889  bool (Future<T>::*set)(const T&) = &Future<T>::set;
890 
891  future
892  .onReady(lambda::bind(set, f, lambda::_1))
893  .onFailed(lambda::bind(&Future<T>::fail, f, lambda::_1))
894  .onDiscarded(lambda::bind(&internal::discarded<T>, f))
895  .onAbandoned(lambda::bind(&Future<T>::abandon, f, true));
896  }
897 
898  return associated;
899 }
900 
901 
902 template <typename T>
903 bool Promise<T>::fail(const std::string& message)
904 {
905  if (!f.data->associated) {
906  return f.fail(message);
907  }
908  return false;
909 }
910 
911 
912 template <typename T>
914 {
915  return f;
916 }
917 
918 
919 // Internal helper utilities.
920 namespace internal {
921 
922 template <typename T>
923 struct wrap
924 {
925  typedef Future<T> type;
926 };
927 
928 
929 template <typename X>
930 struct wrap<Future<X>>
931 {
932  typedef Future<X> type;
933 };
934 
935 
936 template <typename T>
937 struct unwrap
938 {
939  typedef T type;
940 };
941 
942 
943 template <typename X>
944 struct unwrap<Future<X>>
945 {
946  typedef X type;
947 };
948 
949 
950 template <typename T>
951 void select(
952  const Future<T>& future,
953  std::shared_ptr<Promise<Future<T>>> promise)
954 {
955  // We never fail the future associated with our promise.
956  assert(!promise->future().isFailed());
957 
958  if (promise->future().isPending()) { // No-op if it's discarded.
959  if (future.isReady()) { // We only set the promise if a future is ready.
960  promise->set(future);
961  }
962  }
963 }
964 
965 } // namespace internal {
966 
967 
968 // TODO(benh): Move select and discard into 'futures' namespace.
969 
970 // Returns a future that captures any ready future in a set. Note that
971 // select DOES NOT capture a future that has failed or been discarded.
972 template <typename T>
974 {
975  std::shared_ptr<Promise<Future<T>>> promise(new Promise<Future<T>>());
976 
977  promise->future().onDiscard(
978  lambda::bind(&internal::discarded<Future<T>>, promise->future()));
979 
980  foreach (const Future<T>& future, futures) {
981  future.onAny([=](const Future<T>& f) {
982  internal::select(f, promise);
983  });
984  }
985 
986  return promise->future();
987 }
988 
989 
// Calls `discard()` on every element of `futures`.
template <typename Futures>
void discard(const Futures& futures)
{
  // Iterate by value: `discard()` is called on a non-const copy
  // because the container itself is only available as const.
  for (auto future : futures) {
    future.discard();
  }
}
997 
998 
999 template <typename T>
1001 {
1002  bool result = false;
1003 
1004  synchronized (future.data->lock) {
1005  if (future.data->state == Future<T>::PENDING) {
1006  future.data->state = Future<T>::DISCARDED;
1007  result = true;
1008  }
1009  }
1010 
1011  // Invoke all callbacks associated with this future being
1012  // DISCARDED. We don't need a lock because the state is now in
1013  // DISCARDED so there should not be any concurrent modifications to
1014  // the callbacks.
1015  if (result) {
1016  // NOTE: we rely on the fact that we have `future` to protect
1017  // ourselves from one of the callbacks erroneously deleting the
1018  // future. In `Future::_set()` and `Future::fail()` we have to
1019  // explicitly take a copy to protect ourselves.
1020  internal::run(std::move(future.data->onDiscardedCallbacks));
1021  internal::run(std::move(future.data->onAnyCallbacks), future);
1022 
1023  future.data->clearAllCallbacks();
1024  }
1025 
1026  return result;
1027 }
1028 
1029 
1030 template <typename T>
1031 Future<T> Future<T>::failed(const std::string& message)
1032 {
1033  Future<T> future;
1034  future.fail(message);
1035  return future;
1036 }
1037 
1038 
1039 template <typename T>
1041  : state(PENDING),
1042  discard(false),
1043  associated(false),
1044  abandoned(false),
1045  result(None()) {}
1046 
1047 
1048 template <typename T>
1050 {
1051  onAbandonedCallbacks.clear();
1052  onAnyCallbacks.clear();
1053  onDiscardCallbacks.clear();
1054  onDiscardedCallbacks.clear();
1055  onFailedCallbacks.clear();
1056  onReadyCallbacks.clear();
1057 }
1058 
1059 
1060 template <typename T>
1062  : data(new Data())
1063 {
1064  data->abandoned = true;
1065 }
1066 
1067 
1068 template <typename T>
1069 Future<T>::Future(const T& _t)
1070  : data(new Data())
1071 {
1072  set(_t);
1073 }
1074 
1075 
1076 template <typename T>
1078  : data(new Data())
1079 {
1080  set(std::move(_t));
1081 }
1082 
1083 
1084 template <typename T>
1085 template <typename U>
1087  : data(new Data())
1088 {
1089  set(u);
1090 }
1091 
1092 
1093 template <typename T>
1095  : data(new Data())
1096 {
1097  fail(failure.message);
1098 }
1099 
1100 
1101 template <typename T>
1103  : data(new Data())
1104 {
1105  fail(failure.message);
1106 }
1107 
1108 
1109 template <typename T>
1110 template <typename E>
1112  : data(new Data())
1113 {
1114  if (t.isSome()){
1115  set(t.get());
1116  } else {
1117  // TODO(chhsiao): Consider preserving the error type. See MESOS-8925.
1118  fail(stringify(t.error()));
1119  }
1120 }
1121 
1122 
1123 template <typename T>
1124 template <typename E>
1126  : data(t.isSome() ? t->data : std::shared_ptr<Data>(new Data()))
1127 {
1128  if (!t.isSome()) {
1129  // TODO(chhsiao): Consider preserving the error type. See MESOS-8925.
1130  fail(stringify(t.error()));
1131  }
1132 }
1133 
1134 
1135 template <typename T>
1136 bool Future<T>::operator==(const Future<T>& that) const
1137 {
1138  return data == that.data;
1139 }
1140 
1141 
1142 template <typename T>
1143 bool Future<T>::operator!=(const Future<T>& that) const
1144 {
1145  return !(*this == that);
1146 }
1147 
1148 
1149 template <typename T>
1150 bool Future<T>::operator<(const Future<T>& that) const
1151 {
1152  return data < that.data;
1153 }
1154 
1155 
1156 template <typename T>
1158 {
1159  bool result = false;
1160 
1161  std::vector<DiscardCallback> callbacks;
1162  synchronized (data->lock) {
1163  if (!data->discard && data->state == PENDING) {
1164  result = data->discard = true;
1165 
1166  callbacks.swap(data->onDiscardCallbacks);
1167  }
1168  }
1169 
1170  // Invoke all callbacks associated with doing a discard on this
1171  // future. The callbacks get destroyed when we exit from the
1172  // function.
1173  if (result) {
1174  internal::run(std::move(callbacks));
1175  }
1176 
1177  return result;
1178 }
1179 
1180 
1181 template <typename T>
1182 bool Future<T>::abandon(bool propagating)
1183 {
1184  bool result = false;
1185 
1186  std::vector<AbandonedCallback> callbacks;
1187  synchronized (data->lock) {
1188  if (!data->abandoned &&
1189  data->state == PENDING &&
1190  (!data->associated || propagating)) {
1191  result = data->abandoned = true;
1192 
1193  callbacks.swap(data->onAbandonedCallbacks);
1194  }
1195  }
1196 
1197  // Invoke all callbacks. The callbacks get destroyed when we exit
1198  // from the function.
1199  if (result) {
1200  internal::run(std::move(callbacks));
1201  }
1202 
1203  return result;
1204 }
1205 
1206 
1207 template <typename T>
1209 {
1210  return data->state == PENDING;
1211 }
1212 
1213 
1214 template <typename T>
1216 {
1217  return data->state == READY;
1218 }
1219 
1220 
1221 template <typename T>
1223 {
1224  return data->state == DISCARDED;
1225 }
1226 
1227 
1228 template <typename T>
1230 {
1231  return data->state == FAILED;
1232 }
1233 
1234 
1235 template <typename T>
1237 {
1238  return data->abandoned;
1239 }
1240 
1241 
1242 template <typename T>
1244 {
1245  return data->discard;
1246 }
1247 
1248 
1249 namespace internal {
1250 
1251 inline void awaited(Owned<Latch> latch)
1252 {
1253  latch->trigger();
1254 }
1255 
1256 } // namespace internal {
1257 
1258 
1259 template <typename T>
1260 bool Future<T>::await(const Duration& duration) const
1261 {
1262  // NOTE: We need to preemptively allocate the Latch on the stack
1263  // instead of lazily create it in the critical section below because
1264  // instantiating a Latch requires creating a new process (at the
1265  // time of writing this comment) which might need to do some
1266  // synchronization in libprocess which might deadlock if some other
1267  // code in libprocess is already holding a lock and then attempts to
1268  // do Promise::set (or something similar) that attempts to acquire
1269  // the lock that we acquire here. This is an artifact of using
1270  // Future/Promise within the implementation of libprocess.
1271  //
1272  // We mostly only call 'await' in tests so this should not be a
1273  // performance concern.
1274  Owned<Latch> latch(new Latch());
1275 
1276  bool pending = false;
1277 
1278  synchronized (data->lock) {
1279  if (data->state == PENDING) {
1280  pending = true;
1281  data->onAnyCallbacks.push_back(lambda::bind(&internal::awaited, latch));
1282  }
1283  }
1284 
1285  if (pending) {
1286  return latch->await(duration);
1287  }
1288 
1289  return true;
1290 }
1291 
1292 
1293 template <typename T>
1294 const T& Future<T>::get() const
1295 {
1296  if (!isReady()) {
1297  await();
1298  }
1299 
1300  CHECK(!isPending()) << "Future was in PENDING after await()";
1301  // We can't use CHECK_READY here due to check.hpp depending on future.hpp.
1302  if (!isReady()) {
1303  CHECK(!isFailed()) << "Future::get() but state == FAILED: " << failure();
1304  CHECK(!isDiscarded()) << "Future::get() but state == DISCARDED";
1305  }
1306 
1307  assert(data->result.isSome());
1308  return data->result.get();
1309 }
1310 
1311 
1312 template <typename T>
1313 const T* Future<T>::operator->() const
1314 {
1315  return &get();
1316 }
1317 
1318 
1319 template <typename T>
1320 const std::string& Future<T>::failure() const
1321 {
1322  if (data->state != FAILED) {
1323  ABORT("Future::failure() but state != FAILED");
1324  }
1325 
1326  CHECK_ERROR(data->result);
1327  return data->result.error();
1328 }
1329 
1330 
1331 template <typename T>
1332 const Future<T>& Future<T>::onAbandoned(AbandonedCallback&& callback) const
1333 {
1334  bool run = false;
1335 
1336  synchronized (data->lock) {
1337  if (data->abandoned) {
1338  run = true;
1339  } else if (data->state == PENDING) {
1340  data->onAbandonedCallbacks.emplace_back(std::move(callback));
1341  }
1342  }
1343 
1344  // TODO(*): Invoke callback in another execution context.
1345  if (run) {
1346  std::move(callback)(); // NOLINT(misc-use-after-move)
1347  }
1348 
1349  return *this;
1350 }
1351 
1352 
1353 template <typename T>
1354 const Future<T>& Future<T>::onDiscard(DiscardCallback&& callback) const
1355 {
1356  bool run = false;
1357 
1358  synchronized (data->lock) {
1359  if (data->discard) {
1360  run = true;
1361  } else if (data->state == PENDING) {
1362  data->onDiscardCallbacks.emplace_back(std::move(callback));
1363  }
1364  }
1365 
1366  // TODO(*): Invoke callback in another execution context.
1367  if (run) {
1368  std::move(callback)(); // NOLINT(misc-use-after-move)
1369  }
1370 
1371  return *this;
1372 }
1373 
1374 
1375 template <typename T>
1376 const Future<T>& Future<T>::onReady(ReadyCallback&& callback) const
1377 {
1378  bool run = false;
1379 
1380  synchronized (data->lock) {
1381  if (data->state == READY) {
1382  run = true;
1383  } else if (data->state == PENDING) {
1384  data->onReadyCallbacks.emplace_back(std::move(callback));
1385  }
1386  }
1387 
1388  // TODO(*): Invoke callback in another execution context.
1389  if (run) {
1390  std::move(callback)(data->result.get()); // NOLINT(misc-use-after-move)
1391  }
1392 
1393  return *this;
1394 }
1395 
1396 
1397 template <typename T>
1398 const Future<T>& Future<T>::onFailed(FailedCallback&& callback) const
1399 {
1400  bool run = false;
1401 
1402  synchronized (data->lock) {
1403  if (data->state == FAILED) {
1404  run = true;
1405  } else if (data->state == PENDING) {
1406  data->onFailedCallbacks.emplace_back(std::move(callback));
1407  }
1408  }
1409 
1410  // TODO(*): Invoke callback in another execution context.
1411  if (run) {
1412  std::move(callback)(data->result.error()); // NOLINT(misc-use-after-move)
1413  }
1414 
1415  return *this;
1416 }
1417 
1418 
1419 template <typename T>
1420 const Future<T>& Future<T>::onDiscarded(DiscardedCallback&& callback) const
1421 {
1422  bool run = false;
1423 
1424  synchronized (data->lock) {
1425  if (data->state == DISCARDED) {
1426  run = true;
1427  } else if (data->state == PENDING) {
1428  data->onDiscardedCallbacks.emplace_back(std::move(callback));
1429  }
1430  }
1431 
1432  // TODO(*): Invoke callback in another execution context.
1433  if (run) {
1434  std::move(callback)(); // NOLINT(misc-use-after-move)
1435  }
1436 
1437  return *this;
1438 }
1439 
1440 
1441 template <typename T>
1442 const Future<T>& Future<T>::onAny(AnyCallback&& callback) const
1443 {
1444  bool run = false;
1445 
1446  synchronized (data->lock) {
1447  if (data->state == PENDING) {
1448  data->onAnyCallbacks.emplace_back(std::move(callback));
1449  } else {
1450  run = true;
1451  }
1452  }
1453 
1454  // TODO(*): Invoke callback in another execution context.
1455  if (run) {
1456  std::move(callback)(*this); // NOLINT(misc-use-after-move)
1457  }
1458 
1459  return *this;
1460 }
1461 
1462 namespace internal {
1463 
1464 // NOTE: We need to name this 'thenf' versus 'then' to distinguish it
1465 // from the function 'then' whose parameter 'f' doesn't return a
1466 // Future since the compiler can't properly infer otherwise.
1467 template <typename T, typename X>
1469  std::unique_ptr<Promise<X>> promise,
1470  const Future<T>& future)
1471 {
1472  if (future.isReady()) {
1473  if (future.hasDiscard()) {
1474  promise->discard();
1475  } else {
1476  promise->associate(std::move(f)(future.get()));
1477  }
1478  } else if (future.isFailed()) {
1479  promise->fail(future.failure());
1480  } else if (future.isDiscarded()) {
1481  promise->discard();
1482  }
1483 }
1484 
1485 
1486 template <typename T, typename X>
1487 void then(lambda::CallableOnce<X(const T&)>&& f,
1488  std::unique_ptr<Promise<X>> promise,
1489  const Future<T>& future)
1490 {
1491  if (future.isReady()) {
1492  if (future.hasDiscard()) {
1493  promise->discard();
1494  } else {
1495  promise->set(std::move(f)(future.get()));
1496  }
1497  } else if (future.isFailed()) {
1498  promise->fail(future.failure());
1499  } else if (future.isDiscarded()) {
1500  promise->discard();
1501  }
1502 }
1503 
1504 
1505 template <typename T>
1506 void repair(
1508  std::unique_ptr<Promise<T>> promise,
1509  const Future<T>& future)
1510 {
1511  CHECK(!future.isPending());
1512  if (future.isFailed()) {
1513  promise->associate(std::move(f)(future));
1514  } else {
1515  promise->associate(future);
1516  }
1517 }
1518 
1519 
1520 template <typename T>
1521 void expired(
1522  const std::shared_ptr<lambda::CallableOnce<Future<T>(const Future<T>&)>>& f,
1523  const std::shared_ptr<Latch>& latch,
1524  const std::shared_ptr<Promise<T>>& promise,
1525  const std::shared_ptr<Option<Timer>>& timer,
1526  const Future<T>& future)
1527 {
1528  if (latch->trigger()) {
1529  // If this callback executed first (i.e., we triggered the latch)
1530  // then we want to clear out the timer so that we don't hold a
1531  // circular reference to `future` in it's own `onAny`
1532  // callbacks. See the comment in `Future::after`.
1533  *timer = None();
1534 
1535  // Note that we don't bother checking if 'future' has been
1536  // discarded (i.e., 'future.isDiscarded()' returns true) since
1537  // there is a race between when we make that check and when we
1538  // would invoke 'f(future)' so the callee 'f' should ALWAYS check
1539  // if the future has been discarded and rather than hiding a
1540  // non-deterministic bug we always call 'f' if the timer has
1541  // expired.
1542  promise->associate(std::move(*f)(future));
1543  }
1544 }
1545 
1546 
1547 template <typename T>
1548 void after(
1549  const std::shared_ptr<Latch>& latch,
1550  const std::shared_ptr<Promise<T>>& promise,
1551  const std::shared_ptr<Option<Timer>>& timer,
1552  const Future<T>& future)
1553 {
1554  CHECK(!future.isPending());
1555  if (latch->trigger()) {
1556  // If this callback executes first (i.e., we triggered the latch)
1557  // it must be the case that `timer` is still some and we can try
1558  // and cancel the timer.
1559  CHECK_SOME(*timer);
1560  Clock::cancel(timer->get());
1561 
1562  // We also force the timer to get deallocated so that there isn't
1563  // a cicular reference of the timer with itself which keeps around
1564  // a reference to the original future.
1565  *timer = None();
1566 
1567  promise->associate(future);
1568  }
1569 }
1570 
1571 } // namespace internal {
1572 
1573 
1574 template <typename T>
1575 template <typename X>
1577 {
1578  std::unique_ptr<Promise<X>> promise(new Promise<X>());
1579  Future<X> future = promise->future();
1580 
1582  &internal::thenf<T, X>, std::move(f), std::move(promise), lambda::_1);
1583 
1584  onAny(std::move(thenf));
1585 
1586  onAbandoned([=]() mutable {
1587  future.abandon();
1588  });
1589 
1590  // Propagate discarding up the chain. To avoid cyclic dependencies,
1591  // we keep a weak future in the callback.
1592  future.onDiscard(lambda::bind(&internal::discard<T>, WeakFuture<T>(*this)));
1593 
1594  return future;
1595 }
1596 
1597 
1598 template <typename T>
1599 template <typename X>
1601 {
1602  std::unique_ptr<Promise<X>> promise(new Promise<X>());
1603  Future<X> future = promise->future();
1604 
1606  &internal::then<T, X>, std::move(f), std::move(promise), lambda::_1);
1607 
1608  onAny(std::move(then));
1609 
1610  onAbandoned([=]() mutable {
1611  future.abandon();
1612  });
1613 
1614  // Propagate discarding up the chain. To avoid cyclic dependencies,
1615  // we keep a weak future in the callback.
1616  future.onDiscard(lambda::bind(&internal::discard<T>, WeakFuture<T>(*this)));
1617 
1618  return future;
1619 }
1620 
1621 
1622 template <typename T>
1623 template <typename F>
1625 {
1626  std::shared_ptr<Promise<T>> promise(new Promise<T>());
1627 
1628  const Future<T> future = *this;
1629 
1630  typedef decltype(std::move(f)(future)) R;
1631 
1632  std::shared_ptr<lambda::CallableOnce<R(const Future<T>&)>> callable(
1633  new lambda::CallableOnce<R(const Future<T>&)>(std::move(f)));
1634 
1635  onAny([=]() {
1636  if (future.isDiscarded() || future.isFailed()) {
1637  // We reset `discard` so that if a future gets returned from
1638  // `f(future)` we won't immediately discard it! We still want to
1639  // let the future get discarded later, however, hence if it gets
1640  // set again in the future it'll propagate to the returned
1641  // future.
1642  synchronized (promise->f.data->lock) {
1643  promise->f.data->discard = false;
1644  }
1645 
1646  promise->set(std::move(*callable)(future));
1647  } else {
1648  promise->associate(future);
1649  }
1650  });
1651 
1652  onAbandoned([=]() {
1653  // See comment above for why we reset `discard` here.
1654  synchronized (promise->f.data->lock) {
1655  promise->f.data->discard = false;
1656  }
1657  promise->set(std::move(*callable)(future));
1658  });
1659 
1660  // Propagate discarding up the chain. To avoid cyclic dependencies,
1661  // we keep a weak future in the callback.
1662  promise->future().onDiscard(
1663  lambda::bind(&internal::discard<T>, WeakFuture<T>(*this)));
1664 
1665  return promise->future();
1666 }
1667 
1668 
1669 template <typename T>
1671  lambda::CallableOnce<Future<T>(const Future<T>&)> f) const
1672 {
1673  std::unique_ptr<Promise<T>> promise(new Promise<T>());
1674  Future<T> future = promise->future();
1675 
1677  &internal::repair<T>, std::move(f), std::move(promise), lambda::_1));
1678 
1679  onAbandoned([=]() mutable {
1680  future.abandon();
1681  });
1682 
1683  // Propagate discarding up the chain. To avoid cyclic dependencies,
1684  // we keep a weak future in the callback.
1685  future.onDiscard(lambda::bind(&internal::discard<T>, WeakFuture<T>(*this)));
1686 
1687  return future;
1688 }
1689 
1690 
1691 template <typename T>
1693  const Duration& duration,
1694  lambda::CallableOnce<Future<T>(const Future<T>&)> f) const
1695 {
1696  // TODO(benh): Using a Latch here but Once might be cleaner.
1697  // Unfortunately, Once depends on Future so we can't easily use it
1698  // from here.
1699  std::shared_ptr<Latch> latch(new Latch());
1700  std::shared_ptr<Promise<T>> promise(new Promise<T>());
1701 
1702  // We need to control the lifetime of the timer we create below so
1703  // that we can force the timer to get deallocated after it
1704  // expires. The reason we want to force the timer to get deallocated
1705  // after it expires is because the timer's lambda has a copy of
1706  // `this` (i.e., a Future) and it's stored in the `onAny` callbacks
1707  // of `this` thus creating a circular reference. By storing a
1708  // `shared_ptr<Option<Timer>>` we're able to set the option to none
1709  // after the timer expires which will deallocate our copy of the
1710  // timer and leave the `Option<Timer>` stored in the lambda of the
1711  // `onAny` callback as none. Note that this is safe because the
1712  // `Latch` makes sure that only one of the callbacks will manipulate
1713  // the `shared_ptr<Option<Timer>>` so there isn't any concurrency
1714  // issues we have to worry about.
1715  std::shared_ptr<Option<Timer>> timer(new Option<Timer>());
1716 
1717  typedef lambda::CallableOnce<Future<T>(const Future<T>&)> F;
1718  std::shared_ptr<F> callable(new F(std::move(f)));
1719 
1720  // Set up a timer to invoke the callback if this future has not
1721  // completed. Note that we do not pass a weak reference for this
1722  // future as we don't want the future to get cleaned up and then
1723  // have the timer expire because then we wouldn't have a valid
1724  // future that we could pass to `f`! The reference to `this` that is
1725  // captured in the timer will get removed by setting the
1726  // `Option<Timer>` to none (see comment above) either if the timer
1727  // expires or if `this` completes and we cancel the timer (see
1728  // `internal::expired` and `internal::after` callbacks for where we
1729  // force the deallocation of our copy of the timer).
1730  *timer = Clock::timer(
1731  duration,
1732  lambda::bind(&internal::expired<T>, callable, latch, promise, timer,
1733  *this));
1734 
1735  onAny(lambda::bind(&internal::after<T>, latch, promise, timer, lambda::_1));
1736 
1737  onAbandoned([=]() {
1738  promise->future().abandon();
1739  });
1740 
1741  // Propagate discarding up the chain. To avoid cyclic dependencies,
1742  // we keep a weak future in the callback.
1743  promise->future().onDiscard(
1744  lambda::bind(&internal::discard<T>, WeakFuture<T>(*this)));
1745 
1746  return promise->future();
1747 }
1748 
1749 
1750 template <typename T>
1751 bool Future<T>::set(T&& t)
1752 {
1753  return _set(std::move(t));
1754 }
1755 
1756 
1757 template <typename T>
1758 bool Future<T>::set(const T& t)
1759 {
1760  return _set(t);
1761 }
1762 
1763 
1764 template <typename T>
1765 template <typename U>
1766 bool Future<T>::_set(U&& u)
1767 {
1768  bool result = false;
1769 
1770  synchronized (data->lock) {
1771  if (data->state == PENDING) {
1772  data->result = std::forward<U>(u);
1773  data->state = READY;
1774  result = true;
1775  }
1776  }
1777 
1778  // Invoke all callbacks associated with this future being READY. We
1779  // don't need a lock because the state is now in READY so there
1780  // should not be any concurrent modifications to the callbacks.
1781  if (result) {
1782  // Grab a copy of `data` just in case invoking the callbacks
1783  // erroneously attempts to delete this future.
1784  std::shared_ptr<typename Future<T>::Data> copy = data;
1785  internal::run(std::move(copy->onReadyCallbacks), copy->result.get());
1786  internal::run(std::move(copy->onAnyCallbacks), *this);
1787 
1788  copy->clearAllCallbacks();
1789  }
1790 
1791  return result;
1792 }
1793 
1794 
1795 template <typename T>
1796 bool Future<T>::fail(const std::string& _message)
1797 {
1798  bool result = false;
1799 
1800  synchronized (data->lock) {
1801  if (data->state == PENDING) {
1802  data->result = Result<T>(Error(_message));
1803  data->state = FAILED;
1804  result = true;
1805  }
1806  }
1807 
1808  // Invoke all callbacks associated with this future being FAILED. We
1809  // don't need a lock because the state is now in FAILED so there
1810  // should not be any concurrent modifications to the callbacks.
1811  if (result) {
1812  // Grab a copy of `data` just in case invoking the callbacks
1813  // erroneously attempts to delete this future.
1814  std::shared_ptr<typename Future<T>::Data> copy = data;
1815  internal::run(std::move(copy->onFailedCallbacks), copy->result.error());
1816  internal::run(std::move(copy->onAnyCallbacks), *this);
1817 
1818  copy->clearAllCallbacks();
1819  }
1820 
1821  return result;
1822 }
1823 
1824 
1825 template <typename T>
1826 std::ostream& operator<<(std::ostream& stream, const Future<T>& future)
1827 {
1828  const std::string suffix = future.data->discard ? " (with discard)" : "";
1829 
1830  switch (future.data->state) {
1831  case Future<T>::PENDING:
1832  if (future.data->abandoned) {
1833  return stream << "Abandoned" << suffix;
1834  }
1835  return stream << "Pending" << suffix;
1836 
1837  case Future<T>::READY:
1838  // TODO(benh): Stringify `Future<T>::get()` if it can be
1839  // stringified (will need to be SFINAE'ed appropriately).
1840  return stream << "Ready" << suffix;
1841 
1842  case Future<T>::FAILED:
1843  return stream << "Failed" << suffix << ": " << future.failure();
1844 
1845  case Future<T>::DISCARDED:
1846  return stream << "Discarded" << suffix;
1847  }
1848 
1849  return stream;
1850 }
1851 
1852 
1853 // Helper for setting a set of Promises.
1854 template <typename T>
1855 void setPromises(std::set<Promise<T>*>* promises, const T& t)
1856 {
1857  foreach (Promise<T>* promise, *promises) {
1858  promise->set(t);
1859  delete promise;
1860  }
1861  promises->clear();
1862 }
1863 
1864 
1865 // Helper for failing a set of Promises.
1866 template <typename T>
1867 void failPromises(std::set<Promise<T>*>* promises, const std::string& failure)
1868 {
1869  foreach (Promise<T>* promise, *promises) {
1870  promise->fail(failure);
1871  delete promise;
1872  }
1873  promises->clear();
1874 }
1875 
1876 
1877 // Helper for discarding a set of Promises.
1878 template <typename T>
1880 {
1881  foreach (Promise<T>* promise, *promises) {
1882  promise->discard();
1883  delete promise;
1884  }
1885  promises->clear();
1886 }
1887 
1888 
1889 // Helper for discarding an individual promise in the set.
1890 template <typename T>
1891 void discardPromises(std::set<Promise<T>*>* promises, const Future<T>& future)
1892 {
1893  foreach (Promise<T>* promise, *promises) {
1894  if (promise->future() == future) {
1895  promise->discard();
1896  promises->erase(promise);
1897  delete promise;
1898  return;
1899  }
1900  }
1901 }
1902 
1903 
1904 // Returns a future that will not propagate a discard through to the
1905 // future passed in as an argument. This can be very valuable if you
1906 // want to block some future from getting discarded.
1907 //
1908 // Example:
1909 //
1910 // Promise<int> promise;
1911 // Future<int> future = undiscardable(promise.future());
1912 // future.discard();
1913 // assert(!promise.future().hasDiscard());
1914 //
1915 // Or another example, when chaining futures:
1916 //
1917 // Future<int> future = undiscardable(
1918 // foo()
1919 // .then([]() { ...; })
1920 // .then([]() { ...; }));
1921 //
1922 // This will guarantee that a discard _will not_ propagate to `foo()`
1923 // or any of the futures returned from the invocations of `.then()`.
1924 template <typename T>
1926 {
1927  std::unique_ptr<Promise<T>> promise(new Promise<T>());
1928  Future<T> future_ = promise->future();
1929  future.onAny(lambda::partial(
1930  [](std::unique_ptr<Promise<T>> promise, const Future<T>& future) {
1931  promise->associate(future);
1932  },
1933  std::move(promise),
1934  lambda::_1));
1935  return future_;
1936 }
1937 
1938 
1939 // Decorator that for some callable `f` invokes
1940 // `undiscardable(f(args))` for some `args`. This is used by the
1941 // overload of `undiscardable()` that takes callables instead of a
1942 // specialization of `Future`.
1943 //
1944 // TODO(benh): Factor out a generic decorator pattern to be used in
1945 // other circumstances, e.g., to replace `_Deferred`.
1946 template <typename F>
1948 {
1949  template <
1950  typename G,
1951  typename std::enable_if<
1952  std::is_constructible<F, G>::value, int>::type = 0>
1953  UndiscardableDecorator(G&& g) : f(std::forward<G>(g)) {}
1954 
1955  template <typename... Args>
1956  auto operator()(Args&&... args)
1957  -> decltype(std::declval<F&>()(std::forward<Args>(args)...))
1958  {
1959  using Result =
1960  typename std::decay<decltype(f(std::forward<Args>(args)...))>::type;
1961 
1962  static_assert(
1964  "Expecting Future<T> to be returned from undiscarded(...)");
1965 
1966  return undiscardable(f(std::forward<Args>(args)...));
1967  }
1968 
1969  F f;
1970 };
1971 
1972 
1973 // An overload of `undiscardable()` above that takes and returns a
1974 // callable. The returned callable has decorated the provided callable
1975 // `f` such that when the returned callable is invoked it will in turn
1976 // invoke `undiscardable(f(args))` for some `args`. See
1977 // `UndiscardableDecorator` above for more details.
1978 //
1979 // Example:
1980 //
1981 // Future<int> future = foo()
1982 // .then(undiscardable([]() { ...; }));
1983 //
1984 // This guarantees that even if `future` is discarded the discard will
1985 // not propagate into the lambda passed into `.then()`.
1986 template <
1987  typename F,
1988  typename std::enable_if<
1990  int>::type = 0>
1992 {
1993  return UndiscardableDecorator<
1994  typename std::decay<F>::type>(std::forward<F>(f));
1995 }
1996 
1997 } // namespace process {
1998 
1999 #endif // __PROCESS_FUTURE_HPP__
void select(const Future< T > &future, std::shared_ptr< Promise< Future< T >>> promise)
Definition: future.hpp:951
Protocol< RecoverRequest, RecoverResponse > recover
bool isReady() const
Definition: future.hpp:1215
WeakFuture(const Future< T > &future)
Definition: future.hpp:649
std::string strerror(int errno_)
A thread-safe version of strerror.
Definition: strerror.hpp:30
Definition: errorbase.hpp:36
Definition: option.hpp:29
#define ABORT(...)
Definition: abort.hpp:40
F && f
Definition: defer.hpp:270
const Future< T > & onAny(F &&f) const
Definition: future.hpp:383
lambda::CallableOnce< void()> DiscardedCallback
Definition: future.hpp:170
T & get()&
Definition: try.hpp:80
const T & get() const
Definition: future.hpp:1294
bool operator==(const std::string &s, const UPID::ID &id)
Definition: pid.hpp:226
bool set(const T &_t)
Definition: future.hpp:827
T type
Definition: future.hpp:939
Failure(const std::string &_message)
Definition: future.hpp:670
bool pending(int signal)
Definition: signals.hpp:50
Definition: check.hpp:33
Future< T > type
Definition: future.hpp:925
void discarded(Future< T > future)
Definition: future.hpp:773
bool fail(const std::string &message)
Definition: future.hpp:903
Definition: future.hpp:668
void awaited(Owned< Latch > latch)
Definition: future.hpp:1251
const Future< T > & onDiscarded(F &&f) const
Definition: future.hpp:372
ErrnoFailure(const std::string &message)
Definition: future.hpp:684
lambda::CallableOnce< void()> DiscardCallback
Definition: future.hpp:167
static bool cancel(const Timer &timer)
const Future< T > & onDiscard(DiscardCallback &&callback) const
Definition: future.hpp:1354
internal::Partial< typename std::decay< F >::type, typename std::decay< Args >::type... > partial(F &&f, Args &&...args)
Definition: lambda.hpp:364
bool await(const Duration &duration=Seconds(-1)) const
Definition: future.hpp:1260
X type
Definition: future.hpp:946
bool await(const process::Future< T > &future, const Duration &duration)
Definition: gtest.hpp:67
const Future< T > & onFailed(FailedCallback &&callback) const
Definition: future.hpp:1398
Definition: type_utils.hpp:619
const Future< T > & onFailed(F &&f) const
Definition: future.hpp:366
#define CHECK_ERROR(expression)
Definition: check.hpp:58
bool operator!=(const Future< T > &that) const
Definition: future.hpp:1143
void thenf(lambda::CallableOnce< Future< X >(const T &)> &&f, std::unique_ptr< Promise< X >> promise, const Future< T > &future)
Definition: future.hpp:1468
Definition: posix_signalhandler.hpp:23
bool discard()
Definition: future.hpp:1157
UndiscardableDecorator(G &&g)
Definition: future.hpp:1953
void expired(const std::shared_ptr< lambda::CallableOnce< Future< T >(const Future< T > &)>> &f, const std::shared_ptr< Latch > &latch, const std::shared_ptr< Promise< T >> &promise, const std::shared_ptr< Option< Timer >> &timer, const Future< T > &future)
Definition: future.hpp:1521
void failPromises(std::set< Promise< T > * > *promises, const std::string &failure)
Definition: future.hpp:1867
Future< T > recover(_Deferred< F > &&deferred) const
Definition: future.hpp:482
Definition: duration.hpp:32
Definition: check.hpp:30
bool isPending() const
Definition: future.hpp:1208
Definition: future.hpp:67
bool isSome() const
Definition: option.hpp:116
bool operator==(const Future< T > &that) const
Definition: future.hpp:1136
Definition: future.hpp:64
void after(const std::shared_ptr< Latch > &latch, const std::shared_ptr< Promise< T >> &promise, const std::shared_ptr< Option< Timer >> &timer, const Future< T > &future)
Definition: future.hpp:1548
#define CHECK_SOME(expression)
Definition: check.hpp:50
bool isDiscarded() const
Definition: future.hpp:1222
ErrnoFailure()
Definition: future.hpp:679
void discardPromises(std::set< Promise< T > * > *promises)
Definition: future.hpp:1879
const Future< T > & onDiscarded(DiscardedCallback &&callback) const
Definition: future.hpp:1420
const Future< T > & onFailed(_Deferred< F > &&deferred) const
Definition: future.hpp:207
const Future< T > & onDiscard(_Deferred< F > &&deferred) const
Definition: future.hpp:193
F f
Definition: future.hpp:1969
const Future< T > & onAny(AnyCallback &&callback) const
Definition: future.hpp:1442
Definition: deferred.hpp:64
lambda::CallableOnce< void(const T &)> ReadyCallback
Definition: future.hpp:168
Definition: duration.hpp:207
bool associate(const Future< T > &future)
Definition: future.hpp:852
void discard(WeakFuture< T > reference)
Definition: future.hpp:759
Future< Future< T > > select(const std::set< Future< T >> &futures)
Definition: future.hpp:973
Definition: traits.hpp:17
Definition: future.hpp:74
Future< X > then(lambda::CallableOnce< X()> f) const
Definition: future.hpp:405
const Future< T > & onReady(F &&f) const
Definition: future.hpp:360
const Future< T > & onAbandoned(AbandonedCallback &&callback) const
Definition: future.hpp:1332
void repair(lambda::CallableOnce< Future< T >(const Future< T > &)> &&f, std::unique_ptr< Promise< T >> promise, const Future< T > &future)
Definition: future.hpp:1506
bool isSome() const
Definition: try.hpp:77
Failure(const Error &error)
Definition: future.hpp:671
const Future< T > & onDiscard(F &&f) const
Definition: future.hpp:349
Definition: future.hpp:677
const T & get() const &
Definition: option.hpp:119
Protocol< PromiseRequest, PromiseResponse > promise
Future< X > type
Definition: future.hpp:932
Future< T > repair(lambda::CallableOnce< Future< T >(const Future< T > &)> f) const
Definition: future.hpp:1670
Definition: future.hpp:79
void discard(const Futures &futures)
Definition: future.hpp:991
static Try error(const E &e)
Definition: try.hpp:43
Future< R > run(R(*method)())
Definition: run.hpp:55
Future< X > then(lambda::CallableOnce< Future< X >(const T &)> f) const
Definition: future.hpp:1576
Future< T > undiscardable(const Future< T > &future)
Definition: future.hpp:1925
void run(std::vector< C > &&callbacks, Arguments &&...arguments)
Definition: future.hpp:621
lambda::CallableOnce< void()> AbandonedCallback
Definition: future.hpp:166
bool operator<(const Future< T > &that) const
Definition: future.hpp:1150
Future()
Definition: future.hpp:1061
Definition: none.hpp:27
Definition: attributes.hpp:24
bool operator!=(const std::string &s, const UPID::ID &id)
Definition: pid.hpp:232
Future< X > then(lambda::CallableOnce< Future< X >()> f) const
Definition: future.hpp:398
std::string error(const std::string &msg, uint32_t code)
lambda::CallableOnce< void(const std::string &)> FailedCallback
Definition: future.hpp:169
Definition: executor.hpp:48
void then(lambda::CallableOnce< X(const T &)> &&f, std::unique_ptr< Promise< X >> promise, const Future< T > &future)
Definition: future.hpp:1487
const int code
Definition: future.hpp:690
Promise()
Definition: future.hpp:782
Future< T > future() const
Definition: future.hpp:913
ErrnoFailure(int _code, const std::string &message)
Definition: future.hpp:687
const std::string message
Definition: future.hpp:673
auto then(F &&f) const -> decltype(this->then(std::forward< F >(f), Prefer()))
Definition: future.hpp:468
bool discard()
Definition: future.hpp:810
Try< uint32_t > type(const std::string &path)
Future< T > after(const Duration &duration, lambda::CallableOnce< Future< T >(const Future< T > &)> f) const
Definition: future.hpp:1692
const Future< T > & onAbandoned(F &&f) const
Definition: future.hpp:338
bool isAbandoned() const
Definition: future.hpp:1236
ErrnoFailure(int _code)
Definition: future.hpp:681
void discarded(Future< U > future)
const Future< T > & onAny(_Deferred< F > &&deferred) const
Definition: future.hpp:221
static Timer timer(const Duration &duration, const lambda::function< void()> &thunk)
const Future< T > & onDiscarded(_Deferred< F > &&deferred) const
Definition: future.hpp:214
Try< Nothing > bind(int_fd s, const Address &address)
Definition: network.hpp:46
const std::string & failure() const
Definition: future.hpp:1320
Option< Future< T > > get() const
Definition: future.hpp:654
std::string stringify(int flags)
const Future< T > & onReady(_Deferred< F > &&deferred) const
Definition: future.hpp:200
Definition: owned.hpp:36
Definition: future.hpp:1947
Definition: latch.hpp:24
static Future< T > failed(const std::string &message)
Definition: future.hpp:1031
bool hasDiscard() const
Definition: future.hpp:1243
T copy(const T &t)
Definition: utils.hpp:21
const Future< T > & onReady(ReadyCallback &&callback) const
Definition: future.hpp:1376
virtual ~Promise()
Definition: future.hpp:796
const Future< T > & onAbandoned(_Deferred< F > &&deferred) const
Definition: future.hpp:186
Definition: lambda.hpp:414
auto operator()(Args &&...args) -> decltype(std::declval< F & >()(std::forward< Args >(args)...))
Definition: future.hpp:1956
void setPromises(std::set< Promise< T > * > *promises, const T &t)
Definition: future.hpp:1855
const T * operator->() const
Definition: future.hpp:1313
bool isFailed() const
Definition: future.hpp:1229
Definition: future.hpp:58
Future< T > recover(F &&f) const
Definition: future.hpp:1624