Apache Mesos
queue.hpp
Go to the documentation of this file.
1 // Licensed under the Apache License, Version 2.0 (the "License");
2 // you may not use this file except in compliance with the License.
3 // You may obtain a copy of the License at
4 //
5 // http://www.apache.org/licenses/LICENSE-2.0
6 //
7 // Unless required by applicable law or agreed to in writing, software
8 // distributed under the License is distributed on an "AS IS" BASIS,
9 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10 // See the License for the specific language governing permissions and
11 // limitations under the License
12 
13 #ifndef __PROCESS_QUEUE_HPP__
14 #define __PROCESS_QUEUE_HPP__
15 
16 #include <atomic>
17 #include <deque>
18 #include <memory>
19 #include <queue>
20 
21 #include <process/future.hpp>
22 #include <process/owned.hpp>
23 
24 #include <stout/synchronized.hpp>
25 
26 namespace process {
27 
28 template <typename T>
29 class Queue
30 {
31 public:
32  Queue() : data(new Data()) {}
33 
34  // TODO(bmahler): Take a T&& here instead.
35  void put(const T& t)
36  {
37  // NOTE: We need to grab the promise 'date->promises.front()' but
38  // set it outside of the critical section because setting it might
39  // trigger callbacks that try to reacquire the lock.
41 
42  synchronized (data->lock) {
43  if (data->promises.empty()) {
44  data->elements.push(t);
45  } else {
46  promise = data->promises.front();
47  data->promises.pop_front();
48  }
49  }
50 
51  if (promise.get() != nullptr) {
52  promise->set(t);
53  }
54  }
55 
56  Future<T> get()
57  {
58  Future<T> future;
59 
60  synchronized (data->lock) {
61  if (data->elements.empty()) {
62  data->promises.emplace_back(new Promise<T>());
63  future = data->promises.back()->future();
64  } else {
65  T t = std::move(data->elements.front());
66  data->elements.pop();
67  return Future<T>(std::move(t));
68  }
69  }
70 
71  // If there were no items available, we set up a discard
72  // handler. This is done here to minimize the amount of
73  // work done within the critical section above.
74  auto weak_data = std::weak_ptr<Data>(data);
75 
76  future.onDiscard([weak_data, future]() {
77  auto data = weak_data.lock();
78  if (!data) {
79  return;
80  }
81 
82  synchronized (data->lock) {
83  for (auto it = data->promises.begin();
84  it != data->promises.end();
85  ++it) {
86  if ((*it)->future() == future) {
87  (*it)->discard();
88  data->promises.erase(it);
89  break;
90  }
91  }
92  }
93  });
94 
95  return future;
96  }
97 
98  size_t size() const
99  {
100  synchronized (data->lock) {
101  return data->elements.size();
102  }
103  }
104 
105 private:
106  struct Data
107  {
108  Data() = default;
109 
110  ~Data()
111  {
112  // TODO(benh): Fail promises?
113  }
114 
115  // Rather than use a process to serialize access to the queue's
116  // internal data we use a 'std::atomic_flag'.
117  std::atomic_flag lock = ATOMIC_FLAG_INIT;
118 
119  // Represents "waiters" for elements from the queue.
120  std::deque<Owned<Promise<T>>> promises;
121 
122  // Represents elements already put in the queue.
123  std::queue<T> elements;
124  };
125 
126  std::shared_ptr<Data> data;
127 };
128 
129 } // namespace process {
130 
131 #endif // __PROCESS_QUEUE_HPP__
const Future< T > & onDiscard(DiscardCallback &&callback) const
Definition: future.hpp:1354
Queue()
Definition: queue.hpp:32
void put(const T &t)
Definition: queue.hpp:35
T * get() const
Definition: owned.hpp:117
bool discard()
Definition: future.hpp:1157
Definition: future.hpp:74
Protocol< PromiseRequest, PromiseResponse > promise
Definition: queue.hpp:29
Definition: executor.hpp:48
size_t size() const
Definition: queue.hpp:98
Definition: owned.hpp:36
Definition: future.hpp:58