Apache Mesos
run_queue.hpp
Go to the documentation of this file.
1 // Licensed under the Apache License, Version 2.0 (the "License");
2 // you may not use this file except in compliance with the License.
3 // You may obtain a copy of the License at
4 //
5 // http://www.apache.org/licenses/LICENSE-2.0
6 //
7 // Unless required by applicable law or agreed to in writing, software
8 // distributed under the License is distributed on an "AS IS" BASIS,
9 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10 // See the License for the specific language governing permissions and
11 // limitations under the License
12 
13 #ifndef __PROCESS_RUN_QUEUE_HPP__
14 #define __PROCESS_RUN_QUEUE_HPP__
15 
16 // At _configuration_ (i.e., build) time you can specify a few
17 // optimizations:
18 //
19 // (1) --enable-lock-free-run-queue (autotools) or
20 // -DENABLE_LOCK_FREE_RUN_QUEUE (cmake) which enables the
21 // lock-free run queue implementation (see below for more details).
22 //
23 // (2) --enable-last-in-first-out-fixed-size-semaphore (autotools) or
24 // -DENABLE_LAST_IN_FIRST_OUT_FIXED_SIZE_SEMAPHORE (cmake) which
25 // enables an optimized semaphore implementation (see semaphore.hpp
26 // for more details).
27 //
28 // By default we use the `LockingRunQueue` and
29 // `DecomissionableKernelSemaphore`.
30 //
31 // We choose to make these _compile-time_ decisions rather than
32 // _runtime_ decisions because we wanted the run queue implementation
33 // to be compile-time optimized (e.g., inlined, etc).
34 
35 #ifdef LOCK_FREE_RUN_QUEUE
36 #include <concurrentqueue.h>
37 #endif // LOCK_FREE_RUN_QUEUE
38 
39 #include <algorithm>
40 #include <list>
41 
42 #include <process/process.hpp>
43 
44 #include <stout/synchronized.hpp>
45 
46 #include "semaphore.hpp"
47 
48 namespace process {
49 
50 #ifndef LOCK_FREE_RUN_QUEUE
// Run queue of `ProcessBase*` used when `LOCK_FREE_RUN_QUEUE` is not
// defined. Entries are kept in a `std::list` that is only touched
// inside `synchronized (mutex)` blocks; `semaphore` is signaled once
// per `enqueue` and waited on by `wait`.
//
// NOTE(review): this listing is missing source lines 54, 76, 86, 127
// and 129, i.e. the declaration lines of `extract`, `enqueue` and
// `dequeue` and the declarations of the `semaphore` member. The
// symbol index at the end of this page still lists the three method
// signatures; they are quoted in the comments below.
51 class RunQueue
52 {
53 public:
 // Removes the first entry equal to `process` from the list.
 // Returns true if an entry was erased and false if none was found.
 // The semaphore is not touched here, so a later `dequeue` can find
 // the list empty and return `nullptr`.
 // NOTE(review): declaration line missing from this listing; the
 // symbol index gives `bool extract(ProcessBase *process)` at
 // run_queue.hpp:54.
55  {
56  synchronized (mutex) {
57  std::list<ProcessBase*>::iterator it = std::find(
58  processes.begin(),
59  processes.end(),
60  process);
61 
62  if (it != processes.end()) {
63  processes.erase(it);
64  return true;
65  }
66  }
67 
68  return false;
69  }
70 
 // Forwards to `semaphore.wait()`; presumably blocks until a matching
 // `signal` (or a decommission) occurs -- see semaphore.hpp to confirm.
71  void wait()
72  {
73  semaphore.wait();
74  }
75 
 // Appends `process` to the back of the list, then bumps `epoch` and
 // signals the semaphore. The epoch update and the signal happen
 // after the `synchronized` block has been left.
 // NOTE(review): declaration line missing from this listing; the
 // symbol index gives `void enqueue(ProcessBase *process)` at
 // run_queue.hpp:76.
77  {
78  synchronized (mutex) {
79  processes.push_back(process);
80  }
81  epoch.fetch_add(1);
82  semaphore.signal();
83  }
84 
85  // Precondition: `wait` must get called before `dequeue`!
 // Pops and returns the front entry, or returns `nullptr` when the
 // list is empty.
 // NOTE(review): declaration line missing from this listing; the
 // symbol index gives `ProcessBase * dequeue()` at run_queue.hpp:86.
87  {
88  synchronized (mutex) {
89  if (!processes.empty()) {
90  ProcessBase* process = processes.front();
91  processes.pop_front();
92  return process;
93  }
94  }
95 
96  return nullptr;
97  }
98 
 // Returns whether the list currently holds no entries.
99  // NOTE: this function can't be const because `synchronized (mutex)`
100  // is not const ...
101  bool empty()
102  {
103  synchronized (mutex) {
104  return processes.empty();
105  }
106  }
107 
 // Forwards to `semaphore.decomission()`.
108  void decomission()
109  {
110  semaphore.decomission();
111  }
112 
 // Returns `semaphore.capacity()`.
113  size_t capacity() const
114  {
115  return semaphore.capacity();
116  }
117 
118  // Epoch used to capture changes to the run queue when settling.
 // Starts at 0 and is incremented by one on every `enqueue`.
119  std::atomic_long epoch = ATOMIC_VAR_INIT(0L);
120 
121 private:
 // Queued processes, front = next to be dequeued. Only accessed
 // inside `synchronized (mutex)` blocks.
122  std::list<ProcessBase*> processes;
123  std::mutex mutex;
124 
125  // Semaphore used for threads to wait.
 // NOTE(review): the two `semaphore` member declarations (source
 // lines 127 and 129) are missing from this listing. The comment at
 // the top of the file names `DecomissionableKernelSemaphore` as the
 // default; the type used when
 // `LAST_IN_FIRST_OUT_FIXED_SIZE_SEMAPHORE` is defined is not visible
 // here -- see semaphore.hpp.
126 #ifndef LAST_IN_FIRST_OUT_FIXED_SIZE_SEMAPHORE
128 #else
130 #endif // LAST_IN_FIRST_OUT_FIXED_SIZE_SEMAPHORE
131 };
132 
133 #else // LOCK_FREE_RUN_QUEUE
134 
// Run queue of `ProcessBase*` used when `LOCK_FREE_RUN_QUEUE` is
// defined. Entries are stored in a `moodycamel::ConcurrentQueue`
// instead of a mutex-guarded list; `semaphore` is signaled once per
// enqueue and waited on by `wait`.
//
// NOTE(review): this listing is missing source lines 150, 158, 196
// and 198. Judging by the bodies that follow them, these are the
// declaration lines of `enqueue` and `dequeue` and the declarations
// of the `semaphore` member -- confirm against the original header.
135 class RunQueue
136 {
137 public:
 // Always returns false: removing a specific process is not
 // supported by this variant (see the note in the body).
138  bool extract(ProcessBase*)
139  {
140  // NOTE: moodycamel::ConcurrentQueue does not provide a way to
141  // implement extract so we simply return false here.
142  return false;
143  }
144 
 // Forwards to `semaphore.wait()`; presumably blocks until a matching
 // `signal` (or a decommission) occurs -- see semaphore.hpp to confirm.
145  void wait()
146  {
147  semaphore.wait();
148  }
149 
 // Enqueues `process` on the concurrent queue, then bumps `epoch` and
 // signals the semaphore.
 // NOTE(review): declaration line (source line 150) missing from this
 // listing; presumably `void enqueue(ProcessBase* process)`, matching
 // the locking variant above.
151  {
152  queue.enqueue(process);
153  epoch.fetch_add(1);
154  semaphore.signal();
155  }
156 
157  // Precondition: `wait` must get called before `dequeue`!
 // Retries `try_dequeue` until it succeeds, and gives up only once
 // `semaphore.decomissioned()` reports true. Returns the dequeued
 // process; `process` starts out as `nullptr`, which is what the
 // comment in the body expects to be returned after a decommission.
 // NOTE(review): declaration line (source line 158) missing from this
 // listing; presumably `ProcessBase* dequeue()`, matching the locking
 // variant above.
159  {
160  // NOTE: we loop _forever_ until we actually dequeue a process
161  // because the contract for using the run queue is that `wait`
162  // must be called first so we know that there is something to be
163  // dequeued or the run queue has been decommissioned and we should
164  // just return `nullptr`.
165  ProcessBase* process = nullptr;
166  while (!queue.try_dequeue(process)) {
167  if (semaphore.decomissioned()) {
168  break;
169  }
170  }
171  return process;
172  }
173 
 // Returns true when `queue.size_approx()` is 0. As the name
 // `size_approx` suggests, this is presumably only an estimate while
 // other threads are enqueuing or dequeuing.
174  bool empty() const
175  {
176  return queue.size_approx() == 0;
177  }
178 
 // Forwards to `semaphore.decomission()`.
179  void decomission()
180  {
181  semaphore.decomission();
182  }
183 
 // Returns `semaphore.capacity()`.
184  size_t capacity() const
185  {
186  return semaphore.capacity();
187  }
188 
189  // Epoch used to capture changes to the run queue when settling.
 // Starts at 0 and is incremented by one on every enqueue.
190  std::atomic_long epoch = ATOMIC_VAR_INIT(0L);
191 
192 private:
 // Underlying concurrent queue holding the runnable processes.
193  moodycamel::ConcurrentQueue<ProcessBase*> queue;
194 
 // NOTE(review): the two `semaphore` member declarations (source
 // lines 196 and 198) are missing from this listing. The comment at
 // the top of the file names `DecomissionableKernelSemaphore` as the
 // default; the type used when
 // `LAST_IN_FIRST_OUT_FIXED_SIZE_SEMAPHORE` is defined is not visible
 // here -- see semaphore.hpp.
195 #ifndef LAST_IN_FIRST_OUT_FIXED_SIZE_SEMAPHORE
197 #else
199 #endif // LAST_IN_FIRST_OUT_FIXED_SIZE_SEMAPHORE
200 };
201 
202 #endif // LOCK_FREE_RUN_QUEUE
203 
204 } // namespace process {
205 
206 #endif // __PROCESS_RUN_QUEUE_HPP__
bool empty()
Definition: run_queue.hpp:101
process::RunQueue
Definition: run_queue.hpp:51
bool extract(ProcessBase *process)
Definition: run_queue.hpp:54
size_t capacity() const
Definition: run_queue.hpp:113
Definition: process.hpp:72
std::atomic_long epoch
Definition: run_queue.hpp:119
void decomission()
Definition: semaphore.hpp:159
ProcessBase * dequeue()
Definition: run_queue.hpp:86
size_t capacity() const
Definition: semaphore.hpp:178
void signal()
Definition: semaphore.hpp:124
void enqueue(ProcessBase *process)
Definition: run_queue.hpp:76
void wait()
Definition: semaphore.hpp:142
Result< Process > process(pid_t pid)
Definition: freebsd.hpp:30
Definition: executor.hpp:48
void decomission()
Definition: run_queue.hpp:108
void wait()
Definition: run_queue.hpp:71
Definition: semaphore.hpp:139
Try< std::list< std::string > > find(const std::string &directory, const std::string &pattern)
Definition: find.hpp:37