17 #ifndef __COMMON_RECORDIO_HPP__ 18 #define __COMMON_RECORDIO_HPP__ 111 template <
typename T>
114 const std::function<std::string(
const T&)>& func,
120 return reader->read();
125 if (record.isNone()) {
130 if (record.isError()) {
136 if (!writer.
write(func(record.get()))) {
147 template <
typename T>
152 std::function<
Try<T>(
const std::string&)>&& _deserialize,
163 if (!records.empty()) {
164 Result<T> record = std::move(records.front());
169 if (
error.isSome()) {
179 waiters.push(std::move(waiter));
180 return waiters.back()->future();
192 fail(
"Reader is terminating");
196 void fail(
const std::string& message)
200 while (!waiters.empty()) {
201 waiters.front()->fail(message);
210 while (!waiters.empty()) {
221 .onAny(
process::defer(
this, &ReaderProcess::_consume, lambda::_1));
227 fail(
"Pipe::Reader failure: " +
241 fail(
"Decoder failure: " + decode.
error());
245 foreach (
const std::string& record, decode.
get()) {
248 if (!waiters.empty()) {
249 waiters.front()->set(std::move(t));
252 records.push(std::move(t));
260 std::function<Try<T>(
const std::string&)>
deserialize;
263 std::queue<process::Owned<process::Promise<Result<T>>>> waiters;
264 std::queue<Result<T>> records;
275 #endif // __COMMON_RECORDIO_HPP__ bool isReady() const
Definition: future.hpp:1215
std::string generate(const std::string &prefix="")
Returns 'prefix(N)' where N represents the number of instances where the same prefix (wrt. string value equality) has been used to generate an ID.
Definition: errorbase.hpp:36
Provides facilities for "Record-IO" encoding of data.
Definition: recordio.hpp:55
T & get()&
Definition: try.hpp:80
const T & get() const
Definition: future.hpp:1294
ControlFlow< typename std::decay< T >::type >::Break Break(T &&t)
Definition: loop.hpp:237
Result< Classifier > decode(const Netlink< struct rtnl_cls > &cls)
Definition: future.hpp:668
Reader(std::function< Try< T >(const std::string &)> deserialize, process::http::Pipe::Reader reader)
Definition: recordio.hpp:68
Definition: recordio.hpp:45
void initialize() override
Invoked when a process gets spawned.
Definition: recordio.hpp:184
Definition: type_utils.hpp:619
ReaderProcess(std::function< Try< T >(const std::string &)> &&_deserialize, process::http::Pipe::Reader _reader)
Definition: recordio.hpp:151
UPID spawn(ProcessBase *process, bool manage=false)
Spawn a new process.
void terminate(const UPID &pid, bool inject=true)
Sends a TerminateEvent to the given process.
process::Future< Result< T > > read()
Definition: recordio.hpp:161
void dispatch(const PID< T > &pid, void(T::*method)())
Definition: dispatch.hpp:174
Provides RecordIO decoding on top of an http::Pipe::Reader.
Definition: recordio.hpp:62
Try< Message > deserialize(ContentType contentType, const std::string &body)
Definition: http.hpp:109
process::Future< Result< T > > read()
Returns the next piece of decoded data from the pipe.
Definition: recordio.hpp:90
bool write(std::string s)
Definition: future.hpp:74
~ReaderProcess() override
Definition: recordio.hpp:159
virtual ~Reader()
Definition: recordio.hpp:74
Future< V > loop(const Option< UPID > &pid, Iterate &&iterate, Body &&body)
Definition: loop.hpp:456
static Try error(const E &e)
Definition: try.hpp:43
void finalize() override
Invoked when a process is terminated.
Definition: recordio.hpp:189
process::Future< Nothing > transform(process::Owned< Reader< T >> &&reader, const std::function< std::string(const T &)> &func, process::http::Pipe::Writer writer)
This is a helper function that reads records from a Reader, applies a transformation to the records and writes to the pipe.
Definition: recordio.hpp:112
Result< Process > process(pid_t pid)
Definition: freebsd.hpp:30
A "process identifier" used to uniquely identify a process when dispatching messages.
Definition: pid.hpp:289
Definition: attributes.hpp:24
bool isError() const
Definition: try.hpp:78
Result< Credentials > read(const Path &path)
Definition: credentials.hpp:35
std::string error(const std::string &msg, uint32_t code)
Definition: executor.hpp:48
const std::string & failure() const
Definition: future.hpp:1320
Definition: process.hpp:505
Deferred< void()> defer(const PID< T > &pid, void(T::*method)())
Definition: defer.hpp:35
Decodes records from "Record-IO" data (see above).
Definition: recordio.hpp:72
bool isFailed() const
Definition: future.hpp:1229
Definition: future.hpp:58