13 #ifndef __PROCESS_QUEUE_HPP__ 14 #define __PROCESS_QUEUE_HPP__ 42 synchronized (data->lock) {
43 if (data->promises.empty()) {
44 data->elements.push(t);
46 promise = data->promises.front();
47 data->promises.pop_front();
51 if (promise.
get() !=
nullptr) {
60 synchronized (data->lock) {
61 if (data->elements.empty()) {
63 future = data->promises.back()->future();
65 T t = std::move(data->elements.front());
74 auto weak_data = std::weak_ptr<Data>(data);
77 auto data = weak_data.lock();
82 synchronized (data->lock) {
83 for (
auto it = data->promises.begin();
84 it != data->promises.end();
86 if ((*it)->future() == future) {
88 data->promises.erase(it);
100 synchronized (data->lock) {
101 return data->elements.size();
117 std::atomic_flag lock = ATOMIC_FLAG_INIT;
120 std::deque<Owned<Promise<T>>> promises;
123 std::queue<T> elements;
126 std::shared_ptr<Data> data;
131 #endif // __PROCESS_QUEUE_HPP__
const Future< T > & onDiscard(DiscardCallback &&callback) const
Definition: future.hpp:1354
Queue()
Definition: queue.hpp:32
void put(const T &t)
Definition: queue.hpp:35
T * get() const
Definition: owned.hpp:117
bool discard()
Definition: future.hpp:1157
Definition: future.hpp:74
Protocol< PromiseRequest, PromiseResponse > promise
Definition: executor.hpp:48
size_t size() const
Definition: queue.hpp:98
Definition: future.hpp:58