Apache Mesos
collect.hpp
Go to the documentation of this file.
1 // Licensed under the Apache License, Version 2.0 (the "License");
2 // you may not use this file except in compliance with the License.
3 // You may obtain a copy of the License at
4 //
5 // http://www.apache.org/licenses/LICENSE-2.0
6 //
7 // Unless required by applicable law or agreed to in writing, software
8 // distributed under the License is distributed on an "AS IS" BASIS,
9 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10 // See the License for the specific language governing permissions and
11 // limitations under the License
12 
13 #ifndef __PROCESS_COLLECT_HPP__
14 #define __PROCESS_COLLECT_HPP__
15 
16 #include <functional>
17 #include <tuple>
18 #include <vector>
19 
20 #include <process/check.hpp>
21 #include <process/defer.hpp>
22 #include <process/future.hpp>
23 #include <process/id.hpp>
24 #include <process/owned.hpp>
25 #include <process/process.hpp>
26 
27 #include <stout/lambda.hpp>
28 
29 // TODO(bmahler): Move these into a futures.hpp header to group Future
30 // related utilities.
31 
32 namespace process {
33 
34 // Waits on each future in the specified list and returns the list of
35 // resulting values in the same order. If any future is discarded then
36 // the result will be a failure. Likewise, if any future fails then
37 // the result future will be a failure.
38 template <typename T>
39 Future<std::vector<T>> collect(const std::vector<Future<T>>& futures);
40 
41 
42 // Waits on each of the specified futures and returns a future holding
43 // a tuple of their values.
44 template <typename... Ts>
45 Future<std::tuple<Ts...>> collect(const Future<Ts>&... futures);
46 
47 
48 // Waits on each future in the specified set and returns the list of
49 // non-pending futures.
50 template <typename T>
51 Future<std::vector<Future<T>>> await(const std::vector<Future<T>>& futures);
52 
53 
54 // Waits on each of the specified futures and returns a future holding
55 // a tuple of the futures themselves.
56 template <typename... Ts>
57 Future<std::tuple<Future<Ts>...>> await(const Future<Ts>&... futures);
58 
59 
60 // Waits on the future specified and returns after the future has been
61 // completed or the await has been discarded. This is useful when
62 // wanting to "break out" of a future chain if a discard occurs but
63 // the underlying future has not been discarded. For example:
64 //
65 // Future<string> foo()
66 // {
67 // return bar()
68 // .then([](int i) {
69 // return stringify(i);
70 // });
71 // }
72 //
73 // Future<string> future = foo();
74 // future.discard();
75 //
76 // In the above code we'll propagate the discard to `bar()` but might
77 // wait forever if `bar()` can't discard its computation. In some
78 // circumstances you might want to break out early and you can do that
79 // by using `await`, because if we discard an `await` that function
80 // will return even though all of the futures it is waiting on have
81 // not been discarded (in other words, the `await` can be properly
82 // discarded even if the underlying futures have not been discarded).
83 //
84 // Future<string> foo()
85 // {
86 // return await(bar())
87 // .recover([](const Future<Future<int>>& future) {
88 // if (future.isDiscarded()) {
89 // cleanup();
90 // }
91 // return Failure("Discarded waiting");
92 // })
93 // .then([](const Future<int>& future) {
94 // return future
95 // .then([](int i) {
96 // return stringify(i);
97 // });
98 // });
99 // }
100 //
101 // Future<string> future = foo();
102 // future.discard();
103 //
104 // Using `await` will enable you to continue execution even if `bar()`
105 // does not (or cannot) discard its execution.
106 template <typename T>
// NOTE(review): listing line 107 (the function signature) is missing
// from this extract; only the template header and the body are visible.
108 {
    // Delegate the waiting to the vector overload of `await`, then
    // return the original `future` (captured by copy) wrapped in an
    // outer future.
109  return await(std::vector<Future<T>>{future})
110  .then([=]() {
111  return Future<Future<T>>(future);
112  });
113 }
114 
115 
116 namespace internal {
117 
// Process used by the vector overload of `collect`. It registers
// callbacks on every future in `futures` and then either sets `promise`
// with all of the values once every future is ready, or fails `promise`
// as soon as one future has failed or been discarded.
118 template <typename T>
119 class CollectProcess : public Process<CollectProcess<T>>
120 {
121 public:
    // NOTE(review): listing line 122 (the constructor name) is missing
    // from this extract; the lines below are its parameters and
    // initializer list. The futures are copied and the raw `_promise`
    // pointer is stored.
123  const std::vector<Future<T>>& _futures,
124  Promise<std::vector<T>>* _promise)
125  : ProcessBase(ID::generate("__collect__")),
126  futures(_futures),
127  promise(_promise),
128  ready(0) {}
129 
    // This process owns `promise` and frees it on destruction.
130  ~CollectProcess() override
131  {
132  delete promise;
133  }
134 
135 protected:
136  void initialize() override
137  {
138  // If the caller discards our future, run `discarded()` on this process.
139  promise->future().onDiscard(defer(this, &CollectProcess::discarded));
140 
    // Register `waited` (via `onAny`) and `abandoned` (via
    // `onAbandoned`) on every input future; both are deferred to this
    // process.
141  foreach (const Future<T>& future, futures) {
142  future.onAny(defer(this, &CollectProcess::waited, lambda::_1));
143  future.onAbandoned(defer(this, &CollectProcess::abandoned));
144  }
145  }
146 
147 private:
    // Callback for an input future that was abandoned.
148  void abandoned()
149  {
150  // There is no use waiting because this future will never complete
151  // so terminate this process which will cause `promise` to get
152  // deleted and our future to also be abandoned.
153  terminate(this);
154  }
155 
    // Callback for a discard requested on the future we handed out.
156  void discarded()
157  {
    // Request a discard on each of the input futures.
158  foreach (Future<T> future, futures) {
159  future.discard();
160  }
161 
162  // NOTE: we discard the promise after we set discard on each of
163  // the futures so that there is a happens-before relationship that
164  // can be assumed by callers.
165  promise->discard();
166 
167  terminate(this);
168  }
169 
    // Callback registered through `onAny` on each input future.
170  void waited(const Future<T>& future)
171  {
    // A single failed or discarded input fails the whole collect and
    // ends this process without waiting for the remaining futures.
172  if (future.isFailed()) {
173  promise->fail("Collect failed: " + future.failure());
174  terminate(this);
175  } else if (future.isDiscarded()) {
176  promise->fail("Collect failed: future discarded");
177  terminate(this);
178  } else {
    // Neither failed nor discarded: assert that the future is ready
    // and count it.
179  CHECK_READY(future);
180  ready += 1;
181  if (ready == futures.size()) {
    // Every input is ready: gather the values in the same order as
    // `futures` and complete the promise.
182  std::vector<T> values;
183  values.reserve(futures.size());
184 
185  foreach (const Future<T>& future, futures) {
186  values.push_back(future.get());
187  }
188 
189  promise->set(std::move(values));
190  terminate(this);
191  }
192  }
193  }
194 
    // Copy of the futures being collected; never modified.
195  const std::vector<Future<T>> futures;
    // Promise for the collected values; deleted in the destructor.
196  Promise<std::vector<T>>* promise;
    // Number of futures seen ready so far.
197  size_t ready;
198 };
199 
200 
// Process used by the vector overload of `await`. It registers
// callbacks on every future in `futures` and sets `promise` with the
// futures themselves once each of them is no longer pending, whatever
// state it ended up in.
201 template <typename T>
202 class AwaitProcess : public Process<AwaitProcess<T>>
203 {
204 public:
    // NOTE(review): listing line 205 (the constructor name) is missing
    // from this extract; the lines below are its parameters and
    // initializer list. The futures are copied and the raw `_promise`
    // pointer is stored.
206  const std::vector<Future<T>>& _futures,
207  Promise<std::vector<Future<T>>>* _promise)
208  : ProcessBase(ID::generate("__await__")),
209  futures(_futures),
210  promise(_promise),
211  ready(0) {}
212 
    // This process owns `promise` and frees it on destruction.
213  ~AwaitProcess() override
214  {
215  delete promise;
216  }
217 
    // NOTE(review): no access specifier precedes `initialize()` here, so
    // it is public in this class, whereas CollectProcess declares its
    // `initialize()` as protected.
218  void initialize() override
219  {
220  // If the caller discards our future, run `discarded()` on this process.
221  promise->future().onDiscard(defer(this, &AwaitProcess::discarded));
222 
    // Register `waited` (via `onAny`) and `abandoned` (via
    // `onAbandoned`) on every input future; both are deferred to this
    // process.
223  foreach (const Future<T>& future, futures) {
224  future.onAny(defer(this, &AwaitProcess::waited, lambda::_1));
225  future.onAbandoned(defer(this, &AwaitProcess::abandoned));
226  }
227  }
228 
229 private:
    // Callback for an input future that was abandoned.
230  void abandoned()
231  {
232  // There is no use waiting because this future will never complete
233  // so terminate this process which will cause `promise` to get
234  // deleted and our future to also be abandoned.
235  terminate(this);
236  }
237 
    // Callback for a discard requested on the future we handed out.
238  void discarded()
239  {
    // Request a discard on each of the input futures.
240  foreach (Future<T> future, futures) {
241  future.discard();
242  }
243 
244  // NOTE: we discard the promise after we set discard on each of
245  // the futures so that there is a happens-before relationship that
246  // can be assumed by callers.
247  promise->discard();
248 
249  terminate(this);
250  }
251 
    // Callback registered through `onAny` on each input future. Unlike
    // the collect case, failed and discarded futures are counted the
    // same way as ready ones.
252  void waited(const Future<T>& future)
253  {
254  CHECK(!future.isPending());
255 
256  ready += 1;
257  if (ready == futures.size()) {
258  // It is safe to move futures at this point: all of them have been
    // counted and the process terminates right below, so presumably
    // nothing reads `futures` afterwards.
259  promise->set(std::move(futures));
260  terminate(this);
261  }
262  }
263 
    // Copy of the futures being awaited; moved into the result in
    // `waited`, which is why it is not const.
264  std::vector<Future<T>> futures;
    // NOTE(review): listing line 265 is missing from this extract; the
    // `promise` member used throughout the class is presumably declared
    // there.
    // Number of futures seen non-pending so far.
266  size_t ready;
267 };
268 
269 
270 } // namespace internal {
271 
272 
// Vector overload of `collect`.
// NOTE(review): listing lines 274 (return type and function name) and
// 281 (the declaration of `promise`) are missing from this extract.
273 template <typename T>
275  const std::vector<Future<T>>& futures)
276 {
    // Nothing to wait for: return an empty result right away without
    // spawning a process.
277  if (futures.empty()) {
278  return std::vector<T>();
279  }
280 
    // Take the future before spawning: the CollectProcess deletes
    // `promise` in its destructor. The second argument to `spawn` is
    // its `manage` flag.
282  Future<std::vector<T>> future = promise->future();
283  spawn(new internal::CollectProcess<T>(futures, promise), true);
284  return future;
285 }
286 
287 
// Variadic overload of `collect`: accepts futures of different types and
// returns a future holding a tuple of their values.
288 template <typename... Ts>
289 Future<std::tuple<Ts...>> collect(const Future<Ts>&... futures)
290 {
    // Chain a continuation returning `Nothing()` onto every input so
    // that futures of different types can share one vector and be
    // passed to the vector overload of `collect` below.
291  std::vector<Future<Nothing>> wrappers = {
292  futures.then([]() { return Nothing(); })...
293  };
294 
295  // TODO(klueska): Unfortunately, we have to use a lambda followed
296  // by a bind here because of a bug in gcc 4.8 to handle variadic
297  // parameters in lambdas:
298  // https://gcc.gnu.org/bugzilla/show_bug.cgi?id=47226
    // `f` reads the value out of each future and packs them into a tuple.
299  auto f = [](const Future<Ts>&... futures) {
300  return std::make_tuple(futures.get()...);
301  };
302 
    // The wrappers only signal completion; the values are read from the
    // futures bound into `f`, after the collect of the wrappers.
303  return collect(wrappers)
304  .then(std::bind(f, futures...));
305 }
306 
307 
// Vector overload of `await`.
// NOTE(review): listing lines 309 (return type and function name) and
// 317 (the initializer of `promise`) are missing from this extract.
308 template <typename T>
310  const std::vector<Future<T>>& futures)
311 {
    // Nothing to wait for: hand back the (empty) input right away
    // without spawning a process.
312  if (futures.empty()) {
313  return futures;
314  }
315 
316  Promise<std::vector<Future<T>>>* promise =
    // Take the future before spawning: the AwaitProcess deletes
    // `promise` in its destructor. The second argument to `spawn` is
    // its `manage` flag.
318  Future<std::vector<Future<T>>> future = promise->future();
319  spawn(new internal::AwaitProcess<T>(futures, promise), true);
320  return future;
321 }
322 
323 
// Variadic overload of `await`: accepts futures of different types.
// NOTE(review): listing line 325 (the function signature) is missing
// from this extract.
324 template <typename... Ts>
326 {
    // Chain a continuation returning `Nothing()` onto every input so
    // that futures of different types can share one vector and be
    // passed to the vector overload of `await` below.
327  std::vector<Future<Nothing>> wrappers = {
328  futures.then([]() { return Nothing(); })...
329  };
330 
331  // TODO(klueska): Unfortunately, we have to use a lambda followed
332  // by a bind here because of a bug in gcc 4.8 to handle variadic
333  // parameters in lambdas:
334  // https://gcc.gnu.org/bugzilla/show_bug.cgi?id=47226
    // `f` packs the futures themselves (not their values) into a tuple.
335  auto f = [](const Future<Ts>&... futures) {
336  return std::make_tuple(futures...);
337  };
338 
    // The wrappers only signal completion; the tuple is built from the
    // futures bound into `f`, after the await of the wrappers.
339  return await(wrappers)
340  .then(std::bind(f, futures...));
341 }
342 
343 } // namespace process {
344 
345 #endif // __PROCESS_COLLECT_HPP__
std::string generate(const std::string &prefix="")
Returns 'prefix(N)' where N represents the number of instances where the same prefix (wrt...
Definition: nothing.hpp:16
Definition: collect.hpp:119
~AwaitProcess() override
Definition: collect.hpp:213
F && f
Definition: defer.hpp:270
const T & get() const
Definition: future.hpp:1294
bool set(const T &_t)
Definition: future.hpp:827
bool fail(const std::string &message)
Definition: future.hpp:903
const Future< T > & onDiscard(DiscardCallback &&callback) const
Definition: future.hpp:1354
AwaitProcess(const std::vector< Future< T >> &_futures, Promise< std::vector< Future< T >>> *_promise)
Definition: collect.hpp:205
Future< std::vector< Future< T > > > await(const std::vector< Future< T >> &futures)
Definition: collect.hpp:309
bool await(const process::Future< T > &future, const Duration &duration)
Definition: gtest.hpp:67
Definition: process.hpp:72
bool discard()
Definition: future.hpp:1157
UPID spawn(ProcessBase *process, bool manage=false)
Spawn a new process.
void terminate(const UPID &pid, bool inject=true)
Sends a TerminateEvent to the given process.
bool isPending() const
Definition: future.hpp:1208
Future< std::vector< T > > collect(const std::vector< Future< T >> &futures)
Definition: collect.hpp:274
bool isDiscarded() const
Definition: future.hpp:1222
void initialize() override
Invoked when a process gets spawned.
Definition: collect.hpp:136
const Future< T > & onAny(AnyCallback &&callback) const
Definition: future.hpp:1442
CollectProcess(const std::vector< Future< T >> &_futures, Promise< std::vector< T >> *_promise)
Definition: collect.hpp:122
~CollectProcess() override
Definition: collect.hpp:130
Definition: collect.hpp:202
Definition: future.hpp:74
const Future< T > & onAbandoned(AbandonedCallback &&callback) const
Definition: future.hpp:1332
#define CHECK_READY(expression)
Definition: check.hpp:29
Future< X > then(lambda::CallableOnce< Future< X >(const T &)> f) const
Definition: future.hpp:1576
Definition: attributes.hpp:24
Definition: executor.hpp:48
void then(lambda::CallableOnce< X(const T &)> &&f, std::unique_ptr< Promise< X >> promise, const Future< T > &future)
Definition: future.hpp:1487
Future< T > future() const
Definition: future.hpp:913
void initialize() override
Invoked when a process gets spawned.
Definition: collect.hpp:218
bool discard()
Definition: future.hpp:810
void discarded(Future< U > future)
Try< Nothing > bind(int_fd s, const Address &address)
Definition: network.hpp:46
const std::string & failure() const
Definition: future.hpp:1320
Definition: process.hpp:505
Deferred< void()> defer(const PID< T > &pid, void(T::*method)())
Definition: defer.hpp:35
bool isFailed() const
Definition: future.hpp:1229
Definition: future.hpp:58