13 #ifndef __PROCESS_PROTOBUF_HPP__ 14 #define __PROCESS_PROTOBUF_HPP__ 16 #include <glog/logging.h> 18 #include <google/protobuf/arena.h> 19 #include <google/protobuf/message.h> 20 #include <google/protobuf/repeated_field.h> 39 const google::protobuf::Message& message)
42 if (message.SerializeToString(&data)) {
43 post(to, message.GetTypeName(), data.data(), data.size());
45 LOG(ERROR) <<
"Failed to post '" << message.GetTypeName() <<
"' to " 46 << to <<
": Failed to serialize";
53 const google::protobuf::Message& message)
56 if (message.SerializeToString(&data)) {
57 post(from, to, message.GetTypeName(), data.data(), data.size());
59 LOG(ERROR) <<
"Failed to post '" << message.GetTypeName() <<
"' to " 60 << to <<
": Failed to serialize";
89 std::vector<T>
convert(
const google::protobuf::RepeatedPtrField<T>& items)
91 return std::vector<T>(items.begin(), items.end());
96 std::vector<T>
convert(google::protobuf::RepeatedPtrField<T>&& items)
98 return std::vector<T>(
99 std::make_move_iterator(items.begin()),
100 std::make_move_iterator(items.end()));
107 template <
typename T>
116 if (protobufHandlers.count(event.message.name) > 0) {
117 from =
event.message.from;
118 protobufHandlers[
event.message.name](
119 event.message.from,
event.message.body);
127 const google::protobuf::Message& message)
130 if (message.SerializeToString(&data)) {
133 LOG(ERROR) <<
"Failed to send '" << message.GetTypeName() <<
"' to " 134 << to <<
": Failed to serialize";
140 void reply(
const google::protobuf::Message& message)
142 CHECK(from) <<
"Attempting to reply without a sender";
147 template <
typename M>
150 google::protobuf::Message* m =
new M();
151 T* t =
static_cast<T*
>(
this);
152 protobufHandlers[m->GetTypeName()] =
155 lambda::_1, lambda::_2);
159 template <
typename M>
162 google::protobuf::Message* m =
new M();
163 T* t =
static_cast<T*
>(
this);
164 protobufHandlers[m->GetTypeName()] =
167 lambda::_1, lambda::_2);
171 template <
typename M,
typename P>
174 template <
typename M>
177 google::protobuf::Message* m =
new M();
178 T* t =
static_cast<T*
>(
this);
179 protobufHandlers[m->GetTypeName()] =
182 lambda::_1, lambda::_2);
186 template <
typename M,
187 typename ...P,
typename ...PC>
192 google::protobuf::Message* m =
new M();
193 T* t =
static_cast<T*
>(
this);
194 protobufHandlers[m->GetTypeName()] =
202 lambda::_1, lambda::_2, param...);
207 template <
typename M>
210 google::protobuf::Message* m =
new M();
211 T* t =
static_cast<T*
>(
this);
212 protobufHandlers[m->GetTypeName()] =
215 lambda::_1, lambda::_2);
219 template <
typename M>
222 google::protobuf::Message* m =
new M();
223 T* t =
static_cast<T*
>(
this);
224 protobufHandlers[m->GetTypeName()] =
227 lambda::_1, lambda::_2);
231 template <
typename M>
234 google::protobuf::Message* m =
new M();
235 T* t =
static_cast<T*
>(
this);
236 protobufHandlers[m->GetTypeName()] =
239 lambda::_1, lambda::_2);
243 template <
typename M,
244 typename ...P,
typename ...PC>
246 void (T::*method)(PC...),
249 google::protobuf::Message* m =
new M();
250 T* t =
static_cast<T*
>(
this);
251 protobufHandlers[m->GetTypeName()] =
259 lambda::_1, lambda::_2, param...);
267 template <
typename M>
268 static void handlerM(
272 const std::string& data)
274 google::protobuf::Arena arena;
275 M* m = CHECK_NOTNULL(google::protobuf::Arena::CreateMessage<M>(&arena));
277 if (m->ParseFromString(data)) {
278 (t->*method)(sender, *m);
280 LOG(ERROR) <<
"Failed to deserialize '" << m->GetTypeName()
281 <<
"' from " << sender;
285 template <
typename M>
286 static void handlerMutM(
290 const std::string& data)
294 if (m.ParseFromString(data)) {
295 (t->*method)(sender, std::move(m));
297 LOG(ERROR) <<
"Failed to deserialize '" << m.GetTypeName()
298 <<
"' from " << sender;
302 static void handler0(
306 const std::string& data)
308 (t->*method)(sender);
311 template <
typename M,
312 typename ...P,
typename ...PC>
313 static void handlerN(
317 const std::string& data,
320 google::protobuf::Arena arena;
321 M* m = CHECK_NOTNULL(google::protobuf::Arena::CreateMessage<M>(&arena));
323 if (m->ParseFromString(data)) {
326 LOG(ERROR) <<
"Failed to deserialize '" << m->GetTypeName()
327 <<
"' from " << sender;
332 template <
typename M>
333 static void _handlerM(
335 void (T::*method)(
const M&),
337 const std::string& data)
339 google::protobuf::Arena arena;
340 M* m = CHECK_NOTNULL(google::protobuf::Arena::CreateMessage<M>(&arena));
342 if (m->ParseFromString(data)) {
345 LOG(ERROR) <<
"Failed to deserialize '" << m->GetTypeName()
346 <<
"' from " << sender;
350 template <
typename M>
351 static void _handlerMutM(
353 void (T::*method)(M&&),
355 const std::string& data)
359 if (m.ParseFromString(data)) {
360 (t->*method)(std::move(m));
362 LOG(ERROR) <<
"Failed to deserialize '" << m.GetTypeName()
363 <<
"' from " << sender;
367 static void _handler0(
371 const std::string& data)
376 template <
typename M,
377 typename ...P,
typename ...PC>
378 static void _handlerN(
380 void (T::*method)(PC...),
382 const std::string& data,
385 google::protobuf::Arena arena;
386 M* m = CHECK_NOTNULL(google::protobuf::Arena::CreateMessage<M>(&arena));
388 if (m->ParseFromString(data)) {
391 LOG(ERROR) <<
"Failed to deserialize '" << m->GetTypeName()
392 <<
"' from " << sender;
396 typedef lambda::function<
409 template <
typename Req,
typename Res>
430 promise.future().onDiscard(
defer(
this, &ReqResProcess::discarded));
444 void response(
const Res& res)
458 template <
typename Req,
typename Res>
463 const Req& req)
const 466 { Req* req =
nullptr; google::protobuf::Message* m = req; (void)m; }
467 { Res* res =
nullptr; google::protobuf::Message* m = res; (void)m; }
475 #endif // __PROCESS_PROTOBUF_HPP__ std::string generate(const std::string &prefix="")
Returns 'prefix(N)' where N represents the number of instances where the same prefix (wrt...
~ReqResProcess() override
Definition: protobuf.hpp:422
process::Future< Res > operator()(const process::UPID &pid, const Req &req) const
Definition: protobuf.hpp:461
ReqResProcess(const process::UPID &_pid, const Req &_req)
Definition: protobuf.hpp:413
void install(void(T::*method)(const process::UPID &, const M &))
Definition: protobuf.hpp:148
void install(void(T::*method)(const M &))
Definition: protobuf.hpp:208
void install(void(T::*method)())
Definition: protobuf.hpp:232
P(M::*)() const MessageProperty
Definition: protobuf.hpp:172
UPID spawn(ProcessBase *process, bool manage=false)
Spawn a new process.
void terminate(const UPID &pid, bool inject=true)
Sends a TerminateEvent to the given process.
~ProtobufProcess() override
Definition: protobuf.hpp:111
void send(const process::UPID &to, const google::protobuf::Message &message)
Definition: protobuf.hpp:126
void reply(const google::protobuf::Message &message)
Definition: protobuf.hpp:140
Definition: protobuf.hpp:410
void send(const UPID &to, const std::string &name, const char *data=nullptr, size_t length=0)
Sends the message to the specified UPID.
void dispatch(const PID< T > &pid, void(T::*method)())
Definition: dispatch.hpp:174
void post(const UPID &to, const std::string &name, const char *data=nullptr, size_t length=0)
Sends a message with data without a return address.
void install(void(T::*method)(const process::UPID &, PC...), MessageProperty< M, P >...param)
Definition: protobuf.hpp:188
void install(void(T::*method)(M &&))
Definition: protobuf.hpp:220
An "untyped" PID, used to encapsulate the process ID for lower-layer abstractions (eg...
Definition: pid.hpp:39
void consume(process::MessageEvent &&event) override
Definition: protobuf.hpp:114
void consume(MessageEvent &&event) override
Protocol< PromiseRequest, PromiseResponse > promise
Definition: protobuf.hpp:108
Definition: protobuf.hpp:61
Definition: protobuf.hpp:459
const T & convert(const T &t)
Definition: protobuf.hpp:82
process::Future< Res > run()
Definition: protobuf.hpp:428
Definition: executor.hpp:48
jobject convert(JNIEnv *env, const T &t)
void install(void(T::*method)(const process::UPID &, M &&))
Definition: protobuf.hpp:160
Definition: event.hpp:103
void discarded(Future< U > future)
Type utilities for the protobuf library that are not specific to particular protobuf classes...
Definition: type_utils.hpp:552
Try< Nothing > bind(int_fd s, const Address &address)
Definition: network.hpp:46
Definition: process.hpp:505
void install(void(T::*method)(const process::UPID &))
Definition: protobuf.hpp:175
Deferred< void()> defer(const PID< T > &pid, void(T::*method)())
Definition: defer.hpp:35
void install(void(T::*method)(PC...), MessageProperty< M, P >...param)
Definition: protobuf.hpp:245
Future< size_t > send(const int_fd &fd, const void *buf, size_t size)