Apache Mesos
recordio.hpp
Go to the documentation of this file.
1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements. See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership. The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License. You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 
17 #ifndef __COMMON_RECORDIO_HPP__
18 #define __COMMON_RECORDIO_HPP__
19 
20 #include <queue>
21 #include <string>
22 #include <utility>
23 
24 #include <mesos/mesos.hpp>
25 
26 #include <process/defer.hpp>
27 #include <process/dispatch.hpp>
28 #include <process/http.hpp>
29 #include <process/loop.hpp>
30 #include <process/owned.hpp>
31 #include <process/pid.hpp>
32 #include <process/process.hpp>
33 
34 #include <stout/lambda.hpp>
35 #include <stout/nothing.hpp>
36 #include <stout/recordio.hpp>
37 #include <stout/result.hpp>
38 
39 namespace mesos {
40 namespace internal {
41 namespace recordio {
42 
43 namespace internal {
44 template <typename T>
46 } // namespace internal {
47 
48 
61 template <typename T>
62 class Reader
63 {
64 public:
65  // We spawn `ReaderProcess` as a managed process to guarantee
66  // that it does not wait on itself (this would cause a deadlock!).
67  // See comments in `Connection::Data` for further details.
68  Reader(std::function<Try<T>(const std::string&)> deserialize,
71  new internal::ReaderProcess<T>(std::move(deserialize), reader),
72  true)) {}
73 
74  virtual ~Reader()
75  {
76  // Note that we pass 'false' here to avoid injecting the
77  // termination event at the front of the queue. This is
78  // to ensure we don't drop any queued request dispatches
79  // which would leave the caller with a future stuck in
80  // a pending state.
82  }
83 
91  {
93  }
94 
95 private:
97 };
98 
99 
111 template <typename T>
113  process::Owned<Reader<T>>&& reader,
114  const std::function<std::string(const T&)>& func,
116 {
117  return process::loop(
118  None(),
119  [=]() {
120  return reader->read();
121  },
122  [=](const Result<T>& record) mutable
124  // This could happen if EOF is sent by the writer.
125  if (record.isNone()) {
126  return process::Break();
127  }
128 
129  // This could happen if there is a de-serialization error.
130  if (record.isError()) {
131  return process::Failure(record.error());
132  }
133 
134  // TODO(vinod): Instead of detecting that the reader went away only
135  // after attempting a write, leverage `writer.readerClosed` future.
136  if (!writer.write(func(record.get()))) {
137  return process::Failure("Write failed to the pipe");
138  }
139 
140  return process::Continue();
141  });
142 }
143 
144 
145 namespace internal {
146 
147 template <typename T>
148 class ReaderProcess : public process::Process<ReaderProcess<T>>
149 {
150 public:
152  std::function<Try<T>(const std::string&)>&& _deserialize,
154  : process::ProcessBase(process::ID::generate("__reader__")),
155  deserialize(_deserialize),
156  reader(_reader),
157  done(false) {}
158 
159  ~ReaderProcess() override {}
160 
162  {
163  if (!records.empty()) {
164  Result<T> record = std::move(records.front());
165  records.pop();
166  return record;
167  }
168 
169  if (error.isSome()) {
170  return process::Failure(error->message);
171  }
172 
173  if (done) {
174  return None();
175  }
176 
179  waiters.push(std::move(waiter));
180  return waiters.back()->future();
181  }
182 
183 protected:
184  void initialize() override
185  {
186  consume();
187  }
188 
189  void finalize() override
190  {
191  // Fail any remaining waiters.
192  fail("Reader is terminating");
193  }
194 
195 private:
196  void fail(const std::string& message)
197  {
198  error = Error(message);
199 
200  while (!waiters.empty()) {
201  waiters.front()->fail(message);
202  waiters.pop();
203  }
204  }
205 
206  void complete()
207  {
208  done = true;
209 
210  while (!waiters.empty()) {
211  waiters.front()->set(Result<T>::none());
212  waiters.pop();
213  }
214  }
215 
216  using process::Process<ReaderProcess<T>>::consume;
217 
218  void consume()
219  {
220  reader.read()
221  .onAny(process::defer(this, &ReaderProcess::_consume, lambda::_1));
222  }
223 
224  void _consume(const process::Future<std::string>& read)
225  {
226  if (!read.isReady()) {
227  fail("Pipe::Reader failure: " +
228  (read.isFailed() ? read.failure() : "discarded"));
229  return;
230  }
231 
232  // Have we reached EOF?
233  if (read->empty()) {
234  complete();
235  return;
236  }
237 
238  Try<std::deque<std::string>> decode = decoder.decode(read.get());
239 
240  if (decode.isError()) {
241  fail("Decoder failure: " + decode.error());
242  return;
243  }
244 
245  foreach (const std::string& record, decode.get()) {
246  Result<T> t = deserialize(record);
247 
248  if (!waiters.empty()) {
249  waiters.front()->set(std::move(t));
250  waiters.pop();
251  } else {
252  records.push(std::move(t));
253  }
254  }
255 
256  consume();
257  }
258 
259  ::recordio::Decoder decoder;
260  std::function<Try<T>(const std::string&)> deserialize;
262 
263  std::queue<process::Owned<process::Promise<Result<T>>>> waiters;
264  std::queue<Result<T>> records;
265 
266  bool done;
268 };
269 
270 } // namespace internal {
271 } // namespace recordio {
272 } // namespace internal {
273 } // namespace mesos {
274 
275 #endif // __COMMON_RECORDIO_HPP__
bool isReady() const
Definition: future.hpp:1215
std::string generate(const std::string &prefix="")
Returns &#39;prefix(N)&#39; where N represents the number of instances where the same prefix (wrt...
Definition: errorbase.hpp:36
Provides facilities for "Record-IO" encoding of data.
Definition: recordio.hpp:55
T & get()&
Definition: try.hpp:80
const T & get() const
Definition: future.hpp:1294
ControlFlow< typename std::decay< T >::type >::Break Break(T &&t)
Definition: loop.hpp:237
Result< Classifier > decode(const Netlink< struct rtnl_cls > &cls)
Definition: check.hpp:33
Definition: future.hpp:668
Reader(std::function< Try< T >(const std::string &)> deserialize, process::http::Pipe::Reader reader)
Definition: recordio.hpp:68
void initialize() override
Invoked when a process gets spawned.
Definition: recordio.hpp:184
Definition: type_utils.hpp:619
ReaderProcess(std::function< Try< T >(const std::string &)> &&_deserialize, process::http::Pipe::Reader _reader)
Definition: recordio.hpp:151
UPID spawn(ProcessBase *process, bool manage=false)
Spawn a new process.
Definition: check.hpp:30
void terminate(const UPID &pid, bool inject=true)
Sends a TerminateEvent to the given process.
process::Future< Result< T > > read()
Definition: recordio.hpp:161
void dispatch(const PID< T > &pid, void(T::*method)())
Definition: dispatch.hpp:174
Provides RecordIO decoding on top of an http::Pipe::Reader.
Definition: recordio.hpp:62
Try< Message > deserialize(ContentType contentType, const std::string &body)
Definition: http.hpp:109
Definition: http.hpp:354
process::Future< Result< T > > read()
Returns the next piece of decoded data from the pipe.
Definition: recordio.hpp:90
bool write(std::string s)
Definition: agent.hpp:25
Definition: future.hpp:74
~ReaderProcess() override
Definition: recordio.hpp:159
virtual ~Reader()
Definition: recordio.hpp:74
Future< V > loop(const Option< UPID > &pid, Iterate &&iterate, Body &&body)
Definition: loop.hpp:456
static Try error(const E &e)
Definition: try.hpp:43
void finalize() override
Invoked when a process is terminated.
Definition: recordio.hpp:189
process::Future< Nothing > transform(process::Owned< Reader< T >> &&reader, const std::function< std::string(const T &)> &func, process::http::Pipe::Writer writer)
This is a helper function that reads records from a Reader, applies a transformation to the records a...
Definition: recordio.hpp:112
Result< Process > process(pid_t pid)
Definition: freebsd.hpp:30
A "process identifier" used to uniquely identify a process when dispatching messages.
Definition: pid.hpp:289
Definition: none.hpp:27
Definition: attributes.hpp:24
Definition: loop.hpp:223
bool isError() const
Definition: try.hpp:78
Result< Credentials > read(const Path &path)
Definition: credentials.hpp:35
std::string error(const std::string &msg, uint32_t code)
Definition: executor.hpp:48
const std::string & failure() const
Definition: future.hpp:1320
Definition: owned.hpp:36
Definition: process.hpp:505
Deferred< void()> defer(const PID< T > &pid, void(T::*method)())
Definition: defer.hpp:35
Decodes records from "Record-IO" data (see above).
Definition: recordio.hpp:72
Definition: http.hpp:315
Definition: loop.hpp:163
bool isFailed() const
Definition: future.hpp:1229
Definition: future.hpp:58