Apache Mesos
task_status_update_manager.hpp
Go to the documentation of this file.
1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements. See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership. The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License. You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 
17 #ifndef __TASK_STATUS_UPDATE_MANAGER_HPP__
18 #define __TASK_STATUS_UPDATE_MANAGER_HPP__
19 
20 #include <queue>
21 #include <string>
22 
23 #include <mesos/mesos.hpp>
24 
25 #include <process/future.hpp>
26 #include <process/pid.hpp>
27 #include <process/timeout.hpp>
28 
29 #include <stout/hashset.hpp>
30 #include <stout/lambda.hpp>
31 #include <stout/option.hpp>
32 #include <stout/try.hpp>
33 #include <stout/uuid.hpp>
34 
35 #include "messages/messages.hpp"
36 
37 #include "slave/flags.hpp"
38 
39 namespace mesos {
40 namespace internal {
41 namespace slave {
42 
43 // Forward declarations of types this header names before (or without) defining them.
44 
45 namespace state {
46 struct SlaveState;
47 }
48 
49 class TaskStatusUpdateManagerProcess;
50 struct TaskStatusUpdateStream;
51 
52 
53 // TaskStatusUpdateManager is responsible for
54 // 1) Reliably sending status updates to the master.
55 // 2) Checkpointing the update to disk (optional).
56 // 3) Sending ACKs to the executor (optional).
57 // 4) Receiving ACKs from the scheduler.
59 {
60 public:
62  virtual ~TaskStatusUpdateManager();
63 
64  // Expects a callback 'forward' which gets called whenever there is
65  // a new status update that needs to be forwarded to the master.
66  void initialize(const lambda::function<void(StatusUpdate)>& forward);
67 
68  // TODO(vinod): Come up with better names/signatures for the
69  // checkpointing and non-checkpointing 'update()' functions.
70  // Currently, it is not obvious that one version of 'update()'
71  // does checkpointing while the other doesn't.
72 
73  // Checkpoints the status update and reliably sends the
74  // update to the master (and hence the scheduler).
75  // @return Whether the update is handled successfully
76  // (e.g. checkpointed).
78  const StatusUpdate& update,
79  const SlaveID& slaveId,
80  const ExecutorID& executorId,
81  const ContainerID& containerId);
82 
83  // Retries the update to the master (as long as the slave is
84  // alive), but does not checkpoint the update.
85  // @return Whether the update is handled successfully.
87  const StatusUpdate& update,
88  const SlaveID& slaveId);
89 
90  // Checkpoints the status update to disk if necessary.
91  // Also, sends the next pending status update, if any.
92  // @return True if the ACK is handled successfully (e.g., checkpointed)
93  // and the task's status update stream is not terminated.
94  // False same as above except the status update stream is terminated.
95  // Failed if there are any errors (e.g., duplicate, checkpointing).
97  const TaskID& taskId,
98  const FrameworkID& frameworkId,
99  const id::UUID& uuid);
100 
101  // Recover status updates.
103  const std::string& rootDir,
104  const Option<state::SlaveState>& state);
105 
106 
107  // Pause sending updates.
108  // This is useful when the slave is disconnected because a
109  // disconnected slave will drop the updates.
110  void pause();
111 
112  // Unpause and resend all the pending updates right away.
113  // This is useful when the updates were pending because there was
114  // no master elected (e.g., during recovery) or framework failed over.
115  void resume();
116 
117  // Closes all the status update streams corresponding to this framework.
118  // NOTE: This stops retrying any pending status updates for this framework.
119  void cleanup(const FrameworkID& frameworkId);
120 
121 private:
122  TaskStatusUpdateManagerProcess* process;
123 };
124 
125 
126 // TaskStatusUpdateStream handles the status updates and acknowledgements
127 // of a task, checkpointing them if necessary. It also holds the information
128 // about received, acknowledged and pending status updates.
129 // NOTE: A task is expected to have a globally unique ID across the lifetime
130 // of a framework. In other words the tuple (taskId, frameworkId) should be
131 // always unique.
133 {
134  TaskStatusUpdateStream(const TaskID& _taskId,
135  const FrameworkID& _frameworkId,
136  const SlaveID& _slaveId,
137  const Flags& _flags,
138  bool _checkpoint,
139  const Option<ExecutorID>& executorId,
140  const Option<ContainerID>& containerId);
141 
143 
144  // This function handles the update, checkpointing if necessary.
145  // @return True if the update is successfully handled.
146  // False if the update is a duplicate.
147  // Error Any errors (e.g., checkpointing).
148  Try<bool> update(const StatusUpdate& update);
149 
150  // This function handles the ACK, checkpointing if necessary.
151  // @return True if the acknowledgement is successfully handled.
152  // False if the acknowledgement is a duplicate.
153  // Error Any errors (e.g., checkpointing).
155  const TaskID& taskId,
156  const FrameworkID& frameworkId,
157  const id::UUID& uuid,
158  const StatusUpdate& update);
159 
160  // Returns the next update (or none, if empty) in the queue.
162 
163  // Replays the stream by sequentially handling an update and its
164  // corresponding ACK, if present.
166  const std::vector<StatusUpdate>& updates,
167  const hashset<id::UUID>& acks);
168 
169  // TODO(vinod): Explore semantics to make these private.
170  const bool checkpoint;
172  Option<process::Timeout> timeout; // Timeout for resending status update.
173  std::queue<StatusUpdate> pending;
174 
175 private:
176  // Handles the status update and writes it to disk, if necessary.
177  // TODO(vinod): The write has to be asynchronous to avoid status updates that
178  // are being checkpointed, blocking the processing of other updates.
179  // One solution is to wrap the protobuf::write inside async, but its probably
180  // too much of an overhead to spin up a new libprocess per status update?
181  // A better solution might be to be have async write capability for file io.
182  Try<Nothing> handle(
183  const StatusUpdate& update,
185 
186  void _handle(
187  const StatusUpdate& update,
189 
190  const TaskID taskId;
191  const FrameworkID frameworkId;
192  const SlaveID slaveId;
193 
194  const Flags flags;
195 
196  hashset<id::UUID> received;
197  hashset<id::UUID> acknowledged;
198 
199  Option<std::string> path; // File path of the update stream.
200  Option<int_fd> fd; // File descriptor to the update stream.
201 
202  Option<std::string> error; // Potential non-retryable error.
203 };
204 
205 } // namespace slave {
206 } // namespace internal {
207 } // namespace mesos {
208 
209 
210 #endif // __TASK_STATUS_UPDATE_MANAGER_HPP__
process::Future< bool > acknowledgement(const TaskID &taskId, const FrameworkID &frameworkId, const id::UUID &uuid)
Try< bool > acknowledgement(const TaskID &taskId, const FrameworkID &frameworkId, const id::UUID &uuid, const StatusUpdate &update)
Definition: option.hpp:28
Definition: try.hpp:34
const bool checkpoint
Definition: task_status_update_manager.hpp:170
process::Future< Nothing > update(const StatusUpdate &update, const SlaveID &slaveId, const ExecutorID &executorId, const ContainerID &containerId)
Definition: flags.hpp:39
Definition: result.hpp:40
Definition: task_status_update_manager.hpp:58
Try< bool > update(const StatusUpdate &update)
Definition: uuid.hpp:35
Option< process::Timeout > timeout
Definition: task_status_update_manager.hpp:172
Definition: task_status_update_manager.hpp:132
process::Future< Nothing > recover(const std::string &rootDir, const Option< state::SlaveState > &state)
void cleanup(const FrameworkID &frameworkId)
#define flags
Definition: decoder.hpp:18
Type
Definition: capabilities.hpp:79
TaskStatusUpdateStream(const TaskID &_taskId, const FrameworkID &_frameworkId, const SlaveID &_slaveId, const Flags &_flags, bool _checkpoint, const Option< ExecutorID > &executorId, const Option< ContainerID > &containerId)
Try< uint32_t > type(const std::string &path)
std::queue< StatusUpdate > pending
Definition: task_status_update_manager.hpp:173
bool terminated
Definition: task_status_update_manager.hpp:171
void initialize(const lambda::function< void(StatusUpdate)> &forward)
Try< Nothing > replay(const std::vector< StatusUpdate > &updates, const hashset< id::UUID > &acks)