Apache Mesos
queue.hpp
Go to the documentation of this file.
1 // Licensed under the Apache License, Version 2.0 (the "License");
2 // you may not use this file except in compliance with the License.
3 // You may obtain a copy of the License at
4 //
5 // http://www.apache.org/licenses/LICENSE-2.0
6 //
7 // Unless required by applicable law or agreed to in writing, software
8 // distributed under the License is distributed on an "AS IS" BASIS,
9 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10 // See the License for the specific language governing permissions and
11 // limitations under the License
12 
13 #ifndef __PROCESS_QUEUE_HPP__
14 #define __PROCESS_QUEUE_HPP__
15 
16 #include <atomic>
17 #include <deque>
18 #include <memory>
19 #include <queue>
20 
21 #include <process/future.hpp>
22 #include <process/owned.hpp>
23 
24 #include <stout/synchronized.hpp>
25 
26 namespace process {
27 
28 template <typename T>
29 class Queue
30 {
31 public:
32  Queue() : data(new Data()) {}
33 
34  void put(const T& t)
35  {
36  // NOTE: We need to grab the promise 'date->promises.front()' but
37  // set it outside of the critical section because setting it might
38  // trigger callbacks that try to reacquire the lock.
40 
41  synchronized (data->lock) {
42  if (data->promises.empty()) {
43  data->elements.push(t);
44  } else {
45  promise = data->promises.front();
46  data->promises.pop_front();
47  }
48  }
49 
50  if (promise.get() != nullptr) {
51  promise->set(t);
52  }
53  }
54 
55  Future<T> get()
56  {
57  synchronized (data->lock) {
58  if (data->elements.empty()) {
59  data->promises.push_back(Owned<Promise<T>>(new Promise<T>()));
60  return data->promises.back()->future();
61  } else {
62  Future<T> future = Future<T>(data->elements.front());
63  data->elements.pop();
64  return future;
65  }
66  }
67  }
68 
69  size_t size() const
70  {
71  synchronized (data->lock) {
72  return data->elements.size();
73  }
74  }
75 
76 private:
77  struct Data
78  {
79  Data() = default;
80 
81  ~Data()
82  {
83  // TODO(benh): Fail promises?
84  }
85 
86  // Rather than use a process to serialize access to the queue's
87  // internal data we use a 'std::atomic_flag'.
88  std::atomic_flag lock = ATOMIC_FLAG_INIT;
89 
90  // Represents "waiters" for elements from the queue.
91  std::deque<Owned<Promise<T>>> promises;
92 
93  // Represents elements already put in the queue.
94  std::queue<T> elements;
95  };
96 
97  std::shared_ptr<Data> data;
98 };
99 
100 } // namespace process {
101 
102 #endif // __PROCESS_QUEUE_HPP__
Queue()
Definition: queue.hpp:32
void put(const T &t)
Definition: queue.hpp:34
T * get() const
Definition: owned.hpp:115
Definition: future.hpp:73
Protocol< PromiseRequest, PromiseResponse > promise
Result< Process > process(pid_t pid)
Definition: freebsd.hpp:30
Definition: queue.hpp:29
size_t size() const
Definition: queue.hpp:69
Definition: owned.hpp:35
Definition: future.hpp:57