Apache Mesos
semaphore.hpp
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License

#ifndef __PROCESS_SEMAPHORE_HPP__
#define __PROCESS_SEMAPHORE_HPP__

#ifdef __MACH__
#include <mach/mach.h>
#elif __WINDOWS__
#include <stout/windows.hpp>
#else
#include <semaphore.h>
#endif // __MACH__

#include <stout/check.hpp>

// TODO(benh): Introduce a user-level semaphore that _only_ traps into
// the kernel if the thread would actually need to wait.

// TODO(benh): Add tests for these!

#ifdef __MACH__
class KernelSemaphore
{
public:
  KernelSemaphore()
  {
    CHECK_EQ(
        KERN_SUCCESS,
        semaphore_create(mach_task_self(), &semaphore, SYNC_POLICY_FIFO, 0));
  }

  KernelSemaphore(const KernelSemaphore& other) = delete;

  ~KernelSemaphore()
  {
    CHECK_EQ(KERN_SUCCESS, semaphore_destroy(mach_task_self(), semaphore));
  }

  KernelSemaphore& operator=(const KernelSemaphore& other) = delete;

  void wait()
  {
    CHECK_EQ(KERN_SUCCESS, semaphore_wait(semaphore));
  }

  void signal()
  {
    CHECK_EQ(KERN_SUCCESS, semaphore_signal(semaphore));
  }

private:
  semaphore_t semaphore;
};
#elif __WINDOWS__
class KernelSemaphore
{
public:
  KernelSemaphore()
  {
    semaphore = CHECK_NOTNULL(CreateSemaphore(nullptr, 0, LONG_MAX, nullptr));
  }

  KernelSemaphore(const KernelSemaphore& other) = delete;

  ~KernelSemaphore()
  {
    CHECK(CloseHandle(semaphore));
  }

  KernelSemaphore& operator=(const KernelSemaphore& other) = delete;

  void wait()
  {
    CHECK_EQ(WAIT_OBJECT_0, WaitForSingleObject(semaphore, INFINITE));
  }

  void signal()
  {
    CHECK(ReleaseSemaphore(semaphore, 1, nullptr));
  }

private:
  HANDLE semaphore;
};
#else
class KernelSemaphore
{
public:
  KernelSemaphore()
  {
    PCHECK(sem_init(&semaphore, 0, 0) == 0);
  }

  KernelSemaphore(const KernelSemaphore& other) = delete;

  ~KernelSemaphore()
  {
    PCHECK(sem_destroy(&semaphore) == 0);
  }

  KernelSemaphore& operator=(const KernelSemaphore& other) = delete;

  void wait()
  {
    int result = sem_wait(&semaphore);

    while (result != 0 && errno == EINTR) {
      result = sem_wait(&semaphore);
    }

    PCHECK(result == 0);
  }

  void signal()
  {
    PCHECK(sem_post(&semaphore) == 0);
  }

private:
  sem_t semaphore;
};
#endif // __MACH__
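
Whichever platform branch is compiled, `KernelSemaphore` exposes the same interface: `wait()` blocks in the kernel until a unit of the resource is available and `signal()` makes one available. The following is a minimal usage sketch, not part of the header; it assumes only the class defined above plus the standard `<iostream>` and `<thread>` headers.

#include <iostream>
#include <thread>

// Hypothetical example: one thread produces ten "units of resource" and
// another consumes them, blocking whenever none are currently available.
KernelSemaphore units;

void producer()
{
  for (int i = 0; i < 10; i++) {
    units.signal();   // Make one unit available (increments the kernel count).
  }
}

void consumer()
{
  for (int i = 0; i < 10; i++) {
    units.wait();     // Blocks until the producer has signaled at least once.
  }
  std::cout << "consumed 10 units" << std::endl;
}

int main()
{
  std::thread c(consumer);
  std::thread p(producer);
  p.join();
  c.join();
  return 0;
}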


// Provides a "decomissionable" kernel semaphore which allows us to
// effectively flush all waiters and keep any future threads from
// waiting. In order to be able to decomission the semaphore we need
// to keep around the number of waiters so we can signal them all.
class DecomissionableKernelSemaphore : public KernelSemaphore
{
public:
  void wait()
  {
    // NOTE: we must check `commissioned` AFTER we have incremented
    // `waiters` otherwise we might race with `decomission()` and fail
    // to properly get signaled.
    waiters.fetch_add(1);

    if (!comissioned.load()) {
      waiters.fetch_sub(1);
      return;
    }

    KernelSemaphore::wait();

    waiters.fetch_sub(1);
  }

  void decomission()
  {
    comissioned.store(false);

    // Now signal all the waiters so they wake up and stop
    // waiting. Note that this may do more `signal()` than necessary
    // but since no future threads will wait that doesn't matter (it
    // would only matter if we cared about the value of the semaphore
    // which in the current implementation we don't).
    for (size_t i = waiters.load(); i > 0; i--) {
      signal();
    }
  }

  bool decomissioned() const
  {
    return !comissioned.load();
  }

  size_t capacity() const
  {
    // The semaphore probably doesn't actually support this many but
    // who knows how to get this value otherwise.
    return SIZE_MAX;
  }

private:
  std::atomic<bool> comissioned = ATOMIC_VAR_INIT(true);
  std::atomic<size_t> waiters = ATOMIC_VAR_INIT(0);
};
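
Because `wait()` returns immediately once the semaphore has been decomissioned, the class can be used to cleanly drain a pool of blocked threads at shutdown. The sketch below illustrates that pattern; it is not part of the header and the function names are made up for illustration.

// Illustrative only: worker threads block until work is signaled or the
// semaphore is decomissioned, at which point they drain out and exit.
DecomissionableKernelSemaphore wakeups;

void worker()
{
  while (true) {
    wakeups.wait();        // Returns without blocking once decomissioned.

    if (wakeups.decomissioned()) {
      return;              // Flushed out; no more work will be signaled.
    }

    // ... dequeue and execute one unit of work ...
  }
}

void shutdown()
{
  wakeups.decomission();   // Wake every current waiter and keep any
                           // future call to wait() from blocking.
}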


// Empirical evidence (see SVG's attached at
// https://issues.apache.org/jira/browse/MESOS-7798) has shown that
// the semaphore implementation on Linux has some performance
// issues. The two biggest issues we saw:
//
// (1) When there are many threads contending on the same semaphore
//     but there are not very many "units of resource" available
//     then the threads will spin in the kernel spinlock associated
//     with the futex.
//
// (2) After a thread is signaled but before the thread wakes up
//     other signaling threads may attempt to wake up that thread
//     again. This appears to be because in the Linux/glibc
//     implementation only the waiting thread decrements the count
//     of waiters.
//
// The `DecomissionableLastInFirstOutFixedSizeSemaphore`
// addresses both of the above issues. For (1) we give every thread
// its own thread-local semaphore and have them wait on that. That
// way there is effectively no contention on the kernel spinlock. For
// (2) we have the signaler decrement the number of waiters rather
// than the waiter do the decrement after it actually wakes up.
//
// Two other optimizations we introduce here:
//
// (1) We store the threads in last-in-first-out (LIFO) order
//     rather than first-in-first-out (FIFO) order. The rationale here
//     is that we may get better cache locality if the kernel starts
//     the thread on the same CPU and the thread works on the same
//     resource(s). This would be more pronounced if the threads
//     were pinned to cores. FIFO doesn't have any possible
//     performance wins (that we could think of) so there is nothing
//     but upside to doing LIFO instead.
//
// (2) We use a fixed size array to store each thread's
//     semaphore. This ensures we won't need to do any memory
//     allocation and keeps us from having to do fancier lock-free
//     code to deal with growing (or shrinking) the storage for the
//     thread-local semaphores.
//
// As mentioned above, every thread gets its own semaphore that is
// used to wait on the actual semaphore. Because a thread can only be
// waiting on a single semaphore at a time it's safe for each thread
// to only have one.
thread_local KernelSemaphore* __semaphore__ = nullptr;

// Using Clang we weren't able to initialize `__semaphore__`, likely
// because it is declared `thread_local`, so instead we dereference
// (and lazily allocate) the semaphore on every read.
#define _semaphore_                                                   \
  (__semaphore__ == nullptr ? __semaphore__ = new KernelSemaphore()  \
                            : __semaphore__)
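
Expanded out, the macro is just a lazily initializing accessor for the current thread's kernel semaphore. Written as a function it would look roughly like the following sketch; the name `current_thread_semaphore` is hypothetical and not part of the header.

// Roughly what `_semaphore_` does: allocate this thread's kernel semaphore
// the first time it is needed and reuse it on every subsequent access.
// NOTE: like the macro, this never deletes the semaphore when a thread exits.
inline KernelSemaphore* current_thread_semaphore()
{
  if (__semaphore__ == nullptr) {
    __semaphore__ = new KernelSemaphore();
  }
  return __semaphore__;
}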

class DecomissionableLastInFirstOutFixedSizeSemaphore
{
public:
  // TODO(benh): enable specifying the number of threads that will use
  // this semaphore. Currently this is difficult because we construct
  // the `RunQueue` and later this class before we've determined the
  // number of worker threads we'll create.
  DecomissionableLastInFirstOutFixedSizeSemaphore()
  {
    for (size_t i = 0; i < semaphores.size(); i++) {
      semaphores[i] = nullptr;
    }
  }

  void signal()
  {
    // NOTE: we _always_ increment `count` which means that even if we
    // try and signal a thread another thread might have come in and
    // decremented `count` already. This is deliberate, but it would
    // be interesting to also investigate the performance where we
    // always signal a new thread.
    count.fetch_add(1);

    while (waiters.load() > 0 && count.load() > 0) {
      for (size_t i = 0; i < semaphores.size(); i++) {
        // Don't bother finding a semaphore to signal if there isn't
        // anybody to signal (`waiters` == 0) or anything to do
        // (`count` == 0).
        if (waiters.load() == 0 || count.load() == 0) {
          return;
        }

        // Try and find and then signal a waiter.
        //
        // TODO(benh): we `load()` first and then do a
        // compare-and-swap because the read shouldn't require a lock
        // instruction or synchronizing the bus. In addition, we
        // should be able to optimize the loads in the future to a
        // weaker memory ordering. That being said, if we don't see
        // performance wins when trying that we should consider just
        // doing a `std::atomic::exchange()` instead.
        KernelSemaphore* semaphore = semaphores[i].load();
        if (semaphore != nullptr) {
          if (!semaphores[i].compare_exchange_strong(semaphore, nullptr)) {
            continue;
          }

          // NOTE: we decrement `waiters` _here_ rather than in `wait`
          // so that future signalers won't bother looping here
          // (potentially for a long time) trying to find a waiter
          // that might have already been signaled but just hasn't
          // woken up yet. We even go as far as decrementing `waiters`
          // _before_ we signal to really keep a thread from having to
          // loop.
          waiters.fetch_sub(1);

          semaphore->signal();

          return;
        }
      }
    }
  }

  void wait()
  {
    do {
      size_t old = count.load();
      while (old > 0) {
      CAS:
        if (!count.compare_exchange_strong(old, old - 1)) {
          continue;
        }
        return;
      }

      // Need to actually wait (slow path).
      waiters.fetch_add(1);

      // NOTE: we must check `commissioned` AFTER we have
      // incremented `waiters` otherwise we might race with
      // `decomission()` and fail to properly get signaled.
      if (!comissioned.load()) {
        waiters.fetch_sub(1);
        return;
      }

      bool done = false;
      while (!done) {
        for (size_t i = 0; i < semaphores.size(); i++) {
          // NOTE: see TODO in `signal()` above for why we do the
          // `load()` first rather than trying to compare-and-swap
          // immediately.
          KernelSemaphore* semaphore = semaphores[i].load();
          if (semaphore == nullptr) {
            // NOTE: we _must_ check one last time if we should really
            // wait because there is a race that `signal()` was
            // completely executed in between when we checked `count`
            // and when we incremented `waiters` and hence we could
            // wait forever. We delay this check until the 11th hour
            // so that we can also benefit from the possibility that
            // more things have been enqueued while we were looking
            // for a slot in the array.
            if ((old = count.load()) > 0) {
              waiters.fetch_sub(1);
              goto CAS;
            }
            if (semaphores[i].compare_exchange_strong(semaphore, _semaphore_)) {
              done = true;
              break;
            }
          }
        }
      }

      // TODO(benh): To make this be wait-free for the signalers we
      // need to enqueue semaphore before we increment `waiters`. The
      // reason we can't do that right now is because we don't know
      // how to remove ourselves from `semaphores` if, after checking
      // `count` (which we need to do due to the race between
      // signaling and waiting) we determine that we don't need to
      // wait (because then we have our semaphore stuck in the
      // queue). A solution here could be to have a fixed size queue
      // that we can just remove ourselves from, but then note that
      // we'll need to set the semaphore back to zero in the event
      // that it got signaled so the next time we don't _not_ wait.

      _semaphore_->wait();
    } while (true);
  }

  void decomission()
  {
    comissioned.store(false);

    // Now signal all the waiters so they wake up and stop
    // waiting. Note that this may do more `signal()` than necessary
    // but since no future threads will wait that doesn't matter (it
    // would only matter if we cared about the value of the semaphore
    // which in the current implementation we don't).
    for (size_t i = waiters.load(); i > 0; i--) {
      signal();
    }
  }

  bool decomissioned() const
  {
    return !comissioned.load();
  }

  size_t capacity() const
  {
    return semaphores.size();
  }

private:
  // Maximum number of threads that could ever wait on this semaphore.
  static constexpr size_t THREADS = 128;

  // Indicates whether or not this semaphore has been decomissioned.
  std::atomic<bool> comissioned = ATOMIC_VAR_INIT(true);

  // Count of currently available "units of resource" represented by
  // this semaphore.
  std::atomic<size_t> count = ATOMIC_VAR_INIT(0);

  // Number of threads waiting for an available "unit of resource".
  std::atomic<size_t> waiters = ATOMIC_VAR_INIT(0);

  // Fixed array holding thread-local semaphores used for waiting and
  // signaling threads.
  std::array<std::atomic<KernelSemaphore*>, THREADS> semaphores;
};
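
A usage sketch for the class above (illustrative only, not part of the header): producers call `signal()` once per item they make available and worker threads call `wait()` once per item they are about to consume, with `decomission()` releasing any blocked workers at shutdown. The queue holding the actual items is assumed to live elsewhere; the function names below are made up.

// Illustrative pattern: `available.signal()` accompanies every enqueue onto
// some separate (lock-free) queue and `available.wait()` precedes every
// dequeue, so `count` tracks the number of items ready for workers.
DecomissionableLastInFirstOutFixedSizeSemaphore available;

void enqueue()
{
  // ... push an item onto the separate queue ...
  available.signal();     // One more unit of work is available.
}

bool dequeue()
{
  available.wait();       // Returns when work is available or after
                          // the semaphore has been decomissioned.

  if (available.decomissioned()) {
    return false;         // Shutting down; nothing left to dequeue.
  }

  // ... pop an item off the separate queue and run it ...
  return true;
}

Note that the fixed-size `semaphores` array bounds how many threads can park themselves on their own kernel semaphore at once, which is why `capacity()` returns 128 here.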

#endif // __PROCESS_SEMAPHORE_HPP__