Apache Mesos
grpc.hpp
Go to the documentation of this file.
1 // Licensed under the Apache License, Version 2.0 (the "License");
2 // you may not use this file except in compliance with the License.
3 // You may obtain a copy of the License at
4 //
5 // http://www.apache.org/licenses/LICENSE-2.0
6 //
7 // Unless required by applicable law or agreed to in writing, software
8 // distributed under the License is distributed on an "AS IS" BASIS,
9 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10 // See the License for the specific language governing permissions and
11 // limitations under the License
12 
13 #ifndef __PROCESS_GRPC_HPP__
14 #define __PROCESS_GRPC_HPP__
15 
16 #include <chrono>
17 #include <memory>
18 #include <ostream>
19 #include <string>
20 #include <thread>
21 #include <type_traits>
22 #include <utility>
23 
24 #include <google/protobuf/message.h>
25 
26 #include <grpcpp/grpcpp.h>
27 
28 #include <process/check.hpp>
29 #include <process/dispatch.hpp>
30 #include <process/future.hpp>
31 #include <process/pid.hpp>
32 #include <process/process.hpp>
33 
34 #include <stout/duration.hpp>
35 #include <stout/error.hpp>
36 #include <stout/lambda.hpp>
37 #include <stout/nothing.hpp>
38 #include <stout/stringify.hpp>
39 #include <stout/try.hpp>
40 
41 
42 // This file provides libprocess "support" for using gRPC. In particular, it
43 // defines two wrapper classes: `client::Connection` which represents a
44 // connection to a gRPC server, and `client::Runtime` which integrates an event
45 // loop waiting for gRPC responses and provides the `call` interface to create
46 // an asynchronous gRPC call and return a `Future`.
47 
48 
49 #define GRPC_CLIENT_METHOD(service, rpc) \
50  (&service::Stub::PrepareAsync##rpc)
51 
52 namespace grpc {
53 
54 std::ostream& operator<<(std::ostream& stream, StatusCode statusCode);
55 
56 } // namespace grpc {
57 
58 
59 namespace process {
60 namespace grpc {
61 
66 class StatusError : public Error
67 {
68 public:
69  StatusError(::grpc::Status _status)
70  : Error(stringify(_status.error_code()) +
71  (_status.error_message().empty()
72  ? "" : ": " + _status.error_message())),
73  status(std::move(_status))
74  {
75  CHECK(!status.ok());
76  }
77 
78  const ::grpc::Status status;
79 };
80 
81 
82 namespace client {
83 
84 // Internal helper utilities.
85 namespace internal {
86 
87 template <typename T>
88 struct MethodTraits; // Undefined.
89 
90 
91 template <typename Stub, typename Request, typename Response>
92 struct MethodTraits<
93  std::unique_ptr<::grpc::ClientAsyncResponseReader<Response>>(Stub::*)(
94  ::grpc::ClientContext*,
95  const Request&,
96  ::grpc::CompletionQueue*)>
97 {
98  typedef Stub stub_type;
99  typedef Request request_type;
101 };
102 
103 } // namespace internal {
104 
105 
113 {
114 public:
116  const std::string& uri,
117  const std::shared_ptr<::grpc::ChannelCredentials>& credentials =
118  ::grpc::InsecureChannelCredentials())
119  : channel(::grpc::CreateChannel(uri, credentials)) {}
120 
121  explicit Connection(std::shared_ptr<::grpc::Channel> _channel)
122  : channel(std::move(_channel)) {}
123 
124  const std::shared_ptr<::grpc::Channel> channel;
125 };
126 
127 
132 {
133  // Enable the gRPC wait-for-ready semantics by default so the call will be
134  // retried if the connection is not ready. See:
135  // https://github.com/grpc/grpc/blob/master/doc/wait-for-ready.md
136  bool wait_for_ready = true;
137 
138  // The timeout of the call. A `DEADLINE_EXCEEDED` status will be returned if
139  // there is no response in the specified amount of time. This is required to
140  // avoid the call from being pending forever.
141  Duration timeout = Seconds(60);
142 };
143 
144 
157 class Runtime
158 {
159 public:
160  Runtime() : data(new Data()) {}
161 
178  template <
179  typename Method,
180  typename Request =
182  typename Response =
184  typename std::enable_if<
185  std::is_convertible<
186  typename std::decay<Request>::type*,
187  google::protobuf::Message*>::value,
188  int>::type = 0>
190  const Connection& connection,
191  Method&& method,
192  Request&& request,
193  const CallOptions& options)
194  {
195  // Create a `Promise` that will be set upon receiving a response.
196  // TODO(chhsiao): The `Promise` in the `shared_ptr` is not shared, but only
197  // to be captured by the lambda below. Use a `unique_ptr` once we get C++14.
198  std::shared_ptr<Promise<Try<Response, StatusError>>> promise(
200  Future<Try<Response, StatusError>> future = promise->future();
201 
202  // Send the request in the internal runtime process.
203  // TODO(chhsiao): We use `std::bind` here to forward `request` to avoid an
204  // extra copy. We should capture it by forwarding once we get C++14.
206  [connection, method, options, promise](
207  const Request& request,
208  bool terminating,
209  ::grpc::CompletionQueue* queue) {
210  if (terminating) {
211  promise->fail("Runtime has been terminated");
212  return;
213  }
214 
215  // TODO(chhsiao): The `shared_ptr`s here aren't shared, but only to be
216  // captured by the lambda below. Use `unique_ptr`s once we get C++14.
217  std::shared_ptr<::grpc::ClientContext> context(
218  new ::grpc::ClientContext());
219 
220  context->set_wait_for_ready(options.wait_for_ready);
221 
222  // We need to ensure that we're using a
223  // `std::chrono::system_clock::time_point` because `grpc::TimePoint`
224  // provides a specialization only for this type and we cannot
225  // guarantee that the operation below will always result in this type.
226  auto time_point =
227  std::chrono::time_point_cast<std::chrono::system_clock::duration>(
228  std::chrono::system_clock::now() +
229  std::chrono::nanoseconds(options.timeout.ns()));
230 
231  context->set_deadline(time_point);
232 
233  promise->future().onDiscard([=] { context->TryCancel(); });
234 
235  std::shared_ptr<Response> response(new Response());
236  std::shared_ptr<::grpc::Status> status(new ::grpc::Status());
237 
238  std::shared_ptr<::grpc::ClientAsyncResponseReader<Response>> reader =
240  connection.channel).*method)(context.get(), request, queue);
241 
242  reader->StartCall();
243 
244  // Create a `ReceiveCallback` as a tag in the `CompletionQueue` for
245  // the current asynchronous gRPC call. The callback will set up the
246  // above `Promise` upon receiving a response.
247  // NOTE: `context` and `reader` need to be held on in order to get
248  // updates for the ongoing RPC, and thus are captured here. The
249  // callback itself will later be retrieved and managed in the
250  // looper thread.
251  void* tag = new ReceiveCallback(
252  [context, reader, response, status, promise]() {
253  CHECK_PENDING(promise->future());
254  if (promise->future().hasDiscard()) {
255  promise->discard();
256  } else {
257  promise->set(status->ok()
258  ? std::move(*response)
259  : Try<Response, StatusError>::error(std::move(*status)));
260  }
261  });
262 
263  reader->Finish(response.get(), status.get(), tag);
264  },
265  std::forward<Request>(request),
266  lambda::_1,
267  lambda::_2));
268 
269  return future;
270  }
271 
277  void terminate();
278 
285 
286 private:
287  // Type of the callback functions that can get invoked when sending a request
288  // or receiving a response.
289  typedef lambda::CallableOnce<
290  void(bool, ::grpc::CompletionQueue*)> SendCallback;
292 
293  class RuntimeProcess : public Process<RuntimeProcess>
294  {
295  public:
296  RuntimeProcess();
297  ~RuntimeProcess() override;
298 
299  void send(SendCallback callback);
300  void receive(ReceiveCallback callback);
301  void terminate();
303 
304  private:
305  void initialize() override;
306  void finalize() override;
307 
308  void loop();
309 
310  ::grpc::CompletionQueue queue;
311  std::unique_ptr<std::thread> looper;
312  bool terminating;
313  Promise<Nothing> terminated;
314  };
315 
316  struct Data
317  {
318  Data();
319  ~Data();
320 
322  Future<Nothing> terminated;
323  };
324 
325  std::shared_ptr<Data> data;
326 };
327 
328 } // namespace client {
329 
330 } // namespace grpc {
331 } // namespace process {
332 
333 #endif // __PROCESS_GRPC_HPP__
Definition: errorbase.hpp:36
#define CHECK_PENDING(expression)
Definition: check.hpp:26
void finalize(bool finalize_wsa=false)
Clean up the library.
Future< Response > request(const Request &request, bool streamedResponse=false)
Asynchronously sends an HTTP request to the process and returns the HTTP response once the entire res...
const std::conditional< std::is_same< E, Error >::value, std::string, E >::type & error() const
Definition: try.hpp:97
Definition: check.hpp:33
Result< ProcessStatus > status(pid_t pid)
Definition: proc.hpp:166
Definition: type_utils.hpp:619
A copyable interface to manage a connection to a gRPC server.
Definition: grpc.hpp:112
Definition: duration.hpp:32
void terminate(const UPID &pid, bool inject=true)
Sends a TerminateEvent to the given process.
Defines the gRPC options for each call.
Definition: grpc.hpp:131
std::ostream & operator<<(std::ostream &stream, StatusCode statusCode)
Connection(std::shared_ptr<::grpc::Channel > _channel)
Definition: grpc.hpp:121
bool wait_for_ready
Definition: grpc.hpp:136
Try< Nothing > initialize(const Flags &flags)
Initialized state for support of systemd functions in this file.
Definition: duration.hpp:207
A copyable interface to manage an internal runtime process for asynchronous gRPC calls.
Definition: grpc.hpp:157
int64_t ns() const
Definition: duration.hpp:46
Definition: future.hpp:74
Definition: grpc.hpp:52
Protocol< PromiseRequest, PromiseResponse > promise
Runtime()
Definition: grpc.hpp:160
Future< V > loop(const Option< UPID > &pid, Iterate &&iterate, Body &&body)
Definition: loop.hpp:456
const std::shared_ptr<::grpc::Channel > channel
Definition: grpc.hpp:124
bool wait(const UPID &pid, const Duration &duration=Seconds(-1))
Wait for the process to exit for no more than the specified seconds.
void dispatch(const UPID &pid, std::unique_ptr< lambda::CallableOnce< void(ProcessBase *)>> f, const Option< const std::type_info * > &functionType=None())
Represents errors caused by non-OK gRPC statuses.
Definition: grpc.hpp:66
const ::grpc::Status status
Definition: grpc.hpp:78
Definition: attributes.hpp:24
Connection(const std::string &uri, const std::shared_ptr<::grpc::ChannelCredentials > &credentials=::grpc::InsecureChannelCredentials())
Definition: grpc.hpp:115
Definition: executor.hpp:48
Try< uint32_t > type(const std::string &path)
Definition: uri.hpp:21
Try< Nothing > bind(int_fd s, const Address &address)
Definition: network.hpp:46
mesos::v1::scheduler::Response Response
Definition: mesos.hpp:2854
StatusError(::grpc::Status _status)
Definition: grpc.hpp:69
std::string stringify(int flags)
Duration timeout
Definition: grpc.hpp:141
Definition: process.hpp:505
Future< Try< Response, StatusError > > call(const Connection &connection, Method &&method, Request &&request, const CallOptions &options)
Sends an asynchronous gRPC call.
Definition: grpc.hpp:189
Definition: lambda.hpp:414
Definition: future.hpp:58
Future< size_t > send(const int_fd &fd, const void *buf, size_t size)