13 #ifndef __PROCESS_GRPC_HPP__ 14 #define __PROCESS_GRPC_HPP__ 21 #include <type_traits> 24 #include <google/protobuf/message.h> 26 #include <grpcpp/grpcpp.h> 49 #define GRPC_CLIENT_METHOD(service, rpc) \ 50 (&service::Stub::PrepareAsync##rpc) 54 std::ostream&
operator<<(std::ostream& stream, StatusCode statusCode);
71 (_status.error_message().empty()
72 ?
"" :
": " + _status.error_message())),
91 template <
typename Stub,
typename Request,
typename Response>
93 std::unique_ptr<::grpc::ClientAsyncResponseReader<Response>>(Stub::*)(
94 ::grpc::ClientContext*,
96 ::grpc::CompletionQueue*)>
116 const std::string&
uri,
117 const std::shared_ptr<::grpc::ChannelCredentials>& credentials =
118 ::grpc::InsecureChannelCredentials())
119 : channel(::
grpc::CreateChannel(uri, credentials)) {}
121 explicit Connection(std::shared_ptr<::grpc::Channel> _channel)
122 : channel(
std::move(_channel)) {}
124 const std::shared_ptr<::grpc::Channel>
channel;
136 bool wait_for_ready =
true;
184 typename std::enable_if<
187 google::protobuf::Message*>::value,
198 std::shared_ptr<Promise<Try<Response, StatusError>>>
promise(
206 [connection, method, options, promise](
209 ::grpc::CompletionQueue* queue) {
211 promise->fail(
"Runtime has been terminated");
217 std::shared_ptr<::grpc::ClientContext>
context(
218 new ::grpc::ClientContext());
227 std::chrono::time_point_cast<std::chrono::system_clock::duration>(
228 std::chrono::system_clock::now() +
229 std::chrono::nanoseconds(options.
timeout.
ns()));
231 context->set_deadline(time_point);
233 promise->future().onDiscard([=] {
context->TryCancel(); });
235 std::shared_ptr<Response> response(
new Response());
236 std::shared_ptr<::grpc::Status>
status(new ::grpc::Status());
238 std::shared_ptr<::grpc::ClientAsyncResponseReader<Response>> reader =
252 [
context, reader, response, status, promise]() {
254 if (promise->future().hasDiscard()) {
257 promise->set(status->ok()
258 ? std::move(*response)
263 reader->Finish(response.get(), status.get(), tag);
265 std::forward<Request>(
request),
293 class RuntimeProcess :
public Process<RuntimeProcess>
297 ~RuntimeProcess()
override;
299 void send(SendCallback callback);
300 void receive(ReceiveCallback callback);
310 ::grpc::CompletionQueue queue;
311 std::unique_ptr<std::thread> looper;
325 std::shared_ptr<Data> data;
333 #endif // __PROCESS_GRPC_HPP__ Definition: errorbase.hpp:36
#define CHECK_PENDING(expression)
Definition: check.hpp:26
void finalize(bool finalize_wsa=false)
Clean up the library.
Future< Response > request(const Request &request, bool streamedResponse=false)
Asynchronously sends an HTTP request to the process and returns the HTTP response once the entire res...
const std::conditional< std::is_same< E, Error >::value, std::string, E >::type & error() const
Definition: try.hpp:97
Result< ProcessStatus > status(pid_t pid)
Definition: proc.hpp:166
Definition: type_utils.hpp:619
A copyable interface to manage a connection to a gRPC server.
Definition: grpc.hpp:112
Definition: duration.hpp:32
void terminate(const UPID &pid, bool inject=true)
Sends a TerminateEvent to the given process.
Defines the gRPC options for each call.
Definition: grpc.hpp:131
std::ostream & operator<<(std::ostream &stream, StatusCode statusCode)
Stub stub_type
Definition: grpc.hpp:98
Connection(std::shared_ptr<::grpc::Channel > _channel)
Definition: grpc.hpp:121
bool wait_for_ready
Definition: grpc.hpp:136
Try< Nothing > initialize(const Flags &flags)
Initialized state for support of systemd functions in this file.
Definition: duration.hpp:207
Request request_type
Definition: grpc.hpp:99
A copyable interface to manage an internal runtime process for asynchronous gRPC calls.
Definition: grpc.hpp:157
int64_t ns() const
Definition: duration.hpp:46
Definition: future.hpp:74
Protocol< PromiseRequest, PromiseResponse > promise
Runtime()
Definition: grpc.hpp:160
Future< V > loop(const Option< UPID > &pid, Iterate &&iterate, Body &&body)
Definition: loop.hpp:456
const std::shared_ptr<::grpc::Channel > channel
Definition: grpc.hpp:124
bool wait(const UPID &pid, const Duration &duration=Seconds(-1))
Wait for the process to exit for no more than the specified seconds.
void dispatch(const UPID &pid, std::unique_ptr< lambda::CallableOnce< void(ProcessBase *)>> f, const Option< const std::type_info * > &functionType=None())
Represents errors caused by non-OK gRPC statuses.
Definition: grpc.hpp:66
const ::grpc::Status status
Definition: grpc.hpp:78
Definition: attributes.hpp:24
Connection(const std::string &uri, const std::shared_ptr<::grpc::ChannelCredentials > &credentials=::grpc::InsecureChannelCredentials())
Definition: grpc.hpp:115
Definition: executor.hpp:48
Try< uint32_t > type(const std::string &path)
Try< Nothing > bind(int_fd s, const Address &address)
Definition: network.hpp:46
Response response_type
Definition: grpc.hpp:100
mesos::v1::scheduler::Response Response
Definition: mesos.hpp:2854
StatusError(::grpc::Status _status)
Definition: grpc.hpp:69
std::string stringify(int flags)
Duration timeout
Definition: grpc.hpp:141
Definition: process.hpp:505
Future< Try< Response, StatusError > > call(const Connection &connection, Method &&method, Request &&request, const CallOptions &options)
Sends an asynchronous gRPC call.
Definition: grpc.hpp:189
Definition: lambda.hpp:414
Definition: future.hpp:58
Future< size_t > send(const int_fd &fd, const void *buf, size_t size)