13 #ifndef __PROCESS_LIMITER_HPP__
14 #define __PROCESS_LIMITER_HPP__
33 class RateLimiterProcess;
69 CHECK_GT(duration.secs(), 0);
70 permitsPerSecond = permits / duration.secs();
75 permitsPerSecond(_permitsPerSecond)
77 CHECK_GT(permitsPerSecond, 0);
91 if (!promises.empty()) {
94 promises.push_back(promise);
99 if (timeout.remaining() > Seconds(0)) {
102 promises.push_back(promise);
103 delay(timeout.remaining(), self(), &Self::_acquire);
109 timeout = Seconds(1) / permitsPerSecond;
120 CHECK(!promises.empty());
124 while (!promises.empty()) {
126 promises.pop_front();
130 timeout = Seconds(1) / permitsPerSecond;
138 if (!promises.empty()) {
139 delay(timeout.remaining(), self(), &Self::_acquire);
146 if (promise->future() == future) {
152 double permitsPerSecond;
156 std::deque<Promise<Nothing>*> promises;
189 #endif // __PROCESS_LIMITER_HPP__
std::string generate(const std::string &prefix="")
Returns 'prefix(N)' where N represents the number of instances where the same prefix (wrt...
Definition: nothing.hpp:16
bool set(const T &_t)
Definition: future.hpp:827
const Future< T > & onDiscard(DiscardCallback &&callback) const
Definition: future.hpp:1354
Definition: process.hpp:72
virtual Future< Nothing > acquire() const
Definition: limiter.hpp:182
RateLimiterProcess(int permits, const Duration &duration)
Definition: limiter.hpp:65
UPID spawn(ProcessBase *process, bool manage=false)
Spawn a new process.
Definition: duration.hpp:32
void terminate(const UPID &pid, bool inject=true)
Sends a TerminateEvent to the given process.
Definition: limiter.hpp:62
void dispatch(const PID< T > &pid, void(T::*method)())
Definition: dispatch.hpp:174
bool isDiscarded() const
Definition: future.hpp:1222
Definition: duration.hpp:207
RateLimiterProcess(double _permitsPerSecond)
Definition: limiter.hpp:73
void discard(WeakFuture< T > reference)
Definition: future.hpp:759
Timer delay(const Duration &duration, const PID< T > &pid, void(T::*method)())
Definition: delay.hpp:31
Protocol< PromiseRequest, PromiseResponse > promise
double secs() const
Definition: duration.hpp:49
void discard(const Futures &futures)
Definition: future.hpp:991
bool wait(const UPID &pid, const Duration &duration=Seconds(-1))
Wait for the process to exit for no more than the specified seconds.
Definition: timeout.hpp:24
Definition: executor.hpp:48
Future< T > future() const
Definition: future.hpp:913
bool discard()
Definition: future.hpp:810
virtual ~RateLimiter()
Definition: limiter.hpp:174
void finalize() override
Invoked when a process is terminated.
Definition: limiter.hpp:80
Definition: process.hpp:505
Deferred< void()> defer(const PID< T > &pid, void(T::*method)())
Definition: defer.hpp:35
Definition: limiter.hpp:42
Future< Nothing > acquire()
Definition: limiter.hpp:89
RateLimiter(int permits, const Duration &duration)
Definition: limiter.hpp:160