Apache Mesos
process.hpp
Go to the documentation of this file.
1 // Licensed under the Apache License, Version 2.0 (the "License");
2 // you may not use this file except in compliance with the License.
3 // You may obtain a copy of the License at
4 //
5 // http://www.apache.org/licenses/LICENSE-2.0
6 //
7 // Unless required by applicable law or agreed to in writing, software
8 // distributed under the License is distributed on an "AS IS" BASIS,
9 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10 // See the License for the specific language governing permissions and
11 // limitations under the License
12 
13 #ifndef __PROCESS_PROCESS_HPP__
14 #define __PROCESS_PROCESS_HPP__
15 
16 #include <stdint.h>
17 
18 #include <memory>
19 #include <map>
20 #include <queue>
21 #include <vector>
22 
23 #include <process/address.hpp>
25 #include <process/clock.hpp>
26 #include <process/event.hpp>
27 #include <process/filter.hpp>
28 #include <process/firewall.hpp>
29 #include <process/http.hpp>
30 #include <process/message.hpp>
31 #include <process/mime.hpp>
32 #include <process/owned.hpp>
33 #include <process/pid.hpp>
34 
35 #include <stout/duration.hpp>
36 #include <stout/hashmap.hpp>
37 #include <stout/lambda.hpp>
38 #include <stout/option.hpp>
39 #include <stout/synchronized.hpp>
40 
41 namespace process {
42 
43 // Forward declaration.
44 class EventQueue;
45 class Gate;
46 class Logging;
47 class Sequence;
48 
49 namespace firewall {
50 
68 void install(std::vector<Owned<FirewallRule>>&& rules);
69 
70 } // namespace firewall {
71 
72 class ProcessBase : public EventConsumer
73 {
74 public:
75  explicit ProcessBase(const std::string& id = "");
76 
77  virtual ~ProcessBase();
78 
79  const UPID& self() const { return pid; }
80 
81 protected:
85  virtual void serve(Event&& event)
86  {
87  std::move(event).consume(this);
88  }
89 
90  // Callbacks used to consume (i.e., handle) a specific event.
91  void consume(MessageEvent&& event) override;
92  void consume(DispatchEvent&& event) override;
93  void consume(HttpEvent&& event) override;
94  void consume(ExitedEvent&& event) override;
95  void consume(TerminateEvent&& event) override;
96 
100  virtual void initialize() {}
101 
108  virtual void finalize() {}
109 
124  virtual void exited(const UPID&) {}
125 
133  virtual void lost(const UPID&) {}
134 
141  void send(
142  const UPID& to,
143  const std::string& name,
144  const char* data = nullptr,
145  size_t length = 0);
146 
147  void send(
148  const UPID& to,
149  std::string&& name);
150 
151  void send(
152  const UPID& to,
153  std::string&& name,
154  std::string&& data);
155 
161  enum class RemoteConnection
162  {
170  REUSE,
171 
180  RECONNECT,
181  };
182 
195  UPID link(
196  const UPID& pid,
198 
210  typedef lambda::function<void(const UPID&, const std::string&)>
212 
216  void install(
217  const std::string& name,
218  const MessageHandler& handler)
219  {
220  handlers.message[name] = handler;
221  }
222 
226  template <typename T>
227  void install(
228  const std::string& name,
229  void (T::*method)(const UPID&, const std::string&))
230  {
231  // Note that we use dynamic_cast here so a process can use
232  // multiple inheritance if it sees so fit (e.g., to implement
233  // multiple callback interfaces).
234  MessageHandler handler =
235  lambda::bind(method, dynamic_cast<T*>(this), lambda::_1, lambda::_2);
236  install(name, handler);
237  }
238 
242  void delegate(const std::string& name, const UPID& pid)
243  {
244  delegates[name] = pid;
245  }
246 
256  typedef lambda::function<Future<http::Response>(const http::Request&)>
258 
259  // Options to control the behavior of a route.
261  {
263  : requestStreaming(false) {}
264 
265  // Set to true if the endpoint supports request streaming.
266  // Default: false.
268  };
269 
276  void route(
277  const std::string& name,
278  const Option<std::string>& help,
279  const HttpRequestHandler& handler,
280  const RouteOptions& options = RouteOptions());
281 
285  template <typename T>
286  void route(
287  const std::string& name,
288  const Option<std::string>& help,
289  Future<http::Response> (T::*method)(const http::Request&),
290  const RouteOptions& options = RouteOptions())
291  {
292  // Note that we use dynamic_cast here so a process can use
293  // multiple inheritance if it sees so fit (e.g., to implement
294  // multiple callback interfaces).
295  HttpRequestHandler handler =
296  lambda::bind(method, dynamic_cast<T*>(this), lambda::_1);
297  route(name, help, handler, options);
298  }
299 
320  typedef lambda::function<Future<http::Response>(
321  const http::Request&,
324 
325  // TODO(arojas): Consider introducing an `authentication::Realm` type.
326  void route(
327  const std::string& name,
328  const std::string& realm,
329  const Option<std::string>& help,
330  const AuthenticatedHttpRequestHandler& handler,
331  const RouteOptions& options = RouteOptions());
332 
336  template <typename T>
337  void route(
338  const std::string& name,
339  const std::string& realm,
340  const Option<std::string>& help,
341  Future<http::Response> (T::*method)(
342  const http::Request&,
344  const RouteOptions& options = RouteOptions())
345  {
346  // Note that we use dynamic_cast here so a process can use
347  // multiple inheritance if it sees so fit (e.g., to implement
348  // multiple callback interfaces).
350  lambda::bind(method, dynamic_cast<T*>(this), lambda::_1, lambda::_2);
351  route(name, realm, help, handler, options);
352  }
353 
368  void provide(
369  const std::string& name,
370  const std::string& path,
371  const std::map<std::string, std::string>& types = mime::types)
372  {
373  // TODO(benh): Check that name is only alphanumeric (i.e., has no
374  // '/') and that path is absolute.
375  Asset asset;
376  asset.path = path;
377  asset.types = types;
378  assets[name] = asset;
379  }
380 
386  template <typename T>
387  size_t eventCount();
388 
389 private:
390  friend class SocketManager;
391  friend class ProcessManager;
392  friend void* schedule(void*);
393 
394  // Process states.
395  //
396  // Transitioning from BLOCKED to READY also requires enqueueing the
397  // process in the run queue otherwise the events will never be
398  // processed!
399  enum class State
400  {
401  BOTTOM, // Uninitialized but events may be enqueued.
402  BLOCKED, // Initialized, no events enqueued.
403  READY, // Initialized, events enqueued.
404  TERMINATING // Initialized, no more events will be enqueued.
405  };
406 
407  std::atomic<State> state = ATOMIC_VAR_INIT(State::BOTTOM);
408 
409  // Flag for indicating that a terminate event has been injected.
410  std::atomic<bool> termination = ATOMIC_VAR_INIT(false);
411 
412  // Enqueue the specified message, request, or function call.
413  void enqueue(Event* event);
414 
415  // Delegates for messages.
416  std::map<std::string, UPID> delegates;
417 
418  // Definition of an HTTP endpoint. The endpoint can be
419  // associated with an authentication realm, in which case:
420  //
421  // (1) `realm` and `authenticatedHandler` will be set.
422  // Libprocess will perform HTTP authentication for
423  // all requests to this endpoint (by default during
424  // HttpEvent consumption). The authentication principal
425  // will be passed to the `authenticatedHandler`.
426  //
427  // Otherwise, if the endpoint is not associated with an
428  // authentication realm:
429  //
430  // (2) Only `handler` will be set, and no authentication
431  // takes place.
432  struct HttpEndpoint
433  {
435 
436  Option<std::string> realm;
437  Option<AuthenticatedHttpRequestHandler> authenticatedHandler;
438  RouteOptions options;
439  };
440 
441  // Handlers for messages and HTTP requests.
442  struct {
445 
446  // Used for delivering HTTP requests in the correct order.
447  // Initialized lazily to avoid ProcessBase requiring
448  // another Process!
450  } handlers;
451 
452  // Definition of a static asset.
453  struct Asset
454  {
455  std::string path;
456  std::map<std::string, std::string> types;
457  };
458 
459  // Continuation for `consume(HttpEvent&&)`.
460  Future<http::Response> _consume(
461  const HttpEndpoint& endpoint,
462  const std::string& name,
463  const Owned<http::Request>& request);
464 
465  // JSON representation of process. MUST be invoked from within the
466  // process itself in order to safely examine events.
467  operator JSON::Object();
468 
469  // Static assets(s) to provide.
470  std::map<std::string, Asset> assets;
471 
472  // Queue of received events. We employ the PIMPL idiom here and use
473  // a pointer so we can hide the implementation of `EventQueue`.
474  std::unique_ptr<EventQueue> events;
475 
476  // NOTE: this is a shared pointer to a _pointer_, hence this is not
477  // responsible for the ProcessBase itself.
478  std::shared_ptr<ProcessBase*> reference;
479 
480  std::shared_ptr<Gate> gate;
481 
482  // Whether or not the runtime should delete this process after it
483  // has terminated. Note that failure to spawn the process will leave
484  // the process unmanaged and thus it may leak!
485  bool manage = false;
486 
487  // Process PID.
488  UPID pid;
489 };
490 
491 
492 template <typename T>
493 class Process : public virtual ProcessBase {
494 public:
495  virtual ~Process() {}
496 
502  PID<T> self() const { return PID<T>(static_cast<const T*>(this)); }
503 
504 protected:
505  // Useful typedefs for dispatch/delay/defer to self()/this.
506  typedef T Self;
507  typedef T This;
508 };
509 
510 
531 bool initialize(
532  const Option<std::string>& delegate = None(),
533  const Option<std::string>& readwriteAuthenticationRealm = None(),
534  const Option<std::string>& readonlyAuthenticationRealm = None());
535 
536 
543 void finalize(bool finalize_wsa = false);
544 
545 
549 std::string absolutePath(const std::string& path);
550 
551 
556 
557 
562 
563 
569 long workers();
570 
571 
579 UPID spawn(ProcessBase* process, bool manage = false);
580 
581 inline UPID spawn(ProcessBase& process, bool manage = false)
582 {
583  return spawn(&process, manage);
584 }
585 
586 template <typename T>
587 PID<T> spawn(T* t, bool manage = false)
588 {
589  // We save the pid before spawn is called because it's possible that
590  // the process has already been deleted after spawn returns (e.g.,
591  // if 'manage' is true).
592  PID<T> pid(t);
593 
594  if (!spawn(static_cast<ProcessBase*>(t), manage)) {
595  return PID<T>();
596  }
597 
598  return pid;
599 }
600 
601 template <typename T>
602 PID<T> spawn(T& t, bool manage = false)
603 {
604  return spawn(&t, manage);
605 }
606 
607 
621 void terminate(const UPID& pid, bool inject = true);
622 void terminate(const ProcessBase& process, bool inject = true);
623 void terminate(const ProcessBase* process, bool inject = true);
624 
625 
634 bool wait(const UPID& pid, const Duration& duration = Seconds(-1));
635 bool wait(const ProcessBase& process, const Duration& duration = Seconds(-1));
636 bool wait(const ProcessBase* process, const Duration& duration = Seconds(-1));
637 
638 
647 void post(const UPID& to,
648  const std::string& name,
649  const char* data = nullptr,
650  size_t length = 0);
651 
652 
653 void post(const UPID& from,
654  const UPID& to,
655  const std::string& name,
656  const char* data = nullptr,
657  size_t length = 0);
658 
659 
663 inline void terminate(const ProcessBase& process, bool inject)
664 {
665  terminate(process.self(), inject);
666 }
667 
668 
672 inline void terminate(const ProcessBase* process, bool inject)
673 {
674  terminate(process->self(), inject);
675 }
676 
677 
681 inline bool wait(const ProcessBase& process, const Duration& duration)
682 {
683  return process::wait(process.self(), duration); // Explicit to disambiguate.
684 }
685 
686 
690 inline bool wait(const ProcessBase* process, const Duration& duration)
691 {
692  return process::wait(process->self(), duration); // Explicit to disambiguate.
693 }
694 
695 
696 // Per thread process pointer.
697 extern thread_local ProcessBase* __process__;
698 
699 // NOTE: Methods in this namespace should only be used in tests to
700 // inject arbitrary events.
701 namespace inject {
702 
709 bool exited(const UPID& from, const UPID& to);
710 
711 } // namespace inject {
712 
713 } // namespace process {
714 
715 #endif // __PROCESS_PROCESS_HPP__
void finalize(bool finalize_wsa=false)
Clean up the library.
lambda::function< void(const UPID &, const std::string &)> MessageHandler
Any function which takes a &quot;from&quot; UPID and a message body as arguments.
Definition: process.hpp:211
Future< Response > request(const Request &request, bool streamedResponse=false)
Asynchronously sends an HTTP request to the process and returns the HTTP response once the entire res...
ProcessBase(const std::string &id="")
If a persistent socket to the target pid does not exist, a new link is created.
RemoteConnection
Describes the behavior of the link call when the target pid points to a remote process.
Definition: process.hpp:161
void provide(const std::string &name, const std::string &path, const std::map< std::string, std::string > &types=mime::types)
Sets up the default HTTP request handler to provide the static asset(s) at the specified absolute pat...
Definition: process.hpp:368
bool initialize(const Option< std::string > &delegate=None(), const Option< std::string > &readwriteAuthenticationRealm=None(), const Option< std::string > &readonlyAuthenticationRealm=None())
Initialize the library.
Definition: event.hpp:142
T Self
Definition: process.hpp:506
Owned< Sequence > httpSequence
Definition: process.hpp:449
Definition: event.hpp:178
Definition: process.hpp:72
void route(const std::string &name, const Option< std::string > &help, const HttpRequestHandler &handler, const RouteOptions &options=RouteOptions())
Sets up a handler for HTTP requests with the specified name.
Definition: socket_manager.hpp:36
UPID spawn(ProcessBase *process, bool manage=false)
Spawn a new process.
Definition: duration.hpp:32
lambda::function< Future< http::Response > const http::Request &, const Option< http::authentication::Principal > &)> AuthenticatedHttpRequestHandler
Any function which takes a process::http::Request and an Option&lt;Principal&gt; and returns a process::htt...
Definition: process.hpp:323
void terminate(const UPID &pid, bool inject=true)
Sends a TerminateEvent to the given process.
virtual ~Process()
Definition: process.hpp:495
RouteOptions()
Definition: process.hpp:262
Definition: event.hpp:238
hashmap< std::string, HttpEndpoint > http
Definition: process.hpp:444
Definition: event.hpp:214
std::map< std::string, std::string > types
Definition: http.hpp:518
Definition: json.hpp:154
PID< Logging > logging()
Return the PID associated with the global logging process.
void send(const UPID &to, const std::string &name, const char *data=nullptr, size_t length=0)
Sends the message to the specified UPID.
thread_local ProcessBase * __process__
void install(std::vector< Owned< FirewallRule >> &&rules)
Install a list of firewall rules which are used to forbid incoming HTTP requests. ...
void post(const UPID &to, const std::string &name, const char *data=nullptr, size_t length=0)
Sends a message with data without a return address.
hashmap< std::string, MessageHandler > message
Definition: process.hpp:443
An &quot;untyped&quot; PID, used to encapsulate the process ID for lower-layer abstractions (eg...
Definition: pid.hpp:39
friend class ProcessManager
Definition: process.hpp:391
Definition: duration.hpp:259
void route(const std::string &name, const Option< std::string > &help, Future< http::Response >(T::*method)(const http::Request &), const RouteOptions &options=RouteOptions())
Sets up a handler for HTTP requests with the specified name.
Definition: process.hpp:286
virtual void initialize()
Invoked when a process gets spawned.
Definition: process.hpp:100
void install(const std::string &name, const MessageHandler &handler)
Sets up a handler for messages with the specified name.
Definition: process.hpp:216
lambda::function< Future< http::Response >const http::Request &)> HttpRequestHandler
Any function which takes a process::http::Request and returns a process::http::Response.
Definition: process.hpp:257
void consume(MessageEvent &&event) override
bool requestStreaming
Definition: process.hpp:267
UPID link(const UPID &pid, const RemoteConnection remote=RemoteConnection::REUSE)
Links with the specified UPID.
If a persistent socket to the target pid does not exist, a new link is created.
virtual void finalize()
Invoked when a process is terminated.
Definition: process.hpp:108
Definition: event.hpp:49
bool wait(const UPID &pid, const Duration &duration=Seconds(-1))
Wait for the process to exit for no more than the specified seconds.
Definition: address.hpp:52
bool exited(const UPID &from, const UPID &to)
Simulates disconnection of the link between &#39;from&#39; and &#39;to&#39; by sending an ExitedEvent to &#39;to&#39;...
virtual void exited(const UPID &)
Invoked when a linked process has exited.
Definition: process.hpp:124
Definition: process.hpp:260
Result< Process > process(pid_t pid)
Definition: freebsd.hpp:30
A &quot;process identifier&quot; used to uniquely identify a process when dispatching messages.
Definition: pid.hpp:279
Definition: none.hpp:27
friend void * schedule(void *)
void route(const std::string &name, const std::string &realm, const Option< std::string > &help, Future< http::Response >(T::*method)(const http::Request &, const Option< http::authentication::Principal > &), const RouteOptions &options=RouteOptions())
Sets up a handler for HTTP requests with the specified name.
Definition: process.hpp:337
std::string absolutePath(const std::string &path)
Get the request absolutePath path with delegate prefix.
network::inet::Address address()
Returns the socket address associated with this instance of the library.
Definition: event.hpp:103
virtual void lost(const UPID &)
Invoked when a linked process can no longer be monitored.
Definition: process.hpp:133
size_t eventCount()
Returns the number of events of the given type currently on the event queue.
void install(const std::string &name, void(T::*method)(const UPID &, const std::string &))
Sets up a handler for messages with the specified name.
Definition: process.hpp:227
Try< Nothing > bind(int_fd s, const Address &address)
Definition: network.hpp:46
Definition: owned.hpp:35
Definition: process.hpp:493
Definition: event.hpp:60
virtual void serve(Event &&event)
Invoked when an event is serviced.
Definition: process.hpp:85
const UPID & self() const
Definition: process.hpp:79
void delegate(const std::string &name, const UPID &pid)
Delegates incoming messages, with the specified name, to the UPID.
Definition: process.hpp:242
constexpr const char * name
Definition: shell.hpp:41
long workers()
Returns the number of worker threads the library has created.
T This
Definition: process.hpp:507