Apache Mesos
http_connection.hpp
Go to the documentation of this file.
1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements. See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership. The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License. You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 
17 #ifndef __RESOURCE_PROVIDER_HTTP_CONNECTION_HPP__
18 #define __RESOURCE_PROVIDER_HTTP_CONNECTION_HPP__
19 
20 #include <glog/logging.h>
21 
22 #include <functional>
23 #include <ostream>
24 #include <string>
25 #include <tuple>
26 #include <queue>
27 #include <utility>
28 
29 #include <mesos/http.hpp>
30 
31 #include <mesos/v1/mesos.hpp>
32 
33 #include <process/async.hpp>
34 #include <process/collect.hpp>
35 #include <process/defer.hpp>
36 #include <process/dispatch.hpp>
37 #include <process/future.hpp>
38 #include <process/http.hpp>
39 #include <process/mutex.hpp>
40 #include <process/owned.hpp>
41 #include <process/process.hpp>
42 
43 #include <stout/duration.hpp>
44 #include <stout/nothing.hpp>
45 #include <stout/recordio.hpp>
46 #include <stout/result.hpp>
47 #include <stout/unreachable.hpp>
48 #include <stout/uuid.hpp>
49 
50 #include "common/http.hpp"
51 #include "common/recordio.hpp"
52 
54 
55 namespace mesos {
56 namespace internal {
57 
64 template <typename Call, typename Event>
65 class HttpConnectionProcess
66  : public process::Process<HttpConnectionProcess<Call, Event>>
67 {
68 public:
85  const std::string& prefix,
87  ContentType _contentType,
88  const std::function<Option<Error>(const Call&)>& validate,
89  const std::function<void(void)>& connected,
90  const std::function<void(void)>& disconnected,
91  const std::function<void(const std::queue<Event>&)>& received)
92  : process::ProcessBase(process::ID::generate(prefix)),
93  state(State::DISCONNECTED),
94  contentType(_contentType),
95  callbacks {validate, connected, disconnected, received},
96  detector(std::move(_detector)) {}
97 
99  {
100  Option<Error> error = callbacks.validate(call);
101 
102  if (error.isSome()) {
103  return process::Failure(error->message);
104  }
105 
106  if (endpoint.isNone()) {
107  return process::Failure("Not connected to an endpoint");
108  }
109 
110  if (call.type() == Call::SUBSCRIBE && state != State::CONNECTED) {
111  // It might be possible that the client is retrying. We drop the
112  // request if we have an ongoing subscribe request in flight or
113  // if the client is already subscribed.
114  return process::Failure(
115  "Cannot process 'SUBSCRIBE' call as the driver is in "
116  "state " + stringify(state));
117  }
118 
119  if (call.type() != Call::SUBSCRIBE && state != State::SUBSCRIBED) {
120  // We drop all non-subscribe calls if we are not currently subscribed.
121  return process::Failure(
122  "Cannot process '" + stringify(call.type()) + "' call "
123  "as the driver is in state " + stringify(state));
124  }
125 
126  CHECK(state == State::CONNECTED || state == State::SUBSCRIBED);
127  CHECK_SOME(connections);
128 
129  VLOG(1) << "Sending " << call.type() << " call to " << endpoint.get();
130 
132  request.method = "POST";
133  request.url = endpoint.get();
134  request.body = serialize(contentType, call);
135  request.keepAlive = true;
136  request.headers = {{"Accept", stringify(contentType)},
137  {"Content-Type", stringify(contentType)}};
138 
140  if (call.type() == Call::SUBSCRIBE) {
141  CHECK_EQ(State::CONNECTED, state);
142  state = State::SUBSCRIBING;
143 
144  // Send a streaming request for Subscribe call.
145  response = connections->subscribe.send(request, true);
146  } else {
147  if (streamId.isSome()) {
148  // Set the stream ID associated with this connection.
149  request.headers["Mesos-Stream-Id"] = streamId->toString();
150  }
151 
152  response = connections->nonSubscribe.send(request);
153  }
154 
155  CHECK_SOME(connectionId);
156 
157  return response.then(
158  defer(self(),
159  &Self::_send,
160  connectionId.get(),
161  call,
162  lambda::_1));
163  }
164 
165  void start()
166  {
167  detection = detector->detect(None())
168  .onAny(defer(self(), &Self::detected, lambda::_1));
169  }
170 
171 protected:
172  // Because we're deriving from a templated base class, we have
173  // to explicitly bring these hidden base class names into scope.
176 
177  void finalize() override
178  {
179  disconnect();
180  }
181 
183  {
184  if (future.isFailed()) {
185  LOG(WARNING) << "Failed to detect an endpoint: " << future.failure();
186 
187  // TODO(nfnt): A non-retryable error might be the reason for the
188  // failed future. In that case the client should be informed
189  // about this error and the URL dectection aborted.
190  }
191 
192  // Invoke the disconnected callback if we were previously connected.
193  switch (state) {
194  case State::CONNECTING:
195  case State::DISCONNECTED:
196  break;
197  case State::CONNECTED:
198  case State::SUBSCRIBING:
199  case State::SUBSCRIBED: {
200  mutex.lock()
201  .then(defer(self(), [this]() {
202  return process::async(callbacks.disconnected);
203  }))
204  .onAny(lambda::bind(&process::Mutex::unlock, mutex));
205  }
206  }
207 
208  disconnect();
209 
210  if (future.isDiscarded()) {
211  LOG(INFO) << "Re-detecting endpoint";
212 
213  endpoint = None();
214  } else if (future->isNone()) {
215  LOG(INFO) << "Lost endpoint";
216 
217  endpoint = None();
218  } else {
219  endpoint = future->get();
220 
221  LOG(INFO) << "New endpoint detected at " << endpoint.get();
222 
223  connectionId = id::UUID::random();
224 
225  dispatch(self(), &Self::connect, connectionId.get());
226  }
227 
228  detection = detector->detect(endpoint)
229  .onAny(defer(self(), &Self::detected, lambda::_1));
230  }
231 
232  void connect(const id::UUID& _connectionId)
233  {
234  // It is possible that a new endpoint was detected while we were
235  // waiting to establish a connection with the old master.
236  if (connectionId != _connectionId) {
237  VLOG(1) << "Ignoring connection attempt from stale connection";
238  return;
239  }
240 
241  CHECK_SOME(endpoint);
242  CHECK_EQ(State::DISCONNECTED, state);
243 
244  state = State::CONNECTING;
245 
246  // We create two persistent connections here, one for subscribe
247  // call/streaming response and another for non-subscribe
248  // calls/responses.
249  collect(
250  process::http::connect(endpoint.get()),
251  process::http::connect(endpoint.get()))
252  .onAny(defer(self(), &Self::connected, connectionId.get(), lambda::_1));
253  }
254 
255  void connected(
256  const id::UUID& _connectionId,
257  const process::Future<std::tuple<
259  {
260  // It is possible that a new endpoint was detected while we had an
261  // ongoing (re-)connection attempt with the old endpoint.
262  if (connectionId != _connectionId) {
263  VLOG(1) << "Ignoring connection attempt from stale connection";
264  return;
265  }
266 
267  CHECK_EQ(State::CONNECTING, state);
268 
269  if (!_connections.isReady()) {
270  disconnected(connectionId.get(),
271  _connections.isFailed()
272  ? _connections.failure()
273  : "Connection future discarded");
274  return;
275  }
276 
277  VLOG(1) << "Connected with the remote endpoint at " << endpoint.get();
278 
279  state = State::CONNECTED;
280 
281  connections = Connections {
282  std::get<0>(_connections.get()),
283  std::get<1>(_connections.get())};
284 
285  connections->subscribe.disconnected()
286  .onAny(defer(
287  self(),
289  connectionId.get(),
290  "Subscribe connection interrupted"));
291 
292  connections->nonSubscribe.disconnected()
293  .onAny(defer(
294  self(),
296  connectionId.get(),
297  "Non-subscribe connection interrupted"));
298 
299  // Invoke the connected callback once we have established both
300  // subscribe and non-subscribe connections with the master.
301  mutex.lock()
302  .then(defer(self(), [this]() {
303  return process::async(callbacks.connected);
304  }))
305  .onAny(lambda::bind(&process::Mutex::unlock, mutex));
306  }
307 
308  void disconnect()
309  {
310  if (connections.isSome()) {
311  connections->subscribe.disconnect();
312  connections->nonSubscribe.disconnect();
313  }
314 
315  if (subscribed.isSome()) {
316  subscribed->reader.close();
317  }
318 
319  state = State::DISCONNECTED;
320 
321  connections = None();
322  subscribed = None();
323  endpoint = None();
324  connectionId = None();
325  detection.discard();
326  }
327 
328  void disconnected(const id::UUID& _connectionId, const std::string& failure)
329  {
330  // Ignore if the disconnection happened from an old stale connection.
331  if (connectionId != _connectionId) {
332  VLOG(1) << "Ignoring disconnection attempt from stale connection";
333  return;
334  }
335 
336  // We can reach here if we noticed a disconnection for either of
337  // subscribe/non-subscribe connections. We discard the future here
338  // to trigger an endpoint re-detection.
339  detection.discard();
340  }
341 
343  const id::UUID& _connectionId,
344  const Call& call,
345  const process::http::Response& response)
346  {
347  // It is possible that we detected a new endpoint before a
348  // response could be received.
349  if (connectionId != _connectionId) {
350  return process::Failure("Ignoring response from stale connection");
351  }
352 
353  CHECK(state == State::SUBSCRIBING || state == State::SUBSCRIBED) << state;
354 
355  if (response.code == process::http::Status::OK) {
356  // Only SUBSCRIBE call should get a "200 OK" response.
357  CHECK_EQ(Call::SUBSCRIBE, call.type());
358  CHECK_EQ(process::http::Response::PIPE, response.type);
359  CHECK_SOME(response.reader);
360 
361  state = State::SUBSCRIBED;
362 
363  process::http::Pipe::Reader reader = response.reader.get();
364 
365  auto deserializer =
366  lambda::bind(deserialize<Event>, contentType, lambda::_1);
367 
370  ::recordio::Decoder<Event>(deserializer),
371  reader));
372 
373  subscribed = SubscribedResponse(reader, std::move(decoder));
374 
375  if (response.headers.contains("Mesos-Stream-Id")) {
376  Try<id::UUID> uuid =
377  id::UUID::fromString(response.headers.at("Mesos-Stream-Id"));
378 
379  CHECK_SOME(uuid);
380 
381  streamId = uuid.get();
382  }
383 
384  read();
385 
386  return Nothing();
387  }
388 
389  if (response.code == process::http::Status::ACCEPTED) {
390  // Only non SUBSCRIBE calls should get a "202 Accepted" response.
391  CHECK_NE(Call::SUBSCRIBE, call.type());
392  return Nothing();
393  }
394 
395  // We reset the state to connected if the subscribe call did not
396  // succceed. We can then retry the subscribe call.
397  if (call.type() == Call::SUBSCRIBE) {
398  state = State::CONNECTED;
399  }
400 
403  return process::Failure(
404  "Received '" + response.status + "' (" + response.body + ")");
405  }
406 
407  return process::Failure(
408  "Received unexpected '" + response.status +
409  "' (" + response.body + ")");
410  }
411 
412  void read()
413  {
414  subscribed->decoder->read()
415  .onAny(defer(self(),
416  &Self::_read,
417  subscribed->reader,
418  lambda::_1));
419  }
420 
421  void _read(
422  const process::http::Pipe::Reader& reader,
423  const process::Future<Result<Event>>& event)
424  {
425  CHECK(!event.isDiscarded());
426 
427  // Ignore enqueued events from the previous Subscribe call reader.
428  if (!subscribed.isSome() || subscribed->reader != reader) {
429  VLOG(1) << "Ignoring event from old stale connection";
430  return;
431  }
432 
433  CHECK_EQ(State::SUBSCRIBED, state);
434  CHECK_SOME(connectionId);
435 
436  if (event.isFailed()) {
437  LOG(ERROR) << "Failed to decode stream of events: "
438  << event.failure();
439 
440  disconnected(connectionId.get(), event.failure());
441  return;
442  }
443 
444  if (event->isNone()) {
445  const std::string error = "End-Of-File received";
446  LOG(ERROR) << error;
447 
448  disconnected(connectionId.get(), error);
449  return;
450  }
451 
452  if (event->isError()) {
453  LOG(ERROR) << "Failed to de-serialize event: " << event->error();
454  } else {
455  receive(event->get());
456  }
457 
458  read();
459  }
460 
461  void receive(const Event& event)
462  {
463  // Check if we're are no longer subscribed but received an event.
464  if (state != State::SUBSCRIBED) {
465  LOG(WARNING) << "Ignoring " << stringify(event.type())
466  << " event because we're no longer subscribed";
467  return;
468  }
469 
470  // Queue up the event and invoke the 'received' callback if this
471  // is the first event (between now and when the 'received'
472  // callback actually gets invoked more events might get queued).
473  events.push(event);
474 
475  if (events.size() == 1) {
476  mutex.lock()
477  .then(defer(self(), [this]() {
478  process::Future<Nothing> future =
479  process::async(callbacks.received, events);
480  events = std::queue<Event>();
481  return future;
482  }))
483  .onAny(lambda::bind(&process::Mutex::unlock, mutex));
484  }
485  }
486 
487 private:
488  struct Callbacks
489  {
490  std::function<Option<Error>(const Call&)> validate;
491  std::function<void(void)> connected;
492  std::function<void(void)> disconnected;
493  std::function<void(const std::queue<Event>&)> received;
494  };
495 
496  struct Connections
497  {
498  process::http::Connection subscribe;
499  process::http::Connection nonSubscribe;
500  };
501 
502  struct SubscribedResponse
503  {
504  SubscribedResponse(
507  : reader(std::move(_reader)),
508  decoder(std::move(_decoder)) {}
509 
510  // The decoder cannot be copied meaningfully, see MESOS-5122.
511  SubscribedResponse(const SubscribedResponse&) = delete;
512  SubscribedResponse& operator=(const SubscribedResponse&) = delete;
513  SubscribedResponse& operator=(SubscribedResponse&&) = default;
514  SubscribedResponse(SubscribedResponse&&) = default;
515 
518  };
519 
520  enum class State
521  {
522  DISCONNECTED, // Either of subscribe/non-subscribe connection is broken.
523  CONNECTING, // Trying to establish subscribe and non-subscribe connections.
524  CONNECTED, // Established subscribe and non-subscribe connections.
525  SUBSCRIBING, // Trying to subscribe with the remote endpoint.
526  SUBSCRIBED // Subscribed with the remote endpoint.
527  };
528 
529  friend std::ostream& operator<<(std::ostream& stream, State state)
530  {
531  switch (state) {
532  case State::DISCONNECTED: return stream << "DISCONNECTED";
533  case State::CONNECTING: return stream << "CONNECTING";
534  case State::CONNECTED: return stream << "CONNECTED";
535  case State::SUBSCRIBING: return stream << "SUBSCRIBING";
536  case State::SUBSCRIBED: return stream << "SUBSCRIBED";
537  }
538 
539  UNREACHABLE();
540  }
541 
542  State state;
543  Option<Connections> connections;
544  Option<SubscribedResponse> subscribed;
546  const mesos::ContentType contentType;
547  const Callbacks callbacks;
548  process::Mutex mutex; // Used to serialize the callback invocations.
550  std::queue<Event> events;
551 
552  // There can be multiple simulataneous ongoing (re-)connection
553  // attempts with the remote endpoint (e.g., the endpoint failed over
554  // while an attempt was in progress). This helps us in uniquely
555  // identifying the current connection instance and ignoring the
556  // stale instance.
557  Option<id::UUID> connectionId;
558  Option<id::UUID> streamId;
559 
561 };
562 
563 } // namespace internal {
564 } // namespace mesos {
565 
566 #endif // __RESOURCE_PROVIDER_HTTP_CONNECTION_HPP__
void connected(const id::UUID &_connectionId, const process::Future< std::tuple< process::http::Connection, process::http::Connection >> &_connections)
Definition: http_connection.hpp:255
std::string generate(const std::string &prefix="")
Returns 'prefix(N)' where N represents the number of instances where the same prefix (wrt...
void disconnected(const id::UUID &_connectionId, const std::string &failure)
Definition: http_connection.hpp:328
Definition: nothing.hpp:16
ContentType
Definition: http.hpp:43
Option< Error > validate(const std::string &imageDir)
void unlock()
Definition: mutex.hpp:50
Future< Response > request(const Request &request, bool streamedResponse=false)
Asynchronously sends an HTTP request to the process and returns the HTTP response once the entire res...
ProcessBase(const std::string &id="")
T & get()&
Definition: try.hpp:73
URL url
Definition: http.hpp:529
Definition: check.hpp:33
std::string status
Definition: http.hpp:621
Definition: future.hpp:665
Definition: http.hpp:651
uint16_t code
Definition: http.hpp:658
constexpr const char * prefix
Definition: os.hpp:94
void receive(const Event &event)
Definition: http_connection.hpp:461
mesos::v1::scheduler::Call Call
Definition: mesos.hpp:2555
HTTP connection handler.
Definition: resource_provider.hpp:41
bool discard()
Definition: future.hpp:1162
Definition: check.hpp:30
bool isSome() const
Definition: option.hpp:115
void connect(const id::UUID &_connectionId)
Definition: http_connection.hpp:232
Definition: http.hpp:518
mesos::v1::scheduler::Event Event
Definition: mesos.hpp:2556
Option< Pipe::Reader > reader
Definition: http.hpp:656
HttpConnectionProcess< Call, Event > Self
Definition: http_connection.hpp:175
Provides RecordIO decoding on top of an http::Pipe::Reader.
Definition: recordio.hpp:62
process::Future< Nothing > send(const Call &call)
Definition: http_connection.hpp:98
#define CHECK_SOME(expression)
Definition: check.hpp:50
static UUID random()
Definition: uuid.hpp:38
std::string body
Definition: http.hpp:654
static const uint16_t SERVICE_UNAVAILABLE
Definition: http.hpp:253
const Future< T > & onAny(AnyCallback &&callback) const
Definition: future.hpp:1447
void detected(const process::Future< Option< process::http::URL >> &future)
Definition: http_connection.hpp:182
Definition: uuid.hpp:35
Definition: spec.hpp:30
static const uint16_t ACCEPTED
Definition: http.hpp:220
HttpConnectionProcess(const std::string &prefix, process::Owned< EndpointDetector > _detector, ContentType _contentType, const std::function< Option< Error >(const Call &)> &validate, const std::function< void(void)> &connected, const std::function< void(void)> &disconnected, const std::function< void(const std::queue< Event > &)> &received)
Construct a HTTP connection process.
Definition: http_connection.hpp:84
const T & get() const &
Definition: option.hpp:118
Future< Connection > connect(const network::Address &address, Scheme scheme)
bool keepAlive
Definition: http.hpp:543
static const uint16_t OK
Definition: http.hpp:218
void start()
Definition: http_connection.hpp:165
void dispatch(const UPID &pid, std::unique_ptr< lambda::CallableOnce< void(ProcessBase *)>> f, const Option< const std::type_info * > &functionType=None())
Future< typename result_of< F()>::type > async(const F &f, typename std::enable_if<!std::is_void< typename result_of< F()>::type >::value >::type *=nullptr)
Definition: async.hpp:238
#define UNREACHABLE()
Definition: unreachable.hpp:22
void finalize() override
Invoked when a process is terminated.
Definition: http_connection.hpp:177
Future< X > then(lambda::CallableOnce< Future< X >(const T &)> f) const
Definition: future.hpp:1581
Future< Nothing > lock()
Definition: mutex.hpp:33
const std::string message
Definition: errorbase.hpp:45
Definition: none.hpp:27
Definition: attributes.hpp:24
std::string error(const std::string &msg, uint32_t code)
std::string method
Definition: http.hpp:523
Definition: executor.hpp:47
Definition: http.hpp:595
std::string body
Definition: http.hpp:563
PID< HttpConnectionProcess< Call, Event > > self() const
Returns the PID of the process.
Definition: process.hpp:510
bool isNone() const
Definition: option.hpp:116
process::Future< Nothing > _send(const id::UUID &_connectionId, const Call &call, const process::http::Response &response)
Definition: http_connection.hpp:342
std::string toString() const
Definition: uuid.hpp:87
Definition: mutex.hpp:28
Try< Nothing > bind(int_fd s, const Address &address)
Definition: network.hpp:46
void read()
Definition: http_connection.hpp:412
std::string serialize(ContentType contentType, const google::protobuf::Message &message)
static const uint16_t NOT_FOUND
Definition: http.hpp:236
std::string stringify(int flags)
Definition: owned.hpp:36
Headers headers
Definition: http.hpp:623
void disconnect()
Definition: http_connection.hpp:308
Definition: process.hpp:501
enum process::http::Response::@4 type
Deferred< void()> defer(const PID< T > &pid, void(T::*method)())
Definition: defer.hpp:35
bool contains(const Key &key) const
Definition: hashmap.hpp:86
static Try< UUID > fromString(const std::string &s)
Definition: uuid.hpp:67
Represents a connection to an HTTP server.
Definition: http.hpp:945
Given a decoding function for individual records, this provides decoding from "Record-IO" data into t...
Definition: recordio.hpp:82
Definition: http.hpp:302
void _read(const process::http::Pipe::Reader &reader, const process::Future< Result< Event >> &event)
Definition: http_connection.hpp:421
Headers headers
Definition: http.hpp:531
friend std::ostream & operator<<(std::ostream &stream, State state)
Definition: http_connection.hpp:529
Future< std::list< T > > collect(const std::list< Future< T >> &futures)
Definition: collect.hpp:270