Apache Mesos
grpc.hpp
Go to the documentation of this file.
1 // Licensed under the Apache License, Version 2.0 (the "License");
2 // you may not use this file except in compliance with the License.
3 // You may obtain a copy of the License at
4 //
5 // http://www.apache.org/licenses/LICENSE-2.0
6 //
7 // Unless required by applicable law or agreed to in writing, software
8 // distributed under the License is distributed on an "AS IS" BASIS,
9 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10 // See the License for the specific language governing permissions and
11 // limitations under the License
12 
13 #ifndef __PROCESS_GRPC_HPP__
14 #define __PROCESS_GRPC_HPP__
15 
16 #include <atomic>
17 #include <chrono>
18 #include <memory>
19 #include <thread>
20 #include <type_traits>
21 
22 #include <google/protobuf/message.h>
23 
24 #include <grpc++/grpc++.h>
25 
26 #include <process/future.hpp>
27 #include <process/owned.hpp>
28 #include <process/process.hpp>
29 
30 #include <stout/duration.hpp>
31 #include <stout/lambda.hpp>
32 #include <stout/synchronized.hpp>
33 #include <stout/try.hpp>
34 
35 
36 // This file provides libprocess "support" for using gRPC. In
37 // particular, it defines two wrapper classes: `Channel` (representing a
38 // connection to a gRPC server) and `client::Runtime`, which integrates
39 // an event loop waiting for gRPC responses, and provides the `call`
40 // interface to create an asynchronous gRPC call and return a `Future`.
41 
42 
// Names the asynchronous stub method of a gRPC service.
//
// `GRPC_RPC(service, rpc)` expands to `(&service::Stub::Async<rpc>)`,
// i.e. a pointer to the member function of `service::Stub` whose name
// is `rpc` prefixed with `Async`. `rpc` is token-pasted, so it must be
// a bare identifier rather than a string or an expression.
#define GRPC_RPC(service, rpc) \
  (&service::Stub::Async##rpc)
45 
46 namespace process {
47 namespace grpc {
48 
49 // Forward declarations.
50 namespace client { class Runtime; }
51 
52 
59 class Channel
60 {
61 public:
62  Channel(const std::string& uri,
63  const std::shared_ptr<::grpc::ChannelCredentials>& credentials =
64  ::grpc::InsecureChannelCredentials())
65  : channel(::grpc::CreateChannel(uri, credentials)) {}
66 
67 private:
68  std::shared_ptr<::grpc::Channel> channel;
69 
70  friend class client::Runtime;
71 };
72 
73 
79 template <typename T>
80 struct RpcResult
81 {
82  RpcResult(const ::grpc::Status& _status, const T& _response)
83  : status(_status), response(_response) {}
84 
85  ::grpc::Status status;
87 };
88 
89 namespace client {
90 
104 class Runtime
105 {
106 public:
107  Runtime() : data(new Data()) {}
108 
118  template <typename Stub, typename Request, typename Response>
120  const Channel& channel,
121  std::unique_ptr<::grpc::ClientAsyncResponseReader<Response>>(Stub::*rpc)(
122  ::grpc::ClientContext*,
123  const Request&,
124  ::grpc::CompletionQueue*),
125  const Request& request)
126  {
127  static_assert(
128  std::is_convertible<Request*, google::protobuf::Message*>::value,
129  "Request must be a protobuf message");
130 
131  synchronized (data->lock) {
132  if (data->terminating) {
133  return Failure("Runtime has been terminated.");
134  }
135 
136  std::shared_ptr<::grpc::ClientContext> context(
137  new ::grpc::ClientContext());
138 
139  // TODO(chhsiao): Allow the caller to specify a timeout.
140  context->set_deadline(
141  std::chrono::system_clock::now() + std::chrono::seconds(5));
142 
143  // Create a `Promise` and a callback lambda as a tag and invokes
144  // an asynchronous gRPC call through the `CompletionQueue`
145  // managed by `data`. The `Promise` will be set by the callback
146  // upon server response.
147  std::shared_ptr<Promise<RpcResult<Response>>> promise(
149 
150  promise->future().onDiscard([=] { context->TryCancel(); });
151 
152  std::shared_ptr<Response> response(new Response());
153  std::shared_ptr<::grpc::Status> status(new ::grpc::Status());
154 
155  std::shared_ptr<::grpc::ClientAsyncResponseReader<Response>> reader(
156  (Stub(channel.channel).*rpc)(context.get(), request, &data->queue));
157 
158  reader->Finish(
159  response.get(),
160  status.get(),
161  new lambda::function<void()>(
162  // NOTE: `context` and `reader` need to be held on in
163  // order to get updates for the ongoing RPC, and thus
164  // are captured here. The lambda itself will later be
165  // retrieved and managed in `Data::loop()`.
166  [context, reader, response, status, promise]() {
167  CHECK(promise->future().isPending());
168  if (promise->future().hasDiscard()) {
169  promise->discard();
170  return;
171  }
172 
173  promise->set(RpcResult<Response>(*status, *response));
174  }));
175 
176  return promise->future();
177  }
178  }
179 
186  void terminate();
187 
194 
195 private:
196  struct Data
197  {
198  Data();
199  ~Data();
200 
201  void loop();
202  void terminate();
203 
204  std::unique_ptr<std::thread> looper;
205  ::grpc::CompletionQueue queue;
207  std::atomic_flag lock = ATOMIC_FLAG_INIT;
208  bool terminating = false;
209  Promise<Nothing> terminated;
210  };
211 
212  std::shared_ptr<Data> data;
213 };
214 
215 } // namespace client {
216 
217 } // namespace grpc {
218 } // namespace process {
219 
220 #endif // __PROCESS_GRPC_HPP__
T response
Definition: grpc.hpp:86
Future< Response > request(const Request &request, bool streamedResponse=false)
Asynchronously sends an HTTP request to the process and returns the HTTP response once the entire res...
Definition: future.hpp:664
Result< ProcessStatus > status(pid_t pid)
Definition: proc.hpp:166
Definition: process.hpp:72
The response of a RPC call.
Definition: grpc.hpp:80
RpcResult(const ::grpc::Status &_status, const T &_response)
Definition: grpc.hpp:82
A copyable interface to manage an internal gRPC runtime instance for asynchronous gRPC calls...
Definition: grpc.hpp:104
Definition: future.hpp:73
struct ev_loop * loop
Definition: loop.hpp:456
Channel(const std::string &uri, const std::shared_ptr<::grpc::ChannelCredentials > &credentials=::grpc::InsecureChannelCredentials())
Definition: grpc.hpp:62
Protocol< PromiseRequest, PromiseResponse > promise
Runtime()
Definition: grpc.hpp:107
Future< RpcResult< Response > > call(const Channel &channel, std::unique_ptr<::grpc::ClientAsyncResponseReader< Response >>(Stub::*rpc)(::grpc::ClientContext *, const Request &,::grpc::CompletionQueue *), const Request &request)
Sends an asynchronous gRPC call.
Definition: grpc.hpp:119
Result< Process > process(pid_t pid)
Definition: freebsd.hpp:30
A copyable interface to manage a connection to a gRPC server.
Definition: grpc.hpp:59
void terminate()
Asks the internal gRPC runtime instance to shut down the CompletionQueue, which would stop its looper...
::grpc::Status status
Definition: grpc.hpp:85
Future< Nothing > wait()
Definition: future.hpp:57