Apache Mesos
process.hpp
Go to the documentation of this file.
1 // Licensed under the Apache License, Version 2.0 (the "License");
2 // you may not use this file except in compliance with the License.
3 // You may obtain a copy of the License at
4 //
5 // http://www.apache.org/licenses/LICENSE-2.0
6 //
7 // Unless required by applicable law or agreed to in writing, software
8 // distributed under the License is distributed on an "AS IS" BASIS,
9 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10 // See the License for the specific language governing permissions and
11 // limitations under the License.
12 
13 #ifndef __PROCESS_PROCESS_HPP__
14 #define __PROCESS_PROCESS_HPP__
15 
16 #include <stdint.h>
17 
18 #include <memory>
19 #include <map>
20 #include <queue>
21 #include <vector>
22 
23 #include <process/address.hpp>
25 #include <process/clock.hpp>
26 #include <process/event.hpp>
27 #include <process/filter.hpp>
28 #include <process/firewall.hpp>
29 #include <process/http.hpp>
30 #include <process/message.hpp>
31 #include <process/mime.hpp>
32 #include <process/owned.hpp>
33 #include <process/pid.hpp>
34 
35 #include <stout/duration.hpp>
36 #include <stout/hashmap.hpp>
37 #include <stout/lambda.hpp>
38 #include <stout/option.hpp>
39 #include <stout/synchronized.hpp>
40 
41 namespace process {
42 
43 // Forward declaration.
44 class EventQueue;
45 class Gate;
46 class Logging;
47 class Sequence;
48 
49 namespace firewall {
50 
68 void install(std::vector<Owned<FirewallRule>>&& rules);
69 
70 } // namespace firewall {
71 
72 class ProcessBase : public EventConsumer
73 {
74 public:
75  explicit ProcessBase(const std::string& id = "");
76 
77  ~ProcessBase() override;
78 
79  const UPID& self() const { return pid; }
80 
81 protected:
85  virtual void serve(Event&& event)
86  {
87  std::move(event).consume(this);
88  }
89 
90  // Callbacks used to consume (i.e., handle) a specific event.
91  void consume(MessageEvent&& event) override;
92  void consume(DispatchEvent&& event) override;
93  void consume(HttpEvent&& event) override;
94  void consume(ExitedEvent&& event) override;
95  void consume(TerminateEvent&& event) override;
96 
100  virtual void initialize() {}
101 
108  virtual void finalize() {}
109 
124  virtual void exited(const UPID&) {}
125 
133  virtual void lost(const UPID&) {}
134 
141  void send(
142  const UPID& to,
143  const std::string& name,
144  const char* data = nullptr,
145  size_t length = 0);
146 
147  void send(
148  const UPID& to,
149  std::string&& name);
150 
151  void send(
152  const UPID& to,
153  std::string&& name,
154  std::string&& data);
155 
161  enum class RemoteConnection
162  {
170  REUSE,
171 
180  RECONNECT,
181  };
182 
195  UPID link(
196  const UPID& pid,
197  const RemoteConnection remote = RemoteConnection::REUSE);
198 
210  typedef lambda::function<void(const UPID&, const std::string&)>
212 
216  void install(
217  const std::string& name,
218  const MessageHandler& handler)
219  {
220  handlers.message[name] = handler;
221  }
222 
226  template <typename T>
227  void install(
228  const std::string& name,
229  void (T::*method)(const UPID&, const std::string&))
230  {
231  // Note that we use dynamic_cast here so a process can use
232  // multiple inheritance if it sees so fit (e.g., to implement
233  // multiple callback interfaces).
234  MessageHandler handler =
235  lambda::bind(method, dynamic_cast<T*>(this), lambda::_1, lambda::_2);
236  install(name, handler);
237  }
238 
242  void delegate(const std::string& name, const UPID& pid)
243  {
244  delegates[name] = pid;
245  }
246 
256  typedef lambda::function<Future<http::Response>(const http::Request&)>
258 
// Options to control the behavior of a route.
struct RouteOptions
{
  // Streaming of requests is off unless explicitly enabled.
  RouteOptions()
    : requestStreaming(false) {}

  // Set to true if the endpoint supports request streaming.
  // Default: false.
  bool requestStreaming;
};
269 
276  void route(
277  const std::string& name,
278  const Option<std::string>& help,
279  const HttpRequestHandler& handler,
280  const RouteOptions& options = RouteOptions());
281 
285  template <typename T>
286  void route(
287  const std::string& name,
288  const Option<std::string>& help,
289  Future<http::Response> (T::*method)(const http::Request&),
290  const RouteOptions& options = RouteOptions())
291  {
292  // Note that we use dynamic_cast here so a process can use
293  // multiple inheritance if it sees so fit (e.g., to implement
294  // multiple callback interfaces).
295  HttpRequestHandler handler =
296  lambda::bind(method, dynamic_cast<T*>(this), lambda::_1);
297  route(name, help, handler, options);
298  }
299 
320  typedef lambda::function<Future<http::Response>(
321  const http::Request&,
324 
325  // TODO(arojas): Consider introducing an `authentication::Realm` type.
326  // TODO(bevers): Consider changing the type of the second argument to
327  // `const Option<std::string>&` for consistency with the version below.
328  void route(
329  const std::string& name,
330  const std::string& realm,
331  const Option<std::string>& help,
332  const AuthenticatedHttpRequestHandler& handler,
333  const RouteOptions& options = RouteOptions());
334 
339  template<typename T>
340  void route(
341  const std::string& name,
342  const Option<std::string>& realm,
343  const Option<std::string>& help,
344  Future<http::Response>(T::*method)(
345  const http::Request&, const Option<http::authentication::Principal>&),
346  const RouteOptions& options = RouteOptions())
347  {
348  // Note that we use dynamic_cast here so a process can use
349  // multiple inheritance if it sees so fit (e.g., to implement
350  // multiple callback interfaces).
351  if (realm.isSome()) {
353  lambda::bind(method, dynamic_cast<T*>(this), lambda::_1, lambda::_2);
354  route(name, realm.get(), help, handler, options);
355  } else {
356  HttpRequestHandler handler =
357  lambda::bind(method, dynamic_cast<T*>(this), lambda::_1, None());
358  route(name, help, handler, options);
359  }
360  }
361 
376  void provide(
377  const std::string& name,
378  const std::string& path,
379  const std::map<std::string, std::string>& types = mime::types)
380  {
381  // TODO(benh): Check that name is only alphanumeric (i.e., has no
382  // '/') and that path is absolute.
383  Asset asset;
384  asset.path = path;
385  asset.types = types;
386  assets[name] = asset;
387  }
388 
394  template <typename T>
395  size_t eventCount();
396 
397 private:
398  friend class SocketManager;
399  friend class ProcessManager;
400  friend void* schedule(void*);
401 
402  // Process states.
403  //
404  // Transitioning from BLOCKED to READY also requires enqueueing the
405  // process in the run queue otherwise the events will never be
406  // processed!
407  enum class State
408  {
409  BOTTOM, // Uninitialized but events may be enqueued.
410  BLOCKED, // Initialized, no events enqueued.
411  READY, // Initialized, events enqueued.
412  TERMINATING // Initialized, no more events will be enqueued.
413  };
414 
415  std::atomic<State> state = ATOMIC_VAR_INIT(State::BOTTOM);
416 
417  // Flag for indicating that a terminate event has been injected.
418  std::atomic<bool> termination = ATOMIC_VAR_INIT(false);
419 
420  // Enqueue the specified message, request, or function call.
421  // Returns false if not enqueued (i.e. the process is terminating).
422  // In this case the caller retains ownership of the event.
423  // Should not be called directly, callers should go through
424  // `ProcessManager::deliver(...)`.
425  bool enqueue(Event* event);
426 
427  // Delegates for messages.
428  std::map<std::string, UPID> delegates;
429 
430  // Definition of an HTTP endpoint. The endpoint can be
431  // associated with an authentication realm, in which case:
432  //
433  // (1) `realm` and `authenticatedHandler` will be set.
434  // Libprocess will perform HTTP authentication for
435  // all requests to this endpoint (by default during
436  // HttpEvent consumption). The authentication principal
437  // will be passed to the `authenticatedHandler`.
438  //
439  // Otherwise, if the endpoint is not associated with an
440  // authentication realm:
441  //
442  // (2) Only `handler` will be set, and no authentication
443  // takes place.
444  struct HttpEndpoint
445  {
447 
448  Option<std::string> realm;
449  Option<AuthenticatedHttpRequestHandler> authenticatedHandler;
450  RouteOptions options;
451  };
452 
453  // Handlers for messages and HTTP requests.
454  struct {
457 
458  // Used for delivering HTTP requests in the correct order.
459  // Initialized lazily to avoid ProcessBase requiring
460  // another Process!
462  } handlers;
463 
464  // Definition of a static asset.
465  struct Asset
466  {
467  std::string path;
468  std::map<std::string, std::string> types;
469  };
470 
471  // Continuation for `consume(HttpEvent&&)`.
472  Future<http::Response> _consume(
473  const HttpEndpoint& endpoint,
474  const std::string& name,
476 
477  // JSON representation of process. MUST be invoked from within the
478  // process itself in order to safely examine events.
479  operator JSON::Object();
480 
481  // Static assets(s) to provide.
482  std::map<std::string, Asset> assets;
483 
484  // Queue of received events. We employ the PIMPL idiom here and use
485  // a pointer so we can hide the implementation of `EventQueue`.
486  std::unique_ptr<EventQueue> events;
487 
488  // NOTE: this is a shared pointer to a _pointer_, hence this is not
489  // responsible for the ProcessBase itself.
490  std::shared_ptr<ProcessBase*> reference;
491 
492  std::shared_ptr<Gate> gate;
493 
494  // Whether or not the runtime should delete this process after it
495  // has terminated. Note that failure to spawn the process will leave
496  // the process unmanaged and thus it may leak!
497  bool manage = false;
498 
499  // Process PID.
500  UPID pid;
501 };
502 
503 
504 template <typename T>
505 class Process : public virtual ProcessBase {
506 public:
507  ~Process() override {}
508 
514  PID<T> self() const { return PID<T>(static_cast<const T*>(this)); }
515 
516 protected:
517  // Useful typedefs for dispatch/delay/defer to self()/this.
518  typedef T Self;
519  typedef T This;
520 };
521 
522 
543 bool initialize(
544  const Option<std::string>& delegate = None(),
545  const Option<std::string>& readwriteAuthenticationRealm = None(),
546  const Option<std::string>& readonlyAuthenticationRealm = None());
547 
548 
555 void finalize(bool finalize_wsa = false);
556 
557 
561 std::string absolutePath(const std::string& path);
562 
563 
568 
569 
574 
575 
581 long workers();
582 
583 
591 UPID spawn(ProcessBase* process, bool manage = false);
592 
593 inline UPID spawn(ProcessBase& process, bool manage = false)
594 {
595  return spawn(&process, manage);
596 }
597 
598 template <typename T>
599 PID<T> spawn(T* t, bool manage = false)
600 {
601  // We save the pid before spawn is called because it's possible that
602  // the process has already been deleted after spawn returns (e.g.,
603  // if 'manage' is true).
604  PID<T> pid(t);
605 
606  if (!spawn(static_cast<ProcessBase*>(t), manage)) {
607  return PID<T>();
608  }
609 
610  return pid;
611 }
612 
613 template <typename T>
614 PID<T> spawn(T& t, bool manage = false)
615 {
616  return spawn(&t, manage);
617 }
618 
619 
633 void terminate(const UPID& pid, bool inject = true);
634 void terminate(const ProcessBase& process, bool inject = true);
635 void terminate(const ProcessBase* process, bool inject = true);
636 
637 
646 bool wait(const UPID& pid, const Duration& duration = Seconds(-1));
647 bool wait(const ProcessBase& process, const Duration& duration = Seconds(-1));
648 bool wait(const ProcessBase* process, const Duration& duration = Seconds(-1));
649 
650 
659 void post(const UPID& to,
660  const std::string& name,
661  const char* data = nullptr,
662  size_t length = 0);
663 
664 
665 void post(const UPID& from,
666  const UPID& to,
667  const std::string& name,
668  const char* data = nullptr,
669  size_t length = 0);
670 
671 
675 inline void terminate(const ProcessBase& process, bool inject)
676 {
677  terminate(process.self(), inject);
678 }
679 
680 
684 inline void terminate(const ProcessBase* process, bool inject)
685 {
686  terminate(process->self(), inject);
687 }
688 
689 
693 inline bool wait(const ProcessBase& process, const Duration& duration)
694 {
695  return process::wait(process.self(), duration); // Explicit to disambiguate.
696 }
697 
698 
702 inline bool wait(const ProcessBase* process, const Duration& duration)
703 {
704  return process::wait(process->self(), duration); // Explicit to disambiguate.
705 }
706 
707 
708 // Per thread process pointer.
709 extern thread_local ProcessBase* __process__;
710 
711 // NOTE: Methods in this namespace should only be used in tests to
712 // inject arbitrary events.
713 namespace inject {
714 
721 bool exited(const UPID& from, const UPID& to);
722 
723 } // namespace inject {
724 
725 } // namespace process {
726 
727 #endif // __PROCESS_PROCESS_HPP__
Definition: path.hpp:29
void finalize(bool finalize_wsa=false)
Clean up the library.
lambda::function< void(const UPID &, const std::string &)> MessageHandler
Any function which takes a "from" UPID and a message body as arguments.
Definition: process.hpp:211
Future< Response > request(const Request &request, bool streamedResponse=false)
Asynchronously sends an HTTP request to the process and returns the HTTP response once the entire res...
RemoteConnection
Describes the behavior of the link call when the target pid points to a remote process.
Definition: process.hpp:161
void provide(const std::string &name, const std::string &path, const std::map< std::string, std::string > &types=mime::types)
Sets up the default HTTP request handler to provide the static asset(s) at the specified absolute pat...
Definition: process.hpp:376
bool initialize(const Option< std::string > &delegate=None(), const Option< std::string > &readwriteAuthenticationRealm=None(), const Option< std::string > &readonlyAuthenticationRealm=None())
Initialize the library.
Definition: event.hpp:142
T Self
Definition: process.hpp:518
Owned< Sequence > httpSequence
Definition: process.hpp:461
Definition: event.hpp:178
Definition: process.hpp:72
Definition: socket_manager.hpp:36
UPID spawn(ProcessBase *process, bool manage=false)
Spawn a new process.
Definition: duration.hpp:32
lambda::function< Future< http::Response >(const http::Request &, const Option< http::authentication::Principal > &)> AuthenticatedHttpRequestHandler
Any function which takes a process::http::Request and an Option<Principal> and returns a process::htt...
Definition: process.hpp:323
void terminate(const UPID &pid, bool inject=true)
Sends a TerminateEvent to the given process.
RouteOptions()
Definition: process.hpp:262
Definition: event.hpp:233
bool isSome() const
Definition: option.hpp:116
hashmap< std::string, HttpEndpoint > http
Definition: process.hpp:456
Definition: event.hpp:209
std::map< std::string, std::string > types
Definition: http.hpp:533
~Process() override
Definition: process.hpp:507
Definition: json.hpp:158
PID< Logging > logging()
Return the PID associated with the global logging process.
thread_local ProcessBase * __process__
void install(std::vector< Owned< FirewallRule >> &&rules)
Install a list of firewall rules which are used to forbid incoming HTTP requests. ...
void post(const UPID &to, const std::string &name, const char *data=nullptr, size_t length=0)
Sends a message with data without a return address.
hashmap< std::string, MessageHandler > message
Definition: process.hpp:455
An "untyped" PID, used to encapsulate the process ID for lower-layer abstractions (eg...
Definition: pid.hpp:39
Definition: duration.hpp:207
void route(const std::string &name, const Option< std::string > &help, Future< http::Response >(T::*method)(const http::Request &), const RouteOptions &options=RouteOptions())
Sets up a handler for HTTP requests with the specified name.
Definition: process.hpp:286
virtual void initialize()
Invoked when a process gets spawned.
Definition: process.hpp:100
void install(const std::string &name, const MessageHandler &handler)
Sets up a handler for messages with the specified name.
Definition: process.hpp:216
lambda::function< Future< http::Response >(const http::Request &)> HttpRequestHandler
Any function which takes a process::http::Request and returns a process::http::Response.
Definition: process.hpp:257
const T & get() const &
Definition: option.hpp:119
bool requestStreaming
Definition: process.hpp:267
virtual void finalize()
Invoked when a process is terminated.
Definition: process.hpp:108
Definition: event.hpp:49
bool wait(const UPID &pid, const Duration &duration=Seconds(-1))
Wait for the process to exit for no more than the specified seconds.
Definition: address.hpp:52
bool exited(const UPID &from, const UPID &to)
Simulates disconnection of the link between 'from' and 'to' by sending an ExitedEvent to 'to'...
virtual void exited(const UPID &)
Invoked when a linked process has exited.
Definition: process.hpp:124
Definition: process.hpp:260
A "process identifier" used to uniquely identify a process when dispatching messages.
Definition: pid.hpp:289
Definition: none.hpp:27
Definition: executor.hpp:48
std::string absolutePath(const std::string &path)
Get the absolute path of the request, with the delegate prefix.
network::inet::Address address()
Returns the socket address associated with this instance of the library.
Definition: event.hpp:103
virtual void lost(const UPID &)
Invoked when a linked process can no longer be monitored.
Definition: process.hpp:133
void install(const std::string &name, void(T::*method)(const UPID &, const std::string &))
Sets up a handler for messages with the specified name.
Definition: process.hpp:227
Try< Nothing > bind(int_fd s, const Address &address)
Definition: network.hpp:46
void route(const std::string &name, const Option< std::string > &realm, const Option< std::string > &help, Future< http::Response >(T::*method)(const http::Request &, const Option< http::authentication::Principal > &), const RouteOptions &options=RouteOptions())
Forwards to the correct overload of process::ProcessBase::route(), depending on whether the authentic...
Definition: process.hpp:340
Definition: owned.hpp:36
Definition: process.hpp:505
Definition: event.hpp:60
virtual void serve(Event &&event)
Invoked when an event is serviced.
Definition: process.hpp:85
const UPID & self() const
Definition: process.hpp:79
void delegate(const std::string &name, const UPID &pid)
Delegates incoming messages, with the specified name, to the UPID.
Definition: process.hpp:242
constexpr const char * name
Definition: shell.hpp:41
long workers()
Returns the number of worker threads the library has created.
T This
Definition: process.hpp:519
Try< Nothing > schedule(const mesos::maintenance::Schedule &schedule, const hashmap< MachineID, Machine > &machines)
Performs the following checks on the new maintenance schedule:
Future< size_t > send(const int_fd &fd, const void *buf, size_t size)