Apache Mesos
socket_manager.hpp
Go to the documentation of this file.
1 // Licensed under the Apache License, Version 2.0 (the "License");
2 // you may not use this file except in compliance with the License.
3 // You may obtain a copy of the License at
4 //
5 // http://www.apache.org/licenses/LICENSE-2.0
6 //
7 // Unless required by applicable law or agreed to in writing, software
8 // distributed under the License is distributed on an "AS IS" BASIS,
9 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10 // See the License for the specific language governing permissions and
11 // limitations under the License
12 
13 #ifndef __PROCESS_SOCKET_MANAGER_HPP__
14 #define __PROCESS_SOCKET_MANAGER_HPP__
15 
16 #include <mutex>
17 #include <queue>
18 
19 #include <process/address.hpp>
20 #include <process/future.hpp>
21 #include <process/message.hpp>
22 #include <process/process.hpp>
23 #include <process/socket.hpp>
24 
25 #include <stout/hashmap.hpp>
26 #include <stout/hashset.hpp>
27 
28 #include "encoder.hpp"
29 
30 namespace process {
31 
32 // Forward declaration.
33 class HttpProxy;
34 
35 
36 class SocketManager
37 {
38 public:
39  SocketManager();
41 
42  // Closes all managed sockets and clears any associated metadata.
43  // The `__s__` server socket must be closed and `ProcessManager`
44  // must be finalized before calling this.
45  void finalize();
46 
47  void accepted(const network::inet::Socket& socket);
48 
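49  void link(
50  ProcessBase* process,
51  const UPID& to,
52  const ProcessBase::RemoteConnection remote,
53  const network::internal::SocketImpl::Kind& kind =
54  network::internal::SocketImpl::DEFAULT_KIND());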
55 
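56  // Test-only method to fetch the file descriptor behind a
57  // persistent socket.
58  Option<int_fd> get_persistent_socket(const UPID& to);
59 
60  PID<HttpProxy> proxy(const network::inet::Socket& socket);
61 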
62  // Used to clean up the pointer to an `HttpProxy` in case the
63  // `HttpProxy` is killed outside the control of the `SocketManager`.
64  // This generally happens when `process::finalize` is called.
65  void unproxy(const network::inet::Socket& socket);
66 
67  void send(
68  Encoder* encoder,
69  bool persist,
70  const network::inet::Socket& socket);
71 
72  void send(
73  const http::Response& response,
74  const http::Request& request,
75  const network::inet::Socket& socket);
76 
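77  void send(
78  Message&& message,
79  const network::internal::SocketImpl::Kind& kind =
80  network::internal::SocketImpl::DEFAULT_KIND());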
81 
82  Encoder* next(int_fd s);
83 
84  void close(int_fd s);
85 
86  void exited(const network::inet::Address& address);
87  void exited(ProcessBase* process);
88 
89 private:
90  // TODO(bmahler): Leverage a bidirectional multimap instead, or
91  // hide the complexity of manipulating 'links' through methods.
92  struct
93  {
94  // For links, we maintain a bidirectional mapping between the
95  // "linkers" (Processes) and the "linkees" (remote / local UPIDs).
96  // For remote socket addresses, we also need a mapping to the
97  // linkees for that socket address, because socket closure only
98  // notifies at the address level.
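99  hashmap<UPID, hashset<ProcessBase*>> linkers;
100  hashmap<ProcessBase*, hashset<UPID>> linkees;
101  hashmap<network::inet::Address, hashset<UPID>> remotes;
102  } links;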
103 
104  // Switch the underlying socket that a remote end is talking to.
105  // This manipulates the data structures below by swapping all data
106  // mapped to 'from' to being mapped to 'to'. This is useful for
107  // downgrading a socket from SSL to POLL based.
108  void swap_implementing_socket(
109  const network::inet::Socket& from,
110  const network::inet::Socket& to);
111 
112  // Helper function for link().
113  void link_connect(
114  const Future<Nothing>& future,
115  network::inet::Socket socket,
116  const UPID& to);
117 
118  // Helper function for send().
119  void send_connect(
120  const Future<Nothing>& future,
121  network::inet::Socket socket,
122  Message&& message);
123 
124  // Collection of all active sockets (both inbound and outbound).
126 
127  // Collection of sockets that should be disposed when they are
128  // finished being used (e.g., when there is no more data to send on
129  // them). Can contain both inbound and outbound sockets.
130  hashset<int_fd> dispose;
131 
132  // Map from socket to socket address for outbound sockets.
134 
135  // Map from socket address to temporary sockets (outbound sockets
136  // that will be closed once there is no more data to send on them).
138 
139  // Map from socket address (ip, port) to persistent sockets
140  // (outbound sockets that will remain open even if there is no more
141  // data to send on them). We distinguish these from the 'temps'
142  // collection so we can tell when a persistent socket has been lost
143  // (and thus generate ExitedEvents).
145 
146  // Map from outbound socket to outgoing queue.
148 
149  // HTTP proxies.
151 
152  // Protects instance variables.
153  std::recursive_mutex mutex;
154 };
155 
156 
157 // Global instance of the socket manager.
158 extern SocketManager* socket_manager;
159 
160 } // namespace process {
161 
162 #endif // __PROCESS_SOCKET_MANAGER_HPP__
void exited(const network::inet::Address &address)
Future< Response > request(const Request &request, bool streamedResponse=false)
Asynchronously sends an HTTP request to the process and returns the HTTP response once the entire res...
RemoteConnection
Describes the behavior of the link call when the target pid points to a remote process.
Definition: process.hpp:161
Definition: message.hpp:22
void send(Encoder *encoder, bool persist, const network::inet::Socket &socket)
Kind
Available kinds of implementations.
Definition: socket.hpp:67
static Kind DEFAULT_KIND()
Returns the default Kind of implementation.
Definition: process.hpp:72
Definition: socket_manager.hpp:36
void accepted(const network::inet::Socket &socket)
Definition: http.hpp:533
Definition: hashmap.hpp:38
const char * kind()
void link(ProcessBase *process, const UPID &to, const ProcessBase::RemoteConnection remote, const network::internal::SocketImpl::Kind &kind=network::internal::SocketImpl::DEFAULT_KIND())
An "untyped" PID, used to encapsulate the process ID for lower-layer abstractions (eg...
Definition: pid.hpp:39
Encoder * next(int_fd s)
hashmap< ProcessBase *, hashset< UPID > > linkees
Definition: socket_manager.hpp:100
Definition: encoder.hpp:38
Option< int_fd > get_persistent_socket(const UPID &to)
hashmap< network::inet::Address, hashset< UPID > > remotes
Definition: socket_manager.hpp:101
hashmap< UPID, hashset< ProcessBase * > > linkers
Definition: socket_manager.hpp:99
PID< HttpProxy > proxy(const network::inet::Socket &socket)
Definition: address.hpp:52
A "process identifier" used to uniquely identify a process when dispatching messages.
Definition: pid.hpp:289
Definition: executor.hpp:48
Definition: http.hpp:612
void close(int_fd s)
network::inet::Address address()
Returns the socket address associated with this instance of the library.
void unproxy(const network::inet::Socket &socket)
Try< Netlink< struct nl_sock > > socket(int protocol=NETLINK_ROUTE)
Definition: internal.hpp:91
SocketManager * socket_manager
int int_fd
Definition: int_fd.hpp:35