Apache Mesos
http_connection.hpp
Go to the documentation of this file.
1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements. See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership. The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License. You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 
17 #ifndef __RESOURCE_PROVIDER_HTTP_CONNECTION_HPP__
18 #define __RESOURCE_PROVIDER_HTTP_CONNECTION_HPP__
19 
20 #include <glog/logging.h>
21 
22 #include <functional>
23 #include <ostream>
24 #include <string>
25 #include <tuple>
26 #include <queue>
27 #include <utility>
28 
29 #include <mesos/http.hpp>
30 
31 #include <mesos/v1/mesos.hpp>
32 
33 #include <process/async.hpp>
34 #include <process/collect.hpp>
35 #include <process/defer.hpp>
36 #include <process/dispatch.hpp>
37 #include <process/future.hpp>
38 #include <process/http.hpp>
39 #include <process/mutex.hpp>
40 #include <process/owned.hpp>
41 #include <process/process.hpp>
42 
43 #include <stout/duration.hpp>
44 #include <stout/nothing.hpp>
45 #include <stout/recordio.hpp>
46 #include <stout/result.hpp>
47 #include <stout/unreachable.hpp>
48 #include <stout/uuid.hpp>
49 
50 #include "common/http.hpp"
51 #include "common/recordio.hpp"
52 
54 
55 namespace mesos {
56 namespace internal {
57 
64 template <typename Call, typename Event>
65 class HttpConnectionProcess
66  : public process::Process<HttpConnectionProcess<Call, Event>>
67 {
68 public:
85  const std::string& prefix,
87  ContentType _contentType,
88  const Option<std::string>& _token,
89  const std::function<Option<Error>(const Call&)>& validate,
90  const std::function<void(void)>& connected,
91  const std::function<void(void)>& disconnected,
92  const std::function<void(const std::queue<Event>&)>& received)
93  : process::ProcessBase(process::ID::generate(prefix)),
94  state(State::DISCONNECTED),
95  contentType(_contentType),
96  token(_token),
97  callbacks {validate, connected, disconnected, received},
98  detector(std::move(_detector)) {}
99 
101  {
102  Option<Error> error = callbacks.validate(call);
103 
104  if (error.isSome()) {
105  return process::Failure(error->message);
106  }
107 
108  if (endpoint.isNone()) {
109  return process::Failure("Not connected to an endpoint");
110  }
111 
112  if (call.type() == Call::SUBSCRIBE && state != State::CONNECTED) {
113  // It might be possible that the client is retrying. We drop the
114  // request if we have an ongoing subscribe request in flight or
115  // if the client is already subscribed.
116  return process::Failure(
117  "Cannot process 'SUBSCRIBE' call as the driver is in "
118  "state " + stringify(state));
119  }
120 
121  if (call.type() != Call::SUBSCRIBE && state != State::SUBSCRIBED) {
122  // We drop all non-subscribe calls if we are not currently subscribed.
123  return process::Failure(
124  "Cannot process '" + stringify(call.type()) + "' call "
125  "as the driver is in state " + stringify(state));
126  }
127 
128  CHECK(state == State::CONNECTED || state == State::SUBSCRIBED);
129  CHECK_SOME(connections);
130 
131  VLOG(1) << "Sending " << call.type() << " call to " << endpoint.get();
132 
134  request.method = "POST";
135  request.url = endpoint.get();
136  request.body = serialize(contentType, call);
137  request.keepAlive = true;
138  request.headers = {{"Accept", stringify(contentType)},
139  {"Content-Type", stringify(contentType)}};
140 
141  if (token.isSome()) {
142  request.headers["Authorization"] = "Bearer " + token.get();
143  }
144 
146  if (call.type() == Call::SUBSCRIBE) {
147  CHECK_EQ(State::CONNECTED, state);
148  state = State::SUBSCRIBING;
149 
150  // Send a streaming request for Subscribe call.
151  response = connections->subscribe.send(request, true);
152  } else {
153  if (streamId.isSome()) {
154  // Set the stream ID associated with this connection.
155  request.headers["Mesos-Stream-Id"] = streamId->toString();
156  }
157 
158  response = connections->nonSubscribe.send(request);
159  }
160 
161  CHECK_SOME(connectionId);
162 
163  return response.then(
164  defer(self(),
165  &Self::_send,
166  connectionId.get(),
167  call,
168  lambda::_1));
169  }
170 
171  void start()
172  {
173  detection = detector->detect(None())
174  .onAny(defer(self(), &Self::detected, lambda::_1));
175  }
176 
177 protected:
178  // Because we're deriving from a templated base class, we have
179  // to explicitly bring these hidden base class names into scope.
182 
183  void finalize() override
184  {
185  disconnect();
186  }
187 
189  {
190  if (future.isFailed()) {
191  LOG(WARNING) << "Failed to detect an endpoint: " << future.failure();
192 
193  // TODO(nfnt): A non-retryable error might be the reason for the
194  // failed future. In that case the client should be informed
195  // about this error and the URL dectection aborted.
196  }
197 
198  // Invoke the disconnected callback if we were previously connected.
199  switch (state) {
200  case State::CONNECTING:
201  case State::DISCONNECTED:
202  break;
203  case State::CONNECTED:
204  case State::SUBSCRIBING:
205  case State::SUBSCRIBED: {
206  mutex.lock()
207  .then(defer(self(), [this]() {
208  return process::async(callbacks.disconnected);
209  }))
210  .onAny(lambda::bind(&process::Mutex::unlock, mutex));
211  }
212  }
213 
214  disconnect();
215 
216  if (future.isDiscarded()) {
217  LOG(INFO) << "Re-detecting endpoint";
218 
219  endpoint = None();
220  } else if (future->isNone()) {
221  LOG(INFO) << "Lost endpoint";
222 
223  endpoint = None();
224  } else {
225  endpoint = future->get();
226 
227  LOG(INFO) << "New endpoint detected at " << endpoint.get();
228 
229  connectionId = id::UUID::random();
230 
231  dispatch(self(), &Self::connect, connectionId.get());
232  }
233 
234  detection = detector->detect(endpoint)
235  .onAny(defer(self(), &Self::detected, lambda::_1));
236  }
237 
238  void connect(const id::UUID& _connectionId)
239  {
240  // It is possible that a new endpoint was detected while we were
241  // waiting to establish a connection with the old master.
242  if (connectionId != _connectionId) {
243  VLOG(1) << "Ignoring connection attempt from stale connection";
244  return;
245  }
246 
247  CHECK_SOME(endpoint);
248  CHECK_EQ(State::DISCONNECTED, state);
249 
250  state = State::CONNECTING;
251 
252  // We create two persistent connections here, one for subscribe
253  // call/streaming response and another for non-subscribe
254  // calls/responses.
255  collect(
256  process::http::connect(endpoint.get()),
257  process::http::connect(endpoint.get()))
258  .onAny(defer(self(), &Self::connected, connectionId.get(), lambda::_1));
259  }
260 
261  void connected(
262  const id::UUID& _connectionId,
263  const process::Future<std::tuple<
265  {
266  // It is possible that a new endpoint was detected while we had an
267  // ongoing (re-)connection attempt with the old endpoint.
268  if (connectionId != _connectionId) {
269  VLOG(1) << "Ignoring connection attempt from stale connection";
270  return;
271  }
272 
273  CHECK_EQ(State::CONNECTING, state);
274 
275  if (!_connections.isReady()) {
276  disconnected(connectionId.get(),
277  _connections.isFailed()
278  ? _connections.failure()
279  : "Connection future discarded");
280  return;
281  }
282 
283  VLOG(1) << "Connected with the remote endpoint at " << endpoint.get();
284 
285  state = State::CONNECTED;
286 
287  connections = Connections {
288  std::get<0>(_connections.get()),
289  std::get<1>(_connections.get())};
290 
291  connections->subscribe.disconnected()
292  .onAny(defer(
293  self(),
295  connectionId.get(),
296  "Subscribe connection interrupted"));
297 
298  connections->nonSubscribe.disconnected()
299  .onAny(defer(
300  self(),
302  connectionId.get(),
303  "Non-subscribe connection interrupted"));
304 
305  // Invoke the connected callback once we have established both
306  // subscribe and non-subscribe connections with the master.
307  mutex.lock()
308  .then(defer(self(), [this]() {
309  return process::async(callbacks.connected);
310  }))
311  .onAny(lambda::bind(&process::Mutex::unlock, mutex));
312  }
313 
314  void disconnect()
315  {
316  if (connections.isSome()) {
317  connections->subscribe.disconnect();
318  connections->nonSubscribe.disconnect();
319  }
320 
321  if (subscribed.isSome()) {
322  subscribed->reader.close();
323  }
324 
325  state = State::DISCONNECTED;
326 
327  connections = None();
328  subscribed = None();
329  endpoint = None();
330  connectionId = None();
331  detection.discard();
332  }
333 
334  void disconnected(const id::UUID& _connectionId, const std::string& failure)
335  {
336  // Ignore if the disconnection happened from an old stale connection.
337  if (connectionId != _connectionId) {
338  VLOG(1) << "Ignoring disconnection attempt from stale connection";
339  return;
340  }
341 
342  // We can reach here if we noticed a disconnection for either of
343  // subscribe/non-subscribe connections. We discard the future here
344  // to trigger an endpoint re-detection.
345  detection.discard();
346  }
347 
349  const id::UUID& _connectionId,
350  const Call& call,
351  const process::http::Response& response)
352  {
353  // It is possible that we detected a new endpoint before a
354  // response could be received.
355  if (connectionId != _connectionId) {
356  return process::Failure("Ignoring response from stale connection");
357  }
358 
359  CHECK(state == State::SUBSCRIBING || state == State::SUBSCRIBED) << state;
360 
361  if (response.code == process::http::Status::OK) {
362  // Only SUBSCRIBE call should get a "200 OK" response.
363  CHECK_EQ(Call::SUBSCRIBE, call.type());
364  CHECK_EQ(process::http::Response::PIPE, response.type);
365  CHECK_SOME(response.reader);
366 
367  state = State::SUBSCRIBED;
368 
369  process::http::Pipe::Reader reader = response.reader.get();
370 
373  lambda::bind(deserialize<Event>, contentType, lambda::_1),
374  reader));
375 
376  subscribed = SubscribedResponse(reader, std::move(decoder));
377 
378  if (response.headers.contains("Mesos-Stream-Id")) {
379  Try<id::UUID> uuid =
380  id::UUID::fromString(response.headers.at("Mesos-Stream-Id"));
381 
382  CHECK_SOME(uuid);
383 
384  streamId = uuid.get();
385  }
386 
387  read();
388 
389  return Nothing();
390  }
391 
392  if (response.code == process::http::Status::ACCEPTED) {
393  // Only non SUBSCRIBE calls should get a "202 Accepted" response.
394  CHECK_NE(Call::SUBSCRIBE, call.type());
395  return Nothing();
396  }
397 
398  // We reset the state to connected if the subscribe call did not
399  // succceed. We can then retry the subscribe call.
400  if (call.type() == Call::SUBSCRIBE) {
401  state = State::CONNECTED;
402  }
403 
406  return process::Failure(
407  "Received '" + response.status + "' (" + response.body + ")");
408  }
409 
410  return process::Failure(
411  "Received unexpected '" + response.status +
412  "' (" + response.body + ")");
413  }
414 
415  void read()
416  {
417  subscribed->decoder->read()
418  .onAny(defer(self(),
419  &Self::_read,
420  subscribed->reader,
421  lambda::_1));
422  }
423 
424  void _read(
425  const process::http::Pipe::Reader& reader,
426  const process::Future<Result<Event>>& event)
427  {
428  CHECK(!event.isDiscarded());
429 
430  // Ignore enqueued events from the previous Subscribe call reader.
431  if (!subscribed.isSome() || subscribed->reader != reader) {
432  VLOG(1) << "Ignoring event from old stale connection";
433  return;
434  }
435 
436  CHECK_EQ(State::SUBSCRIBED, state);
437  CHECK_SOME(connectionId);
438 
439  if (event.isFailed()) {
440  LOG(ERROR) << "Failed to decode stream of events: "
441  << event.failure();
442 
443  disconnected(connectionId.get(), event.failure());
444  return;
445  }
446 
447  if (event->isNone()) {
448  const std::string error = "End-Of-File received";
449  LOG(ERROR) << error;
450 
451  disconnected(connectionId.get(), error);
452  return;
453  }
454 
455  if (event->isError()) {
456  LOG(ERROR) << "Failed to de-serialize event: " << event->error();
457  } else {
458  receive(event->get());
459  }
460 
461  read();
462  }
463 
464  void receive(const Event& event)
465  {
466  // Check if we're are no longer subscribed but received an event.
467  if (state != State::SUBSCRIBED) {
468  LOG(WARNING) << "Ignoring " << stringify(event.type())
469  << " event because we're no longer subscribed";
470  return;
471  }
472 
473  // Queue up the event and invoke the 'received' callback if this
474  // is the first event (between now and when the 'received'
475  // callback actually gets invoked more events might get queued).
476  events.push(event);
477 
478  if (events.size() == 1) {
479  mutex.lock()
480  .then(defer(self(), [this]() {
481  process::Future<Nothing> future =
482  process::async(callbacks.received, events);
483  events = std::queue<Event>();
484  return future;
485  }))
486  .onAny(lambda::bind(&process::Mutex::unlock, mutex));
487  }
488  }
489 
490 private:
491  struct Callbacks
492  {
493  std::function<Option<Error>(const Call&)> validate;
494  std::function<void(void)> connected;
495  std::function<void(void)> disconnected;
496  std::function<void(const std::queue<Event>&)> received;
497  };
498 
499  struct Connections
500  {
501  process::http::Connection subscribe;
502  process::http::Connection nonSubscribe;
503  };
504 
505  struct SubscribedResponse
506  {
507  SubscribedResponse(
510  : reader(std::move(_reader)),
511  decoder(std::move(_decoder)) {}
512 
513  // The decoder cannot be copied meaningfully, see MESOS-5122.
514  SubscribedResponse(const SubscribedResponse&) = delete;
515  SubscribedResponse& operator=(const SubscribedResponse&) = delete;
516  SubscribedResponse& operator=(SubscribedResponse&&) = default;
517  SubscribedResponse(SubscribedResponse&&) = default;
518 
521  };
522 
523  enum class State
524  {
525  DISCONNECTED, // Either of subscribe/non-subscribe connection is broken.
526  CONNECTING, // Trying to establish subscribe and non-subscribe connections.
527  CONNECTED, // Established subscribe and non-subscribe connections.
528  SUBSCRIBING, // Trying to subscribe with the remote endpoint.
529  SUBSCRIBED // Subscribed with the remote endpoint.
530  };
531 
532  friend std::ostream& operator<<(std::ostream& stream, State state)
533  {
534  switch (state) {
535  case State::DISCONNECTED: return stream << "DISCONNECTED";
536  case State::CONNECTING: return stream << "CONNECTING";
537  case State::CONNECTED: return stream << "CONNECTED";
538  case State::SUBSCRIBING: return stream << "SUBSCRIBING";
539  case State::SUBSCRIBED: return stream << "SUBSCRIBED";
540  }
541 
542  UNREACHABLE();
543  }
544 
545  State state;
546  Option<Connections> connections;
547  Option<SubscribedResponse> subscribed;
549  const mesos::ContentType contentType;
550  Option<std::string> token;
551  const Callbacks callbacks;
552  process::Mutex mutex; // Used to serialize the callback invocations.
554  std::queue<Event> events;
555 
556  // There can be multiple simultaneous ongoing (re-)connection
557  // attempts with the remote endpoint (e.g., the endpoint failed over
558  // while an attempt was in progress). This helps us in uniquely
559  // identifying the current connection instance and ignoring the
560  // stale instance.
561  Option<id::UUID> connectionId;
562  Option<id::UUID> streamId;
563 
565 };
566 
567 } // namespace internal {
568 } // namespace mesos {
569 
570 #endif // __RESOURCE_PROVIDER_HTTP_CONNECTION_HPP__
void connected(const id::UUID &_connectionId, const process::Future< std::tuple< process::http::Connection, process::http::Connection >> &_connections)
Definition: http_connection.hpp:261
std::string generate(const std::string &prefix="")
Returns 'prefix(N)' where N represents the number of instances where the same prefix (wrt...
void disconnected(const id::UUID &_connectionId, const std::string &failure)
Definition: http_connection.hpp:334
Definition: nothing.hpp:16
ContentType
Definition: http.hpp:43
Option< Error > validate(const std::string &imageDir)
void unlock()
Definition: mutex.hpp:50
Future< Response > request(const Request &request, bool streamedResponse=false)
Asynchronously sends an HTTP request to the process and returns the HTTP response once the entire res...
ProcessBase(const std::string &id="")
T & get()&
Definition: try.hpp:80
URL url
Definition: http.hpp:544
Definition: check.hpp:33
std::string status
Definition: http.hpp:637
Definition: future.hpp:668
Definition: http.hpp:667
uint16_t code
Definition: http.hpp:674
constexpr const char * prefix
Definition: os.hpp:96
void receive(const Event &event)
Definition: http_connection.hpp:464
mesos::v1::scheduler::Call Call
Definition: mesos.hpp:2851
HTTP connection handler.
Definition: resource_provider.hpp:41
bool discard()
Definition: future.hpp:1157
Definition: check.hpp:30
Future< std::vector< T > > collect(const std::vector< Future< T >> &futures)
Definition: collect.hpp:274
bool isSome() const
Definition: option.hpp:116
void connect(const id::UUID &_connectionId)
Definition: http_connection.hpp:238
Definition: http.hpp:533
mesos::v1::scheduler::Event Event
Definition: mesos.hpp:2852
Option< Pipe::Reader > reader
Definition: http.hpp:672
HttpConnectionProcess< Call, Event > Self
Definition: http_connection.hpp:181
Provides RecordIO decoding on top of an http::Pipe::Reader.
Definition: recordio.hpp:62
process::Future< Nothing > send(const Call &call)
Definition: http_connection.hpp:100
#define CHECK_SOME(expression)
Definition: check.hpp:50
static UUID random()
Definition: uuid.hpp:38
std::string body
Definition: http.hpp:670
static const uint16_t SERVICE_UNAVAILABLE
Definition: http.hpp:266
HttpConnectionProcess(const std::string &prefix, process::Owned< EndpointDetector > _detector, ContentType _contentType, const Option< std::string > &_token, const std::function< Option< Error >(const Call &)> &validate, const std::function< void(void)> &connected, const std::function< void(void)> &disconnected, const std::function< void(const std::queue< Event > &)> &received)
Construct an HTTP connection process.
Definition: http_connection.hpp:84
const Future< T > & onAny(AnyCallback &&callback) const
Definition: future.hpp:1442
void detected(const process::Future< Option< process::http::URL >> &future)
Definition: http_connection.hpp:188
Definition: uuid.hpp:35
Definition: agent.hpp:25
static const uint16_t ACCEPTED
Definition: http.hpp:233
const T & get() const &
Definition: option.hpp:119
bool keepAlive
Definition: http.hpp:558
static const uint16_t OK
Definition: http.hpp:231
void start()
Definition: http_connection.hpp:171
void dispatch(const UPID &pid, std::unique_ptr< lambda::CallableOnce< void(ProcessBase *)>> f, const Option< const std::type_info * > &functionType=None())
Future< typename result_of< F()>::type > async(const F &f, typename std::enable_if<!std::is_void< typename result_of< F()>::type >::value >::type *=nullptr)
Definition: async.hpp:238
#define UNREACHABLE()
Definition: unreachable.hpp:22
void finalize() override
Invoked when a process is terminated.
Definition: http_connection.hpp:183
Future< X > then(lambda::CallableOnce< Future< X >(const T &)> f) const
Definition: future.hpp:1576
Future< Nothing > lock()
Definition: mutex.hpp:33
Future< Connection > connect(const network::Address &address, Scheme scheme, const Option< std::string > &peer_hostname)
const std::string message
Definition: errorbase.hpp:46
Definition: none.hpp:27
Definition: attributes.hpp:24
std::string error(const std::string &msg, uint32_t code)
std::string method
Definition: http.hpp:538
Definition: executor.hpp:48
Definition: http.hpp:612
std::string body
Definition: http.hpp:578
PID< HttpConnectionProcess< Call, Event > > self() const
Returns the PID of the process.
Definition: process.hpp:514
bool isNone() const
Definition: option.hpp:117
process::Future< Nothing > _send(const id::UUID &_connectionId, const Call &call, const process::http::Response &response)
Definition: http_connection.hpp:348
std::string toString() const
Definition: uuid.hpp:87
Definition: mutex.hpp:28
Try< Nothing > bind(int_fd s, const Address &address)
Definition: network.hpp:46
void read()
Definition: http_connection.hpp:415
std::string serialize(ContentType contentType, const google::protobuf::Message &message)
static const uint16_t NOT_FOUND
Definition: http.hpp:249
std::string stringify(int flags)
Definition: owned.hpp:36
Headers headers
Definition: http.hpp:639
void disconnect()
Definition: http_connection.hpp:314
Definition: process.hpp:505
enum process::http::Response::@4 type
Deferred< void()> defer(const PID< T > &pid, void(T::*method)())
Definition: defer.hpp:35
bool contains(const Key &key) const
Definition: hashmap.hpp:86
static Try< UUID > fromString(const std::string &s)
Definition: uuid.hpp:67
Represents a connection to an HTTP server.
Definition: http.hpp:965
Definition: http.hpp:315
void _read(const process::http::Pipe::Reader &reader, const process::Future< Result< Event >> &event)
Definition: http_connection.hpp:424
Headers headers
Definition: http.hpp:546
friend std::ostream & operator<<(std::ostream &stream, State state)
Definition: http_connection.hpp:532