Apache Mesos
socket.hpp
Go to the documentation of this file.
1 // Licensed under the Apache License, Version 2.0 (the "License");
2 // you may not use this file except in compliance with the License.
3 // You may obtain a copy of the License at
4 //
5 // http://www.apache.org/licenses/LICENSE-2.0
6 //
7 // Unless required by applicable law or agreed to in writing, software
8 // distributed under the License is distributed on an "AS IS" BASIS,
9 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10 // See the License for the specific language governing permissions and
11 // limitations under the License
12 
13 #ifndef __PROCESS_SOCKET_HPP__
14 #define __PROCESS_SOCKET_HPP__
15 
16 #ifndef __WINDOWS__
17 #include <sys/socket.h>
18 #include <sys/wait.h>
19 #endif // __WINDOWS__
20 
21 #include <memory>
22 
23 #include <process/address.hpp>
24 #include <process/future.hpp>
25 
26 #include <stout/abort.hpp>
27 #include <stout/error.hpp>
28 #include <stout/nothing.hpp>
29 #include <stout/try.hpp>
30 #include <stout/unreachable.hpp>
31 #ifdef __WINDOWS__
32 #include <stout/windows.hpp>
33 #endif // __WINDOWS__
34 
35 #include <stout/os/int_fd.hpp>
36 
37 
38 namespace process {
39 namespace network {
40 namespace internal {
41 
// Implementation interface for a Socket (wording from this page's
// cross-reference index). An instance holds a socket descriptor `s`;
// the destructor closes it unless `release()` has reset it to -1.
// Deriving from `std::enable_shared_from_this` lets `shared()` (and
// `Socket::accept()`) obtain a `std::shared_ptr` to the live object.
//
// NOTE(review): this is a rendered listing. The original doc comments
// and several declaration lines are elided (the embedded line numbers
// skip). Signatures quoted in the notes below are taken from the
// cross-reference index at the end of this page.
56 class SocketImpl : public std::enable_shared_from_this<SocketImpl>
57 {
58 public:
  // Available kinds of implementations. `SSL` is only an enumerator
  // when the build defines USE_SSL_SOCKET.
65  enum class Kind
66  {
67  POLL,
68 #ifdef USE_SSL_SOCKET
69  SSL
70 #endif
71  };
72 
  // Returns the default Kind of implementation. Declared here only.
76  static Kind DEFAULT_KIND();
77 
  // NOTE(review): listing line 87 (the start of this declaration) is
  // elided. Per the index this is
  //   static Try<std::shared_ptr<SocketImpl>> create(
  //       int_fd s, Kind kind = DEFAULT_KIND());
  // which returns an instance of a SocketImpl using the specified
  // kind of implementation.
88  int_fd s,
89  Kind kind = DEFAULT_KIND());
90 
100  // TODO(josephw): MESOS-5729: Consider making the CLOEXEC option
101  // configurable by the caller of the interface.
  // NOTE(review): listing line 102 is elided; presumably this is a
  // second `create` overload selecting the address family instead of
  // wrapping an existing descriptor -- confirm against the header.
103  Address::Family family,
104  Kind kind = DEFAULT_KIND());
105 
  // Closes the descriptor through `os::close` when `s` is
  // non-negative. CHECK_SOME is applied to the result, with the
  // message "Failed to close socket" attached.
106  virtual ~SocketImpl()
107  {
108  // Don't close if the socket was released.
109  if (s >= 0) {
110  CHECK_SOME(os::close(s)) << "Failed to close socket";
111  }
112  }
113 
  // Returns the descriptor `s`. Unlike `release()`, `s` is left
  // unchanged.
117  int_fd get() const
118  {
119  return s;
120  }
121 
  // Returns the Address with the assigned ip and assigned port.
125  Try<Address> address() const;
126 
  // Returns the peer's Address for the accepted or connected socket.
130  Try<Address> peer() const;
131 
  // Assigns the specified address to the socket.
138  Try<Address> bind(const Address& address);
139 
  // Pure virtual: each concrete implementation supplies `listen`.
140  virtual Try<Nothing> listen(int backlog) = 0;
141 
  // NOTE(review): the index lists
  //   virtual Future<std::shared_ptr<SocketImpl>> accept() = 0;
  // (used by `Socket::accept()`), but its declaration is not in the
  // visible listing; presumably it sits in the elided lines 142-150.
151 
  // Future-returning primitives every concrete implementation must
  // provide. The raw-buffer `recv`/`send` return a `size_t`.
  // NOTE(review): presumably the number of bytes transferred --
  // confirm against the implementations.
152  virtual Future<Nothing> connect(const Address& address) = 0;
153  virtual Future<size_t> recv(char* data, size_t size) = 0;
154  virtual Future<size_t> send(const char* data, size_t size) = 0;
155  virtual Future<size_t> sendfile(int_fd fd, off_t offset, size_t size) = 0;
156 
  // Overload of `recv` that yields the received data as a
  // `std::string`; `size` defaults to `None()`. Virtual but not pure,
  // so a definition exists outside this listing.
  // NOTE(review): the meaning of `None()` or a negative size is not
  // visible here -- confirm against the definition.
171  // TODO(benh): Consider returning Owned<std::string> or
172  // Shared<std::string>, the latter enabling reuse of a pool of
173  // preallocated strings/buffers.
174  virtual Future<std::string> recv(const Option<ssize_t>& size = None());
175 
  // Overload of `send` taking the payload as a `std::string`.
  // Declared here only; defined outside this listing.
183  // TODO(benh): Consider taking Shared<std::string>, the latter
184  // enabling reuse of a pool of preallocated strings/buffers.
185  virtual Future<Nothing> send(const std::string& data);
186 
  // NOTE(review): listing line 191 (the signature) is elided. Per the
  // index this is
  //   virtual Try<Nothing, SocketError> shutdown(int how)
  // ("Shuts down the socket."). The body calls `::shutdown(s, how)`
  // and returns `SocketError()` on a negative result, `Nothing()`
  // otherwise. The index shows `SocketError` as `ErrnoError`.
192  {
193  if (::shutdown(s, how) < 0) {
194  return SocketError();
195  }
196 
197  return Nothing();
198  }
199 
  // Reports which Kind the concrete implementation is.
200  virtual Kind kind() const = 0;
201 
202 protected:
  // Stores the descriptor and CHECKs that it is non-negative.
203  explicit SocketImpl(int_fd _s) : s(_s) { CHECK(s >= 0); }
204 
  // NOTE(review): listing line 211 (the signature) is elided. Per the
  // index this is `int_fd release()` -- "Releases ownership of the
  // file descriptor." It returns the current descriptor and sets `s`
  // to -1, so the destructor's `s >= 0` guard skips the close.
212  {
213  int_fd released = s;
214  s = -1;
215  return released;
216  }
217 
  // Returns a std::shared_ptr<T> from this implementation: takes
  // `t->shared_from_this()` and `dynamic_pointer_cast`s it to `T`.
  // `t` goes through CHECK_NOTNULL and the cast result through CHECK.
221  template <typename T>
222  static std::shared_ptr<T> shared(T* t)
223  {
224  std::shared_ptr<T> pointer =
225  std::dynamic_pointer_cast<T>(CHECK_NOTNULL(t)->shared_from_this());
226  CHECK(pointer);
227  return pointer;
228  }
229 
  // NOTE(review): listing line 230 is elided; per the index it
  // declares the descriptor member `int_fd s`.
231 };
232 
233 
// An abstraction around a socket (file descriptor) -- wording from
// this page's cross-reference index. A `Socket` holds a
// `std::shared_ptr<SocketImpl>` (`impl`) and its members forward to
// it. Address-typed results are passed through `convert<AddressType>`.
//
// NOTE(review): this is a rendered listing. The original doc comments
// and several signature lines are elided (the embedded line numbers
// skip). Signatures quoted in the notes below are taken from the
// cross-reference index at the end of this page.
241 template <typename AddressType>
242 class Socket
243 {
244 public:
  // Compile-time requirement: `AddressType` must be convertible to
  // `network::Address`.
245  static_assert(
246  std::is_convertible<AddressType, network::Address>::value,
247  "Requires type convertible to `network::Address`");
248 
  // NOTE(review): listing lines 258, 260 and 262 are elided. Per the
  // index this is
  //   static Try<Socket> create(
  //       int_fd s,
  //       SocketImpl::Kind kind = SocketImpl::DEFAULT_KIND())
  // which returns an instance of a Socket using the specified kind of
  // implementation. The visible body turns an error in `impl` into an
  // `Error` and otherwise wraps `impl.get()` in a Socket. `impl` is
  // presumably the result of `SocketImpl::create(s, kind)` -- confirm.
259  int_fd s,
261  {
263  if (impl.isError()) {
264  return Error(impl.error());
265  }
266  return Socket(impl.get());
267  }
268 
279  // TODO(josephw): MESOS-5729: Consider making the CLOEXEC option
280  // configurable by the caller of the interface.
  // NOTE(review): listing line 281 (the declaration this TODO refers
  // to) is elided; presumably a `create` overload that takes only a
  // kind -- confirm against the header.
282 
  // `create` overload selecting the address family.
  // NOTE(review): listing line 299 (the remaining parameter(s)) is
  // elided.
297  static Try<Socket> create(
298  Address::Family family,
300 
  // NOTE(review): listing line 307 (the signature) is elided. Per the
  // index this is `SocketImpl::Kind kind() const`, which returns the
  // kind representing the underlying implementation of the Socket
  // instance.
308  {
309  return impl->kind();
310  }
311 
  // Equality compares the two `impl` shared pointers.
312  bool operator==(const Socket& that) const
313  {
314  return impl == that.impl;
315  }
316 
  // Implicit conversion to the descriptor returned by `impl->get()`.
317  operator int_fd() const
318  {
319  return impl->get();
320  }
321 
  // NOTE(review): listing line 322 (the signature) is elided. Per the
  // index this is `Try<AddressType> address() const`. `convert` is
  // not defined in this file.
323  {
324  return convert<AddressType>(impl->address());
325  }
326 
  // NOTE(review): listing line 327 (the signature) is elided. Per the
  // index this is `Try<AddressType> peer() const`.
328  {
329  return convert<AddressType>(impl->peer());
330  }
331 
  // Returns the descriptor held by the implementation.
332  int_fd get() const
333  {
334  return impl->get();
335  }
336 
  // Forwards to `SocketImpl::bind` and converts the resulting address
  // to `AddressType`.
337  Try<AddressType> bind(const AddressType& address)
338  {
339  return convert<AddressType>(impl->bind(address));
340  }
341 
  // Forwards to `SocketImpl::listen`.
342  Try<Nothing> listen(int backlog)
343  {
344  return impl->listen(backlog);
345  }
346 
  // NOTE(review): listing line 347 (the signature) is elided. Per the
  // index this is `Future<Socket> accept()`. The continuation wraps
  // the accepted `SocketImpl` in a `Socket`. `self` is captured by
  // the lambda for the reason given in the NOTE below.
348  {
349  // NOTE: We save a reference to the listening socket itself
350  // (i.e., 'this') so that we don't close the listening socket
351  // while 'accept' is in flight.
352  std::shared_ptr<SocketImpl> self = impl->shared_from_this();
353 
354  return impl->accept()
355  // TODO(benh): Use && for `accepted` here!
356  .then([self](const std::shared_ptr<SocketImpl>& accepted) {
357  return Socket(accepted);
358  });
359  }
360 
  // The members below forward straight to the same-named `SocketImpl`
  // operation and return its Future unchanged.
361  Future<Nothing> connect(const AddressType& address)
362  {
363  return impl->connect(address);
364  }
365 
366  Future<size_t> recv(char* data, size_t size) const
367  {
368  return impl->recv(data, size);
369  }
370 
371  Future<size_t> send(const char* data, size_t size) const
372  {
373  return impl->send(data, size);
374  }
375 
376  Future<size_t> sendfile(int_fd fd, off_t offset, size_t size) const
377  {
378  return impl->sendfile(fd, offset, size);
379  }
380 
  // NOTE(review): listing line 381 (the signature) is elided. Per the
  // index this is
  //   Future<std::string> recv(const Option<ssize_t>& size = None())
382  {
383  return impl->recv(size);
384  }
385 
386  Future<Nothing> send(const std::string& data)
387  {
388  return impl->send(data);
389  }
390 
  // Which direction(s) `shutdown` applies to. Mapped below to SHUT_RD,
  // SHUT_WR and SHUT_RDWR respectively.
391  enum class Shutdown
392  {
393  READ,
394  WRITE,
395  READ_WRITE
396  };
397 
398  // TODO(benh): Replace the default to Shutdown::READ_WRITE or remove
399  // all together since it's unclear what the default should be.
  // NOTE(review): listing line 400 (the signature) is elided. Per the
  // index this is
  //   Try<Nothing, SocketError> shutdown(
  //       Shutdown shutdown = Shutdown::READ)
  // The immediately-invoked lambda translates the enum into the `how`
  // value handed to `SocketImpl::shutdown`. UNREACHABLE() follows the
  // switch, whose cases each return.
401  {
402  int how = [&]() {
403  switch (shutdown) {
404  case Shutdown::READ: return SHUT_RD;
405  case Shutdown::WRITE: return SHUT_WR;
406  case Shutdown::READ_WRITE: return SHUT_RDWR;
407  }
408  UNREACHABLE();
409  }();
410 
411  return impl->shutdown(how);
412  }
413 
414  // Support implicit conversion of any `Socket<AddressType>` to a
415  // `Socket<network::Address>`.
  // The converted Socket is built from the same `impl` pointer.
416  operator Socket<network::Address>() const
417  {
418  return Socket<network::Address>(impl);
419  }
420 
421 private:
422  // Necessary to support the implicit conversion operator from any
423  // `Socket<AddressType>` to `Socket<network::Address>`.
424  template <typename T>
425  friend class Socket;
426 
  // Private constructors: wrap an existing implementation pointer,
  // either by moving it in or by copying it.
427  explicit Socket(std::shared_ptr<SocketImpl>&& that) : impl(std::move(that)) {}
428 
429  explicit Socket(const std::shared_ptr<SocketImpl>& that) : impl(that) {}
430 
  // The implementation every member above forwards to.
431  std::shared_ptr<SocketImpl> impl;
432 };
433 
434 } // namespace internal {
435 
436 
438 
439 namespace inet {
441 } // namespace inet {
442 
443 #ifndef __WINDOWS__
444 namespace unix {
446 } // namespace unix {
447 #endif // __WINDOWS__
448 
449 
450 namespace internal {
451 
452 template <>
454  SocketImpl::Kind kind) = delete;
455 
456 
457 template <>
459  Address::Family family,
460  SocketImpl::Kind kind)
461 {
463  if (impl.isError()) {
464  return Error(impl.error());
465  }
466  return Socket(impl.get());
467 }
468 
469 
470 template <>
472  SocketImpl::Kind kind)
473 {
474  // TODO(benh): Replace this function which defaults to IPv4 in
475  // exchange for explicit IPv4 and IPv6 versions.
478  if (impl.isError()) {
479  return Error(impl.error());
480  }
481  return Socket(impl.get());
482 }
483 
484 
485 template <>
487  Address::Family family,
488  SocketImpl::Kind kind) = delete;
489 
490 
491 #ifndef __WINDOWS__
492 template <>
494  SocketImpl::Kind kind)
495 {
498  if (impl.isError()) {
499  return Error(impl.error());
500  }
501  return Socket(impl.get());
502 }
503 
504 
505 template <>
507  Address::Family family,
508  SocketImpl::Kind kind) = delete;
509 #endif // __WINDOWS__
510 
511 } // namespace internal {
512 } // namespace network {
513 } // namespace process {
514 
515 #endif // __PROCESS_SOCKET_HPP__
network::internal::Socket< network::Address > Socket
Definition: socket.hpp:437
const short READ
A possible event while polling.
Definition: io.hpp:35
Future< size_t > sendfile(int_fd fd, off_t offset, size_t size) const
Definition: socket.hpp:376
Future< std::string > recv(const Option< ssize_t > &size=None())
Definition: socket.hpp:381
Definition: nothing.hpp:16
virtual Future< std::shared_ptr< SocketImpl > > accept()=0
Returns an implementation corresponding to the next pending connection for the listening socket...
Definition: errorbase.hpp:36
Definition: option.hpp:28
Try< Bytes > size(const std::string &path, const FollowSymlink follow=FollowSymlink::FOLLOW_SYMLINK)
Definition: stat.hpp:121
int_fd release()
Releases ownership of the file descriptor.
Definition: socket.hpp:211
T & get()&
Definition: try.hpp:73
virtual Future< size_t > sendfile(int_fd fd, off_t offset, size_t size)=0
Try< Address > peer() const
Returns the peer's Address for the accepted or connected socket.
Future< size_t > recv(char *data, size_t size) const
Definition: socket.hpp:366
Definition: check.hpp:33
int_fd s
Definition: socket.hpp:230
static std::shared_ptr< T > shared(T *t)
Returns a std::shared_ptr<T> from this implementation.
Definition: socket.hpp:222
Kind
Available kinds of implementations.
Definition: socket.hpp:65
static Try< Socket > create(int_fd s, SocketImpl::Kind kind=SocketImpl::DEFAULT_KIND())
Returns an instance of a Socket using the specified kind of implementation.
Definition: socket.hpp:258
virtual Future< size_t > send(const char *data, size_t size)=0
Definition: type_utils.hpp:510
Definition: address.hpp:276
static Kind DEFAULT_KIND()
Returns the default Kind of implementation.
constexpr int SHUT_RDWR
Definition: windows.hpp:193
virtual Try< Nothing, SocketError > shutdown(int how)
Shuts down the socket.
Definition: socket.hpp:191
Try< Nothing > listen(int backlog)
Definition: socket.hpp:342
Try< Address > bind(const Address &address)
Assigns the specified address to the socket.
virtual Future< Nothing > connect(const Address &address)=0
Future< size_t > send(const char *data, size_t size) const
Definition: socket.hpp:371
ErrnoError SocketError
Definition: error.hpp:33
Try< Nothing, SocketError > shutdown(Shutdown shutdown=Shutdown::READ)
Definition: socket.hpp:400
virtual Try< Nothing > listen(int backlog)=0
Try< Address > address() const
Returns the Address with the assigned ip and assigned port.
#define CHECK_SOME(expression)
Definition: check.hpp:50
SocketImpl(int_fd _s)
Definition: socket.hpp:203
Try< Nothing > close(int fd)
Definition: close.hpp:24
const short WRITE
A possible event while polling.
Definition: io.hpp:40
Implementation interface for a Socket.
Definition: socket.hpp:56
static Try error(const E &e)
Definition: try.hpp:42
#define UNREACHABLE()
Definition: unreachable.hpp:22
SocketImpl::Kind kind() const
Returns the kind representing the underlying implementation of the Socket instance.
Definition: socket.hpp:307
Try< AddressType > bind(const AddressType &address)
Definition: socket.hpp:337
Definition: none.hpp:27
Definition: attributes.hpp:24
bool isError() const
Definition: try.hpp:71
bool operator==(const Socket &that) const
Definition: socket.hpp:312
Definition: executor.hpp:48
Future< Nothing > send(const std::string &data)
Definition: socket.hpp:386
constexpr int SHUT_RD
Definition: windows.hpp:191
Future< Socket > accept()
Definition: socket.hpp:347
int int_fd
Definition: int_fd.hpp:35
virtual ~SocketImpl()
Definition: socket.hpp:106
An abstraction around a socket (file descriptor).
Definition: socket.hpp:242
Family
Definition: address.hpp:285
Try< AddressType > address() const
Definition: socket.hpp:322
constexpr int SHUT_WR
Definition: windows.hpp:192
Try< AddressType > peer() const
Definition: socket.hpp:327
static Try< std::shared_ptr< SocketImpl > > create(int_fd s, Kind kind=DEFAULT_KIND())
Returns an instance of a SocketImpl using the specified kind of implementation.
Definition: future.hpp:58
Future< Nothing > connect(const AddressType &address)
Definition: socket.hpp:361
virtual Future< size_t > recv(char *data, size_t size)=0