Apache Mesos
recordio.hpp
Go to the documentation of this file.
1 // Licensed under the Apache License, Version 2.0 (the "License");
2 // you may not use this file except in compliance with the License.
3 // You may obtain a copy of the License at
4 //
5 // http://www.apache.org/licenses/LICENSE-2.0
6 //
7 // Unless required by applicable law or agreed to in writing, software
8 // distributed under the License is distributed on an "AS IS" BASIS,
9 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10 // See the License for the specific language governing permissions and
11 // limitations under the License.
12 
13 #ifndef __STOUT_RECORDIO_HPP__
14 #define __STOUT_RECORDIO_HPP__
15 
16 #include <stdlib.h>
17 
18 #include <deque>
19 #include <functional>
20 #include <string>
21 
22 #include <stout/check.hpp>
23 #include <stout/foreach.hpp>
24 #include <stout/numify.hpp>
25 #include <stout/option.hpp>
26 #include <stout/stringify.hpp>
27 #include <stout/try.hpp>
28 
50 namespace recordio {
51 
56 template <typename T>
57 class Encoder
58 {
59 public:
60  Encoder(std::function<std::string(const T&)> _serialize)
61  : serialize(_serialize) {}
62 
66  std::string encode(const T& record) const
67  {
68  std::string s = serialize(record);
69  return stringify(s.size()) + "\n" + s;
70  }
71 
72 private:
73  std::function<std::string(const T&)> serialize;
74 };
75 
76 
81 template <typename T>
82 class Decoder
83 {
84 public:
85  Decoder(std::function<Try<T>(const std::string&)> _deserialize)
86  : state(HEADER), deserialize(_deserialize) {}
87 
100  Try<std::deque<Try<T>>> decode(const std::string& data)
101  {
102  if (state == FAILED) {
103  return Error("Decoder is in a FAILED state");
104  }
105 
106  std::deque<Try<T>> records;
107 
108  foreach (char c, data) {
109  if (state == HEADER) {
110  // Keep reading until we have the entire header.
111  if (c != '\n') {
112  buffer += c;
113  continue;
114  }
115 
116  Try<size_t> numify = ::numify<size_t>(buffer);
117 
118  // If we were unable to decode the length header, do not
119  // continue decoding since we cannot determine where to
120  // pick up the next length header!
121  if (numify.isError()) {
122  state = FAILED;
123  return Error("Failed to decode length '" + buffer + "': " +
124  numify.error());
125  }
126 
127  length = numify.get();
128  buffer.clear();
129  state = RECORD;
130 
131  // Note that for 0 length records, we immediately decode.
132  if (numify.get() <= 0) {
133  records.push_back(deserialize(buffer));
134  state = HEADER;
135  }
136  } else if (state == RECORD) {
137  CHECK_SOME(length);
138  CHECK_LT(buffer.size(), length.get());
139 
140  buffer += c;
141 
142  if (buffer.size() == length.get()) {
143  records.push_back(deserialize(buffer));
144  buffer.clear();
145  state = HEADER;
146  }
147  }
148  }
149 
150  return records;
151  }
152 
153 private:
154  enum
155  {
156  HEADER,
157  RECORD,
158  FAILED
159  } state;
160 
161  // TODO(bmahler): Avoid string here as it will not free
162  // its underlying memory allocation when we clear it.
163  std::string buffer;
164  Option<size_t> length;
165 
166  std::function<Try<T>(const std::string&)> deserialize;
167 };
168 
169 } // namespace recordio {
170 
171 #endif // __STOUT_RECORDIO_HPP__
Definition: errorbase.hpp:36
Provides facilities for "Record-IO" encoding of data.
Definition: recordio.hpp:50
T & get()&
Definition: try.hpp:73
Definition: check.hpp:33
Try< T > numify(const std::string &s)
Definition: numify.hpp:29
#define CHECK_SOME(expression)
Definition: check.hpp:50
Try< Message > deserialize(ContentType contentType, const std::string &body)
Definition: http.hpp:107
std::string encode(const T &record) const
Returns the "Record-IO" encoded record.
Definition: recordio.hpp:66
static Try error(const E &e)
Definition: try.hpp:42
bool isError() const
Definition: try.hpp:71
Try< std::deque< Try< T > > > decode(const std::string &data)
Decodes another chunk of data from the "Record-IO" stream and returns the attempted decoding of any additional complete records.
Definition: recordio.hpp:100
Decoder(std::function< Try< T >(const std::string &)> _deserialize)
Definition: recordio.hpp:85
Given an encoding function for individual records, this provides encoding from typed records into "Record-IO" data.
Definition: recordio.hpp:57
std::string stringify(int flags)
Given a decoding function for individual records, this provides decoding from "Record-IO" data into typed records.
Definition: recordio.hpp:82
Encoder(std::function< std::string(const T &)> _serialize)
Definition: recordio.hpp:60