Apache Mesos
recordio.hpp
Go to the documentation of this file.
1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements. See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership. The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License. You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 
17 #ifndef __COMMON_RECORDIO_HPP__
18 #define __COMMON_RECORDIO_HPP__
19 
20 #include <queue>
21 #include <string>
22 #include <utility>
23 
24 #include <mesos/mesos.hpp>
25 
26 #include <process/defer.hpp>
27 #include <process/dispatch.hpp>
28 #include <process/http.hpp>
29 #include <process/loop.hpp>
30 #include <process/owned.hpp>
31 #include <process/pid.hpp>
32 #include <process/process.hpp>
33 
34 #include <stout/lambda.hpp>
35 #include <stout/nothing.hpp>
36 #include <stout/recordio.hpp>
37 #include <stout/result.hpp>
38 
39 namespace mesos {
40 namespace internal {
41 namespace recordio {
42 
43 namespace internal {
44 template <typename T>
46 } // namespace internal {
47 
48 
61 template <typename T>
62 class Reader
63 {
64 public:
65  // We spawn `ReaderProcess` as a managed process to guarantee
66  // that it does not wait on itself (this would cause a deadlock!).
67  // See comments in `Connection::Data` for further details.
70  : process(process::spawn(
71  new internal::ReaderProcess<T>(std::move(decoder), reader),
72  true)) {}
73 
74  virtual ~Reader()
75  {
76  // Note that we pass 'false' here to avoid injecting the
77  // termination event at the front of the queue. This is
78  // to ensure we don't drop any queued request dispatches
79  // which would leave the caller with a future stuck in
80  // a pending state.
81  process::terminate(process, false);
82  }
83 
91  {
93  }
94 
95 private:
97 };
98 
99 
111 template <typename T>
113  process::Owned<Reader<T>>&& reader,
114  const std::function<std::string(const T&)>& func,
116 {
117  return process::loop(
118  None(),
119  [=]() {
120  return reader->read();
121  },
122  [=](const Result<T>& record) mutable
124  // This could happen if EOF is sent by the writer.
125  if (record.isNone()) {
126  return process::Break();
127  }
128 
129  // This could happen if there is a de-serialization error.
130  if (record.isError()) {
131  return process::Failure(record.error());
132  }
133 
134  // TODO(vinod): Instead of detecting that the reader went away only
135  // after attempting a write, leverage `writer.readerClosed` future.
136  if (!writer.write(func(record.get()))) {
137  return process::Failure("Write failed to the pipe");
138  }
139 
140  return process::Continue();
141  });
142 }
143 
144 
145 namespace internal {
146 
147 template <typename T>
148 class ReaderProcess : public process::Process<ReaderProcess<T>>
149 {
150 public:
152  ::recordio::Decoder<T>&& _decoder,
154  : process::ProcessBase(process::ID::generate("__reader__")),
155  decoder(_decoder),
156  reader(_reader),
157  done(false) {}
158 
159  virtual ~ReaderProcess() {}
160 
162  {
163  if (!records.empty()) {
164  Result<T> record = std::move(records.front());
165  records.pop();
166  return record;
167  }
168 
169  if (error.isSome()) {
170  return process::Failure(error.get().message);
171  }
172 
173  if (done) {
174  return None();
175  }
176 
179  waiters.push(std::move(waiter));
180  return waiters.back()->future();
181  }
182 
183 protected:
184  virtual void initialize() override
185  {
186  consume();
187  }
188 
189  virtual void finalize() override
190  {
191  // Fail any remaining waiters.
192  fail("Reader is terminating");
193  }
194 
195 private:
196  void fail(const std::string& message)
197  {
198  error = Error(message);
199 
200  while (!waiters.empty()) {
201  waiters.front()->fail(message);
202  waiters.pop();
203  }
204  }
205 
206  void complete()
207  {
208  done = true;
209 
210  while (!waiters.empty()) {
211  waiters.front()->set(Result<T>::none());
212  waiters.pop();
213  }
214  }
215 
216  using process::Process<ReaderProcess<T>>::consume;
217 
218  void consume()
219  {
220  reader.read()
221  .onAny(process::defer(this, &ReaderProcess::_consume, lambda::_1));
222  }
223 
224  void _consume(const process::Future<std::string>& read)
225  {
226  if (!read.isReady()) {
227  fail("Pipe::Reader failure: " +
228  (read.isFailed() ? read.failure() : "discarded"));
229  return;
230  }
231 
232  // Have we reached EOF?
233  if (read.get().empty()) {
234  complete();
235  return;
236  }
237 
238  Try<std::deque<Try<T>>> decode = decoder.decode(read.get());
239 
240  if (decode.isError()) {
241  fail("Decoder failure: " + decode.error());
242  return;
243  }
244 
245  foreach (const Try<T>& record, decode.get()) {
246  if (!waiters.empty()) {
247  waiters.front()->set(Result<T>(std::move(record)));
248  waiters.pop();
249  } else {
250  records.push(std::move(record));
251  }
252  }
253 
254  consume();
255  }
256 
257  ::recordio::Decoder<T> decoder;
259 
260  std::queue<process::Owned<process::Promise<Result<T>>>> waiters;
261  std::queue<Result<T>> records;
262 
263  bool done;
264  Option<Error> error;
265 };
266 
267 } // namespace internal {
268 } // namespace recordio {
269 } // namespace internal {
270 } // namespace mesos {
271 
272 #endif // __COMMON_RECORDIO_HPP__
bool isReady() const
Definition: future.hpp:1231
std::string generate(const std::string &prefix="")
Returns 'prefix(N)' where N represents the number of instances where the same prefix (wrt...
Definition: errorbase.hpp:35
ProcessBase(const std::string &id="")
const T & get() const
Definition: future.hpp:1310
ControlFlow< typename std::decay< T >::type >::Break Break(T &&t)
Definition: loop.hpp:237
Result< Classifier > decode(const Netlink< struct rtnl_cls > &cls)
Definition: try.hpp:34
Definition: future.hpp:664
virtual void finalize() override
Invoked when a process is terminated.
Definition: recordio.hpp:189
virtual ~ReaderProcess()
Definition: recordio.hpp:159
UPID spawn(ProcessBase *process, bool manage=false)
Spawn a new process.
Definition: result.hpp:40
void terminate(const UPID &pid, bool inject=true)
Sends a TerminateEvent to the given process.
virtual void initialize() override
Invoked when a process gets spawned.
Definition: recordio.hpp:184
bool isSome() const
Definition: option.hpp:115
process::Future< Result< T > > read()
Definition: recordio.hpp:161
void dispatch(const PID< T > &pid, void(T::*method)())
Definition: dispatch.hpp:174
Provides RecordIO decoding on top of an http::Pipe::Reader.
Definition: recordio.hpp:62
hashmap< std::string, MessageHandler > message
Definition: process.hpp:443
Definition: http.hpp:340
process::Future< Result< T > > read()
Returns the next piece of decoded data from the pipe.
Definition: recordio.hpp:90
bool write(std::string s)
Definition: future.hpp:73
struct ev_loop * loop
Definition: loop.hpp:456
const T & get() const &
Definition: option.hpp:118
virtual ~Reader()
Definition: recordio.hpp:74
static Try error(const E &e)
Definition: try.hpp:42
process::Future< Nothing > transform(process::Owned< Reader< T >> &&reader, const std::function< std::string(const T &)> &func, process::http::Pipe::Writer writer)
This is a helper function that reads records from a Reader, applies a transformation to the records a...
Definition: recordio.hpp:112
const std::string message
Definition: errorbase.hpp:45
Result< Process > process(pid_t pid)
Definition: freebsd.hpp:30
Future< std::string > read()
A "process identifier" used to uniquely identify a process when dispatching messages.
Definition: pid.hpp:279
Definition: none.hpp:27
Definition: loop.hpp:223
bool isError() const
Definition: try.hpp:71
Reader(::recordio::Decoder< T > &&decoder, process::http::Pipe::Reader reader)
Definition: recordio.hpp:68
const std::string & failure() const
Definition: future.hpp:1336
Definition: owned.hpp:35
Definition: process.hpp:493
Deferred< void()> defer(const PID< T > &pid, void(T::*method)())
Definition: defer.hpp:35
ReaderProcess(::recordio::Decoder< T > &&_decoder, process::http::Pipe::Reader _reader)
Definition: recordio.hpp:151
Given a decoding function for individual records, this provides decoding from "Record-IO" data into t...
Definition: recordio.hpp:82
Definition: http.hpp:302
const T & get() const
Definition: try.hpp:73
Definition: loop.hpp:163
bool isFailed() const
Definition: future.hpp:1245
Definition: future.hpp:57