Apache Mesos
future.hpp
Go to the documentation of this file.
1 // Licensed under the Apache License, Version 2.0 (the "License");
2 // you may not use this file except in compliance with the License.
3 // You may obtain a copy of the License at
4 //
5 // http://www.apache.org/licenses/LICENSE-2.0
6 //
7 // Unless required by applicable law or agreed to in writing, software
8 // distributed under the License is distributed on an "AS IS" BASIS,
9 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10 // See the License for the specific language governing permissions and
11 // limitations under the License
12 
13 #ifndef __PROCESS_FUTURE_HPP__
14 #define __PROCESS_FUTURE_HPP__
15 
16 #include <assert.h>
17 
18 #include <atomic>
19 #include <list>
20 #include <memory> // TODO(benh): Replace shared_ptr with unique_ptr.
21 #include <ostream>
22 #include <set>
23 #include <type_traits>
24 #include <utility>
25 #include <vector>
26 
27 #include <glog/logging.h>
28 
29 #include <process/clock.hpp>
30 #include <process/latch.hpp>
31 #include <process/owned.hpp>
32 #include <process/pid.hpp>
33 #include <process/timer.hpp>
34 
35 #include <stout/abort.hpp>
36 #include <stout/check.hpp>
37 #include <stout/duration.hpp>
38 #include <stout/error.hpp>
39 #include <stout/lambda.hpp>
40 #include <stout/none.hpp>
41 #include <stout/option.hpp>
42 #include <stout/preprocessor.hpp>
43 #include <stout/result.hpp>
44 #include <stout/result_of.hpp>
45 #include <stout/stringify.hpp>
46 #include <stout/synchronized.hpp>
47 #include <stout/try.hpp>
48 
49 #include <stout/os/strerror.hpp>
50 
51 namespace process {
52 
53 // Forward declarations (instead of include to break circular dependency).
54 template <typename G>
55 struct _Deferred;
56 
57 template <typename T>
58 class Future;
59 
60 
61 namespace internal {
62 
63 template <typename T>
64 struct wrap;
65 
66 template <typename T>
67 struct unwrap;
68 
69 } // namespace internal {
70 
71 
72 // Forward declaration of Promise.
73 template <typename T>
74 class Promise;
75 
76 
77 // Forward declaration of WeakFuture.
78 template <typename T>
79 class WeakFuture;
80 
81 // Forward declaration of Failure.
82 struct Failure;
83 struct ErrnoFailure;
84 
85 
86 // Definition of a "shared" future. A future can hold any
87 // copy-constructible value. A future is considered "shared" because
88 // by default a future can be accessed concurrently.
89 template <typename T>
90 class Future
91 {
92 public:
93  // Constructs a failed future.
94  static Future<T> failed(const std::string& message);
95 
96  Future();
97 
98  /*implicit*/ Future(const T& _t);
99  /*implicit*/ Future(T&& _t);
100 
101  template <typename U>
102  /*implicit*/ Future(const U& u);
103 
104  /*implicit*/ Future(const Failure& failure);
105 
106  /*implicit*/ Future(const ErrnoFailure& failure);
107 
108  /*implicit*/ Future(const Future<T>& that) = default;
109  /*implicit*/ Future(Future<T>&& that) = default;
110 
111  template <typename E>
112  /*implicit*/ Future(const Try<T, E>& t);
113 
114  template <typename E>
115  /*implicit*/ Future(const Try<Future<T>, E>& t);
116 
117  ~Future() = default;
118 
119  // Futures are assignable (and copyable). This results in the
120  // reference to the previous future data being decremented and a
121  // reference to 'that' being incremented.
122  Future<T>& operator=(const Future<T>& that) = default;
123  Future<T>& operator=(Future<T>&& that) = default;
124 
125  // Comparison operators useful for using futures in collections.
126  bool operator==(const Future<T>& that) const;
127  bool operator!=(const Future<T>& that) const;
128  bool operator<(const Future<T>& that) const;
129 
130  // Helpers to get the current state of this future.
131  bool isPending() const;
132  bool isReady() const;
133  bool isDiscarded() const;
134  bool isFailed() const;
135  bool isAbandoned() const;
136  bool hasDiscard() const;
137 
138  // Discards this future. Returns false if discard has already been
139  // called or the future has already completed, i.e., is ready,
140  // failed, or discarded. Note that a discard does not terminate any
141  // computation but rather acts as a suggestion or indication that
142  // the caller no longer cares about the result of some
143  // computation. The callee can decide whether or not to continue the
144  // computation on their own (where best practices are to attempt to
145  // stop the computation if possible). The callee can discard the
146  // computation via Promise::discard which completes a future, at
147  // which point, Future::isDiscarded is true (and the
148  // Future::onDiscarded callbacks are executed). Before that point,
149  // but after calling Future::discard, only Future::hasDiscard will
150  // return true and the Future::onDiscard callbacks will be invoked.
151  bool discard();
152 
153  // Waits for this future to become ready, discarded, or failed.
154  bool await(const Duration& duration = Seconds(-1)) const;
155 
156  // Return the value associated with this future, waits indefinitely
157  // until a value gets associated or until the future is discarded.
158  const T& get() const;
159  const T* operator->() const;
160 
161  // Returns the failure message associated with this future.
162  const std::string& failure() const;
163 
164  // Type of the callback functions that can get invoked when the
165  // future gets set, fails, or is discarded.
172 
173  // Installs callbacks for the specified events and returns a const
174  // reference to 'this' in order to easily support chaining.
175  const Future<T>& onAbandoned(AbandonedCallback&& callback) const;
176  const Future<T>& onDiscard(DiscardCallback&& callback) const;
177  const Future<T>& onReady(ReadyCallback&& callback) const;
178  const Future<T>& onFailed(FailedCallback&& callback) const;
179  const Future<T>& onDiscarded(DiscardedCallback&& callback) const;
180  const Future<T>& onAny(AnyCallback&& callback) const;
181 
182  // TODO(benh): Add onReady, onFailed, onAny for _Deferred<F> where F
183  // is not expected.
184 
185  template <typename F>
186  const Future<T>& onAbandoned(_Deferred<F>&& deferred) const
187  {
188  return onAbandoned(
189  std::move(deferred).operator lambda::CallableOnce<void()>());
190  }
191 
192  template <typename F>
193  const Future<T>& onDiscard(_Deferred<F>&& deferred) const
194  {
195  return onDiscard(
196  std::move(deferred).operator lambda::CallableOnce<void()>());
197  }
198 
199  template <typename F>
200  const Future<T>& onReady(_Deferred<F>&& deferred) const
201  {
202  return onReady(
203  std::move(deferred).operator lambda::CallableOnce<void(const T&)>());
204  }
205 
206  template <typename F>
207  const Future<T>& onFailed(_Deferred<F>&& deferred) const
208  {
209  return onFailed(std::move(deferred)
210  .operator lambda::CallableOnce<void(const std::string&)>());
211  }
212 
213  template <typename F>
214  const Future<T>& onDiscarded(_Deferred<F>&& deferred) const
215  {
216  return onDiscarded(
217  std::move(deferred).operator lambda::CallableOnce<void()>());
218  }
219 
220  template <typename F>
221  const Future<T>& onAny(_Deferred<F>&& deferred) const
222  {
223  return onAny(std::move(deferred)
224  .operator lambda::CallableOnce<void(const Future<T>&)>());
225  }
226 
227 private:
228  // We use the 'Prefer' and 'LessPrefer' structs as a way to prefer
229  // one function over the other when doing SFINAE for the 'onReady',
230  // 'onFailed', 'onAny', and 'then' functions. In each of these cases
231  // we prefer calling the version of the functor that takes in an
232  // argument (i.e., 'const T&' for 'onReady' and 'then' and 'const
233  // std::string&' for 'onFailed'), but we allow functors that don't
234  // care about the argument. We don't need to do this for
235  // 'onDiscard', 'onDiscarded' or 'onAbandoned' because they don't
236  // take an argument.
237  struct LessPrefer {};
238  struct Prefer : LessPrefer {};
239 
241  const Future<T>& onReady(F&& f, Prefer) const
242  {
243  return onReady(lambda::CallableOnce<void(const T&)>(
245  [](typename std::decay<F>::type&& f, const T& t) {
246  std::move(f)(t);
247  },
248  std::forward<F>(f),
249  lambda::_1)));
250  }
251 
252  // This is the less preferred `onReady`, we prefer the `onReady` method which
253  // has `f` taking a `const T&` parameter. Unfortunately, to complicate
254  // matters, if `F` is the result of a `std::bind` expression we need to SFINAE
255  // out this version of `onReady` and force the use of the preferred `onReady`
256  // (which works because `std::bind` will just ignore the `const T&` argument).
257  // This is necessary because Visual Studio 2015 doesn't support using the
258  // `std::bind` call operator with `result_of` as it's technically not a
259  // requirement by the C++ standard.
260  template <
261  typename F,
262  typename = typename result_of<typename std::enable_if<
264  F>::type()>::type>
265  const Future<T>& onReady(F&& f, LessPrefer) const
266  {
267  return onReady(lambda::CallableOnce<void(const T&)>(
269  [](typename std::decay<F>::type&& f, const T&) {
270  std::move(f)();
271  },
272  std::forward<F>(f),
273  lambda::_1)));
274  }
275 
277  const Future<T>& onFailed(F&& f, Prefer) const
278  {
279  return onFailed(lambda::CallableOnce<void(const std::string&)>(
281  [](typename std::decay<F>::type&& f, const std::string& message) {
282  std::move(f)(message);
283  },
284  std::forward<F>(f),
285  lambda::_1)));
286  }
287 
288  // Refer to the less preferred version of `onReady` for why these SFINAE
289  // conditions are necessary.
290  template <
291  typename F,
292  typename = typename result_of<typename std::enable_if<
294  F>::type()>::type>
295  const Future<T>& onFailed(F&& f, LessPrefer) const
296  {
297  return onFailed(lambda::CallableOnce<void(const std::string&)>(
299  [](typename std::decay<F>::type&& f, const std::string&) mutable {
300  std::move(f)();
301  },
302  std::forward<F>(f),
303  lambda::_1)));
304  }
305 
306  template <typename F, typename = typename result_of<F(const Future<T>&)>::type> // NOLINT(whitespace/line_length)
307  const Future<T>& onAny(F&& f, Prefer) const
308  {
309  return onAny(lambda::CallableOnce<void(const Future<T>&)>(
311  [](typename std::decay<F>::type&& f, const Future<T>& future) {
312  std::move(f)(future);
313  },
314  std::forward<F>(f),
315  lambda::_1)));
316  }
317 
318  // Refer to the less preferred version of `onReady` for why these SFINAE
319  // conditions are necessary.
320  template <
321  typename F,
322  typename = typename result_of<typename std::enable_if<
324  F>::type()>::type>
325  const Future<T>& onAny(F&& f, LessPrefer) const
326  {
327  return onAny(lambda::CallableOnce<void(const Future<T>&)>(
329  [](typename std::decay<F>::type&& f, const Future<T>&) {
330  std::move(f)();
331  },
332  std::forward<F>(f),
333  lambda::_1)));
334  }
335 
336 public:
337  template <typename F>
338  const Future<T>& onAbandoned(F&& f) const
339  {
340  return onAbandoned(lambda::CallableOnce<void()>(
342  [](typename std::decay<F>::type&& f) {
343  std::move(f)();
344  },
345  std::forward<F>(f))));
346  }
347 
348  template <typename F>
349  const Future<T>& onDiscard(F&& f) const
350  {
351  return onDiscard(lambda::CallableOnce<void()>(
353  [](typename std::decay<F>::type&& f) {
354  std::move(f)();
355  },
356  std::forward<F>(f))));
357  }
358 
359  template <typename F>
360  const Future<T>& onReady(F&& f) const
361  {
362  return onReady(std::forward<F>(f), Prefer());
363  }
364 
365  template <typename F>
366  const Future<T>& onFailed(F&& f) const
367  {
368  return onFailed(std::forward<F>(f), Prefer());
369  }
370 
371  template <typename F>
372  const Future<T>& onDiscarded(F&& f) const
373  {
374  return onDiscarded(lambda::CallableOnce<void()>(
376  [](typename std::decay<F>::type&& f) {
377  std::move(f)();
378  },
379  std::forward<F>(f))));
380  }
381 
382  template <typename F>
383  const Future<T>& onAny(F&& f) const
384  {
385  return onAny(std::forward<F>(f), Prefer());
386  }
387 
388  // Installs callbacks that get executed when this future is ready
389  // and associates the result of the callback with the future that is
390  // returned to the caller (which may be of a different type).
391  template <typename X>
392  Future<X> then(lambda::CallableOnce<Future<X>(const T&)> f) const;
393 
394  template <typename X>
395  Future<X> then(lambda::CallableOnce<X(const T&)> f) const;
396 
397  template <typename X>
399  {
400  return then(lambda::CallableOnce<Future<X>(const T&)>(
401  lambda::partial(std::move(f))));
402  }
403 
404  template <typename X>
406  {
407  return then(lambda::CallableOnce<X(const T&)>(
408  lambda::partial(std::move(f))));
409  }
410 
411 private:
412  template <
413  typename F,
414  typename X =
416  Future<X> then(_Deferred<F>&& f, Prefer) const
417  {
418  // note the then<X> is necessary to not have an infinite loop with
419  // then(F&& f)
420  return then<X>(
421  std::move(f).operator lambda::CallableOnce<Future<X>(const T&)>());
422  }
423 
424  // Refer to the less preferred version of `onReady` for why these SFINAE
425  // conditions are necessary.
426  template <
427  typename F,
428  typename X = typename internal::unwrap<
429  typename result_of<typename std::enable_if<
431  F>::type()>::type>::type>
432  Future<X> then(_Deferred<F>&& f, LessPrefer) const
433  {
434  return then<X>(std::move(f).operator lambda::CallableOnce<Future<X>()>());
435  }
436 
438  Future<X> then(F&& f, Prefer) const
439  {
440  return then<X>(
441  lambda::CallableOnce<Future<X>(const T&)>(std::forward<F>(f)));
442  }
443 
444  // Refer to the less preferred version of `onReady` for why these SFINAE
445  // conditions are necessary.
446  template <
447  typename F,
448  typename X = typename internal::unwrap<
449  typename result_of<typename std::enable_if<
451  F>::type()>::type>::type>
452  Future<X> then(F&& f, LessPrefer) const
453  {
454  return then<X>(lambda::CallableOnce<Future<X>()>(std::forward<F>(f)));
455  }
456 
457 public:
458  // NOTE: There are two bugs we're dealing with here.
459  // (1) GCC bug where the explicit use of `this->` is required in the
460  // trailing return type: gcc.gnu.org/bugzilla/show_bug.cgi?id=57543
461  // (2) VS 2017 RC bug where the explicit use of `this->` is disallowed.
462  //
463  // Since VS 2015 and 2017 RC both implement C++14's deduced return type for
464  // functions, we simply choose to use that on Windows.
465  //
466  // TODO(mpark): Remove the trailing return type once we get to C++14.
467  template <typename F>
468  auto then(F&& f) const
469 #ifndef __WINDOWS__
470  -> decltype(this->then(std::forward<F>(f), Prefer()))
471 #endif // __WINDOWS__
472  {
473  return then(std::forward<F>(f), Prefer());
474  }
475 
476  // Installs callbacks that get executed if this future is abandoned,
477  // is discarded, or failed.
478  template <typename F>
479  Future<T> recover(F&& f) const;
480 
481  template <typename F>
482  Future<T> recover(_Deferred<F>&& deferred) const
483  {
484  return recover(
485  std::move(deferred)
486  .operator lambda::CallableOnce<Future<T>(const Future<T>&)>());
487  }
488 
489  // TODO(benh): Considering adding a `rescue` function for rescuing
490  // abandoned futures.
491 
492  // Installs callbacks that get executed if this future completes
493  // because it failed.
495  lambda::CallableOnce<Future<T>(const Future<T>&)> f) const;
496 
497  // TODO(benh): Add overloads of 'repair' that don't require passing
498  // in a function that takes the 'const Future<T>&' parameter and use
499  // Prefer/LessPrefer to disambiguate.
500 
501  // Invokes the specified function after some duration if this future
502  // has not been completed (set, failed, or discarded). Note that
503  // this function is agnostic of discard semantics and while it will
504  // propagate discarding "up the chain" it will still invoke the
505  // specified callback after the specified duration even if 'discard'
506  // was called on the returned future.
508  const Duration& duration,
509  lambda::CallableOnce<Future<T>(const Future<T>&)> f) const;
510 
511  // TODO(benh): Add overloads of 'after' that don't require passing
512  // in a function that takes the 'const Future<T>&' parameter and use
513  // Prefer/LessPrefer to disambiguate.
514 
515 private:
516  template <typename U>
517  friend class Future;
518  friend class Promise<T>;
519  friend class WeakFuture<T>;
520  template <typename U>
521  friend std::ostream& operator<<(std::ostream&, const Future<U>&);
522 
523  enum State
524  {
525  PENDING,
526  READY,
527  FAILED,
528  DISCARDED,
529  };
530 
531  struct Data
532  {
533  Data();
534  ~Data() = default;
535 
536  void clearAllCallbacks();
537 
538  std::atomic_flag lock = ATOMIC_FLAG_INIT;
539  State state;
540  bool discard;
541  bool associated;
542  bool abandoned;
543 
544  // One of:
545  // 1. None, the state is PENDING or DISCARDED.
546  // 2. Some, the state is READY.
547  // 3. Error, the state is FAILED; 'error()' stores the message.
548  Result<T> result;
549 
550  std::vector<AbandonedCallback> onAbandonedCallbacks;
551  std::vector<DiscardCallback> onDiscardCallbacks;
552  std::vector<ReadyCallback> onReadyCallbacks;
553  std::vector<FailedCallback> onFailedCallbacks;
554  std::vector<DiscardedCallback> onDiscardedCallbacks;
555  std::vector<AnyCallback> onAnyCallbacks;
556  };
557 
558  // Abandons this future. Returns false if the future is already
559  // associated or no longer pending. Otherwise returns true and any
560  // Future::onAbandoned callbacks wil be run.
561  //
562  // If `propagating` is true then we'll abandon this future even if
563  // it has already been associated. This is important because
564  // `~Promise()` will try and abandon and we need to ignore that if
565  // the future has been associated since the promise will no longer
566  // be setting the future anyway (and is likely the reason it's being
567  // destructed, because it's useless). When the future that we've
568  // associated with gets abandoned, however, then we need to actually
569  // abandon this future too. Here's an example of this:
570  //
571  // 1: Owned<Promise<int>> promise1(new Promise<int>());
572  // 2: Owned<Promise<int>> promise2(new Promise<int>());
573  // 3:
574  // 4: Future<int> future1 = promise1->future();
575  // 5: Future<int> future2 = promise2->future();
576  // 6:
577  // 7: promise1->associate(future2);
578  // 8:
579  // 9: promise1.reset();
580  // 10:
581  // 11: assert(!future1.isAbandoned());
582  // 12:
583  // 13: promise2.reset();
584  // 14:
585  // 15: assert(future2.isAbandoned());
586  // 16: assert(future3.isAbandoned());
587  //
588  // At line 9 `~Promise()` will attempt to abandon the future by
589  // calling `abandon()` but since it's been associated we won't do
590  // anything. On line 13 the `onAbandoned()` callback will call
591  // `abandon(true)` and know we'll actually abandon the future
592  // because we're _propagating_ the abandon from the associated
593  // future.
594  //
595  // NOTE: this is an _INTERNAL_ function and should never be exposed
596  // or used outside of the implementation.
597  bool abandon(bool propagating = false);
598 
599  // Sets the value for this future, unless the future is already set,
600  // failed, or discarded, in which case it returns false.
601  bool set(const T& _t);
602  bool set(T&& _t);
603 
604  template <typename U>
605  bool _set(U&& _u);
606 
607  // Sets this future as failed, unless the future is already set,
608  // failed, or discarded, in which case it returns false.
609  bool fail(const std::string& _message);
610 
611  std::shared_ptr<Data> data;
612 };
613 
614 
615 namespace internal {
616 
617 // Helper for executing callbacks that have been registered.
618 //
619 // TODO(*): Invoke callbacks in another execution context.
620 template <typename C, typename... Arguments>
621 void run(std::vector<C>&& callbacks, Arguments&&... arguments)
622 {
623  for (size_t i = 0; i < callbacks.size(); ++i) {
624  std::move(callbacks[i])(std::forward<Arguments>(arguments)...);
625  }
626 }
627 
628 } // namespace internal {
629 
630 
631 // Represents a weak reference to a future. This class is used to
632 // break cyclic dependencies between futures.
633 template <typename T>
634 class WeakFuture
635 {
636 public:
637  explicit WeakFuture(const Future<T>& future);
638 
639  // Converts this weak reference to a concrete future. Returns none
640  // if the conversion is not successful.
641  Option<Future<T>> get() const;
642 
643 private:
644  std::weak_ptr<typename Future<T>::Data> data;
645 };
646 
647 
648 template <typename T>
650  : data(future.data) {}
651 
652 
653 template <typename T>
655 {
656  Future<T> future;
657  future.data = data.lock();
658 
659  if (future.data) {
660  return future;
661  }
662 
663  return None();
664 }
665 
666 
667 // Helper for creating failed futures.
668 struct Failure
669 {
670  explicit Failure(const std::string& _message) : message(_message) {}
671  explicit Failure(const Error& error) : message(error.message) {}
672 
673  const std::string message;
674 };
675 
676 
677 struct ErrnoFailure : public Failure
678 {
680 
681  explicit ErrnoFailure(int _code)
682  : Failure(os::strerror(_code)), code(_code) {}
683 
684  explicit ErrnoFailure(const std::string& message)
685  : ErrnoFailure(errno, message) {}
686 
687  ErrnoFailure(int _code, const std::string& message)
688  : Failure(message + ": " + os::strerror(_code)), code(_code) {}
689 
690  const int code;
691 };
692 
693 
694 // Forward declaration to use as friend below.
695 namespace internal {
696 template <typename U>
697 void discarded(Future<U> future);
698 } // namespace internal {
699 
700 
701 // TODO(benh): Make Promise a subclass of Future?
702 template <typename T>
703 class Promise
704 {
705 public:
706  Promise();
707  explicit Promise(const T& t);
708  virtual ~Promise();
709 
710  Promise(Promise<T>&& that);
711 
712  bool discard();
713  bool set(const T& _t);
714  bool set(T&& _t);
715  bool set(const Future<T>& future); // Alias for associate.
716  bool associate(const Future<T>& future);
717  bool fail(const std::string& message);
718 
719  // Returns a copy of the future associated with this promise.
720  Future<T> future() const;
721 
722 private:
723  template <typename U>
724  friend class Future;
725  template <typename U>
726  friend void internal::discarded(Future<U> future);
727 
728  template <typename U>
729  bool _set(U&& u);
730 
731  // Not copyable, not assignable.
732  Promise(const Promise<T>&);
733  Promise<T>& operator=(const Promise<T>&);
734 
735  // Helper for doing the work of actually discarding a future (called
736  // from Promise::discard as well as internal::discarded).
737  static bool discard(Future<T> future);
738 
739  Future<T> f;
740 };
741 
742 
743 template <>
744 class Promise<void>;
745 
746 
747 template <typename T>
748 class Promise<T&>;
749 
750 
751 namespace internal {
752 
753 // Discards a weak future. If the weak future is invalid (i.e., the
754 // future it references to has already been destroyed), this operation
755 // is treated as a no-op.
756 template <typename T>
757 void discard(WeakFuture<T> reference)
758 {
759  Option<Future<T>> future = reference.get();
760  if (future.isSome()) {
761  Future<T> future_ = future.get();
762  future_.discard();
763  }
764 }
765 
766 
767 // Helper for invoking Promise::discard in an onDiscarded callback
768 // (since the onDiscarded callback requires returning void we can't
769 // bind with Promise::discard).
770 template <typename T>
771 void discarded(Future<T> future)
772 {
773  Promise<T>::discard(future);
774 }
775 
776 } // namespace internal {
777 
778 
779 template <typename T>
781 {
782  // Need to "unset" `abandoned` since it gets set in the empty
783  // constructor for `Future`.
784  f.data->abandoned = false;
785 }
786 
787 
788 template <typename T>
790  : f(t) {}
791 
792 
793 template <typename T>
795 {
796  // Note that we don't discard the promise as we don't want to give
797  // the illusion that any computation hasn't started (or possibly
798  // finished) in the event that computation is "visible" by other
799  // means. However, we try and abandon the future if it hasn't been
800  // associated or set (or moved, i.e., `f.data` is true).
801  if (f.data) {
802  f.abandon();
803  }
804 }
805 
806 
807 template <typename T>
809  : f(std::move(that.f)) {}
810 
811 
812 template <typename T>
814 {
815  if (!f.data->associated) {
816  return discard(f);
817  }
818  return false;
819 }
820 
821 
822 template <typename T>
823 bool Promise<T>::set(T&& t)
824 {
825  return _set(std::move(t));
826 }
827 
828 
829 template <typename T>
830 bool Promise<T>::set(const T& t)
831 {
832  return _set(t);
833 }
834 
835 
836 template <typename T>
837 template <typename U>
838 bool Promise<T>::_set(U&& u)
839 {
840  if (!f.data->associated) {
841  return f.set(std::forward<U>(u));
842  }
843  return false;
844 }
845 
846 
847 template <typename T>
849 {
850  return associate(future);
851 }
852 
853 
854 template <typename T>
856 {
857  bool associated = false;
858 
859  synchronized (f.data->lock) {
860  // Don't associate if this promise has completed. Note that this
861  // does not include if Future::discard was called on this future
862  // since in that case that would still leave the future PENDING
863  // (note that we cover that case below).
864  if (f.data->state == Future<T>::PENDING && !f.data->associated) {
865  associated = f.data->associated = true;
866 
867  // After this point we don't allow 'f' to be completed via the
868  // promise since we've set 'associated' but Future::discard on
869  // 'f' might get called which will get propagated via the
870  // 'f.onDiscard' below. Note that we currently don't propagate a
871  // discard from 'future.onDiscard' but these semantics might
872  // change if/when we make 'f' and 'future' true aliases of one
873  // another.
874  }
875  }
876 
877  // Note that we do the actual associating after releasing the lock
878  // above to avoid deadlocking by attempting to require the lock
879  // within from invoking 'f.onDiscard' and/or 'f.set/fail' via the
880  // bind statements from doing 'future.onReady/onFailed'.
881  if (associated) {
882  // TODO(jieyu): Make 'f' a true alias of 'future'. Currently, only
883  // 'discard' is associated in both directions. In other words, if
884  // a future gets discarded, the other future will also get
885  // discarded. For 'set' and 'fail', they are associated only in
886  // one direction. In other words, calling 'set' or 'fail' on this
887  // promise will not affect the result of the future that we
888  // associated.
889  f.onDiscard(lambda::bind(&internal::discard<T>, WeakFuture<T>(future)));
890 
891  // Need to disambiguate for the compiler.
892  bool (Future<T>::*set)(const T&) = &Future<T>::set;
893 
894  future
895  .onReady(lambda::bind(set, f, lambda::_1))
896  .onFailed(lambda::bind(&Future<T>::fail, f, lambda::_1))
897  .onDiscarded(lambda::bind(&internal::discarded<T>, f))
898  .onAbandoned(lambda::bind(&Future<T>::abandon, f, true));
899  }
900 
901  return associated;
902 }
903 
904 
905 template <typename T>
906 bool Promise<T>::fail(const std::string& message)
907 {
908  if (!f.data->associated) {
909  return f.fail(message);
910  }
911  return false;
912 }
913 
914 
915 template <typename T>
917 {
918  return f;
919 }
920 
921 
922 // Internal helper utilities.
923 namespace internal {
924 
925 template <typename T>
926 struct wrap
927 {
928  typedef Future<T> type;
929 };
930 
931 
932 template <typename X>
933 struct wrap<Future<X>>
934 {
935  typedef Future<X> type;
936 };
937 
938 
939 template <typename T>
940 struct unwrap
941 {
942  typedef T type;
943 };
944 
945 
946 template <typename X>
947 struct unwrap<Future<X>>
948 {
949  typedef X type;
950 };
951 
952 
953 template <typename T>
954 void select(
955  const Future<T>& future,
956  std::shared_ptr<Promise<Future<T>>> promise)
957 {
958  // We never fail the future associated with our promise.
959  assert(!promise->future().isFailed());
960 
961  if (promise->future().isPending()) { // No-op if it's discarded.
962  if (future.isReady()) { // We only set the promise if a future is ready.
963  promise->set(future);
964  }
965  }
966 }
967 
968 } // namespace internal {
969 
970 
971 // TODO(benh): Move select and discard into 'futures' namespace.
972 
973 // Returns a future that captures any ready future in a set. Note that
974 // select DOES NOT capture a future that has failed or been discarded.
975 template <typename T>
977 {
978  std::shared_ptr<Promise<Future<T>>> promise(new Promise<Future<T>>());
979 
980  promise->future().onDiscard(
981  lambda::bind(&internal::discarded<Future<T>>, promise->future()));
982 
983  foreach (const Future<T>& future, futures) {
984  future.onAny([=](const Future<T>& f) {
985  internal::select(f, promise);
986  });
987  }
988 
989  return promise->future();
990 }
991 
992 
993 template <typename Futures>
994 void discard(const Futures& futures)
995 {
996  foreach (auto future, futures) { // Need a non-const copy to discard.
997  future.discard();
998  }
999 }
1000 
1001 
1002 template <typename T>
1004 {
1005  bool result = false;
1006 
1007  synchronized (future.data->lock) {
1008  if (future.data->state == Future<T>::PENDING) {
1009  future.data->state = Future<T>::DISCARDED;
1010  result = true;
1011  }
1012  }
1013 
1014  // Invoke all callbacks associated with this future being
1015  // DISCARDED. We don't need a lock because the state is now in
1016  // DISCARDED so there should not be any concurrent modifications to
1017  // the callbacks.
1018  if (result) {
1019  // NOTE: we rely on the fact that we have `future` to protect
1020  // ourselves from one of the callbacks erroneously deleting the
1021  // future. In `Future::_set()` and `Future::fail()` we have to
1022  // explicitly take a copy to protect ourselves.
1023  internal::run(std::move(future.data->onDiscardedCallbacks));
1024  internal::run(std::move(future.data->onAnyCallbacks), future);
1025 
1026  future.data->clearAllCallbacks();
1027  }
1028 
1029  return result;
1030 }
1031 
1032 
1033 template <typename T>
1034 Future<T> Future<T>::failed(const std::string& message)
1035 {
1036  Future<T> future;
1037  future.fail(message);
1038  return future;
1039 }
1040 
1041 
1042 template <typename T>
1044  : state(PENDING),
1045  discard(false),
1046  associated(false),
1047  abandoned(false),
1048  result(None()) {}
1049 
1050 
1051 template <typename T>
1053 {
1054  onAbandonedCallbacks.clear();
1055  onAnyCallbacks.clear();
1056  onDiscardCallbacks.clear();
1057  onDiscardedCallbacks.clear();
1058  onFailedCallbacks.clear();
1059  onReadyCallbacks.clear();
1060 }
1061 
1062 
1063 template <typename T>
1065  : data(new Data())
1066 {
1067  data->abandoned = true;
1068 }
1069 
1070 
1071 template <typename T>
1072 Future<T>::Future(const T& _t)
1073  : data(new Data())
1074 {
1075  set(_t);
1076 }
1077 
1078 
1079 template <typename T>
1081  : data(new Data())
1082 {
1083  set(std::move(_t));
1084 }
1085 
1086 
1087 template <typename T>
1088 template <typename U>
1090  : data(new Data())
1091 {
1092  set(u);
1093 }
1094 
1095 
1096 template <typename T>
1098  : data(new Data())
1099 {
1100  fail(failure.message);
1101 }
1102 
1103 
1104 template <typename T>
1106  : data(new Data())
1107 {
1108  fail(failure.message);
1109 }
1110 
1111 
1112 template <typename T>
1113 template <typename E>
1115  : data(new Data())
1116 {
1117  if (t.isSome()){
1118  set(t.get());
1119  } else {
1120  // TODO(chhsiao): Consider preserving the error type. See MESOS-8925.
1121  fail(stringify(t.error()));
1122  }
1123 }
1124 
1125 
1126 template <typename T>
1127 template <typename E>
1129  : data(t.isSome() ? t->data : std::shared_ptr<Data>(new Data()))
1130 {
1131  if (!t.isSome()) {
1132  // TODO(chhsiao): Consider preserving the error type. See MESOS-8925.
1133  fail(stringify(t.error()));
1134  }
1135 }
1136 
1137 
1138 template <typename T>
1139 bool Future<T>::operator==(const Future<T>& that) const
1140 {
1141  return data == that.data;
1142 }
1143 
1144 
1145 template <typename T>
1146 bool Future<T>::operator!=(const Future<T>& that) const
1147 {
1148  return !(*this == that);
1149 }
1150 
1151 
1152 template <typename T>
1153 bool Future<T>::operator<(const Future<T>& that) const
1154 {
1155  return data < that.data;
1156 }
1157 
1158 
1159 template <typename T>
1161 {
1162  bool result = false;
1163 
1164  std::vector<DiscardCallback> callbacks;
1165  synchronized (data->lock) {
1166  if (!data->discard && data->state == PENDING) {
1167  result = data->discard = true;
1168 
1169  callbacks.swap(data->onDiscardCallbacks);
1170  }
1171  }
1172 
1173  // Invoke all callbacks associated with doing a discard on this
1174  // future. The callbacks get destroyed when we exit from the
1175  // function.
1176  if (result) {
1177  internal::run(std::move(callbacks));
1178  }
1179 
1180  return result;
1181 }
1182 
1183 
1184 template <typename T>
1185 bool Future<T>::abandon(bool propagating)
1186 {
1187  bool result = false;
1188 
1189  std::vector<AbandonedCallback> callbacks;
1190  synchronized (data->lock) {
1191  if (!data->abandoned &&
1192  data->state == PENDING &&
1193  (!data->associated || propagating)) {
1194  result = data->abandoned = true;
1195 
1196  callbacks.swap(data->onAbandonedCallbacks);
1197  }
1198  }
1199 
1200  // Invoke all callbacks. The callbacks get destroyed when we exit
1201  // from the function.
1202  if (result) {
1203  internal::run(std::move(callbacks));
1204  }
1205 
1206  return result;
1207 }
1208 
1209 
1210 template <typename T>
1212 {
1213  return data->state == PENDING;
1214 }
1215 
1216 
1217 template <typename T>
1219 {
1220  return data->state == READY;
1221 }
1222 
1223 
1224 template <typename T>
1226 {
1227  return data->state == DISCARDED;
1228 }
1229 
1230 
1231 template <typename T>
1233 {
1234  return data->state == FAILED;
1235 }
1236 
1237 
1238 template <typename T>
1240 {
1241  return data->abandoned;
1242 }
1243 
1244 
1245 template <typename T>
1247 {
1248  return data->discard;
1249 }
1250 
1251 
1252 namespace internal {
1253 
1254 inline void awaited(Owned<Latch> latch)
1255 {
1256  latch->trigger();
1257 }
1258 
1259 } // namespace internal {
1260 
1261 
1262 template <typename T>
1263 bool Future<T>::await(const Duration& duration) const
1264 {
1265  // NOTE: We need to preemptively allocate the Latch on the stack
1266  // instead of lazily create it in the critical section below because
1267  // instantiating a Latch requires creating a new process (at the
1268  // time of writing this comment) which might need to do some
1269  // synchronization in libprocess which might deadlock if some other
1270  // code in libprocess is already holding a lock and then attempts to
1271  // do Promise::set (or something similar) that attempts to acquire
1272  // the lock that we acquire here. This is an artifact of using
1273  // Future/Promise within the implementation of libprocess.
1274  //
1275  // We mostly only call 'await' in tests so this should not be a
1276  // performance concern.
1277  Owned<Latch> latch(new Latch());
1278 
1279  bool pending = false;
1280 
1281  synchronized (data->lock) {
1282  if (data->state == PENDING) {
1283  pending = true;
1284  data->onAnyCallbacks.push_back(lambda::bind(&internal::awaited, latch));
1285  }
1286  }
1287 
1288  if (pending) {
1289  return latch->await(duration);
1290  }
1291 
1292  return true;
1293 }
1294 
1295 
1296 template <typename T>
1297 const T& Future<T>::get() const
1298 {
1299  if (!isReady()) {
1300  await();
1301  }
1302 
1303  CHECK(!isPending()) << "Future was in PENDING after await()";
1304  // We can't use CHECK_READY here due to check.hpp depending on future.hpp.
1305  if (!isReady()) {
1306  CHECK(!isFailed()) << "Future::get() but state == FAILED: " << failure();
1307  CHECK(!isDiscarded()) << "Future::get() but state == DISCARDED";
1308  }
1309 
1310  assert(data->result.isSome());
1311  return data->result.get();
1312 }
1313 
1314 
1315 template <typename T>
1316 const T* Future<T>::operator->() const
1317 {
1318  return &get();
1319 }
1320 
1321 
1322 template <typename T>
1323 const std::string& Future<T>::failure() const
1324 {
1325  if (data->state != FAILED) {
1326  ABORT("Future::failure() but state != FAILED");
1327  }
1328 
1329  CHECK_ERROR(data->result);
1330  return data->result.error();
1331 }
1332 
1333 
1334 template <typename T>
1335 const Future<T>& Future<T>::onAbandoned(AbandonedCallback&& callback) const
1336 {
1337  bool run = false;
1338 
1339  synchronized (data->lock) {
1340  if (data->abandoned) {
1341  run = true;
1342  } else if (data->state == PENDING) {
1343  data->onAbandonedCallbacks.emplace_back(std::move(callback));
1344  }
1345  }
1346 
1347  // TODO(*): Invoke callback in another execution context.
1348  if (run) {
1349  std::move(callback)(); // NOLINT(misc-use-after-move)
1350  }
1351 
1352  return *this;
1353 }
1354 
1355 
1356 template <typename T>
1357 const Future<T>& Future<T>::onDiscard(DiscardCallback&& callback) const
1358 {
1359  bool run = false;
1360 
1361  synchronized (data->lock) {
1362  if (data->discard) {
1363  run = true;
1364  } else if (data->state == PENDING) {
1365  data->onDiscardCallbacks.emplace_back(std::move(callback));
1366  }
1367  }
1368 
1369  // TODO(*): Invoke callback in another execution context.
1370  if (run) {
1371  std::move(callback)(); // NOLINT(misc-use-after-move)
1372  }
1373 
1374  return *this;
1375 }
1376 
1377 
1378 template <typename T>
1379 const Future<T>& Future<T>::onReady(ReadyCallback&& callback) const
1380 {
1381  bool run = false;
1382 
1383  synchronized (data->lock) {
1384  if (data->state == READY) {
1385  run = true;
1386  } else if (data->state == PENDING) {
1387  data->onReadyCallbacks.emplace_back(std::move(callback));
1388  }
1389  }
1390 
1391  // TODO(*): Invoke callback in another execution context.
1392  if (run) {
1393  std::move(callback)(data->result.get()); // NOLINT(misc-use-after-move)
1394  }
1395 
1396  return *this;
1397 }
1398 
1399 
1400 template <typename T>
1401 const Future<T>& Future<T>::onFailed(FailedCallback&& callback) const
1402 {
1403  bool run = false;
1404 
1405  synchronized (data->lock) {
1406  if (data->state == FAILED) {
1407  run = true;
1408  } else if (data->state == PENDING) {
1409  data->onFailedCallbacks.emplace_back(std::move(callback));
1410  }
1411  }
1412 
1413  // TODO(*): Invoke callback in another execution context.
1414  if (run) {
1415  std::move(callback)(data->result.error()); // NOLINT(misc-use-after-move)
1416  }
1417 
1418  return *this;
1419 }
1420 
1421 
1422 template <typename T>
1423 const Future<T>& Future<T>::onDiscarded(DiscardedCallback&& callback) const
1424 {
1425  bool run = false;
1426 
1427  synchronized (data->lock) {
1428  if (data->state == DISCARDED) {
1429  run = true;
1430  } else if (data->state == PENDING) {
1431  data->onDiscardedCallbacks.emplace_back(std::move(callback));
1432  }
1433  }
1434 
1435  // TODO(*): Invoke callback in another execution context.
1436  if (run) {
1437  std::move(callback)(); // NOLINT(misc-use-after-move)
1438  }
1439 
1440  return *this;
1441 }
1442 
1443 
1444 template <typename T>
1445 const Future<T>& Future<T>::onAny(AnyCallback&& callback) const
1446 {
1447  bool run = false;
1448 
1449  synchronized (data->lock) {
1450  if (data->state == PENDING) {
1451  data->onAnyCallbacks.emplace_back(std::move(callback));
1452  } else {
1453  run = true;
1454  }
1455  }
1456 
1457  // TODO(*): Invoke callback in another execution context.
1458  if (run) {
1459  std::move(callback)(*this); // NOLINT(misc-use-after-move)
1460  }
1461 
1462  return *this;
1463 }
1464 
1465 namespace internal {
1466 
1467 // NOTE: We need to name this 'thenf' versus 'then' to distinguish it
1468 // from the function 'then' whose parameter 'f' doesn't return a
1469 // Future since the compiler can't properly infer otherwise.
1470 template <typename T, typename X>
1472  std::unique_ptr<Promise<X>> promise,
1473  const Future<T>& future)
1474 {
1475  if (future.isReady()) {
1476  if (future.hasDiscard()) {
1477  promise->discard();
1478  } else {
1479  promise->associate(std::move(f)(future.get()));
1480  }
1481  } else if (future.isFailed()) {
1482  promise->fail(future.failure());
1483  } else if (future.isDiscarded()) {
1484  promise->discard();
1485  }
1486 }
1487 
1488 
1489 template <typename T, typename X>
1490 void then(lambda::CallableOnce<X(const T&)>&& f,
1491  std::unique_ptr<Promise<X>> promise,
1492  const Future<T>& future)
1493 {
1494  if (future.isReady()) {
1495  if (future.hasDiscard()) {
1496  promise->discard();
1497  } else {
1498  promise->set(std::move(f)(future.get()));
1499  }
1500  } else if (future.isFailed()) {
1501  promise->fail(future.failure());
1502  } else if (future.isDiscarded()) {
1503  promise->discard();
1504  }
1505 }
1506 
1507 
1508 template <typename T>
1509 void repair(
1511  std::unique_ptr<Promise<T>> promise,
1512  const Future<T>& future)
1513 {
1514  CHECK(!future.isPending());
1515  if (future.isFailed()) {
1516  promise->associate(std::move(f)(future));
1517  } else {
1518  promise->associate(future);
1519  }
1520 }
1521 
1522 
1523 template <typename T>
1524 void expired(
1525  const std::shared_ptr<lambda::CallableOnce<Future<T>(const Future<T>&)>>& f,
1526  const std::shared_ptr<Latch>& latch,
1527  const std::shared_ptr<Promise<T>>& promise,
1528  const std::shared_ptr<Option<Timer>>& timer,
1529  const Future<T>& future)
1530 {
1531  if (latch->trigger()) {
1532  // If this callback executed first (i.e., we triggered the latch)
1533  // then we want to clear out the timer so that we don't hold a
1534  // circular reference to `future` in it's own `onAny`
1535  // callbacks. See the comment in `Future::after`.
1536  *timer = None();
1537 
1538  // Note that we don't bother checking if 'future' has been
1539  // discarded (i.e., 'future.isDiscarded()' returns true) since
1540  // there is a race between when we make that check and when we
1541  // would invoke 'f(future)' so the callee 'f' should ALWAYS check
1542  // if the future has been discarded and rather than hiding a
1543  // non-deterministic bug we always call 'f' if the timer has
1544  // expired.
1545  promise->associate(std::move(*f)(future));
1546  }
1547 }
1548 
1549 
1550 template <typename T>
1551 void after(
1552  const std::shared_ptr<Latch>& latch,
1553  const std::shared_ptr<Promise<T>>& promise,
1554  const std::shared_ptr<Option<Timer>>& timer,
1555  const Future<T>& future)
1556 {
1557  CHECK(!future.isPending());
1558  if (latch->trigger()) {
1559  // If this callback executes first (i.e., we triggered the latch)
1560  // it must be the case that `timer` is still some and we can try
1561  // and cancel the timer.
1562  CHECK_SOME(*timer);
1563  Clock::cancel(timer->get());
1564 
1565  // We also force the timer to get deallocated so that there isn't
1566  // a cicular reference of the timer with itself which keeps around
1567  // a reference to the original future.
1568  *timer = None();
1569 
1570  promise->associate(future);
1571  }
1572 }
1573 
1574 } // namespace internal {
1575 
1576 
1577 template <typename T>
1578 template <typename X>
1580 {
1581  std::unique_ptr<Promise<X>> promise(new Promise<X>());
1582  Future<X> future = promise->future();
1583 
1585  &internal::thenf<T, X>, std::move(f), std::move(promise), lambda::_1);
1586 
1587  onAny(std::move(thenf));
1588 
1589  onAbandoned([=]() mutable {
1590  future.abandon();
1591  });
1592 
1593  // Propagate discarding up the chain. To avoid cyclic dependencies,
1594  // we keep a weak future in the callback.
1595  future.onDiscard(lambda::bind(&internal::discard<T>, WeakFuture<T>(*this)));
1596 
1597  return future;
1598 }
1599 
1600 
1601 template <typename T>
1602 template <typename X>
1604 {
1605  std::unique_ptr<Promise<X>> promise(new Promise<X>());
1606  Future<X> future = promise->future();
1607 
1609  &internal::then<T, X>, std::move(f), std::move(promise), lambda::_1);
1610 
1611  onAny(std::move(then));
1612 
1613  onAbandoned([=]() mutable {
1614  future.abandon();
1615  });
1616 
1617  // Propagate discarding up the chain. To avoid cyclic dependencies,
1618  // we keep a weak future in the callback.
1619  future.onDiscard(lambda::bind(&internal::discard<T>, WeakFuture<T>(*this)));
1620 
1621  return future;
1622 }
1623 
1624 
1625 template <typename T>
1626 template <typename F>
1628 {
1629  std::shared_ptr<Promise<T>> promise(new Promise<T>());
1630 
1631  const Future<T> future = *this;
1632 
1633  typedef decltype(std::move(f)(future)) R;
1634 
1635  std::shared_ptr<lambda::CallableOnce<R(const Future<T>&)>> callable(
1636  new lambda::CallableOnce<R(const Future<T>&)>(std::move(f)));
1637 
1638  onAny([=]() {
1639  if (future.isDiscarded() || future.isFailed()) {
1640  // We reset `discard` so that if a future gets returned from
1641  // `f(future)` we won't immediately discard it! We still want to
1642  // let the future get discarded later, however, hence if it gets
1643  // set again in the future it'll propagate to the returned
1644  // future.
1645  synchronized (promise->f.data->lock) {
1646  promise->f.data->discard = false;
1647  }
1648 
1649  promise->set(std::move(*callable)(future));
1650  } else {
1651  promise->associate(future);
1652  }
1653  });
1654 
1655  onAbandoned([=]() {
1656  // See comment above for why we reset `discard` here.
1657  synchronized (promise->f.data->lock) {
1658  promise->f.data->discard = false;
1659  }
1660  promise->set(std::move(*callable)(future));
1661  });
1662 
1663  // Propagate discarding up the chain. To avoid cyclic dependencies,
1664  // we keep a weak future in the callback.
1665  promise->future().onDiscard(
1666  lambda::bind(&internal::discard<T>, WeakFuture<T>(*this)));
1667 
1668  return promise->future();
1669 }
1670 
1671 
1672 template <typename T>
1674  lambda::CallableOnce<Future<T>(const Future<T>&)> f) const
1675 {
1676  std::unique_ptr<Promise<T>> promise(new Promise<T>());
1677  Future<T> future = promise->future();
1678 
1680  &internal::repair<T>, std::move(f), std::move(promise), lambda::_1));
1681 
1682  onAbandoned([=]() mutable {
1683  future.abandon();
1684  });
1685 
1686  // Propagate discarding up the chain. To avoid cyclic dependencies,
1687  // we keep a weak future in the callback.
1688  future.onDiscard(lambda::bind(&internal::discard<T>, WeakFuture<T>(*this)));
1689 
1690  return future;
1691 }
1692 
1693 
1694 template <typename T>
1696  const Duration& duration,
1697  lambda::CallableOnce<Future<T>(const Future<T>&)> f) const
1698 {
1699  // TODO(benh): Using a Latch here but Once might be cleaner.
1700  // Unfortunately, Once depends on Future so we can't easily use it
1701  // from here.
1702  std::shared_ptr<Latch> latch(new Latch());
1703  std::shared_ptr<Promise<T>> promise(new Promise<T>());
1704 
1705  // We need to control the lifetime of the timer we create below so
1706  // that we can force the timer to get deallocated after it
1707  // expires. The reason we want to force the timer to get deallocated
1708  // after it expires is because the timer's lambda has a copy of
1709  // `this` (i.e., a Future) and it's stored in the `onAny` callbacks
1710  // of `this` thus creating a circular reference. By storing a
1711  // `shared_ptr<Option<Timer>>` we're able to set the option to none
1712  // after the timer expires which will deallocate our copy of the
1713  // timer and leave the `Option<Timer>` stored in the lambda of the
1714  // `onAny` callback as none. Note that this is safe because the
1715  // `Latch` makes sure that only one of the callbacks will manipulate
1716  // the `shared_ptr<Option<Timer>>` so there isn't any concurrency
1717  // issues we have to worry about.
1718  std::shared_ptr<Option<Timer>> timer(new Option<Timer>());
1719 
1720  typedef lambda::CallableOnce<Future<T>(const Future<T>&)> F;
1721  std::shared_ptr<F> callable(new F(std::move(f)));
1722 
1723  // Set up a timer to invoke the callback if this future has not
1724  // completed. Note that we do not pass a weak reference for this
1725  // future as we don't want the future to get cleaned up and then
1726  // have the timer expire because then we wouldn't have a valid
1727  // future that we could pass to `f`! The reference to `this` that is
1728  // captured in the timer will get removed by setting the
1729  // `Option<Timer>` to none (see comment above) either if the timer
1730  // expires or if `this` completes and we cancel the timer (see
1731  // `internal::expired` and `internal::after` callbacks for where we
1732  // force the deallocation of our copy of the timer).
1733  *timer = Clock::timer(
1734  duration,
1735  lambda::bind(&internal::expired<T>, callable, latch, promise, timer,
1736  *this));
1737 
1738  onAny(lambda::bind(&internal::after<T>, latch, promise, timer, lambda::_1));
1739 
1740  onAbandoned([=]() {
1741  promise->future().abandon();
1742  });
1743 
1744  // Propagate discarding up the chain. To avoid cyclic dependencies,
1745  // we keep a weak future in the callback.
1746  promise->future().onDiscard(
1747  lambda::bind(&internal::discard<T>, WeakFuture<T>(*this)));
1748 
1749  return promise->future();
1750 }
1751 
1752 
1753 template <typename T>
1754 bool Future<T>::set(T&& t)
1755 {
1756  return _set(std::move(t));
1757 }
1758 
1759 
1760 template <typename T>
1761 bool Future<T>::set(const T& t)
1762 {
1763  return _set(t);
1764 }
1765 
1766 
1767 template <typename T>
1768 template <typename U>
1769 bool Future<T>::_set(U&& u)
1770 {
1771  bool result = false;
1772 
1773  synchronized (data->lock) {
1774  if (data->state == PENDING) {
1775  data->result = std::forward<U>(u);
1776  data->state = READY;
1777  result = true;
1778  }
1779  }
1780 
1781  // Invoke all callbacks associated with this future being READY. We
1782  // don't need a lock because the state is now in READY so there
1783  // should not be any concurrent modifications to the callbacks.
1784  if (result) {
1785  // Grab a copy of `data` just in case invoking the callbacks
1786  // erroneously attempts to delete this future.
1787  std::shared_ptr<typename Future<T>::Data> copy = data;
1788  internal::run(std::move(copy->onReadyCallbacks), copy->result.get());
1789  internal::run(std::move(copy->onAnyCallbacks), *this);
1790 
1791  copy->clearAllCallbacks();
1792  }
1793 
1794  return result;
1795 }
1796 
1797 
1798 template <typename T>
1799 bool Future<T>::fail(const std::string& _message)
1800 {
1801  bool result = false;
1802 
1803  synchronized (data->lock) {
1804  if (data->state == PENDING) {
1805  data->result = Result<T>(Error(_message));
1806  data->state = FAILED;
1807  result = true;
1808  }
1809  }
1810 
1811  // Invoke all callbacks associated with this future being FAILED. We
1812  // don't need a lock because the state is now in FAILED so there
1813  // should not be any concurrent modifications to the callbacks.
1814  if (result) {
1815  // Grab a copy of `data` just in case invoking the callbacks
1816  // erroneously attempts to delete this future.
1817  std::shared_ptr<typename Future<T>::Data> copy = data;
1818  internal::run(std::move(copy->onFailedCallbacks), copy->result.error());
1819  internal::run(std::move(copy->onAnyCallbacks), *this);
1820 
1821  copy->clearAllCallbacks();
1822  }
1823 
1824  return result;
1825 }
1826 
1827 
1828 template <typename T>
1829 std::ostream& operator<<(std::ostream& stream, const Future<T>& future)
1830 {
1831  const std::string suffix = future.data->discard ? " (with discard)" : "";
1832 
1833  switch (future.data->state) {
1834  case Future<T>::PENDING:
1835  if (future.data->abandoned) {
1836  return stream << "Abandoned" << suffix;
1837  }
1838  return stream << "Pending" << suffix;
1839 
1840  case Future<T>::READY:
1841  // TODO(benh): Stringify `Future<T>::get()` if it can be
1842  // stringified (will need to be SFINAE'ed appropriately).
1843  return stream << "Ready" << suffix;
1844 
1845  case Future<T>::FAILED:
1846  return stream << "Failed" << suffix << ": " << future.failure();
1847 
1848  case Future<T>::DISCARDED:
1849  return stream << "Discarded" << suffix;
1850  }
1851 
1852  return stream;
1853 }
1854 
1855 
1856 // Helper for setting a set of Promises.
1857 template <typename T>
1858 void setPromises(std::set<Promise<T>*>* promises, const T& t)
1859 {
1860  foreach (Promise<T>* promise, *promises) {
1861  promise->set(t);
1862  delete promise;
1863  }
1864  promises->clear();
1865 }
1866 
1867 
1868 // Helper for failing a set of Promises.
1869 template <typename T>
1870 void failPromises(std::set<Promise<T>*>* promises, const std::string& failure)
1871 {
1872  foreach (Promise<T>* promise, *promises) {
1873  promise->fail(failure);
1874  delete promise;
1875  }
1876  promises->clear();
1877 }
1878 
1879 
1880 // Helper for discarding a set of Promises.
1881 template <typename T>
1883 {
1884  foreach (Promise<T>* promise, *promises) {
1885  promise->discard();
1886  delete promise;
1887  }
1888  promises->clear();
1889 }
1890 
1891 
1892 // Helper for discarding an individual promise in the set.
1893 template <typename T>
1894 void discardPromises(std::set<Promise<T>*>* promises, const Future<T>& future)
1895 {
1896  foreach (Promise<T>* promise, *promises) {
1897  if (promise->future() == future) {
1898  promise->discard();
1899  promises->erase(promise);
1900  delete promise;
1901  return;
1902  }
1903  }
1904 }
1905 
1906 
1907 // Returns a future that will not propagate a discard through to the
1908 // future passed in as an argument. This can be very valuable if you
1909 // want to block some future from getting discarded.
1910 //
1911 // Example:
1912 //
1913 // Promise<int> promise;
1914 // Future<int> future = undiscardable(promise.future());
1915 // future.discard();
1916 // assert(!promise.future().hasDiscard());
1917 //
1918 // Or another example, when chaining futures:
1919 //
1920 // Future<int> future = undiscardable(
1921 // foo()
1922 // .then([]() { ...; })
1923 // .then([]() { ...; }));
1924 //
1925 // This will guarantee that a discard _will not_ propagate to `foo()`
1926 // or any of the futures returned from the invocations of `.then()`.
1927 template <typename T>
1929 {
1930  std::unique_ptr<Promise<T>> promise(new Promise<T>());
1931  Future<T> future_ = promise->future();
1932  future.onAny(lambda::partial(
1933  [](std::unique_ptr<Promise<T>> promise, const Future<T>& future) {
1934  promise->associate(future);
1935  },
1936  std::move(promise),
1937  lambda::_1));
1938  return future_;
1939 }
1940 
1941 
1942 // Decorator that for some callable `f` invokes
1943 // `undiscardable(f(args))` for some `args`. This is used by the
1944 // overload of `undiscardable()` that takes callables instead of a
1945 // specialization of `Future`.
1946 //
1947 // TODO(benh): Factor out a generic decorator pattern to be used in
1948 // other circumstances, e.g., to replace `_Deferred`.
1949 template <typename F>
1951 {
1952  template <
1953  typename G,
1954  typename std::enable_if<
1955  std::is_constructible<F, G>::value, int>::type = 0>
1956  UndiscardableDecorator(G&& g) : f(std::forward<G>(g)) {}
1957 
1958  template <typename... Args>
1959  auto operator()(Args&&... args)
1960  -> decltype(std::declval<F&>()(std::forward<Args>(args)...))
1961  {
1962  using Result =
1963  typename std::decay<decltype(f(std::forward<Args>(args)...))>::type;
1964 
1965  static_assert(
1967  "Expecting Future<T> to be returned from undiscarded(...)");
1968 
1969  return undiscardable(f(std::forward<Args>(args)...));
1970  }
1971 
1972  F f;
1973 };
1974 
1975 
1976 // An overload of `undiscardable()` above that takes and returns a
1977 // callable. The returned callable has decorated the provided callable
1978 // `f` such that when the returned callable is invoked it will in turn
1979 // invoke `undiscardable(f(args))` for some `args`. See
1980 // `UndiscardableDecorator` above for more details.
1981 //
1982 // Example:
1983 //
1984 // Future<int> future = foo()
1985 // .then(undiscardable([]() { ...; }));
1986 //
1987 // This guarantees that even if `future` is discarded the discard will
1988 // not propagate into the lambda passed into `.then()`.
1989 template <
1990  typename F,
1991  typename std::enable_if<
1993  int>::type = 0>
1995 {
1996  return UndiscardableDecorator<
1997  typename std::decay<F>::type>(std::forward<F>(f));
1998 }
1999 
2000 } // namespace process {
2001 
2002 #endif // __PROCESS_FUTURE_HPP__
void select(const Future< T > &future, std::shared_ptr< Promise< Future< T >>> promise)
Definition: future.hpp:954
Protocol< RecoverRequest, RecoverResponse > recover
bool isReady() const
Definition: future.hpp:1218
WeakFuture(const Future< T > &future)
Definition: future.hpp:649
std::string strerror(int errno_)
A thread-safe version of strerror.
Definition: strerror.hpp:30
Definition: errorbase.hpp:36
Definition: option.hpp:28
#define ABORT(...)
Definition: abort.hpp:40
F && f
Definition: defer.hpp:270
const Future< T > & onAny(F &&f) const
Definition: future.hpp:383
lambda::CallableOnce< void()> DiscardedCallback
Definition: future.hpp:170
T & get()&
Definition: try.hpp:73
const T & get() const
Definition: future.hpp:1297
bool operator==(const std::string &s, const UPID::ID &id)
Definition: pid.hpp:216
bool set(const T &_t)
Definition: future.hpp:830
T type
Definition: future.hpp:942
Failure(const std::string &_message)
Definition: future.hpp:670
bool pending(int signal)
Definition: signals.hpp:50
Definition: check.hpp:33
Future< T > type
Definition: future.hpp:928
void discarded(Future< T > future)
Definition: future.hpp:771
bool fail(const std::string &message)
Definition: future.hpp:906
Definition: future.hpp:668
void awaited(Owned< Latch > latch)
Definition: future.hpp:1254
const Future< T > & onDiscarded(F &&f) const
Definition: future.hpp:372
ErrnoFailure(const std::string &message)
Definition: future.hpp:684
lambda::CallableOnce< void()> DiscardCallback
Definition: future.hpp:167
static bool cancel(const Timer &timer)
const Future< T > & onDiscard(DiscardCallback &&callback) const
Definition: future.hpp:1357
internal::Partial< typename std::decay< F >::type, typename std::decay< Args >::type... > partial(F &&f, Args &&...args)
Definition: lambda.hpp:364
bool await(const Duration &duration=Seconds(-1)) const
Definition: future.hpp:1263
X type
Definition: future.hpp:949
bool await(const process::Future< T > &future, const Duration &duration)
Definition: gtest.hpp:67
const Future< T > & onFailed(FailedCallback &&callback) const
Definition: future.hpp:1401
Definition: type_utils.hpp:510
const Future< T > & onFailed(F &&f) const
Definition: future.hpp:366
#define CHECK_ERROR(expression)
Definition: check.hpp:58
bool operator!=(const Future< T > &that) const
Definition: future.hpp:1146
void thenf(lambda::CallableOnce< Future< X >(const T &)> &&f, std::unique_ptr< Promise< X >> promise, const Future< T > &future)
Definition: future.hpp:1471
Definition: posix_signalhandler.hpp:23
bool discard()
Definition: future.hpp:1160
UndiscardableDecorator(G &&g)
Definition: future.hpp:1956
void expired(const std::shared_ptr< lambda::CallableOnce< Future< T >(const Future< T > &)>> &f, const std::shared_ptr< Latch > &latch, const std::shared_ptr< Promise< T >> &promise, const std::shared_ptr< Option< Timer >> &timer, const Future< T > &future)
Definition: future.hpp:1524
void failPromises(std::set< Promise< T > * > *promises, const std::string &failure)
Definition: future.hpp:1870
Future< T > recover(_Deferred< F > &&deferred) const
Definition: future.hpp:482
Definition: duration.hpp:32
Definition: check.hpp:30
bool isPending() const
Definition: future.hpp:1211
Definition: future.hpp:67
bool isSome() const
Definition: option.hpp:115
bool operator==(const Future< T > &that) const
Definition: future.hpp:1139
Definition: future.hpp:64
void after(const std::shared_ptr< Latch > &latch, const std::shared_ptr< Promise< T >> &promise, const std::shared_ptr< Option< Timer >> &timer, const Future< T > &future)
Definition: future.hpp:1551
#define CHECK_SOME(expression)
Definition: check.hpp:50
bool isDiscarded() const
Definition: future.hpp:1225
ErrnoFailure()
Definition: future.hpp:679
void discardPromises(std::set< Promise< T > * > *promises)
Definition: future.hpp:1882
const Future< T > & onDiscarded(DiscardedCallback &&callback) const
Definition: future.hpp:1423
const Future< T > & onFailed(_Deferred< F > &&deferred) const
Definition: future.hpp:207
const Future< T > & onDiscard(_Deferred< F > &&deferred) const
Definition: future.hpp:193
F f
Definition: future.hpp:1972
const Future< T > & onAny(AnyCallback &&callback) const
Definition: future.hpp:1445
Definition: deferred.hpp:64
lambda::CallableOnce< void(const T &)> ReadyCallback
Definition: future.hpp:168
Definition: duration.hpp:207
bool associate(const Future< T > &future)
Definition: future.hpp:855
void discard(WeakFuture< T > reference)
Definition: future.hpp:757
Future< Future< T > > select(const std::set< Future< T >> &futures)
Definition: future.hpp:976
Definition: traits.hpp:17
Definition: future.hpp:74
Future< X > then(lambda::CallableOnce< X()> f) const
Definition: future.hpp:405
const Future< T > & onReady(F &&f) const
Definition: future.hpp:360
const Future< T > & onAbandoned(AbandonedCallback &&callback) const
Definition: future.hpp:1335
void repair(lambda::CallableOnce< Future< T >(const Future< T > &)> &&f, std::unique_ptr< Promise< T >> promise, const Future< T > &future)
Definition: future.hpp:1509
bool isSome() const
Definition: try.hpp:70
Failure(const Error &error)
Definition: future.hpp:671
const Future< T > & onDiscard(F &&f) const
Definition: future.hpp:349
Definition: future.hpp:677
const T & get() const &
Definition: option.hpp:118
Protocol< PromiseRequest, PromiseResponse > promise
Future< X > type
Definition: future.hpp:935
Future< T > repair(lambda::CallableOnce< Future< T >(const Future< T > &)> f) const
Definition: future.hpp:1673
Definition: future.hpp:79
void discard(const Futures &futures)
Definition: future.hpp:994
static Try error(const E &e)
Definition: try.hpp:42
Future< R > run(R(*method)())
Definition: run.hpp:55
Future< X > then(lambda::CallableOnce< Future< X >(const T &)> f) const
Definition: future.hpp:1579
Future< T > undiscardable(const Future< T > &future)
Definition: future.hpp:1928
void run(std::vector< C > &&callbacks, Arguments &&...arguments)
Definition: future.hpp:621
lambda::CallableOnce< void()> AbandonedCallback
Definition: future.hpp:166
bool operator<(const Future< T > &that) const
Definition: future.hpp:1153
Future()
Definition: future.hpp:1064
Definition: none.hpp:27
Definition: attributes.hpp:24
bool operator!=(const std::string &s, const UPID::ID &id)
Definition: pid.hpp:222
Future< X > then(lambda::CallableOnce< Future< X >()> f) const
Definition: future.hpp:398
std::string error(const std::string &msg, uint32_t code)
lambda::CallableOnce< void(const std::string &)> FailedCallback
Definition: future.hpp:169
Definition: executor.hpp:48
void then(lambda::CallableOnce< X(const T &)> &&f, std::unique_ptr< Promise< X >> promise, const Future< T > &future)
Definition: future.hpp:1490
const int code
Definition: future.hpp:690
Promise()
Definition: future.hpp:780
Future< T > future() const
Definition: future.hpp:916
ErrnoFailure(int _code, const std::string &message)
Definition: future.hpp:687
const std::string message
Definition: future.hpp:673
auto then(F &&f) const -> decltype(this->then(std::forward< F >(f), Prefer()))
Definition: future.hpp:468
bool discard()
Definition: future.hpp:813
Try< uint32_t > type(const std::string &path)
Future< T > after(const Duration &duration, lambda::CallableOnce< Future< T >(const Future< T > &)> f) const
Definition: future.hpp:1695
const Future< T > & onAbandoned(F &&f) const
Definition: future.hpp:338
bool isAbandoned() const
Definition: future.hpp:1239
ErrnoFailure(int _code)
Definition: future.hpp:681
void discarded(Future< U > future)
const Future< T > & onAny(_Deferred< F > &&deferred) const
Definition: future.hpp:221
static Timer timer(const Duration &duration, const lambda::function< void()> &thunk)
const Future< T > & onDiscarded(_Deferred< F > &&deferred) const
Definition: future.hpp:214
Try< Nothing > bind(int_fd s, const Address &address)
Definition: network.hpp:46
const std::string & failure() const
Definition: future.hpp:1323
Option< Future< T > > get() const
Definition: future.hpp:654
std::string stringify(int flags)
const Future< T > & onReady(_Deferred< F > &&deferred) const
Definition: future.hpp:200
Definition: owned.hpp:36
Definition: future.hpp:1950
Definition: latch.hpp:24
static Future< T > failed(const std::string &message)
Definition: future.hpp:1034
bool hasDiscard() const
Definition: future.hpp:1246
T copy(const T &t)
Definition: utils.hpp:21
const Future< T > & onReady(ReadyCallback &&callback) const
Definition: future.hpp:1379
virtual ~Promise()
Definition: future.hpp:794
const Future< T > & onAbandoned(_Deferred< F > &&deferred) const
Definition: future.hpp:186
Definition: lambda.hpp:414
auto operator()(Args &&...args) -> decltype(std::declval< F & >()(std::forward< Args >(args)...))
Definition: future.hpp:1959
void setPromises(std::set< Promise< T > * > *promises, const T &t)
Definition: future.hpp:1858
const T * operator->() const
Definition: future.hpp:1316
bool isFailed() const
Definition: future.hpp:1232
Definition: future.hpp:58
Future< T > recover(F &&f) const
Definition: future.hpp:1627