17 #ifndef __RESOURCE_PROVIDER_HTTP_CONNECTION_HPP__ 18 #define __RESOURCE_PROVIDER_HTTP_CONNECTION_HPP__ 20 #include <glog/logging.h> 64 template <
typename Call,
typename Event>
65 class HttpConnectionProcess
90 const std::function<
void(
void)>&
connected,
92 const std::function<
void(
const std::queue<Event>&)>& received)
94 state(State::DISCONNECTED),
95 contentType(_contentType),
98 detector(std::move(_detector)) {}
112 if (call.type() == Call::SUBSCRIBE && state != State::CONNECTED) {
117 "Cannot process 'SUBSCRIBE' call as the driver is in " 121 if (call.type() != Call::SUBSCRIBE && state != State::SUBSCRIBED) {
124 "Cannot process '" +
stringify(call.type()) +
"' call " 125 "as the driver is in state " +
stringify(state));
128 CHECK(state == State::CONNECTED || state == State::SUBSCRIBED);
131 VLOG(1) <<
"Sending " << call.type() <<
" call to " << endpoint.
get();
135 request.
url = endpoint.
get();
139 {
"Content-Type",
stringify(contentType)}};
142 request.
headers[
"Authorization"] =
"Bearer " + token.
get();
146 if (call.type() == Call::SUBSCRIBE) {
147 CHECK_EQ(State::CONNECTED, state);
148 state = State::SUBSCRIBING;
151 response = connections->subscribe.send(request,
true);
158 response = connections->nonSubscribe.send(request);
163 return response.
then(
173 detection = detector->detect(
None())
190 if (future.isFailed()) {
191 LOG(WARNING) <<
"Failed to detect an endpoint: " << future.failure();
200 case State::CONNECTING:
201 case State::DISCONNECTED:
203 case State::CONNECTED:
204 case State::SUBSCRIBING:
205 case State::SUBSCRIBED: {
216 if (future.isDiscarded()) {
217 LOG(INFO) <<
"Re-detecting endpoint";
220 }
else if (future->isNone()) {
221 LOG(INFO) <<
"Lost endpoint";
225 endpoint = future->get();
227 LOG(INFO) <<
"New endpoint detected at " << endpoint.
get();
234 detection = detector->detect(endpoint)
242 if (connectionId != _connectionId) {
243 VLOG(1) <<
"Ignoring connection attempt from stale connection";
248 CHECK_EQ(State::DISCONNECTED, state);
250 state = State::CONNECTING;
268 if (connectionId != _connectionId) {
269 VLOG(1) <<
"Ignoring connection attempt from stale connection";
273 CHECK_EQ(State::CONNECTING, state);
275 if (!_connections.isReady()) {
277 _connections.isFailed()
278 ? _connections.failure()
279 :
"Connection future discarded");
283 VLOG(1) <<
"Connected with the remote endpoint at " << endpoint.
get();
285 state = State::CONNECTED;
287 connections = Connections {
288 std::get<0>(_connections.get()),
289 std::get<1>(_connections.get())};
291 connections->subscribe.disconnected()
296 "Subscribe connection interrupted"));
298 connections->nonSubscribe.disconnected()
303 "Non-subscribe connection interrupted"));
316 if (connections.
isSome()) {
317 connections->subscribe.disconnect();
318 connections->nonSubscribe.disconnect();
321 if (subscribed.
isSome()) {
322 subscribed->reader.close();
325 state = State::DISCONNECTED;
327 connections =
None();
330 connectionId =
None();
337 if (connectionId != _connectionId) {
338 VLOG(1) <<
"Ignoring disconnection attempt from stale connection";
355 if (connectionId != _connectionId) {
359 CHECK(state == State::SUBSCRIBING || state == State::SUBSCRIBED) << state;
363 CHECK_EQ(Call::SUBSCRIBE, call.type());
367 state = State::SUBSCRIBED;
373 lambda::bind(deserialize<Event>, contentType, lambda::_1),
376 subscribed = SubscribedResponse(reader, std::move(decoder));
384 streamId = uuid.
get();
394 CHECK_NE(Call::SUBSCRIBE, call.type());
400 if (call.type() == Call::SUBSCRIBE) {
401 state = State::CONNECTED;
407 "Received '" + response.
status +
"' (" + response.
body +
")");
411 "Received unexpected '" + response.
status +
412 "' (" + response.
body +
")");
417 subscribed->decoder->read()
428 CHECK(!event.isDiscarded());
431 if (!subscribed.
isSome() || subscribed->reader != reader) {
432 VLOG(1) <<
"Ignoring event from old stale connection";
436 CHECK_EQ(State::SUBSCRIBED, state);
439 if (event.isFailed()) {
440 LOG(ERROR) <<
"Failed to decode stream of events: " 447 if (event->isNone()) {
448 const std::string
error =
"End-Of-File received";
455 if (event->isError()) {
456 LOG(ERROR) <<
"Failed to de-serialize event: " <<
event->error();
467 if (state != State::SUBSCRIBED) {
468 LOG(WARNING) <<
"Ignoring " <<
stringify(event.type())
469 <<
" event because we're no longer subscribed";
478 if (events.size() == 1) {
483 events = std::queue<Event>();
496 std::function<void(const std::queue<Event>&)> received;
505 struct SubscribedResponse
510 : reader(std::move(_reader)),
511 decoder(std::move(_decoder)) {}
514 SubscribedResponse(
const SubscribedResponse&) =
delete;
515 SubscribedResponse& operator=(
const SubscribedResponse&) =
delete;
516 SubscribedResponse& operator=(SubscribedResponse&&) =
default;
517 SubscribedResponse(SubscribedResponse&&) =
default;
532 friend std::ostream&
operator<<(std::ostream& stream, State state)
535 case State::DISCONNECTED:
return stream <<
"DISCONNECTED";
536 case State::CONNECTING:
return stream <<
"CONNECTING";
537 case State::CONNECTED:
return stream <<
"CONNECTED";
538 case State::SUBSCRIBING:
return stream <<
"SUBSCRIBING";
539 case State::SUBSCRIBED:
return stream <<
"SUBSCRIBED";
551 const Callbacks callbacks;
554 std::queue<Event> events;
570 #endif // __RESOURCE_PROVIDER_HTTP_CONNECTION_HPP__
void connected(const id::UUID &_connectionId, const process::Future< std::tuple< process::http::Connection, process::http::Connection >> &_connections)
Definition: http_connection.hpp:261
std::string generate(const std::string &prefix="")
Returns 'prefix(N)' where N represents the number of instances where the same prefix (wrt...
void disconnected(const id::UUID &_connectionId, const std::string &failure)
Definition: http_connection.hpp:334
Definition: nothing.hpp:16
ContentType
Definition: http.hpp:43
Option< Error > validate(const std::string &imageDir)
void unlock()
Definition: mutex.hpp:50
Future< Response > request(const Request &request, bool streamedResponse=false)
Asynchronously sends an HTTP request to the process and returns the HTTP response once the entire res...
ProcessBase(const std::string &id="")
T & get()&
Definition: try.hpp:80
URL url
Definition: http.hpp:544
std::string status
Definition: http.hpp:637
Definition: future.hpp:668
uint16_t code
Definition: http.hpp:674
constexpr const char * prefix
Definition: os.hpp:96
void receive(const Event &event)
Definition: http_connection.hpp:464
mesos::v1::scheduler::Call Call
Definition: mesos.hpp:2851
HTTP connection handler.
Definition: resource_provider.hpp:41
bool discard()
Definition: future.hpp:1157
Future< std::vector< T > > collect(const std::vector< Future< T >> &futures)
Definition: collect.hpp:274
bool isSome() const
Definition: option.hpp:116
void connect(const id::UUID &_connectionId)
Definition: http_connection.hpp:238
mesos::v1::scheduler::Event Event
Definition: mesos.hpp:2852
Option< Pipe::Reader > reader
Definition: http.hpp:672
HttpConnectionProcess< Call, Event > Self
Definition: http_connection.hpp:181
Provides RecordIO decoding on top of an http::Pipe::Reader.
Definition: recordio.hpp:62
process::Future< Nothing > send(const Call &call)
Definition: http_connection.hpp:100
#define CHECK_SOME(expression)
Definition: check.hpp:50
static UUID random()
Definition: uuid.hpp:38
std::string body
Definition: http.hpp:670
static const uint16_t SERVICE_UNAVAILABLE
Definition: http.hpp:266
HttpConnectionProcess(const std::string &prefix, process::Owned< EndpointDetector > _detector, ContentType _contentType, const Option< std::string > &_token, const std::function< Option< Error >(const Call &)> &validate, const std::function< void(void)> &connected, const std::function< void(void)> &disconnected, const std::function< void(const std::queue< Event > &)> &received)
Construct a HTTP connection process.
Definition: http_connection.hpp:84
const Future< T > & onAny(AnyCallback &&callback) const
Definition: future.hpp:1442
void detected(const process::Future< Option< process::http::URL >> &future)
Definition: http_connection.hpp:188
static const uint16_t ACCEPTED
Definition: http.hpp:233
const T & get() const &
Definition: option.hpp:119
bool keepAlive
Definition: http.hpp:558
static const uint16_t OK
Definition: http.hpp:231
void start()
Definition: http_connection.hpp:171
void dispatch(const UPID &pid, std::unique_ptr< lambda::CallableOnce< void(ProcessBase *)>> f, const Option< const std::type_info * > &functionType=None())
Future< typename result_of< F()>::type > async(const F &f, typename std::enable_if<!std::is_void< typename result_of< F()>::type >::value >::type *=nullptr)
Definition: async.hpp:238
#define UNREACHABLE()
Definition: unreachable.hpp:22
void finalize() override
Invoked when a process is terminated.
Definition: http_connection.hpp:183
Future< X > then(lambda::CallableOnce< Future< X >(const T &)> f) const
Definition: future.hpp:1576
Future< Nothing > lock()
Definition: mutex.hpp:33
Future< Connection > connect(const network::Address &address, Scheme scheme, const Option< std::string > &peer_hostname)
const std::string message
Definition: errorbase.hpp:46
Definition: attributes.hpp:24
std::string error(const std::string &msg, uint32_t code)
std::string method
Definition: http.hpp:538
Definition: executor.hpp:48
std::string body
Definition: http.hpp:578
PID< HttpConnectionProcess< Call, Event > > self() const
Returns the PID of the process.
Definition: process.hpp:514
bool isNone() const
Definition: option.hpp:117
process::Future< Nothing > _send(const id::UUID &_connectionId, const Call &call, const process::http::Response &response)
Definition: http_connection.hpp:348
std::string toString() const
Definition: uuid.hpp:87
Try< Nothing > bind(int_fd s, const Address &address)
Definition: network.hpp:46
void read()
Definition: http_connection.hpp:415
std::string serialize(ContentType contentType, const google::protobuf::Message &message)
static const uint16_t NOT_FOUND
Definition: http.hpp:249
std::string stringify(int flags)
Headers headers
Definition: http.hpp:639
void disconnect()
Definition: http_connection.hpp:314
Definition: process.hpp:505
enum process::http::Response::@4 type
Deferred< void()> defer(const PID< T > &pid, void(T::*method)())
Definition: defer.hpp:35
bool contains(const Key &key) const
Definition: hashmap.hpp:86
static Try< UUID > fromString(const std::string &s)
Definition: uuid.hpp:67
Represents a connection to an HTTP server.
Definition: http.hpp:965
void _read(const process::http::Pipe::Reader &reader, const process::Future< Result< Event >> &event)
Definition: http_connection.hpp:424
Headers headers
Definition: http.hpp:546
friend std::ostream & operator<<(std::ostream &stream, State state)
Definition: http_connection.hpp:532