Apache Mesos
event.hpp
Go to the documentation of this file.
1 // Licensed under the Apache License, Version 2.0 (the "License");
2 // you may not use this file except in compliance with the License.
3 // You may obtain a copy of the License at
4 //
5 // http://www.apache.org/licenses/LICENSE-2.0
6 //
7 // Unless required by applicable law or agreed to in writing, software
8 // distributed under the License is distributed on an "AS IS" BASIS,
9 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10 // See the License for the specific language governing permissions and
11 // limitations under the License
12 
13 #ifndef __PROCESS_EVENT_HPP__
14 #define __PROCESS_EVENT_HPP__
15 
16 #include <memory> // TODO(benh): Replace shared_ptr with unique_ptr.
17 
18 #include <process/future.hpp>
19 #include <process/http.hpp>
20 #include <process/message.hpp>
21 #include <process/socket.hpp>
22 
23 #include <stout/abort.hpp>
24 #include <stout/json.hpp>
25 #include <stout/lambda.hpp>
26 
27 namespace process {
28 
29 // Forward declarations.
30 class ProcessBase;
31 struct MessageEvent;
32 struct DispatchEvent;
33 struct HttpEvent;
34 struct ExitedEvent;
35 struct TerminateEvent;
36 
37 
39 {
40  virtual ~EventVisitor() {}
41  virtual void visit(const MessageEvent&) {}
42  virtual void visit(const DispatchEvent&) {}
43  virtual void visit(const HttpEvent&) {}
44  virtual void visit(const ExitedEvent&) {}
45  virtual void visit(const TerminateEvent&) {}
46 };
47 
48 
50 {
51  virtual ~EventConsumer() {}
52  virtual void consume(MessageEvent&&) {}
53  virtual void consume(DispatchEvent&&) {}
54  virtual void consume(HttpEvent&&) {}
55  virtual void consume(ExitedEvent&&) {}
56  virtual void consume(TerminateEvent&&) {}
57 };
58 
59 
60 struct Event
61 {
62  virtual ~Event() {}
63 
64  virtual void visit(EventVisitor* visitor) const = 0;
65  virtual void consume(EventConsumer* consumer) && = 0;
66 
67  template <typename T>
68  bool is() const
69  {
70  bool result = false;
71  struct IsVisitor : EventVisitor
72  {
73  explicit IsVisitor(bool* _result) : result(_result) {}
74  virtual void visit(const T&) { *result = true; }
75  bool* result;
76  } visitor(&result);
77  visit(&visitor);
78  return result;
79  }
80 
81  template <typename T>
82  const T& as() const
83  {
84  const T* result = nullptr;
85  struct AsVisitor : EventVisitor
86  {
87  explicit AsVisitor(const T** _result) : result(_result) {}
88  virtual void visit(const T& t) { *result = &t; }
89  const T** result;
90  } visitor(&result);
91  visit(&visitor);
92  if (result == nullptr) {
93  ABORT("Attempting to \"cast\" event incorrectly!");
94  }
95  return *result;
96  }
97 
98  // JSON representation for an Event.
99  operator JSON::Object() const;
100 };
101 
102 
104 {
105  explicit MessageEvent(Message&& _message)
106  : message(std::move(_message)) {}
107 
109  const UPID& from,
110  const UPID& to,
111  const std::string& name,
112  const char* data,
113  size_t length)
114  : message{name, from, to, std::string(data, length)} {}
115 
117  const UPID& from,
118  const UPID& to,
119  std::string&& name,
120  std::string&& data)
121  : message{std::move(name), from, to, std::move(data)} {}
122 
123  MessageEvent(MessageEvent&& that) = default;
124  MessageEvent(const MessageEvent& that) = delete;
125  MessageEvent& operator=(MessageEvent&&) = default;
126  MessageEvent& operator=(const MessageEvent&) = delete;
127 
128  void visit(EventVisitor* visitor) const override
129  {
130  visitor->visit(*this);
131  }
132 
133  void consume(EventConsumer* consumer) && override
134  {
135  consumer->consume(std::move(*this));
136  }
137 
139 };
140 
141 
142 struct HttpEvent : Event
143 {
145  http::Request* _request,
146  Promise<http::Response>* _response)
147  : request(_request),
148  response(_response) {}
149 
150  HttpEvent(HttpEvent&&) = default;
151  HttpEvent(const HttpEvent&) = delete;
152  HttpEvent& operator=(HttpEvent&&) = default;
153  HttpEvent& operator=(const HttpEvent&) = delete;
154 
155  virtual ~HttpEvent()
156  {
157  if (response) {
158  // Fail the response in case it wasn't set.
159  response->set(http::InternalServerError());
160  }
161  }
162 
163  void visit(EventVisitor* visitor) const override
164  {
165  visitor->visit(*this);
166  }
167 
168  void consume(EventConsumer* consumer) && override
169  {
170  consumer->consume(std::move(*this));
171  }
172 
173  std::unique_ptr<http::Request> request;
174  std::unique_ptr<Promise<http::Response>> response;
175 };
176 
177 
179 {
181  const UPID& _pid,
182  std::unique_ptr<lambda::CallableOnce<void(ProcessBase*)>> _f,
183  const Option<const std::type_info*>& _functionType)
184  : pid(_pid),
185  f(std::move(_f)),
186  functionType(_functionType)
187  {}
188 
189  DispatchEvent(DispatchEvent&&) = default;
190  DispatchEvent(const DispatchEvent&) = delete;
191  DispatchEvent& operator=(DispatchEvent&&) = default;
192  DispatchEvent& operator=(const DispatchEvent&) = delete;
193 
194  void visit(EventVisitor* visitor) const override
195  {
196  visitor->visit(*this);
197  }
198 
199  void consume(EventConsumer* consumer) && override
200  {
201  consumer->consume(std::move(*this));
202  }
203 
204  // PID receiving the dispatch.
206 
207  // Function to get invoked as a result of this dispatch event.
208  std::unique_ptr<lambda::CallableOnce<void(ProcessBase*)>> f;
209 
211 };
212 
213 
215 {
216  explicit ExitedEvent(const UPID& _pid)
217  : pid(_pid) {}
218 
219  ExitedEvent(ExitedEvent&&) = default;
220  ExitedEvent(const ExitedEvent&) = delete;
221  ExitedEvent& operator=(ExitedEvent&&) = default;
222  ExitedEvent& operator=(const ExitedEvent&) = delete;
223 
224  void visit(EventVisitor* visitor) const override
225  {
226  visitor->visit(*this);
227  }
228 
229  void consume(EventConsumer* consumer) && override
230  {
231  consumer->consume(std::move(*this));
232  }
233 
235 };
236 
237 
239 {
240  TerminateEvent(const UPID& _from, bool _inject)
241  : from(_from), inject(_inject) {}
242 
243  TerminateEvent(TerminateEvent&&) = default;
244  TerminateEvent(const TerminateEvent&) = delete;
245  TerminateEvent& operator=(TerminateEvent&&) = default;
246  TerminateEvent& operator=(const TerminateEvent&) = delete;
247 
248  void visit(EventVisitor* visitor) const override
249  {
250  visitor->visit(*this);
251  }
252 
253  void consume(EventConsumer* consumer) && override
254  {
255  consumer->consume(std::move(*this));
256  }
257 
259  bool inject;
260 };
261 
262 
263 inline Event::operator JSON::Object() const
264 {
265  JSON::Object object;
266 
267  struct Visitor : EventVisitor
268  {
269  explicit Visitor(JSON::Object* _object) : object(_object) {}
270 
271  virtual void visit(const MessageEvent& event)
272  {
273  object->values["type"] = "MESSAGE";
274 
275  const Message& message = event.message;
276 
277  object->values["name"] = message.name;
278  object->values["from"] = stringify(message.from);
279  object->values["to"] = stringify(message.to);
280  object->values["body"] = message.body;
281  }
282 
283  virtual void visit(const HttpEvent& event)
284  {
285  object->values["type"] = "HTTP";
286 
287  const http::Request& request = *event.request;
288 
289  object->values["method"] = request.method;
290  object->values["url"] = stringify(request.url);
291  }
292 
293  virtual void visit(const DispatchEvent& event)
294  {
295  object->values["type"] = "DISPATCH";
296  }
297 
298  virtual void visit(const ExitedEvent& event)
299  {
300  object->values["type"] = "EXITED";
301  }
302 
303  virtual void visit(const TerminateEvent& event)
304  {
305  object->values["type"] = "TERMINATE";
306  }
307 
308  JSON::Object* object;
309  } visitor(&object);
310 
311  visit(&visitor);
312 
313  return object;
314 }
315 
316 } // namespace process {
317 
318 #endif // __PROCESS_EVENT_HPP__
#define ABORT(...)
Definition: abort.hpp:40
void visit(EventVisitor *visitor) const override
Definition: event.hpp:194
virtual void visit(const TerminateEvent &)
Definition: event.hpp:45
Future< Response > request(const Request &request, bool streamedResponse=false)
Asynchronously sends an HTTP request to the process and returns the HTTP response once the entire res...
F && f
Definition: defer.hpp:270
URL url
Definition: http.hpp:528
MessageEvent(const UPID &from, const UPID &to, const std::string &name, const char *data, size_t length)
Definition: event.hpp:108
Definition: event.hpp:142
Definition: message.hpp:22
void consume(EventConsumer *consumer)&&override
Definition: event.hpp:253
Definition: type_utils.hpp:510
std::unique_ptr< Promise< http::Response > > response
Definition: event.hpp:174
Definition: event.hpp:178
Definition: process.hpp:72
bool is() const
Definition: event.hpp:68
virtual void visit(const MessageEvent &)
Definition: event.hpp:41
UPID pid
Definition: event.hpp:234
void consume(EventConsumer *consumer)&&override
Definition: event.hpp:133
UPID pid
Definition: event.hpp:205
Option< const std::type_info * > functionType
Definition: event.hpp:210
ExitedEvent(const UPID &_pid)
Definition: event.hpp:216
Definition: event.hpp:238
Definition: event.hpp:214
Definition: http.hpp:517
Definition: json.hpp:154
UPID from
Definition: event.hpp:258
UPID to
Definition: message.hpp:26
void consume(EventConsumer *consumer)&&override
Definition: event.hpp:168
UPID from
Definition: message.hpp:25
Message message
Definition: event.hpp:138
HttpEvent(http::Request *_request, Promise< http::Response > *_response)
Definition: event.hpp:144
An "untyped" PID, used to encapsulate the process ID for lower-layer abstractions (eg...
Definition: pid.hpp:39
void visit(EventVisitor *visitor) const override
Definition: event.hpp:163
Definition: future.hpp:74
void consume(EventConsumer *consumer)&&override
Definition: event.hpp:199
std::unique_ptr< http::Request > request
Definition: event.hpp:173
void visit(EventVisitor *visitor) const override
Definition: event.hpp:224
Definition: http.hpp:820
virtual void visit(const HttpEvent &)
Definition: event.hpp:43
const T & as() const
Definition: event.hpp:82
Definition: event.hpp:49
virtual void consume(MessageEvent &&)
Definition: event.hpp:52
MessageEvent(Message &&_message)
Definition: event.hpp:105
virtual ~Event()
Definition: event.hpp:62
virtual void consume(TerminateEvent &&)
Definition: event.hpp:56
virtual void consume(DispatchEvent &&)
Definition: event.hpp:53
std::string method
Definition: http.hpp:522
Definition: executor.hpp:48
virtual void consume(HttpEvent &&)
Definition: event.hpp:54
Definition: event.hpp:38
void visit(EventVisitor *visitor) const override
Definition: event.hpp:128
std::string body
Definition: message.hpp:27
virtual void consume(ExitedEvent &&)
Definition: event.hpp:55
Definition: event.hpp:103
DispatchEvent(const UPID &_pid, std::unique_ptr< lambda::CallableOnce< void(ProcessBase *)>> _f, const Option< const std::type_info * > &_functionType)
Definition: event.hpp:180
virtual void visit(const DispatchEvent &)
Definition: event.hpp:42
virtual ~EventVisitor()
Definition: event.hpp:40
virtual ~HttpEvent()
Definition: event.hpp:155
std::string stringify(int flags)
std::unique_ptr< lambda::CallableOnce< void(ProcessBase *)> > f
Definition: event.hpp:208
bool inject
Definition: event.hpp:259
virtual void visit(const ExitedEvent &)
Definition: event.hpp:44
Definition: event.hpp:60
std::string name
Definition: message.hpp:24
constexpr const char * name
Definition: shell.hpp:43
void visit(EventVisitor *visitor) const override
Definition: event.hpp:248
virtual ~EventConsumer()
Definition: event.hpp:51
MessageEvent(const UPID &from, const UPID &to, std::string &&name, std::string &&data)
Definition: event.hpp:116
void consume(EventConsumer *consumer)&&override
Definition: event.hpp:229
Definition: lambda.hpp:414
TerminateEvent(const UPID &_from, bool _inject)
Definition: event.hpp:240