Apache Mesos
event.hpp
Go to the documentation of this file.
1 // Licensed under the Apache License, Version 2.0 (the "License");
2 // you may not use this file except in compliance with the License.
3 // You may obtain a copy of the License at
4 //
5 // http://www.apache.org/licenses/LICENSE-2.0
6 //
7 // Unless required by applicable law or agreed to in writing, software
8 // distributed under the License is distributed on an "AS IS" BASIS,
9 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10 // See the License for the specific language governing permissions and
11 // limitations under the License
12 
13 #ifndef __PROCESS_EVENT_HPP__
14 #define __PROCESS_EVENT_HPP__
15 
16 #include <memory> // TODO(benh): Replace shared_ptr with unique_ptr.
17 
18 #include <process/future.hpp>
19 #include <process/http.hpp>
20 #include <process/message.hpp>
21 #include <process/socket.hpp>
22 
23 #include <stout/abort.hpp>
24 #include <stout/json.hpp>
25 #include <stout/lambda.hpp>
26 
27 namespace process {
28 
29 // Forward declarations.
30 class ProcessBase;
31 struct MessageEvent;
32 struct DispatchEvent;
33 struct HttpEvent;
34 struct ExitedEvent;
35 struct TerminateEvent;
36 
37 
39 {
40  virtual ~EventVisitor() {}
41  virtual void visit(const MessageEvent&) {}
42  virtual void visit(const DispatchEvent&) {}
43  virtual void visit(const HttpEvent&) {}
44  virtual void visit(const ExitedEvent&) {}
45  virtual void visit(const TerminateEvent&) {}
46 };
47 
48 
50 {
51  virtual ~EventConsumer() {}
52  virtual void consume(MessageEvent&&) {}
53  virtual void consume(DispatchEvent&&) {}
54  virtual void consume(HttpEvent&&) {}
55  virtual void consume(ExitedEvent&&) {}
56  virtual void consume(TerminateEvent&&) {}
57 };
58 
59 
60 struct Event
61 {
62  virtual ~Event() {}
63 
64  virtual void visit(EventVisitor* visitor) const = 0;
65  virtual void consume(EventConsumer* consumer) && = 0;
66 
67  template <typename T>
68  bool is() const
69  {
70  bool result = false;
71  struct IsVisitor : EventVisitor
72  {
73  explicit IsVisitor(bool* _result) : result(_result) {}
74  void visit(const T&) override { *result = true; }
75  bool* result;
76  } visitor(&result);
77  visit(&visitor);
78  return result;
79  }
80 
81  template <typename T>
82  const T& as() const
83  {
84  const T* result = nullptr;
85  struct AsVisitor : EventVisitor
86  {
87  explicit AsVisitor(const T** _result) : result(_result) {}
88  void visit(const T& t) override { *result = &t; }
89  const T** result;
90  } visitor(&result);
91  visit(&visitor);
92  if (result == nullptr) {
93  ABORT("Attempting to \"cast\" event incorrectly!");
94  }
95  return *result;
96  }
97 
98  // JSON representation for an Event.
99  operator JSON::Object() const;
100 };
101 
102 
104 {
105  explicit MessageEvent(Message&& _message)
106  : message(std::move(_message)) {}
107 
109  const UPID& from,
110  const UPID& to,
111  const std::string& name,
112  const char* data,
113  size_t length)
114  : message{name, from, to, std::string(data, length)} {}
115 
117  const UPID& from,
118  const UPID& to,
119  std::string&& name,
120  std::string&& data)
121  : message{std::move(name), from, to, std::move(data)} {}
122 
123  MessageEvent(MessageEvent&& that) = default;
124  MessageEvent(const MessageEvent& that) = delete;
125  MessageEvent& operator=(MessageEvent&&) = default;
126  MessageEvent& operator=(const MessageEvent&) = delete;
127 
128  void visit(EventVisitor* visitor) const override
129  {
130  visitor->visit(*this);
131  }
132 
133  void consume(EventConsumer* consumer) && override
134  {
135  consumer->consume(std::move(*this));
136  }
137 
139 };
140 
141 
142 struct HttpEvent : Event
143 {
145  http::Request* _request,
146  Promise<http::Response>* _response)
147  : request(_request),
148  response(_response) {}
149 
150  HttpEvent(HttpEvent&&) = default;
151  HttpEvent(const HttpEvent&) = delete;
152  HttpEvent& operator=(HttpEvent&&) = default;
153  HttpEvent& operator=(const HttpEvent&) = delete;
154 
155  ~HttpEvent() override
156  {
157  if (response) {
158  // Fail the response in case it wasn't set.
159  response->set(http::InternalServerError());
160  }
161  }
162 
163  void visit(EventVisitor* visitor) const override
164  {
165  visitor->visit(*this);
166  }
167 
168  void consume(EventConsumer* consumer) && override
169  {
170  consumer->consume(std::move(*this));
171  }
172 
173  std::unique_ptr<http::Request> request;
174  std::unique_ptr<Promise<http::Response>> response;
175 };
176 
177 
179 {
181  std::unique_ptr<lambda::CallableOnce<void(ProcessBase*)>> _f,
182  const Option<const std::type_info*>& _functionType)
183  : f(std::move(_f)),
184  functionType(_functionType)
185  {}
186 
187  DispatchEvent(DispatchEvent&&) = default;
188  DispatchEvent(const DispatchEvent&) = delete;
189  DispatchEvent& operator=(DispatchEvent&&) = default;
190  DispatchEvent& operator=(const DispatchEvent&) = delete;
191 
192  void visit(EventVisitor* visitor) const override
193  {
194  visitor->visit(*this);
195  }
196 
197  void consume(EventConsumer* consumer) && override
198  {
199  consumer->consume(std::move(*this));
200  }
201 
202  // Function to get invoked as a result of this dispatch event.
203  std::unique_ptr<lambda::CallableOnce<void(ProcessBase*)>> f;
204 
206 };
207 
208 
210 {
211  explicit ExitedEvent(const UPID& _pid)
212  : pid(_pid) {}
213 
214  ExitedEvent(ExitedEvent&&) = default;
215  ExitedEvent(const ExitedEvent&) = delete;
216  ExitedEvent& operator=(ExitedEvent&&) = default;
217  ExitedEvent& operator=(const ExitedEvent&) = delete;
218 
219  void visit(EventVisitor* visitor) const override
220  {
221  visitor->visit(*this);
222  }
223 
224  void consume(EventConsumer* consumer) && override
225  {
226  consumer->consume(std::move(*this));
227  }
228 
230 };
231 
232 
234 {
235  TerminateEvent(const UPID& _from, bool _inject)
236  : from(_from), inject(_inject) {}
237 
238  TerminateEvent(TerminateEvent&&) = default;
239  TerminateEvent(const TerminateEvent&) = delete;
240  TerminateEvent& operator=(TerminateEvent&&) = default;
241  TerminateEvent& operator=(const TerminateEvent&) = delete;
242 
243  void visit(EventVisitor* visitor) const override
244  {
245  visitor->visit(*this);
246  }
247 
248  void consume(EventConsumer* consumer) && override
249  {
250  consumer->consume(std::move(*this));
251  }
252 
254  bool inject;
255 };
256 
257 
258 inline Event::operator JSON::Object() const
259 {
260  JSON::Object object;
261 
262  struct Visitor : EventVisitor
263  {
264  explicit Visitor(JSON::Object* _object) : object(_object) {}
265 
266  void visit(const MessageEvent& event) override
267  {
268  object->values["type"] = "MESSAGE";
269 
270  const Message& message = event.message;
271 
272  object->values["name"] = message.name;
273  object->values["from"] = stringify(message.from);
274  object->values["to"] = stringify(message.to);
275  object->values["body"] = message.body;
276  }
277 
278  void visit(const HttpEvent& event) override
279  {
280  object->values["type"] = "HTTP";
281 
282  const http::Request& request = *event.request;
283 
284  object->values["method"] = request.method;
285  object->values["url"] = stringify(request.url);
286  }
287 
288  void visit(const DispatchEvent& event) override
289  {
290  object->values["type"] = "DISPATCH";
291  }
292 
293  void visit(const ExitedEvent& event) override
294  {
295  object->values["type"] = "EXITED";
296  }
297 
298  void visit(const TerminateEvent& event) override
299  {
300  object->values["type"] = "TERMINATE";
301  }
302 
303  JSON::Object* object;
304  } visitor(&object);
305 
306  visit(&visitor);
307 
308  return object;
309 }
310 
311 } // namespace process {
312 
313 #endif // __PROCESS_EVENT_HPP__
#define ABORT(...)
Definition: abort.hpp:40
void visit(EventVisitor *visitor) const override
Definition: event.hpp:192
virtual void visit(const TerminateEvent &)
Definition: event.hpp:45
Future< Response > request(const Request &request, bool streamedResponse=false)
Asynchronously sends an HTTP request to the process and returns the HTTP response once the entire res...
F && f
Definition: defer.hpp:270
URL url
Definition: http.hpp:531
MessageEvent(const UPID &from, const UPID &to, const std::string &name, const char *data, size_t length)
Definition: event.hpp:108
Definition: event.hpp:142
Definition: message.hpp:22
void consume(EventConsumer *consumer)&&override
Definition: event.hpp:248
Definition: type_utils.hpp:550
std::unique_ptr< Promise< http::Response > > response
Definition: event.hpp:174
Definition: event.hpp:178
Definition: process.hpp:72
~HttpEvent() override
Definition: event.hpp:155
bool is() const
Definition: event.hpp:68
virtual void visit(const MessageEvent &)
Definition: event.hpp:41
UPID pid
Definition: event.hpp:229
void consume(EventConsumer *consumer)&&override
Definition: event.hpp:133
Option< const std::type_info * > functionType
Definition: event.hpp:205
ExitedEvent(const UPID &_pid)
Definition: event.hpp:211
Definition: event.hpp:233
Definition: event.hpp:209
Definition: http.hpp:520
Definition: json.hpp:158
UPID from
Definition: event.hpp:253
UPID to
Definition: message.hpp:26
void consume(EventConsumer *consumer)&&override
Definition: event.hpp:168
UPID from
Definition: message.hpp:25
Message message
Definition: event.hpp:138
HttpEvent(http::Request *_request, Promise< http::Response > *_response)
Definition: event.hpp:144
An "untyped" PID, used to encapsulate the process ID for lower-layer abstractions (eg...
Definition: pid.hpp:39
void visit(EventVisitor *visitor) const override
Definition: event.hpp:163
Definition: future.hpp:74
void consume(EventConsumer *consumer)&&override
Definition: event.hpp:197
std::unique_ptr< http::Request > request
Definition: event.hpp:173
void visit(EventVisitor *visitor) const override
Definition: event.hpp:219
Definition: http.hpp:825
virtual void visit(const HttpEvent &)
Definition: event.hpp:43
const T & as() const
Definition: event.hpp:82
Definition: event.hpp:49
virtual void consume(MessageEvent &&)
Definition: event.hpp:52
MessageEvent(Message &&_message)
Definition: event.hpp:105
DispatchEvent(std::unique_ptr< lambda::CallableOnce< void(ProcessBase *)>> _f, const Option< const std::type_info * > &_functionType)
Definition: event.hpp:180
virtual ~Event()
Definition: event.hpp:62
virtual void consume(TerminateEvent &&)
Definition: event.hpp:56
virtual void consume(DispatchEvent &&)
Definition: event.hpp:53
std::string method
Definition: http.hpp:525
Definition: executor.hpp:48
virtual void consume(HttpEvent &&)
Definition: event.hpp:54
Definition: event.hpp:38
void visit(EventVisitor *visitor) const override
Definition: event.hpp:128
std::string body
Definition: message.hpp:27
virtual void consume(ExitedEvent &&)
Definition: event.hpp:55
Definition: event.hpp:103
virtual void visit(const DispatchEvent &)
Definition: event.hpp:42
virtual ~EventVisitor()
Definition: event.hpp:40
std::string stringify(int flags)
std::unique_ptr< lambda::CallableOnce< void(ProcessBase *)> > f
Definition: event.hpp:203
bool inject
Definition: event.hpp:254
virtual void visit(const ExitedEvent &)
Definition: event.hpp:44
Definition: event.hpp:60
std::string name
Definition: message.hpp:24
constexpr const char * name
Definition: shell.hpp:43
void visit(EventVisitor *visitor) const override
Definition: event.hpp:243
virtual ~EventConsumer()
Definition: event.hpp:51
MessageEvent(const UPID &from, const UPID &to, std::string &&name, std::string &&data)
Definition: event.hpp:116
void consume(EventConsumer *consumer)&&override
Definition: event.hpp:224
Definition: lambda.hpp:414
TerminateEvent(const UPID &_from, bool _inject)
Definition: event.hpp:235