Apache Mesos
grpc.hpp
Go to the documentation of this file.
1 // Licensed under the Apache License, Version 2.0 (the "License");
2 // you may not use this file except in compliance with the License.
3 // You may obtain a copy of the License at
4 //
5 // http://www.apache.org/licenses/LICENSE-2.0
6 //
7 // Unless required by applicable law or agreed to in writing, software
8 // distributed under the License is distributed on an "AS IS" BASIS,
9 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10 // See the License for the specific language governing permissions and
11 // limitations under the License
12 
13 #ifndef __PROCESS_GRPC_HPP__
14 #define __PROCESS_GRPC_HPP__
15 
16 #include <chrono>
17 #include <memory>
18 #include <string>
19 #include <thread>
20 #include <type_traits>
21 #include <utility>
22 
23 #include <google/protobuf/message.h>
24 
25 #include <grpcpp/grpcpp.h>
26 
27 #include <process/check.hpp>
28 #include <process/dispatch.hpp>
29 #include <process/future.hpp>
30 #include <process/pid.hpp>
31 #include <process/process.hpp>
32 
33 #include <stout/duration.hpp>
34 #include <stout/error.hpp>
35 #include <stout/lambda.hpp>
36 #include <stout/nothing.hpp>
37 #include <stout/try.hpp>
38 
39 
40 // This file provides libprocess "support" for using gRPC. In particular, it
41 // defines two wrapper classes: `client::Connection` which represents a
42 // connection to a gRPC server, and `client::Runtime` which integrates an event
43 // loop waiting for gRPC responses and provides the `call` interface to create
44 // an asynchronous gRPC call and return a `Future`.
45 
46 
47 #define GRPC_CLIENT_METHOD(service, rpc) \
48  (&service::Stub::PrepareAsync##rpc)
49 
50 namespace process {
51 namespace grpc {
52 
57 class StatusError : public Error
58 {
59 public:
60  StatusError(::grpc::Status _status)
61  : Error(_status.error_message()), status(std::move(_status))
62  {
63  CHECK(!status.ok());
64  }
65 
66  const ::grpc::Status status;
67 };
68 
69 
70 namespace client {
71 
72 // Internal helper utilities.
73 namespace internal {
74 
75 template <typename T>
76 struct MethodTraits; // Undefined.
77 
78 
79 template <typename Stub, typename Request, typename Response>
80 struct MethodTraits<
81  std::unique_ptr<::grpc::ClientAsyncResponseReader<Response>>(Stub::*)(
82  ::grpc::ClientContext*,
83  const Request&,
84  ::grpc::CompletionQueue*)>
85 {
86  typedef Stub stub_type;
87  typedef Request request_type;
89 };
90 
91 } // namespace internal {
92 
93 
101 {
102 public:
104  const std::string& uri,
105  const std::shared_ptr<::grpc::ChannelCredentials>& credentials =
106  ::grpc::InsecureChannelCredentials())
107  : channel(::grpc::CreateChannel(uri, credentials)) {}
108 
109  explicit Connection(std::shared_ptr<::grpc::Channel> _channel)
110  : channel(std::move(_channel)) {}
111 
112  const std::shared_ptr<::grpc::Channel> channel;
113 };
114 
115 
120 {
121  // Enable the gRPC wait-for-ready semantics by default so the call will be
122  // retried if the connection is not ready. See:
123  // https://github.com/grpc/grpc/blob/master/doc/wait-for-ready.md
124  bool wait_for_ready = true;
125 
126  // The timeout of the call. A `DEADLINE_EXCEEDED` status will be returned if
127  // there is no response in the specified amount of time. This is required to
128  // avoid the call from being pending forever.
129  Duration timeout = Seconds(60);
130 };
131 
132 
145 class Runtime
146 {
147 public:
148  Runtime() : data(new Data()) {}
149 
166  template <
167  typename Method,
168  typename Request =
170  typename Response =
172  typename std::enable_if<
173  std::is_convertible<
174  typename std::decay<Request>::type*,
175  google::protobuf::Message*>::value,
176  int>::type = 0>
178  const Connection& connection,
179  Method&& method,
180  Request&& request,
181  const CallOptions& options)
182  {
183  // Create a `Promise` that will be set upon receiving a response.
184  // TODO(chhsiao): The `Promise` in the `shared_ptr` is not shared, but only
185  // to be captured by the lambda below. Use a `unique_ptr` once we get C++14.
186  std::shared_ptr<Promise<Try<Response, StatusError>>> promise(
188  Future<Try<Response, StatusError>> future = promise->future();
189 
190  // Send the request in the internal runtime process.
191  // TODO(chhsiao): We use `std::bind` here to forward `request` to avoid an
192  // extra copy. We should capture it by forwarding once we get C++14.
194  [connection, method, options, promise](
195  const Request& request,
196  bool terminating,
197  ::grpc::CompletionQueue* queue) {
198  if (terminating) {
199  promise->fail("Runtime has been terminated");
200  return;
201  }
202 
203  // TODO(chhsiao): The `shared_ptr`s here aren't shared, but only to be
204  // captured by the lambda below. Use `unique_ptr`s once we get C++14.
205  std::shared_ptr<::grpc::ClientContext> context(
206  new ::grpc::ClientContext());
207 
208  context->set_wait_for_ready(options.wait_for_ready);
209 
210  // We need to ensure that we're using a
211  // `std::chrono::system_clock::time_point` because `grpc::TimePoint`
212  // provides a specialization only for this type and we cannot
213  // guarantee that the operation below will always result in this type.
214  auto time_point =
215  std::chrono::time_point_cast<std::chrono::system_clock::duration>(
216  std::chrono::system_clock::now() +
217  std::chrono::nanoseconds(options.timeout.ns()));
218 
219  context->set_deadline(time_point);
220 
221  promise->future().onDiscard([=] { context->TryCancel(); });
222 
223  std::shared_ptr<Response> response(new Response());
224  std::shared_ptr<::grpc::Status> status(new ::grpc::Status());
225 
226  std::shared_ptr<::grpc::ClientAsyncResponseReader<Response>> reader =
228  connection.channel).*method)(context.get(), request, queue);
229 
230  reader->StartCall();
231 
232  // Create a `ReceiveCallback` as a tag in the `CompletionQueue` for
233  // the current asynchronous gRPC call. The callback will set up the
234  // above `Promise` upon receiving a response.
235  // NOTE: `context` and `reader` need to be held on in order to get
236  // updates for the ongoing RPC, and thus are captured here. The
237  // callback itself will later be retrieved and managed in the
238  // looper thread.
239  void* tag = new ReceiveCallback(
240  [context, reader, response, status, promise]() {
241  CHECK_PENDING(promise->future());
242  if (promise->future().hasDiscard()) {
243  promise->discard();
244  } else {
245  promise->set(status->ok()
246  ? std::move(*response)
247  : Try<Response, StatusError>::error(std::move(*status)));
248  }
249  });
250 
251  reader->Finish(response.get(), status.get(), tag);
252  },
253  std::forward<Request>(request),
254  lambda::_1,
255  lambda::_2));
256 
257  return future;
258  }
259 
265  void terminate();
266 
273 
274 private:
275  // Type of the callback functions that can get invoked when sending a request
276  // or receiving a response.
277  typedef lambda::CallableOnce<
278  void(bool, ::grpc::CompletionQueue*)> SendCallback;
280 
281  class RuntimeProcess : public Process<RuntimeProcess>
282  {
283  public:
284  RuntimeProcess();
285  ~RuntimeProcess() override;
286 
287  void send(SendCallback callback);
288  void receive(ReceiveCallback callback);
289  void terminate();
291 
292  private:
293  void initialize() override;
294  void finalize() override;
295 
296  void loop();
297 
298  ::grpc::CompletionQueue queue;
299  std::unique_ptr<std::thread> looper;
300  bool terminating;
301  Promise<Nothing> terminated;
302  };
303 
304  struct Data
305  {
306  Data();
307  ~Data();
308 
310  Future<Nothing> terminated;
311  };
312 
313  std::shared_ptr<Data> data;
314 };
315 
316 } // namespace client {
317 
318 } // namespace grpc {
319 } // namespace process {
320 
321 #endif // __PROCESS_GRPC_HPP__
Definition: errorbase.hpp:36
#define CHECK_PENDING(expression)
Definition: check.hpp:26
void finalize(bool finalize_wsa=false)
Clean up the library.
Future< Response > request(const Request &request, bool streamedResponse=false)
Asynchronously sends an HTTP request to the process and returns the HTTP response once the entire res...
const std::conditional< std::is_same< E, Error >::value, std::string, E >::type & error() const
Definition: try.hpp:85
Definition: check.hpp:33
bool initialize(const Option< std::string > &delegate=None(), const Option< std::string > &readwriteAuthenticationRealm=None(), const Option< std::string > &readonlyAuthenticationRealm=None())
Initialize the library.
Definition: type_utils.hpp:510
A copyable interface to manage a connection to a gRPC server.
Definition: grpc.hpp:100
Definition: duration.hpp:32
void terminate(const UPID &pid, bool inject=true)
Sends a TerminateEvent to the given process.
Defines the gRPC options for each call.
Definition: grpc.hpp:119
void dispatch(const PID< T > &pid, void(T::*method)())
Definition: dispatch.hpp:174
Connection(std::shared_ptr<::grpc::Channel > _channel)
Definition: grpc.hpp:109
bool wait_for_ready
Definition: grpc.hpp:124
Definition: duration.hpp:207
A copyable interface to manage an internal runtime process for asynchronous gRPC calls.
Definition: grpc.hpp:145
int64_t ns() const
Definition: duration.hpp:46
Definition: future.hpp:74
struct ev_loop * loop
Definition: loop.hpp:456
Protocol< PromiseRequest, PromiseResponse > promise
Runtime()
Definition: grpc.hpp:148
const std::shared_ptr<::grpc::Channel > channel
Definition: grpc.hpp:112
bool wait(const UPID &pid, const Duration &duration=Seconds(-1))
Wait for the process to exit for no more than the specified seconds.
Represents errors caused by non-OK gRPC statuses.
Definition: grpc.hpp:57
const ::grpc::Status status
Definition: grpc.hpp:66
Definition: attributes.hpp:24
Connection(const std::string &uri, const std::shared_ptr<::grpc::ChannelCredentials > &credentials=::grpc::InsecureChannelCredentials())
Definition: grpc.hpp:103
Definition: executor.hpp:48
Try< uint32_t > type(const std::string &path)
Definition: uri.hpp:21
Try< Nothing > bind(int_fd s, const Address &address)
Definition: network.hpp:46
mesos::v1::scheduler::Response Response
Definition: mesos.hpp:2619
StatusError(::grpc::Status _status)
Definition: grpc.hpp:60
Duration timeout
Definition: grpc.hpp:129
Definition: process.hpp:501
Future< Try< Response, StatusError > > call(const Connection &connection, Method &&method, Request &&request, const CallOptions &options)
Sends an asynchronous gRPC call.
Definition: grpc.hpp:177
Definition: lambda.hpp:414
Definition: future.hpp:58
Future< size_t > send(const int_fd &fd, const void *buf, size_t size)