Apache Mesos
grpc.hpp
Go to the documentation of this file.
1 // Licensed under the Apache License, Version 2.0 (the "License");
2 // you may not use this file except in compliance with the License.
3 // You may obtain a copy of the License at
4 //
5 // http://www.apache.org/licenses/LICENSE-2.0
6 //
7 // Unless required by applicable law or agreed to in writing, software
8 // distributed under the License is distributed on an "AS IS" BASIS,
9 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10 // See the License for the specific language governing permissions and
11 // limitations under the License
12 
13 #ifndef __PROCESS_GRPC_HPP__
14 #define __PROCESS_GRPC_HPP__
15 
16 #include <chrono>
17 #include <memory>
18 #include <string>
19 #include <thread>
20 #include <type_traits>
21 #include <utility>
22 
23 #include <google/protobuf/message.h>
24 
25 #include <grpcpp/grpcpp.h>
26 
27 #include <process/check.hpp>
28 #include <process/dispatch.hpp>
29 #include <process/future.hpp>
30 #include <process/pid.hpp>
31 #include <process/process.hpp>
32 
33 #include <stout/error.hpp>
34 #include <stout/lambda.hpp>
35 #include <stout/nothing.hpp>
36 #include <stout/try.hpp>
37 
38 
39 // This file provides libprocess "support" for using gRPC. In particular, it
40 // defines two wrapper classes: `client::Connection` which represents a
41 // connection to a gRPC server, and `client::Runtime` which integrates an event
42 // loop waiting for gRPC responses and provides the `call` interface to create
43 // an asynchronous gRPC call and return a `Future`.
44 
45 
// Expands to a pointer-to-member expression naming the `PrepareAsync<rpc>`
// member of `service::Stub`; the member name is formed by token-pasting `rpc`
// onto `PrepareAsync`.
// NOTE(review): presumably meant to be passed as the `method` argument of
// `client::Runtime::call` -- confirm against callers.
46 #define GRPC_CLIENT_METHOD(service, rpc) \
47  (&service::Stub::PrepareAsync##rpc)
48 
49 namespace process {
50 namespace grpc {
51 
// Represents errors caused by non-OK gRPC statuses. The `Error` base is
// initialized with the status' `error_message()`, and the full status is kept
// in the public `status` member.
56 class StatusError : public Error
57 {
58 public:
 // Takes the status by value and moves it into `status`. The `Error` base is
 // initialized first (base classes are initialized before members), so
 // `_status.error_message()` is read before `_status` is moved from.
 // NOTE(review): the constructor is not `explicit`; `Runtime::call` passes a
 // `::grpc::Status` to `Try<Response, StatusError>::error(...)`, which
 // presumably relies on this implicit conversion -- confirm before changing.
59  StatusError(::grpc::Status _status)
60  : Error(_status.error_message()), status(std::move(_status))
61  {
 // Constructing a `StatusError` from an OK status is a programming error.
62  CHECK(!status.ok());
63  }
64 
 // The non-OK gRPC status this error was constructed from.
65  const ::grpc::Status status;
66 };
67 
68 
69 namespace client {
70 
71 // Internal helper utilities.
72 namespace internal {
73 
// Primary template, declared but never defined; only the partial
// specialization below provides members.
74 template <typename T>
75 struct MethodTraits; // Undefined.
76 
77 
// Partial specialization matching pointers to member functions of `Stub` that
// take `(::grpc::ClientContext*, const Request&, ::grpc::CompletionQueue*)`
// and return `std::unique_ptr<::grpc::ClientAsyncResponseReader<Response>>`.
// It exposes the deduced types as member typedefs.
78 template <typename Stub, typename Request, typename Response>
79 struct MethodTraits<
80  std::unique_ptr<::grpc::ClientAsyncResponseReader<Response>>(Stub::*)(
81  ::grpc::ClientContext*,
82  const Request&,
83  ::grpc::CompletionQueue*)>
84 {
85  typedef Stub stub_type;
86  typedef Request request_type;
 // NOTE(review): listing line 87 is absent from this rendering; presumably it
 // declares a `response_type` typedef for `Response` -- confirm against the
 // original header.
88 };
89 
90 } // namespace internal {
91 
92 
// A copyable interface to manage a connection to a gRPC server. It holds the
// underlying channel in the public `channel` member.
// NOTE(review): listing line 99 (presumably the `class Connection` header) is
// absent from this rendering -- restore it from the original header.
100 {
101 public:
 // Creates a channel to `uri` via `::grpc::CreateChannel`, using `credentials`
 // (insecure channel credentials by default).
 // NOTE(review): listing line 102 (presumably the `Connection(` line that
 // opens this constructor's parameter list) is absent from this rendering.
103  const std::string& uri,
104  const std::shared_ptr<::grpc::ChannelCredentials>& credentials =
105  ::grpc::InsecureChannelCredentials())
106  : channel(::grpc::CreateChannel(uri, credentials)) {}
107 
 // Wraps an already created channel; `explicit` so that a channel pointer is
 // not implicitly converted into a `Connection`.
108  explicit Connection(std::shared_ptr<::grpc::Channel> _channel)
109  : channel(std::move(_channel)) {}
110 
 // The gRPC channel used by this connection.
111  const std::shared_ptr<::grpc::Channel> channel;
112 };
113 
114 
// A copyable interface to manage an internal runtime process for asynchronous
// gRPC calls. All state lives in a `Data` object held through a
// `std::shared_ptr`, so copies of a `Runtime` share the same `Data`.
// NOTE(review): this rendering skips several source lines inside the class
// (listing numbers 150, 152, 158, 167, 173, 203, 255, 266 and 285), so the
// code below is incomplete as shown -- restore those lines from the original
// header before compiling.
127 class Runtime
128 {
129 public:
 // Creates a fresh `Data` object for this runtime.
130  Runtime() : data(new Data()) {}
131 
 // Sends an asynchronous gRPC call.
 //
 // Parameters (as used by the visible body):
 //   connection: the `Connection` whose `channel` is used for the call.
 //   method: the stub method to invoke; it is applied to a stub object as a
 //     pointer to member and must return a `ClientAsyncResponseReader`.
 //   request: the request message; the `enable_if` constraint below only
 //     admits types whose pointer converts to `google::protobuf::Message*`.
 //
 // Returns the future of a promise that is set with the response when the
 // status is OK, set with a `StatusError` otherwise, discarded if a discard
 // was requested, or failed if the runtime is terminating.
147  template <
148  typename Method,
149  typename Request =
 // NOTE(review): listing line 150 (the default for `Request`) is absent here.
151  typename Response =
 // NOTE(review): listing line 152 (the default for `Response`) is absent here.
153  typename std::enable_if<
154  std::is_convertible<
155  typename std::decay<Request>::type*,
156  google::protobuf::Message*>::value,
157  int>::type = 0>
 // NOTE(review): listing line 158 (return type and the `call(` name) is absent
 // here.
159  const Connection& connection,
160  Method&& method,
161  Request&& request)
162  {
163  // Create a `Promise` that will be set upon receiving a response.
164  // TODO(chhsiao): The `Promise` in the `shared_ptr` is not shared, but only
165  // to be captured by the lambda below. Use a `unique_ptr` once we get C++14.
166  std::shared_ptr<Promise<Try<Response, StatusError>>> promise(
 // NOTE(review): listing line 167 (the `promise` initializer) is absent here.
168  Future<Try<Response, StatusError>> future = promise->future();
169 
170  // Send the request in the internal runtime process.
171  // TODO(chhsiao): We use `std::bind` here to forward `request` to avoid an
172  // extra copy. We should capture it by forwarding once we get C++14.
 // NOTE(review): listing line 173 (the call that receives the lambda below) is
 // absent here.
174  [connection, method, promise](
175  const Request& request,
176  bool terminating,
177  ::grpc::CompletionQueue* queue) {
 // Do not start a new call once termination has been flagged; fail the
 // promise instead.
178  if (terminating) {
179  promise->fail("Runtime has been terminated");
180  return;
181  }
182 
183  // TODO(chhsiao): The `shared_ptr`s here aren't shared, but only to be
184  // captured by the lambda below. Use `unique_ptr`s once we get C++14.
185  std::shared_ptr<::grpc::ClientContext> context(
186  new ::grpc::ClientContext());
187 
188  // TODO(chhsiao): Allow the caller to specify a timeout.
 // The deadline is currently fixed at 5 seconds from now.
189  context->set_deadline(
190  std::chrono::system_clock::now() + std::chrono::seconds(5));
191 
192  // Enable the gRPC wait-for-ready semantics by default. See:
193  // https://github.com/grpc/grpc/blob/master/doc/wait-for-ready.md
194  // TODO(chhsiao): Allow the caller to set the option.
195  context->set_wait_for_ready(true);
196 
 // A discard request on the returned future attempts to cancel the RPC.
197  promise->future().onDiscard([=] { context->TryCancel(); });
198 
 // Storage for the call's result; `Finish` below is given raw pointers to
 // both objects, and the tag callback reads them.
199  std::shared_ptr<Response> response(new Response());
200  std::shared_ptr<::grpc::Status> status(new ::grpc::Status());
201 
202  std::shared_ptr<::grpc::ClientAsyncResponseReader<Response>> reader =
 // NOTE(review): listing line 203 (start of the stub construction expression)
 // is absent here.
204  connection.channel).*method)(context.get(), request, queue);
205 
206  reader->StartCall();
207 
208  // Create a `ReceiveCallback` as a tag in the `CompletionQueue` for
209  // the current asynchronous gRPC call. The callback will set up the
210  // above `Promise` upon receiving a response.
211  // NOTE: `context` and `reader` need to be held on in order to get
212  // updates for the ongoing RPC, and thus are captured here. The
213  // callback itself will later be retrieved and managed in the
214  // looper thread.
215  void* tag = new ReceiveCallback(
216  [context, reader, response, status, promise]() {
217  CHECK_PENDING(promise->future());
 // Honor a pending discard request; otherwise deliver either the
 // response (OK status) or a `StatusError` built from the status.
218  if (promise->future().hasDiscard()) {
219  promise->discard();
220  } else {
221  promise->set(status->ok()
222  ? std::move(*response)
223  : Try<Response, StatusError>::error(std::move(*status)));
224  }
225  });
226 
227  reader->Finish(response.get(), status.get(), tag);
228  },
229  std::forward<Request>(request),
230  lambda::_1,
231  lambda::_2));
232 
233  return future;
234  }
235 
 // Declared here, defined elsewhere.
 // NOTE(review): presumably asks the runtime to terminate -- confirm against
 // the implementation.
241  void terminate();
242 
249 
250 private:
251  // Type of the callback functions that can get invoked when sending a request
252  // or receiving a response.
253  typedef lambda::CallableOnce<
254  void(bool, ::grpc::CompletionQueue*)> SendCallback;
 // NOTE(review): listing line 255 is absent here; presumably it declares the
 // `ReceiveCallback` type used in `call` and `RuntimeProcess::receive`.
256 
 // The internal process type; only its declaration is visible here, the
 // member functions are defined elsewhere.
257  class RuntimeProcess : public Process<RuntimeProcess>
258  {
259  public:
260  RuntimeProcess();
261  virtual ~RuntimeProcess();
262 
263  void send(SendCallback callback);
264  void receive(ReceiveCallback callback);
265  void terminate();
 // NOTE(review): listing line 266 is absent here.
267 
268  private:
269  void initialize() override;
270  void finalize() override;
271 
272  void loop();
273 
274  ::grpc::CompletionQueue queue;
275  std::unique_ptr<std::thread> looper;
276  bool terminating;
277  Promise<Nothing> terminated;
278  };
279 
 // State shared by all copies of a `Runtime`; the constructor and destructor
 // are defined elsewhere.
280  struct Data
281  {
282  Data();
283  ~Data();
284 
 // NOTE(review): listing line 285 is absent here.
286  Future<Nothing> terminated;
287  };
288 
289  std::shared_ptr<Data> data;
290 };
291 
292 } // namespace client {
293 
294 } // namespace grpc {
295 } // namespace process {
296 
297 #endif // __PROCESS_GRPC_HPP__
Definition: errorbase.hpp:36
#define CHECK_PENDING(expression)
Definition: check.hpp:26
void finalize(bool finalize_wsa=false)
Clean up the library.
Future< Response > request(const Request &request, bool streamedResponse=false)
Asynchronously sends an HTTP request to the process and returns the HTTP response once the entire response is received.
const std::conditional< std::is_same< E, Error >::value, std::string, E >::type & error() const
Definition: try.hpp:85
Definition: check.hpp:33
Future< Try< Response, StatusError > > call(const Connection &connection, Method &&method, Request &&request)
Sends an asynchronous gRPC call.
Definition: grpc.hpp:158
bool initialize(const Option< std::string > &delegate=None(), const Option< std::string > &readwriteAuthenticationRealm=None(), const Option< std::string > &readonlyAuthenticationRealm=None())
Initialize the library.
Definition: type_utils.hpp:510
A copyable interface to manage a connection to a gRPC server.
Definition: grpc.hpp:99
void terminate(const UPID &pid, bool inject=true)
Sends a TerminateEvent to the given process.
void dispatch(const PID< T > &pid, void(T::*method)())
Definition: dispatch.hpp:174
ssize_t send(const int_fd &fd, const void *buf, size_t len, int flags)
Definition: socket.hpp:159
Connection(std::shared_ptr<::grpc::Channel > _channel)
Definition: grpc.hpp:108
A copyable interface to manage an internal runtime process for asynchronous gRPC calls.
Definition: grpc.hpp:127
Definition: future.hpp:74
struct ev_loop * loop
Definition: loop.hpp:456
Protocol< PromiseRequest, PromiseResponse > promise
Runtime()
Definition: grpc.hpp:130
const std::shared_ptr<::grpc::Channel > channel
Definition: grpc.hpp:111
bool wait(const UPID &pid, const Duration &duration=Seconds(-1))
Wait for the process to exit for no more than the specified seconds.
Represents errors caused by non-OK gRPC statuses.
Definition: grpc.hpp:56
const ::grpc::Status status
Definition: grpc.hpp:65
Definition: attributes.hpp:24
Connection(const std::string &uri, const std::shared_ptr<::grpc::ChannelCredentials > &credentials=::grpc::InsecureChannelCredentials())
Definition: grpc.hpp:102
Definition: executor.hpp:48
Try< uint32_t > type(const std::string &path)
Definition: uri.hpp:21
Try< Nothing > bind(int_fd s, const Address &address)
Definition: network.hpp:46
mesos::v1::scheduler::Response Response
Definition: mesos.hpp:2663
StatusError(::grpc::Status _status)
Definition: grpc.hpp:59
Definition: process.hpp:501
Definition: lambda.hpp:414
Definition: future.hpp:58