Apache Mesos
state.hpp
Go to the documentation of this file.
1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements. See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership. The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License. You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 
17 #ifndef __SLAVE_STATE_HPP__
18 #define __SLAVE_STATE_HPP__
19 
20 #ifndef __WINDOWS__
21 #include <unistd.h>
22 #endif // __WINDOWS__
23 
24 #include <vector>
25 
26 #include <mesos/resources.hpp>
27 #include <mesos/type_utils.hpp>
28 
29 #include <process/pid.hpp>
30 
31 #include <stout/hashmap.hpp>
32 #include <stout/hashset.hpp>
33 #include <stout/path.hpp>
34 #include <stout/protobuf.hpp>
35 #include <stout/try.hpp>
36 #include <stout/utils.hpp>
37 #include <stout/uuid.hpp>
38 
39 #include <stout/os/mkdir.hpp>
40 #include <stout/os/mktemp.hpp>
41 #include <stout/os/rename.hpp>
42 #include <stout/os/rm.hpp>
43 #include <stout/os/write.hpp>
44 
46 
47 #include "messages/messages.hpp"
48 
49 namespace mesos {
50 namespace internal {
51 namespace slave {
52 namespace state {
53 
54 // Forward declarations.
55 struct State;
56 struct SlaveState;
57 struct ResourcesState;
58 struct FrameworkState;
59 struct ExecutorState;
60 struct RunState;
61 struct TaskState;
62 
63 
64 // This function performs recovery from the state stored at 'rootDir'.
65 // If the 'strict' flag is set, any errors encountered while
66 // recovering a state are considered fatal and hence the recovery is
67 // short-circuited and returns an error. There might be orphaned
68 // executors that need to be manually cleaned up. If the 'strict' flag
69 // is not set, any errors encountered are considered non-fatal and the
70 // recovery continues by recovering as much of the state as possible,
71 // while increasing the 'errors' count. Note that 'errors' on a struct
72 // includes the 'errors' encountered recursively. In other words,
73 // 'State.errors' is the sum total of all recovery errors.
74 Try<State> recover(const std::string& rootDir, bool strict);
75 
76 
77 // Reads the protobuf message(s) from the given path.
78 // `T` may be either a single protobuf message or a sequence of messages
79 // if `T` is a specialization of `google::protobuf::RepeatedPtrField`.
80 template <typename T>
81 Result<T> read(const std::string& path)
82 {
83  Result<T> result = ::protobuf::read<T>(path);
84  if (result.isSome()) {
85  upgradeResources(&result.get());
86  }
87 
88  return result;
89 }
90 
91 
92 // While we return a `Result<string>` here in order to keep the return
93 // type of `state::read` consistent, the `None` case does not arise here.
94 // That is, an empty file will result in an empty string, rather than
95 // the `Result` ending up in a `None` state.
96 template <>
97 inline Result<std::string> read<std::string>(const std::string& path)
98 {
99  return os::read(path);
100 }
101 
102 
103 template <>
104 inline Result<Resources> read<Resources>(const std::string& path)
105 {
107  read<google::protobuf::RepeatedPtrField<Resource>>(path);
108 
109  if (resources.isError()) {
110  return Error(resources.error());
111  }
112 
113  if (resources.isNone()) {
114  return None();
115  }
116 
117  return std::move(resources.get());
118 }
119 
120 
121 namespace internal {
122 
124  const std::string& path,
125  const std::string& message,
126  bool sync,
127  bool downgradeResources)
128 {
129  return ::os::write(path, message, sync);
130 }
131 
132 
133 template <
134  typename T,
135  typename std::enable_if<
136  std::is_convertible<T*, google::protobuf::Message*>::value,
137  int>::type = 0>
139  const std::string& path,
140  T message,
141  bool sync,
142  bool downgrade)
143 {
144  if (downgrade) {
145  // If the `Try` from `downgradeResources` returns an `Error`, we currently
146  // continue to checkpoint the resources in a partially downgraded state.
147  // This implies that an agent with refined reservations cannot be downgraded
148  // to versions before reservation refinement support, which was introduced
149  // in 1.4.0.
150  //
151  // TODO(mpark): Do something smarter with the result once
152  // something like an agent recovery capability is introduced.
153  downgradeResources(&message);
154  }
155 
156  return ::protobuf::write(path, message, sync);
157 }
158 
159 
161  const std::string& path,
162  google::protobuf::RepeatedPtrField<Resource> resources,
163  bool sync,
164  bool downgrade)
165 {
166  if (downgrade) {
167  // If the `Try` from `downgradeResources` returns an `Error`, we currently
168  // continue to checkpoint the resources in a partially downgraded state.
169  // This implies that an agent with refined reservations cannot be downgraded
170  // to versions before reservation refinement support, which was introduced
171  // in 1.4.0.
172  //
173  // TODO(mpark): Do something smarter with the result once
174  // something like an agent recovery capability is introduced.
175  downgradeResources(&resources);
176  }
177 
178  return ::protobuf::write(path, resources, sync);
179 }
180 
181 
183  const std::string& path,
184  const Resources& resources,
185  bool sync,
186  bool downgrade)
187 {
188  const google::protobuf::RepeatedPtrField<Resource>& messages = resources;
189  return checkpoint(path, messages, sync, downgrade);
190 }
191 
192 } // namespace internal {
193 
194 
195 // Thin wrapper to checkpoint data to disk and perform the necessary
196 // error checking. It checkpoints an instance of T at the given path.
197 // We can checkpoint anything as long as T is supported by
198 // internal::checkpoint. Currently the list of supported Ts are:
199 // - std::string
200 // - google::protobuf::Message
201 // - google::protobuf::RepeatedPtrField<T>
202 // - mesos::Resources
203 //
204 // NOTE: We provide atomic (all-or-nothing) semantics here by always
205 // writing to a temporary file first then using os::rename to atomically
206 // move it to the desired path. If `sync` is set to true, this call succeeds
207 // only if `fsync` is supported and successfully commits the changes to the
208 // filesystem for the checkpoint file and each created directory.
209 //
210 // TODO(chhsiao): Consider enabling syncing by default after evaluating its
211 // performance impact.
212 template <typename T>
214  const std::string& path,
215  const T& t,
216  bool sync = false,
217  bool downgrade = true)
218 {
219  // Create the base directory.
220  std::string base = Path(path).dirname();
221 
222  Try<Nothing> mkdir = os::mkdir(base, true, sync);
223  if (mkdir.isError()) {
224  return Error("Failed to create directory '" + base + "': " + mkdir.error());
225  }
226 
227  // NOTE: We create the temporary file at 'base/XXXXXX' to make sure
228  // rename below does not cross devices (MESOS-2319).
229  //
230  // TODO(jieyu): It's possible that the temporary file becomes
231  // dangling if slave crashes or restarts while checkpointing.
232  // Consider adding a way to garbage collect them.
233  Try<std::string> temp = os::mktemp(path::join(base, "XXXXXX"));
234  if (temp.isError()) {
235  return Error("Failed to create temporary file: " + temp.error());
236  }
237 
238  // Now checkpoint the instance of T to the temporary file.
240  internal::checkpoint(temp.get(), t, sync, downgrade);
241  if (checkpoint.isError()) {
242  // Try removing the temporary file on error.
243  os::rm(temp.get());
244 
245  return Error("Failed to write temporary file '" + temp.get() +
246  "': " + checkpoint.error());
247  }
248 
249  // Rename the temporary file to the path.
250  Try<Nothing> rename = os::rename(temp.get(), path, sync);
251  if (rename.isError()) {
252  // Try removing the temporary file on error.
253  os::rm(temp.get());
254 
255  return Error("Failed to rename '" + temp.get() + "' to '" +
256  path + "': " + rename.error());
257  }
258 
259  return Nothing();
260 }
261 
262 
263 // NOTE: The *State structs (e.g., TaskState, RunState, etc) are
264 // defined in reverse dependency order because many of them have
265 // Option<*State> dependencies which means we need them declared in
266 // their entirety in order to compile because things like
267 // Option<*State> need to know the final size of the types.
268 
269 struct TaskState
270 {
271  TaskState() : errors(0) {}
272 
273  static Try<TaskState> recover(
274  const std::string& rootDir,
275  const SlaveID& slaveId,
276  const FrameworkID& frameworkId,
277  const ExecutorID& executorId,
278  const ContainerID& containerId,
279  const TaskID& taskId,
280  bool strict);
281 
282  TaskID id;
284  std::vector<StatusUpdate> updates;
286  unsigned int errors;
287 };
288 
289 
290 struct RunState
291 {
292  RunState() : completed(false), errors(0) {}
293 
294  static Try<RunState> recover(
295  const std::string& rootDir,
296  const SlaveID& slaveId,
297  const FrameworkID& frameworkId,
298  const ExecutorID& executorId,
299  const ContainerID& containerId,
300  bool strict,
301  bool rebooted);
302 
307 
308  // This represents if the executor is connected via HTTP. It can be None()
309  // when the connection type is unknown.
311 
312  // Executor terminated and all its updates acknowledged.
313  bool completed;
314 
315  unsigned int errors;
316 };
317 
318 
320 {
321  ExecutorState() : errors(0) {}
322 
324  const std::string& rootDir,
325  const SlaveID& slaveId,
326  const FrameworkID& frameworkId,
327  const ExecutorID& executorId,
328  bool strict,
329  bool rebooted);
330 
331  ExecutorID id;
335  unsigned int errors;
336 };
337 
338 
340 {
341  FrameworkState() : errors(0) {}
342 
344  const std::string& rootDir,
345  const SlaveID& slaveId,
346  const FrameworkID& frameworkId,
347  bool strict,
348  bool rebooted);
349 
350  FrameworkID id;
352 
353  // Note that HTTP frameworks (supported in 0.24.0) do not have a
354  // PID, in which case 'pid' is Some(UPID()) rather than None().
356 
358  unsigned int errors;
359 };
360 
361 
363 {
364  ResourcesState() : errors(0) {}
365 
367  const std::string& rootDir,
368  bool strict);
369 
372  unsigned int errors;
373 };
374 
375 
377 {
378  SlaveState() : errors(0) {}
379 
380  static Try<SlaveState> recover(
381  const std::string& rootDir,
382  const SlaveID& slaveId,
383  bool strict,
384  bool rebooted);
385 
386  SlaveID id;
389 
390  // `operations` will be `None()` if the agent that checkpointed the
391  // state didn't support checkpointing operations.
393 
394  // The drain state of the agent, if any.
396 
397  unsigned int errors;
398 };
399 
400 
401 // The top level state. The members are child nodes in the tree. Each
402 // child node (recursively) recovers the checkpointed state.
403 struct State
404 {
405  State() : errors(0) {}
406 
409  bool rebooted = false;
410 
411  // TODO(jieyu): Consider using a vector of Option<Error> here so
412  // that we can print all the errors. This also applies to all the
413  // State structs above.
414  unsigned int errors;
415 };
416 
417 } // namespace state {
418 } // namespace slave {
419 } // namespace internal {
420 } // namespace mesos {
421 
422 #endif // __SLAVE_STATE_HPP__
FrameworkState()
Definition: state.hpp:341
Definition: path.hpp:29
unsigned int errors
Definition: state.hpp:315
Try< Nothing > downgradeResources(google::protobuf::RepeatedPtrField< Resource > *resources)
Try< Nothing > checkpoint(const std::string &path, const std::string &message, bool sync, bool downgradeResources)
Definition: state.hpp:123
bool isNone() const
Definition: result.hpp:113
hashmap< FrameworkID, FrameworkState > frameworks
Definition: state.hpp:388
Definition: nothing.hpp:16
Definition: errorbase.hpp:36
Option< DrainConfig > drainConfig
Definition: state.hpp:395
Try< Nothing > rm(const std::string &path)
Definition: rm.hpp:26
Option< ResourcesState > resources
Definition: state.hpp:407
Option< process::UPID > libprocessPid
Definition: state.hpp:306
Definition: check.hpp:33
static Result< T > error(const std::string &message)
Definition: result.hpp:54
unsigned int errors
Definition: state.hpp:372
Option< FrameworkInfo > info
Definition: state.hpp:351
Option< Resources > target
Definition: state.hpp:371
hashmap< TaskID, TaskState > tasks
Definition: state.hpp:304
Try< std::string > mktemp(const std::string &path=path::join(os::temp(),"XXXXXX"))
Definition: mktemp.hpp:36
Result< Resources > read< Resources >(const std::string &path)
Definition: state.hpp:104
Option< ContainerID > id
Definition: state.hpp:303
unsigned int errors
Definition: state.hpp:286
Definition: resources.hpp:83
hashset< id::UUID > acks
Definition: state.hpp:285
event_base * base
Option< std::vector< Operation > > operations
Definition: state.hpp:392
std::vector< StatusUpdate > updates
Definition: state.hpp:284
std::string join(const std::string &path1, const std::string &path2, const char _separator=os::PATH_SEPARATOR)
Definition: path.hpp:116
Option< ContainerID > latest
Definition: state.hpp:333
Definition: check.hpp:30
Option< bool > http
Definition: state.hpp:310
void upgradeResources(google::protobuf::RepeatedPtrField< Resource > *resources)
Definition: hashmap.hpp:38
Try< Nothing > mkdir(const std::string &directory, bool recursive=true, bool sync=false)
Definition: mkdir.hpp:42
ExecutorID id
Definition: state.hpp:331
Resources resources
Definition: state.hpp:370
Represents a POSIX or Windows file system path and offers common path manipulations.
Definition: path.hpp:212
RunState()
Definition: state.hpp:292
Definition: agent.hpp:25
hashmap< ContainerID, RunState > runs
Definition: state.hpp:334
ExecutorState()
Definition: state.hpp:321
Option< pid_t > forkedPid
Definition: state.hpp:305
Option< ExecutorInfo > info
Definition: state.hpp:332
unsigned int errors
Definition: state.hpp:414
State()
Definition: state.hpp:405
static Try error(const E &e)
Definition: try.hpp:43
Try< State > recover(const std::string &rootDir, bool strict)
unsigned int errors
Definition: state.hpp:358
Option< Task > info
Definition: state.hpp:283
std::string dirname() const
Extracts the component up to, but not including, the final '/'.
Definition: path.hpp:308
FrameworkID id
Definition: state.hpp:350
Result< T > read(const std::string &path)
Definition: state.hpp:81
Try< Nothing > rename(const std::string &from, const std::string &to, bool sync=false)
Definition: rename.hpp:40
Result< std::string > read(int_fd fd, size_t size)
Definition: read.hpp:55
Definition: none.hpp:27
Definition: attributes.hpp:24
bool isError() const
Definition: try.hpp:78
T & get()&
Definition: result.hpp:116
ResourcesState()
Definition: state.hpp:364
TaskState()
Definition: state.hpp:271
SlaveID id
Definition: state.hpp:386
Protocol< WriteRequest, WriteResponse > write
unsigned int errors
Definition: state.hpp:397
Try< uint32_t > type(const std::string &path)
std::string temp()
Definition: temp.hpp:27
bool isSome() const
Definition: result.hpp:112
bool isError() const
Definition: result.hpp:114
Option< SlaveState > slave
Definition: state.hpp:408
unsigned int errors
Definition: state.hpp:335
SlaveState()
Definition: state.hpp:378
bool completed
Definition: state.hpp:313
Option< process::UPID > pid
Definition: state.hpp:355
Option< SlaveInfo > info
Definition: state.hpp:387
TaskID id
Definition: state.hpp:282
hashmap< ExecutorID, ExecutorState > executors
Definition: state.hpp:357
Definition: state.hpp:403