Apache Mesos
status_update_manager_process.hpp
Go to the documentation of this file.
1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements. See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership. The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License. You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 
17 #ifndef __STATUS_UPDATE_MANAGER_PROCESS_HPP__
18 #define __STATUS_UPDATE_MANAGER_PROCESS_HPP__
19 
20 #include <list>
21 #include <queue>
22 #include <string>
23 #include <utility>
24 
25 #include <mesos/mesos.hpp>
26 #include <mesos/type_utils.hpp>
27 
28 #include <process/delay.hpp>
29 #include <process/future.hpp>
30 #include <process/owned.hpp>
31 #include <process/protobuf.hpp>
32 #include <process/timeout.hpp>
33 
34 #include <stout/hashmap.hpp>
35 #include <stout/hashset.hpp>
36 #include <stout/duration.hpp>
37 #include <stout/lambda.hpp>
38 #include <stout/option.hpp>
39 #include <stout/os.hpp>
40 #include <stout/path.hpp>
41 #include <stout/protobuf.hpp>
42 #include <stout/stringify.hpp>
43 #include <stout/try.hpp>
44 #include <stout/utils.hpp>
45 #include <stout/uuid.hpp>
46 
47 #include <stout/os/ftruncate.hpp>
48 
50 
51 #include "slave/constants.hpp"
52 
53 namespace mesos {
54 namespace internal {
55 
56 // `StatusUpdateManagerProcess` is responsible for
57 //
58 // 1) Reliably sending status updates.
59 // 2) Checkpointing updates to disk (optional).
60 // 3) Receiving ACKs.
61 // 4) Recovering checkpointed status updates after failover.
62 //
63 // It takes the following template parameters:
64 // - `IDType` the type of the objects used to identify the managed streams.
65 // - `CheckpointType` the type of the protobuf message written to checkpoint
66 // the streams.
67 // - `UpdateType` the type of the status updates that will be managed.
68 //
69 // NOTE: Unless first paused, this actor will forward updates as soon as
70 // possible; for example, during recovery or as soon as the first status update
71 // is processed.
72 //
73 // This process does NOT garbage collect any checkpointed state. The users of it
74 // are responsible for the garbage collection of the status updates files.
75 //
76 // TODO(gkleiman): make `TaskStatusUpdateManager` use this actor (MESOS-8296).
77 template <typename IDType, typename CheckpointType, typename UpdateType>
79  : public ProtobufProcess<
80  StatusUpdateManagerProcess<IDType, CheckpointType, UpdateType>>
81 {
82 public:
83  // This struct contains a map from stream ID to the stream state
84  // recovered for the status updates file.
85  //
86  // The stream state will be `None` if:
87  //
88  // * The status updates file didn't exist.
89  // * The status updates file was empty.
90  //
91  // The stream state contains all the status updates (both acknowledged and
92  // pending) added to the stream.
93  //
94  // This struct also contains a count of the recoverable errors found during
95  // non-strict recovery.
96  struct State
97  {
98  struct StreamState
99  {
100  std::list<UpdateType> updates;
102 
103  StreamState() : updates(), terminated(false) {}
104  };
105 
106  // The value will be `None` if the stream could not be recovered.
108  unsigned int errors;
109 
110  State() : streams(), errors(0) {}
111  };
112 
114  const std::string& id,
115  const std::string& _statusUpdateType)
116  : process::ProcessBase(process::ID::generate(id)),
117  statusUpdateType(_statusUpdateType),
118  paused(false) {}
119 
122  const StatusUpdateManagerProcess& that) = delete;
123 
124  // Implementation.
125 
126  // Explicitly use `initialize` since we're overloading below.
128 
129  // Initializes the actor with the necessary callbacks.
130  //
131  // `_forwardCallback` is called whenever there is a new status update that
132  // needs to be forwarded.
133  // `_getPath` is called in order to generate the path of a status update
134  // stream checkpoint file, given an `IDType`.
136  const lambda::function<void(const UpdateType&)>& _forwardCallback,
137  const lambda::function<const std::string(const IDType&)>& _getPath)
138  {
139  forwardCallback = _forwardCallback;
140  getPath = _getPath;
141  }
142 
143  // Forwards the status update on the specified update stream.
144  //
145  // If `checkpoint` is `false`, the update will be retried as long as it is in
146  // memory, but it will not be checkpointed.
148  const UpdateType& update,
149  const IDType& streamId,
150  bool checkpoint)
151  {
152  LOG(INFO) << "Received " << statusUpdateType << " " << update;
153 
154  if (!streams.contains(streamId)) {
156  createStatusUpdateStream(
157  streamId,
158  update.has_framework_id()
159  ? Option<FrameworkID>(update.framework_id())
160  : None(),
161  checkpoint);
162 
163  if (create.isError()) {
164  return process::Failure(create.error());
165  }
166  }
167  CHECK(streams.contains(streamId));
168  StatusUpdateStream* stream = streams[streamId].get();
169 
170  if (update.has_latest_status()) {
171  return process::Failure(
172  "Expected " + statusUpdateType + " to not contain 'latest_status'");
173  }
174 
175  // Verify that we didn't get a non-checkpointable update for a
176  // stream that is checkpointable, and vice-versa.
177  if (stream->checkpointed() != checkpoint) {
178  return process::Failure(
179  "Mismatched checkpoint value for " + statusUpdateType + " " +
180  stringify(update) + " (expected checkpoint=" +
181  stringify(stream->checkpointed()) + " actual checkpoint=" +
182  stringify(checkpoint) + ")");
183  }
184 
185  // Verify that the framework ID of the update matches the framework ID
186  // of the stream.
187  if (update.has_framework_id() != stream->frameworkId.isSome()) {
188  return process::Failure(
189  "Mismatched framework ID for " + statusUpdateType +
190  " " + stringify(update) + " (expected " +
191  (stream->frameworkId.isSome()
192  ? stringify(stream->frameworkId.get())
193  : "no framework ID") +
194  " got " +
195  (update.has_framework_id()
196  ? stringify(update.framework_id())
197  : "no framework ID") +
198  ")");
199  }
200 
201  if (update.has_framework_id() &&
202  update.framework_id() != stream->frameworkId.get()) {
203  return process::Failure(
204  "Mismatched framework ID for " + statusUpdateType +
205  " " + stringify(update) +
206  " (expected " + stringify(stream->frameworkId.get()) +
207  " actual " + stringify(update.framework_id()) + ")");
208  }
209 
210  // Handle the status update.
211  Try<bool> result = stream->update(update);
212  if (result.isError()) {
213  return process::Failure(result.error());
214  }
215 
216  // This only happens if the status update is a duplicate.
217  if (!result.get()) {
218  return Nothing();
219  }
220 
221  // Forward the status update if this is at the front of the queue.
222  // Subsequent status updates will be sent in `acknowledgement()`.
223  if (!paused && stream->pending.size() == 1) {
224  CHECK_NONE(stream->timeout);
225 
226  const Result<UpdateType>& next = stream->next();
227  if (next.isError()) {
228  return process::Failure(next.error());
229  }
230 
231  CHECK_SOME(next);
232  stream->timeout =
233  forward(stream, next.get(), slave::STATUS_UPDATE_RETRY_INTERVAL_MIN);
234  }
235 
236  return Nothing();
237  }
238 
239  // Process the acknowledgment of a status update.
240  //
241  // This will result in the next status update being forwarded.
242  //
243  // Returns `true` if the ACK is handled successfully (e.g., checkpointed)
244  // and the task's status update stream is not terminated.
245  // `false` same as above except the status update stream is
246  // terminated.
247  // `Failure` if there are any errors (e.g., duplicate, checkpointing).
249  const IDType& streamId,
250  const id::UUID& uuid)
251  {
252  LOG(INFO) << "Received " << statusUpdateType
253  << " acknowledgement (UUID: " << uuid << ")"
254  << " for stream " << stringify(streamId);
255 
256  // This might happen if we haven't completed recovery yet or if the
257  // acknowledgement is for a stream that has been cleaned up.
258  if (!streams.contains(streamId)) {
259  return process::Failure(
260  "Cannot find the " + statusUpdateType + " stream " +
261  stringify(streamId));
262  }
263 
264  StatusUpdateStream* stream = streams[streamId].get();
265 
266  // Handle the acknowledgement.
267  Try<bool> result = stream->acknowledgement(uuid);
268 
269  if (result.isError()) {
270  return process::Failure(result.error());
271  }
272 
273  if (!result.get()) {
274  return process::Failure(
275  "Duplicate " + statusUpdateType + " acknowledgement");
276  }
277 
278  stream->timeout = None();
279 
280  // Get the next update in the queue.
281  const Result<UpdateType>& next = stream->next();
282  if (next.isError()) {
283  return process::Failure(next.error());
284  }
285 
286  bool terminated = stream->terminated;
287  if (terminated) {
288  if (next.isSome()) {
289  LOG(WARNING) << "Acknowledged a terminal " << statusUpdateType
290  << " but updates are still pending";
291  }
292  cleanupStatusUpdateStream(streamId);
293  } else if (!paused && next.isSome()) {
294  // Forward the next queued status update.
295  stream->timeout =
296  forward(stream, next.get(), slave::STATUS_UPDATE_RETRY_INTERVAL_MIN);
297  }
298 
299  return !terminated;
300  }
301 
302  // Recovers the status update manager's state using the supplied stream IDs.
303  //
304  // Returns:
305  // - The recovered state if successful.
306  // - The recovered state, including the number of errors encountered, if
307  // `strict == false` and any of the streams couldn't be recovered.
308  // - A `Failure` if `strict == true` and any of the streams couldn't be
309  // recovered.
311  const std::list<IDType>& streamIds,
312  bool strict)
313  {
314  LOG(INFO) << "Recovering " << statusUpdateType << " manager";
315 
316  State state;
317  foreach (const IDType& streamId, streamIds) {
319  recoverStatusUpdateStream(streamId, strict);
320 
321  if (result.isError()) {
322  const std::string message =
323  "Failed to recover " + statusUpdateType + " stream " +
324  stringify(streamId) + ": " + result.error();
325  LOG(WARNING) << message;
326 
327  if (strict) {
328  foreachkey (const IDType& streamId, utils::copy(streams)) {
329  cleanupStatusUpdateStream(streamId);
330  }
331 
332  CHECK(streams.empty());
333  CHECK(frameworkStreams.empty());
334 
335  return process::Failure(message);
336  }
337 
338  state.errors++;
339  } else if (result.isNone()) {
340  // This can happen if the initial checkpoint of the stream didn't
341  // complete.
342  state.streams[streamId] = None();
343  } else {
344  const typename StatusUpdateStream::State& streamState = result.get();
345 
346  state.streams[streamId] = typename State::StreamState();
347  state.streams[streamId]->updates = streamState.updates;
348  state.streams[streamId]->terminated = streamState.terminated;
349 
350  if (streamState.error) {
351  state.errors++;
352  }
353  }
354  }
355 
356  return state;
357  }
358 
359  // Closes all status update streams corresponding to a framework.
360  //
361  // NOTE: This stops retrying any pending status updates for this framework,
362  // but does NOT garbage collect any checkpointed state. The caller is
363  // responsible for garbage collection after this method has returned.
364  void cleanup(const FrameworkID& frameworkId)
365  {
366  LOG(INFO) << "Closing " << statusUpdateType << " streams of framework "
367  << frameworkId;
368 
369  if (frameworkStreams.contains(frameworkId)) {
370  foreach (const IDType& streamId,
371  utils::copy(frameworkStreams[frameworkId])) {
372  cleanupStatusUpdateStream(streamId);
373  }
374  }
375  }
376 
377  void pause()
378  {
379  LOG(INFO) << "Pausing " << statusUpdateType << " manager";
380  paused = true;
381  }
382 
383  void resume()
384  {
385  LOG(INFO) << "Resuming " << statusUpdateType << " manager";
386  paused = false;
387 
389  const Result<UpdateType>& next = stream->next();
390 
391  if (next.isSome()) {
392  const UpdateType& update = next.get();
393 
394  LOG(INFO) << "Sending " << statusUpdateType << " " << update;
395 
396  stream->timeout = forward(
398  }
399  }
400  }
401 
402 private:
403  // Forward declarations.
404  class StatusUpdateStream;
405 
406  // Helper methods.
407 
408  // Creates a new status update stream, adding it to `streams`.
409  Try<Nothing> createStatusUpdateStream(
410  const IDType& streamId,
411  const Option<FrameworkID>& frameworkId,
412  bool checkpoint)
413  {
414  VLOG(1) << "Creating " << statusUpdateType << " stream "
415  << stringify(streamId) << " checkpoint=" << stringify(checkpoint);
416 
419  statusUpdateType,
420  streamId,
421  frameworkId,
422  checkpoint ? Option<std::string>(getPath(streamId)) : None());
423 
424  if (stream.isError()) {
425  return Error(stream.error());
426  }
427 
428  streams[streamId] = std::move(stream.get());
429 
430  if (frameworkId.isSome()) {
431  frameworkStreams[frameworkId.get()].insert(streamId);
432  }
433 
434  return Nothing();
435  }
436 
437 
438  // Recovers a status update stream and adds it to the map of streams.
439  Result<typename StatusUpdateStream::State> recoverStatusUpdateStream(
440  const IDType& streamId,
441  bool strict)
442  {
443  VLOG(1) << "Recovering " << statusUpdateType << " stream "
444  << stringify(streamId);
445 
446  Result<std::pair<
448  typename StatusUpdateStream::State>> result =
450  statusUpdateType, streamId, getPath(streamId), strict);
451 
452  if (result.isError()) {
453  return Error(result.error());
454  }
455 
456  if (result.isNone()) {
457  return None();
458  }
459 
460  process::Owned<StatusUpdateStream> stream = std::get<0>(result.get());
461  typename StatusUpdateStream::State& streamState = std::get<1>(result.get());
462 
463  if (stream->terminated) {
464  return streamState;
465  }
466 
467  if (stream->frameworkId.isSome()) {
468  frameworkStreams[stream->frameworkId.get()].insert(streamId);
469  }
470 
471  // Get the next update in the queue.
472  const Result<UpdateType>& next = stream->next();
473  if (next.isError()) {
474  return Error(next.error());
475  }
476 
477  if (!paused && next.isSome()) {
478  // Forward the next queued status update.
479  stream->timeout = forward(
481  }
482 
483  streams[streamId] = std::move(stream);
484 
485  return streamState;
486  }
487 
488  void cleanupStatusUpdateStream(const IDType& streamId)
489  {
490  VLOG(1) << "Cleaning up " << statusUpdateType << " stream "
491  << stringify(streamId);
492 
493  CHECK(streams.contains(streamId)) << "Cannot find " << statusUpdateType
494  << " stream " << stringify(streamId);
495 
496  StatusUpdateStream* stream = streams[streamId].get();
497 
498  if (stream->frameworkId.isSome()) {
499  const FrameworkID frameworkId = stream->frameworkId.get();
500 
501  CHECK(frameworkStreams.contains(frameworkId));
502 
503  frameworkStreams[frameworkId].erase(streamId);
504  if (frameworkStreams[frameworkId].empty()) {
505  frameworkStreams.erase(frameworkId);
506  }
507  }
508 
509  streams.erase(streamId);
510  }
511 
512  // Forwards the status update and starts a timer based on the `duration` to
513  // check for ACK.
514  process::Timeout forward(
515  StatusUpdateStream* stream,
516  const UpdateType& _update,
517  const Duration& duration)
518  {
519  CHECK(!paused);
520  CHECK(!_update.has_latest_status());
521  CHECK_NOTNULL(stream);
522 
523  UpdateType update(_update);
524  update.mutable_latest_status()->CopyFrom(
525  stream->pending.empty() ? _update.status()
526  : stream->pending.back().status());
527 
528  VLOG(1) << "Forwarding " << statusUpdateType << " " << update;
529 
530  forwardCallback(update);
531 
532  // Send a message to self to resend after some delay if no ACK is received.
533  return delay(
534  duration,
537  IDType,
538  CheckpointType,
539  UpdateType>>::self(),
540  &StatusUpdateManagerProcess::timeout,
541  stream->streamId,
542  duration)
543  .timeout();
544  }
545 
546  // Status update timeout.
547  void timeout(const IDType& streamId, const Duration& duration)
548  {
549  if (paused || !streams.contains(streamId)) {
550  return;
551  }
552 
553  StatusUpdateStream* stream = streams[streamId].get();
554 
555  // Check and see if we should resend the status update.
556  if (!stream->pending.empty()) {
557  CHECK_SOME(stream->timeout);
558 
559  if (stream->timeout->expired()) {
560  const UpdateType& update = stream->pending.front();
561  LOG(WARNING) << "Resending " << statusUpdateType << " " << update;
562 
563  // Bounded exponential backoff.
564  Duration duration_ =
566 
567  stream->timeout = forward(stream, update, duration_);
568  }
569  }
570  }
571 
572  // Type of status updates handled by the stream, e.g., "operation status
573  // update".
574  const std::string statusUpdateType;
575 
576  lambda::function<void(UpdateType)> forwardCallback;
577  lambda::function<const std::string(const IDType&)> getPath;
578 
580  hashmap<FrameworkID, hashset<IDType>> frameworkStreams;
581  bool paused;
582 
583  // Handles the status updates and acknowledgements, checkpointing them if
584  // necessary. It also holds the information about received, acknowledged and
585  // pending status updates.
586  class StatusUpdateStream
587  {
588  public:
589  struct State
590  {
591  std::list<UpdateType> updates;
592 
593  bool error;
594  bool terminated; // Set to `true` if a terminal status update was ACK'ed.
595 
596  State() : updates(), error(false), terminated(false) {}
597  };
598 
599  ~StatusUpdateStream()
600  {
601  if (fd.isSome()) {
602  Try<Nothing> close = os::close(fd.get());
603 
604  if (close.isError()) {
605  CHECK_SOME(path);
606  LOG(WARNING) << "Failed to close " << statusUpdateType
607  << " stream file '" << path.get() << "': "
608  << close.error();
609  }
610  }
611  }
612 
614  const std::string& statusUpdateType,
615  const IDType& streamId,
616  const Option<FrameworkID>& frameworkId,
617  const Option<std::string>& path)
618  {
619  Option<int_fd> fd;
620 
621  if (path.isSome()) {
622  if (os::exists(path.get())) {
623  return Error("The file '" + path.get() + "' already exists");
624  }
625 
626  // Create the base updates directory, if it doesn't exist.
627  const std::string& dirName = Path(path.get()).dirname();
628  Try<Nothing> directory = os::mkdir(dirName);
629  if (directory.isError()) {
630  return Error(
631  "Failed to create '" + dirName + "': " + directory.error());
632  }
633 
634  // Open the updates file.
635  Try<int_fd> result = os::open(
636  path.get(),
637  O_CREAT | O_SYNC | O_WRONLY | O_CLOEXEC,
639 
640  if (result.isError()) {
641  return Error(
642  "Failed to open '" + path.get() + "' : " + result.error());
643  }
644 
645  fd = result.get();
646  }
647 
649  new StatusUpdateStream(statusUpdateType, streamId, path, fd));
650 
651  stream->frameworkId = frameworkId;
652 
653  return std::move(stream);
654  }
655 
656 
658  const std::string& statusUpdateType,
659  const IDType& streamId,
660  const std::string& path,
661  bool strict)
662  {
663  if (os::exists(Path(path).dirname()) && !os::exists(path)) {
664  // This could happen if the process died before it checkpointed any
665  // status updates.
666  return None();
667  }
668 
669  // Open the status updates file for reading and writing.
670  Try<int_fd> fd = os::open(
671  path,
672 #ifdef __WINDOWS__
673  O_BINARY |
674 #endif // __WINDOWS__
675  O_SYNC | O_RDWR | O_CLOEXEC);
676 
677  if (fd.isError()) {
678  return Error("Failed to open '" + path + "': " + fd.error());
679  }
680 
682  new StatusUpdateStream(statusUpdateType, streamId, path, fd.get()));
683 
684  VLOG(1) << "Replaying " << statusUpdateType << " stream "
685  << stringify(streamId);
686 
687  // Read the updates/acknowledgments, building both the stream's in-memory
688  // structures and the state object which will be returned.
689 
690  State state;
691  Result<CheckpointType> record = None();
692  while (true) {
693  // Ignore errors due to partial protobuf read and enable undoing failed
694  // reads by reverting to the previous seek position.
695  record = ::protobuf::read<CheckpointType>(fd.get(), true, true);
696 
697  if (!record.isSome()) {
698  break;
699  }
700 
701  switch (record->type()) {
702  case CheckpointType::ACK: {
703  // Get the corresponding update for this ACK.
704  const Result<UpdateType>& update = stream->next();
705  if (update.isError()) {
706  return Error(update.error());
707  }
708 
709  if (update.isNone()) {
710  return Error(
711  "Unexpected " + statusUpdateType + " acknowledgment"
712  " (UUID: " + stringify(record->uuid()) +
713  ") for stream " + stringify(streamId));
714  }
715  stream->_handle(update.get(), record->type());
716  break;
717  }
718  case CheckpointType::UPDATE: {
719  stream->_handle(record->update(), record->type());
720  state.updates.push_back(record->update());
721  break;
722  }
723  }
724  }
725 
726  // Always truncate the file to contain only valid updates.
727  // NOTE: This is safe even though we ignore partial protobuf read
728  // errors above, because the `fd` is properly set to the end of the
729  // last valid update by `protobuf::read()`.
730  Try<off_t> currentPosition = os::lseek(fd.get(), 0, SEEK_CUR);
731  if (currentPosition.isError()) {
732  return Error(
733  "Failed to lseek file '" + path + "': " + currentPosition.error());
734  }
735 
736  Try<Nothing> truncated = os::ftruncate(fd.get(), currentPosition.get());
737 
738  if (truncated.isError()) {
739  return Error(
740  "Failed to truncate file '" + path + "': " + truncated.error());
741  }
742 
743  // After reading a non-corrupted updates file, `record` should be `none`.
744  if (record.isError()) {
745  std::string message =
746  "Failed to read file '" + path + "': " + record.error();
747 
748  if (strict) {
749  return Error(message);
750  }
751 
752  LOG(WARNING) << message;
753  state.error = true;
754  }
755 
756  state.terminated = stream->terminated;
757 
758  if (state.updates.empty()) {
759  // A stream is created only once there's something to write to it, so
760  // this can only happen if the checkpointing of the first update was
761  // interrupted.
762 
763  // On Windows you can only delete a file if it is not open. The
764  // stream's destructor will close the file, so we need to destroy it
765  // here.
766  stream.reset();
767 
768  Try<Nothing> removed = os::rm(path);
769  if (removed.isError()) {
770  return Error(
771  "Failed to remove file '" + path + "': " + removed.error());
772  }
773 
774  return None();
775  }
776 
777  return std::make_pair(stream, state);
778  }
779 
780  // This function handles the update, checkpointing if necessary.
781  //
782  // Returns `true`: if the update is successfully handled.
783  // `false`: if the update is a duplicate or has already been
784  // acknowledged.
785  // `Error`: any errors (e.g., checkpointing).
786  Try<bool> update(const UpdateType& update)
787  {
788  if (error.isSome()) {
789  return Error(error.get());
790  }
791 
792  if (!update.status().has_uuid()) {
793  return Error("Status update is missing 'uuid'");
794  }
795  Try<id::UUID> uuid = id::UUID::fromBytes(update.status().uuid().value());
796  CHECK_SOME(uuid);
797 
798  // Check that this status update has not already been acknowledged.
799  if (acknowledged.contains(uuid.get())) {
800  LOG(WARNING) << "Ignoring " << statusUpdateType << " " << update
801  << " that has already been acknowledged";
802  return false;
803  }
804 
805  // Check that this update has not already been received.
806  if (received.contains(uuid.get())) {
807  LOG(WARNING) << "Ignoring duplicate " << statusUpdateType << " "
808  << update;
809  return false;
810  }
811 
812  // Handle the update, checkpointing if necessary.
813  Try<Nothing> result = handle(update, CheckpointType::UPDATE);
814  if (result.isError()) {
815  return Error(result.error());
816  }
817 
818  return true;
819  }
820 
821  // This function handles the ACK, checkpointing if necessary.
822  //
823  // Returns `true`: if the acknowledgement is successfully handled.
824  // `false`: if the acknowledgement is a duplicate.
825  // `Error`: Any errors (e.g., checkpointing).
826  Try<bool> acknowledgement(const id::UUID& uuid)
827  {
828  if (error.isSome()) {
829  return Error(error.get());
830  }
831 
832  // Get the corresponding update for this ACK.
833  const Result<UpdateType>& update_ = next();
834  if (update_.isError()) {
835  return Error(update_.error());
836  }
837 
838  // This might happen if we retried a status update and got back
839  // acknowledgments for both the original and the retried update.
840  if (update_.isNone()) {
841  return Error(
842  "Unexpected acknowledgment (UUID: " + uuid.toString() +
843  ") for " + statusUpdateType + " stream " + stringify(streamId));
844  }
845 
846  const UpdateType& update = update_.get();
847 
848  if (acknowledged.contains(uuid)) {
849  LOG(WARNING) << "Duplicate acknowledgment for " << statusUpdateType
850  << " " << update;
851  return false;
852  }
853 
854  Try<id::UUID> updateUuid =
855  id::UUID::fromBytes(update.status().uuid().value());
856  CHECK_SOME(updateUuid);
857 
858  // This might happen if we retried a status update and got back
859  // acknowledgments for both the original and the retried update.
860  if (uuid != updateUuid.get()) {
861  LOG(WARNING) << "Unexpected " << statusUpdateType
862  << " acknowledgment (received " << uuid
863  << ", expecting " << updateUuid.get() << ") for "
864  << update;
865  return false;
866  }
867 
868  // Handle the ACK, checkpointing if necessary.
869  Try<Nothing> result = handle(update, CheckpointType::ACK);
870  if (result.isError()) {
871  return Error(result.error());
872  }
873 
874  return true;
875  }
876 
877  // Returns the next update (or none, if empty) in the queue.
878  Result<UpdateType> next()
879  {
880  if (error.isSome()) {
881  return Error(error.get());
882  }
883 
884  if (!pending.empty()) {
885  return pending.front();
886  }
887 
888  return None();
889  }
890 
891  // Returns `true` if the stream is checkpointed, `false` otherwise.
892  bool checkpointed() { return path.isSome(); }
893 
894  const IDType streamId;
895 
896  bool terminated;
897  Option<FrameworkID> frameworkId;
898  Option<process::Timeout> timeout; // Timeout for resending status update.
899  std::queue<UpdateType> pending;
900 
901  private:
902  StatusUpdateStream(
903  const std::string& _statusUpdateType,
904  const IDType& _streamId,
905  const Option<std::string>& _path,
906  Option<int_fd> _fd)
907  : streamId(_streamId),
908  terminated(false),
909  statusUpdateType(_statusUpdateType),
910  path(_path),
911  fd(_fd) {}
912 
913  // Handles the status update and writes it to disk, if necessary.
914  //
915  // TODO(vinod): The write has to be asynchronous to avoid status updates
916  // that are being checkpointed, blocking the processing of other updates.
917  // One solution is to wrap the protobuf::write inside async, but it's
918  // probably too much of an overhead to spin up a new libprocess per status
919  // update?
920  // A better solution might be to be have async write capability for file IO.
921  Try<Nothing> handle(
922  const UpdateType& update,
923  const typename CheckpointType::Type& type)
924  {
925  CHECK_NONE(error);
926 
927  // Checkpoint the update if necessary.
928  if (checkpointed()) {
929  LOG(INFO) << "Checkpointing " << type << " for " << statusUpdateType
930  << " " << update;
931 
932  CHECK_SOME(fd);
933 
934  CheckpointType record;
935  record.set_type(type);
936 
937  switch (type) {
938  case CheckpointType::UPDATE:
939  record.mutable_update()->CopyFrom(update);
940  break;
941  case CheckpointType::ACK:
942  record.mutable_uuid()->CopyFrom(update.status().uuid());
943  break;
944  }
945 
946  Try<Nothing> write = ::protobuf::write(fd.get(), record);
947  if (write.isError()) {
948  error =
949  "Failed to write to file '" + path.get() + "': " + write.error();
950  return Error(error.get());
951  }
952  }
953 
954  // Now actually handle the update.
955  _handle(update, type);
956 
957  return Nothing();
958  }
959 
960 
961  // Handles the status update without checkpointing.
962  void _handle(
963  const UpdateType& update,
964  const typename CheckpointType::Type& type)
965  {
966  CHECK_NONE(error);
967 
968  Try<id::UUID> uuid = id::UUID::fromBytes(update.status().uuid().value());
969  CHECK_SOME(uuid);
970 
971  switch (type) {
972  case CheckpointType::UPDATE:
973  if (update.has_framework_id()) {
974  frameworkId = update.framework_id();
975  }
976 
977  received.insert(uuid.get());
978 
979  // Add it to the pending updates queue.
980  pending.push(update);
981  break;
982  case CheckpointType::ACK:
983  acknowledged.insert(uuid.get());
984 
985  // Remove the corresponding update from the pending queue.
986  pending.pop();
987 
988  if (!terminated) {
989  terminated = protobuf::isTerminalState(update.status().state());
990  }
991  break;
992  }
993  }
994 
995  // Type of status updates handled by the stream, e.g., "operation status
996  // update".
997  const std::string& statusUpdateType;
998 
999  const Option<std::string> path; // File path of the update stream.
1000  const Option<int_fd> fd; // File descriptor to the update stream.
1001 
1002  hashset<id::UUID> received;
1003  hashset<id::UUID> acknowledged;
1004 
1005  Option<std::string> error; // Potential non-retryable error.
1006  };
1007 };
1008 
1009 } // namespace internal {
1010 } // namespace mesos {
1011 
1012 #endif // __STATUS_UPDATE_MANAGER_PROCESS_HPP__
Protocol< RecoverRequest, RecoverResponse > recover
constexpr Duration STATUS_UPDATE_RETRY_INTERVAL_MAX
Definition: constants.hpp:59
Definition: path.hpp:29
std::string generate(const std::string &prefix="")
Returns 'prefix(N)' where N represents the number of instances where the same prefix (wrt...
Try< Nothing > checkpoint(const std::string &path, const std::string &message, bool sync, bool downgradeResources)
Definition: state.hpp:123
bool exists(const std::string &path)
Definition: exists.hpp:26
bool isNone() const
Definition: result.hpp:113
Definition: nothing.hpp:16
process::Future< State > recover(const std::list< IDType > &streamIds, bool strict)
Definition: status_update_manager_process.hpp:310
StatusUpdateManagerProcess(const std::string &id, const std::string &_statusUpdateType)
Definition: status_update_manager_process.hpp:113
Definition: errorbase.hpp:36
Try< Nothing > rm(const std::string &path)
Definition: rm.hpp:26
const mode_t S_IRGRP
Definition: windows.hpp:313
bool error
Definition: status_update_manager_process.hpp:593
ProcessBase(const std::string &id="")
T & get()&
Definition: try.hpp:80
bool pending(int signal)
Definition: signals.hpp:50
Definition: check.hpp:33
process::Future< Nothing > update(const UpdateType &update, const IDType &streamId, bool checkpoint)
Definition: status_update_manager_process.hpp:147
static Result< T > error(const std::string &message)
Definition: result.hpp:54
void cleanup(const FrameworkID &frameworkId)
Definition: status_update_manager_process.hpp:364
#define CHECK_NONE(expression)
Definition: check.hpp:54
const mode_t S_IWUSR
Definition: windows.hpp:306
Definition: future.hpp:668
constexpr int O_SYNC
Definition: open.hpp:37
Definition: status_update_manager_process.hpp:78
Try< int_fd > open(const std::string &path, int oflag, mode_t mode=0)
Definition: open.hpp:35
StreamState()
Definition: status_update_manager_process.hpp:103
Definition: status_update_manager_process.hpp:589
T * get() const
Definition: owned.hpp:117
const mode_t S_IRUSR
Definition: windows.hpp:305
Timeout timeout() const
Definition: timer.hpp:47
Definition: duration.hpp:32
Definition: check.hpp:30
StatusUpdateManagerProcess & operator=(const StatusUpdateManagerProcess &that)=delete
bool isSome() const
Definition: option.hpp:116
constexpr Duration STATUS_UPDATE_RETRY_INTERVAL_MIN
Definition: constants.hpp:58
Definition: hashmap.hpp:38
Try< Nothing > mkdir(const std::string &directory, bool recursive=true, bool sync=false)
Definition: mkdir.hpp:42
#define CHECK_SOME(expression)
Definition: check.hpp:50
hashmap< std::string, MessageHandler > message
Definition: process.hpp:455
constexpr int O_CLOEXEC
Definition: open.hpp:41
bool isTerminalState(const TaskState &state)
void pause()
Definition: status_update_manager_process.hpp:377
Try< Nothing > close(int fd)
Definition: close.hpp:24
Try< off_t > lseek(int_fd fd, off_t offset, int whence)
Definition: lseek.hpp:25
Represents a POSIX or Windows file system path and offers common path manipulations.
Definition: path.hpp:212
virtual void initialize()
Invoked when a process gets spawned.
Definition: process.hpp:100
void initialize(const lambda::function< void(const UpdateType &)> &_forwardCallback, const lambda::function< const std::string(const IDType &)> &_getPath)
Definition: status_update_manager_process.hpp:135
Definition: uuid.hpp:35
Definition: agent.hpp:25
Option< T > min(const Option< T > &left, const Option< T > &right)
Definition: option.hpp:185
Timer delay(const Duration &duration, const PID< T > &pid, void(T::*method)())
Definition: delay.hpp:31
const T & get() const &
Definition: option.hpp:119
State()
Definition: status_update_manager_process.hpp:596
Definition: protobuf.hpp:108
#define foreachvalue(VALUE, ELEMS)
Definition: foreach.hpp:77
static Try error(const E &e)
Definition: try.hpp:43
bool terminated
Definition: status_update_manager_process.hpp:101
Definition: status_update_manager_process.hpp:96
Definition: none.hpp:27
Definition: attributes.hpp:24
bool isError() const
Definition: try.hpp:78
std::list< UpdateType > updates
Definition: status_update_manager_process.hpp:100
static Try< UUID > fromBytes(const std::string &s)
Definition: uuid.hpp:49
bool terminated
Definition: status_update_manager_process.hpp:594
std::string error(const std::string &msg, uint32_t code)
Definition: timeout.hpp:24
T & get()&
Definition: result.hpp:116
Definition: executor.hpp:48
Type
Definition: capabilities.hpp:82
#define foreachkey(KEY, ELEMS)
Definition: foreach.hpp:74
Protocol< WriteRequest, WriteResponse > write
Try< uint32_t > type(const std::string &path)
bool isSome() const
Definition: result.hpp:112
bool isError() const
Definition: result.hpp:114
unsigned int errors
Definition: status_update_manager_process.hpp:108
std::string toString() const
Definition: uuid.hpp:87
Try< Nothing > create(const std::string &hierarchy, const std::string &cgroup, bool recursive=false)
process::Future< bool > acknowledgement(const IDType &streamId, const id::UUID &uuid)
Definition: status_update_manager_process.hpp:248
Definition: status_update_manager_process.hpp:98
State()
Definition: status_update_manager_process.hpp:110
std::string stringify(int flags)
Definition: owned.hpp:36
void resume()
Definition: status_update_manager_process.hpp:383
const mode_t S_IROTH
Definition: windows.hpp:321
std::list< UpdateType > updates
Definition: status_update_manager_process.hpp:591
T copy(const T &t)
Definition: utils.hpp:21
hashmap< IDType, Option< StreamState > > streams
Definition: status_update_manager_process.hpp:107
Try< Nothing > ftruncate(int fd, off_t length)
Definition: ftruncate.hpp:26