Apache Mesos
executor.hpp
Go to the documentation of this file.
1 // Licensed under the Apache License, Version 2.0 (the "License");
2 // you may not use this file except in compliance with the License.
3 // You may obtain a copy of the License at
4 //
5 // http://www.apache.org/licenses/LICENSE-2.0
6 //
7 // Unless required by applicable law or agreed to in writing, software
8 // distributed under the License is distributed on an "AS IS" BASIS,
9 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10 // See the License for the specific language governing permissions and
11 // limitations under the License
12 
13 #ifndef __PROCESS_EXECUTOR_HPP__
14 #define __PROCESS_EXECUTOR_HPP__
15 
16 #include <process/deferred.hpp>
17 #include <process/id.hpp>
18 #include <process/process.hpp>
19 
20 #include <stout/nothing.hpp>
21 #include <stout/result_of.hpp>
22 
23 namespace process {
24 
25 // Provides an abstraction that can take a standard function object
26 // and defer or asynchronously execute it without needing a process.
27 // Each converted function object will get executed serially with respect
28 // to one another when invoked.
29 class Executor
30 {
31 public:
32  Executor() : process(ID::generate("__executor__"))
33  {
34  spawn(process);
35  }
36 
37  ~Executor()
38  {
39  terminate(process);
40  wait(process);
41  }
42 
43  void stop()
44  {
45  terminate(process);
46 
47  // TODO(benh): Note that this doesn't wait because that could
48  // cause a deadlock ... thus, the semantics here are that no more
49  // dispatches will occur after this function returns but one may
50  // be occurring concurrently.
51  }
52 
53  template <typename F>
54  _Deferred<F> defer(F&& f)
55  {
56  return _Deferred<F>(process.self(), std::forward<F>(f));
57  }
58 
59 
60 private:
61  // Not copyable, not assignable.
62  Executor(const Executor&);
63  Executor& operator=(const Executor&);
64 
65  ProcessBase process;
66 
67 
68 public:
69  template <
70  typename F,
71  typename R = typename result_of<F()>::type,
72  typename std::enable_if<!std::is_void<R>::value, int>::type = 0>
73  auto execute(F&& f)
74  -> decltype(dispatch(process, std::function<R()>(std::forward<F>(f))))
75  {
76  // NOTE: Currently we cannot pass a mutable lambda into `dispatch()`
77  // because it would be captured by copy, so we convert `f` into a
78  // `std::function` to bypass this restriction.
79  return dispatch(process, std::function<R()>(std::forward<F>(f)));
80  }
81 
82  // NOTE: This overload for `void` returns `Future<Nothing>` so we can
83  // chain. This follows the same behavior as `async()`.
84  template <
85  typename F,
86  typename R = typename result_of<F()>::type,
87  typename std::enable_if<std::is_void<R>::value, int>::type = 0>
88  Future<Nothing> execute(F&& f)
89  {
90  // NOTE: Currently we cannot pass a mutable lambda into `dispatch()`
91  // because it would be captured by copy, so we convert `f` into a
92  // `std::function` to bypass this restriction. This wrapper also
93  // avoids `f` being evaluated when it is a nested bind.
94  // TODO(chhsiao): Capture `f` by forwarding once we switch to C++14.
95  return dispatch(
96  process,
97  std::bind(
98  [](const std::function<R()>& f_) { f_(); return Nothing(); },
99  std::function<R()>(std::forward<F>(f))));
100  }
101 };
102 
103 
104 // Per thread executor pointer. We use a pointer to lazily construct the
105 // actual executor.
106 extern thread_local Executor* _executor_;
107 
108 #define __executor__ \
109  (_executor_ == nullptr ? _executor_ = new Executor() : _executor_)
110 
111 } // namespace process {
112 
113 #endif // __PROCESS_EXECUTOR_HPP__
std::string generate(const std::string &prefix="")
Returns 'prefix(N)' where N represents the number of instances where the same prefix (wrt...
Definition: nothing.hpp:16
F && f
Definition: defer.hpp:270
Definition: process.hpp:72
UPID spawn(ProcessBase *process, bool manage=false)
Spawn a new process.
void terminate(const UPID &pid, bool inject=true)
Sends a TerminateEvent to the given process.
thread_local Executor * _executor_
Future< Nothing > execute(F &&f)
Definition: executor.hpp:88
_Deferred< F > defer(F &&f)
Definition: executor.hpp:54
void dispatch(const PID< T > &pid, void(T::*method)())
Definition: dispatch.hpp:174
auto execute(F &&f) -> decltype(dispatch(process, std::function< R()>(std::forward< F >(f))))
Definition: executor.hpp:73
Definition: deferred.hpp:64
bool wait(const UPID &pid, const Duration &duration=Seconds(-1))
Wait for the process to exit for no more than the specified seconds.
Executor()
Definition: executor.hpp:32
void stop()
Definition: executor.hpp:43
Result< Process > process(pid_t pid)
Definition: freebsd.hpp:30
Try< uint32_t > type(const std::string &path)
Try< Nothing > bind(int_fd s, const Address &address)
Definition: network.hpp:46
Definition: executor.hpp:29
const UPID & self() const
Definition: process.hpp:79
~Executor()
Definition: executor.hpp:37