Apache Mesos
mpsc_linked_queue.hpp
Go to the documentation of this file.
1 // Licensed under the Apache License, Version 2.0 (the "License");
2 // you may not use this file except in compliance with the License.
3 // You may obtain a copy of the License at
4 //
5 // http://www.apache.org/licenses/LICENSE-2.0
6 //
7 // Unless required by applicable law or agreed to in writing, software
8 // distributed under the License is distributed on an "AS IS" BASIS,
9 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10 // See the License for the specific language governing permissions and
11 // limitations under the License
12 
13 #ifndef __MPSC_LINKED_QUEUE_HPP__
14 #define __MPSC_LINKED_QUEUE_HPP__
15 
16 #include <atomic>
17 #include <functional>
18 
19 #include <glog/logging.h>
20 
21 namespace process {
22 
23 // This queue is a C++ port of the MpscLinkedQueue of JCTools, but limited to
24 // the core methods:
25 // https://github.com/JCTools/JCTools/blob/master/jctools-core/src/main/java/org/jctools/queues/MpscLinkedQueue.java
26 //
27 // which is a Java port of the MPSC algorithm as presented in the following article:
28 // http://www.1024cores.net/home/lock-free-algorithms/queues/non-intrusive-mpsc-node-based-queue
29 //
30 // The queue has the following properties:
31 // Producers are wait-free (one atomic exchange per enqueue)
32 // Consumer is
33 // - lock-free
34 // - mostly wait-free, except when consumer reaches the end of the queue
35 // and producer enqueued a new node, but did not update the next pointer
36 // on the old node, yet
// Usage contract, as implemented below:
//   - `enqueue()` may be called concurrently by any number of producers.
//   - `dequeue()`, `for_each()` and `empty()` must only be called by the
//     single consumer.
//   - Elements are passed in as raw pointers. Elements that are still
//     enqueued when the queue is destroyed are `delete`d by the
//     destructor; a pointer returned from `dequeue()` is no longer
//     referenced by the queue.
template <typename T>
class MpscLinkedQueue
{
private:
  // Node of the internal singly linked list. The node that `tail`
  // points to acts as a "stub": its `element` is always `nullptr` and
  // the first real element (if any) hangs off its `next` pointer.
  template <typename E>
  struct Node
  {
  public:
    explicit Node(E* element = nullptr) : element(element) {}

    E* element;

    // Brace initialization is used rather than `ATOMIC_VAR_INIT`,
    // which is deprecated since C++20.
    std::atomic<Node<E>*> next{nullptr};
  };

public:
  // Creates an empty queue consisting only of the stub node, which
  // both `head` and `tail` refer to.
  MpscLinkedQueue()
  {
    tail = new Node<T>();
    head.store(tail);
  }

  // The queue owns raw pointers, so copying it would lead to double
  // deletion. The `std::atomic` member already makes the class
  // non-copyable; spelling it out documents the intent.
  MpscLinkedQueue(const MpscLinkedQueue&) = delete;
  MpscLinkedQueue& operator=(const MpscLinkedQueue&) = delete;

  // Deletes every element that is still enqueued and then the
  // remaining stub node.
  ~MpscLinkedQueue()
  {
    while (auto element = dequeue()) {
      delete element;
    }

    delete tail;
  }

  // Multi producer safe.
  //
  // Appends `element` to the queue. `element` must not be `nullptr`.
  void enqueue(T* element)
  {
    // A `nullptr` is used to denote an empty queue when doing a
    // `dequeue()` so producers can't use it as an element.
    CHECK_NOTNULL(element);

    auto newNode = new Node<T>(element);

    // Exchange is guaranteed to only give the old value to one
    // producer, so this is safe and wait-free.
    auto oldhead = head.exchange(newNode, std::memory_order_acq_rel);

    // At this point if this thread context switches out we may block
    // the consumer from doing a dequeue (see below). Eventually we'll
    // unblock the consumer once we run again and execute the next
    // line of code.
    oldhead->next.store(newNode, std::memory_order_release);
  }

  // Single consumer only.
  //
  // Removes and returns the oldest element, or returns `nullptr` if
  // the queue is empty. Spins if a producer has already swapped
  // `head` but has not yet linked its node into the list.
  T* dequeue()
  {
    auto currentTail = tail;

    // Check and see if there is an actual element linked from `tail`
    // since we use `tail` as a "stub" rather than the actual element.
    auto nextTail = currentTail->next.load(std::memory_order_acquire);

    // There are three possible cases here:
    //
    // (1) The queue is empty.
    // (2) The queue appears empty but a producer is still enqueuing
    //     so let's wait for it and then dequeue.
    // (3) We have something to dequeue.
    //
    // Start by checking if the queue is or appears empty.
    if (nextTail == nullptr) {
      // Now check if the queue is actually empty or just appears
      // empty. If it's actually empty then return `nullptr` to denote
      // emptiness.
      if (head.load(std::memory_order_relaxed) == tail) {
        return nullptr;
      }

      // Another thread already inserted a new node, but did not
      // connect it to the tail, yet, so we spin-wait. At this point
      // we are not wait-free anymore.
      do {
        nextTail = currentTail->next.load(std::memory_order_acquire);
      } while (nextTail == nullptr);
    }

    CHECK_NOTNULL(nextTail);

    // Set next pointer of current tail to null to disconnect it
    // from the queue.
    currentTail->next.store(nullptr, std::memory_order_release);

    // The node that carried the element becomes the new stub, so its
    // `element` is cleared and the old stub is freed.
    auto element = nextTail->element;
    nextTail->element = nullptr;

    tail = nextTail;
    delete currentTail;

    return element;
  }

  // Single consumer only.
  //
  // Invokes `f` with the element pointer of each node, starting at
  // the oldest element and without removing anything.
  //
  // TODO(drexin): Provide C++ style iteration so someone can just use
  // the `std::for_each()`.
  template <typename F>
  void for_each(F&& f)
  {
    auto end = head.load();
    auto node = tail;

    for (;;) {
      node = node->next.load();

      // We are following the linked structure until we reach the end
      // node. There is a race with new nodes being added, so we limit
      // the traversal to the last node at the time we started.
      if (node == nullptr) {
        return;
      }

      f(node->element);

      if (node == end) {
        return;
      }
    }
  }

  // Single consumer only.
  //
  // Returns true if nothing is linked after the stub node and no
  // producer has swapped `head` away from it.
  bool empty() const
  {
    return tail->next.load(std::memory_order_relaxed) == nullptr &&
      head.load(std::memory_order_relaxed) == tail;
  }

private:
  // TODO(drexin): Programatically get the cache line size.
  //
  // We align head to 64 bytes (x86 cache line size) to guarantee
  // it to be put on a new cache line. This is to prevent false
  // sharing with other objects that could otherwise end up on
  // the same cache line as this queue. We also align tail to
  // avoid false sharing with head and add padding after tail
  // to avoid false sharing with other objects.
  alignas(64) std::atomic<Node<T>*> head;
  alignas(64) Node<T>* tail;
  char pad[64 - sizeof(Node<T>*)];
};
183 
184 } // namespace process {
185 
186 #endif // __MPSC_LINKED_QUEUE_HPP__
Definition: mpsc_linked_queue.hpp:38
void for_each(F &&f)
Definition: mpsc_linked_queue.hpp:140
F && f
Definition: defer.hpp:270
T * dequeue()
Definition: mpsc_linked_queue.hpp:88
MpscLinkedQueue()
Definition: mpsc_linked_queue.hpp:52
Definition: executor.hpp:48
bool empty()
Definition: mpsc_linked_queue.hpp:164
void enqueue(T *element)
Definition: mpsc_linked_queue.hpp:68
~MpscLinkedQueue()
Definition: mpsc_linked_queue.hpp:58