Apache Mesos
event_queue.hpp
Go to the documentation of this file.
1 // Licensed under the Apache License, Version 2.0 (the "License");
2 // you may not use this file except in compliance with the License.
3 // You may obtain a copy of the License at
4 //
5 // http://www.apache.org/licenses/LICENSE-2.0
6 //
7 // Unless required by applicable law or agreed to in writing, software
8 // distributed under the License is distributed on an "AS IS" BASIS,
9 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10 // See the License for the specific language governing permissions and
11 // limitations under the License
12 
13 #ifndef __PROCESS_EVENT_QUEUE_HPP__
14 #define __PROCESS_EVENT_QUEUE_HPP__
15 
16 #include <deque>
17 #include <mutex>
18 #include <string>
19 
20 #include <process/event.hpp>
21 #include <process/http.hpp>
22 
23 #include <stout/json.hpp>
24 #include <stout/stringify.hpp>
25 #include <stout/synchronized.hpp>
26 
27 #ifdef LOCK_FREE_EVENT_QUEUE
28 #include "mpsc_linked_queue.hpp"
29 #endif // LOCK_FREE_EVENT_QUEUE
30 
31 namespace process {
32 
33 // A _multiple_ producer (MP) _single_ consumer (SC) event queue for a
34 // process. Note that we don't _enforce_ the MP/SC semantics during
35 // runtime but we have explicitly separated out the `Producer`
36 // interface and the `Consumer` interface in order to help avoid
37 // incorrect usage.
38 //
39 // Notable semantics:
40 //
41 // * Consumers _must_ call `empty()` before calling
42 // `dequeue()`. Failing to do so may result in undefined behavior.
43 //
44 // * After a consumer calls `decomission()` they _must_ not call any
45 // thing else (not even `empty()` and especially not
46 // `dequeue()`). Doing so is undefined behavior.
47 //
48 // Notes on the lock-free implementation:
49 //
50 // The SC requirement is necessary for the lock-free implementation
51 // because the underlying queue does not provide linearizability which
52 // means events can be dequeued "out of order". Usually this is not a
53 // problem, after all, in most circumstances we won't know the order
54 // in which events might be enqueued in the first place. However, this
55 // can be a very bad problem if a single process attempts to enqueue
56 // two events in a different process AND THOSE EVENTS ARE
57 // REORDERED. To ensure this will never be the case we give every
58 // event a sequence number. That way an event from the same process
59 // will always have a happens-before relationship with respect to the
60 // events that they enqueue because they'll have distinct sequence
61 // numbers.
62 //
63 // This makes the consumer implementation more difficult because the
64 // consumer might need to "reorder" events as it reads them out. To do
65 // this efficiently we require only a single consumer, which fits well
66 // into the actor model because there will only ever be a single
67 // thread consuming an actors events at a time.
69 {
70 public:
71  EventQueue() : producer(this), consumer(this) {}
72 
73  class Producer
74  {
75  public:
76  void enqueue(Event* event) { queue->enqueue(event); }
77 
78  private:
79  friend class EventQueue;
80 
81  Producer(EventQueue* queue) : queue(queue) {}
82 
83  EventQueue* queue;
84  } producer;
85 
86  class Consumer
87  {
88  public:
89  Event* dequeue() { return queue->dequeue(); }
90  bool empty() { return queue->empty(); }
91  void decomission() { queue->decomission(); }
92  template <typename T>
93  size_t count() { return queue->count<T>(); }
94  operator JSON::Array() { return queue->operator JSON::Array(); }
95 
96  private:
97  friend class EventQueue;
98 
99  Consumer(EventQueue* queue) : queue(queue) {}
100 
101  EventQueue* queue;
102  } consumer;
103 
104 private:
105  friend class Producer;
106  friend class Consumer;
107 
108 #ifndef LOCK_FREE_EVENT_QUEUE
109  void enqueue(Event* event)
110  {
111  bool enqueued = false;
112  synchronized (mutex) {
113  if (comissioned) {
114  events.push_back(event);
115  enqueued = true;
116  }
117  }
118 
119  if (!enqueued) {
120  delete event;
121  }
122  }
123 
124  Event* dequeue()
125  {
126  Event* event = nullptr;
127 
128  synchronized (mutex) {
129  if (events.size() > 0) {
130  Event* event = events.front();
131  events.pop_front();
132  return event;
133  }
134  }
135 
136  // Semantics are the consumer _must_ call `empty()` before calling
137  // `dequeue()` which means an event must be present.
138  return CHECK_NOTNULL(event);
139  }
140 
141  bool empty()
142  {
143  synchronized (mutex) {
144  return events.size() == 0;
145  }
146  }
147 
148  void decomission()
149  {
150  synchronized (mutex) {
151  comissioned = false;
152  while (!events.empty()) {
153  Event* event = events.front();
154  events.pop_front();
155  delete event;
156  }
157  }
158  }
159 
160  template <typename T>
161  size_t count()
162  {
163  synchronized (mutex) {
164  return std::count_if(
165  events.begin(),
166  events.end(),
167  [](const Event* event) {
168  return event->is<T>();
169  });
170  }
171  }
172 
173  operator JSON::Array()
174  {
175  JSON::Array array;
176  synchronized (mutex) {
177  foreach (Event* event, events) {
178  array.values.push_back(JSON::Object(*event));
179  }
180  }
181  return array;
182  }
183 
184  std::mutex mutex;
185  std::deque<Event*> events;
186  bool comissioned = true;
187 #else // LOCK_FREE_EVENT_QUEUE
188  void enqueue(Event* event)
189  {
190  if (comissioned.load()) {
191  queue.enqueue(event);
192  } else {
193  delete event;
194  }
195  }
196 
197  Event* dequeue()
198  {
199  return queue.dequeue();
200  }
201 
202  bool empty()
203  {
204  return queue.empty();
205  }
206 
207  void decomission()
208  {
209  comissioned.store(true);
210  while (!empty()) {
211  delete dequeue();
212  }
213  }
214 
215  template <typename T>
216  size_t count()
217  {
218  size_t count = 0;
219  queue.for_each([&count](Event* event) {
220  if (event->is<T>()) {
221  count++;
222  }
223  });
224  return count;
225  }
226 
227  operator JSON::Array()
228  {
229  JSON::Array array;
230  queue.for_each([&array](Event* event) {
231  array.values.push_back(JSON::Object(*event));
232  });
233 
234  return array;
235  }
236 
237  // Underlying queue of items.
239 
240  // Whether or not the event queue has been decomissioned. This must
241  // be atomic as it can be read by a producer even though it's only
242  // written by a consumer.
243  std::atomic<bool> comissioned = ATOMIC_VAR_INIT(true);
244 #endif // LOCK_FREE_EVENT_QUEUE
245 };
246 
247 } // namespace process {
248 
249 #endif // __PROCESS_EVENT_QUEUE_HPP__
Definition: mpsc_linked_queue.hpp:38
class process::EventQueue::Producer producer
size_t count()
Definition: event_queue.hpp:93
void enqueue(Event *event)
Definition: event_queue.hpp:76
Definition: event_queue.hpp:68
class process::EventQueue::Consumer consumer
Definition: json.hpp:198
friend class Consumer
Definition: event_queue.hpp:106
bool is() const
Definition: event.hpp:68
bool empty()
Definition: event_queue.hpp:90
Definition: json.hpp:158
Definition: event_queue.hpp:73
EventQueue()
Definition: event_queue.hpp:71
std::vector< Value > values
Definition: json.hpp:203
void decomission()
Definition: event_queue.hpp:91
Definition: executor.hpp:48
Definition: event_queue.hpp:86
Event * dequeue()
Definition: event_queue.hpp:89
Definition: event.hpp:60