13 #ifndef __PROCESS_FUTURE_HPP__ 14 #define __PROCESS_FUTURE_HPP__ 23 #include <type_traits> 27 #include <glog/logging.h> 94 static Future<T> failed(
const std::string& message);
101 template <
typename U>
111 template <
typename E>
114 template <
typename E>
128 bool operator<(const Future<T>& that)
const;
131 bool isPending()
const;
132 bool isReady()
const;
133 bool isDiscarded()
const;
134 bool isFailed()
const;
135 bool isAbandoned()
const;
136 bool hasDiscard()
const;
158 const T&
get()
const;
159 const T* operator->()
const;
162 const std::string& failure()
const;
175 const Future<T>& onAbandoned(AbandonedCallback&& callback)
const;
176 const Future<T>& onDiscard(DiscardCallback&& callback)
const;
177 const Future<T>& onReady(ReadyCallback&& callback)
const;
178 const Future<T>& onFailed(FailedCallback&& callback)
const;
179 const Future<T>& onDiscarded(DiscardedCallback&& callback)
const;
185 template <
typename F>
192 template <
typename F>
199 template <
typename F>
206 template <
typename F>
209 return onFailed(std::move(deferred)
213 template <
typename F>
220 template <
typename F>
223 return onAny(std::move(deferred)
237 struct LessPrefer {};
238 struct Prefer : LessPrefer {};
262 typename =
typename result_of<
typename std::enable_if<
265 const Future<T>& onReady(F&&
f, LessPrefer)
const 277 const Future<T>& onFailed(F&&
f, Prefer)
const 282 std::move(
f)(message);
292 typename =
typename result_of<
typename std::enable_if<
295 const Future<T>& onFailed(F&&
f, LessPrefer)
const 306 template <
typename F,
typename =
typename result_of<F(const Future<T>&)>::type>
312 std::move(
f)(future);
322 typename =
typename result_of<
typename std::enable_if<
325 const Future<T>& onAny(F&&
f, LessPrefer)
const 337 template <
typename F>
345 std::forward<F>(
f))));
348 template <
typename F>
356 std::forward<F>(
f))));
359 template <
typename F>
362 return onReady(std::forward<F>(
f), Prefer());
365 template <
typename F>
368 return onFailed(std::forward<F>(
f), Prefer());
371 template <
typename F>
379 std::forward<F>(
f))));
382 template <
typename F>
385 return onAny(std::forward<F>(
f), Prefer());
391 template <
typename X>
394 template <
typename X>
397 template <
typename X>
404 template <
typename X>
429 typename result_of<
typename std::enable_if<
449 typename result_of<
typename std::enable_if<
467 template <
typename F>
470 -> decltype(this->
then(std::forward<F>(
f), Prefer()))
471 #endif // __WINDOWS__ 473 return then(std::forward<F>(
f), Prefer());
478 template <
typename F>
481 template <
typename F>
516 template <
typename U>
520 template <
typename U>
521 friend std::ostream& operator<<(std::ostream&, const Future<U>&);
536 void clearAllCallbacks();
538 std::atomic_flag lock = ATOMIC_FLAG_INIT;
550 std::vector<AbandonedCallback> onAbandonedCallbacks;
551 std::vector<DiscardCallback> onDiscardCallbacks;
552 std::vector<ReadyCallback> onReadyCallbacks;
553 std::vector<FailedCallback> onFailedCallbacks;
554 std::vector<DiscardedCallback> onDiscardedCallbacks;
555 std::vector<AnyCallback> onAnyCallbacks;
597 bool abandon(
bool propagating =
false);
601 bool set(
const T& _t);
604 template <
typename U>
609 bool fail(
const std::string& _message);
611 std::shared_ptr<Data> data;
620 template <
typename C,
typename... Arguments>
621 void run(std::vector<C>&& callbacks, Arguments&&... arguments)
623 for (
size_t i = 0; i < callbacks.size(); ++i) {
624 std::move(callbacks[i])(std::forward<Arguments>(arguments)...);
633 template <
typename T>
644 std::weak_ptr<typename Future<T>::Data> data;
648 template <
typename T>
650 : data(future.data) {}
653 template <
typename T>
657 future.data = data.lock();
670 explicit Failure(
const std::string& _message) : message(_message) {}
696 template <
typename U>
702 template <
typename T>
719 bool set(
const T& _t);
723 bool fail(
const std::string& message);
729 template <
typename U>
731 template <
typename U>
734 template <
typename U>
749 template <
typename T>
758 template <
typename T>
772 template <
typename T>
781 template <
typename T>
786 f.data->abandoned =
false;
790 template <
typename T>
795 template <
typename T>
809 template <
typename T>
812 if (!f.data->associated) {
819 template <
typename T>
822 return _set(std::move(t));
826 template <
typename T>
833 template <
typename T>
834 template <
typename U>
837 if (!f.data->associated) {
838 return f.set(std::forward<U>(u));
844 template <
typename T>
851 template <
typename T>
854 bool associated =
false;
856 synchronized (f.data->lock) {
862 associated = f.data->associated =
true;
902 template <
typename T>
905 if (!f.data->associated) {
906 return f.fail(message);
912 template <
typename T>
922 template <
typename T>
929 template <
typename X>
936 template <
typename T>
943 template <
typename X>
950 template <
typename T>
956 assert(!
promise->future().isFailed());
958 if (
promise->future().isPending()) {
972 template <
typename T>
977 promise->future().onDiscard(
986 return promise->future();
990 template <
typename Futures>
993 foreach (
auto future, futures) {
999 template <
typename T>
1002 bool result =
false;
1004 synchronized (future.data->lock) {
1020 internal::run(std::move(future.data->onDiscardedCallbacks));
1021 internal::run(std::move(future.data->onAnyCallbacks), future);
1023 future.data->clearAllCallbacks();
1030 template <
typename T>
1034 future.fail(message);
1039 template <
typename T>
1048 template <
typename T>
1051 onAbandonedCallbacks.clear();
1052 onAnyCallbacks.clear();
1053 onDiscardCallbacks.clear();
1054 onDiscardedCallbacks.clear();
1055 onFailedCallbacks.clear();
1056 onReadyCallbacks.clear();
1060 template <
typename T>
1064 data->abandoned =
true;
1068 template <
typename T>
1076 template <
typename T>
1084 template <
typename T>
1085 template <
typename U>
1093 template <
typename T>
1101 template <
typename T>
1109 template <
typename T>
1110 template <
typename E>
1123 template <
typename T>
1124 template <
typename E>
1126 : data(t.isSome() ? t->data :
std::shared_ptr<Data>(new Data()))
1135 template <
typename T>
1138 return data == that.data;
1142 template <
typename T>
1145 return !(*
this == that);
1149 template <
typename T>
1152 return data < that.data;
1156 template <
typename T>
1159 bool result =
false;
1161 std::vector<DiscardCallback> callbacks;
1162 synchronized (data->lock) {
1163 if (!data->discard && data->state == PENDING) {
1164 result = data->discard =
true;
1166 callbacks.swap(data->onDiscardCallbacks);
1181 template <
typename T>
1184 bool result =
false;
1186 std::vector<AbandonedCallback> callbacks;
1187 synchronized (data->lock) {
1188 if (!data->abandoned &&
1189 data->state == PENDING &&
1190 (!data->associated || propagating)) {
1191 result = data->abandoned =
true;
1193 callbacks.swap(data->onAbandonedCallbacks);
1207 template <
typename T>
1210 return data->state == PENDING;
1214 template <
typename T>
1217 return data->state == READY;
1221 template <
typename T>
1224 return data->state == DISCARDED;
1228 template <
typename T>
1231 return data->state == FAILED;
1235 template <
typename T>
1238 return data->abandoned;
1242 template <
typename T>
1245 return data->discard;
1259 template <
typename T>
1278 synchronized (data->lock) {
1279 if (data->state == PENDING) {
1286 return latch->await(duration);
1293 template <
typename T>
1300 CHECK(!
isPending()) <<
"Future was in PENDING after await()";
1303 CHECK(!
isFailed()) <<
"Future::get() but state == FAILED: " <<
failure();
1304 CHECK(!
isDiscarded()) <<
"Future::get() but state == DISCARDED";
1307 assert(data->result.isSome());
1308 return data->result.get();
1312 template <
typename T>
1319 template <
typename T>
1322 if (data->state != FAILED) {
1323 ABORT(
"Future::failure() but state != FAILED");
1327 return data->result.error();
1331 template <
typename T>
1336 synchronized (data->lock) {
1337 if (data->abandoned) {
1339 }
else if (data->state == PENDING) {
1340 data->onAbandonedCallbacks.emplace_back(std::move(callback));
1346 std::move(callback)();
1353 template <
typename T>
1358 synchronized (data->lock) {
1359 if (data->discard) {
1361 }
else if (data->state == PENDING) {
1362 data->onDiscardCallbacks.emplace_back(std::move(callback));
1368 std::move(callback)();
1375 template <
typename T>
1380 synchronized (data->lock) {
1381 if (data->state == READY) {
1383 }
else if (data->state == PENDING) {
1384 data->onReadyCallbacks.emplace_back(std::move(callback));
1390 std::move(callback)(data->result.get());
1397 template <
typename T>
1402 synchronized (data->lock) {
1403 if (data->state == FAILED) {
1405 }
else if (data->state == PENDING) {
1406 data->onFailedCallbacks.emplace_back(std::move(callback));
1412 std::move(callback)(data->result.error());
1419 template <
typename T>
1424 synchronized (data->lock) {
1425 if (data->state == DISCARDED) {
1427 }
else if (data->state == PENDING) {
1428 data->onDiscardedCallbacks.emplace_back(std::move(callback));
1434 std::move(callback)();
1441 template <
typename T>
1446 synchronized (data->lock) {
1447 if (data->state == PENDING) {
1448 data->onAnyCallbacks.emplace_back(std::move(callback));
1456 std::move(callback)(*this);
1467 template <
typename T,
typename X>
1486 template <
typename T,
typename X>
1505 template <
typename T>
1513 promise->associate(std::move(
f)(future));
1520 template <
typename T>
1523 const std::shared_ptr<Latch>& latch,
1528 if (latch->trigger()) {
1542 promise->associate(std::move(*
f)(future));
1547 template <
typename T>
1549 const std::shared_ptr<Latch>& latch,
1555 if (latch->trigger()) {
1574 template <
typename T>
1575 template <
typename X>
1582 &internal::thenf<T, X>, std::move(
f), std::move(promise), lambda::_1);
1598 template <
typename T>
1599 template <
typename X>
1606 &internal::then<T, X>, std::move(
f), std::move(promise), lambda::_1);
1608 onAny(std::move(then));
1622 template <
typename T>
1623 template <
typename F>
1630 typedef decltype(std::move(
f)(future)) R;
1632 std::shared_ptr<lambda::CallableOnce<R(const Future<T>&)>> callable(
1642 synchronized (promise->f.data->lock) {
1643 promise->f.data->discard =
false;
1646 promise->set(std::move(*callable)(future));
1648 promise->associate(future);
1654 synchronized (promise->f.data->lock) {
1655 promise->f.data->discard =
false;
1657 promise->set(std::move(*callable)(future));
1662 promise->future().onDiscard(
1665 return promise->future();
1669 template <
typename T>
1677 &internal::repair<T>, std::move(
f), std::move(promise), lambda::_1));
1691 template <
typename T>
1699 std::shared_ptr<Latch> latch(
new Latch());
1718 std::shared_ptr<F> callable(
new F(std::move(
f)));
1732 lambda::bind(&internal::expired<T>, callable, latch, promise, timer,
1738 promise->future().abandon();
1743 promise->future().onDiscard(
1746 return promise->future();
1750 template <
typename T>
1753 return _set(std::move(t));
1757 template <
typename T>
1764 template <
typename T>
1765 template <
typename U>
1768 bool result =
false;
1770 synchronized (data->lock) {
1771 if (data->state == PENDING) {
1772 data->result = std::forward<U>(u);
1773 data->state = READY;
1784 std::shared_ptr<typename Future<T>::Data>
copy = data;
1785 internal::run(std::move(copy->onReadyCallbacks), copy->result.get());
1788 copy->clearAllCallbacks();
1795 template <
typename T>
1798 bool result =
false;
1800 synchronized (data->lock) {
1801 if (data->state == PENDING) {
1803 data->state = FAILED;
1814 std::shared_ptr<typename Future<T>::Data>
copy = data;
1815 internal::run(std::move(copy->onFailedCallbacks), copy->result.error());
1818 copy->clearAllCallbacks();
1825 template <
typename T>
1826 std::ostream& operator<<(std::ostream& stream, const Future<T>& future)
1828 const std::string suffix = future.data->discard ?
" (with discard)" :
"";
1830 switch (future.data->state) {
1832 if (future.data->abandoned) {
1833 return stream <<
"Abandoned" << suffix;
1835 return stream <<
"Pending" << suffix;
1840 return stream <<
"Ready" << suffix;
1843 return stream <<
"Failed" << suffix <<
": " << future.
failure();
1846 return stream <<
"Discarded" << suffix;
1854 template <
typename T>
1866 template <
typename T>
1870 promise->
fail(failure);
1878 template <
typename T>
1890 template <
typename T>
1894 if (promise->
future() == future) {
1896 promises->erase(promise);
1924 template <
typename T>
1931 promise->associate(future);
1946 template <
typename F>
1951 typename std::enable_if<
1952 std::is_constructible<F, G>::value,
int>
::type = 0>
1955 template <
typename... Args>
1957 -> decltype(std::declval<F&>()(std::forward<Args>(args)...))
1960 typename std::decay<decltype(f(std::forward<Args>(args)...))>::
type;
1964 "Expecting Future<T> to be returned from undiscarded(...)");
1988 typename std::enable_if<
1999 #endif // __PROCESS_FUTURE_HPP__ void select(const Future< T > &future, std::shared_ptr< Promise< Future< T >>> promise)
Definition: future.hpp:951
Protocol< RecoverRequest, RecoverResponse > recover
bool isReady() const
Definition: future.hpp:1215
WeakFuture(const Future< T > &future)
Definition: future.hpp:649
std::string strerror(int errno_)
A thread-safe version of strerror.
Definition: strerror.hpp:30
Definition: errorbase.hpp:36
Definition: option.hpp:29
#define ABORT(...)
Definition: abort.hpp:40
F && f
Definition: defer.hpp:270
const Future< T > & onAny(F &&f) const
Definition: future.hpp:383
lambda::CallableOnce< void()> DiscardedCallback
Definition: future.hpp:170
T & get()&
Definition: try.hpp:80
const T & get() const
Definition: future.hpp:1294
bool operator==(const std::string &s, const UPID::ID &id)
Definition: pid.hpp:226
bool set(const T &_t)
Definition: future.hpp:827
T type
Definition: future.hpp:939
Failure(const std::string &_message)
Definition: future.hpp:670
bool pending(int signal)
Definition: signals.hpp:50
Future< T > type
Definition: future.hpp:925
void discarded(Future< T > future)
Definition: future.hpp:773
bool fail(const std::string &message)
Definition: future.hpp:903
Definition: future.hpp:668
void awaited(Owned< Latch > latch)
Definition: future.hpp:1251
const Future< T > & onDiscarded(F &&f) const
Definition: future.hpp:372
ErrnoFailure(const std::string &message)
Definition: future.hpp:684
lambda::CallableOnce< void()> DiscardCallback
Definition: future.hpp:167
static bool cancel(const Timer &timer)
const Future< T > & onDiscard(DiscardCallback &&callback) const
Definition: future.hpp:1354
internal::Partial< typename std::decay< F >::type, typename std::decay< Args >::type... > partial(F &&f, Args &&...args)
Definition: lambda.hpp:364
bool await(const Duration &duration=Seconds(-1)) const
Definition: future.hpp:1260
X type
Definition: future.hpp:946
bool await(const process::Future< T > &future, const Duration &duration)
Definition: gtest.hpp:67
const Future< T > & onFailed(FailedCallback &&callback) const
Definition: future.hpp:1398
Definition: type_utils.hpp:619
const Future< T > & onFailed(F &&f) const
Definition: future.hpp:366
#define CHECK_ERROR(expression)
Definition: check.hpp:58
bool operator!=(const Future< T > &that) const
Definition: future.hpp:1143
void thenf(lambda::CallableOnce< Future< X >(const T &)> &&f, std::unique_ptr< Promise< X >> promise, const Future< T > &future)
Definition: future.hpp:1468
Definition: posix_signalhandler.hpp:23
bool discard()
Definition: future.hpp:1157
UndiscardableDecorator(G &&g)
Definition: future.hpp:1953
void expired(const std::shared_ptr< lambda::CallableOnce< Future< T >(const Future< T > &)>> &f, const std::shared_ptr< Latch > &latch, const std::shared_ptr< Promise< T >> &promise, const std::shared_ptr< Option< Timer >> &timer, const Future< T > &future)
Definition: future.hpp:1521
void failPromises(std::set< Promise< T > * > *promises, const std::string &failure)
Definition: future.hpp:1867
Future< T > recover(_Deferred< F > &&deferred) const
Definition: future.hpp:482
Definition: duration.hpp:32
bool isPending() const
Definition: future.hpp:1208
Definition: future.hpp:67
bool isSome() const
Definition: option.hpp:116
bool operator==(const Future< T > &that) const
Definition: future.hpp:1136
Try< bool > set(const std::string &_link, unsigned int flags)
Definition: internal.hpp:125
Definition: future.hpp:64
void after(const std::shared_ptr< Latch > &latch, const std::shared_ptr< Promise< T >> &promise, const std::shared_ptr< Option< Timer >> &timer, const Future< T > &future)
Definition: future.hpp:1548
#define CHECK_SOME(expression)
Definition: check.hpp:50
bool isDiscarded() const
Definition: future.hpp:1222
ErrnoFailure()
Definition: future.hpp:679
void discardPromises(std::set< Promise< T > * > *promises)
Definition: future.hpp:1879
const Future< T > & onDiscarded(DiscardedCallback &&callback) const
Definition: future.hpp:1420
const Future< T > & onFailed(_Deferred< F > &&deferred) const
Definition: future.hpp:207
const Future< T > & onDiscard(_Deferred< F > &&deferred) const
Definition: future.hpp:193
F f
Definition: future.hpp:1969
const Future< T > & onAny(AnyCallback &&callback) const
Definition: future.hpp:1442
Definition: deferred.hpp:64
lambda::CallableOnce< void(const T &)> ReadyCallback
Definition: future.hpp:168
Definition: duration.hpp:207
bool associate(const Future< T > &future)
Definition: future.hpp:852
void discard(WeakFuture< T > reference)
Definition: future.hpp:759
Future< Future< T > > select(const std::set< Future< T >> &futures)
Definition: future.hpp:973
Definition: traits.hpp:17
Definition: future.hpp:74
Future< X > then(lambda::CallableOnce< X()> f) const
Definition: future.hpp:405
const Future< T > & onReady(F &&f) const
Definition: future.hpp:360
const Future< T > & onAbandoned(AbandonedCallback &&callback) const
Definition: future.hpp:1332
void repair(lambda::CallableOnce< Future< T >(const Future< T > &)> &&f, std::unique_ptr< Promise< T >> promise, const Future< T > &future)
Definition: future.hpp:1506
bool isSome() const
Definition: try.hpp:77
Failure(const Error &error)
Definition: future.hpp:671
const Future< T > & onDiscard(F &&f) const
Definition: future.hpp:349
Definition: future.hpp:677
const T & get() const &
Definition: option.hpp:119
Protocol< PromiseRequest, PromiseResponse > promise
Future< X > type
Definition: future.hpp:932
Future< T > repair(lambda::CallableOnce< Future< T >(const Future< T > &)> f) const
Definition: future.hpp:1670
Definition: future.hpp:79
void discard(const Futures &futures)
Definition: future.hpp:991
static Try error(const E &e)
Definition: try.hpp:43
Future< R > run(R(*method)())
Definition: run.hpp:55
Future< X > then(lambda::CallableOnce< Future< X >(const T &)> f) const
Definition: future.hpp:1576
Future< T > undiscardable(const Future< T > &future)
Definition: future.hpp:1925
void run(std::vector< C > &&callbacks, Arguments &&...arguments)
Definition: future.hpp:621
lambda::CallableOnce< void()> AbandonedCallback
Definition: future.hpp:166
bool operator<(const Future< T > &that) const
Definition: future.hpp:1150
Future()
Definition: future.hpp:1061
Definition: attributes.hpp:24
bool operator!=(const std::string &s, const UPID::ID &id)
Definition: pid.hpp:232
Future< X > then(lambda::CallableOnce< Future< X >()> f) const
Definition: future.hpp:398
std::string error(const std::string &msg, uint32_t code)
lambda::CallableOnce< void(const std::string &)> FailedCallback
Definition: future.hpp:169
Definition: executor.hpp:48
void then(lambda::CallableOnce< X(const T &)> &&f, std::unique_ptr< Promise< X >> promise, const Future< T > &future)
Definition: future.hpp:1487
const int code
Definition: future.hpp:690
Promise()
Definition: future.hpp:782
Future< T > future() const
Definition: future.hpp:913
ErrnoFailure(int _code, const std::string &message)
Definition: future.hpp:687
const std::string message
Definition: future.hpp:673
auto then(F &&f) const -> decltype(this->then(std::forward< F >(f), Prefer()))
Definition: future.hpp:468
bool discard()
Definition: future.hpp:810
Try< uint32_t > type(const std::string &path)
Future< T > after(const Duration &duration, lambda::CallableOnce< Future< T >(const Future< T > &)> f) const
Definition: future.hpp:1692
const Future< T > & onAbandoned(F &&f) const
Definition: future.hpp:338
bool isAbandoned() const
Definition: future.hpp:1236
ErrnoFailure(int _code)
Definition: future.hpp:681
void discarded(Future< U > future)
const Future< T > & onAny(_Deferred< F > &&deferred) const
Definition: future.hpp:221
static Timer timer(const Duration &duration, const lambda::function< void()> &thunk)
const Future< T > & onDiscarded(_Deferred< F > &&deferred) const
Definition: future.hpp:214
Try< Nothing > bind(int_fd s, const Address &address)
Definition: network.hpp:46
const std::string & failure() const
Definition: future.hpp:1320
Option< Future< T > > get() const
Definition: future.hpp:654
std::string stringify(int flags)
const Future< T > & onReady(_Deferred< F > &&deferred) const
Definition: future.hpp:200
Definition: future.hpp:1947
static Future< T > failed(const std::string &message)
Definition: future.hpp:1031
bool hasDiscard() const
Definition: future.hpp:1243
T copy(const T &t)
Definition: utils.hpp:21
const Future< T > & onReady(ReadyCallback &&callback) const
Definition: future.hpp:1376
virtual ~Promise()
Definition: future.hpp:796
const Future< T > & onAbandoned(_Deferred< F > &&deferred) const
Definition: future.hpp:186
Definition: lambda.hpp:414
auto operator()(Args &&...args) -> decltype(std::declval< F & >()(std::forward< Args >(args)...))
Definition: future.hpp:1956
void setPromises(std::set< Promise< T > * > *promises, const T &t)
Definition: future.hpp:1855
const T * operator->() const
Definition: future.hpp:1313
bool isFailed() const
Definition: future.hpp:1229
Definition: future.hpp:58
Future< T > recover(F &&f) const
Definition: future.hpp:1624