Apache Mesos
heartbeater.hpp
Go to the documentation of this file.
1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements. See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership. The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License. You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 
17 #ifndef __COMMON_HEARTBEATER_HPP__
18 #define __COMMON_HEARTBEATER_HPP__
19 
20 #include <string>
21 
22 #include <process/delay.hpp>
23 #include <process/http.hpp>
24 #include <process/process.hpp>
25 #include <process/owned.hpp>
26 
27 #include <stout/duration.hpp>
28 #include <stout/lambda.hpp>
29 #include <stout/option.hpp>
30 
31 #include "common/http.hpp"
32 
33 namespace mesos {
34 namespace internal {
35 
36 // This process periodically sends heartbeats to a given HTTP streaming
37 // response. The optional delay parameter is used to specify the delay
38 // period before sending the first heartbeat. The optional callback parameter
39 // will be called each time a heartbeat is sent.
40 template<typename Message, typename Event>
42  : public process::Process<ResponseHeartbeaterProcess<Message, Event>>
43 {
44 public:
46  const std::string& _logMessage,
47  const Message& _heartbeatMessage,
48  const StreamingHttpConnection<Event>& _connection,
49  const Duration& _interval,
50  const Option<Duration>& _delay = None(),
51  const Option<lambda::function<void()>>& _callback = None())
52  : process::ProcessBase(process::ID::generate("heartbeater")),
53  logMessage(_logMessage),
54  heartbeatMessage(_heartbeatMessage),
55  connection(_connection),
56  interval(_interval),
57  delay(_delay),
58  callback(_callback) {}
59 
60 protected:
61  void initialize() override
62  {
63  if (delay.isSome()) {
65  delay.get(),
66  this,
67  &ResponseHeartbeaterProcess::heartbeat);
68  } else {
69  heartbeat();
70  }
71  }
72 
73 private:
74  void heartbeat()
75  {
76  // Only send a heartbeat if the connection is not closed.
77  if (connection.closed().isPending()) {
78  VLOG(2) << "Sending heartbeat to " << logMessage;
79 
80  if (callback.isSome()) {
81  callback.get()();
82  }
83 
84  connection.send(heartbeatMessage);
85  }
86 
87  process::delay(interval, this, &ResponseHeartbeaterProcess::heartbeat);
88  }
89 
90  const std::string logMessage;
91  const Message heartbeatMessage;
93  const Duration interval;
94  const Option<Duration> delay;
95  const Option<lambda::function<void()>> callback;
96 };
97 
98 
99 template<typename Message, typename Event>
101 {
102 public:
104  const std::string& _logMessage,
105  const Message& _heartbeatMessage,
106  const StreamingHttpConnection<Event>& _connection,
107  const Duration& _interval,
108  const Option<Duration>& _delay = None(),
109  const Option<lambda::function<void()>>& _callback = None())
110  : process(new ResponseHeartbeaterProcess<Message, Event>(
111  _logMessage,
112  _heartbeatMessage,
113  _connection,
114  _interval,
115  _delay,
116  _callback))
117  {
118  process::spawn(process.get());
119  }
120 
122  {
124  process::wait(process.get());
125  }
126 
127  // Not copyable, not assignable.
128  ResponseHeartbeater(const ResponseHeartbeater&) = delete;
129  ResponseHeartbeater& operator=(const ResponseHeartbeater&) = delete;
130 
131 private:
133 };
134 
135 } // namespace internal {
136 } // namespace mesos {
137 
138 #endif // __COMMON_HEARTBEATER_HPP__
std::string generate(const std::string &prefix="")
Returns &#39;prefix(N)&#39; where N represents the number of instances where the same prefix (wrt...
ProcessBase(const std::string &id="")
ResponseHeartbeater(const std::string &_logMessage, const Message &_heartbeatMessage, const StreamingHttpConnection< Event > &_connection, const Duration &_interval, const Option< Duration > &_delay=None(), const Option< lambda::function< void()>> &_callback=None())
Definition: heartbeater.hpp:103
ResponseHeartbeaterProcess(const std::string &_logMessage, const Message &_heartbeatMessage, const StreamingHttpConnection< Event > &_connection, const Duration &_interval, const Option< Duration > &_delay=None(), const Option< lambda::function< void()>> &_callback=None())
Definition: heartbeater.hpp:45
Definition: heartbeater.hpp:41
UPID spawn(ProcessBase *process, bool manage=false)
Spawn a new process.
Definition: duration.hpp:32
void terminate(const UPID &pid, bool inject=true)
Sends a TerminateEvent to the given process.
bool isSome() const
Definition: option.hpp:116
mesos::v1::scheduler::Event Event
Definition: mesos.hpp:2883
~ResponseHeartbeater()
Definition: heartbeater.hpp:121
Definition: agent.hpp:25
Timer delay(const Duration &duration, const PID< T > &pid, void(T::*method)())
Definition: delay.hpp:31
const T & get() const &
Definition: option.hpp:119
bool wait(const UPID &pid, const Duration &duration=Seconds(-1))
Wait for the process to exit for no more than the specified seconds.
Definition: heartbeater.hpp:100
Result< Process > process(pid_t pid)
Definition: freebsd.hpp:30
Definition: none.hpp:27
Definition: attributes.hpp:24
Definition: executor.hpp:48
void initialize() override
Invoked when a process gets spawned.
Definition: heartbeater.hpp:61
Definition: owned.hpp:36
Definition: process.hpp:505