Apache Mesos
collect.hpp
Go to the documentation of this file.
1 // Licensed under the Apache License, Version 2.0 (the "License");
2 // you may not use this file except in compliance with the License.
3 // You may obtain a copy of the License at
4 //
5 // http://www.apache.org/licenses/LICENSE-2.0
6 //
7 // Unless required by applicable law or agreed to in writing, software
8 // distributed under the License is distributed on an "AS IS" BASIS,
9 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10 // See the License for the specific language governing permissions and
11 // limitations under the License
12 
13 #ifndef __PROCESS_COLLECT_HPP__
14 #define __PROCESS_COLLECT_HPP__
15 
16 #include <functional>
17 #include <list>
18 #include <tuple>
19 
20 #include <process/check.hpp>
21 #include <process/defer.hpp>
22 #include <process/future.hpp>
23 #include <process/id.hpp>
24 #include <process/owned.hpp>
25 #include <process/process.hpp>
26 
27 #include <stout/lambda.hpp>
28 
29 // TODO(bmahler): Move these into a futures.hpp header to group Future
30 // related utilities.
31 
32 namespace process {
33 
34 // Waits on each future in the specified list and returns the list of
35 // resulting values in the same order. If any future is discarded then
36 // the result will be a failure. Likewise, if any future fails then
37 // the result future will be a failure.
38 template <typename T>
39 Future<std::list<T>> collect(const std::list<Future<T>>& futures);
40 
41 
42 // Waits on each future specified and returns a future holding a
43 // tuple of the resulting values.
44 template <typename... Ts>
45 Future<std::tuple<Ts...>> collect(const Future<Ts>&... futures);
46 
47 
48 // Waits on each future in the specified list and returns the list of
49 // non-pending futures.
50 template <typename T>
51 Future<std::list<Future<T>>> await(const std::list<Future<T>>& futures);
52 
53 
54 // Waits on each future specified and returns a future holding a
55 // tuple of those futures.
56 template <typename... Ts>
57 Future<std::tuple<Future<Ts>...>> await(const Future<Ts>&... futures);
58 
59 
60 // Waits on the future specified and returns after the future has been
61 // completed or the await has been discarded. This is useful when
62 // wanting to "break out" of a future chain if a discard occurs but
63 // the underlying future has not been discarded. For example:
64 //
65 // Future<string> foo()
66 // {
67 // return bar()
68 // .then([](int i) {
69 // return stringify(i);
70 // });
71 // }
72 //
73 // Future<string> future = foo();
74 // future.discard();
75 //
76 // In the above code we'll propagate the discard to `bar()` but might
77 // wait forever if `bar()` can't discard their computation. In some
78 // circumstances you might want to break out early and you can do that
79 // by using `await`, because if we discard an `await` that function
80 // will return even though all of the futures it is waiting on have
81 // not been discarded (in other words, the `await` can be properly
82 // discarded even if the underlying futures have not been discarded).
83 //
84 // Future<string> foo()
85 // {
86 // return await(bar())
87 // .recover([](const Future<Future<int>>& future) {
88 // if (future.isDiscarded()) {
89 // cleanup();
90 // }
91 // return Failure("Discarded waiting");
92 // })
93 // .then([](const Future<int>& future) {
94 // return future
95 // .then([](int i) {
96 // return stringify(i);
97 // });
98 // });
99 // }
100 //
101 // Future<string> future = foo();
102 // future.discard();
103 //
104 // Using `await` will enable you to continue execution even if `bar()`
105 // does not (or can not) discard their execution.
106 template <typename T>
108 {
109  return await(std::list<Future<T>>{future})
110  .then([=]() {
111  return Future<Future<T>>(future);
112  });
113 }
114 
115 
116 namespace internal {
117 
118 template <typename T>
119 class CollectProcess : public Process<CollectProcess<T>>
120 {
121 public:
123  const std::list<Future<T>>& _futures,
124  Promise<std::list<T>>* _promise)
125  : ProcessBase(ID::generate("__collect__")),
126  futures(_futures),
127  promise(_promise),
128  ready(0) {}
129 
130  virtual ~CollectProcess()
131  {
132  delete promise;
133  }
134 
135 protected:
136  virtual void initialize()
137  {
138  // Stop this nonsense if nobody cares.
139  promise->future().onDiscard(defer(this, &CollectProcess::discarded));
140 
141  foreach (const Future<T>& future, futures) {
142  future.onAny(defer(this, &CollectProcess::waited, lambda::_1));
143  future.onAbandoned(defer(this, &CollectProcess::abandoned));
144  }
145  }
146 
147 private:
148  void abandoned()
149  {
150  // There is no use waiting because this future will never complete
151  // so terminate this process which will cause `promise` to get
152  // deleted and our future to also be abandoned.
153  terminate(this);
154  }
155 
156  void discarded()
157  {
158  foreach (Future<T> future, futures) {
159  future.discard();
160  }
161 
162  // NOTE: we discard the promise after we set discard on each of
163  // the futures so that there is a happens-before relationship that
164  // can be assumed by callers.
165  promise->discard();
166 
167  terminate(this);
168  }
169 
170  void waited(const Future<T>& future)
171  {
172  if (future.isFailed()) {
173  promise->fail("Collect failed: " + future.failure());
174  terminate(this);
175  } else if (future.isDiscarded()) {
176  promise->fail("Collect failed: future discarded");
177  terminate(this);
178  } else {
179  CHECK_READY(future);
180  ready += 1;
181  if (ready == futures.size()) {
182  std::list<T> values;
183  foreach (const Future<T>& future, futures) {
184  values.push_back(future.get());
185  }
186  promise->set(values);
187  terminate(this);
188  }
189  }
190  }
191 
192  const std::list<Future<T>> futures;
193  Promise<std::list<T>>* promise;
194  size_t ready;
195 };
196 
197 
198 template <typename T>
199 class AwaitProcess : public Process<AwaitProcess<T>>
200 {
201 public:
203  const std::list<Future<T>>& _futures,
204  Promise<std::list<Future<T>>>* _promise)
205  : ProcessBase(ID::generate("__await__")),
206  futures(_futures),
207  promise(_promise),
208  ready(0) {}
209 
210  virtual ~AwaitProcess()
211  {
212  delete promise;
213  }
214 
215  virtual void initialize()
216  {
217  // Stop this nonsense if nobody cares.
218  promise->future().onDiscard(defer(this, &AwaitProcess::discarded));
219 
220  foreach (const Future<T>& future, futures) {
221  future.onAny(defer(this, &AwaitProcess::waited, lambda::_1));
222  future.onAbandoned(defer(this, &AwaitProcess::abandoned));
223  }
224  }
225 
226 private:
227  void abandoned()
228  {
229  // There is no use waiting because this future will never complete
230  // so terminate this process which will cause `promise` to get
231  // deleted and our future to also be abandoned.
232  terminate(this);
233  }
234 
235  void discarded()
236  {
237  foreach (Future<T> future, futures) {
238  future.discard();
239  }
240 
241  // NOTE: we discard the promise after we set discard on each of
242  // the futures so that there is a happens-before relationship that
243  // can be assumed by callers.
244  promise->discard();
245 
246  terminate(this);
247  }
248 
249  void waited(const Future<T>& future)
250  {
251  CHECK(!future.isPending());
252 
253  ready += 1;
254  if (ready == futures.size()) {
255  promise->set(futures);
256  terminate(this);
257  }
258  }
259 
260  const std::list<Future<T>> futures;
261  Promise<std::list<Future<T>>>* promise;
262  size_t ready;
263 };
264 
265 
266 } // namespace internal {
267 
268 
269 template <typename T>
271  const std::list<Future<T>>& futures)
272 {
273  if (futures.empty()) {
274  return std::list<T>();
275  }
276 
278  Future<std::list<T>> future = promise->future();
279  spawn(new internal::CollectProcess<T>(futures, promise), true);
280  return future;
281 }
282 
283 
284 template <typename... Ts>
285 Future<std::tuple<Ts...>> collect(const Future<Ts>&... futures)
286 {
287  std::list<Future<Nothing>> wrappers = {
288  futures.then([]() { return Nothing(); })...
289  };
290 
291  // TODO(klueska): Unfortunately, we have to use a lambda followed
292  // by a bind here because of a bug in gcc 4.8 to handle variadic
293  // parameters in lambdas:
294  // https://gcc.gnu.org/bugzilla/show_bug.cgi?id=47226
295  auto f = [](const Future<Ts>&... futures) {
296  return std::make_tuple(futures.get()...);
297  };
298 
299  return collect(wrappers)
300  .then(std::bind(f, futures...));
301 }
302 
303 
304 template <typename T>
306  const std::list<Future<T>>& futures)
307 {
308  if (futures.empty()) {
309  return futures;
310  }
311 
314  Future<std::list<Future<T>>> future = promise->future();
315  spawn(new internal::AwaitProcess<T>(futures, promise), true);
316  return future;
317 }
318 
319 
320 template <typename... Ts>
322 {
323  std::list<Future<Nothing>> wrappers = {
324  futures.then([]() { return Nothing(); })...
325  };
326 
327  // TODO(klueska): Unfortunately, we have to use a lambda followed
328  // by a bind here because of a bug in gcc 4.8 to handle variadic
329  // parameters in lambdas:
330  // https://gcc.gnu.org/bugzilla/show_bug.cgi?id=47226
331  auto f = [](const Future<Ts>&... futures) {
332  return std::make_tuple(futures...);
333  };
334 
335  return await(wrappers)
336  .then(std::bind(f, futures...));
337 }
338 
339 } // namespace process {
340 
341 #endif // __PROCESS_COLLECT_HPP__
std::string generate(const std::string &prefix="")
Returns &#39;prefix(N)&#39; where N represents the number of instances where the same prefix (wrt...
Definition: nothing.hpp:16
Definition: collect.hpp:119
F && f
Definition: defer.hpp:270
const T & get() const
Definition: future.hpp:1310
bool set(const T &_t)
Definition: future.hpp:826
bool fail(const std::string &message)
Definition: future.hpp:902
virtual ~CollectProcess()
Definition: collect.hpp:130
const Future< T > & onDiscard(DiscardCallback &&callback) const
Definition: future.hpp:1370
Definition: process.hpp:72
bool discard()
Definition: future.hpp:1173
UPID spawn(ProcessBase *process, bool manage=false)
Spawn a new process.
void terminate(const UPID &pid, bool inject=true)
Sends a TerminateEvent to the given process.
virtual void initialize()
Invoked when a process gets spawned.
Definition: collect.hpp:136
const Future< T > & onAny(AnyCallback &&callback) const
Definition: future.hpp:1458
Definition: collect.hpp:199
Definition: future.hpp:73
const Future< T > & onAbandoned(AbandonedCallback &&callback) const
Definition: future.hpp:1348
Protocol< PromiseRequest, PromiseResponse > promise
Try< std::vector< Entry > > list(const std::string &hierarchy, const std::string &cgroup)
#define CHECK_READY(expression)
Definition: check.hpp:29
Future< X > then(lambda::CallableOnce< Future< X >(const T &)> f) const
Definition: future.hpp:1592
Future< std::list< Future< T > > > await(const std::list< Future< T >> &futures)
Definition: collect.hpp:305
AwaitProcess(const std::list< Future< T >> &_futures, Promise< std::list< Future< T >>> *_promise)
Definition: collect.hpp:202
Result< Process > process(pid_t pid)
Definition: freebsd.hpp:30
CollectProcess(const std::list< Future< T >> &_futures, Promise< std::list< T >> *_promise)
Definition: collect.hpp:122
void then(lambda::CallableOnce< X(const T &)> &&f, std::unique_ptr< Promise< X >> promise, const Future< T > &future)
Definition: future.hpp:1503
virtual ~AwaitProcess()
Definition: collect.hpp:210
Future< T > future() const
Definition: future.hpp:912
bool discard()
Definition: future.hpp:809
Try< Nothing > bind(int_fd s, const Address &address)
Definition: network.hpp:46
virtual void initialize()
Invoked when a process gets spawned.
Definition: collect.hpp:215
Definition: process.hpp:493
Deferred< void()> defer(const PID< T > &pid, void(T::*method)())
Definition: defer.hpp:35
Definition: future.hpp:57
Future< std::list< T > > collect(const std::list< Future< T >> &futures)
Definition: collect.hpp:270