Apache Mesos
event_queue.hpp
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#ifndef __PROCESS_EVENT_QUEUE_HPP__
#define __PROCESS_EVENT_QUEUE_HPP__

#include <algorithm>
#include <atomic>
#include <deque>
#include <mutex>
#include <string>

#ifdef LOCK_FREE_EVENT_QUEUE
#include <concurrentqueue.h>
#endif // LOCK_FREE_EVENT_QUEUE
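
// Whether the lock-free implementation below gets compiled in is
// controlled by the `LOCK_FREE_EVENT_QUEUE` preprocessor definition
// (e.g., set at build time via a configure flag along the lines of
// `--enable-lock-free-event-queue`); when it is undefined the
// mutex-based implementation further below is used instead.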

#include <process/event.hpp>
#include <process/http.hpp>

#include <stout/json.hpp>
#include <stout/stringify.hpp>
#include <stout/synchronized.hpp>

namespace process {

// A _multiple_ producer (MP) _single_ consumer (SC) event queue for a
// process. Note that we don't _enforce_ the MP/SC semantics at
// runtime, but we have explicitly separated out the `Producer`
// interface and the `Consumer` interface in order to help avoid
// incorrect usage.
//
// Notable semantics:
//
// * Consumers _must_ call `empty()` before calling `dequeue()`.
//   Failing to do so may result in undefined behavior.
//
// * After a consumer calls `decomission()` they _must_ not call
//   anything else (not even `empty()`, and especially not
//   `dequeue()`). Doing so is undefined behavior.
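//
// As an illustrative sketch of the intended contract (`MyEvent` is a
// hypothetical `Event` subtype, not part of this file; ownership of a
// dequeued event passes to the caller):
//
//   EventQueue queue;
//
//   // Any number of producing threads:
//   queue.producer.enqueue(new MyEvent());
//
//   // Exactly one consuming thread:
//   while (!queue.consumer.empty()) {
//     Event* event = queue.consumer.dequeue();
//     // ... process the event ...
//     delete event;
//   }
//
//   // Decomission exactly once, then touch nothing else:
//   queue.consumer.decomission();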
//
// Notes on the lock-free implementation:
//
// The SC requirement is necessary for the lock-free implementation
// because the underlying queue does not provide linearizability, which
// means events can be dequeued "out of order". Usually this is not a
// problem; after all, in most circumstances we won't know the order
// in which events might be enqueued in the first place. However, this
// can be a very bad problem if a single process attempts to enqueue
// two events in a different process AND THOSE EVENTS ARE
// REORDERED. To ensure this will never be the case we give every
// event a sequence number. That way two events enqueued from the
// same process will always have a happens-before relationship with
// respect to one another because they'll have distinct, monotonically
// increasing sequence numbers.
//
// This makes the consumer implementation more difficult because the
// consumer might need to "reorder" events as it reads them out. To do
// this efficiently we require only a single consumer, which fits well
// into the actor model because there will only ever be a single
// thread consuming an actor's events at a time.
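//
// For example (an illustrative trace, not code from this file):
// suppose a producer enqueues events with sequence numbers 0, 1, 2,
// but the underlying queue hands them back in the order 1, 0, 2. The
// single consumer, knowing it expects sequence 0 next, buffers item
// 1, returns item 0 first, then the buffered item 1, and finally
// item 2, restoring the order.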
class EventQueue
{
public:
  EventQueue() : producer(this), consumer(this) {}

  class Producer
  {
  public:
    void enqueue(Event* event) { queue->enqueue(event); }

  private:
    friend class EventQueue;

    Producer(EventQueue* queue) : queue(queue) {}

    EventQueue* queue;
  } producer;

  class Consumer
  {
  public:
    Event* dequeue() { return queue->dequeue(); }
    bool empty() { return queue->empty(); }
    void decomission() { queue->decomission(); }
    template <typename T>
    size_t count() { return queue->count<T>(); }
    operator JSON::Array() { return queue->operator JSON::Array(); }

  private:
    friend class EventQueue;

    Consumer(EventQueue* queue) : queue(queue) {}

    EventQueue* queue;
  } consumer;

private:
  friend class Producer;
  friend class Consumer;

#ifndef LOCK_FREE_EVENT_QUEUE
  void enqueue(Event* event)
  {
    bool enqueued = false;
    synchronized (mutex) {
      if (comissioned) {
        events.push_back(event);
        enqueued = true;
      }
    }

    if (!enqueued) {
      delete event;
    }
  }

  Event* dequeue()
  {
    Event* event = nullptr;

    synchronized (mutex) {
      if (!events.empty()) {
        event = events.front();
        events.pop_front();
      }
    }

    // Semantics are the consumer _must_ call `empty()` before calling
    // `dequeue()` which means an event must be present.
    return CHECK_NOTNULL(event);
  }

  bool empty()
  {
    synchronized (mutex) {
      return events.empty();
    }
  }
147 
148  void decomission()
149  {
150  synchronized (mutex) {
151  comissioned = false;
152  while (!events.empty()) {
153  Event* event = events.front();
154  events.pop_front();
155  delete event;
156  }
157  }
158  }
159 
160  template <typename T>
161  size_t count()
162  {
163  synchronized (mutex) {
164  return std::count_if(
165  events.begin(),
166  events.end(),
167  [](const Event* event) {
168  return event->is<T>();
169  });
170  }
171  }
172 
173  operator JSON::Array()
174  {
175  JSON::Array array;
176  synchronized (mutex) {
177  foreach (Event* event, events) {
178  array.values.push_back(JSON::Object(*event));
179  }
180  }
181  return array;
182  }
183 
184  std::mutex mutex;
185  std::deque<Event*> events;
186  bool comissioned = true;
187 #else // LOCK_FREE_EVENT_QUEUE
  void enqueue(Event* event)
  {
    Item item = {sequence.fetch_add(1), event};
    if (comissioned.load()) {
      queue.enqueue(std::move(item));
    } else {
      sequence.fetch_sub(1);
      delete event;
    }
  }

  Event* dequeue()
  {
    // NOTE: for performance reasons we don't check `comissioned` here
    // so it's possible that we'll loop forever if a consumer called
    // `decomission()` and then subsequently called `dequeue()`.
    Event* event = nullptr;
    do {
      // Given the nature of the concurrent queue implementation it's
      // possible that we'll need to try to dequeue multiple times
      // until it returns an event, even though we know there is an
      // event: the semantics are that we shouldn't call `dequeue()`
      // before calling `empty()`.
      event = try_dequeue();
    } while (event == nullptr);
    return event;
  }

  bool empty()
  {
    // NOTE: for performance reasons we don't check `comissioned` here
    // so it's possible that we'll return true when in fact we've been
    // decomissioned and you shouldn't attempt to dequeue anything.
    return (sequence.load() - next) == 0;
  }

  void decomission()
  {
    comissioned.store(false);
    while (!empty()) {
      // NOTE: we use `try_dequeue()` here because we might be racing
      // with `enqueue()` where they've already incremented `sequence`
      // so we think there are more items to dequeue but they aren't
      // actually going to enqueue anything because they've since seen
      // `comissioned` is false. We'll attempt to dequeue with
      // `try_dequeue()` and eventually they'll decrement `sequence`
      // and so `empty()` will return true and we'll bail.
      Event* event = try_dequeue();
      if (event != nullptr) {
        delete event;
      }
    }
  }
  template <typename T>
  size_t count()
  {
    // Try and dequeue more elements first!
    queue.try_dequeue_bulk(std::back_inserter(items), SIZE_MAX);

    return std::count_if(
        items.begin(),
        items.end(),
        [](const Item& item) {
          if (item.event != nullptr) {
            return item.event->is<T>();
          }
          return false;
        });
  }

  operator JSON::Array()
  {
    // Try and dequeue more elements first!
    queue.try_dequeue_bulk(std::back_inserter(items), SIZE_MAX);

    JSON::Array array;
    foreach (const Item& item, items) {
      if (item.event != nullptr) {
        array.values.push_back(JSON::Object(*item.event));
      }
    }

    return array;
  }

  struct Item
  {
    uint64_t sequence;
    Event* event;
  };

  Event* try_dequeue()
  {
    // The general algorithm here is as follows: we bulk dequeue as
    // many items from the concurrent queue as possible. We then look
    // for the `next` item in the sequence hoping that it's at the
    // beginning of `items`, but because the `queue` is not
    // linearizable it might be "out of order". If we find it out of
    // order we effectively dequeue it but leave it in `items` so as
    // not to incur any costly rearrangements/compactions in
    // `items`. We'll later pop the out of order items once they get
    // to the front.
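    //
    // For example (an illustrative trace): suppose `next == 0` and a
    // bulk dequeue yields items with sequences [1, 0, 2]. Item 0 is
    // found mid-deque, so its event is returned and its slot nulled
    // out (but not removed). On the next call item 1 is at the front
    // and is popped and returned directly. On the call after that the
    // nulled slot for item 0 has reached the front and is popped by
    // the cleanup loop below before item 2 is returned.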

    // Start by popping any items that we effectively dequeued but
    // didn't remove from `items` so as not to incur costly
    // rearrangements/compactions.
    while (!items.empty() && next > items.front().sequence) {
      items.pop_front();
    }

    // Optimistically let's hope that the next item is at the front of
    // `items`. If so, pop the item, increment `next`, and return the
    // event.
    if (!items.empty() && items.front().sequence == next) {
      Event* event = items.front().event;
      items.pop_front();
      next += 1;
      return event;
    }

    size_t index = 0;

    do {
      // Now look for a potentially out of order item. If found,
      // signify the item has been dequeued by nulling the event
      // (necessary for the implementation of `count()` and `operator
      // JSON::Array()`) and return the event.
      for (; index < items.size(); index++) {
        if (items[index].sequence == next) {
          Event* event = items[index].event;
          items[index].event = nullptr;
          next += 1;
          return event;
        }
      }

      // If we can bulk dequeue more items then keep looking for the
      // out of order event!
      //
      // NOTE: we use the _small_ value of `4` to dequeue here since
      // in the presence of enough events being enqueued we could end
      // up spending a LONG time dequeuing here! Since the next event
      // in the sequence should really be close to the top of the
      // queue we use a small value to dequeue.
      //
      // The intuition here is this: the faster we can return the next
      // event the faster that event can get processed and the faster
      // it might generate other events that can get processed in
      // parallel by other threads and the more work we get done.
    } while (queue.try_dequeue_bulk(std::back_inserter(items), 4) != 0);

    return nullptr;
  }

  // Underlying queue of items.
  moodycamel::ConcurrentQueue<Item> queue;

  // Counter to represent the item sequence. Note that we use an
  // unsigned 64-bit integer which means that even if we were adding
  // one item to the queue every nanosecond we'd be able to run for
  // 18,446,744,073,709,551,615 nanoseconds or ~585 years! ;-)
  std::atomic<uint64_t> sequence = ATOMIC_VAR_INIT(0);

  // Counter to represent the next item we expect to dequeue. Note
  // that we don't need to make this atomic because only a single
  // consumer is ever reading or writing this variable!
  uint64_t next = 0;

  // Collection of bulk dequeued items that may be out of order. Note
  // that like `next` this will only ever be read/written by a single
  // consumer.
  //
  // The use of a deque was deliberate because it is implemented as an
  // array of arrays (or vector of vectors) which usually gives good
  // performance for appending to the back and popping from the front,
  // which is exactly what we need to do. To avoid any performance
  // issues that might be incurred we do not remove any items from the
  // middle of the deque (see comments in `try_dequeue()` above for
  // more details).
  std::deque<Item> items;

  // Whether or not the event queue has been decomissioned. This must
  // be atomic as it can be read by a producer even though it's only
  // written by a consumer.
  std::atomic<bool> comissioned = ATOMIC_VAR_INIT(true);
#endif // LOCK_FREE_EVENT_QUEUE
};

} // namespace process {

#endif // __PROCESS_EVENT_QUEUE_HPP__