Apache Mesos
dispatch.hpp
Go to the documentation of this file.
1 // Licensed under the Apache License, Version 2.0 (the "License");
2 // you may not use this file except in compliance with the License.
3 // You may obtain a copy of the License at
4 //
5 // http://www.apache.org/licenses/LICENSE-2.0
6 //
7 // Unless required by applicable law or agreed to in writing, software
8 // distributed under the License is distributed on an "AS IS" BASIS,
9 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10 // See the License for the specific language governing permissions and
11 // limitations under the License
12 
13 #ifndef __PROCESS_DISPATCH_HPP__
14 #define __PROCESS_DISPATCH_HPP__
15 
16 #include <functional>
17 #include <memory>
18 #include <string>
19 
20 #include <process/process.hpp>
21 
22 #include <stout/lambda.hpp>
23 #include <stout/preprocessor.hpp>
24 #include <stout/result_of.hpp>
25 
26 namespace process {
27 
28 // The dispatch mechanism enables you to "schedule" a method to get
29 // invoked on a process. The result of that method invocation is
30 // accessible via the future that is returned by the dispatch method
31 // (note, however, that it might not be the _same_ future as the one
32 // returned from the method, if the method even returns a future, see
33 // below). Assuming some class 'Fibonacci' has a (visible) method
34 // named 'compute' that takes an integer, N (and returns the Nth
35 // fibonacci number) you might use dispatch like so:
36 //
37 // PID<Fibonacci> pid = spawn(new Fibonacci(), true); // Use the GC.
38 // Future<int> f = dispatch(pid, &Fibonacci::compute, 10);
39 //
40 // Because the pid argument is "typed" we can ensure that methods are
41 // only invoked on processes that are actually of that type. Providing
42 // this mechanism for varying numbers of function types and arguments
43 // would ideally use C++11 variadic templates. Because of a GCC bug
44 // with variadic templates and lambdas (referenced further below), we
45 // use the Boost preprocessor macros to accomplish the same thing
46 // (albeit less cleanly). See below for those definitions.
47 //
48 // Dispatching is done via a level of indirection. The dispatch
49 // routine itself creates a promise that is passed as an argument to a
50 // partially applied 'dispatcher' function (defined below). The
51 // dispatcher routines get passed to the actual process via an
52 // internal routine called, not surprisingly, 'dispatch', defined
53 // below:
54 
55 namespace internal {
56 
57 // The internal dispatch routine schedules a function to get invoked
58 // within the context of the process associated with the specified pid
59 // (first argument), unless that process is no longer valid. Note that
60 // this routine does not expect anything in particular about the
61 // specified function (second argument). The semantics are simple: the
62 // function gets applied/invoked with the process as its first
63 // argument.
64 void dispatch(
65  const UPID& pid,
66  std::unique_ptr<lambda::CallableOnce<void(ProcessBase*)>> f,
67  const Option<const std::type_info*>& functionType = None());
68 
69 
70 // NOTE: This struct is used by the public `dispatch(const UPID& pid, F&& f)`
71 // function. See comments there for the reason why we need this.
// Forward declaration of the primary template; it is defined after the
// `void` and `Future<R>` specializations.
template <typename R> struct Dispatch;
74 
75 
76 // Partial specialization for callable objects returning `void` to be dispatched
77 // on a process.
78 // NOTE: This struct is used by the public `dispatch(const UPID& pid, F&& f)`
79 // function. See comments there for the reason why we need this.
80 template <>
81 struct Dispatch<void>
82 {
83  template <typename F>
84  void operator()(const UPID& pid, F&& f)
85  {
86  std::unique_ptr<lambda::CallableOnce<void(ProcessBase*)>> f_(
89  [](typename std::decay<F>::type&& f, ProcessBase*) {
90  std::move(f)();
91  },
92  std::forward<F>(f),
93  lambda::_1)));
94 
95  internal::dispatch(pid, std::move(f_));
96  }
97 };
98 
99 
100 // Partial specialization for callable objects returning `Future<R>` to be
101 // dispatched on a process.
102 // NOTE: This struct is used by the public `dispatch(const UPID& pid, F&& f)`
103 // function. See comments there for the reason why we need this.
104 template <typename R>
105 struct Dispatch<Future<R>>
106 {
107  template <typename F>
108  Future<R> operator()(const UPID& pid, F&& f)
109  {
110  std::unique_ptr<Promise<R>> promise(new Promise<R>());
111  Future<R> future = promise->future();
112 
113  std::unique_ptr<lambda::CallableOnce<void(ProcessBase*)>> f_(
114  new lambda::CallableOnce<void(ProcessBase*)>(
116  [](std::unique_ptr<Promise<R>> promise,
117  typename std::decay<F>::type&& f,
118  ProcessBase*) {
119  promise->associate(std::move(f)());
120  },
121  std::move(promise),
122  std::forward<F>(f),
123  lambda::_1)));
124 
125  internal::dispatch(pid, std::move(f_));
126 
127  return future;
128  }
129 };
130 
131 
132 // Dispatches a callable object returning `R` on a process.
133 // NOTE: This struct is used by the public `dispatch(const UPID& pid, F&& f)`
134 // function. See comments there for the reason why we need this.
135 template <typename R>
136 struct Dispatch
137 {
138  template <typename F>
139  Future<R> operator()(const UPID& pid, F&& f)
140  {
141  std::unique_ptr<Promise<R>> promise(new Promise<R>());
142  Future<R> future = promise->future();
143 
144  std::unique_ptr<lambda::CallableOnce<void(ProcessBase*)>> f_(
145  new lambda::CallableOnce<void(ProcessBase*)>(
147  [](std::unique_ptr<Promise<R>> promise,
148  typename std::decay<F>::type&& f,
149  ProcessBase*) {
150  promise->set(std::move(f)());
151  },
152  std::move(promise),
153  std::forward<F>(f),
154  lambda::_1)));
155 
156  internal::dispatch(pid, std::move(f_));
157 
158  return future;
159  }
160 };
161 
162 } // namespace internal {
163 
164 
165 // Okay, now for the definition of the dispatch routines
166 // themselves. The zero-argument overloads are written out by hand,
167 // which shows the reader what the Boost preprocessor macros are
168 // effectively providing for the overloads that take one or more
169 // arguments.
170 //
171 // First, definitions of dispatch for methods returning void:
172 
173 template <typename T>
174 void dispatch(const PID<T>& pid, void (T::*method)())
175 {
176  std::unique_ptr<lambda::CallableOnce<void(ProcessBase*)>> f(
177  new lambda::CallableOnce<void(ProcessBase*)>(
178  [=](ProcessBase* process) {
179  assert(process != nullptr);
180  T* t = dynamic_cast<T*>(process);
181  assert(t != nullptr);
182  (t->*method)();
183  }));
184 
185  internal::dispatch(pid, std::move(f), &typeid(method));
186 }
187 
188 template <typename T>
189 void dispatch(const Process<T>& process, void (T::*method)())
190 {
191  dispatch(process.self(), method);
192 }
193 
194 template <typename T>
195 void dispatch(const Process<T>* process, void (T::*method)())
196 {
197  dispatch(process->self(), method);
198 }
199 
200 // Due to a bug (http://gcc.gnu.org/bugzilla/show_bug.cgi?id=41933)
201 // with variadic templates and lambdas, we still need to do
202 // preprocessor expansions.
203 
204 // The following assumes base names for type and variable are `A` and `a`.
// Per-index helpers for the TEMPLATE expansions below, where argument N
// has type `A<N>` and name `a<N>`:
//   FORWARD - perfect-forwards the call-site argument.
//   MOVE    - moves the stored argument into the method call.
//   DECL    - declares the closure parameter as an rvalue reference to
//             the decayed argument type.
#define FORWARD(Z, N, DATA) std::forward<A ## N>(a ## N)
#define MOVE(Z, N, DATA) std::move(a ## N)
#define DECL(Z, N, DATA) typename std::decay<A ## N>::type&& a ## N
208 
209 #define TEMPLATE(Z, N, DATA) \
210  template <typename T, \
211  ENUM_PARAMS(N, typename P), \
212  ENUM_PARAMS(N, typename A)> \
213  void dispatch( \
214  const PID<T>& pid, \
215  void (T::*method)(ENUM_PARAMS(N, P)), \
216  ENUM_BINARY_PARAMS(N, A, &&a)) \
217  { \
218  std::unique_ptr<lambda::CallableOnce<void(ProcessBase*)>> f( \
219  new lambda::CallableOnce<void(ProcessBase*)>( \
220  lambda::partial( \
221  [method](ENUM(N, DECL, _), ProcessBase* process) { \
222  assert(process != nullptr); \
223  T* t = dynamic_cast<T*>(process); \
224  assert(t != nullptr); \
225  (t->*method)(ENUM(N, MOVE, _)); \
226  }, \
227  ENUM(N, FORWARD, _), \
228  lambda::_1))); \
229  \
230  internal::dispatch(pid, std::move(f), &typeid(method)); \
231  } \
232  \
233  template <typename T, \
234  ENUM_PARAMS(N, typename P), \
235  ENUM_PARAMS(N, typename A)> \
236  void dispatch( \
237  const Process<T>& process, \
238  void (T::*method)(ENUM_PARAMS(N, P)), \
239  ENUM_BINARY_PARAMS(N, A, &&a)) \
240  { \
241  dispatch(process.self(), method, ENUM(N, FORWARD, _)); \
242  } \
243  \
244  template <typename T, \
245  ENUM_PARAMS(N, typename P), \
246  ENUM_PARAMS(N, typename A)> \
247  void dispatch( \
248  const Process<T>* process, \
249  void (T::*method)(ENUM_PARAMS(N, P)), \
250  ENUM_BINARY_PARAMS(N, A, &&a)) \
251  { \
252  dispatch(process->self(), method, ENUM(N, FORWARD, _)); \
253  }
254 
255  REPEAT_FROM_TO(1, 13, TEMPLATE, _) // Args A0 -> A11.
256 #undef TEMPLATE
257 
258 
259 // Next, definitions of methods returning a future:
260 
261 template <typename R, typename T>
262 Future<R> dispatch(const PID<T>& pid, Future<R> (T::*method)())
263 {
264  std::unique_ptr<Promise<R>> promise(new Promise<R>());
265  Future<R> future = promise->future();
266 
267  std::unique_ptr<lambda::CallableOnce<void(ProcessBase*)>> f(
268  new lambda::CallableOnce<void(ProcessBase*)>(
270  [=](std::unique_ptr<Promise<R>> promise, ProcessBase* process) {
271  assert(process != nullptr);
272  T* t = dynamic_cast<T*>(process);
273  assert(t != nullptr);
274  promise->associate((t->*method)());
275  },
276  std::move(promise),
277  lambda::_1)));
278 
279  internal::dispatch(pid, std::move(f), &typeid(method));
280 
281  return future;
282 }
283 
284 template <typename R, typename T>
285 Future<R> dispatch(const Process<T>& process, Future<R> (T::*method)())
286 {
287  return dispatch(process.self(), method);
288 }
289 
290 template <typename R, typename T>
291 Future<R> dispatch(const Process<T>* process, Future<R> (T::*method)())
292 {
293  return dispatch(process->self(), method);
294 }
295 
296 #define TEMPLATE(Z, N, DATA) \
297  template <typename R, \
298  typename T, \
299  ENUM_PARAMS(N, typename P), \
300  ENUM_PARAMS(N, typename A)> \
301  Future<R> dispatch( \
302  const PID<T>& pid, \
303  Future<R> (T::*method)(ENUM_PARAMS(N, P)), \
304  ENUM_BINARY_PARAMS(N, A, &&a)) \
305  { \
306  std::unique_ptr<Promise<R>> promise(new Promise<R>()); \
307  Future<R> future = promise->future(); \
308  \
309  std::unique_ptr<lambda::CallableOnce<void(ProcessBase*)>> f( \
310  new lambda::CallableOnce<void(ProcessBase*)>( \
311  lambda::partial( \
312  [method](std::unique_ptr<Promise<R>> promise, \
313  ENUM(N, DECL, _), \
314  ProcessBase* process) { \
315  assert(process != nullptr); \
316  T* t = dynamic_cast<T*>(process); \
317  assert(t != nullptr); \
318  promise->associate( \
319  (t->*method)(ENUM(N, MOVE, _))); \
320  }, \
321  std::move(promise), \
322  ENUM(N, FORWARD, _), \
323  lambda::_1))); \
324  \
325  internal::dispatch(pid, std::move(f), &typeid(method)); \
326  \
327  return future; \
328  } \
329  \
330  template <typename R, \
331  typename T, \
332  ENUM_PARAMS(N, typename P), \
333  ENUM_PARAMS(N, typename A)> \
334  Future<R> dispatch( \
335  const Process<T>& process, \
336  Future<R> (T::*method)(ENUM_PARAMS(N, P)), \
337  ENUM_BINARY_PARAMS(N, A, &&a)) \
338  { \
339  return dispatch(process.self(), method, ENUM(N, FORWARD, _)); \
340  } \
341  \
342  template <typename R, \
343  typename T, \
344  ENUM_PARAMS(N, typename P), \
345  ENUM_PARAMS(N, typename A)> \
346  Future<R> dispatch( \
347  const Process<T>* process, \
348  Future<R> (T::*method)(ENUM_PARAMS(N, P)), \
349  ENUM_BINARY_PARAMS(N, A, &&a)) \
350  { \
351  return dispatch(process->self(), method, ENUM(N, FORWARD, _)); \
352  }
353 
354  REPEAT_FROM_TO(1, 13, TEMPLATE, _) // Args A0 -> A11.
355 #undef TEMPLATE
356 
357 
358 // Next, definitions of methods returning a value.
359 
360 template <typename R, typename T>
361 Future<R> dispatch(const PID<T>& pid, R (T::*method)())
362 {
363  std::unique_ptr<Promise<R>> promise(new Promise<R>());
364  Future<R> future = promise->future();
365 
366  std::unique_ptr<lambda::CallableOnce<void(ProcessBase*)>> f(
367  new lambda::CallableOnce<void(ProcessBase*)>(
369  [=](std::unique_ptr<Promise<R>> promise, ProcessBase* process) {
370  assert(process != nullptr);
371  T* t = dynamic_cast<T*>(process);
372  assert(t != nullptr);
373  promise->set((t->*method)());
374  },
375  std::move(promise),
376  lambda::_1)));
377 
378  internal::dispatch(pid, std::move(f), &typeid(method));
379 
380  return future;
381 }
382 
383 template <typename R, typename T>
384 Future<R> dispatch(const Process<T>& process, R (T::*method)())
385 {
386  return dispatch(process.self(), method);
387 }
388 
389 template <typename R, typename T>
390 Future<R> dispatch(const Process<T>* process, R (T::*method)())
391 {
392  return dispatch(process->self(), method);
393 }
394 
395 #define TEMPLATE(Z, N, DATA) \
396  template <typename R, \
397  typename T, \
398  ENUM_PARAMS(N, typename P), \
399  ENUM_PARAMS(N, typename A)> \
400  Future<R> dispatch( \
401  const PID<T>& pid, \
402  R (T::*method)(ENUM_PARAMS(N, P)), \
403  ENUM_BINARY_PARAMS(N, A, &&a)) \
404  { \
405  std::unique_ptr<Promise<R>> promise(new Promise<R>()); \
406  Future<R> future = promise->future(); \
407  \
408  std::unique_ptr<lambda::CallableOnce<void(ProcessBase*)>> f( \
409  new lambda::CallableOnce<void(ProcessBase*)>( \
410  lambda::partial( \
411  [method](std::unique_ptr<Promise<R>> promise, \
412  ENUM(N, DECL, _), \
413  ProcessBase* process) { \
414  assert(process != nullptr); \
415  T* t = dynamic_cast<T*>(process); \
416  assert(t != nullptr); \
417  promise->set((t->*method)(ENUM(N, MOVE, _))); \
418  }, \
419  std::move(promise), \
420  ENUM(N, FORWARD, _), \
421  lambda::_1))); \
422  \
423  internal::dispatch(pid, std::move(f), &typeid(method)); \
424  \
425  return future; \
426  } \
427  \
428  template <typename R, \
429  typename T, \
430  ENUM_PARAMS(N, typename P), \
431  ENUM_PARAMS(N, typename A)> \
432  Future<R> dispatch( \
433  const Process<T>& process, \
434  R (T::*method)(ENUM_PARAMS(N, P)), \
435  ENUM_BINARY_PARAMS(N, A, &&a)) \
436  { \
437  return dispatch(process.self(), method, ENUM(N, FORWARD, _)); \
438  } \
439  \
440  template <typename R, \
441  typename T, \
442  ENUM_PARAMS(N, typename P), \
443  ENUM_PARAMS(N, typename A)> \
444  Future<R> dispatch( \
445  const Process<T>* process, \
446  R (T::*method)(ENUM_PARAMS(N, P)), \
447  ENUM_BINARY_PARAMS(N, A, &&a)) \
448  { \
449  return dispatch(process->self(), method, ENUM(N, FORWARD, _)); \
450  }
451 
452  REPEAT_FROM_TO(1, 13, TEMPLATE, _) // Args A0 -> A11.
453 #undef TEMPLATE
454 
// The per-index helper macros are only meant for the TEMPLATE expansions
// above; remove them so they do not leak out of this header.
#undef FORWARD
#undef MOVE
#undef DECL
458 
459 // We use partial specialization of
460 // - internal::Dispatch<void> vs
461 // - internal::Dispatch<Future<R>> vs
462 // - internal::Dispatch
463 // in order to determine whether R is void, Future or other types.
465 auto dispatch(const UPID& pid, F&& f)
466  -> decltype(internal::Dispatch<R>()(pid, std::forward<F>(f)))
467 {
468  return internal::Dispatch<R>()(pid, std::forward<F>(f));
469 }
470 
471 } // namespace process {
472 
473 #endif // __PROCESS_DISPATCH_HPP__
F && f
Definition: defer.hpp:270
Future< R > operator()(const UPID &pid, F &&f)
Definition: dispatch.hpp:108
internal::Partial< typename std::decay< F >::type, typename std::decay< Args >::type... > partial(F &&f, Args &&...args)
Definition: lambda.hpp:364
REPEAT_FROM_TO(1, 13, TEMPLATE, _) class AsyncExecutorProcess
Definition: async.hpp:63
void operator()(const UPID &pid, F &&f)
Definition: dispatch.hpp:84
Definition: process.hpp:72
An "untyped" PID, used to encapsulate the process ID for lower-layer abstractions (eg...
Definition: pid.hpp:39
Protocol< PromiseRequest, PromiseResponse > promise
void dispatch(const UPID &pid, std::unique_ptr< lambda::CallableOnce< void(ProcessBase *)>> f, const Option< const std::type_info * > &functionType=None())
Definition: dispatch.hpp:73
Result< Process > process(pid_t pid)
Definition: freebsd.hpp:30
A "process identifier" used to uniquely identify a process when dispatching messages.
Definition: pid.hpp:289
Definition: none.hpp:27
Definition: attributes.hpp:24
Future< R > operator()(const UPID &pid, F &&f)
Definition: dispatch.hpp:139
Definition: executor.hpp:48
PID< T > self() const
Returns the PID of the process.
Definition: process.hpp:514
Try< uint32_t > type(const std::string &path)
#define TEMPLATE(Z, N, DATA)
Definition: dispatch.hpp:395
Definition: process.hpp:505
Definition: lambda.hpp:414
Definition: future.hpp:58