13 #ifndef __DECODER_HPP__ 14 #define __DECODER_HPP__ 18 #define flags http_parser_flags 19 #include <http_parser.h> 22 #include <glog/logging.h> 37 #if !(HTTP_PARSER_VERSION_MAJOR >= 2) 38 #error HTTP Parser version >= 2 required. 44 namespace http_parsing {
60 : failure(false),
request(nullptr)
62 http_parser_settings_init(&settings);
64 settings.on_message_begin = &DataDecoder::on_message_begin;
65 settings.on_url = &DataDecoder::on_url;
66 settings.on_header_field = &DataDecoder::on_header_field;
67 settings.on_header_value = &DataDecoder::on_header_value;
68 settings.on_headers_complete = &DataDecoder::on_headers_complete;
69 settings.on_body = &DataDecoder::on_body;
70 settings.on_message_complete = &DataDecoder::on_message_complete;
71 settings.on_chunk_complete = &DataDecoder::on_chunk_complete;
72 settings.on_chunk_header = &DataDecoder::on_chunk_header;
74 http_parser_init(&parser, HTTP_REQUEST);
88 std::deque<http::Request*>
decode(
const char* data,
size_t length)
90 size_t parsed = http_parser_execute(&parser, &settings, data, length);
92 if (parsed != length) {
97 if (!requests.empty()) {
98 std::deque<http::Request*> result = requests;
103 return std::deque<http::Request*>();
112 static int on_message_begin(http_parser* p)
116 CHECK(!decoder->failure);
118 decoder->header = HEADER_FIELD;
119 decoder->field.clear();
120 decoder->value.clear();
121 decoder->query.clear();
122 decoder->url.clear();
124 CHECK(decoder->request ==
nullptr);
131 static int on_chunk_complete(http_parser* p)
136 static int on_chunk_header(http_parser* p)
141 static int on_url(http_parser* p,
const char* data,
size_t length)
144 CHECK_NOTNULL(decoder->request);
150 decoder->url.append(data, length);
155 static int on_header_field(http_parser* p,
const char* data,
size_t length)
158 CHECK_NOTNULL(decoder->request);
160 if (decoder->header != HEADER_FIELD) {
161 decoder->request->
headers[decoder->field] = decoder->value;
162 decoder->field.clear();
163 decoder->value.clear();
166 decoder->field.append(data, length);
167 decoder->header = HEADER_FIELD;
172 static int on_header_value(http_parser* p,
const char* data,
size_t length)
175 CHECK_NOTNULL(decoder->request);
176 decoder->value.append(data, length);
177 decoder->header = HEADER_VALUE;
181 static int on_headers_complete(http_parser* p)
185 CHECK_NOTNULL(decoder->request);
188 decoder->request->
headers[decoder->field] = decoder->value;
189 decoder->field.clear();
190 decoder->value.clear();
192 decoder->request->
method =
193 http_method_str((http_method) decoder->parser.method);
195 decoder->request->
keepAlive = http_should_keep_alive(&decoder->parser) != 0;
200 static int on_body(http_parser* p,
const char* data,
size_t length)
203 CHECK_NOTNULL(decoder->request);
204 decoder->request->
body.append(data, length);
208 static int on_message_complete(http_parser* p)
212 CHECK_NOTNULL(decoder->request);
217 http_parser_url_init(&url);
219 http_parser_parse_url(decoder->url.data(), decoder->url.size(), 0, &url);
221 if (parse_url != 0) {
222 decoder->failure =
true;
226 if (url.field_set & (1 << UF_PATH)) {
227 decoder->request->
url.
path = std::string(
228 decoder->url.data() + url.field_data[UF_PATH].off,
229 url.field_data[UF_PATH].len);
232 if (url.field_set & (1 << UF_FRAGMENT)) {
234 decoder->url.data() + url.field_data[UF_FRAGMENT].off,
235 url.field_data[UF_FRAGMENT].len);
238 if (url.field_set & (1 << UF_QUERY)) {
239 decoder->query = std::string(
240 decoder->url.data() + url.field_data[UF_QUERY].off,
241 url.field_data[UF_QUERY].len);
248 if (decoded.isError()) {
249 decoder->failure =
true;
256 decoder->request->
headers.
get(
"Content-Encoding");
258 if (encoding.
isSome() && encoding.
get() ==
"gzip") {
261 decoder->failure =
true;
264 decoder->request->
body = decompressed.
get();
266 CHECK_LE(static_cast<long>(decoder->request->
body.length()),
269 decoder->request->
headers[
"Content-Length"] =
270 static_cast<char>(decoder->request->
body.length());
273 decoder->requests.push_back(decoder->request);
274 decoder->request =
nullptr;
281 http_parser_settings settings;
296 std::deque<http::Request*> requests;
304 : failure(false), header(HEADER_FIELD), response(nullptr)
306 http_parser_settings_init(&settings);
308 settings.on_message_begin = &ResponseDecoder::on_message_begin;
309 settings.on_url = &ResponseDecoder::on_url;
310 settings.on_header_field = &ResponseDecoder::on_header_field;
311 settings.on_header_value = &ResponseDecoder::on_header_value;
312 settings.on_headers_complete = &ResponseDecoder::on_headers_complete;
313 settings.on_body = &ResponseDecoder::on_body;
314 settings.on_message_complete = &ResponseDecoder::on_message_complete;
315 settings.on_status = &ResponseDecoder::on_status;
316 settings.on_chunk_complete = &ResponseDecoder::on_chunk_complete;
317 settings.on_chunk_header = &ResponseDecoder::on_chunk_header;
319 http_parser_init(&parser, HTTP_RESPONSE);
333 std::deque<http::Response*>
decode(
const char* data,
size_t length)
335 size_t parsed = http_parser_execute(&parser, &settings, data, length);
337 if (parsed != length) {
342 if (!responses.empty()) {
343 std::deque<http::Response*> result = responses;
348 return std::deque<http::Response*>();
357 static int on_message_begin(http_parser* p)
361 CHECK(!decoder->failure);
363 decoder->header = HEADER_FIELD;
364 decoder->field.clear();
365 decoder->value.clear();
367 CHECK(decoder->response ==
nullptr);
370 decoder->response->
status.clear();
371 decoder->response->
headers.clear();
373 decoder->response->
body.clear();
374 decoder->response->
path.clear();
379 static int on_chunk_complete(http_parser* p)
384 static int on_chunk_header(http_parser* p)
389 static int on_url(http_parser* p,
const char* data,
size_t length)
394 static int on_header_field(http_parser* p,
const char* data,
size_t length)
397 CHECK_NOTNULL(decoder->response);
399 if (decoder->header != HEADER_FIELD) {
400 decoder->response->
headers[decoder->field] = decoder->value;
401 decoder->field.clear();
402 decoder->value.clear();
405 decoder->field.append(data, length);
406 decoder->header = HEADER_FIELD;
411 static int on_header_value(http_parser* p,
const char* data,
size_t length)
414 CHECK_NOTNULL(decoder->response);
415 decoder->value.append(data, length);
416 decoder->header = HEADER_VALUE;
420 static int on_headers_complete(http_parser* p)
424 CHECK_NOTNULL(decoder->response);
427 decoder->response->
headers[decoder->field] = decoder->value;
428 decoder->field.clear();
429 decoder->value.clear();
434 static int on_body(http_parser* p,
const char* data,
size_t length)
437 CHECK_NOTNULL(decoder->response);
438 decoder->response->
body.append(data, length);
442 static int on_message_complete(http_parser* p)
446 CHECK_NOTNULL(decoder->response);
449 decoder->response->
code = decoder->parser.status_code;
451 decoder->response->
status =
454 decoder->failure =
true;
460 decoder->response->
headers.
get(
"Content-Encoding");
461 if (encoding.
isSome() && encoding.
get() ==
"gzip") {
464 decoder->failure =
true;
467 decoder->response->
body = decompressed.
get();
469 CHECK_LE(static_cast<long>(decoder->response->
body.length()),
472 decoder->response->
headers[
"Content-Length"] =
473 static_cast<char>(decoder->response->
body.length());
476 decoder->responses.push_back(decoder->response);
477 decoder->response =
nullptr;
481 static int on_status(http_parser* p,
const char* data,
size_t length)
489 http_parser_settings settings;
502 std::deque<http::Response*> responses;
517 : failure(false), header(HEADER_FIELD), response(nullptr)
519 http_parser_settings_init(&settings);
521 settings.on_message_begin =
522 &StreamingResponseDecoder::on_message_begin;
524 &StreamingResponseDecoder::on_url;
525 settings.on_header_field =
526 &StreamingResponseDecoder::on_header_field;
527 settings.on_header_value =
528 &StreamingResponseDecoder::on_header_value;
529 settings.on_headers_complete =
530 &StreamingResponseDecoder::on_headers_complete;
532 &StreamingResponseDecoder::on_body;
533 settings.on_message_complete =
534 &StreamingResponseDecoder::on_message_complete;
536 &StreamingResponseDecoder::on_status;
537 settings.on_chunk_complete =
538 &StreamingResponseDecoder::on_chunk_complete;
539 settings.on_chunk_header =
540 &StreamingResponseDecoder::on_chunk_header;
542 http_parser_init(&parser, HTTP_RESPONSE);
551 if (writer.isSome()) {
552 writer->fail(
"Decoder is being deleted");
560 std::deque<http::Response*>
decode(
const char* data,
size_t length)
562 size_t parsed = http_parser_execute(&parser, &settings, data, length);
564 if (parsed != length) {
569 if (writer.isSome()) {
571 writer_.
fail(
"failed to decode body");
576 if (!responses.empty()) {
577 std::deque<http::Response*> result = responses;
582 return std::deque<http::Response*>();
594 return writer.isSome();
598 static int on_message_begin(http_parser* p)
602 CHECK(!decoder->failure);
604 decoder->header = HEADER_FIELD;
605 decoder->field.clear();
606 decoder->value.clear();
608 CHECK(decoder->response ==
nullptr);
613 decoder->writer =
None();
618 static int on_chunk_complete(http_parser* p)
623 static int on_chunk_header(http_parser* p)
628 static int on_status(http_parser* p,
const char* data,
size_t length)
633 static int on_url(http_parser* p,
const char* data,
size_t length)
638 static int on_header_field(http_parser* p,
const char* data,
size_t length)
644 if (decoder->response ==
nullptr) {
648 if (decoder->header != HEADER_FIELD) {
649 decoder->response->
headers[decoder->field] = decoder->value;
650 decoder->field.clear();
651 decoder->value.clear();
654 decoder->field.append(data, length);
655 decoder->header = HEADER_FIELD;
660 static int on_header_value(http_parser* p,
const char* data,
size_t length)
666 if (decoder->response ==
nullptr) {
670 decoder->value.append(data, length);
671 decoder->header = HEADER_VALUE;
675 static int on_headers_complete(http_parser* p)
681 CHECK_NOTNULL(decoder->response);
684 decoder->response->
headers[decoder->field] = decoder->value;
685 decoder->field.clear();
686 decoder->value.clear();
689 decoder->response->
code = decoder->parser.status_code;
691 decoder->response->
status =
694 decoder->failure =
true;
700 decoder->response->
headers.
get(
"Content-Encoding");
701 if (encoding.
isSome() && encoding.
get() ==
"gzip") {
702 decoder->failure =
true;
709 decoder->writer = pipe.
writer();
714 decoder->responses.push_back(decoder->response);
720 decoder->response =
nullptr;
725 static int on_body(http_parser* p,
const char* data,
size_t length)
732 writer.
write(std::string(data, length));
737 static int on_message_complete(http_parser* p)
743 if (decoder->writer.
isNone()) {
744 CHECK(decoder->failure);
751 decoder->writer =
None();
759 http_parser_settings settings;
773 std::deque<http::Response*> responses;
785 : failure(false), header(HEADER_FIELD),
request(nullptr)
787 http_parser_settings_init(&settings);
789 settings.on_message_begin =
790 &StreamingRequestDecoder::on_message_begin;
792 &StreamingRequestDecoder::on_url;
793 settings.on_header_field =
794 &StreamingRequestDecoder::on_header_field;
795 settings.on_header_value =
796 &StreamingRequestDecoder::on_header_value;
797 settings.on_headers_complete =
798 &StreamingRequestDecoder::on_headers_complete;
800 &StreamingRequestDecoder::on_body;
801 settings.on_message_complete =
802 &StreamingRequestDecoder::on_message_complete;
803 settings.on_chunk_complete =
804 &StreamingRequestDecoder::on_chunk_complete;
805 settings.on_chunk_header =
806 &StreamingRequestDecoder::on_chunk_header;
808 http_parser_init(&parser, HTTP_REQUEST);
817 if (writer.isSome()) {
818 writer->fail(
"Decoder is being deleted");
826 std::deque<http::Request*>
decode(
const char* data,
size_t length)
828 size_t parsed = http_parser_execute(&parser, &settings, data, length);
829 if (parsed != length) {
834 if (writer.isSome()) {
836 writer_.
fail(
"failed to decode body");
841 if (!requests.empty()) {
842 std::deque<http::Request*> result = requests;
847 return std::deque<http::Request*>();
856 static int on_message_begin(http_parser* p)
860 CHECK(!decoder->failure);
862 decoder->header = HEADER_FIELD;
863 decoder->field.clear();
864 decoder->value.clear();
865 decoder->query.clear();
866 decoder->url.clear();
868 CHECK(decoder->request ==
nullptr);
873 decoder->writer =
None();
874 decoder->decompressor.
reset();
879 static int on_chunk_complete(http_parser* p)
884 static int on_chunk_header(http_parser* p)
889 static int on_url(http_parser* p,
const char* data,
size_t length)
894 if (decoder->request ==
nullptr) {
902 decoder->url.append(data, length);
907 static int on_header_field(http_parser* p,
const char* data,
size_t length)
913 if (decoder->request ==
nullptr) {
917 if (decoder->header != HEADER_FIELD) {
918 decoder->request->
headers[decoder->field] = decoder->value;
919 decoder->field.clear();
920 decoder->value.clear();
923 decoder->field.append(data, length);
924 decoder->header = HEADER_FIELD;
929 static int on_header_value(http_parser* p,
const char* data,
size_t length)
935 if (decoder->request ==
nullptr) {
939 decoder->value.append(data, length);
940 decoder->header = HEADER_VALUE;
944 static int on_headers_complete(http_parser* p)
950 CHECK_NOTNULL(decoder->request);
953 decoder->request->
headers[decoder->field] = decoder->value;
954 decoder->field.clear();
955 decoder->value.clear();
957 decoder->request->
method =
958 http_method_str((http_method) decoder->parser.method);
960 decoder->request->
keepAlive = http_should_keep_alive(&decoder->parser) != 0;
965 http_parser_url_init(&url);
967 http_parser_parse_url(decoder->url.data(), decoder->url.size(), 0, &url);
969 if (parse_url != 0) {
970 decoder->failure =
true;
974 if (url.field_set & (1 << UF_PATH)) {
975 decoder->request->
url.
path = std::string(
976 decoder->url.data() + url.field_data[UF_PATH].off,
977 url.field_data[UF_PATH].len);
980 if (url.field_set & (1 << UF_FRAGMENT)) {
982 decoder->url.data() + url.field_data[UF_FRAGMENT].off,
983 url.field_data[UF_FRAGMENT].len);
986 if (url.field_set & (1 << UF_QUERY)) {
987 decoder->query = std::string(
988 decoder->url.data() + url.field_data[UF_QUERY].off,
989 url.field_data[UF_QUERY].len);
996 if (decoded.isError()) {
997 decoder->failure =
true;
1001 decoder->request->
url.
query = std::move(decoded.get());
1004 decoder->request->
headers.
get(
"Content-Encoding");
1006 if (encoding.isSome() && encoding.get() ==
"gzip") {
1007 decoder->decompressor =
1014 decoder->writer = pipe.
writer();
1019 decoder->requests.push_back(decoder->request);
1025 decoder->request =
nullptr;
1030 static int on_body(http_parser* p,
const char* data,
size_t length)
1039 if (decoder->decompressor.
get() !=
nullptr) {
1041 decoder->decompressor->
decompress(std::string(data, length));
1044 decoder->failure =
true;
1048 body = std::move(decompressed.
get());
1050 body = std::string(data, length);
1053 writer.
write(std::move(body));
1058 static int on_message_complete(http_parser* p)
1064 if (decoder->writer.
isNone()) {
1065 CHECK(decoder->failure);
1071 if (decoder->decompressor.
get() !=
nullptr &&
1072 !decoder->decompressor->
finished()) {
1073 writer.
fail(
"Failed to decompress body");
1074 decoder->failure =
true;
1080 decoder->writer =
None();
1088 http_parser_settings settings;
1105 std::deque<http::Request*> requests;
1110 #endif // __DECODER_HPP__ ~DataDecoder()
Definition: decoder.hpp:79
Try< std::array< int, 2 > > pipe()
Definition: pipe.hpp:33
Future< Response > request(const Request &request, bool streamedResponse=false)
Asynchronously sends an HTTP request to the process and returns the HTTP response once the entire res...
T & get()&
Definition: try.hpp:80
URL url
Definition: http.hpp:544
void reset()
Definition: owned.hpp:132
std::string status
Definition: http.hpp:637
enum process::http::Request::@3 type
#define CHECK_NONE(expression)
Definition: check.hpp:54
constexpr int SUCCESS
Definition: decoder.hpp:49
Option< Pipe::Reader > reader
Definition: http.hpp:579
uint16_t code
Definition: http.hpp:674
bool writingBody() const
Definition: decoder.hpp:592
StreamingRequestDecoder()
Definition: decoder.hpp:784
T * get() const
Definition: owned.hpp:117
bool failed() const
Definition: decoder.hpp:351
bool failed() const
Definition: decoder.hpp:585
~StreamingResponseDecoder()
Definition: decoder.hpp:547
Try< hashmap< std::string, std::string > > decode(const std::string &query)
Decode a string that is Base64-encoded with the standard Base64 alphabet.
Definition: base64.hpp:183
bool isSome() const
Definition: option.hpp:116
Try< std::string > decompress(const std::string &compressed)
Definition: gzip.hpp:122
hashmap< std::string, std::string > query
Definition: http.hpp:189
std::deque< http::Request * > decode(const char *data, size_t length)
Definition: decoder.hpp:88
Option< Pipe::Reader > reader
Definition: http.hpp:672
std::string path
Definition: http.hpp:188
#define CHECK_SOME(expression)
Definition: check.hpp:50
std::string body
Definition: http.hpp:670
Definition: decoder.hpp:300
Option< T > max(const Option< T > &left, const Option< T > &right)
Definition: option.hpp:214
Try< std::string > decompress(const std::string &compressed)
Definition: gzip.hpp:243
~ResponseDecoder()
Definition: decoder.hpp:324
bool finished() const
Definition: gzip.hpp:161
bool write(std::string s)
StreamingResponseDecoder()
Definition: decoder.hpp:516
bool fail(const std::string &message)
const T & get() const &
Definition: option.hpp:119
bool keepAlive
Definition: http.hpp:558
Definition: decoder.hpp:513
bool failed() const
Definition: decoder.hpp:850
Option< std::string > fragment
Definition: http.hpp:190
~StreamingRequestDecoder()
Definition: decoder.hpp:813
std::deque< http::Response * > decode(const char *data, size_t length)
Definition: decoder.hpp:333
DataDecoder()
Definition: decoder.hpp:59
Definition: decoder.hpp:781
bool isError() const
Definition: try.hpp:78
bool failed() const
Definition: decoder.hpp:106
std::string path
Definition: http.hpp:671
std::string method
Definition: http.hpp:538
Definition: executor.hpp:48
std::string body
Definition: http.hpp:578
Definition: decoder.hpp:56
bool isNone() const
Definition: option.hpp:117
constexpr int FAILURE
Definition: decoder.hpp:50
mesos::v1::scheduler::Response Response
Definition: mesos.hpp:2854
std::deque< http::Response * > decode(const char *data, size_t length)
Definition: decoder.hpp:560
static std::string string(uint16_t code)
Headers headers
Definition: http.hpp:639
enum process::http::Response::@4 type
std::deque< http::Request * > decode(const char *data, size_t length)
Definition: decoder.hpp:826
bool isValidStatus(uint16_t code)
Option< Value > get(const Key &key) const
Definition: hashmap.hpp:121
Headers headers
Definition: http.hpp:546
ResponseDecoder()
Definition: decoder.hpp:303