Apache Mesos
libev.hpp
Go to the documentation of this file.
1 // Licensed under the Apache License, Version 2.0 (the "License");
2 // you may not use this file except in compliance with the License.
3 // You may obtain a copy of the License at
4 //
5 // http://www.apache.org/licenses/LICENSE-2.0
6 //
7 // Unless required by applicable law or agreed to in writing, software
8 // distributed under the License is distributed on an "AS IS" BASIS,
9 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10 // See the License for the specific language governing permissions and
11 // limitations under the License.
12 
13 #ifndef __LIBEV_HPP__
14 #define __LIBEV_HPP__
15 
16 #include <ev.h>
17 
18 #include <mutex>
19 #include <queue>
20 
21 #include <process/future.hpp>
22 #include <process/owned.hpp>
23 
24 #include <stout/lambda.hpp>
25 #include <stout/synchronized.hpp>
26 
27 namespace process {
28 
29 // Array of event loops.
30 extern struct ev_loop** loops;
31 
32 // Array of async watchers for interrupting loops to specifically deal
33 // with IO watchers and functions (via run_in_event_loop).
34 extern ev_async* async_watchers;
35 
36 // Array of queues of functions to be invoked asynchronously within the
37 // event loops (each queue is protected by a mutex).
38 extern std::mutex* functions_mutexes;
39 extern std::queue<lambda::function<void()>>* functions;
40 
41 // Per thread loop pointer. If this thread is currently inside an
42 // event loop, then this will be set to point to the loop that it's
43 // executing inside. Otherwise, will be set to null.
44 extern thread_local struct ev_loop* _in_event_loop_;
45 
46 // This is a wrapper type of the loop index to ensure that
47 // `get_loop(fd)` is called to select the correct loop for
48 // `run_in_event_loop(...)`.
49 struct LoopIndex
50 {
51  size_t index;
52 
53 private:
54  explicit LoopIndex(size_t index) : index(index) {}
55  LoopIndex() = delete;
56  friend LoopIndex get_loop(int_fd fd);
57 };
58 
59 
60 // Since multiple event loops are supported, and fds are assigned
61 // to loops, callers must first get the loop based on the fd.
63 
64 
65 // Wrapper around function we want to run in the event loop.
66 template <typename T>
68  struct ev_loop* loop,
69  const lambda::function<Future<T>(struct ev_loop*)>& f,
70  const Owned<Promise<T>>& promise)
71 {
72  // Don't bother running the function if the future has been discarded.
73  if (promise->future().hasDiscard()) {
74  promise->discard();
75  } else {
76  promise->set(f(loop));
77  }
78 }
79 
80 
81 // Helper for running a function in one of the event loops.
82 template <typename T>
84  const LoopIndex loop_index,
85  const lambda::function<Future<T>(struct ev_loop*)>& f)
86 {
87  struct ev_loop* loop = loops[loop_index.index];
88 
89  // If this is already the event loop that we're trying to run the
90  // function within, then just run the function.
91  if (_in_event_loop_ == loop) {
92  return f(loop);
93  }
94 
96 
97  Future<T> future = promise->future();
98 
99  // Enqueue the function.
100  {
101  std::lock_guard<std::mutex> guard(functions_mutexes[loop_index.index]);
102  functions[loop_index.index].push(
103  lambda::bind(&_run_in_event_loop<T>, loop, f, promise));
104  }
105 
106  // Interrupt the loop.
107  ev_async_send(loop, &async_watchers[loop_index.index]);
108 
109  return future;
110 }
111 
112 } // namespace process {
113 
114 #endif // __LIBEV_HPP__
F && f
Definition: defer.hpp:270
ev_async * async_watchers
std::queue< lambda::function< void()> > * functions
size_t index
Definition: libev.hpp:51
Future< T > run_in_event_loop(const LoopIndex loop_index, const lambda::function< Future< T >(struct ev_loop *)> &f)
Definition: libev.hpp:83
Definition: future.hpp:74
Protocol< PromiseRequest, PromiseResponse > promise
Future< V > loop(const Option< UPID > &pid, Iterate &&iterate, Body &&body)
Definition: loop.hpp:456
thread_local struct ev_loop * _in_event_loop_
Definition: libevent.hpp:28
Definition: executor.hpp:48
friend LoopIndex get_loop(int_fd fd)
struct ev_loop ** loops
Definition: libev.hpp:49
Try< Nothing > bind(int_fd s, const Address &address)
Definition: network.hpp:46
void _run_in_event_loop(struct ev_loop *loop, const lambda::function< Future< T >(struct ev_loop *)> &f, const Owned< Promise< T >> &promise)
Definition: libev.hpp:67
int int_fd
Definition: int_fd.hpp:35
Definition: owned.hpp:36
std::mutex * functions_mutexes
Definition: future.hpp:58