17 #ifndef __STATUS_UPDATE_MANAGER_PROCESS_HPP__ 18 #define __STATUS_UPDATE_MANAGER_PROCESS_HPP__ 77 template <
typename IDType,
typename Checkpo
intType,
typename UpdateType>
80 StatusUpdateManagerProcess<IDType, CheckpointType, UpdateType>>
114 const std::string&
id,
115 const std::string& _statusUpdateType)
117 statusUpdateType(_statusUpdateType),
136 const lambda::function<
void(
const UpdateType&)>& _forwardCallback,
137 const lambda::function<
const std::string(
const IDType&)>& _getPath)
139 forwardCallback = _forwardCallback;
149 const IDType& streamId,
152 LOG(INFO) <<
"Received " << statusUpdateType <<
" " <<
update;
154 if (!
streams.contains(streamId)) {
156 createStatusUpdateStream(
158 update.has_framework_id()
167 CHECK(
streams.contains(streamId));
168 StatusUpdateStream* stream =
streams[streamId].get();
170 if (update.has_latest_status()) {
172 "Expected " + statusUpdateType +
" to not contain 'latest_status'");
179 "Mismatched checkpoint value for " + statusUpdateType +
" " +
180 stringify(update) +
" (expected checkpoint=" +
181 stringify(stream->checkpointed()) +
" actual checkpoint=" +
187 if (update.has_framework_id() != stream->frameworkId.isSome()) {
189 "Mismatched framework ID for " + statusUpdateType +
190 " " +
stringify(update) +
" (expected " +
191 (stream->frameworkId.isSome()
193 :
"no framework ID") +
195 (update.has_framework_id()
197 :
"no framework ID") +
201 if (update.has_framework_id() &&
202 update.framework_id() != stream->frameworkId.get()) {
204 "Mismatched framework ID for " + statusUpdateType +
206 " (expected " +
stringify(stream->frameworkId.get()) +
207 " actual " +
stringify(update.framework_id()) +
")");
211 Try<bool> result = stream->update(update);
223 if (!paused && stream->pending.size() == 1) {
249 const IDType& streamId,
252 LOG(INFO) <<
"Received " << statusUpdateType
253 <<
" acknowledgement (UUID: " << uuid <<
")" 254 <<
" for stream " <<
stringify(streamId);
258 if (!
streams.contains(streamId)) {
260 "Cannot find the " + statusUpdateType +
" stream " +
264 StatusUpdateStream* stream =
streams[streamId].get();
267 Try<bool> result = stream->acknowledgement(uuid);
275 "Duplicate " + statusUpdateType +
" acknowledgement");
278 stream->timeout =
None();
289 LOG(WARNING) <<
"Acknowledged a terminal " << statusUpdateType
290 <<
" but updates are still pending";
292 cleanupStatusUpdateStream(streamId);
293 }
else if (!paused && next.
isSome()) {
311 const std::list<IDType>& streamIds,
314 LOG(INFO) <<
"Recovering " << statusUpdateType <<
" manager";
317 foreach (
const IDType& streamId, streamIds) {
319 recoverStatusUpdateStream(streamId, strict);
323 "Failed to recover " + statusUpdateType +
" stream " +
329 cleanupStatusUpdateStream(streamId);
333 CHECK(frameworkStreams.empty());
339 }
else if (result.
isNone()) {
350 if (streamState.
error) {
366 LOG(INFO) <<
"Closing " << statusUpdateType <<
" streams of framework " 369 if (frameworkStreams.contains(frameworkId)) {
370 foreach (
const IDType& streamId,
372 cleanupStatusUpdateStream(streamId);
379 LOG(INFO) <<
"Pausing " << statusUpdateType <<
" manager";
385 LOG(INFO) <<
"Resuming " << statusUpdateType <<
" manager";
394 LOG(INFO) <<
"Sending " << statusUpdateType <<
" " <<
update;
396 stream->timeout = forward(
404 class StatusUpdateStream;
410 const IDType& streamId,
414 VLOG(1) <<
"Creating " << statusUpdateType <<
" stream " 430 if (frameworkId.
isSome()) {
431 frameworkStreams[frameworkId.
get()].insert(streamId);
440 const IDType& streamId,
443 VLOG(1) <<
"Recovering " << statusUpdateType <<
" stream " 450 statusUpdateType, streamId, getPath(streamId), strict);
452 if (result.isError()) {
453 return Error(result.error());
456 if (result.isNone()) {
460 process::Owned<StatusUpdateStream> stream = std::get<0>(result.get());
463 if (stream->terminated) {
467 if (stream->frameworkId.isSome()) {
468 frameworkStreams[stream->frameworkId.
get()].insert(streamId);
477 if (!paused && next.
isSome()) {
479 stream->timeout = forward(
483 streams[streamId] = std::move(stream);
488 void cleanupStatusUpdateStream(
const IDType& streamId)
490 VLOG(1) <<
"Cleaning up " << statusUpdateType <<
" stream " 493 CHECK(
streams.contains(streamId)) <<
"Cannot find " << statusUpdateType
496 StatusUpdateStream* stream =
streams[streamId].get();
498 if (stream->frameworkId.isSome()) {
499 const FrameworkID frameworkId = stream->frameworkId.
get();
501 CHECK(frameworkStreams.contains(frameworkId));
503 frameworkStreams[frameworkId].erase(streamId);
504 if (frameworkStreams[frameworkId].empty()) {
505 frameworkStreams.erase(frameworkId);
515 StatusUpdateStream* stream,
516 const UpdateType& _update,
520 CHECK(!_update.has_latest_status());
521 CHECK_NOTNULL(stream);
523 UpdateType
update(_update);
524 update.mutable_latest_status()->CopyFrom(
525 stream->pending.empty() ? _update.status()
526 : stream->pending.back().status());
528 VLOG(1) <<
"Forwarding " << statusUpdateType <<
" " <<
update;
530 forwardCallback(update);
539 UpdateType>>::
self(),
540 &StatusUpdateManagerProcess::timeout,
547 void timeout(
const IDType& streamId,
const Duration& duration)
549 if (paused || !
streams.contains(streamId)) {
553 StatusUpdateStream* stream =
streams[streamId].get();
556 if (!stream->pending.empty()) {
559 if (stream->timeout->expired()) {
560 const UpdateType&
update = stream->pending.front();
561 LOG(WARNING) <<
"Resending " << statusUpdateType <<
" " <<
update;
567 stream->timeout = forward(stream, update, duration_);
574 const std::string statusUpdateType;
576 lambda::function<void(UpdateType)> forwardCallback;
577 lambda::function<const std::string(const IDType&)> getPath;
586 class StatusUpdateStream
596 State() : updates(), error(false), terminated(false) {}
599 ~StatusUpdateStream()
606 LOG(WARNING) <<
"Failed to close " << statusUpdateType
607 <<
" stream file '" <<
path.get() <<
"': " 614 const std::string& statusUpdateType,
615 const IDType& streamId,
623 return Error(
"The file '" + path.
get() +
"' already exists");
627 const std::string& dirName =
Path(path.
get()).dirname();
631 "Failed to create '" + dirName +
"': " + directory.
error());
640 if (result.isError()) {
642 "Failed to open '" + path.
get() +
"' : " + result.error());
649 new StatusUpdateStream(statusUpdateType, streamId, path, fd));
651 stream->frameworkId = frameworkId;
653 return std::move(stream);
658 const std::string& statusUpdateType,
659 const IDType& streamId,
660 const std::string& path,
678 return Error(
"Failed to open '" + path +
"': " + fd.
error());
682 new StatusUpdateStream(statusUpdateType, streamId, path, fd.
get()));
684 VLOG(1) <<
"Replaying " << statusUpdateType <<
" stream " 695 record = ::protobuf::read<CheckpointType>(fd.
get(),
true,
true);
701 switch (record->type()) {
702 case CheckpointType::ACK: {
711 "Unexpected " + statusUpdateType +
" acknowledgment" 715 stream->_handle(update.
get(), record->type());
718 case CheckpointType::UPDATE: {
719 stream->_handle(record->update(), record->type());
720 state.
updates.push_back(record->update());
731 if (currentPosition.
isError()) {
733 "Failed to lseek file '" + path +
"': " + currentPosition.
error());
738 if (truncated.isError()) {
740 "Failed to truncate file '" + path +
"': " + truncated.error());
746 "Failed to read file '" + path +
"': " + record.
error();
749 return Error(message);
771 "Failed to remove file '" + path +
"': " + removed.
error());
777 return std::make_pair(stream, state);
788 if (
error.isSome()) {
792 if (!update.status().has_uuid()) {
793 return Error(
"Status update is missing 'uuid'");
799 if (acknowledged.contains(uuid.
get())) {
800 LOG(WARNING) <<
"Ignoring " << statusUpdateType <<
" " << update
801 <<
" that has already been acknowledged";
806 if (received.contains(uuid.
get())) {
807 LOG(WARNING) <<
"Ignoring duplicate " << statusUpdateType <<
" " 813 Try<Nothing> result = handle(update, CheckpointType::UPDATE);
828 if (
error.isSome()) {
834 if (update_.isError()) {
835 return Error(update_.error());
840 if (update_.isNone()) {
842 "Unexpected acknowledgment (UUID: " + uuid.
toString() +
843 ") for " + statusUpdateType +
" stream " +
stringify(streamId));
846 const UpdateType& update = update_.get();
848 if (acknowledged.contains(uuid)) {
849 LOG(WARNING) <<
"Duplicate acknowledgment for " << statusUpdateType
860 if (uuid != updateUuid.
get()) {
861 LOG(WARNING) <<
"Unexpected " << statusUpdateType
862 <<
" acknowledgment (received " << uuid
863 <<
", expecting " << updateUuid.
get() <<
") for " 869 Try<Nothing> result = handle(update, CheckpointType::ACK);
880 if (
error.isSome()) {
892 bool checkpointed() {
return path.isSome(); }
894 const IDType streamId;
899 std::queue<UpdateType>
pending;
903 const std::string& _statusUpdateType,
904 const IDType& _streamId,
907 : streamId(_streamId),
909 statusUpdateType(_statusUpdateType),
922 const UpdateType& update,
928 if (checkpointed()) {
929 LOG(INFO) <<
"Checkpointing " << type <<
" for " << statusUpdateType
934 CheckpointType record;
935 record.set_type(type);
938 case CheckpointType::UPDATE:
939 record.mutable_update()->CopyFrom(update);
941 case CheckpointType::ACK:
942 record.mutable_uuid()->CopyFrom(update.status().uuid());
949 "Failed to write to file '" + path.get() +
"': " + write.
error();
955 _handle(update, type);
963 const UpdateType& update,
972 case CheckpointType::UPDATE:
973 if (update.has_framework_id()) {
974 frameworkId = update.framework_id();
977 received.insert(uuid.
get());
980 pending.push(update);
982 case CheckpointType::ACK:
983 acknowledged.insert(uuid.
get());
997 const std::string& statusUpdateType;
1012 #endif // __STATUS_UPDATE_MANAGER_PROCESS_HPP__
Protocol< RecoverRequest, RecoverResponse > recover
constexpr Duration STATUS_UPDATE_RETRY_INTERVAL_MAX
Definition: constants.hpp:59
std::string generate(const std::string &prefix="")
Returns 'prefix(N)' where N represents the number of instances where the same prefix (wrt...
Try< Nothing > checkpoint(const std::string &path, const std::string &message, bool sync, bool downgradeResources)
Definition: state.hpp:123
bool exists(const std::string &path)
Definition: exists.hpp:26
bool isNone() const
Definition: result.hpp:113
Definition: nothing.hpp:16
process::Future< State > recover(const std::list< IDType > &streamIds, bool strict)
Definition: status_update_manager_process.hpp:310
StatusUpdateManagerProcess(const std::string &id, const std::string &_statusUpdateType)
Definition: status_update_manager_process.hpp:113
Definition: errorbase.hpp:36
Try< Nothing > rm(const std::string &path)
Definition: rm.hpp:26
const mode_t S_IRGRP
Definition: windows.hpp:313
bool error
Definition: status_update_manager_process.hpp:593
ProcessBase(const std::string &id="")
T & get()&
Definition: try.hpp:80
bool pending(int signal)
Definition: signals.hpp:50
process::Future< Nothing > update(const UpdateType &update, const IDType &streamId, bool checkpoint)
Definition: status_update_manager_process.hpp:147
static Result< T > error(const std::string &message)
Definition: result.hpp:54
void cleanup(const FrameworkID &frameworkId)
Definition: status_update_manager_process.hpp:364
#define CHECK_NONE(expression)
Definition: check.hpp:54
const mode_t S_IWUSR
Definition: windows.hpp:306
Definition: future.hpp:668
constexpr int O_SYNC
Definition: open.hpp:37
Definition: status_update_manager_process.hpp:78
Try< int_fd > open(const std::string &path, int oflag, mode_t mode=0)
Definition: open.hpp:35
StreamState()
Definition: status_update_manager_process.hpp:103
Definition: status_update_manager_process.hpp:589
T * get() const
Definition: owned.hpp:117
const mode_t S_IRUSR
Definition: windows.hpp:305
Timeout timeout() const
Definition: timer.hpp:47
Definition: duration.hpp:32
StatusUpdateManagerProcess & operator=(const StatusUpdateManagerProcess &that)=delete
bool isSome() const
Definition: option.hpp:116
constexpr Duration STATUS_UPDATE_RETRY_INTERVAL_MIN
Definition: constants.hpp:58
Definition: hashmap.hpp:38
Try< Nothing > mkdir(const std::string &directory, bool recursive=true, bool sync=false)
Definition: mkdir.hpp:42
#define CHECK_SOME(expression)
Definition: check.hpp:50
hashmap< std::string, MessageHandler > message
Definition: process.hpp:455
constexpr int O_CLOEXEC
Definition: open.hpp:41
bool isTerminalState(const TaskState &state)
void pause()
Definition: status_update_manager_process.hpp:377
process::Future< Nothing > removed(const std::string &link)
Try< Nothing > close(int fd)
Definition: close.hpp:24
Try< off_t > lseek(int_fd fd, off_t offset, int whence)
Definition: lseek.hpp:25
Represents a POSIX or Windows file system path and offers common path manipulations.
Definition: path.hpp:212
virtual void initialize()
Invoked when a process gets spawned.
Definition: process.hpp:100
void initialize(const lambda::function< void(const UpdateType &)> &_forwardCallback, const lambda::function< const std::string(const IDType &)> &_getPath)
Definition: status_update_manager_process.hpp:135
Option< T > min(const Option< T > &left, const Option< T > &right)
Definition: option.hpp:185
Timer delay(const Duration &duration, const PID< T > &pid, void(T::*method)())
Definition: delay.hpp:31
const T & get() const &
Definition: option.hpp:119
State()
Definition: status_update_manager_process.hpp:596
Definition: protobuf.hpp:108
#define foreachvalue(VALUE, ELEMS)
Definition: foreach.hpp:77
static Try error(const E &e)
Definition: try.hpp:43
bool terminated
Definition: status_update_manager_process.hpp:101
Definition: status_update_manager_process.hpp:96
Definition: attributes.hpp:24
bool isError() const
Definition: try.hpp:78
std::list< UpdateType > updates
Definition: status_update_manager_process.hpp:100
static Try< UUID > fromBytes(const std::string &s)
Definition: uuid.hpp:49
bool terminated
Definition: status_update_manager_process.hpp:594
std::string error(const std::string &msg, uint32_t code)
Definition: timeout.hpp:24
T & get()&
Definition: result.hpp:116
Definition: executor.hpp:48
Type
Definition: capabilities.hpp:82
#define foreachkey(KEY, ELEMS)
Definition: foreach.hpp:74
Protocol< WriteRequest, WriteResponse > write
Try< uint32_t > type(const std::string &path)
bool isSome() const
Definition: result.hpp:112
bool isError() const
Definition: result.hpp:114
unsigned int errors
Definition: status_update_manager_process.hpp:108
std::string toString() const
Definition: uuid.hpp:87
Try< Nothing > create(const std::string &hierarchy, const std::string &cgroup, bool recursive=false)
process::Future< bool > acknowledgement(const IDType &streamId, const id::UUID &uuid)
Definition: status_update_manager_process.hpp:248
Definition: status_update_manager_process.hpp:98
State()
Definition: status_update_manager_process.hpp:110
std::string stringify(int flags)
void resume()
Definition: status_update_manager_process.hpp:383
const mode_t S_IROTH
Definition: windows.hpp:321
std::list< UpdateType > updates
Definition: status_update_manager_process.hpp:591
T copy(const T &t)
Definition: utils.hpp:21
hashmap< IDType, Option< StreamState > > streams
Definition: status_update_manager_process.hpp:107
Try< Nothing > ftruncate(int fd, off_t length)
Definition: ftruncate.hpp:26