Apache Mesos
decoder.hpp
Go to the documentation of this file.
1 // Licensed under the Apache License, Version 2.0 (the "License");
2 // you may not use this file except in compliance with the License.
3 // You may obtain a copy of the License at
4 //
5 // http://www.apache.org/licenses/LICENSE-2.0
6 //
7 // Unless required by applicable law or agreed to in writing, software
8 // distributed under the License is distributed on an "AS IS" BASIS,
9 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10 // See the License for the specific language governing permissions and
11 // limitations under the License
12 
13 #ifndef __DECODER_HPP__
14 #define __DECODER_HPP__
15 
16 // `http_parser.h` defines an enum `flags` which conflicts
17 // with, e.g., a namespace in stout. Rename it with a macro.
18 #define flags http_parser_flags
19 #include <http_parser.h>
20 #undef flags
21 
22 #include <glog/logging.h>
23 
24 #include <deque>
25 #include <limits>
26 #include <string>
27 #include <vector>
28 
29 #include <process/http.hpp>
30 
31 #include <stout/foreach.hpp>
32 #include <stout/gzip.hpp>
33 #include <stout/option.hpp>
34 #include <stout/try.hpp>
35 
36 
37 #if !(HTTP_PARSER_VERSION_MAJOR >= 2)
38 #error HTTP Parser version >= 2 required.
39 #endif
40 
41 
42 namespace process {
43 
44 // TODO(benh): Make DataDecoder abstract and make RequestDecoder a
45 // concrete subclass.
47 {
48 public:
50  : failure(false), request(nullptr)
51  {
52  http_parser_settings_init(&settings);
53 
54  settings.on_message_begin = &DataDecoder::on_message_begin;
55  settings.on_url = &DataDecoder::on_url;
56  settings.on_header_field = &DataDecoder::on_header_field;
57  settings.on_header_value = &DataDecoder::on_header_value;
58  settings.on_headers_complete = &DataDecoder::on_headers_complete;
59  settings.on_body = &DataDecoder::on_body;
60  settings.on_message_complete = &DataDecoder::on_message_complete;
61  settings.on_chunk_complete = &DataDecoder::on_chunk_complete;
62  settings.on_chunk_header = &DataDecoder::on_chunk_header;
63 
64  http_parser_init(&parser, HTTP_REQUEST);
65 
66  parser.data = this;
67  }
68 
70  {
71  delete request;
72 
73  foreach (http::Request* request, requests) {
74  delete request;
75  }
76  }
77 
78  std::deque<http::Request*> decode(const char* data, size_t length)
79  {
80  size_t parsed = http_parser_execute(&parser, &settings, data, length);
81 
82  if (parsed != length) {
83  // TODO(bmahler): joyent/http-parser exposes error reasons.
84  failure = true;
85  }
86 
87  if (!requests.empty()) {
88  std::deque<http::Request*> result = requests;
89  requests.clear();
90  return result;
91  }
92 
93  return std::deque<http::Request*>();
94  }
95 
96  bool failed() const
97  {
98  return failure;
99  }
100 
101 private:
102  static int on_message_begin(http_parser* p)
103  {
104  DataDecoder* decoder = (DataDecoder*) p->data;
105 
106  CHECK(!decoder->failure);
107 
108  decoder->header = HEADER_FIELD;
109  decoder->field.clear();
110  decoder->value.clear();
111  decoder->query.clear();
112  decoder->url.clear();
113 
114  CHECK(decoder->request == nullptr);
115 
116  decoder->request = new http::Request();
117 
118  return 0;
119  }
120 
121  static int on_chunk_complete(http_parser* p)
122  {
123  return 0;
124  }
125 
126  static int on_chunk_header(http_parser* p)
127  {
128  return 0;
129  }
130 
131  static int on_url(http_parser* p, const char* data, size_t length)
132  {
133  DataDecoder* decoder = (DataDecoder*) p->data;
134  CHECK_NOTNULL(decoder->request);
135 
136  // The current http_parser library (version 2.6.2 and below)
137  // does not support incremental parsing of URLs. To compensate
138  // we incrementally collect the data and parse it in
139  // `on_message_complete`.
140  decoder->url.append(data, length);
141 
142  return 0;
143  }
144 
145  static int on_header_field(http_parser* p, const char* data, size_t length)
146  {
147  DataDecoder* decoder = (DataDecoder*) p->data;
148  CHECK_NOTNULL(decoder->request);
149 
150  if (decoder->header != HEADER_FIELD) {
151  decoder->request->headers[decoder->field] = decoder->value;
152  decoder->field.clear();
153  decoder->value.clear();
154  }
155 
156  decoder->field.append(data, length);
157  decoder->header = HEADER_FIELD;
158 
159  return 0;
160  }
161 
162  static int on_header_value(http_parser* p, const char* data, size_t length)
163  {
164  DataDecoder* decoder = (DataDecoder*) p->data;
165  CHECK_NOTNULL(decoder->request);
166  decoder->value.append(data, length);
167  decoder->header = HEADER_VALUE;
168  return 0;
169  }
170 
171  static int on_headers_complete(http_parser* p)
172  {
173  DataDecoder* decoder = (DataDecoder*) p->data;
174 
175  CHECK_NOTNULL(decoder->request);
176 
177  // Add final header.
178  decoder->request->headers[decoder->field] = decoder->value;
179  decoder->field.clear();
180  decoder->value.clear();
181 
182  decoder->request->method =
183  http_method_str((http_method) decoder->parser.method);
184 
185  decoder->request->keepAlive = http_should_keep_alive(&decoder->parser) != 0;
186 
187  return 0;
188  }
189 
190  static int on_body(http_parser* p, const char* data, size_t length)
191  {
192  DataDecoder* decoder = (DataDecoder*) p->data;
193  CHECK_NOTNULL(decoder->request);
194  decoder->request->body.append(data, length);
195  return 0;
196  }
197 
198  static int on_message_complete(http_parser* p)
199  {
200  DataDecoder* decoder = (DataDecoder*) p->data;
201 
202  CHECK_NOTNULL(decoder->request);
203 
204  // Parse the URL. This data was incrementally built up during calls
205  // to `on_url`.
206  http_parser_url url;
207  http_parser_url_init(&url);
208  int parse_url =
209  http_parser_parse_url(decoder->url.data(), decoder->url.size(), 0, &url);
210 
211  if (parse_url != 0) {
212  decoder->failure = true;
213  return parse_url;
214  }
215 
216  if (url.field_set & (1 << UF_PATH)) {
217  decoder->request->url.path = std::string(
218  decoder->url.data() + url.field_data[UF_PATH].off,
219  url.field_data[UF_PATH].len);
220  }
221 
222  if (url.field_set & (1 << UF_FRAGMENT)) {
223  decoder->request->url.fragment = std::string(
224  decoder->url.data() + url.field_data[UF_FRAGMENT].off,
225  url.field_data[UF_FRAGMENT].len);
226  }
227 
228  if (url.field_set & (1 << UF_QUERY)) {
229  decoder->query = std::string(
230  decoder->url.data() + url.field_data[UF_QUERY].off,
231  url.field_data[UF_QUERY].len);
232  }
233 
234  // Parse the query key/values.
236  http::query::decode(decoder->query);
237 
238  if (decoded.isError()) {
239  decoder->failure = true;
240  return 1;
241  }
242 
243  decoder->request->url.query = decoded.get();
244 
245  Option<std::string> encoding =
246  decoder->request->headers.get("Content-Encoding");
247 
248  if (encoding.isSome() && encoding.get() == "gzip") {
249  Try<std::string> decompressed = gzip::decompress(decoder->request->body);
250  if (decompressed.isError()) {
251  decoder->failure = true;
252  return 1;
253  }
254  decoder->request->body = decompressed.get();
255 
256  CHECK_LE(static_cast<long>(decoder->request->body.length()),
258 
259  decoder->request->headers["Content-Length"] =
260  static_cast<char>(decoder->request->body.length());
261  }
262 
263  decoder->requests.push_back(decoder->request);
264  decoder->request = nullptr;
265  return 0;
266  }
267 
268  bool failure;
269 
270  http_parser parser;
271  http_parser_settings settings;
272 
273  enum
274  {
275  HEADER_FIELD,
276  HEADER_VALUE
277  } header;
278 
279  std::string field;
280  std::string value;
281  std::string query;
282  std::string url;
283 
284  http::Request* request;
285 
286  std::deque<http::Request*> requests;
287 };
288 
289 
291 {
292 public:
294  : failure(false), header(HEADER_FIELD), response(nullptr)
295  {
296  http_parser_settings_init(&settings);
297 
298  settings.on_message_begin = &ResponseDecoder::on_message_begin;
299  settings.on_url = &ResponseDecoder::on_url;
300  settings.on_header_field = &ResponseDecoder::on_header_field;
301  settings.on_header_value = &ResponseDecoder::on_header_value;
302  settings.on_headers_complete = &ResponseDecoder::on_headers_complete;
303  settings.on_body = &ResponseDecoder::on_body;
304  settings.on_message_complete = &ResponseDecoder::on_message_complete;
305  settings.on_status = &ResponseDecoder::on_status;
306  settings.on_chunk_complete = &ResponseDecoder::on_chunk_complete;
307  settings.on_chunk_header = &ResponseDecoder::on_chunk_header;
308 
309  http_parser_init(&parser, HTTP_RESPONSE);
310 
311  parser.data = this;
312  }
313 
315  {
316  delete response;
317 
318  foreach (http::Response* response, responses) {
319  delete response;
320  }
321  }
322 
323  std::deque<http::Response*> decode(const char* data, size_t length)
324  {
325  size_t parsed = http_parser_execute(&parser, &settings, data, length);
326 
327  if (parsed != length) {
328  // TODO(bmahler): joyent/http-parser exposes error reasons.
329  failure = true;
330  }
331 
332  if (!responses.empty()) {
333  std::deque<http::Response*> result = responses;
334  responses.clear();
335  return result;
336  }
337 
338  return std::deque<http::Response*>();
339  }
340 
341  bool failed() const
342  {
343  return failure;
344  }
345 
346 private:
347  static int on_message_begin(http_parser* p)
348  {
349  ResponseDecoder* decoder = (ResponseDecoder*) p->data;
350 
351  CHECK(!decoder->failure);
352 
353  decoder->header = HEADER_FIELD;
354  decoder->field.clear();
355  decoder->value.clear();
356 
357  CHECK(decoder->response == nullptr);
358 
359  decoder->response = new http::Response();
360  decoder->response->status.clear();
361  decoder->response->headers.clear();
362  decoder->response->type = http::Response::BODY;
363  decoder->response->body.clear();
364  decoder->response->path.clear();
365 
366  return 0;
367  }
368 
369  static int on_chunk_complete(http_parser* p)
370  {
371  return 0;
372  }
373 
374  static int on_chunk_header(http_parser* p)
375  {
376  return 0;
377  }
378 
379  static int on_url(http_parser* p, const char* data, size_t length)
380  {
381  return 0;
382  }
383 
384  static int on_header_field(http_parser* p, const char* data, size_t length)
385  {
386  ResponseDecoder* decoder = (ResponseDecoder*) p->data;
387  CHECK_NOTNULL(decoder->response);
388 
389  if (decoder->header != HEADER_FIELD) {
390  decoder->response->headers[decoder->field] = decoder->value;
391  decoder->field.clear();
392  decoder->value.clear();
393  }
394 
395  decoder->field.append(data, length);
396  decoder->header = HEADER_FIELD;
397 
398  return 0;
399  }
400 
401  static int on_header_value(http_parser* p, const char* data, size_t length)
402  {
403  ResponseDecoder* decoder = (ResponseDecoder*) p->data;
404  CHECK_NOTNULL(decoder->response);
405  decoder->value.append(data, length);
406  decoder->header = HEADER_VALUE;
407  return 0;
408  }
409 
410  static int on_headers_complete(http_parser* p)
411  {
412  ResponseDecoder* decoder = (ResponseDecoder*) p->data;
413 
414  CHECK_NOTNULL(decoder->response);
415 
416  // Add final header.
417  decoder->response->headers[decoder->field] = decoder->value;
418  decoder->field.clear();
419  decoder->value.clear();
420 
421  return 0;
422  }
423 
424  static int on_body(http_parser* p, const char* data, size_t length)
425  {
426  ResponseDecoder* decoder = (ResponseDecoder*) p->data;
427  CHECK_NOTNULL(decoder->response);
428  decoder->response->body.append(data, length);
429  return 0;
430  }
431 
432  static int on_message_complete(http_parser* p)
433  {
434  ResponseDecoder* decoder = (ResponseDecoder*) p->data;
435 
436  CHECK_NOTNULL(decoder->response);
437 
438  if (http::statuses->contains(decoder->parser.status_code)) {
439  decoder->response->code = decoder->parser.status_code;
440 
441  decoder->response->status =
442  http::Status::string(decoder->parser.status_code);
443  } else {
444  decoder->failure = true;
445  return 1;
446  }
447 
448  // We can only provide the gzip encoding.
449  Option<std::string> encoding =
450  decoder->response->headers.get("Content-Encoding");
451  if (encoding.isSome() && encoding.get() == "gzip") {
452  Try<std::string> decompressed = gzip::decompress(decoder->response->body);
453  if (decompressed.isError()) {
454  decoder->failure = true;
455  return 1;
456  }
457  decoder->response->body = decompressed.get();
458 
459  CHECK_LE(static_cast<long>(decoder->response->body.length()),
461 
462  decoder->response->headers["Content-Length"] =
463  static_cast<char>(decoder->response->body.length());
464  }
465 
466  decoder->responses.push_back(decoder->response);
467  decoder->response = nullptr;
468  return 0;
469  }
470 
471  static int on_status(http_parser* p, const char* data, size_t length)
472  {
473  return 0;
474  }
475 
476  bool failure;
477 
478  http_parser parser;
479  http_parser_settings settings;
480 
481  enum
482  {
483  HEADER_FIELD,
484  HEADER_VALUE
485  } header;
486 
487  std::string field;
488  std::string value;
489 
490  http::Response* response;
491 
492  std::deque<http::Response*> responses;
493 };
494 
495 
496 // Provides a response decoder that returns 'PIPE' responses once
497 // the response headers are received, but before the body data
498 // is received. Callers are expected to read the body from the
499 // Pipe::Reader in the response.
500 //
501 // TODO(bmahler): Consolidate streaming and non-streaming response
502 // decoders.
504 {
505 public:
507  : failure(false), header(HEADER_FIELD), response(nullptr)
508  {
509  http_parser_settings_init(&settings);
510 
511  settings.on_message_begin =
512  &StreamingResponseDecoder::on_message_begin;
513  settings.on_url =
514  &StreamingResponseDecoder::on_url;
515  settings.on_header_field =
516  &StreamingResponseDecoder::on_header_field;
517  settings.on_header_value =
518  &StreamingResponseDecoder::on_header_value;
519  settings.on_headers_complete =
520  &StreamingResponseDecoder::on_headers_complete;
521  settings.on_body =
522  &StreamingResponseDecoder::on_body;
523  settings.on_message_complete =
524  &StreamingResponseDecoder::on_message_complete;
525  settings.on_status =
526  &StreamingResponseDecoder::on_status;
527  settings.on_chunk_complete =
528  &StreamingResponseDecoder::on_chunk_complete;
529  settings.on_chunk_header =
530  &StreamingResponseDecoder::on_chunk_header;
531 
532  http_parser_init(&parser, HTTP_RESPONSE);
533 
534  parser.data = this;
535  }
536 
538  {
539  delete response;
540 
541  if (writer.isSome()) {
542  writer->fail("Decoder is being deleted");
543  }
544 
545  foreach (http::Response* response, responses) {
546  delete response;
547  }
548  }
549 
550  std::deque<http::Response*> decode(const char* data, size_t length)
551  {
552  size_t parsed = http_parser_execute(&parser, &settings, data, length);
553 
554  if (parsed != length) {
555  // TODO(bmahler): joyent/http-parser exposes error reasons.
556  failure = true;
557 
558  // If we're still writing the body, fail the writer!
559  if (writer.isSome()) {
560  http::Pipe::Writer writer_ = writer.get(); // Remove const.
561  writer_.fail("failed to decode body");
562  writer = None();
563  }
564  }
565 
566  if (!responses.empty()) {
567  std::deque<http::Response*> result = responses;
568  responses.clear();
569  return result;
570  }
571 
572  return std::deque<http::Response*>();
573  }
574 
575  bool failed() const
576  {
577  return failure;
578  }
579 
580  // Returns whether the decoder is currently writing a response
581  // body. Helpful for knowing if the latest response is complete.
582  bool writingBody() const
583  {
584  return writer.isSome();
585  }
586 
587 private:
588  static int on_message_begin(http_parser* p)
589  {
591 
592  CHECK(!decoder->failure);
593 
594  decoder->header = HEADER_FIELD;
595  decoder->field.clear();
596  decoder->value.clear();
597 
598  CHECK(decoder->response == nullptr);
599  CHECK_NONE(decoder->writer);
600 
601  decoder->response = new http::Response();
602  decoder->response->type = http::Response::PIPE;
603  decoder->writer = None();
604 
605  return 0;
606  }
607 
608  static int on_chunk_complete(http_parser* p)
609  {
610  return 0;
611  }
612 
613  static int on_chunk_header(http_parser* p)
614  {
615  return 0;
616  }
617 
618  static int on_status(http_parser* p, const char* data, size_t length)
619  {
620  return 0;
621  }
622 
623  static int on_url(http_parser* p, const char* data, size_t length)
624  {
625  return 0;
626  }
627 
628  static int on_header_field(http_parser* p, const char* data, size_t length)
629  {
631 
632  CHECK_NOTNULL(decoder->response);
633 
634  if (decoder->header != HEADER_FIELD) {
635  decoder->response->headers[decoder->field] = decoder->value;
636  decoder->field.clear();
637  decoder->value.clear();
638  }
639 
640  decoder->field.append(data, length);
641  decoder->header = HEADER_FIELD;
642 
643  return 0;
644  }
645 
646  static int on_header_value(http_parser* p, const char* data, size_t length)
647  {
649 
650  CHECK_NOTNULL(decoder->response);
651 
652  decoder->value.append(data, length);
653  decoder->header = HEADER_VALUE;
654  return 0;
655  }
656 
657  static int on_headers_complete(http_parser* p)
658  {
660 
661  CHECK_NOTNULL(decoder->response);
662 
663  // Add final header.
664  decoder->response->headers[decoder->field] = decoder->value;
665  decoder->field.clear();
666  decoder->value.clear();
667 
668  if (http::statuses->contains(decoder->parser.status_code)) {
669  decoder->response->code = decoder->parser.status_code;
670 
671  decoder->response->status =
672  http::Status::string(decoder->parser.status_code);
673  } else {
674  decoder->failure = true;
675  return 1;
676  }
677 
678  // We cannot provide streaming gzip decompression!
679  Option<std::string> encoding =
680  decoder->response->headers.get("Content-Encoding");
681  if (encoding.isSome() && encoding.get() == "gzip") {
682  decoder->failure = true;
683  return 1;
684  }
685 
686  CHECK_NONE(decoder->writer);
687 
688  http::Pipe pipe;
689  decoder->writer = pipe.writer();
690  decoder->response->reader = pipe.reader();
691 
692  // Send the response to the caller, but keep a Pipe::Writer for
693  // streaming the body content into the response.
694  decoder->responses.push_back(decoder->response);
695  decoder->response = nullptr;
696 
697  return 0;
698  }
699 
700  static int on_body(http_parser* p, const char* data, size_t length)
701  {
703 
704  CHECK_SOME(decoder->writer);
705 
706  http::Pipe::Writer writer = decoder->writer.get(); // Remove const.
707  writer.write(std::string(data, length));
708 
709  return 0;
710  }
711 
712  static int on_message_complete(http_parser* p)
713  {
715 
716  // This can happen if the callback `on_headers_complete()` had failed
717  // earlier (e.g., due to invalid status code).
718  if (decoder->writer.isNone()) {
719  CHECK(decoder->failure);
720  return 1;
721  }
722 
723  http::Pipe::Writer writer = decoder->writer.get(); // Remove const.
724  writer.close();
725 
726  decoder->writer = None();
727 
728  return 0;
729  }
730 
731  bool failure;
732 
733  http_parser parser;
734  http_parser_settings settings;
735 
736  enum
737  {
738  HEADER_FIELD,
739  HEADER_VALUE
740  } header;
741 
742  std::string field;
743  std::string value;
744 
745  http::Response* response;
747 
748  std::deque<http::Response*> responses;
749 };
750 
751 
752 // Provides a request decoder that returns 'PIPE' requests once
753 // the request headers are received, but before the body data
754 // is received. Callers are expected to read the body from the
755 // Pipe::Reader in the request.
757 {
758 public:
760  : failure(false), header(HEADER_FIELD), request(nullptr)
761  {
762  http_parser_settings_init(&settings);
763 
764  settings.on_message_begin =
765  &StreamingRequestDecoder::on_message_begin;
766  settings.on_url =
767  &StreamingRequestDecoder::on_url;
768  settings.on_header_field =
769  &StreamingRequestDecoder::on_header_field;
770  settings.on_header_value =
771  &StreamingRequestDecoder::on_header_value;
772  settings.on_headers_complete =
773  &StreamingRequestDecoder::on_headers_complete;
774  settings.on_body =
775  &StreamingRequestDecoder::on_body;
776  settings.on_message_complete =
777  &StreamingRequestDecoder::on_message_complete;
778  settings.on_chunk_complete =
779  &StreamingRequestDecoder::on_chunk_complete;
780  settings.on_chunk_header =
781  &StreamingRequestDecoder::on_chunk_header;
782 
783  http_parser_init(&parser, HTTP_REQUEST);
784 
785  parser.data = this;
786  }
787 
789  {
790  delete request;
791 
792  if (writer.isSome()) {
793  writer->fail("Decoder is being deleted");
794  }
795 
796  foreach (http::Request* request, requests) {
797  delete request;
798  }
799  }
800 
801  std::deque<http::Request*> decode(const char* data, size_t length)
802  {
803  size_t parsed = http_parser_execute(&parser, &settings, data, length);
804  if (parsed != length) {
805  // TODO(bmahler): joyent/http-parser exposes error reasons.
806  failure = true;
807 
808  // If we're still writing the body, fail the writer!
809  if (writer.isSome()) {
810  http::Pipe::Writer writer_ = writer.get(); // Remove const.
811  writer_.fail("failed to decode body");
812  writer = None();
813  }
814  }
815 
816  if (!requests.empty()) {
817  std::deque<http::Request*> result = requests;
818  requests.clear();
819  return result;
820  }
821 
822  return std::deque<http::Request*>();
823  }
824 
825  bool failed() const
826  {
827  return failure;
828  }
829 
830 private:
831  static int on_message_begin(http_parser* p)
832  {
833  StreamingRequestDecoder* decoder = (StreamingRequestDecoder*) p->data;
834 
835  CHECK(!decoder->failure);
836 
837  decoder->header = HEADER_FIELD;
838  decoder->field.clear();
839  decoder->value.clear();
840  decoder->query.clear();
841  decoder->url.clear();
842 
843  CHECK(decoder->request == nullptr);
844  CHECK_NONE(decoder->writer);
845 
846  decoder->request = new http::Request();
847  decoder->request->type = http::Request::PIPE;
848  decoder->writer = None();
849  decoder->decompressor.reset();
850 
851  return 0;
852  }
853 
854  static int on_chunk_complete(http_parser* p)
855  {
856  return 0;
857  }
858 
859  static int on_chunk_header(http_parser* p)
860  {
861  return 0;
862  }
863 
864  static int on_url(http_parser* p, const char* data, size_t length)
865  {
866  StreamingRequestDecoder* decoder = (StreamingRequestDecoder*) p->data;
867 
868  CHECK_NOTNULL(decoder->request);
869 
870  // The current http_parser library (version 2.6.2 and below)
871  // does not support incremental parsing of URLs. To compensate
872  // we incrementally collect the data and parse it in
873  // `on_header_complete`.
874  decoder->url.append(data, length);
875 
876  return 0;
877  }
878 
879  static int on_header_field(http_parser* p, const char* data, size_t length)
880  {
881  StreamingRequestDecoder* decoder = (StreamingRequestDecoder*) p->data;
882 
883  CHECK_NOTNULL(decoder->request);
884 
885  if (decoder->header != HEADER_FIELD) {
886  decoder->request->headers[decoder->field] = decoder->value;
887  decoder->field.clear();
888  decoder->value.clear();
889  }
890 
891  decoder->field.append(data, length);
892  decoder->header = HEADER_FIELD;
893 
894  return 0;
895  }
896 
897  static int on_header_value(http_parser* p, const char* data, size_t length)
898  {
899  StreamingRequestDecoder* decoder = (StreamingRequestDecoder*) p->data;
900 
901  CHECK_NOTNULL(decoder->request);
902 
903  decoder->value.append(data, length);
904  decoder->header = HEADER_VALUE;
905  return 0;
906  }
907 
908  static int on_headers_complete(http_parser* p)
909  {
910  StreamingRequestDecoder* decoder = (StreamingRequestDecoder*) p->data;
911 
912  CHECK_NOTNULL(decoder->request);
913 
914  // Add final header.
915  decoder->request->headers[decoder->field] = decoder->value;
916  decoder->field.clear();
917  decoder->value.clear();
918 
919  decoder->request->method =
920  http_method_str((http_method) decoder->parser.method);
921 
922  decoder->request->keepAlive = http_should_keep_alive(&decoder->parser) != 0;
923 
924  // Parse the URL. This data was incrementally built up during calls
925  // to `on_url`.
926  http_parser_url url;
927  http_parser_url_init(&url);
928  int parse_url =
929  http_parser_parse_url(decoder->url.data(), decoder->url.size(), 0, &url);
930 
931  if (parse_url != 0) {
932  decoder->failure = true;
933  return parse_url;
934  }
935 
936  if (url.field_set & (1 << UF_PATH)) {
937  decoder->request->url.path = std::string(
938  decoder->url.data() + url.field_data[UF_PATH].off,
939  url.field_data[UF_PATH].len);
940  }
941 
942  if (url.field_set & (1 << UF_FRAGMENT)) {
943  decoder->request->url.fragment = std::string(
944  decoder->url.data() + url.field_data[UF_FRAGMENT].off,
945  url.field_data[UF_FRAGMENT].len);
946  }
947 
948  if (url.field_set & (1 << UF_QUERY)) {
949  decoder->query = std::string(
950  decoder->url.data() + url.field_data[UF_QUERY].off,
951  url.field_data[UF_QUERY].len);
952  }
953 
954  // Parse the query key/values.
956  http::query::decode(decoder->query);
957 
958  if (decoded.isError()) {
959  decoder->failure = true;
960  return 1;
961  }
962 
963  decoder->request->url.query = std::move(decoded.get());
964 
965  Option<std::string> encoding =
966  decoder->request->headers.get("Content-Encoding");
967 
968  if (encoding.isSome() && encoding.get() == "gzip") {
969  decoder->decompressor =
970  Owned<gzip::Decompressor>(new gzip::Decompressor());
971  }
972 
973  CHECK_NONE(decoder->writer);
974 
975  http::Pipe pipe;
976  decoder->writer = pipe.writer();
977  decoder->request->reader = pipe.reader();
978 
979  // Send the request to the caller, but keep a Pipe::Writer for
980  // streaming the body content into the request.
981  decoder->requests.push_back(decoder->request);
982  decoder->request = nullptr;
983 
984  return 0;
985  }
986 
987  static int on_body(http_parser* p, const char* data, size_t length)
988  {
989  StreamingRequestDecoder* decoder = (StreamingRequestDecoder*) p->data;
990 
991  CHECK_SOME(decoder->writer);
992 
993  http::Pipe::Writer writer = decoder->writer.get(); // Remove const.
994 
995  std::string body;
996  if (decoder->decompressor.get() != nullptr) {
997  Try<std::string> decompressed =
998  decoder->decompressor->decompress(std::string(data, length));
999 
1000  if (decompressed.isError()) {
1001  decoder->failure = true;
1002  return 1;
1003  }
1004 
1005  body = std::move(decompressed.get());
1006  } else {
1007  body = std::string(data, length);
1008  }
1009 
1010  writer.write(std::move(body));
1011 
1012  return 0;
1013  }
1014 
1015  static int on_message_complete(http_parser* p)
1016  {
1017  StreamingRequestDecoder* decoder = (StreamingRequestDecoder*) p->data;
1018 
1019  // This can happen if the callback `on_headers_complete()` had failed
1020  // earlier (e.g., due to invalid query parameters).
1021  if (decoder->writer.isNone()) {
1022  CHECK(decoder->failure);
1023  return 1;
1024  }
1025 
1026  http::Pipe::Writer writer = decoder->writer.get(); // Remove const.
1027 
1028  if (decoder->decompressor.get() != nullptr &&
1029  !decoder->decompressor->finished()) {
1030  writer.fail("Failed to decompress body");
1031  decoder->failure = true;
1032  return 1;
1033  }
1034 
1035  writer.close();
1036 
1037  decoder->writer = None();
1038 
1039  return 0;
1040  }
1041 
1042  bool failure;
1043 
1044  http_parser parser;
1045  http_parser_settings settings;
1046 
1047  enum
1048  {
1049  HEADER_FIELD,
1050  HEADER_VALUE
1051  } header;
1052 
1053  std::string field;
1054  std::string value;
1055  std::string query;
1056  std::string url;
1057 
1058  http::Request* request;
1060  Owned<gzip::Decompressor> decompressor;
1061 
1062  std::deque<http::Request*> requests;
1063 };
1064 
1065 } // namespace process {
1066 
1067 #endif // __DECODER_HPP__
~DataDecoder()
Definition: decoder.hpp:69
Definition: http.hpp:557
Definition: gzip.hpp:88
void reset()
Definition: owned.hpp:130
Definition: try.hpp:34
std::string status
Definition: http.hpp:621
enum process::http::Request::@3 type
#define CHECK_NONE(expression)
Definition: check.hpp:48
Definition: http.hpp:651
bool writingBody() const
Definition: decoder.hpp:582
StreamingRequestDecoder()
Definition: decoder.hpp:759
bool failed() const
Definition: decoder.hpp:341
bool failed() const
Definition: decoder.hpp:575
~StreamingResponseDecoder()
Definition: decoder.hpp:537
Try< hashmap< std::string, std::string > > decode(const std::string &query)
Decode a string that is Base64-encoded with the standard Base64 alphabet.
Definition: base64.hpp:172
bool isSome() const
Definition: option.hpp:115
Definition: http.hpp:518
Definition: http.hpp:649
std::deque< http::Request * > decode(const char *data, size_t length)
Definition: decoder.hpp:78
bool contains(const std::string &s, const std::string &substr)
Definition: strings.hpp:406
#define CHECK_SOME(expression)
Definition: check.hpp:44
std::string body
Definition: http.hpp:654
Definition: decoder.hpp:290
Definition: http.hpp:340
Option< T > max(const Option< T > &left, const Option< T > &right)
Definition: option.hpp:199
Try< std::string > decompress(const std::string &compressed)
Definition: gzip.hpp:243
~ResponseDecoder()
Definition: decoder.hpp:314
StreamingResponseDecoder()
Definition: decoder.hpp:506
bool fail(const std::string &message)
Try< std::array< int, 2 > > pipe()
Definition: pipe.hpp:26
const T & get() const &
Definition: option.hpp:118
Definition: decoder.hpp:503
bool failed() const
Definition: decoder.hpp:825
~StreamingRequestDecoder()
Definition: decoder.hpp:788
std::deque< http::Response * > decode(const char *data, size_t length)
Definition: decoder.hpp:323
Result< Process > process(pid_t pid)
Definition: freebsd.hpp:30
DataDecoder()
Definition: decoder.hpp:49
Definition: decoder.hpp:756
Definition: none.hpp:27
bool isError() const
Definition: try.hpp:71
bool failed() const
Definition: decoder.hpp:96
std::string path
Definition: http.hpp:655
Definition: http.hpp:595
hashmap< uint16_t, std::string > * statuses
Definition: decoder.hpp:46
std::deque< http::Response * > decode(const char *data, size_t length)
Definition: decoder.hpp:550
static std::string string(uint16_t code)
Headers headers
Definition: http.hpp:623
enum process::http::Response::@4 type
std::deque< http::Request * > decode(const char *data, size_t length)
Definition: decoder.hpp:801
const T & get() const
Definition: try.hpp:73
ResponseDecoder()
Definition: decoder.hpp:293