Apache Mesos
event_queue.hpp
Go to the documentation of this file.
1 // Licensed under the Apache License, Version 2.0 (the "License");
2 // you may not use this file except in compliance with the License.
3 // You may obtain a copy of the License at
4 //
5 // http://www.apache.org/licenses/LICENSE-2.0
6 //
7 // Unless required by applicable law or agreed to in writing, software
8 // distributed under the License is distributed on an "AS IS" BASIS,
9 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10 // See the License for the specific language governing permissions and
11 // limitations under the License
12 
13 #ifndef __PROCESS_EVENT_QUEUE_HPP__
14 #define __PROCESS_EVENT_QUEUE_HPP__
15 
16 #include <deque>
17 #include <mutex>
18 #include <string>
19 
20 #include <process/event.hpp>
21 #include <process/http.hpp>
22 
23 #include <stout/json.hpp>
24 #include <stout/stringify.hpp>
25 #include <stout/synchronized.hpp>
26 
27 #ifdef LOCK_FREE_EVENT_QUEUE
28 #include "mpsc_linked_queue.hpp"
29 #endif // LOCK_FREE_EVENT_QUEUE
30 
31 namespace process {
32 
33 // A _multiple_ producer (MP) _single_ consumer (SC) event queue for a
34 // process. Note that we don't _enforce_ the MP/SC semantics during
35 // runtime but we have explicitly separated out the `Producer`
36 // interface and the `Consumer` interface in order to help avoid
37 // incorrect usage.
38 //
39 // Notable semantics:
40 //
41 // * Consumers _must_ call `empty()` before calling
42 // `dequeue()`. Failing to do so may result in undefined behavior.
43 //
44 // * After a consumer calls `decomission()` they _must_ not call any
45 // thing else (not even `empty()` and especially not
46 // `dequeue()`). Doing so is undefined behavior.
47 //
48 // Notes on the lock-free implementation:
49 //
50 // The SC requirement is necessary for the lock-free implementation
51 // because the underlying queue does not provide linearizability which
52 // means events can be dequeued "out of order". Usually this is not a
53 // problem, after all, in most circumstances we won't know the order
54 // in which events might be enqueued in the first place. However, this
55 // can be a very bad problem if a single process attempts to enqueue
56 // two events in a different process AND THOSE EVENTS ARE
57 // REORDERED. To ensure this will never be the case we give every
58 // event a sequence number. That way an event from the same process
59 // will always have a happens-before relationship with respect to the
60 // events that they enqueue because they'll have distinct sequence
61 // numbers.
62 //
63 // This makes the consumer implementation more difficult because the
64 // consumer might need to "reorder" events as it reads them out. To do
65 // this efficiently we require only a single consumer, which fits well
66 // into the actor model because there will only ever be a single
67 // thread consuming an actors events at a time.
69 {
70 public:
71  EventQueue() : producer(this), consumer(this) {}
72 
73  class Producer
74  {
75  public:
76  // Returns false if not enqueued; this means the queue
77  // is decomissioned. In this case the caller retains
78  // ownership of the event.
79  bool enqueue(Event* event) { return queue->enqueue(event); }
80 
81  private:
82  friend class EventQueue;
83 
84  Producer(EventQueue* queue) : queue(queue) {}
85 
86  EventQueue* queue;
87  } producer;
88 
89  class Consumer
90  {
91  public:
92  Event* dequeue() { return queue->dequeue(); }
93  bool empty() { return queue->empty(); }
94  void decomission() { queue->decomission(); }
95  template <typename T>
96  size_t count() { return queue->count<T>(); }
97  operator JSON::Array() { return queue->operator JSON::Array(); }
98 
99  private:
100  friend class EventQueue;
101 
102  Consumer(EventQueue* queue) : queue(queue) {}
103 
104  EventQueue* queue;
105  } consumer;
106 
107 private:
108  friend class Producer;
109  friend class Consumer;
110 
111 #ifndef LOCK_FREE_EVENT_QUEUE
112  bool enqueue(Event* event)
113  {
114  synchronized (mutex) {
115  if (comissioned) {
116  events.push_back(event);
117  return true;
118  }
119  }
120 
121  return false;
122  }
123 
124  Event* dequeue()
125  {
126  Event* event = nullptr;
127 
128  synchronized (mutex) {
129  if (events.size() > 0) {
130  Event* event = events.front();
131  events.pop_front();
132  return event;
133  }
134  }
135 
136  // Semantics are the consumer _must_ call `empty()` before calling
137  // `dequeue()` which means an event must be present.
138  return CHECK_NOTNULL(event);
139  }
140 
141  bool empty()
142  {
143  synchronized (mutex) {
144  return events.size() == 0;
145  }
146  }
147 
148  void decomission()
149  {
150  synchronized (mutex) {
151  comissioned = false;
152  while (!events.empty()) {
153  Event* event = events.front();
154  events.pop_front();
155  delete event;
156  }
157  }
158  }
159 
160  template <typename T>
161  size_t count()
162  {
163  synchronized (mutex) {
164  return std::count_if(
165  events.begin(),
166  events.end(),
167  [](const Event* event) {
168  return event->is<T>();
169  });
170  }
171  }
172 
173  operator JSON::Array()
174  {
175  JSON::Array array;
176  synchronized (mutex) {
177  foreach (Event* event, events) {
178  array.values.push_back(JSON::Object(*event));
179  }
180  }
181  return array;
182  }
183 
184  std::mutex mutex;
185  std::deque<Event*> events;
186  bool comissioned = true;
187 #else // LOCK_FREE_EVENT_QUEUE
188  bool enqueue(Event* event)
189  {
190  if (comissioned.load()) {
191  queue.enqueue(event);
192  return true;
193  }
194 
195  return false;
196  }
197 
198  Event* dequeue()
199  {
200  return queue.dequeue();
201  }
202 
203  bool empty()
204  {
205  return queue.empty();
206  }
207 
208  void decomission()
209  {
210  comissioned.store(true);
211  while (!empty()) {
212  delete dequeue();
213  }
214  }
215 
216  template <typename T>
217  size_t count()
218  {
219  size_t count = 0;
220  queue.for_each([&count](Event* event) {
221  if (event->is<T>()) {
222  count++;
223  }
224  });
225  return count;
226  }
227 
228  operator JSON::Array()
229  {
230  JSON::Array array;
231  queue.for_each([&array](Event* event) {
232  array.values.push_back(JSON::Object(*event));
233  });
234 
235  return array;
236  }
237 
238  // Underlying queue of items.
240 
241  // Whether or not the event queue has been decomissioned. This must
242  // be atomic as it can be read by a producer even though it's only
243  // written by a consumer.
244  std::atomic<bool> comissioned = ATOMIC_VAR_INIT(true);
245 #endif // LOCK_FREE_EVENT_QUEUE
246 };
247 
248 } // namespace process {
249 
250 #endif // __PROCESS_EVENT_QUEUE_HPP__
Definition: mpsc_linked_queue.hpp:38
class process::EventQueue::Producer producer
size_t count()
Definition: event_queue.hpp:96
Definition: event_queue.hpp:68
class process::EventQueue::Consumer consumer
Definition: json.hpp:198
friend class Consumer
Definition: event_queue.hpp:109
bool is() const
Definition: event.hpp:68
bool empty()
Definition: event_queue.hpp:93
Definition: json.hpp:158
Definition: event_queue.hpp:73
EventQueue()
Definition: event_queue.hpp:71
std::vector< Value > values
Definition: json.hpp:203
void decomission()
Definition: event_queue.hpp:94
bool enqueue(Event *event)
Definition: event_queue.hpp:79
Definition: executor.hpp:48
Definition: event_queue.hpp:89
Event * dequeue()
Definition: event_queue.hpp:92
Definition: event.hpp:60