Apache Mesos
process.hpp
Go to the documentation of this file.
1 // Licensed under the Apache License, Version 2.0 (the "License");
2 // you may not use this file except in compliance with the License.
3 // You may obtain a copy of the License at
4 //
5 // http://www.apache.org/licenses/LICENSE-2.0
6 //
7 // Unless required by applicable law or agreed to in writing, software
8 // distributed under the License is distributed on an "AS IS" BASIS,
9 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10 // See the License for the specific language governing permissions and
11 // limitations under the License
12 
13 #ifndef __PROCESS_PROCESS_HPP__
14 #define __PROCESS_PROCESS_HPP__
15 
16 #include <stdint.h>
17 
18 #include <memory>
19 #include <map>
20 #include <queue>
21 #include <vector>
22 
23 #include <process/address.hpp>
25 #include <process/clock.hpp>
26 #include <process/event.hpp>
27 #include <process/filter.hpp>
28 #include <process/firewall.hpp>
29 #include <process/http.hpp>
30 #include <process/message.hpp>
31 #include <process/mime.hpp>
32 #include <process/owned.hpp>
33 #include <process/pid.hpp>
34 
35 #include <stout/duration.hpp>
36 #include <stout/hashmap.hpp>
37 #include <stout/lambda.hpp>
38 #include <stout/option.hpp>
39 #include <stout/synchronized.hpp>
40 
41 namespace process {
42 
43 // Forward declarations.
44 class EventQueue;
45 class Gate;
46 class Logging;
47 class Sequence;
48 
49 namespace firewall {
50 
// Install a list of firewall rules which are used to forbid incoming
// HTTP requests. The rules are taken as an rvalue vector of owned
// `FirewallRule` objects.
68 void install(std::vector<Owned<FirewallRule>>&& rules);
69 
70 } // namespace firewall {
71 
// Base class of every process. A ProcessBase is an EventConsumer that
// is addressed by a UPID (see `self()`) and that holds the message
// handlers, HTTP routes, message delegates and static assets
// registered on it.
//
// NOTE(review): this listing keeps the original line numbers and
// elides some lines, among them all documentation comments. The member
// summaries added below follow the cross-reference index at the end of
// this page; elided code lines are called out where they matter.
72 class ProcessBase : public EventConsumer
73 {
74 public:
    // Constructs a process from an id string (empty by default).
75  explicit ProcessBase(const std::string& id = "");
76 
77  virtual ~ProcessBase();
78 
    // Returns the UPID of this process.
79  const UPID& self() const { return pid; }
80 
81 protected:
    // Invoked when an event is serviced. The default implementation
    // moves the event and passes this process to its `consume()`.
85  virtual void serve(Event&& event)
86  {
87  std::move(event).consume(this);
88  }
89 
90  // Callbacks used to consume (i.e., handle) a specific event.
91  void consume(MessageEvent&& event) override;
92  void consume(DispatchEvent&& event) override;
93  void consume(HttpEvent&& event) override;
94  void consume(ExitedEvent&& event) override;
95  void consume(TerminateEvent&& event) override;
96 
     // Invoked when a process gets spawned. Does nothing by default.
100  virtual void initialize() {}
101 
     // Invoked when a process is terminated. Does nothing by default.
108  virtual void finalize() {}
109 
     // Invoked when a linked process has exited. Does nothing by default.
124  virtual void exited(const UPID&) {}
125 
     // Invoked when a linked process can no longer be monitored. Does
     // nothing by default.
133  virtual void lost(const UPID&) {}
134 
     // Message-sending overloads. Each takes the destination UPID and a
     // message name; the payload is optional and is given either as a
     // `data`/`length` pair or as an rvalue string.
     // NOTE(review): the original documentation is elided from this
     // listing; presumably the message is sent on behalf of this
     // process - confirm against the implementation.
141  void send(
142  const UPID& to,
143  const std::string& name,
144  const char* data = nullptr,
145  size_t length = 0);
146 
147  void send(
148  const UPID& to,
149  std::string&& name);
150 
151  void send(
152  const UPID& to,
153  std::string&& name,
154  std::string&& data);
155 
     // Describes the behavior of the link call when the target pid
     // points to a remote process.
     // NOTE(review): the per-value documentation (lines 163-169 and
     // 172-179) is elided from this listing.
161  enum class RemoteConnection
162  {
170  REUSE,
171 
180  RECONNECT,
181  };
182 
     // Links to `pid`; `remote` selects the RemoteConnection behavior
     // and defaults to REUSE.
     // NOTE(review): documentation elided; the meaning of the returned
     // UPID is not visible here - confirm.
195  UPID link(
196  const UPID& pid,
197  const RemoteConnection remote = RemoteConnection::REUSE);
198 
     // Any function which takes a "from" UPID and a message body as
     // arguments. The typedef's name, `MessageHandler`, sits on line
     // 211, which this listing elides.
210  typedef lambda::function<void(const UPID&, const std::string&)>
212 
     // Sets up a handler for messages with the specified name by
     // storing `handler` under `name` in `handlers.message`.
216  void install(
217  const std::string& name,
218  const MessageHandler& handler)
219  {
220  handlers.message[name] = handler;
221  }
222 
     // Sets up a handler for messages with the specified name. The
     // handler is `method` bound to this process (viewed as a `T`).
226  template <typename T>
227  void install(
228  const std::string& name,
229  void (T::*method)(const UPID&, const std::string&))
230  {
231  // Note that we use dynamic_cast here so a process can use
232  // multiple inheritance if it sees fit (e.g., to implement
233  // multiple callback interfaces).
234  MessageHandler handler =
235  lambda::bind(method, dynamic_cast<T*>(this), lambda::_1, lambda::_2);
236  install(name, handler);
237  }
238 
     // Delegates incoming messages, with the specified name, to the
     // UPID.
242  void delegate(const std::string& name, const UPID& pid)
243  {
244  delegates[name] = pid;
245  }
246 
     // Any function which takes a process::http::Request and returns a
     // future process::http::Response. The typedef's name,
     // `HttpRequestHandler`, sits on line 257, which this listing
     // elides.
256  typedef lambda::function<Future<http::Response>(const http::Request&)>
258 
259  // Options to control the behavior of a route.
     // NOTE(review): this listing elides line 260 (presumably the
     // `struct RouteOptions` header), line 262 (the `RouteOptions()`
     // constructor signature) and line 267 (the `bool requestStreaming`
     // member), so only fragments of the struct are shown.
261  {
263  : requestStreaming(false) {}
264 
265  // Set to true if the endpoint supports request streaming.
266  // Default: false.
268  };
269 
     // Sets up a handler for HTTP requests with the specified name.
276  void route(
277  const std::string& name,
278  const Option<std::string>& help,
279  const HttpRequestHandler& handler,
280  const RouteOptions& options = RouteOptions());
281 
     // Sets up a handler for HTTP requests with the specified name. The
     // handler is `method` bound to this process (viewed as a `T`).
285  template <typename T>
286  void route(
287  const std::string& name,
288  const Option<std::string>& help,
289  Future<http::Response> (T::*method)(const http::Request&),
290  const RouteOptions& options = RouteOptions())
291  {
292  // Note that we use dynamic_cast here so a process can use
293  // multiple inheritance if it sees fit (e.g., to implement
294  // multiple callback interfaces).
295  HttpRequestHandler handler =
296  lambda::bind(method, dynamic_cast<T*>(this), lambda::_1);
297  route(name, help, handler, options);
298  }
299 
     // Any function which takes a process::http::Request and an
     // Option<Principal> and returns a future process::http::Response.
     // The closing lines 322-323, including the typedef's name
     // `AuthenticatedHttpRequestHandler`, are elided from this listing.
320  typedef lambda::function<Future<http::Response>(
321  const http::Request&,
324 
     // Sets up `handler` for HTTP requests with the specified name,
     // associated with the authentication realm `realm`.
325  // TODO(arojas): Consider introducing an `authentication::Realm` type.
326  // TODO(bevers): Consider changing the type of the second argument to
327  // `const Option<std::string>&` for consistency with the version below.
328  void route(
329  const std::string& name,
330  const std::string& realm,
331  const Option<std::string>& help,
332  const AuthenticatedHttpRequestHandler& handler,
333  const RouteOptions& options = RouteOptions());
334 
     // Forwards to the correct overload of route(), depending on
     // whether an authentication realm is given. With a realm, `method`
     // is registered through the realm-taking overload; without one it
     // is registered as a plain HttpRequestHandler whose principal
     // argument is always None().
339  template<typename T>
340  void route(
341  const std::string& name,
342  const Option<std::string>& realm,
343  const Option<std::string>& help,
344  Future<http::Response>(T::*method)(
345  const http::Request&, const Option<http::authentication::Principal>&),
346  const RouteOptions& options = RouteOptions())
347  {
348  // Note that we use dynamic_cast here so a process can use
349  // multiple inheritance if it sees fit (e.g., to implement
350  // multiple callback interfaces).
351  if (realm.isSome()) {
     // NOTE(review): line 352, where the declaration of `handler` for
     // this branch begins, is elided from this listing.
353  lambda::bind(method, dynamic_cast<T*>(this), lambda::_1, lambda::_2);
354  route(name, realm.get(), help, handler, options);
355  } else {
356  HttpRequestHandler handler =
357  lambda::bind(method, dynamic_cast<T*>(this), lambda::_1, None());
358  route(name, help, handler, options);
359  }
360  }
361 
     // Sets up the default HTTP request handler to provide the static
     // asset(s) at the specified path. The path and the `types` map
     // (mime::types by default) are recorded as an Asset under `name`.
376  void provide(
377  const std::string& name,
378  const std::string& path,
379  const std::map<std::string, std::string>& types = mime::types)
380  {
381  // TODO(benh): Check that name is only alphanumeric (i.e., has no
382  // '/') and that path is absolute.
383  Asset asset;
384  asset.path = path;
385  asset.types = types;
386  assets[name] = asset;
387  }
388 
     // NOTE(review): documentation elided; by name and signature this
     // presumably returns the number of events of type `T` - confirm.
394  template <typename T>
395  size_t eventCount();
396 
397 private:
398  friend class SocketManager;
399  friend class ProcessManager;
400  friend void* schedule(void*);
401 
402  // Process states.
403  //
404  // Transitioning from BLOCKED to READY also requires enqueueing the
405  // process in the run queue otherwise the events will never be
406  // processed!
407  enum class State
408  {
409  BOTTOM, // Uninitialized but events may be enqueued.
410  BLOCKED, // Initialized, no events enqueued.
411  READY, // Initialized, events enqueued.
412  TERMINATING // Initialized, no more events will be enqueued.
413  };
414 
     // Current state of the process; starts out as BOTTOM.
415  std::atomic<State> state = ATOMIC_VAR_INIT(State::BOTTOM);
416 
417  // Flag for indicating that a terminate event has been injected.
418  std::atomic<bool> termination = ATOMIC_VAR_INIT(false);
419 
420  // Enqueue the specified message, request, or function call.
421  void enqueue(Event* event);
422 
423  // Delegates for messages.
424  std::map<std::string, UPID> delegates;
425 
426  // Definition of an HTTP endpoint. The endpoint can be
427  // associated with an authentication realm, in which case:
428  //
429  // (1) `realm` and `authenticatedHandler` will be set.
430  // Libprocess will perform HTTP authentication for
431  // all requests to this endpoint (by default during
432  // HttpEvent consumption). The authentication principal
433  // will be passed to the `authenticatedHandler`.
434  //
435  // Otherwise, if the endpoint is not associated with an
436  // authentication realm:
437  //
438  // (2) Only `handler` will be set, and no authentication
439  // takes place.
440  struct HttpEndpoint
441  {
     // NOTE(review): line 442 is elided from this listing; going by the
     // comment above it presumably declares `handler`.
443 
444  Option<std::string> realm;
445  Option<AuthenticatedHttpRequestHandler> authenticatedHandler;
446  RouteOptions options;
447  };
448 
449  // Handlers for messages and HTTP requests.
450  struct {
     // NOTE(review): lines 451, 452 and 457 are elided from this
     // listing. Per the index at the end of this page they declare
     // `hashmap<std::string, MessageHandler> message`,
     // `hashmap<std::string, HttpEndpoint> http` and
     // `Owned<Sequence> httpSequence`; the comment below belongs to the
     // last of these.
453 
454  // Used for delivering HTTP requests in the correct order.
455  // Initialized lazily to avoid ProcessBase requiring
456  // another Process!
458  } handlers;
459 
460  // Definition of a static asset.
461  struct Asset
462  {
463  std::string path;
464  std::map<std::string, std::string> types;
465  };
466 
467  // Continuation for `consume(HttpEvent&&)`.
     // NOTE(review): line 471, which ends this declaration, is elided
     // from this listing.
468  Future<http::Response> _consume(
469  const HttpEndpoint& endpoint,
470  const std::string& name,
472 
473  // JSON representation of process. MUST be invoked from within the
474  // process itself in order to safely examine events.
475  operator JSON::Object();
476 
477  // Static asset(s) to provide.
478  std::map<std::string, Asset> assets;
479 
480  // Queue of received events. We employ the PIMPL idiom here and use
481  // a pointer so we can hide the implementation of `EventQueue`.
482  std::unique_ptr<EventQueue> events;
483 
484  // NOTE: this is a shared pointer to a _pointer_, hence this is not
485  // responsible for the ProcessBase itself.
486  std::shared_ptr<ProcessBase*> reference;
487 
488  std::shared_ptr<Gate> gate;
489 
490  // Whether or not the runtime should delete this process after it
491  // has terminated. Note that failure to spawn the process will leave
492  // the process unmanaged and thus it may leak!
493  bool manage = false;
494 
495  // Process PID.
496  UPID pid;
497 };
498 
499 
// Class template layered (as a virtual base) on ProcessBase that adds a
// typed `self()` and the `Self`/`This` aliases. `self()` casts `this`
// to `const T*`, so `T` is expected to be the class that derives from
// `Process<T>`.
500 template <typename T>
501 class Process : public virtual ProcessBase {
502 public:
503  virtual ~Process() {}
504 
     // Returns a typed PID built from this object viewed as a `T`.
     // NOTE(review): the original documentation (lines 505-509) is
     // elided from this listing.
510  PID<T> self() const { return PID<T>(static_cast<const T*>(this)); }
511 
512 protected:
513  // Useful typedefs for dispatch/delay/defer to self()/this.
514  typedef T Self;
515  typedef T This;
516 };
517 
518 
// Initialize the library. All three arguments are optional and default
// to None(): a delegate and the names of the read-write and read-only
// authentication realms.
// NOTE(review): the original documentation (lines 519-538), including
// the meaning of the returned bool, is elided from this listing.
539 bool initialize(
540  const Option<std::string>& delegate = None(),
541  const Option<std::string>& readwriteAuthenticationRealm = None(),
542  const Option<std::string>& readonlyAuthenticationRealm = None());
543 
544 
// Clean up the library.
// NOTE(review): the documentation of `finalize_wsa` is elided from this
// listing; it defaults to false.
551 void finalize(bool finalize_wsa = false);
552 
553 
// Get the absolute path of the request `path`, with the delegate
// prefix.
557 std::string absolutePath(const std::string& path);
558 
559 
564 
565 
570 
571 
// Returns the number of worker threads the library has created.
577 long workers();
578 
579 
// Spawn a new process. `manage` defaults to false; per the comment on
// `ProcessBase::manage` it controls whether the runtime should delete
// the process after it has terminated.
587 UPID spawn(ProcessBase* process, bool manage = false);
588 
// Reference overload of spawn(): forwards the address of `process` to
// the `ProcessBase*` overload and returns its result.
589 inline UPID spawn(ProcessBase& process, bool manage = false)
590 {
591  return spawn(&process, manage);
592 }
593 
// Typed overload of spawn(): spawns `t` through the `ProcessBase*`
// overload and returns its typed PID, or a default-constructed
// `PID<T>` when the result of that overload tests false.
594 template <typename T>
595 PID<T> spawn(T* t, bool manage = false)
596 {
597  // We save the pid before spawn is called because it's possible that
598  // the process has already been deleted after spawn returns (e.g.,
599  // if 'manage' is true).
600  PID<T> pid(t);
601 
602  if (!spawn(static_cast<ProcessBase*>(t), manage)) {
603  return PID<T>();
604  }
605 
606  return pid;
607 }
608 
// Typed reference overload of spawn(): forwards the address of `t` to
// the typed pointer overload and returns its result.
609 template <typename T>
610 PID<T> spawn(T& t, bool manage = false)
611 {
612  return spawn(&t, manage);
613 }
614 
615 
// Sends a TerminateEvent to the given process, identified by its UPID
// or by the process object itself.
// NOTE(review): the original documentation (lines 616-628), including
// the meaning of `inject` (default: true), is elided from this listing.
629 void terminate(const UPID& pid, bool inject = true);
630 void terminate(const ProcessBase& process, bool inject = true);
631 void terminate(const ProcessBase* process, bool inject = true);
632 
633 
// Wait for the process to exit for no more than the specified
// duration. The process is identified by its UPID or by the process
// object itself.
// NOTE(review): the original documentation (lines 634-641) is elided
// from this listing; the default of Seconds(-1) presumably means "no
// time limit", and the meaning of the returned bool is not visible
// here - confirm both.
642 bool wait(const UPID& pid, const Duration& duration = Seconds(-1));
643 bool wait(const ProcessBase& process, const Duration& duration = Seconds(-1));
644 bool wait(const ProcessBase* process, const Duration& duration = Seconds(-1));
645 
646 
// Sends a message with data without a return address. The payload is
// optional: `data` defaults to nullptr and `length` to 0.
655 void post(const UPID& to,
656  const std::string& name,
657  const char* data = nullptr,
658  size_t length = 0);
659 
660 
// Overload of post() that additionally takes a `from` UPID.
// NOTE(review): no documentation for this overload is visible in this
// listing; presumably `from` is used as the message's return address -
// confirm.
661 void post(const UPID& from,
662  const UPID& to,
663  const std::string& name,
664  const char* data = nullptr,
665  size_t length = 0);
666 
667 
// Reference overload of terminate(): forwards `process.self()` and
// `inject` to the UPID overload.
671 inline void terminate(const ProcessBase& process, bool inject)
672 {
673  terminate(process.self(), inject);
674 }
675 
676 
// Pointer overload of terminate(): forwards `process->self()` and
// `inject` to the UPID overload. `process` is dereferenced without a
// null check, so it must not be null.
680 inline void terminate(const ProcessBase* process, bool inject)
681 {
682  terminate(process->self(), inject);
683 }
684 
685 
// Reference overload of wait(): forwards `process.self()` and
// `duration` to the UPID overload and returns its result.
689 inline bool wait(const ProcessBase& process, const Duration& duration)
690 {
691  return process::wait(process.self(), duration); // Explicit to disambiguate.
692 }
693 
694 
// Pointer overload of wait(): forwards `process->self()` and `duration`
// to the UPID overload and returns its result. `process` is
// dereferenced without a null check, so it must not be null.
698 inline bool wait(const ProcessBase* process, const Duration& duration)
699 {
700  return process::wait(process->self(), duration); // Explicit to disambiguate.
701 }
702 
703 
704 // Per-thread process pointer (declared `thread_local`; defined elsewhere).
705 extern thread_local ProcessBase* __process__;
706 
707 // NOTE: Methods in this namespace should only be used in tests to
708 // inject arbitrary events.
709 namespace inject {
710 
// Simulates disconnection of the link between 'from' and 'to' by
// sending an ExitedEvent to 'to'.
// NOTE(review): the rest of the original documentation (lines
// 711-716), including the meaning of the returned bool, is elided from
// this listing.
717 bool exited(const UPID& from, const UPID& to);
718 
719 } // namespace inject {
720 
721 } // namespace process {
722 
723 #endif // __PROCESS_PROCESS_HPP__
Definition: path.hpp:26
void finalize(bool finalize_wsa=false)
Clean up the library.
lambda::function< void(const UPID &, const std::string &)> MessageHandler
Any function which takes a "from" UPID and a message body as arguments.
Definition: process.hpp:211
Future< Response > request(const Request &request, bool streamedResponse=false)
Asynchronously sends an HTTP request to the process and returns the HTTP response once the entire res...
RemoteConnection
Describes the behavior of the link call when the target pid points to a remote process.
Definition: process.hpp:161
void provide(const std::string &name, const std::string &path, const std::map< std::string, std::string > &types=mime::types)
Sets up the default HTTP request handler to provide the static asset(s) at the specified absolute pat...
Definition: process.hpp:376
bool initialize(const Option< std::string > &delegate=None(), const Option< std::string > &readwriteAuthenticationRealm=None(), const Option< std::string > &readonlyAuthenticationRealm=None())
Initialize the library.
Definition: event.hpp:142
T Self
Definition: process.hpp:514
Owned< Sequence > httpSequence
Definition: process.hpp:457
Definition: event.hpp:178
Definition: process.hpp:72
Definition: socket_manager.hpp:36
UPID spawn(ProcessBase *process, bool manage=false)
Spawn a new process.
Definition: duration.hpp:32
lambda::function< Future< http::Response > const http::Request &, const Option< http::authentication::Principal > &)> AuthenticatedHttpRequestHandler
Any function which takes a process::http::Request and an Option<Principal> and returns a process::htt...
Definition: process.hpp:323
void terminate(const UPID &pid, bool inject=true)
Sends a TerminateEvent to the given process.
virtual ~Process()
Definition: process.hpp:503
RouteOptions()
Definition: process.hpp:262
Definition: event.hpp:238
bool isSome() const
Definition: option.hpp:115
hashmap< std::string, HttpEndpoint > http
Definition: process.hpp:452
Definition: event.hpp:214
std::map< std::string, std::string > types
Definition: http.hpp:518
Definition: json.hpp:154
PID< Logging > logging()
Return the PID associated with the global logging process.
ssize_t send(const os::WindowsFD &fd, const void *buf, size_t len, int flags)
Definition: socket.hpp:162
thread_local ProcessBase * __process__
void install(std::vector< Owned< FirewallRule >> &&rules)
Install a list of firewall rules which are used to forbid incoming HTTP requests. ...
void post(const UPID &to, const std::string &name, const char *data=nullptr, size_t length=0)
Sends a message with data without a return address.
hashmap< std::string, MessageHandler > message
Definition: process.hpp:451
An "untyped" PID, used to encapsulate the process ID for lower-layer abstractions (eg...
Definition: pid.hpp:39
Definition: duration.hpp:207
void route(const std::string &name, const Option< std::string > &help, Future< http::Response >(T::*method)(const http::Request &), const RouteOptions &options=RouteOptions())
Sets up a handler for HTTP requests with the specified name.
Definition: process.hpp:286
virtual void initialize()
Invoked when a process gets spawned.
Definition: process.hpp:100
void install(const std::string &name, const MessageHandler &handler)
Sets up a handler for messages with the specified name.
Definition: process.hpp:216
lambda::function< Future< http::Response >const http::Request &)> HttpRequestHandler
Any function which takes a process::http::Request and returns a process::http::Response.
Definition: process.hpp:257
const T & get() const &
Definition: option.hpp:118
bool requestStreaming
Definition: process.hpp:267
virtual void finalize()
Invoked when a process is terminated.
Definition: process.hpp:108
Definition: event.hpp:49
bool wait(const UPID &pid, const Duration &duration=Seconds(-1))
Wait for the process to exit for no more than the specified seconds.
Definition: address.hpp:50
bool exited(const UPID &from, const UPID &to)
Simulates disconnection of the link between 'from' and 'to' by sending an ExitedEvent to 'to'...
virtual void exited(const UPID &)
Invoked when a linked process has exited.
Definition: process.hpp:124
Definition: process.hpp:260
A "process identifier" used to uniquely identify a process when dispatching messages.
Definition: pid.hpp:279
Definition: none.hpp:27
Definition: executor.hpp:47
std::string absolutePath(const std::string &path)
Get the absolute path of the request, with the delegate prefix.
network::inet::Address address()
Returns the socket address associated with this instance of the library.
Definition: event.hpp:103
virtual void lost(const UPID &)
Invoked when a linked process can no longer be monitored.
Definition: process.hpp:133
void install(const std::string &name, void(T::*method)(const UPID &, const std::string &))
Sets up a handler for messages with the specified name.
Definition: process.hpp:227
Try< Nothing > bind(int_fd s, const Address &address)
Definition: network.hpp:46
void route(const std::string &name, const Option< std::string > &realm, const Option< std::string > &help, Future< http::Response >(T::*method)(const http::Request &, const Option< http::authentication::Principal > &), const RouteOptions &options=RouteOptions())
Forwards to the correct overload of process::ProcessBase::route(), depending on whether the authentic...
Definition: process.hpp:340
Definition: owned.hpp:36
Definition: process.hpp:501
Definition: event.hpp:60
virtual void serve(Event &&event)
Invoked when an event is serviced.
Definition: process.hpp:85
const UPID & self() const
Definition: process.hpp:79
void delegate(const std::string &name, const UPID &pid)
Delegates incoming messages, with the specified name, to the UPID.
Definition: process.hpp:242
constexpr const char * name
Definition: shell.hpp:43
long workers()
Returns the number of worker threads the library has created.
T This
Definition: process.hpp:515
Try< Nothing > schedule(const mesos::maintenance::Schedule &schedule, const hashmap< MachineID, Machine > &machines)
Performs the following checks on the new maintenance schedule: