Apache Mesos
rwlock.hpp
Go to the documentation of this file.
1 // Licensed under the Apache License, Version 2.0 (the "License");
2 // you may not use this file except in compliance with the License.
3 // You may obtain a copy of the License at
4 //
5 // http://www.apache.org/licenses/LICENSE-2.0
6 //
7 // Unless required by applicable law or agreed to in writing, software
8 // distributed under the License is distributed on an "AS IS" BASIS,
9 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10 // See the License for the specific language governing permissions and
11 // limitations under the License
12 
13 #ifndef __PROCESS_RWMUTEX_HPP__
14 #define __PROCESS_RWMUTEX_HPP__
15 
16 #include <atomic>
17 #include <memory>
18 #include <queue>
19 
20 #include <process/future.hpp>
21 #include <process/owned.hpp>
22 
23 #include <stout/nothing.hpp>
24 #include <stout/synchronized.hpp>
25 
26 namespace process {
27 
37 {
38 public:
39  ReadWriteLock() : data(new Data()) {}
40 
41  // TODO(bmahler): Consider returning a 'Locked' object in the
42  // future as the mechanism for unlocking, rather than exposing
43  // unlocking functions for all to call.
45  {
46  Future<Nothing> future = Nothing();
47 
48  synchronized (data->lock) {
49  if (!data->write_locked && data->read_locked == 0u) {
50  data->write_locked = true;
51  } else {
52  Waiter w{Waiter::WRITE};
53  future = w.promise.future();
54  data->waiters.push(std::move(w));
55  }
56  }
57 
58  return future;
59  }
60 
61  void write_unlock()
62  {
63  // NOTE: We need to satisfy the waiter(s) futures outside the
64  // critical section because it might trigger callbacks which
65  // try to reacquire a read or write lock.
66  std::queue<Waiter> unblocked;
67 
68  synchronized (data->lock) {
69  CHECK(data->write_locked);
70  CHECK_EQ(data->read_locked, 0u);
71 
72  data->write_locked = false;
73 
74  if (!data->waiters.empty()) {
75  switch (data->waiters.front().type) {
76  case Waiter::READ:
77  // Dequeue the group of readers at the front.
78  while (!data->waiters.empty() &&
79  data->waiters.front().type == Waiter::READ) {
80  unblocked.push(std::move(data->waiters.front()));
81  data->waiters.pop();
82  }
83 
84  data->read_locked = unblocked.size();
85 
86  break;
87 
88  case Waiter::WRITE:
89  unblocked.push(std::move(data->waiters.front()));
90  data->waiters.pop();
91  data->write_locked = true;
92 
93  CHECK_EQ(data->read_locked, 0u);
94 
95  break;
96  }
97  }
98  }
99 
100  while (!unblocked.empty()) {
101  unblocked.front().promise.set(Nothing());
102  unblocked.pop();
103  }
104  }
105 
106  // TODO(bmahler): Consider returning a 'Locked' object in the
107  // future as the mechanism for unlocking, rather than exposing
108  // unlocking functions for all to call.
110  {
111  Future<Nothing> future = Nothing();
112 
113  synchronized (data->lock) {
114  if (!data->write_locked && data->waiters.empty()) {
115  data->read_locked++;
116  } else {
117  Waiter w{Waiter::READ};
118  future = w.promise.future();
119  data->waiters.push(std::move(w));
120  }
121  }
122 
123  return future;
124  }
125 
126  void read_unlock()
127  {
128  // NOTE: We need to satisfy the waiter future outside the
129  // critical section because it might trigger callbacks which
130  // try to reacquire a read or write lock.
131  Option<Waiter> waiter;
132 
133  synchronized (data->lock) {
134  CHECK(!data->write_locked);
135  CHECK_GT(data->read_locked, 0u);
136 
137  data->read_locked--;
138 
139  if (data->read_locked == 0u && !data->waiters.empty()) {
140  CHECK_EQ(data->waiters.front().type, Waiter::WRITE);
141 
142  waiter = std::move(data->waiters.front());
143  data->waiters.pop();
144  data->write_locked = true;
145  }
146  }
147 
148  if (waiter.isSome()) {
149  waiter->promise.set(Nothing());
150  }
151  }
152 
153 private:
154  struct Waiter
155  {
156  enum { READ, WRITE } type;
158  };
159 
160  struct Data
161  {
162  Data() : read_locked(0), write_locked(false) {}
163 
164  ~Data()
165  {
166  // TODO(zhitao): Fail promises?
167  }
168 
169  // The state of the lock can be either:
170  // (1) Unlocked: an incoming read or write grabs the lock.
171  //
172  // (2) Read locked (by one or more readers): an incoming write
173  // will queue in the waiters. An incoming read will proceed
174  // if no one is waiting, otherwise it will queue.
175  //
176  // (3) Write locked: incoming reads and writes will queue.
177 
178  size_t read_locked;
179  bool write_locked;
180  std::queue<Waiter> waiters;
181 
182  // Rather than use a process to serialize access to the
183  // internal data we use a 'std::atomic_flag'.
184  std::atomic_flag lock = ATOMIC_FLAG_INIT;
185  };
186 
187  std::shared_ptr<Data> data;
188 };
189 
190 } // namespace process {
191 
192 #endif // __PROCESS_RWMUTEX_HPP__
const short READ
A possible event while polling.
Definition: io.hpp:34
Future< Nothing > read_lock()
Definition: rwlock.hpp:109
Definition: nothing.hpp:16
Definition: option.hpp:29
void write_unlock()
Definition: rwlock.hpp:61
void read_unlock()
Definition: rwlock.hpp:126
bool isSome() const
Definition: option.hpp:116
Future< Nothing > write_lock()
Definition: rwlock.hpp:44
const short WRITE
A possible event while polling.
Definition: io.hpp:40
Protocol< PromiseRequest, PromiseResponse > promise
Definition: executor.hpp:48
ReadWriteLock is a lock that allows concurrent reads and exclusive writes.
Definition: rwlock.hpp:36
Try< uint32_t > type(const std::string &path)
ReadWriteLock()
Definition: rwlock.hpp:39