Apache Mesos
protobuf.hpp
Go to the documentation of this file.
1 // Licensed under the Apache License, Version 2.0 (the "License");
2 // you may not use this file except in compliance with the License.
3 // You may obtain a copy of the License at
4 //
5 // http://www.apache.org/licenses/LICENSE-2.0
6 //
7 // Unless required by applicable law or agreed to in writing, software
8 // distributed under the License is distributed on an "AS IS" BASIS,
9 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10 // See the License for the specific language governing permissions and
11 // limitations under the License
12 
13 #ifndef __PROCESS_PROTOBUF_HPP__
14 #define __PROCESS_PROTOBUF_HPP__
15 
16 #include <glog/logging.h>
17 
18 #include <google/protobuf/arena.h>
19 #include <google/protobuf/message.h>
20 #include <google/protobuf/repeated_field.h>
21 
22 #include <iterator>
23 #include <set>
24 #include <vector>
25 
26 #include <process/defer.hpp>
27 #include <process/dispatch.hpp>
28 #include <process/id.hpp>
29 #include <process/process.hpp>
30 
31 #include <stout/hashmap.hpp>
32 #include <stout/lambda.hpp>
33 
34 
35 // Provides an implementation of process::post that for a protobuf.
36 namespace process {
37 
38 inline void post(const process::UPID& to,
39  const google::protobuf::Message& message)
40 {
41  std::string data;
42  if (message.SerializeToString(&data)) {
43  post(to, message.GetTypeName(), data.data(), data.size());
44  } else {
45  LOG(ERROR) << "Failed to post '" << message.GetTypeName() << "' to "
46  << to << ": Failed to serialize";
47  }
48 }
49 
50 
51 inline void post(const process::UPID& from,
52  const process::UPID& to,
53  const google::protobuf::Message& message)
54 {
55  std::string data;
56  if (message.SerializeToString(&data)) {
57  post(from, to, message.GetTypeName(), data.data(), data.size());
58  } else {
59  LOG(ERROR) << "Failed to post '" << message.GetTypeName() << "' to "
60  << to << ": Failed to serialize";
61  }
62 }
63 
64 } // namespace process {
65 
66 
67 // The rest of this file provides libprocess "support" for using
68 // protocol buffers. In particular, this file defines a subclass of
69 // Process (ProtobufProcess) that allows you to install protocol
70 // buffer handlers in addition to normal message and HTTP
71 // handlers. Install handlers can optionally take the sender's UPID
72 // as their first argument.
73 // Note that this header file assumes you will be linking
74 // against BOTH libprotobuf and libglog.
75 
76 namespace google {
77 namespace protobuf {
78 
79 // Type conversions helpful for changing between protocol buffer types
80 // and standard C++ types (for parameters).
81 template <typename T>
82 const T& convert(const T& t)
83 {
84  return t;
85 }
86 
87 
88 template <typename T>
89 std::vector<T> convert(const google::protobuf::RepeatedPtrField<T>& items)
90 {
91  return std::vector<T>(items.begin(), items.end());
92 }
93 
94 
95 template <typename T>
96 std::vector<T> convert(google::protobuf::RepeatedPtrField<T>&& items)
97 {
98  return std::vector<T>(
99  std::make_move_iterator(items.begin()),
100  std::make_move_iterator(items.end()));
101 }
102 
103 } // namespace protobuf {
104 } // namespace google {
105 
106 
107 template <typename T>
109 {
110 public:
111  ~ProtobufProcess() override {}
112 
113 protected:
114  void consume(process::MessageEvent&& event) override
115  {
116  if (protobufHandlers.count(event.message.name) > 0) {
117  from = event.message.from; // For 'reply'.
118  protobufHandlers[event.message.name](
119  event.message.from, event.message.body);
120  from = process::UPID();
121  } else {
122  process::Process<T>::consume(std::move(event));
123  }
124  }
125 
126  void send(const process::UPID& to,
127  const google::protobuf::Message& message)
128  {
129  std::string data;
130  if (message.SerializeToString(&data)) {
131  process::Process<T>::send(to, message.GetTypeName(), std::move(data));
132  } else {
133  LOG(ERROR) << "Failed to send '" << message.GetTypeName() << "' to "
134  << to << ": Failed to serialize";
135  }
136  }
137 
139 
140  void reply(const google::protobuf::Message& message)
141  {
142  CHECK(from) << "Attempting to reply without a sender";
143  send(from, message);
144  }
145 
146  // Installs that take the sender as the first argument.
147  template <typename M>
148  void install(void (T::*method)(const process::UPID&, const M&))
149  {
150  google::protobuf::Message* m = new M();
151  T* t = static_cast<T*>(this);
152  protobufHandlers[m->GetTypeName()] =
153  lambda::bind(&handlerM<M>,
154  t, method,
155  lambda::_1, lambda::_2);
156  delete m;
157  }
158 
159  template <typename M>
160  void install(void (T::*method)(const process::UPID&, M&&))
161  {
162  google::protobuf::Message* m = new M();
163  T* t = static_cast<T*>(this);
164  protobufHandlers[m->GetTypeName()] =
165  lambda::bind(&handlerMutM<M>,
166  t, method,
167  lambda::_1, lambda::_2);
168  delete m;
169  }
170 
171  template <typename M, typename P>
172  using MessageProperty = P(M::*)() const;
173 
174  template <typename M>
175  void install(void (T::*method)(const process::UPID&))
176  {
177  google::protobuf::Message* m = new M();
178  T* t = static_cast<T*>(this);
179  protobufHandlers[m->GetTypeName()] =
180  lambda::bind(&handler0,
181  t, method,
182  lambda::_1, lambda::_2);
183  delete m;
184  }
185 
186  template <typename M,
187  typename ...P, typename ...PC>
188  void install(
189  void (T::*method)(const process::UPID&, PC...),
190  MessageProperty<M, P>... param)
191  {
192  google::protobuf::Message* m = new M();
193  T* t = static_cast<T*>(this);
194  protobufHandlers[m->GetTypeName()] =
195  lambda::bind(static_cast<void(&)(
196  T*,
197  void (T::*)(const process::UPID&, PC...),
198  const process::UPID&,
199  const std::string&,
200  MessageProperty<M, P>...)>(handlerN),
201  t, method,
202  lambda::_1, lambda::_2, param...);
203  delete m;
204  }
205 
206  // Installs that do not take the sender.
207  template <typename M>
208  void install(void (T::*method)(const M&))
209  {
210  google::protobuf::Message* m = new M();
211  T* t = static_cast<T*>(this);
212  protobufHandlers[m->GetTypeName()] =
213  lambda::bind(&_handlerM<M>,
214  t, method,
215  lambda::_1, lambda::_2);
216  delete m;
217  }
218 
219  template <typename M>
220  void install(void (T::*method)(M&&))
221  {
222  google::protobuf::Message* m = new M();
223  T* t = static_cast<T*>(this);
224  protobufHandlers[m->GetTypeName()] =
225  lambda::bind(&_handlerMutM<M>,
226  t, method,
227  lambda::_1, lambda::_2);
228  delete m;
229  }
230 
231  template <typename M>
232  void install(void (T::*method)())
233  {
234  google::protobuf::Message* m = new M();
235  T* t = static_cast<T*>(this);
236  protobufHandlers[m->GetTypeName()] =
237  lambda::bind(&_handler0,
238  t, method,
239  lambda::_1, lambda::_2);
240  delete m;
241  }
242 
243  template <typename M,
244  typename ...P, typename ...PC>
245  void install(
246  void (T::*method)(PC...),
247  MessageProperty<M, P>... param)
248  {
249  google::protobuf::Message* m = new M();
250  T* t = static_cast<T*>(this);
251  protobufHandlers[m->GetTypeName()] =
252  lambda::bind(static_cast<void(&)(
253  T*,
254  void (T::*)(PC...),
255  const process::UPID&,
256  const std::string&,
257  MessageProperty<M, P>...)>(_handlerN),
258  t, method,
259  lambda::_1, lambda::_2, param...);
260  delete m;
261  }
262 
264 
265 private:
266  // Handlers that take the sender as the first argument.
267  template <typename M>
268  static void handlerM(
269  T* t,
270  void (T::*method)(const process::UPID&, const M&),
271  const process::UPID& sender,
272  const std::string& data)
273  {
274  google::protobuf::Arena arena;
275  M* m = CHECK_NOTNULL(google::protobuf::Arena::CreateMessage<M>(&arena));
276 
277  if (m->ParseFromString(data)) {
278  (t->*method)(sender, *m);
279  } else {
280  LOG(ERROR) << "Failed to deserialize '" << m->GetTypeName()
281  << "' from " << sender;
282  }
283  }
284 
285  template <typename M>
286  static void handlerMutM(
287  T* t,
288  void (T::*method)(const process::UPID&, M&&),
289  const process::UPID& sender,
290  const std::string& data)
291  {
292  M m;
293 
294  if (m.ParseFromString(data)) {
295  (t->*method)(sender, std::move(m));
296  } else {
297  LOG(ERROR) << "Failed to deserialize '" << m.GetTypeName()
298  << "' from " << sender;
299  }
300  }
301 
302  static void handler0(
303  T* t,
304  void (T::*method)(const process::UPID&),
305  const process::UPID& sender,
306  const std::string& data)
307  {
308  (t->*method)(sender);
309  }
310 
311  template <typename M,
312  typename ...P, typename ...PC>
313  static void handlerN(
314  T* t,
315  void (T::*method)(const process::UPID&, PC...),
316  const process::UPID& sender,
317  const std::string& data,
319  {
320  google::protobuf::Arena arena;
321  M* m = CHECK_NOTNULL(google::protobuf::Arena::CreateMessage<M>(&arena));
322 
323  if (m->ParseFromString(data)) {
324  (t->*method)(sender, google::protobuf::convert((m->*p)())...);
325  } else {
326  LOG(ERROR) << "Failed to deserialize '" << m->GetTypeName()
327  << "' from " << sender;
328  }
329  }
330 
331  // Handlers that ignore the sender.
332  template <typename M>
333  static void _handlerM(
334  T* t,
335  void (T::*method)(const M&),
336  const process::UPID& sender,
337  const std::string& data)
338  {
339  google::protobuf::Arena arena;
340  M* m = CHECK_NOTNULL(google::protobuf::Arena::CreateMessage<M>(&arena));
341 
342  if (m->ParseFromString(data)) {
343  (t->*method)(*m);
344  } else {
345  LOG(ERROR) << "Failed to deserialize '" << m->GetTypeName()
346  << "' from " << sender;
347  }
348  }
349 
350  template <typename M>
351  static void _handlerMutM(
352  T* t,
353  void (T::*method)(M&&),
354  const process::UPID& sender,
355  const std::string& data)
356  {
357  M m;
358 
359  if (m.ParseFromString(data)) {
360  (t->*method)(std::move(m));
361  } else {
362  LOG(ERROR) << "Failed to deserialize '" << m.GetTypeName()
363  << "' from " << sender;
364  }
365  }
366 
367  static void _handler0(
368  T* t,
369  void (T::*method)(),
370  const process::UPID&,
371  const std::string& data)
372  {
373  (t->*method)();
374  }
375 
376  template <typename M,
377  typename ...P, typename ...PC>
378  static void _handlerN(
379  T* t,
380  void (T::*method)(PC...),
381  const process::UPID& sender,
382  const std::string& data,
384  {
385  google::protobuf::Arena arena;
386  M* m = CHECK_NOTNULL(google::protobuf::Arena::CreateMessage<M>(&arena));
387 
388  if (m->ParseFromString(data)) {
389  (t->*method)(google::protobuf::convert((m->*p)())...);
390  } else {
391  LOG(ERROR) << "Failed to deserialize '" << m->GetTypeName()
392  << "' from " << sender;
393  }
394  }
395 
396  typedef lambda::function<
397  void(const process::UPID&, const std::string&)> handler;
398  hashmap<std::string, handler> protobufHandlers;
399 
400  // Sender of "current" message, inaccessible by subclasses.
401  // This is only used for reply().
402  process::UPID from;
403 };
404 
405 
406 // Implements a process for sending protobuf "requests" to a process
407 // and waiting for a protobuf "response", but uses futures so that
408 // this can be done without needing to implement a process.
409 template <typename Req, typename Res>
410 class ReqResProcess : public ProtobufProcess<ReqResProcess<Req, Res>>
411 {
412 public:
413  ReqResProcess(const process::UPID& _pid, const Req& _req)
414  : process::ProcessBase(process::ID::generate("__req_res__")),
415  pid(_pid),
416  req(_req)
417  {
419  install<Res>(&ReqResProcess<Req, Res>::response);
420  }
421 
422  ~ReqResProcess() override
423  {
424  // Discard the promise.
425  promise.discard();
426  }
427 
429  {
430  promise.future().onDiscard(defer(this, &ReqResProcess::discarded));
431 
433 
434  return promise.future();
435  }
436 
437 private:
438  void discarded()
439  {
440  promise.discard();
441  process::terminate(this);
442  }
443 
444  void response(const Res& res)
445  {
446  promise.set(res);
447  process::terminate(this);
448  }
449 
450  const process::UPID pid;
451  const Req req;
453 };
454 
455 
456 // Allows you to describe request/response protocols and then use
457 // those for sending requests and getting back responses.
458 template <typename Req, typename Res>
459 struct Protocol
460 {
462  const process::UPID& pid,
463  const Req& req) const
464  {
465  // Help debugging by adding some "type constraints".
466  { Req* req = nullptr; google::protobuf::Message* m = req; (void)m; }
467  { Res* res = nullptr; google::protobuf::Message* m = res; (void)m; }
468 
470  process::spawn(process, true);
472  }
473 };
474 
475 #endif // __PROCESS_PROTOBUF_HPP__
std::string generate(const std::string &prefix="")
Returns &#39;prefix(N)&#39; where N represents the number of instances where the same prefix (wrt...
~ReqResProcess() override
Definition: protobuf.hpp:422
process::Future< Res > operator()(const process::UPID &pid, const Req &req) const
Definition: protobuf.hpp:461
ReqResProcess(const process::UPID &_pid, const Req &_req)
Definition: protobuf.hpp:413
void install(void(T::*method)(const process::UPID &, const M &))
Definition: protobuf.hpp:148
void install(void(T::*method)(const M &))
Definition: protobuf.hpp:208
void install(void(T::*method)())
Definition: protobuf.hpp:232
P(M::*)() const MessageProperty
Definition: protobuf.hpp:172
UPID spawn(ProcessBase *process, bool manage=false)
Spawn a new process.
void terminate(const UPID &pid, bool inject=true)
Sends a TerminateEvent to the given process.
~ProtobufProcess() override
Definition: protobuf.hpp:111
void send(const process::UPID &to, const google::protobuf::Message &message)
Definition: protobuf.hpp:126
void reply(const google::protobuf::Message &message)
Definition: protobuf.hpp:140
Definition: protobuf.hpp:410
void send(const UPID &to, const std::string &name, const char *data=nullptr, size_t length=0)
Sends the message to the specified UPID.
void dispatch(const PID< T > &pid, void(T::*method)())
Definition: dispatch.hpp:174
void post(const UPID &to, const std::string &name, const char *data=nullptr, size_t length=0)
Sends a message with data without a return address.
void install(void(T::*method)(const process::UPID &, PC...), MessageProperty< M, P >...param)
Definition: protobuf.hpp:188
void install(void(T::*method)(M &&))
Definition: protobuf.hpp:220
An "untyped" PID, used to encapsulate the process ID for lower-layer abstractions (eg...
Definition: pid.hpp:39
void consume(process::MessageEvent &&event) override
Definition: protobuf.hpp:114
void consume(MessageEvent &&event) override
Protocol< PromiseRequest, PromiseResponse > promise
Definition: protobuf.hpp:108
Definition: protobuf.hpp:61
Definition: protobuf.hpp:459
const T & convert(const T &t)
Definition: protobuf.hpp:82
process::Future< Res > run()
Definition: protobuf.hpp:428
Definition: executor.hpp:48
jobject convert(JNIEnv *env, const T &t)
void install(void(T::*method)(const process::UPID &, M &&))
Definition: protobuf.hpp:160
Definition: event.hpp:103
void discarded(Future< U > future)
Type utilities for the protobuf library that are not specific to particular protobuf classes...
Definition: type_utils.hpp:552
Try< Nothing > bind(int_fd s, const Address &address)
Definition: network.hpp:46
Definition: process.hpp:505
void install(void(T::*method)(const process::UPID &))
Definition: protobuf.hpp:175
Deferred< void()> defer(const PID< T > &pid, void(T::*method)())
Definition: defer.hpp:35
void install(void(T::*method)(PC...), MessageProperty< M, P >...param)
Definition: protobuf.hpp:245
Future< size_t > send(const int_fd &fd, const void *buf, size_t size)