Apache Mesos
socket.hpp
Go to the documentation of this file.
1 // Licensed under the Apache License, Version 2.0 (the "License");
2 // you may not use this file except in compliance with the License.
3 // You may obtain a copy of the License at
4 //
5 // http://www.apache.org/licenses/LICENSE-2.0
6 //
7 // Unless required by applicable law or agreed to in writing, software
8 // distributed under the License is distributed on an "AS IS" BASIS,
9 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10 // See the License for the specific language governing permissions and
11 // limitations under the License
12 
13 #ifndef __PROCESS_SOCKET_HPP__
14 #define __PROCESS_SOCKET_HPP__
15 
16 #ifndef __WINDOWS__
17 #include <sys/socket.h>
18 #include <sys/wait.h>
19 #endif // __WINDOWS__
20 
21 #include <memory>
22 
23 #include <process/address.hpp>
24 #include <process/future.hpp>
25 
26 #include <stout/abort.hpp>
27 #include <stout/nothing.hpp>
28 #include <stout/try.hpp>
29 #include <stout/unreachable.hpp>
30 #ifdef __WINDOWS__
31 #include <stout/windows.hpp>
32 #endif // __WINDOWS__
33 
34 #include <stout/os/int_fd.hpp>
35 
36 
37 namespace process {
38 namespace network {
39 namespace internal {
40 
55 class SocketImpl : public std::enable_shared_from_this<SocketImpl>
56 {
57 public:
// Available kinds of socket implementations. `POLL` is always present;
// `SSL` is only declared when the build defines `USE_SSL_SOCKET`.
64  enum class Kind
65  {
66  POLL,
67 #ifdef USE_SSL_SOCKET
68  SSL
69 #endif
70  };
71 
75  static Kind DEFAULT_KIND();
76 
87  int_fd s,
88  Kind kind = DEFAULT_KIND());
89 
99  // TODO(josephw): MESOS-5729: Consider making the CLOEXEC option
100  // configurable by the caller of the interface.
102  Address::Family family,
103  Kind kind = DEFAULT_KIND());
104 
// Closes the descriptor `s` when the implementation is destroyed.
// A negative `s` means the descriptor was released and is left alone.
// The result of `os::close` is verified with `CHECK_SOME`
// (NOTE(review): presumably fatal on failure -- confirm against the
// `CHECK_SOME` definition).
105  virtual ~SocketImpl()
106  {
107  // Don't close if the socket was released.
108  if (s >= 0) {
109  CHECK_SOME(os::close(s)) << "Failed to close socket";
110  }
111  }
112 
// Returns the descriptor `s` held by this implementation without
// modifying it.
116  int_fd get() const
117  {
118  return s;
119  }
120 
124  Try<Address> address() const;
125 
129  Try<Address> peer() const;
130 
138 
139  virtual Try<Nothing> listen(int backlog) = 0;
140 
150 
151  virtual Future<Nothing> connect(const Address& address) = 0;
152  virtual Future<size_t> recv(char* data, size_t size) = 0;
153  virtual Future<size_t> send(const char* data, size_t size) = 0;
154  virtual Future<size_t> sendfile(int_fd fd, off_t offset, size_t size) = 0;
155 
170  // TODO(benh): Consider returning Owned<std::string> or
171  // Shared<std::string>, the latter enabling reuse of a pool of
172  // preallocated strings/buffers.
173  virtual Future<std::string> recv(const Option<ssize_t>& size = None());
174 
182  // TODO(benh): Consider taking Shared<std::string>, the latter
183  // enabling reuse of a pool of preallocated strings/buffers.
184  virtual Future<Nothing> send(const std::string& data);
185 
// Shuts down the socket by calling the system `::shutdown` on `s`.
//
// `how` is passed through unchanged; `Socket::shutdown` in this file
// supplies `SHUT_RD`, `SHUT_WR` or `SHUT_RDWR`.
//
// Returns `ErrnoError()` when `::shutdown` reports a negative result,
// otherwise `Nothing()`.
190  virtual Try<Nothing> shutdown(int how)
191  {
192  if (::shutdown(s, how) < 0) {
193  return ErrnoError();
194  }
195 
196  return Nothing();
197  }
198 
199  virtual Kind kind() const = 0;
200 
201 protected:
// Protected constructor: stores `_s` as the descriptor and verifies
// with `CHECK` that it is non-negative.
202  explicit SocketImpl(int_fd _s) : s(_s) { CHECK(s >= 0); }
203 
211  {
212  int_fd released = s;
213  s = -1;
214  return released;
215  }
216 
// Returns a `std::shared_ptr<T>` for `t`, obtained from
// `t->shared_from_this()` and narrowed to `T` with
// `std::dynamic_pointer_cast`.
//
// `t` is verified with `CHECK_NOTNULL` before use, and the cast result
// is verified with `CHECK` so a failed cast is not returned silently.
220  template <typename T>
221  static std::shared_ptr<T> shared(T* t)
222  {
223  std::shared_ptr<T> pointer =
224  std::dynamic_pointer_cast<T>(CHECK_NOTNULL(t)->shared_from_this());
225  CHECK(pointer);
226  return pointer;
227  }
228 
230 };
231 
232 
240 template <typename AddressType>
241 class Socket
242 {
243 public:
244  static_assert(
245  std::is_convertible<AddressType, network::Address>::value,
246  "Requires type convertible to `network::Address`");
247 
258  int_fd s,
260  {
262  if (impl.isError()) {
263  return Error(impl.error());
264  }
265  return Socket(impl.get());
266  }
267 
278  // TODO(josephw): MESOS-5729: Consider making the CLOEXEC option
279  // configurable by the caller of the interface.
281 
296  static Try<Socket> create(
297  Address::Family family,
299 
307  {
308  return impl->kind();
309  }
310 
// Two `Socket`s compare equal when their `impl` shared pointers compare
// equal, i.e. both refer to the same `SocketImpl` object.
311  bool operator==(const Socket& that) const
312  {
313  return impl == that.impl;
314  }
315 
// Implicit conversion to the underlying descriptor; yields the same
// value as `get()`.
316  operator int_fd() const
317  {
318  return impl->get();
319  }
320 
322  {
323  return convert<AddressType>(impl->address());
324  }
325 
327  {
328  return convert<AddressType>(impl->peer());
329  }
330 
// Returns the descriptor reported by the implementation's `get()`.
331  int_fd get() const
332  {
333  return impl->get();
334  }
335 
// Forwards `address` to the implementation's `bind` and passes the
// result through `convert<AddressType>` to produce a `Try<AddressType>`.
// NOTE(review): `convert` is defined outside this view; presumably it
// narrows a `network::Address` to `AddressType` -- confirm.
336  Try<AddressType> bind(const AddressType& address)
337  {
338  return convert<AddressType>(impl->bind(address));
339  }
340 
// Forwards `backlog` to the implementation's `listen` and returns its
// result unchanged.
341  Try<Nothing> listen(int backlog)
342  {
343  return impl->listen(backlog);
344  }
345 
347  {
348  // NOTE: We save a reference to the listening socket itself
349  // (i.e., 'this') so that we don't close the listening socket
350  // while 'accept' is in flight.
351  std::shared_ptr<SocketImpl> self = impl->shared_from_this();
352 
353  return impl->accept()
354  // TODO(benh): Use && for `accepted` here!
355  .then([self](const std::shared_ptr<SocketImpl>& accepted) {
356  return Socket(accepted);
357  });
358  }
359 
// Forwards `address` to the implementation's `connect` and returns the
// resulting future unchanged.
360  Future<Nothing> connect(const AddressType& address)
361  {
362  return impl->connect(address);
363  }
364 
// Forwards the caller's buffer `data` and `size` to the implementation's
// `recv` and returns the resulting future unchanged.
// NOTE(review): the `size_t` result is presumably the number of bytes
// received -- confirm against the concrete implementation.
365  Future<size_t> recv(char* data, size_t size) const
366  {
367  return impl->recv(data, size);
368  }
369 
// Forwards the caller's buffer `data` and `size` to the implementation's
// `send` and returns the resulting future unchanged.
// NOTE(review): the `size_t` result is presumably the number of bytes
// sent -- confirm against the concrete implementation.
370  Future<size_t> send(const char* data, size_t size) const
371  {
372  return impl->send(data, size);
373  }
374 
// Forwards `fd`, `offset` and `size` to the implementation's `sendfile`
// and returns the resulting future unchanged.
375  Future<size_t> sendfile(int_fd fd, off_t offset, size_t size) const
376  {
377  return impl->sendfile(fd, offset, size);
378  }
379 
381  {
382  return impl->recv(size);
383  }
384 
// String overload: forwards `data` to the implementation's
// `send(const std::string&)` and returns the resulting future unchanged.
385  Future<Nothing> send(const std::string& data)
386  {
387  return impl->send(data);
388  }
389 
// Direction(s) to shut down. `Socket::shutdown` maps these to
// `SHUT_RD`, `SHUT_WR` and `SHUT_RDWR` respectively.
390  enum class Shutdown
391  {
392  READ,
393  WRITE,
394  READ_WRITE
395  };
396 
397  // TODO(benh): Replace the default to Shutdown::READ_WRITE or remove
398  // altogether since it's unclear what the default should be.
400  {
401  int how = [&]() {
402  switch (shutdown) {
403  case Shutdown::READ: return SHUT_RD;
404  case Shutdown::WRITE: return SHUT_WR;
405  case Shutdown::READ_WRITE: return SHUT_RDWR;
406  }
407  UNREACHABLE();
408  }();
409 
410  return impl->shutdown(how);
411  }
412 
413  // Support implicit conversion of any `Socket<AddressType>` to a
414  // `Socket<network::Address>`.
// The converted socket is constructed from the same `impl` shared
// pointer, so both handles refer to one `SocketImpl`.
415  operator Socket<network::Address>() const
416  {
417  return Socket<network::Address>(impl);
418  }
419 
420 private:
421  // Necessary to support the implicit conversion operator from any
422  // `Socket<AddressType>` to `Socket<network::Address>`.
423  template <typename T>
424  friend class Socket;
425 
// Private constructor that takes over an rvalue implementation pointer
// by moving it into `impl`.
426  explicit Socket(std::shared_ptr<SocketImpl>&& that) : impl(std::move(that)) {}
427 
// Private constructor that copies the implementation pointer, so the
// new `Socket` shares the same `SocketImpl` as `that`.
428  explicit Socket(const std::shared_ptr<SocketImpl>& that) : impl(that) {}
429 
430  std::shared_ptr<SocketImpl> impl;
431 };
432 
433 } // namespace internal {
434 
435 
437 
438 namespace inet {
440 } // namespace inet {
441 
442 #ifndef __WINDOWS__
443 namespace unix {
445 } // namespace unix {
446 #endif // __WINDOWS__
447 
448 
449 namespace internal {
450 
451 template <>
453  SocketImpl::Kind kind) = delete;
454 
455 
456 template <>
458  Address::Family family,
459  SocketImpl::Kind kind)
460 {
462  if (impl.isError()) {
463  return Error(impl.error());
464  }
465  return Socket(impl.get());
466 }
467 
468 
469 template <>
471  SocketImpl::Kind kind)
472 {
473  // TODO(benh): Replace this function which defaults to IPv4 in
474  // exchange for explicit IPv4 and IPv6 versions.
476  SocketImpl::create(Address::Family::INET4, kind);
477  if (impl.isError()) {
478  return Error(impl.error());
479  }
480  return Socket(impl.get());
481 }
482 
483 
484 template <>
486  Address::Family family,
487  SocketImpl::Kind kind) = delete;
488 
489 
490 #ifndef __WINDOWS__
491 template <>
493  SocketImpl::Kind kind)
494 {
496  SocketImpl::create(Address::Family::UNIX, kind);
497  if (impl.isError()) {
498  return Error(impl.error());
499  }
500  return Socket(impl.get());
501 }
502 
503 
504 template <>
506  Address::Family family,
507  SocketImpl::Kind kind) = delete;
508 #endif // __WINDOWS__
509 
510 } // namespace internal {
511 } // namespace network {
512 } // namespace process {
513 
514 #endif // __PROCESS_SOCKET_HPP__
network::internal::Socket< network::Address > Socket
Definition: socket.hpp:436
const short READ
A possible event while polling.
Definition: io.hpp:34
Future< size_t > sendfile(int_fd fd, off_t offset, size_t size) const
Definition: socket.hpp:375
Future< std::string > recv(const Option< ssize_t > &size=None())
Definition: socket.hpp:380
Definition: nothing.hpp:16
virtual Future< std::shared_ptr< SocketImpl > > accept()=0
Returns an implementation corresponding to the next pending connection for the listening socket...
Definition: errorbase.hpp:35
Definition: option.hpp:28
Try< Bytes > size(const std::string &path, const FollowSymlink follow=FollowSymlink::FOLLOW_SYMLINK)
Definition: stat.hpp:100
int_fd release()
Releases ownership of the file descriptor.
Definition: socket.hpp:210
virtual Future< size_t > sendfile(int_fd fd, off_t offset, size_t size)=0
Try< Address > peer() const
Returns the peer's Address for the accepted or connected socket.
Future< size_t > recv(char *data, size_t size) const
Definition: socket.hpp:365
Definition: try.hpp:34
virtual Try< Nothing > shutdown(int how)
Shuts down the socket.
Definition: socket.hpp:190
int_fd s
Definition: socket.hpp:229
static std::shared_ptr< T > shared(T *t)
Returns a std::shared_ptr<T> from this implementation.
Definition: socket.hpp:221
Kind
Available kinds of implementations.
Definition: socket.hpp:64
static Try< Socket > create(int_fd s, SocketImpl::Kind kind=SocketImpl::DEFAULT_KIND())
Returns an instance of a Socket using the specified kind of implementation.
Definition: socket.hpp:257
virtual Future< size_t > send(const char *data, size_t size)=0
Definition: address.hpp:277
static Kind DEFAULT_KIND()
Returns the default Kind of implementation.
Definition: errorbase.hpp:49
constexpr int SHUT_RDWR
Definition: windows.hpp:199
Try< Nothing > listen(int backlog)
Definition: socket.hpp:341
Try< Address > bind(const Address &address)
Assigns the specified address to the socket.
friend class Socket
Definition: socket.hpp:424
virtual Future< Nothing > connect(const Address &address)=0
Future< size_t > send(const char *data, size_t size) const
Definition: socket.hpp:370
const char * kind()
virtual Try< Nothing > listen(int backlog)=0
Try< Address > address() const
Returns the Address with the assigned ip and assigned port.
#define CHECK_SOME(expression)
Definition: check.hpp:44
SocketImpl(int_fd _s)
Definition: socket.hpp:202
Try< Nothing > close(int fd)
Definition: close.hpp:24
const short WRITE
A possible event while polling.
Definition: io.hpp:39
Implementation interface for a Socket.
Definition: socket.hpp:55
static Try error(const E &e)
Definition: try.hpp:42
#define UNREACHABLE()
Definition: unreachable.hpp:22
SocketImpl::Kind kind() const
Returns the kind representing the underlying implementation of the Socket instance.
Definition: socket.hpp:306
Try< AddressType > bind(const AddressType &address)
Definition: socket.hpp:336
Result< Process > process(pid_t pid)
Definition: freebsd.hpp:30
Definition: none.hpp:27
bool isError() const
Definition: try.hpp:71
bool operator==(const Socket &that) const
Definition: socket.hpp:311
Try< Nothing > shutdown(Shutdown shutdown=Shutdown::READ)
Definition: socket.hpp:399
Future< Nothing > send(const std::string &data)
Definition: socket.hpp:385
constexpr int SHUT_RD
Definition: windows.hpp:197
Try< Nothing > create(const std::string &hierarchy, const std::string &cgroup, bool recursive=false)
Future< Socket > accept()
Definition: socket.hpp:346
int int_fd
Definition: int_fd.hpp:35
virtual ~SocketImpl()
Definition: socket.hpp:105
An abstraction around a socket (file descriptor).
Definition: socket.hpp:241
Family
Definition: address.hpp:286
Try< AddressType > address() const
Definition: socket.hpp:321
constexpr int SHUT_WR
Definition: windows.hpp:198
const T & get() const
Definition: try.hpp:73
Try< AddressType > peer() const
Definition: socket.hpp:326
static Try< std::shared_ptr< SocketImpl > > create(int_fd s, Kind kind=DEFAULT_KIND())
Returns an instance of a SocketImpl using the specified kind of implementation.
Definition: future.hpp:57
Future< Nothing > connect(const AddressType &address)
Definition: socket.hpp:360
virtual Future< size_t > recv(char *data, size_t size)=0