Apache Mesos
limiter.hpp
Go to the documentation of this file.
1 // Licensed under the Apache License, Version 2.0 (the "License");
2 // you may not use this file except in compliance with the License.
3 // You may obtain a copy of the License at
4 //
5 // http://www.apache.org/licenses/LICENSE-2.0
6 //
7 // Unless required by applicable law or agreed to in writing, software
8 // distributed under the License is distributed on an "AS IS" BASIS,
9 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10 // See the License for the specific language governing permissions and
11 // limitations under the License
12 
13 #ifndef __PROCESS_LIMITER_HPP__
14 #define __PROCESS_LIMITER_HPP__
15 
16 #include <deque>
17 
18 #include <process/defer.hpp>
19 #include <process/delay.hpp>
20 #include <process/dispatch.hpp>
21 #include <process/id.hpp>
22 #include <process/future.hpp>
23 #include <process/process.hpp>
24 #include <process/timeout.hpp>
25 
26 #include <stout/duration.hpp>
27 #include <stout/foreach.hpp>
28 #include <stout/nothing.hpp>
29 
30 namespace process {
31 
32 // Forward declaration.
33 class RateLimiterProcess;
34 
35 // Provides an abstraction that rate limits the number of "permits"
36 // that can be acquired over some duration.
37 // NOTE: Currently, each libprocess Process should use a separate
38 // RateLimiter instance. This is because if multiple processes share
39 // a RateLimiter instance, by the time a process acts on the Future
40 // returned by 'acquire()' another process might have acquired the
41 // next permit and do its rate limited operation.
43 {
44 public:
45  RateLimiter(int permits, const Duration& duration);
46  explicit RateLimiter(double permitsPerSecond);
47  virtual ~RateLimiter();
48 
49  // Returns a future that becomes ready when the permit is acquired.
50  // Discarding this future cancels this acquisition.
51  virtual Future<Nothing> acquire() const;
52 
53 private:
54  // Not copyable, not assignable.
55  RateLimiter(const RateLimiter&);
56  RateLimiter& operator=(const RateLimiter&);
57 
59 };
60 
61 
62 class RateLimiterProcess : public Process<RateLimiterProcess>
63 {
64 public:
65  RateLimiterProcess(int permits, const Duration& duration)
66  : ProcessBase(ID::generate("__limiter__"))
67  {
68  CHECK_GT(permits, 0);
69  CHECK_GT(duration.secs(), 0);
70  permitsPerSecond = permits / duration.secs();
71  }
72 
73  explicit RateLimiterProcess(double _permitsPerSecond)
74  : ProcessBase(ID::generate("__limiter__")),
75  permitsPerSecond(_permitsPerSecond)
76  {
77  CHECK_GT(permitsPerSecond, 0);
78  }
79 
80  void finalize() override
81  {
82  foreach (Promise<Nothing>* promise, promises) {
83  promise->discard();
84  delete promise;
85  }
86  promises.clear();
87  }
88 
90  {
91  if (!promises.empty()) {
92  // Need to wait for others to get permits first.
94  promises.push_back(promise);
95  return promise->future()
96  .onDiscard(defer(self(), &Self::discard, promise->future()));
97  }
98 
99  if (timeout.remaining() > Seconds(0)) {
100  // Need to wait a bit longer, but first one in the queue.
102  promises.push_back(promise);
103  delay(timeout.remaining(), self(), &Self::_acquire);
104  return promise->future()
105  .onDiscard(defer(self(), &Self::discard, promise->future()));
106  }
107 
108  // No need to wait!
109  timeout = Seconds(1) / permitsPerSecond;
110  return Nothing();
111  }
112 
113 private:
114  // Not copyable, not assignable.
116  RateLimiterProcess& operator=(const RateLimiterProcess&);
117 
118  void _acquire()
119  {
120  CHECK(!promises.empty());
121 
122  // Keep removing the top of the queue until we find a promise
123  // whose future is not discarded.
124  while (!promises.empty()) {
125  Promise<Nothing>* promise = promises.front();
126  promises.pop_front();
127  if (!promise->future().isDiscarded()) {
128  promise->set(Nothing());
129  delete promise;
130  timeout = Seconds(1) / permitsPerSecond;
131  break;
132  } else {
133  delete promise;
134  }
135  }
136 
137  // Repeat if necessary.
138  if (!promises.empty()) {
139  delay(timeout.remaining(), self(), &Self::_acquire);
140  }
141  }
142 
143  void discard(const Future<Nothing>& future)
144  {
145  foreach (Promise<Nothing>* promise, promises) {
146  if (promise->future() == future) {
147  promise->discard();
148  }
149  }
150  }
151 
152  double permitsPerSecond;
153 
154  Timeout timeout;
155 
156  std::deque<Promise<Nothing>*> promises;
157 };
158 
159 
160 inline RateLimiter::RateLimiter(int permits, const Duration& duration)
161 {
162  process = new RateLimiterProcess(permits, duration);
163  spawn(process);
164 }
165 
166 
167 inline RateLimiter::RateLimiter(double permitsPerSecond)
168 {
169  process = new RateLimiterProcess(permitsPerSecond);
170  spawn(process);
171 }
172 
173 
175 {
177  wait(process);
178  delete process;
179 }
180 
181 
183 {
185 }
186 
187 } // namespace process {
188 
189 #endif // __PROCESS_LIMITER_HPP__
std::string generate(const std::string &prefix="")
Returns &#39;prefix(N)&#39; where N represents the number of instances where the same prefix (wrt...
Definition: nothing.hpp:16
bool set(const T &_t)
Definition: future.hpp:827
const Future< T > & onDiscard(DiscardCallback &&callback) const
Definition: future.hpp:1354
Definition: process.hpp:72
virtual Future< Nothing > acquire() const
Definition: limiter.hpp:182
RateLimiterProcess(int permits, const Duration &duration)
Definition: limiter.hpp:65
UPID spawn(ProcessBase *process, bool manage=false)
Spawn a new process.
Definition: duration.hpp:32
void terminate(const UPID &pid, bool inject=true)
Sends a TerminateEvent to the given process.
Definition: limiter.hpp:62
void dispatch(const PID< T > &pid, void(T::*method)())
Definition: dispatch.hpp:174
bool isDiscarded() const
Definition: future.hpp:1222
Definition: duration.hpp:207
RateLimiterProcess(double _permitsPerSecond)
Definition: limiter.hpp:73
void discard(WeakFuture< T > reference)
Definition: future.hpp:759
Timer delay(const Duration &duration, const PID< T > &pid, void(T::*method)())
Definition: delay.hpp:31
Protocol< PromiseRequest, PromiseResponse > promise
double secs() const
Definition: duration.hpp:49
void discard(const Futures &futures)
Definition: future.hpp:991
bool wait(const UPID &pid, const Duration &duration=Seconds(-1))
Wait for the process to exit for no more than the specified seconds.
Definition: timeout.hpp:24
Definition: executor.hpp:48
Future< T > future() const
Definition: future.hpp:913
bool discard()
Definition: future.hpp:810
virtual ~RateLimiter()
Definition: limiter.hpp:174
void finalize() override
Invoked when a process is terminated.
Definition: limiter.hpp:80
Definition: process.hpp:505
Deferred< void()> defer(const PID< T > &pid, void(T::*method)())
Definition: defer.hpp:35
Definition: limiter.hpp:42
Future< Nothing > acquire()
Definition: limiter.hpp:89
RateLimiter(int permits, const Duration &duration)
Definition: limiter.hpp:160