Apache Mesos
decoder.hpp
Go to the documentation of this file.
1 // Licensed under the Apache License, Version 2.0 (the "License");
2 // you may not use this file except in compliance with the License.
3 // You may obtain a copy of the License at
4 //
5 // http://www.apache.org/licenses/LICENSE-2.0
6 //
7 // Unless required by applicable law or agreed to in writing, software
8 // distributed under the License is distributed on an "AS IS" BASIS,
9 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10 // See the License for the specific language governing permissions and
11 // limitations under the License
12 
13 #ifndef __DECODER_HPP__
14 #define __DECODER_HPP__
15 
16 // `http_parser.h` defines an enum `flags` which conflicts
17 // with, e.g., a namespace in stout. Rename it with a macro.
18 #define flags http_parser_flags
19 #include <http_parser.h>
20 #undef flags
21 
22 #include <glog/logging.h>
23 
24 #include <deque>
25 #include <limits>
26 #include <string>
27 #include <vector>
28 
29 #include <process/http.hpp>
30 
31 #include <stout/foreach.hpp>
32 #include <stout/gzip.hpp>
33 #include <stout/option.hpp>
34 #include <stout/try.hpp>
35 
36 
37 #if !(HTTP_PARSER_VERSION_MAJOR >= 2)
38 #error HTTP Parser version >= 2 required.
39 #endif
40 
41 
42 namespace process {
43 
44 namespace http_parsing {
45 
46 // We expect callbacks to return 0 on success and 1 on failure. These constants
47 // are introduced solely to make decoders' code easier to read and are not meant
48 // to be used outside.
49 constexpr int SUCCESS = 0;
50 constexpr int FAILURE = 1;
51 
52 } // namespace http_parsing {
53 
54 // TODO(benh): Make DataDecoder abstract and make RequestDecoder a
55 // concrete subclass.
57 {
58 public:
60  : failure(false), request(nullptr)
61  {
62  http_parser_settings_init(&settings);
63 
64  settings.on_message_begin = &DataDecoder::on_message_begin;
65  settings.on_url = &DataDecoder::on_url;
66  settings.on_header_field = &DataDecoder::on_header_field;
67  settings.on_header_value = &DataDecoder::on_header_value;
68  settings.on_headers_complete = &DataDecoder::on_headers_complete;
69  settings.on_body = &DataDecoder::on_body;
70  settings.on_message_complete = &DataDecoder::on_message_complete;
71  settings.on_chunk_complete = &DataDecoder::on_chunk_complete;
72  settings.on_chunk_header = &DataDecoder::on_chunk_header;
73 
74  http_parser_init(&parser, HTTP_REQUEST);
75 
76  parser.data = this;
77  }
78 
80  {
81  delete request;
82 
83  foreach (http::Request* request, requests) {
84  delete request;
85  }
86  }
87 
88  std::deque<http::Request*> decode(const char* data, size_t length)
89  {
90  size_t parsed = http_parser_execute(&parser, &settings, data, length);
91 
92  if (parsed != length) {
93  // TODO(bmahler): joyent/http-parser exposes error reasons.
94  failure = true;
95  }
96 
97  if (!requests.empty()) {
98  std::deque<http::Request*> result = requests;
99  requests.clear();
100  return result;
101  }
102 
103  return std::deque<http::Request*>();
104  }
105 
106  bool failed() const
107  {
108  return failure;
109  }
110 
111 private:
112  static int on_message_begin(http_parser* p)
113  {
114  DataDecoder* decoder = (DataDecoder*) p->data;
115 
116  CHECK(!decoder->failure);
117 
118  decoder->header = HEADER_FIELD;
119  decoder->field.clear();
120  decoder->value.clear();
121  decoder->query.clear();
122  decoder->url.clear();
123 
124  CHECK(decoder->request == nullptr);
125 
126  decoder->request = new http::Request();
127 
128  return http_parsing::SUCCESS;
129  }
130 
131  static int on_chunk_complete(http_parser* p)
132  {
133  return http_parsing::SUCCESS;
134  }
135 
136  static int on_chunk_header(http_parser* p)
137  {
138  return http_parsing::SUCCESS;
139  }
140 
141  static int on_url(http_parser* p, const char* data, size_t length)
142  {
143  DataDecoder* decoder = (DataDecoder*) p->data;
144  CHECK_NOTNULL(decoder->request);
145 
146  // The current http_parser library (version 2.6.2 and below)
147  // does not support incremental parsing of URLs. To compensate
148  // we incrementally collect the data and parse it in
149  // `on_message_complete`.
150  decoder->url.append(data, length);
151 
152  return http_parsing::SUCCESS;
153  }
154 
155  static int on_header_field(http_parser* p, const char* data, size_t length)
156  {
157  DataDecoder* decoder = (DataDecoder*) p->data;
158  CHECK_NOTNULL(decoder->request);
159 
160  if (decoder->header != HEADER_FIELD) {
161  decoder->request->headers[decoder->field] = decoder->value;
162  decoder->field.clear();
163  decoder->value.clear();
164  }
165 
166  decoder->field.append(data, length);
167  decoder->header = HEADER_FIELD;
168 
169  return http_parsing::SUCCESS;
170  }
171 
172  static int on_header_value(http_parser* p, const char* data, size_t length)
173  {
174  DataDecoder* decoder = (DataDecoder*) p->data;
175  CHECK_NOTNULL(decoder->request);
176  decoder->value.append(data, length);
177  decoder->header = HEADER_VALUE;
178  return http_parsing::SUCCESS;
179  }
180 
181  static int on_headers_complete(http_parser* p)
182  {
183  DataDecoder* decoder = (DataDecoder*) p->data;
184 
185  CHECK_NOTNULL(decoder->request);
186 
187  // Add final header.
188  decoder->request->headers[decoder->field] = decoder->value;
189  decoder->field.clear();
190  decoder->value.clear();
191 
192  decoder->request->method =
193  http_method_str((http_method) decoder->parser.method);
194 
195  decoder->request->keepAlive = http_should_keep_alive(&decoder->parser) != 0;
196 
197  return http_parsing::SUCCESS;
198  }
199 
200  static int on_body(http_parser* p, const char* data, size_t length)
201  {
202  DataDecoder* decoder = (DataDecoder*) p->data;
203  CHECK_NOTNULL(decoder->request);
204  decoder->request->body.append(data, length);
205  return http_parsing::SUCCESS;
206  }
207 
208  static int on_message_complete(http_parser* p)
209  {
210  DataDecoder* decoder = (DataDecoder*) p->data;
211 
212  CHECK_NOTNULL(decoder->request);
213 
214  // Parse the URL. This data was incrementally built up during calls
215  // to `on_url`.
216  http_parser_url url;
217  http_parser_url_init(&url);
218  int parse_url =
219  http_parser_parse_url(decoder->url.data(), decoder->url.size(), 0, &url);
220 
221  if (parse_url != 0) {
222  decoder->failure = true;
223  return parse_url;
224  }
225 
226  if (url.field_set & (1 << UF_PATH)) {
227  decoder->request->url.path = std::string(
228  decoder->url.data() + url.field_data[UF_PATH].off,
229  url.field_data[UF_PATH].len);
230  }
231 
232  if (url.field_set & (1 << UF_FRAGMENT)) {
233  decoder->request->url.fragment = std::string(
234  decoder->url.data() + url.field_data[UF_FRAGMENT].off,
235  url.field_data[UF_FRAGMENT].len);
236  }
237 
238  if (url.field_set & (1 << UF_QUERY)) {
239  decoder->query = std::string(
240  decoder->url.data() + url.field_data[UF_QUERY].off,
241  url.field_data[UF_QUERY].len);
242  }
243 
244  // Parse the query key/values.
246  http::query::decode(decoder->query);
247 
248  if (decoded.isError()) {
249  decoder->failure = true;
250  return http_parsing::FAILURE;
251  }
252 
253  decoder->request->url.query = decoded.get();
254 
255  Option<std::string> encoding =
256  decoder->request->headers.get("Content-Encoding");
257 
258  if (encoding.isSome() && encoding.get() == "gzip") {
259  Try<std::string> decompressed = gzip::decompress(decoder->request->body);
260  if (decompressed.isError()) {
261  decoder->failure = true;
262  return http_parsing::FAILURE;
263  }
264  decoder->request->body = decompressed.get();
265 
266  CHECK_LE(static_cast<long>(decoder->request->body.length()),
268 
269  decoder->request->headers["Content-Length"] =
270  static_cast<char>(decoder->request->body.length());
271  }
272 
273  decoder->requests.push_back(decoder->request);
274  decoder->request = nullptr;
275  return http_parsing::SUCCESS;
276  }
277 
278  bool failure;
279 
280  http_parser parser;
281  http_parser_settings settings;
282 
283  enum
284  {
285  HEADER_FIELD,
286  HEADER_VALUE
287  } header;
288 
289  std::string field;
290  std::string value;
291  std::string query;
292  std::string url;
293 
295 
296  std::deque<http::Request*> requests;
297 };
298 
299 
301 {
302 public:
304  : failure(false), header(HEADER_FIELD), response(nullptr)
305  {
306  http_parser_settings_init(&settings);
307 
308  settings.on_message_begin = &ResponseDecoder::on_message_begin;
309  settings.on_url = &ResponseDecoder::on_url;
310  settings.on_header_field = &ResponseDecoder::on_header_field;
311  settings.on_header_value = &ResponseDecoder::on_header_value;
312  settings.on_headers_complete = &ResponseDecoder::on_headers_complete;
313  settings.on_body = &ResponseDecoder::on_body;
314  settings.on_message_complete = &ResponseDecoder::on_message_complete;
315  settings.on_status = &ResponseDecoder::on_status;
316  settings.on_chunk_complete = &ResponseDecoder::on_chunk_complete;
317  settings.on_chunk_header = &ResponseDecoder::on_chunk_header;
318 
319  http_parser_init(&parser, HTTP_RESPONSE);
320 
321  parser.data = this;
322  }
323 
325  {
326  delete response;
327 
328  foreach (http::Response* response, responses) {
329  delete response;
330  }
331  }
332 
333  std::deque<http::Response*> decode(const char* data, size_t length)
334  {
335  size_t parsed = http_parser_execute(&parser, &settings, data, length);
336 
337  if (parsed != length) {
338  // TODO(bmahler): joyent/http-parser exposes error reasons.
339  failure = true;
340  }
341 
342  if (!responses.empty()) {
343  std::deque<http::Response*> result = responses;
344  responses.clear();
345  return result;
346  }
347 
348  return std::deque<http::Response*>();
349  }
350 
351  bool failed() const
352  {
353  return failure;
354  }
355 
356 private:
357  static int on_message_begin(http_parser* p)
358  {
359  ResponseDecoder* decoder = (ResponseDecoder*) p->data;
360 
361  CHECK(!decoder->failure);
362 
363  decoder->header = HEADER_FIELD;
364  decoder->field.clear();
365  decoder->value.clear();
366 
367  CHECK(decoder->response == nullptr);
368 
369  decoder->response = new http::Response();
370  decoder->response->status.clear();
371  decoder->response->headers.clear();
372  decoder->response->type = http::Response::BODY;
373  decoder->response->body.clear();
374  decoder->response->path.clear();
375 
376  return http_parsing::SUCCESS;
377  }
378 
379  static int on_chunk_complete(http_parser* p)
380  {
381  return http_parsing::SUCCESS;
382  }
383 
384  static int on_chunk_header(http_parser* p)
385  {
386  return http_parsing::SUCCESS;
387  }
388 
389  static int on_url(http_parser* p, const char* data, size_t length)
390  {
391  return http_parsing::SUCCESS;
392  }
393 
394  static int on_header_field(http_parser* p, const char* data, size_t length)
395  {
396  ResponseDecoder* decoder = (ResponseDecoder*) p->data;
397  CHECK_NOTNULL(decoder->response);
398 
399  if (decoder->header != HEADER_FIELD) {
400  decoder->response->headers[decoder->field] = decoder->value;
401  decoder->field.clear();
402  decoder->value.clear();
403  }
404 
405  decoder->field.append(data, length);
406  decoder->header = HEADER_FIELD;
407 
408  return http_parsing::SUCCESS;
409  }
410 
411  static int on_header_value(http_parser* p, const char* data, size_t length)
412  {
413  ResponseDecoder* decoder = (ResponseDecoder*) p->data;
414  CHECK_NOTNULL(decoder->response);
415  decoder->value.append(data, length);
416  decoder->header = HEADER_VALUE;
417  return http_parsing::SUCCESS;
418  }
419 
420  static int on_headers_complete(http_parser* p)
421  {
422  ResponseDecoder* decoder = (ResponseDecoder*) p->data;
423 
424  CHECK_NOTNULL(decoder->response);
425 
426  // Add final header.
427  decoder->response->headers[decoder->field] = decoder->value;
428  decoder->field.clear();
429  decoder->value.clear();
430 
431  return http_parsing::SUCCESS;
432  }
433 
434  static int on_body(http_parser* p, const char* data, size_t length)
435  {
436  ResponseDecoder* decoder = (ResponseDecoder*) p->data;
437  CHECK_NOTNULL(decoder->response);
438  decoder->response->body.append(data, length);
439  return http_parsing::SUCCESS;
440  }
441 
442  static int on_message_complete(http_parser* p)
443  {
444  ResponseDecoder* decoder = (ResponseDecoder*) p->data;
445 
446  CHECK_NOTNULL(decoder->response);
447 
448  if (http::statuses->contains(decoder->parser.status_code)) {
449  decoder->response->code = decoder->parser.status_code;
450 
451  decoder->response->status =
452  http::Status::string(decoder->parser.status_code);
453  } else {
454  decoder->failure = true;
455  return http_parsing::FAILURE;
456  }
457 
458  // We can only provide the gzip encoding.
459  Option<std::string> encoding =
460  decoder->response->headers.get("Content-Encoding");
461  if (encoding.isSome() && encoding.get() == "gzip") {
462  Try<std::string> decompressed = gzip::decompress(decoder->response->body);
463  if (decompressed.isError()) {
464  decoder->failure = true;
465  return http_parsing::FAILURE;
466  }
467  decoder->response->body = decompressed.get();
468 
469  CHECK_LE(static_cast<long>(decoder->response->body.length()),
471 
472  decoder->response->headers["Content-Length"] =
473  static_cast<char>(decoder->response->body.length());
474  }
475 
476  decoder->responses.push_back(decoder->response);
477  decoder->response = nullptr;
478  return http_parsing::SUCCESS;
479  }
480 
481  static int on_status(http_parser* p, const char* data, size_t length)
482  {
483  return http_parsing::SUCCESS;
484  }
485 
486  bool failure;
487 
488  http_parser parser;
489  http_parser_settings settings;
490 
491  enum
492  {
493  HEADER_FIELD,
494  HEADER_VALUE
495  } header;
496 
497  std::string field;
498  std::string value;
499 
500  http::Response* response;
501 
502  std::deque<http::Response*> responses;
503 };
504 
505 
506 // Provides a response decoder that returns 'PIPE' responses once
507 // the response headers are received, but before the body data
508 // is received. Callers are expected to read the body from the
509 // Pipe::Reader in the response.
510 //
511 // TODO(bmahler): Consolidate streaming and non-streaming response
512 // decoders.
514 {
515 public:
517  : failure(false), header(HEADER_FIELD), response(nullptr)
518  {
519  http_parser_settings_init(&settings);
520 
521  settings.on_message_begin =
522  &StreamingResponseDecoder::on_message_begin;
523  settings.on_url =
524  &StreamingResponseDecoder::on_url;
525  settings.on_header_field =
526  &StreamingResponseDecoder::on_header_field;
527  settings.on_header_value =
528  &StreamingResponseDecoder::on_header_value;
529  settings.on_headers_complete =
530  &StreamingResponseDecoder::on_headers_complete;
531  settings.on_body =
532  &StreamingResponseDecoder::on_body;
533  settings.on_message_complete =
534  &StreamingResponseDecoder::on_message_complete;
535  settings.on_status =
536  &StreamingResponseDecoder::on_status;
537  settings.on_chunk_complete =
538  &StreamingResponseDecoder::on_chunk_complete;
539  settings.on_chunk_header =
540  &StreamingResponseDecoder::on_chunk_header;
541 
542  http_parser_init(&parser, HTTP_RESPONSE);
543 
544  parser.data = this;
545  }
546 
548  {
549  delete response;
550 
551  if (writer.isSome()) {
552  writer->fail("Decoder is being deleted");
553  }
554 
555  foreach (http::Response* response, responses) {
556  delete response;
557  }
558  }
559 
560  std::deque<http::Response*> decode(const char* data, size_t length)
561  {
562  size_t parsed = http_parser_execute(&parser, &settings, data, length);
563 
564  if (parsed != length) {
565  // TODO(bmahler): joyent/http-parser exposes error reasons.
566  failure = true;
567 
568  // If we're still writing the body, fail the writer!
569  if (writer.isSome()) {
570  http::Pipe::Writer writer_ = writer.get(); // Remove const.
571  writer_.fail("failed to decode body");
572  writer = None();
573  }
574  }
575 
576  if (!responses.empty()) {
577  std::deque<http::Response*> result = responses;
578  responses.clear();
579  return result;
580  }
581 
582  return std::deque<http::Response*>();
583  }
584 
585  bool failed() const
586  {
587  return failure;
588  }
589 
590  // Returns whether the decoder is currently writing a response
591  // body. Helpful for knowing if the latest response is complete.
592  bool writingBody() const
593  {
594  return writer.isSome();
595  }
596 
597 private:
598  static int on_message_begin(http_parser* p)
599  {
601 
602  CHECK(!decoder->failure);
603 
604  decoder->header = HEADER_FIELD;
605  decoder->field.clear();
606  decoder->value.clear();
607 
608  CHECK(decoder->response == nullptr);
609  CHECK_NONE(decoder->writer);
610 
611  decoder->response = new http::Response();
612  decoder->response->type = http::Response::PIPE;
613  decoder->writer = None();
614 
615  return http_parsing::SUCCESS;
616  }
617 
618  static int on_chunk_complete(http_parser* p)
619  {
620  return http_parsing::SUCCESS;
621  }
622 
623  static int on_chunk_header(http_parser* p)
624  {
625  return http_parsing::SUCCESS;
626  }
627 
628  static int on_status(http_parser* p, const char* data, size_t length)
629  {
630  return http_parsing::SUCCESS;
631  }
632 
633  static int on_url(http_parser* p, const char* data, size_t length)
634  {
635  return http_parsing::SUCCESS;
636  }
637 
638  static int on_header_field(http_parser* p, const char* data, size_t length)
639  {
641 
642  // TODO(alexr): We currently do not support trailers, i.e., headers after
643  // `on_headers_complete` has been called, and instead treat them as errors.
644  if (decoder->response == nullptr) {
645  return http_parsing::FAILURE;
646  }
647 
648  if (decoder->header != HEADER_FIELD) {
649  decoder->response->headers[decoder->field] = decoder->value;
650  decoder->field.clear();
651  decoder->value.clear();
652  }
653 
654  decoder->field.append(data, length);
655  decoder->header = HEADER_FIELD;
656 
657  return http_parsing::SUCCESS;
658  }
659 
660  static int on_header_value(http_parser* p, const char* data, size_t length)
661  {
663 
664  // TODO(alexr): We currently do not support trailers, i.e., headers after
665  // `on_headers_complete` has been called, and instead treat them as errors.
666  if (decoder->response == nullptr) {
667  return http_parsing::FAILURE;
668  }
669 
670  decoder->value.append(data, length);
671  decoder->header = HEADER_VALUE;
672  return http_parsing::SUCCESS;
673  }
674 
675  static int on_headers_complete(http_parser* p)
676  {
678 
679  // This asserts not only that `on_message_begin` has been previously called,
680  // but also that `on_headers_complete` is not called more than once.
681  CHECK_NOTNULL(decoder->response);
682 
683  // Add final header.
684  decoder->response->headers[decoder->field] = decoder->value;
685  decoder->field.clear();
686  decoder->value.clear();
687 
688  if (http::statuses->contains(decoder->parser.status_code)) {
689  decoder->response->code = decoder->parser.status_code;
690 
691  decoder->response->status =
692  http::Status::string(decoder->parser.status_code);
693  } else {
694  decoder->failure = true;
695  return http_parsing::FAILURE;
696  }
697 
698  // We cannot provide streaming gzip decompression!
699  Option<std::string> encoding =
700  decoder->response->headers.get("Content-Encoding");
701  if (encoding.isSome() && encoding.get() == "gzip") {
702  decoder->failure = true;
703  return http_parsing::FAILURE;
704  }
705 
706  CHECK_NONE(decoder->writer);
707 
709  decoder->writer = pipe.writer();
710  decoder->response->reader = pipe.reader();
711 
712  // Send the response to the caller, but keep a Pipe::Writer for
713  // streaming the body content into the response.
714  decoder->responses.push_back(decoder->response);
715 
716  // TODO(alexr): We currently do not support trailers, i.e., extra headers
717  // after `on_headers_complete` has been called. When we add trailer support,
718  // we need a thread-safe way to surface them up in the response or some
719  // auxiliary data structure.
720  decoder->response = nullptr;
721 
722  return http_parsing::SUCCESS;
723  }
724 
725  static int on_body(http_parser* p, const char* data, size_t length)
726  {
728 
729  CHECK_SOME(decoder->writer);
730 
731  http::Pipe::Writer writer = decoder->writer.get(); // Remove const.
732  writer.write(std::string(data, length));
733 
734  return http_parsing::SUCCESS;
735  }
736 
737  static int on_message_complete(http_parser* p)
738  {
740 
741  // This can happen if the callback `on_headers_complete()` had failed
742  // earlier (e.g., due to invalid status code).
743  if (decoder->writer.isNone()) {
744  CHECK(decoder->failure);
745  return http_parsing::FAILURE;
746  }
747 
748  http::Pipe::Writer writer = decoder->writer.get(); // Remove const.
749  writer.close();
750 
751  decoder->writer = None();
752 
753  return http_parsing::SUCCESS;
754  }
755 
756  bool failure;
757 
758  http_parser parser;
759  http_parser_settings settings;
760 
761  enum
762  {
763  HEADER_FIELD,
764  HEADER_VALUE
765  } header;
766 
767  std::string field;
768  std::string value;
769 
770  http::Response* response;
772 
773  std::deque<http::Response*> responses;
774 };
775 
776 
777 // Provides a request decoder that returns 'PIPE' requests once
778 // the request headers are received, but before the body data
779 // is received. Callers are expected to read the body from the
780 // Pipe::Reader in the request.
782 {
783 public:
785  : failure(false), header(HEADER_FIELD), request(nullptr)
786  {
787  http_parser_settings_init(&settings);
788 
789  settings.on_message_begin =
790  &StreamingRequestDecoder::on_message_begin;
791  settings.on_url =
792  &StreamingRequestDecoder::on_url;
793  settings.on_header_field =
794  &StreamingRequestDecoder::on_header_field;
795  settings.on_header_value =
796  &StreamingRequestDecoder::on_header_value;
797  settings.on_headers_complete =
798  &StreamingRequestDecoder::on_headers_complete;
799  settings.on_body =
800  &StreamingRequestDecoder::on_body;
801  settings.on_message_complete =
802  &StreamingRequestDecoder::on_message_complete;
803  settings.on_chunk_complete =
804  &StreamingRequestDecoder::on_chunk_complete;
805  settings.on_chunk_header =
806  &StreamingRequestDecoder::on_chunk_header;
807 
808  http_parser_init(&parser, HTTP_REQUEST);
809 
810  parser.data = this;
811  }
812 
814  {
815  delete request;
816 
817  if (writer.isSome()) {
818  writer->fail("Decoder is being deleted");
819  }
820 
821  foreach (http::Request* request, requests) {
822  delete request;
823  }
824  }
825 
826  std::deque<http::Request*> decode(const char* data, size_t length)
827  {
828  size_t parsed = http_parser_execute(&parser, &settings, data, length);
829  if (parsed != length) {
830  // TODO(bmahler): joyent/http-parser exposes error reasons.
831  failure = true;
832 
833  // If we're still writing the body, fail the writer!
834  if (writer.isSome()) {
835  http::Pipe::Writer writer_ = writer.get(); // Remove const.
836  writer_.fail("failed to decode body");
837  writer = None();
838  }
839  }
840 
841  if (!requests.empty()) {
842  std::deque<http::Request*> result = requests;
843  requests.clear();
844  return result;
845  }
846 
847  return std::deque<http::Request*>();
848  }
849 
850  bool failed() const
851  {
852  return failure;
853  }
854 
855 private:
856  static int on_message_begin(http_parser* p)
857  {
858  StreamingRequestDecoder* decoder = (StreamingRequestDecoder*) p->data;
859 
860  CHECK(!decoder->failure);
861 
862  decoder->header = HEADER_FIELD;
863  decoder->field.clear();
864  decoder->value.clear();
865  decoder->query.clear();
866  decoder->url.clear();
867 
868  CHECK(decoder->request == nullptr);
869  CHECK_NONE(decoder->writer);
870 
871  decoder->request = new http::Request();
872  decoder->request->type = http::Request::PIPE;
873  decoder->writer = None();
874  decoder->decompressor.reset();
875 
876  return http_parsing::SUCCESS;
877  }
878 
879  static int on_chunk_complete(http_parser* p)
880  {
881  return http_parsing::SUCCESS;
882  }
883 
884  static int on_chunk_header(http_parser* p)
885  {
886  return http_parsing::SUCCESS;
887  }
888 
889  static int on_url(http_parser* p, const char* data, size_t length)
890  {
891  StreamingRequestDecoder* decoder = (StreamingRequestDecoder*) p->data;
892 
893  // URL should not be parsed after `on_headers_complete` has been called.
894  if (decoder->request == nullptr) {
895  return http_parsing::FAILURE;
896  }
897 
898  // The current http_parser library (version 2.6.2 and below)
899  // does not support incremental parsing of URLs. To compensate
900  // we incrementally collect the data and parse it in
901  // `on_header_complete`.
902  decoder->url.append(data, length);
903 
904  return http_parsing::SUCCESS;
905  }
906 
907  static int on_header_field(http_parser* p, const char* data, size_t length)
908  {
909  StreamingRequestDecoder* decoder = (StreamingRequestDecoder*) p->data;
910 
911  // TODO(alexr): We currently do not support trailers, i.e., headers after
912  // `on_headers_complete` has been called, and instead treat them as errors.
913  if (decoder->request == nullptr) {
914  return http_parsing::FAILURE;
915  }
916 
917  if (decoder->header != HEADER_FIELD) {
918  decoder->request->headers[decoder->field] = decoder->value;
919  decoder->field.clear();
920  decoder->value.clear();
921  }
922 
923  decoder->field.append(data, length);
924  decoder->header = HEADER_FIELD;
925 
926  return http_parsing::SUCCESS;
927  }
928 
929  static int on_header_value(http_parser* p, const char* data, size_t length)
930  {
931  StreamingRequestDecoder* decoder = (StreamingRequestDecoder*) p->data;
932 
933  // TODO(alexr): We currently do not support trailers, i.e., headers after
934  // `on_headers_complete` has been called, and instead treat them as errors.
935  if (decoder->request == nullptr) {
936  return http_parsing::FAILURE;
937  }
938 
939  decoder->value.append(data, length);
940  decoder->header = HEADER_VALUE;
941  return http_parsing::SUCCESS;
942  }
943 
944  static int on_headers_complete(http_parser* p)
945  {
946  StreamingRequestDecoder* decoder = (StreamingRequestDecoder*) p->data;
947 
948  // This asserts not only that `on_message_begin` has been previously called,
949  // but also that `on_headers_complete` is not called more than once.
950  CHECK_NOTNULL(decoder->request);
951 
952  // Add final header.
953  decoder->request->headers[decoder->field] = decoder->value;
954  decoder->field.clear();
955  decoder->value.clear();
956 
957  decoder->request->method =
958  http_method_str((http_method) decoder->parser.method);
959 
960  decoder->request->keepAlive = http_should_keep_alive(&decoder->parser) != 0;
961 
962  // Parse the URL. This data was incrementally built up during calls
963  // to `on_url`.
964  http_parser_url url;
965  http_parser_url_init(&url);
966  int parse_url =
967  http_parser_parse_url(decoder->url.data(), decoder->url.size(), 0, &url);
968 
969  if (parse_url != 0) {
970  decoder->failure = true;
971  return parse_url;
972  }
973 
974  if (url.field_set & (1 << UF_PATH)) {
975  decoder->request->url.path = std::string(
976  decoder->url.data() + url.field_data[UF_PATH].off,
977  url.field_data[UF_PATH].len);
978  }
979 
980  if (url.field_set & (1 << UF_FRAGMENT)) {
981  decoder->request->url.fragment = std::string(
982  decoder->url.data() + url.field_data[UF_FRAGMENT].off,
983  url.field_data[UF_FRAGMENT].len);
984  }
985 
986  if (url.field_set & (1 << UF_QUERY)) {
987  decoder->query = std::string(
988  decoder->url.data() + url.field_data[UF_QUERY].off,
989  url.field_data[UF_QUERY].len);
990  }
991 
992  // Parse the query key/values.
994  http::query::decode(decoder->query);
995 
996  if (decoded.isError()) {
997  decoder->failure = true;
998  return http_parsing::FAILURE;
999  }
1000 
1001  decoder->request->url.query = std::move(decoded.get());
1002 
1003  Option<std::string> encoding =
1004  decoder->request->headers.get("Content-Encoding");
1005 
1006  if (encoding.isSome() && encoding.get() == "gzip") {
1007  decoder->decompressor =
1009  }
1010 
1011  CHECK_NONE(decoder->writer);
1012 
1013  http::Pipe pipe;
1014  decoder->writer = pipe.writer();
1015  decoder->request->reader = pipe.reader();
1016 
1017  // Send the request to the caller, but keep a Pipe::Writer for
1018  // streaming the body content into the request.
1019  decoder->requests.push_back(decoder->request);
1020 
1021  // TODO(alexr): We currently do not support trailers, i.e., extra headers
1022  // after `on_headers_complete` has been called. When we add trailer support,
1023  // we need a thread-safe way to surface them up in the request or some
1024  // auxiliary data structure.
1025  decoder->request = nullptr;
1026 
1027  return http_parsing::SUCCESS;
1028  }
1029 
1030  static int on_body(http_parser* p, const char* data, size_t length)
1031  {
1032  StreamingRequestDecoder* decoder = (StreamingRequestDecoder*) p->data;
1033 
1034  CHECK_SOME(decoder->writer);
1035 
1036  http::Pipe::Writer writer = decoder->writer.get(); // Remove const.
1037 
1038  std::string body;
1039  if (decoder->decompressor.get() != nullptr) {
1040  Try<std::string> decompressed =
1041  decoder->decompressor->decompress(std::string(data, length));
1042 
1043  if (decompressed.isError()) {
1044  decoder->failure = true;
1045  return http_parsing::FAILURE;
1046  }
1047 
1048  body = std::move(decompressed.get());
1049  } else {
1050  body = std::string(data, length);
1051  }
1052 
1053  writer.write(std::move(body));
1054 
1055  return http_parsing::SUCCESS;
1056  }
1057 
1058  static int on_message_complete(http_parser* p)
1059  {
1060  StreamingRequestDecoder* decoder = (StreamingRequestDecoder*) p->data;
1061 
1062  // This can happen if the callback `on_headers_complete()` had failed
1063  // earlier (e.g., due to invalid query parameters).
1064  if (decoder->writer.isNone()) {
1065  CHECK(decoder->failure);
1066  return http_parsing::FAILURE;
1067  }
1068 
1069  http::Pipe::Writer writer = decoder->writer.get(); // Remove const.
1070 
1071  if (decoder->decompressor.get() != nullptr &&
1072  !decoder->decompressor->finished()) {
1073  writer.fail("Failed to decompress body");
1074  decoder->failure = true;
1075  return http_parsing::FAILURE;
1076  }
1077 
1078  writer.close();
1079 
1080  decoder->writer = None();
1081 
1082  return http_parsing::SUCCESS;
1083  }
1084 
1085  bool failure;
1086 
1087  http_parser parser;
1088  http_parser_settings settings;
1089 
1090  enum
1091  {
1092  HEADER_FIELD,
1093  HEADER_VALUE
1094  } header;
1095 
1096  std::string field;
1097  std::string value;
1098  std::string query;
1099  std::string url;
1100 
1103  Owned<gzip::Decompressor> decompressor;
1104 
1105  std::deque<http::Request*> requests;
1106 };
1107 
1108 } // namespace process {
1109 
1110 #endif // __DECODER_HPP__
~DataDecoder()
Definition: decoder.hpp:79
Definition: http.hpp:557
Definition: http.hpp:296
Definition: gzip.hpp:88
Future< Response > request(const Request &request, bool streamedResponse=false)
Asynchronously sends an HTTP request to the process and returns the HTTP response once the entire res...
T & get()&
Definition: try.hpp:73
URL url
Definition: http.hpp:529
void reset()
Definition: owned.hpp:132
Definition: check.hpp:33
std::string status
Definition: http.hpp:621
enum process::http::Request::@3 type
#define CHECK_NONE(expression)
Definition: check.hpp:54
constexpr int SUCCESS
Definition: decoder.hpp:49
Definition: http.hpp:651
Option< Pipe::Reader > reader
Definition: http.hpp:564
uint16_t code
Definition: http.hpp:658
bool writingBody() const
Definition: decoder.hpp:592
Result< T > get() const
Definition: http.hpp:487
StreamingRequestDecoder()
Definition: decoder.hpp:784
T * get() const
Definition: owned.hpp:117
bool failed() const
Definition: decoder.hpp:351
bool failed() const
Definition: decoder.hpp:585
~StreamingResponseDecoder()
Definition: decoder.hpp:547
Try< hashmap< std::string, std::string > > decode(const std::string &query)
Decode a string that is Base64-encoded with the standard Base64 alphabet.
Definition: base64.hpp:183
bool isSome() const
Definition: option.hpp:115
Writer writer() const
Definition: http.hpp:518
Try< std::string > decompress(const std::string &compressed)
Definition: gzip.hpp:122
hashmap< std::string, std::string > query
Definition: http.hpp:176
Definition: http.hpp:649
std::deque< http::Request * > decode(const char *data, size_t length)
Definition: decoder.hpp:88
bool contains(const std::string &s, const std::string &substr)
Definition: strings.hpp:406
Option< Pipe::Reader > reader
Definition: http.hpp:656
std::string path
Definition: http.hpp:175
#define CHECK_SOME(expression)
Definition: check.hpp:50
std::string body
Definition: http.hpp:654
Definition: decoder.hpp:300
Definition: http.hpp:340
Option< T > max(const Option< T > &left, const Option< T > &right)
Definition: option.hpp:208
Try< std::string > decompress(const std::string &compressed)
Definition: gzip.hpp:243
~ResponseDecoder()
Definition: decoder.hpp:324
bool finished() const
Definition: gzip.hpp:161
bool write(std::string s)
StreamingResponseDecoder()
Definition: decoder.hpp:516
bool fail(const std::string &message)
Try< std::array< int, 2 > > pipe()
Definition: pipe.hpp:26
Reader reader() const
const T & get() const &
Definition: option.hpp:118
bool keepAlive
Definition: http.hpp:543
Definition: decoder.hpp:513
bool failed() const
Definition: decoder.hpp:850
Option< std::string > fragment
Definition: http.hpp:177
~StreamingRequestDecoder()
Definition: decoder.hpp:813
std::deque< http::Response * > decode(const char *data, size_t length)
Definition: decoder.hpp:333
DataDecoder()
Definition: decoder.hpp:59
Definition: decoder.hpp:781
Definition: none.hpp:27
bool isError() const
Definition: try.hpp:71
bool failed() const
Definition: decoder.hpp:106
std::string path
Definition: http.hpp:655
std::string method
Definition: http.hpp:523
Definition: executor.hpp:47
Definition: http.hpp:595
std::string body
Definition: http.hpp:563
hashmap< uint16_t, std::string > * statuses
Definition: decoder.hpp:56
bool isNone() const
Definition: option.hpp:116
constexpr int FAILURE
Definition: decoder.hpp:50
std::deque< http::Response * > decode(const char *data, size_t length)
Definition: decoder.hpp:560
static std::string string(uint16_t code)
Headers headers
Definition: http.hpp:623
enum process::http::Response::@4 type
std::deque< http::Request * > decode(const char *data, size_t length)
Definition: decoder.hpp:826
Option< Value > get(const Key &key) const
Definition: hashmap.hpp:112
Headers headers
Definition: http.hpp:531
ResponseDecoder()
Definition: decoder.hpp:303