Apache Mesos
protobuf.hpp
Go to the documentation of this file.
1 // Licensed under the Apache License, Version 2.0 (the "License");
2 // you may not use this file except in compliance with the License.
3 // You may obtain a copy of the License at
4 //
5 // http://www.apache.org/licenses/LICENSE-2.0
6 //
7 // Unless required by applicable law or agreed to in writing, software
8 // distributed under the License is distributed on an "AS IS" BASIS,
9 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10 // See the License for the specific language governing permissions and
11 // limitations under the License
12 
13 #ifndef __PROCESS_PROTOBUF_HPP__
14 #define __PROCESS_PROTOBUF_HPP__
15 
16 #include <glog/logging.h>
17 
18 #include <google/protobuf/arena.h>
19 #include <google/protobuf/message.h>
20 #include <google/protobuf/repeated_field.h>
21 
22 #include <iterator>
23 #include <set>
24 #include <vector>
25 
26 #include <process/defer.hpp>
27 #include <process/dispatch.hpp>
28 #include <process/id.hpp>
29 #include <process/process.hpp>
30 
31 #include <stout/hashmap.hpp>
32 #include <stout/lambda.hpp>
33 
34 
35 // Provides an implementation of process::post for protobuf messages.
36 namespace process {
37 
38 inline void post(const process::UPID& to,
39  const google::protobuf::Message& message)
40 {
41  std::string data;
42  message.SerializeToString(&data);
43  post(to, message.GetTypeName(), data.data(), data.size());
44 }
45 
46 
47 inline void post(const process::UPID& from,
48  const process::UPID& to,
49  const google::protobuf::Message& message)
50 {
51  std::string data;
52  message.SerializeToString(&data);
53  post(from, to, message.GetTypeName(), data.data(), data.size());
54 }
55 
56 } // namespace process {
57 
58 
59 // The rest of this file provides libprocess "support" for using
60 // protocol buffers. In particular, this file defines a subclass of
61 // Process (ProtobufProcess) that allows you to install protocol
62 // buffer handlers in addition to normal message and HTTP
63 // handlers. Install handlers can optionally take the sender's UPID
64 // as their first argument.
65 // Note that this header file assumes you will be linking
66 // against BOTH libprotobuf and libglog.
67 
68 namespace google {
69 namespace protobuf {
70 
71 // Type conversions helpful for changing between protocol buffer types
72 // and standard C++ types (for parameters).
// Identity conversion: any value that needs no translation is handed
// back unchanged, by reference to the very same object.
template <typename T>
auto convert(const T& t) -> const T&
{
  return t;
}
78 
79 
80 template <typename T>
81 std::vector<T> convert(const google::protobuf::RepeatedPtrField<T>& items)
82 {
83  return std::vector<T>(items.begin(), items.end());
84 }
85 
86 
87 template <typename T>
88 std::vector<T> convert(google::protobuf::RepeatedPtrField<T>&& items)
89 {
90  return std::vector<T>(
91  std::make_move_iterator(items.begin()),
92  std::make_move_iterator(items.end()));
93 }
94 
95 } // namespace protobuf {
96 } // namespace google {
97 
98 
99 template <typename T>
101 {
102 public:
103  virtual ~ProtobufProcess() {}
104 
105 protected:
106  void consume(process::MessageEvent&& event) override
107  {
108  if (protobufHandlers.count(event.message.name) > 0) {
109  from = event.message.from; // For 'reply'.
110  protobufHandlers[event.message.name](
111  event.message.from, event.message.body);
112  from = process::UPID();
113  } else {
114  process::Process<T>::consume(std::move(event));
115  }
116  }
117 
118  void send(const process::UPID& to,
119  const google::protobuf::Message& message)
120  {
121  std::string data;
122  message.SerializeToString(&data);
123  process::Process<T>::send(to, message.GetTypeName(), std::move(data));
124  }
125 
127 
128  void reply(const google::protobuf::Message& message)
129  {
130  CHECK(from) << "Attempting to reply without a sender";
131  send(from, message);
132  }
133 
134  // Installs that take the sender as the first argument.
135  template <typename M>
136  void install(void (T::*method)(const process::UPID&, const M&))
137  {
138  google::protobuf::Message* m = new M();
139  T* t = static_cast<T*>(this);
140  protobufHandlers[m->GetTypeName()] =
141  lambda::bind(&handlerM<M>,
142  t, method,
143  lambda::_1, lambda::_2);
144  delete m;
145  }
146 
147  template <typename M>
148  void install(void (T::*method)(const process::UPID&, M&&))
149  {
150  google::protobuf::Message* m = new M();
151  T* t = static_cast<T*>(this);
152  protobufHandlers[m->GetTypeName()] =
153  lambda::bind(&handlerMutM<M>,
154  t, method,
155  lambda::_1, lambda::_2);
156  delete m;
157  }
158 
159  template <typename M, typename P>
160  using MessageProperty = P(M::*)() const;
161 
162  template <typename M>
163  void install(void (T::*method)(const process::UPID&))
164  {
165  google::protobuf::Message* m = new M();
166  T* t = static_cast<T*>(this);
167  protobufHandlers[m->GetTypeName()] =
168  lambda::bind(&handler0,
169  t, method,
170  lambda::_1, lambda::_2);
171  delete m;
172  }
173 
174  template <typename M,
175  typename ...P, typename ...PC>
176  void install(
177  void (T::*method)(const process::UPID&, PC...),
178  MessageProperty<M, P>... param)
179  {
180  google::protobuf::Message* m = new M();
181  T* t = static_cast<T*>(this);
182  protobufHandlers[m->GetTypeName()] =
183  lambda::bind(static_cast<void(&)(
184  T*,
185  void (T::*)(const process::UPID&, PC...),
186  const process::UPID&,
187  const std::string&,
188  MessageProperty<M, P>...)>(handlerN),
189  t, method,
190  lambda::_1, lambda::_2, param...);
191  delete m;
192  }
193 
194  // Installs that do not take the sender.
195  template <typename M>
196  void install(void (T::*method)(const M&))
197  {
198  google::protobuf::Message* m = new M();
199  T* t = static_cast<T*>(this);
200  protobufHandlers[m->GetTypeName()] =
201  lambda::bind(&_handlerM<M>,
202  t, method,
203  lambda::_1, lambda::_2);
204  delete m;
205  }
206 
207  template <typename M>
208  void install(void (T::*method)(M&&))
209  {
210  google::protobuf::Message* m = new M();
211  T* t = static_cast<T*>(this);
212  protobufHandlers[m->GetTypeName()] =
213  lambda::bind(&_handlerMutM<M>,
214  t, method,
215  lambda::_1, lambda::_2);
216  delete m;
217  }
218 
219  template <typename M>
220  void install(void (T::*method)())
221  {
222  google::protobuf::Message* m = new M();
223  T* t = static_cast<T*>(this);
224  protobufHandlers[m->GetTypeName()] =
225  lambda::bind(&_handler0,
226  t, method,
227  lambda::_1, lambda::_2);
228  delete m;
229  }
230 
231  template <typename M,
232  typename ...P, typename ...PC>
233  void install(
234  void (T::*method)(PC...),
235  MessageProperty<M, P>... param)
236  {
237  google::protobuf::Message* m = new M();
238  T* t = static_cast<T*>(this);
239  protobufHandlers[m->GetTypeName()] =
240  lambda::bind(static_cast<void(&)(
241  T*,
242  void (T::*)(PC...),
243  const process::UPID&,
244  const std::string&,
245  MessageProperty<M, P>...)>(_handlerN),
246  t, method,
247  lambda::_1, lambda::_2, param...);
248  delete m;
249  }
250 
252 
253 private:
254  // Handlers that take the sender as the first argument.
255  template <typename M>
256  static void handlerM(
257  T* t,
258  void (T::*method)(const process::UPID&, const M&),
259  const process::UPID& sender,
260  const std::string& data)
261  {
262  google::protobuf::Arena arena;
263  M* m = CHECK_NOTNULL(google::protobuf::Arena::CreateMessage<M>(&arena));
264  m->ParseFromString(data);
265 
266  if (m->IsInitialized()) {
267  (t->*method)(sender, *m);
268  } else {
269  LOG(WARNING) << "Initialization errors: "
270  << m->InitializationErrorString();
271  }
272  }
273 
274  template <typename M>
275  static void handlerMutM(
276  T* t,
277  void (T::*method)(const process::UPID&, M&&),
278  const process::UPID& sender,
279  const std::string& data)
280  {
281  M m;
282  m.ParseFromString(data);
283 
284  if (m.IsInitialized()) {
285  (t->*method)(sender, std::move(m));
286  } else {
287  LOG(WARNING) << "Initialization errors: "
288  << m.InitializationErrorString();
289  }
290  }
291 
292  static void handler0(
293  T* t,
294  void (T::*method)(const process::UPID&),
295  const process::UPID& sender,
296  const std::string& data)
297  {
298  (t->*method)(sender);
299  }
300 
301  template <typename M,
302  typename ...P, typename ...PC>
303  static void handlerN(
304  T* t,
305  void (T::*method)(const process::UPID&, PC...),
306  const process::UPID& sender,
307  const std::string& data,
309  {
310  google::protobuf::Arena arena;
311  M* m = CHECK_NOTNULL(google::protobuf::Arena::CreateMessage<M>(&arena));
312  m->ParseFromString(data);
313 
314  if (m->IsInitialized()) {
315  (t->*method)(sender, google::protobuf::convert((m->*p)())...);
316  } else {
317  LOG(WARNING) << "Initialization errors: "
318  << m->InitializationErrorString();
319  }
320  }
321 
322  // Handlers that ignore the sender.
323  template <typename M>
324  static void _handlerM(
325  T* t,
326  void (T::*method)(const M&),
327  const process::UPID&,
328  const std::string& data)
329  {
330  google::protobuf::Arena arena;
331  M* m = CHECK_NOTNULL(google::protobuf::Arena::CreateMessage<M>(&arena));
332  m->ParseFromString(data);
333 
334  if (m->IsInitialized()) {
335  (t->*method)(*m);
336  } else {
337  LOG(WARNING) << "Initialization errors: "
338  << m->InitializationErrorString();
339  }
340  }
341 
342  template <typename M>
343  static void _handlerMutM(
344  T* t,
345  void (T::*method)(M&&),
346  const process::UPID&,
347  const std::string& data)
348  {
349  M m;
350  m.ParseFromString(data);
351 
352  if (m.IsInitialized()) {
353  (t->*method)(std::move(m));
354  } else {
355  LOG(WARNING) << "Initialization errors: "
356  << m.InitializationErrorString();
357  }
358  }
359 
360  static void _handler0(
361  T* t,
362  void (T::*method)(),
363  const process::UPID&,
364  const std::string& data)
365  {
366  (t->*method)();
367  }
368 
369  template <typename M,
370  typename ...P, typename ...PC>
371  static void _handlerN(
372  T* t,
373  void (T::*method)(PC...),
374  const process::UPID&,
375  const std::string& data,
377  {
378  google::protobuf::Arena arena;
379  M* m = CHECK_NOTNULL(google::protobuf::Arena::CreateMessage<M>(&arena));
380  m->ParseFromString(data);
381 
382  if (m->IsInitialized()) {
383  (t->*method)(google::protobuf::convert((m->*p)())...);
384  } else {
385  LOG(WARNING) << "Initialization errors: "
386  << m->InitializationErrorString();
387  }
388  }
389 
390  typedef lambda::function<
391  void(const process::UPID&, const std::string&)> handler;
392  hashmap<std::string, handler> protobufHandlers;
393 
394  // Sender of "current" message, inaccessible by subclasses.
395  // This is only used for reply().
396  process::UPID from;
397 };
398 
399 
400 // Implements a process for sending protobuf "requests" to a process
401 // and waiting for a protobuf "response", but uses futures so that
402 // this can be done without needing to implement a process.
403 template <typename Req, typename Res>
404 class ReqResProcess : public ProtobufProcess<ReqResProcess<Req, Res>>
405 {
406 public:
407  ReqResProcess(const process::UPID& _pid, const Req& _req)
408  : process::ProcessBase(process::ID::generate("__req_res__")),
409  pid(_pid),
410  req(_req)
411  {
413  install<Res>(&ReqResProcess<Req, Res>::response);
414  }
415 
416  virtual ~ReqResProcess()
417  {
418  // Discard the promise.
419  promise.discard();
420  }
421 
423  {
424  promise.future().onDiscard(defer(this, &ReqResProcess::discarded));
425 
427 
428  return promise.future();
429  }
430 
431 private:
432  void discarded()
433  {
434  promise.discard();
435  process::terminate(this);
436  }
437 
438  void response(const Res& res)
439  {
440  promise.set(res);
441  process::terminate(this);
442  }
443 
444  const process::UPID pid;
445  const Req req;
447 };
448 
449 
450 // Allows you to describe request/response protocols and then use
451 // those for sending requests and getting back responses.
452 template <typename Req, typename Res>
453 struct Protocol
454 {
456  const process::UPID& pid,
457  const Req& req) const
458  {
459  // Help debugging by adding some "type constraints".
460  { Req* req = nullptr; google::protobuf::Message* m = req; (void)m; }
461  { Res* res = nullptr; google::protobuf::Message* m = res; (void)m; }
462 
464  process::spawn(process, true);
466  }
467 };
468 
469 #endif // __PROCESS_PROTOBUF_HPP__
std::string generate(const std::string &prefix="")
Returns 'prefix(N)' where N represents the number of instances where the same prefix (wrt...
process::Future< Res > operator()(const process::UPID &pid, const Req &req) const
Definition: protobuf.hpp:455
ReqResProcess(const process::UPID &_pid, const Req &_req)
Definition: protobuf.hpp:407
void install(void(T::*method)(const process::UPID &, const M &))
Definition: protobuf.hpp:136
virtual ~ReqResProcess()
Definition: protobuf.hpp:416
void install(void(T::*method)(const M &))
Definition: protobuf.hpp:196
void install(void(T::*method)())
Definition: protobuf.hpp:220
P(M::*)() const MessageProperty
Definition: protobuf.hpp:160
UPID spawn(ProcessBase *process, bool manage=false)
Spawn a new process.
void terminate(const UPID &pid, bool inject=true)
Sends a TerminateEvent to the given process.
void send(const process::UPID &to, const google::protobuf::Message &message)
Definition: protobuf.hpp:118
void reply(const google::protobuf::Message &message)
Definition: protobuf.hpp:128
ssize_t send(const os::WindowsFD &fd, const void *buf, size_t len, int flags)
Definition: socket.hpp:162
Definition: protobuf.hpp:404
void send(const UPID &to, const std::string &name, const char *data=nullptr, size_t length=0)
Sends the message to the specified UPID.
void dispatch(const PID< T > &pid, void(T::*method)())
Definition: dispatch.hpp:174
virtual ~ProtobufProcess()
Definition: protobuf.hpp:103
void post(const UPID &to, const std::string &name, const char *data=nullptr, size_t length=0)
Sends a message with data without a return address.
void install(void(T::*method)(const process::UPID &, PC...), MessageProperty< M, P >...param)
Definition: protobuf.hpp:176
void install(void(T::*method)(M &&))
Definition: protobuf.hpp:208
An "untyped" PID, used to encapsulate the process ID for lower-layer abstractions (eg...
Definition: pid.hpp:39
void consume(process::MessageEvent &&event) override
Definition: protobuf.hpp:106
void consume(MessageEvent &&event) override
Protocol< PromiseRequest, PromiseResponse > promise
Definition: protobuf.hpp:100
Definition: protobuf.hpp:55
Definition: protobuf.hpp:453
const T & convert(const T &t)
Definition: protobuf.hpp:74
process::Future< Res > run()
Definition: protobuf.hpp:422
Definition: executor.hpp:47
jobject convert(JNIEnv *env, const T &t)
void install(void(T::*method)(const process::UPID &, M &&))
Definition: protobuf.hpp:148
Definition: event.hpp:103
void discarded(Future< U > future)
Definition: protobuf.hpp:68
Try< Nothing > bind(int_fd s, const Address &address)
Definition: network.hpp:46
Definition: process.hpp:501
void install(void(T::*method)(const process::UPID &))
Definition: protobuf.hpp:163
Deferred< void()> defer(const PID< T > &pid, void(T::*method)())
Definition: defer.hpp:35
void install(void(T::*method)(PC...), MessageProperty< M, P >...param)
Definition: protobuf.hpp:233