Apache Mesos
http_connection.hpp
Go to the documentation of this file.
1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements. See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership. The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License. You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 
17 #ifndef __RESOURCE_PROVIDER_HTTP_CONNECTION_HPP__
18 #define __RESOURCE_PROVIDER_HTTP_CONNECTION_HPP__
19 
20 #include <glog/logging.h>
21 
22 #include <functional>
23 #include <ostream>
24 #include <string>
25 #include <tuple>
26 #include <queue>
27 #include <utility>
28 
29 #include <mesos/http.hpp>
30 
31 #include <mesos/v1/mesos.hpp>
32 
33 #include <process/async.hpp>
34 #include <process/collect.hpp>
35 #include <process/defer.hpp>
36 #include <process/dispatch.hpp>
37 #include <process/future.hpp>
38 #include <process/http.hpp>
39 #include <process/mutex.hpp>
40 #include <process/owned.hpp>
41 #include <process/process.hpp>
42 
43 #include <stout/duration.hpp>
44 #include <stout/nothing.hpp>
45 #include <stout/recordio.hpp>
46 #include <stout/result.hpp>
47 #include <stout/unreachable.hpp>
48 #include <stout/uuid.hpp>
49 
50 #include "common/http.hpp"
51 #include "common/recordio.hpp"
52 
54 
55 namespace mesos {
56 namespace internal {
57 
64 template <typename Call, typename Event>
65 class HttpConnectionProcess
66  : public process::Process<HttpConnectionProcess<Call, Event>>
67 {
68 public:
85  const std::string& prefix,
87  ContentType _contentType,
88  const Option<std::string>& _token,
89  const std::function<Option<Error>(const Call&)>& validate,
90  const std::function<void(void)>& connected,
91  const std::function<void(void)>& disconnected,
92  const std::function<void(const std::queue<Event>&)>& received)
93  : process::ProcessBase(process::ID::generate(prefix)),
94  state(State::DISCONNECTED),
95  contentType(_contentType),
96  token(_token),
97  callbacks {validate, connected, disconnected, received},
98  detector(std::move(_detector)) {}
99 
101  {
102  Option<Error> error = callbacks.validate(call);
103 
104  if (error.isSome()) {
105  return process::Failure(error->message);
106  }
107 
108  if (endpoint.isNone()) {
109  return process::Failure("Not connected to an endpoint");
110  }
111 
112  if (call.type() == Call::SUBSCRIBE && state != State::CONNECTED) {
113  // It might be possible that the client is retrying. We drop the
114  // request if we have an ongoing subscribe request in flight or
115  // if the client is already subscribed.
116  return process::Failure(
117  "Cannot process 'SUBSCRIBE' call as the driver is in "
118  "state " + stringify(state));
119  }
120 
121  if (call.type() != Call::SUBSCRIBE && state != State::SUBSCRIBED) {
122  // We drop all non-subscribe calls if we are not currently subscribed.
123  return process::Failure(
124  "Cannot process '" + stringify(call.type()) + "' call "
125  "as the driver is in state " + stringify(state));
126  }
127 
128  CHECK(state == State::CONNECTED || state == State::SUBSCRIBED);
129  CHECK_SOME(connections);
130 
131  VLOG(1) << "Sending " << call.type() << " call to " << endpoint.get();
132 
134  request.method = "POST";
135  request.url = endpoint.get();
136  request.body = serialize(contentType, call);
137  request.keepAlive = true;
138  request.headers = {{"Accept", stringify(contentType)},
139  {"Content-Type", stringify(contentType)}};
140 
141  if (token.isSome()) {
142  request.headers["Authorization"] = "Bearer " + token.get();
143  }
144 
146  if (call.type() == Call::SUBSCRIBE) {
147  CHECK_EQ(State::CONNECTED, state);
148  state = State::SUBSCRIBING;
149 
150  // Send a streaming request for Subscribe call.
151  response = connections->subscribe.send(request, true);
152  } else {
153  if (streamId.isSome()) {
154  // Set the stream ID associated with this connection.
155  request.headers["Mesos-Stream-Id"] = streamId->toString();
156  }
157 
158  response = connections->nonSubscribe.send(request);
159  }
160 
161  CHECK_SOME(connectionId);
162 
163  return response.then(
164  defer(self(),
165  &Self::_send,
166  connectionId.get(),
167  call,
168  lambda::_1));
169  }
170 
171  void start()
172  {
173  detection = detector->detect(None())
174  .onAny(defer(self(), &Self::detected, lambda::_1));
175  }
176 
177 protected:
178  // Because we're deriving from a templated base class, we have
179  // to explicitly bring these hidden base class names into scope.
182 
183  void finalize() override
184  {
185  disconnect();
186  }
187 
189  {
190  if (future.isFailed()) {
191  LOG(WARNING) << "Failed to detect an endpoint: " << future.failure();
192 
193  // TODO(nfnt): A non-retryable error might be the reason for the
194  // failed future. In that case the client should be informed
195  // about this error and the URL dectection aborted.
196  }
197 
198  // Invoke the disconnected callback if we were previously connected.
199  switch (state) {
200  case State::CONNECTING:
201  case State::DISCONNECTED:
202  break;
203  case State::CONNECTED:
204  case State::SUBSCRIBING:
205  case State::SUBSCRIBED: {
206  mutex.lock()
207  .then(defer(self(), [this]() {
208  return process::async(callbacks.disconnected);
209  }))
210  .onAny(lambda::bind(&process::Mutex::unlock, mutex));
211  }
212  }
213 
214  disconnect();
215 
216  if (future.isDiscarded()) {
217  LOG(INFO) << "Re-detecting endpoint";
218 
219  endpoint = None();
220  } else if (future->isNone()) {
221  LOG(INFO) << "Lost endpoint";
222 
223  endpoint = None();
224  } else {
225  endpoint = future->get();
226 
227  LOG(INFO) << "New endpoint detected at " << endpoint.get();
228 
229  connectionId = id::UUID::random();
230 
231  dispatch(self(), &Self::connect, connectionId.get());
232  }
233 
234  detection = detector->detect(endpoint)
235  .onAny(defer(self(), &Self::detected, lambda::_1));
236  }
237 
238  void connect(const id::UUID& _connectionId)
239  {
240  // It is possible that a new endpoint was detected while we were
241  // waiting to establish a connection with the old master.
242  if (connectionId != _connectionId) {
243  VLOG(1) << "Ignoring connection attempt from stale connection";
244  return;
245  }
246 
247  CHECK_SOME(endpoint);
248  CHECK_EQ(State::DISCONNECTED, state);
249 
250  state = State::CONNECTING;
251 
252  // We create two persistent connections here, one for subscribe
253  // call/streaming response and another for non-subscribe
254  // calls/responses.
255  collect(
256  process::http::connect(endpoint.get()),
257  process::http::connect(endpoint.get()))
258  .onAny(defer(self(), &Self::connected, connectionId.get(), lambda::_1));
259  }
260 
261  void connected(
262  const id::UUID& _connectionId,
263  const process::Future<std::tuple<
265  {
266  // It is possible that a new endpoint was detected while we had an
267  // ongoing (re-)connection attempt with the old endpoint.
268  if (connectionId != _connectionId) {
269  VLOG(1) << "Ignoring connection attempt from stale connection";
270  return;
271  }
272 
273  CHECK_EQ(State::CONNECTING, state);
274 
275  if (!_connections.isReady()) {
276  disconnected(connectionId.get(),
277  _connections.isFailed()
278  ? _connections.failure()
279  : "Connection future discarded");
280  return;
281  }
282 
283  VLOG(1) << "Connected with the remote endpoint at " << endpoint.get();
284 
285  state = State::CONNECTED;
286 
287  connections = Connections {
288  std::get<0>(_connections.get()),
289  std::get<1>(_connections.get())};
290 
291  connections->subscribe.disconnected()
292  .onAny(defer(
293  self(),
295  connectionId.get(),
296  "Subscribe connection interrupted"));
297 
298  connections->nonSubscribe.disconnected()
299  .onAny(defer(
300  self(),
302  connectionId.get(),
303  "Non-subscribe connection interrupted"));
304 
305  // Invoke the connected callback once we have established both
306  // subscribe and non-subscribe connections with the master.
307  mutex.lock()
308  .then(defer(self(), [this]() {
309  return process::async(callbacks.connected);
310  }))
311  .onAny(lambda::bind(&process::Mutex::unlock, mutex));
312  }
313 
314  void disconnect()
315  {
316  if (connections.isSome()) {
317  connections->subscribe.disconnect();
318  connections->nonSubscribe.disconnect();
319  }
320 
321  if (subscribed.isSome()) {
322  subscribed->reader.close();
323  }
324 
325  state = State::DISCONNECTED;
326 
327  connections = None();
328  subscribed = None();
329  endpoint = None();
330  connectionId = None();
331  detection.discard();
332  }
333 
334  void disconnected(const id::UUID& _connectionId, const std::string& failure)
335  {
336  // Ignore if the disconnection happened from an old stale connection.
337  if (connectionId != _connectionId) {
338  VLOG(1) << "Ignoring disconnection attempt from stale connection";
339  return;
340  }
341 
342  // We can reach here if we noticed a disconnection for either of
343  // subscribe/non-subscribe connections. We discard the future here
344  // to trigger an endpoint re-detection.
345  detection.discard();
346  }
347 
349  const id::UUID& _connectionId,
350  const Call& call,
351  const process::http::Response& response)
352  {
353  // It is possible that we detected a new endpoint before a
354  // response could be received.
355  if (connectionId != _connectionId) {
356  return process::Failure("Ignoring response from stale connection");
357  }
358 
359  CHECK(state == State::SUBSCRIBING || state == State::SUBSCRIBED) << state;
360 
361  if (response.code == process::http::Status::OK) {
362  // Only SUBSCRIBE call should get a "200 OK" response.
363  CHECK_EQ(Call::SUBSCRIBE, call.type());
364  CHECK_EQ(process::http::Response::PIPE, response.type);
365  CHECK_SOME(response.reader);
366 
367  state = State::SUBSCRIBED;
368 
369  process::http::Pipe::Reader reader = response.reader.get();
370 
371  auto deserializer =
372  lambda::bind(deserialize<Event>, contentType, lambda::_1);
373 
376  ::recordio::Decoder<Event>(deserializer),
377  reader));
378 
379  subscribed = SubscribedResponse(reader, std::move(decoder));
380 
381  if (response.headers.contains("Mesos-Stream-Id")) {
382  Try<id::UUID> uuid =
383  id::UUID::fromString(response.headers.at("Mesos-Stream-Id"));
384 
385  CHECK_SOME(uuid);
386 
387  streamId = uuid.get();
388  }
389 
390  read();
391 
392  return Nothing();
393  }
394 
395  if (response.code == process::http::Status::ACCEPTED) {
396  // Only non SUBSCRIBE calls should get a "202 Accepted" response.
397  CHECK_NE(Call::SUBSCRIBE, call.type());
398  return Nothing();
399  }
400 
401  // We reset the state to connected if the subscribe call did not
402  // succceed. We can then retry the subscribe call.
403  if (call.type() == Call::SUBSCRIBE) {
404  state = State::CONNECTED;
405  }
406 
409  return process::Failure(
410  "Received '" + response.status + "' (" + response.body + ")");
411  }
412 
413  return process::Failure(
414  "Received unexpected '" + response.status +
415  "' (" + response.body + ")");
416  }
417 
418  void read()
419  {
420  subscribed->decoder->read()
421  .onAny(defer(self(),
422  &Self::_read,
423  subscribed->reader,
424  lambda::_1));
425  }
426 
427  void _read(
428  const process::http::Pipe::Reader& reader,
429  const process::Future<Result<Event>>& event)
430  {
431  CHECK(!event.isDiscarded());
432 
433  // Ignore enqueued events from the previous Subscribe call reader.
434  if (!subscribed.isSome() || subscribed->reader != reader) {
435  VLOG(1) << "Ignoring event from old stale connection";
436  return;
437  }
438 
439  CHECK_EQ(State::SUBSCRIBED, state);
440  CHECK_SOME(connectionId);
441 
442  if (event.isFailed()) {
443  LOG(ERROR) << "Failed to decode stream of events: "
444  << event.failure();
445 
446  disconnected(connectionId.get(), event.failure());
447  return;
448  }
449 
450  if (event->isNone()) {
451  const std::string error = "End-Of-File received";
452  LOG(ERROR) << error;
453 
454  disconnected(connectionId.get(), error);
455  return;
456  }
457 
458  if (event->isError()) {
459  LOG(ERROR) << "Failed to de-serialize event: " << event->error();
460  } else {
461  receive(event->get());
462  }
463 
464  read();
465  }
466 
467  void receive(const Event& event)
468  {
469  // Check if we're are no longer subscribed but received an event.
470  if (state != State::SUBSCRIBED) {
471  LOG(WARNING) << "Ignoring " << stringify(event.type())
472  << " event because we're no longer subscribed";
473  return;
474  }
475 
476  // Queue up the event and invoke the 'received' callback if this
477  // is the first event (between now and when the 'received'
478  // callback actually gets invoked more events might get queued).
479  events.push(event);
480 
481  if (events.size() == 1) {
482  mutex.lock()
483  .then(defer(self(), [this]() {
484  process::Future<Nothing> future =
485  process::async(callbacks.received, events);
486  events = std::queue<Event>();
487  return future;
488  }))
489  .onAny(lambda::bind(&process::Mutex::unlock, mutex));
490  }
491  }
492 
493 private:
494  struct Callbacks
495  {
496  std::function<Option<Error>(const Call&)> validate;
497  std::function<void(void)> connected;
498  std::function<void(void)> disconnected;
499  std::function<void(const std::queue<Event>&)> received;
500  };
501 
502  struct Connections
503  {
504  process::http::Connection subscribe;
505  process::http::Connection nonSubscribe;
506  };
507 
508  struct SubscribedResponse
509  {
510  SubscribedResponse(
513  : reader(std::move(_reader)),
514  decoder(std::move(_decoder)) {}
515 
516  // The decoder cannot be copied meaningfully, see MESOS-5122.
517  SubscribedResponse(const SubscribedResponse&) = delete;
518  SubscribedResponse& operator=(const SubscribedResponse&) = delete;
519  SubscribedResponse& operator=(SubscribedResponse&&) = default;
520  SubscribedResponse(SubscribedResponse&&) = default;
521 
524  };
525 
526  enum class State
527  {
528  DISCONNECTED, // Either of subscribe/non-subscribe connection is broken.
529  CONNECTING, // Trying to establish subscribe and non-subscribe connections.
530  CONNECTED, // Established subscribe and non-subscribe connections.
531  SUBSCRIBING, // Trying to subscribe with the remote endpoint.
532  SUBSCRIBED // Subscribed with the remote endpoint.
533  };
534 
535  friend std::ostream& operator<<(std::ostream& stream, State state)
536  {
537  switch (state) {
538  case State::DISCONNECTED: return stream << "DISCONNECTED";
539  case State::CONNECTING: return stream << "CONNECTING";
540  case State::CONNECTED: return stream << "CONNECTED";
541  case State::SUBSCRIBING: return stream << "SUBSCRIBING";
542  case State::SUBSCRIBED: return stream << "SUBSCRIBED";
543  }
544 
545  UNREACHABLE();
546  }
547 
548  State state;
549  Option<Connections> connections;
550  Option<SubscribedResponse> subscribed;
552  const mesos::ContentType contentType;
553  Option<std::string> token;
554  const Callbacks callbacks;
555  process::Mutex mutex; // Used to serialize the callback invocations.
557  std::queue<Event> events;
558 
559  // There can be multiple simultaneous ongoing (re-)connection
560  // attempts with the remote endpoint (e.g., the endpoint failed over
561  // while an attempt was in progress). This helps us in uniquely
562  // identifying the current connection instance and ignoring the
563  // stale instance.
564  Option<id::UUID> connectionId;
565  Option<id::UUID> streamId;
566 
568 };
569 
570 } // namespace internal {
571 } // namespace mesos {
572 
573 #endif // __RESOURCE_PROVIDER_HTTP_CONNECTION_HPP__
void connected(const id::UUID &_connectionId, const process::Future< std::tuple< process::http::Connection, process::http::Connection >> &_connections)
Definition: http_connection.hpp:261
std::string generate(const std::string &prefix="")
Returns 'prefix(N)' where N represents the number of instances where the same prefix (wrt...
void disconnected(const id::UUID &_connectionId, const std::string &failure)
Definition: http_connection.hpp:334
Definition: nothing.hpp:16
ContentType
Definition: http.hpp:43
Option< Error > validate(const std::string &imageDir)
void unlock()
Definition: mutex.hpp:50
Future< Response > request(const Request &request, bool streamedResponse=false)
Asynchronously sends an HTTP request to the process and returns the HTTP response once the entire res...
ProcessBase(const std::string &id="")
T & get()&
Definition: try.hpp:73
URL url
Definition: http.hpp:531
Definition: check.hpp:33
std::string status
Definition: http.hpp:624
Definition: future.hpp:668
Definition: http.hpp:654
uint16_t code
Definition: http.hpp:661
constexpr const char * prefix
Definition: os.hpp:94
void receive(const Event &event)
Definition: http_connection.hpp:467
mesos::v1::scheduler::Call Call
Definition: mesos.hpp:2616
HTTP connection handler.
Definition: resource_provider.hpp:41
bool discard()
Definition: future.hpp:1157
Definition: check.hpp:30
Future< std::vector< T > > collect(const std::vector< Future< T >> &futures)
Definition: collect.hpp:274
bool isSome() const
Definition: option.hpp:115
void connect(const id::UUID &_connectionId)
Definition: http_connection.hpp:238
Definition: http.hpp:520
mesos::v1::scheduler::Event Event
Definition: mesos.hpp:2617
Option< Pipe::Reader > reader
Definition: http.hpp:659
HttpConnectionProcess< Call, Event > Self
Definition: http_connection.hpp:181
Provides RecordIO decoding on top of an http::Pipe::Reader.
Definition: recordio.hpp:62
process::Future< Nothing > send(const Call &call)
Definition: http_connection.hpp:100
#define CHECK_SOME(expression)
Definition: check.hpp:50
static UUID random()
Definition: uuid.hpp:38
std::string body
Definition: http.hpp:657
static const uint16_t SERVICE_UNAVAILABLE
Definition: http.hpp:253
HttpConnectionProcess(const std::string &prefix, process::Owned< EndpointDetector > _detector, ContentType _contentType, const Option< std::string > &_token, const std::function< Option< Error >(const Call &)> &validate, const std::function< void(void)> &connected, const std::function< void(void)> &disconnected, const std::function< void(const std::queue< Event > &)> &received)
Construct a HTTP connection process.
Definition: http_connection.hpp:84
const Future< T > & onAny(AnyCallback &&callback) const
Definition: future.hpp:1442
void detected(const process::Future< Option< process::http::URL >> &future)
Definition: http_connection.hpp:188
Definition: uuid.hpp:35
Definition: spec.hpp:26
static const uint16_t ACCEPTED
Definition: http.hpp:220
const T & get() const &
Definition: option.hpp:118
Future< Connection > connect(const network::Address &address, Scheme scheme)
bool keepAlive
Definition: http.hpp:545
static const uint16_t OK
Definition: http.hpp:218
void start()
Definition: http_connection.hpp:171
void dispatch(const UPID &pid, std::unique_ptr< lambda::CallableOnce< void(ProcessBase *)>> f, const Option< const std::type_info * > &functionType=None())
Future< typename result_of< F()>::type > async(const F &f, typename std::enable_if<!std::is_void< typename result_of< F()>::type >::value >::type *=nullptr)
Definition: async.hpp:238
#define UNREACHABLE()
Definition: unreachable.hpp:22
void finalize() override
Invoked when a process is terminated.
Definition: http_connection.hpp:183
Future< X > then(lambda::CallableOnce< Future< X >(const T &)> f) const
Definition: future.hpp:1576
Future< Nothing > lock()
Definition: mutex.hpp:33
const std::string message
Definition: errorbase.hpp:46
Definition: none.hpp:27
Definition: attributes.hpp:24
std::string error(const std::string &msg, uint32_t code)
std::string method
Definition: http.hpp:525
Definition: executor.hpp:48
Definition: http.hpp:599
std::string body
Definition: http.hpp:565
PID< HttpConnectionProcess< Call, Event > > self() const
Returns the PID of the process.
Definition: process.hpp:510
bool isNone() const
Definition: option.hpp:116
process::Future< Nothing > _send(const id::UUID &_connectionId, const Call &call, const process::http::Response &response)
Definition: http_connection.hpp:348
std::string toString() const
Definition: uuid.hpp:87
Definition: mutex.hpp:28
Try< Nothing > bind(int_fd s, const Address &address)
Definition: network.hpp:46
void read()
Definition: http_connection.hpp:418
std::string serialize(ContentType contentType, const google::protobuf::Message &message)
static const uint16_t NOT_FOUND
Definition: http.hpp:236
std::string stringify(int flags)
Definition: owned.hpp:36
Headers headers
Definition: http.hpp:626
void disconnect()
Definition: http_connection.hpp:314
Definition: process.hpp:501
enum process::http::Response::@4 type
Deferred< void()> defer(const PID< T > &pid, void(T::*method)())
Definition: defer.hpp:35
bool contains(const Key &key) const
Definition: hashmap.hpp:86
static Try< UUID > fromString(const std::string &s)
Definition: uuid.hpp:67
Represents a connection to an HTTP server.
Definition: http.hpp:952
Given a decoding function for individual records, this provides decoding from "Record-IO" data into t...
Definition: recordio.hpp:82
Definition: http.hpp:302
void _read(const process::http::Pipe::Reader &reader, const process::Future< Result< Event >> &event)
Definition: http_connection.hpp:427
Headers headers
Definition: http.hpp:533
friend std::ostream & operator<<(std::ostream &stream, State state)
Definition: http_connection.hpp:535