Apache Mesos
libev.hpp
Go to the documentation of this file.
1 // Licensed under the Apache License, Version 2.0 (the "License");
2 // you may not use this file except in compliance with the License.
3 // You may obtain a copy of the License at
4 //
5 // http://www.apache.org/licenses/LICENSE-2.0
6 //
7 // Unless required by applicable law or agreed to in writing, software
8 // distributed under the License is distributed on an "AS IS" BASIS,
9 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10 // See the License for the specific language governing permissions and
11 // limitations under the License
12 
13 #ifndef __LIBEV_HPP__
14 #define __LIBEV_HPP__
15 
16 #include <ev.h>
17 
18 #include <mutex>
19 #include <queue>
20 
21 #include <process/future.hpp>
22 #include <process/owned.hpp>
23 
24 #include <stout/lambda.hpp>
25 #include <stout/synchronized.hpp>
26 
27 namespace process {
28 
29 // Event loop.
30 extern struct ev_loop* loop;
31 
32 // Asynchronous watcher for interrupting loop to specifically deal
33 // with IO watchers and functions (via run_in_event_loop).
34 extern ev_async async_watcher;
35 
36 // Queue of I/O watchers to be asynchronously added to the event loop
37 // (protected by 'watchers' below).
38 // TODO(benh): Replace this queue with functions that we put in
39 // 'functions' below that perform the ev_io_start themselves.
40 extern std::queue<ev_io*>* watchers;
41 extern std::mutex* watchers_mutex;
42 
43 // Queue of functions to be invoked asynchronously within the vent
44 // loop (protected by 'watchers' above).
45 extern std::queue<lambda::function<void()>>* functions;
46 
47 // Per thread bool pointer. We use a pointer to lazily construct the
48 // actual bool.
49 extern thread_local bool* _in_event_loop_;
50 
51 #define __in_event_loop__ *(_in_event_loop_ == nullptr ? \
52  _in_event_loop_ = new bool(false) : _in_event_loop_)
53 
54 
55 // Wrapper around function we want to run in the event loop.
56 template <typename T>
58  const lambda::function<Future<T>()>& f,
59  const Owned<Promise<T>>& promise)
60 {
61  // Don't bother running the function if the future has been discarded.
62  if (promise->future().hasDiscard()) {
63  promise->discard();
64  } else {
65  promise->set(f());
66  }
67 }
68 
69 
70 // Helper for running a function in the event loop.
71 template <typename T>
72 Future<T> run_in_event_loop(const lambda::function<Future<T>()>& f)
73 {
74  // If this is already the event loop then just run the function.
75  if (__in_event_loop__) {
76  return f();
77  }
78 
80 
81  Future<T> future = promise->future();
82 
83  // Enqueue the function.
84  synchronized (watchers_mutex) {
85  functions->push(lambda::bind(&_run_in_event_loop<T>, f, promise));
86  }
87 
88  // Interrupt the loop.
89  ev_async_send(loop, &async_watcher);
90 
91  return future;
92 }
93 
94 } // namespace process {
95 
96 #endif // __LIBEV_HPP__
F && f
Definition: defer.hpp:270
std::queue< lambda::function< void()> > * functions
Future< T > run_in_event_loop(const lambda::function< Future< T >()> &f)
Definition: libev.hpp:72
std::queue< ev_io * > * watchers
#define __in_event_loop__
Definition: libev.hpp:51
Definition: future.hpp:73
struct ev_loop * loop
Definition: loop.hpp:456
Protocol< PromiseRequest, PromiseResponse > promise
thread_local bool * _in_event_loop_
Definition: libevent.hpp:28
Result< Process > process(pid_t pid)
Definition: freebsd.hpp:30
ev_async async_watcher
void _run_in_event_loop(const lambda::function< Future< T >()> &f, const Owned< Promise< T >> &promise)
Definition: libev.hpp:57
Try< Nothing > bind(int_fd s, const Address &address)
Definition: network.hpp:46
Definition: owned.hpp:35
std::mutex * watchers_mutex
Definition: future.hpp:57