Apache Mesos
state.hpp
Go to the documentation of this file.
1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements. See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership. The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License. You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 
17 #ifndef __SLAVE_STATE_HPP__
18 #define __SLAVE_STATE_HPP__
19 
20 #ifndef __WINDOWS__
21 #include <unistd.h>
22 #endif // __WINDOWS__
23 
24 #include <vector>
25 
26 #include <mesos/resources.hpp>
27 #include <mesos/type_utils.hpp>
28 
29 #include <process/pid.hpp>
30 
31 #include <stout/hashmap.hpp>
32 #include <stout/hashset.hpp>
33 #include <stout/path.hpp>
34 #include <stout/protobuf.hpp>
35 #include <stout/try.hpp>
36 #include <stout/utils.hpp>
37 #include <stout/uuid.hpp>
38 
39 #include <stout/os/mkdir.hpp>
40 #include <stout/os/mktemp.hpp>
41 #include <stout/os/rename.hpp>
42 #include <stout/os/rm.hpp>
43 #include <stout/os/write.hpp>
44 
46 
47 #include "messages/messages.hpp"
48 
49 namespace mesos {
50 namespace internal {
51 namespace slave {
52 namespace state {
53 
// Forward declarations of the recovered-state structs, listed in the
// order in which they are defined further down in this header.
struct TaskState;
struct RunState;
struct ExecutorState;
struct FrameworkState;
struct ResourcesState;
struct SlaveState;
struct State;
62 
63 
64 // This function performs recovery from the state stored at 'rootDir'.
65 // If the 'strict' flag is set, any errors encountered while
66 // recovering a state are considered fatal and hence the recovery is
67 // short-circuited and returns an error. There might be orphaned
68 // executors that need to be manually cleaned up. If the 'strict' flag
69 // is not set, any errors encountered are considered non-fatal and the
70 // recovery continues by recovering as much of the state as possible,
71 // while increasing the 'errors' count. Note that 'errors' on a struct
72 // includes the 'errors' encountered recursively. In other words,
73 // 'State.errors' is the sum total of all recovery errors.
74 Try<State> recover(const std::string& rootDir, bool strict);
75 
76 
77 // Reads the protobuf message(s) from the given path.
78 // `T` may be either a single protobuf message or a sequence of messages
79 // if `T` is a specialization of `google::protobuf::RepeatedPtrField`.
80 template <typename T>
81 Result<T> read(const std::string& path)
82 {
83  Result<T> result = ::protobuf::read<T>(path);
84  if (result.isSome()) {
85  upgradeResources(&result.get());
86  }
87 
88  return result;
89 }
90 
91 
92 // While we return a `Result<string>` here in order to keep the return
93 // type of `state::read` consistent, the `None` case does not arise here.
94 // That is, an empty file will result in an empty string, rather than
95 // the `Result` ending up in a `None` state.
96 template <>
97 inline Result<std::string> read<std::string>(const std::string& path)
98 {
99  return os::read(path);
100 }
101 
102 
103 template <>
104 inline Result<Resources> read<Resources>(const std::string& path)
105 {
107  read<google::protobuf::RepeatedPtrField<Resource>>(path);
108 
109  if (resources.isError()) {
110  return Error(resources.error());
111  }
112 
113  if (resources.isNone()) {
114  return None();
115  }
116 
117  return std::move(resources.get());
118 }
119 
120 
121 namespace internal {
122 
124  const std::string& path,
125  const std::string& message,
126  bool sync)
127 {
128  return ::os::write(path, message, sync);
129 }
130 
131 
132 template <
133  typename T,
134  typename std::enable_if<
135  std::is_convertible<T*, google::protobuf::Message*>::value,
136  int>::type = 0>
137 inline Try<Nothing> checkpoint(const std::string& path, T message, bool sync)
138 {
139  // If the `Try` from `downgradeResources` returns an `Error`, we currently
140  // continue to checkpoint the resources in a partially downgraded state.
141  // This implies that an agent with refined reservations cannot be downgraded
142  // to versions before reservation refinement support, which was introduced
143  // in 1.4.0.
144  //
145  // TODO(mpark): Do something smarter with the result once
146  // something like an agent recovery capability is introduced.
147  downgradeResources(&message);
148  return ::protobuf::write(path, message, sync);
149 }
150 
151 
153  const std::string& path,
154  google::protobuf::RepeatedPtrField<Resource> resources,
155  bool sync)
156 {
157  // If the `Try` from `downgradeResources` returns an `Error`, we currently
158  // continue to checkpoint the resources in a partially downgraded state.
159  // This implies that an agent with refined reservations cannot be downgraded
160  // to versions before reservation refinement support, which was introduced
161  // in 1.4.0.
162  //
163  // TODO(mpark): Do something smarter with the result once
164  // something like an agent recovery capability is introduced.
165  downgradeResources(&resources);
166  return ::protobuf::write(path, resources, sync);
167 }
168 
169 
171  const std::string& path,
172  const Resources& resources,
173  bool sync)
174 {
175  const google::protobuf::RepeatedPtrField<Resource>& messages = resources;
176  return checkpoint(path, messages, sync);
177 }
178 
179 } // namespace internal {
180 
181 
182 // Thin wrapper to checkpoint data to disk and perform the necessary
183 // error checking. It checkpoints an instance of T at the given path.
184 // We can checkpoint anything as long as T is supported by
185 // internal::checkpoint. Currently the list of supported Ts are:
186 // - std::string
187 // - google::protobuf::Message
188 // - google::protobuf::RepeatedPtrField<T>
189 // - mesos::Resources
190 //
191 // NOTE: We provide atomic (all-or-nothing) semantics here by always
192 // writing to a temporary file first then using os::rename to atomically
193 // move it to the desired path. If `sync` is set to true, this call succeeds
194 // only if `fsync` is supported and successfully commits the changes to the
195 // filesystem for the checkpoint file and each created directory.
196 //
197 // TODO(chhsiao): Consider enabling syncing by default after evaluating its
198 // performance impact.
199 template <typename T>
200 Try<Nothing> checkpoint(const std::string& path, const T& t, bool sync = false)
201 {
202  // Create the base directory.
203  std::string base = Path(path).dirname();
204 
205  Try<Nothing> mkdir = os::mkdir(base, true, sync);
206  if (mkdir.isError()) {
207  return Error("Failed to create directory '" + base + "': " + mkdir.error());
208  }
209 
210  // NOTE: We create the temporary file at 'base/XXXXXX' to make sure
211  // rename below does not cross devices (MESOS-2319).
212  //
213  // TODO(jieyu): It's possible that the temporary file becomes
214  // dangling if slave crashes or restarts while checkpointing.
215  // Consider adding a way to garbage collect them.
216  Try<std::string> temp = os::mktemp(path::join(base, "XXXXXX"));
217  if (temp.isError()) {
218  return Error("Failed to create temporary file: " + temp.error());
219  }
220 
221  // Now checkpoint the instance of T to the temporary file.
222  Try<Nothing> checkpoint = internal::checkpoint(temp.get(), t, sync);
223  if (checkpoint.isError()) {
224  // Try removing the temporary file on error.
225  os::rm(temp.get());
226 
227  return Error("Failed to write temporary file '" + temp.get() +
228  "': " + checkpoint.error());
229  }
230 
231  // Rename the temporary file to the path.
232  Try<Nothing> rename = os::rename(temp.get(), path, sync);
233  if (rename.isError()) {
234  // Try removing the temporary file on error.
235  os::rm(temp.get());
236 
237  return Error("Failed to rename '" + temp.get() + "' to '" +
238  path + "': " + rename.error());
239  }
240 
241  return Nothing();
242 }
243 
244 
245 // NOTE: The *State structs (e.g., TaskState, RunState, etc) are
246 // defined in reverse dependency order because many of them have
247 // Option<*State> dependencies which means we need them declared in
248 // their entirety in order to compile because things like
249 // Option<*State> need to know the final size of the types.
250 
251 struct TaskState
252 {
253  TaskState() : errors(0) {}
254 
255  static Try<TaskState> recover(
256  const std::string& rootDir,
257  const SlaveID& slaveId,
258  const FrameworkID& frameworkId,
259  const ExecutorID& executorId,
260  const ContainerID& containerId,
261  const TaskID& taskId,
262  bool strict);
263 
264  TaskID id;
266  std::vector<StatusUpdate> updates;
268  unsigned int errors;
269 };
270 
271 
272 struct RunState
273 {
274  RunState() : completed(false), errors(0) {}
275 
276  static Try<RunState> recover(
277  const std::string& rootDir,
278  const SlaveID& slaveId,
279  const FrameworkID& frameworkId,
280  const ExecutorID& executorId,
281  const ContainerID& containerId,
282  bool strict);
283 
288 
289  // This represents if the executor is connected via HTTP. It can be None()
290  // when the connection type is unknown.
292 
293  // Executor terminated and all its updates acknowledged.
294  bool completed;
295 
296  unsigned int errors;
297 };
298 
299 
301 {
302  ExecutorState() : errors(0) {}
303 
305  const std::string& rootDir,
306  const SlaveID& slaveId,
307  const FrameworkID& frameworkId,
308  const ExecutorID& executorId,
309  bool strict);
310 
311  ExecutorID id;
315  unsigned int errors;
316 };
317 
318 
320 {
321  FrameworkState() : errors(0) {}
322 
324  const std::string& rootDir,
325  const SlaveID& slaveId,
326  const FrameworkID& frameworkId,
327  bool strict);
328 
329  FrameworkID id;
331 
332  // Note that HTTP frameworks (supported in 0.24.0) do not have a
333  // PID, in which case 'pid' is Some(UPID()) rather than None().
335 
337  unsigned int errors;
338 };
339 
340 
342 {
343  ResourcesState() : errors(0) {}
344 
346  const std::string& rootDir,
347  bool strict);
348 
351  unsigned int errors;
352 };
353 
354 
356 {
357  SlaveState() : errors(0) {}
358 
359  static Try<SlaveState> recover(
360  const std::string& rootDir,
361  const SlaveID& slaveId,
362  bool strict);
363 
364  SlaveID id;
367  unsigned int errors;
368 };
369 
370 
371 // The top level state. The members are child nodes in the tree. Each
372 // child node (recursively) recovers the checkpointed state.
373 struct State
374 {
375  State() : errors(0) {}
376 
379  bool rebooted = false;
380 
381  // TODO(jieyu): Consider using a vector of Option<Error> here so
382  // that we can print all the errors. This also applies to all the
383  // State structs above.
384  unsigned int errors;
385 };
386 
387 } // namespace state {
388 } // namespace slave {
389 } // namespace internal {
390 } // namespace mesos {
391 
392 #endif // __SLAVE_STATE_HPP__
FrameworkState()
Definition: state.hpp:321
Definition: path.hpp:26
unsigned int errors
Definition: state.hpp:296
Try< Nothing > downgradeResources(google::protobuf::RepeatedPtrField< Resource > *resources)
bool isNone() const
Definition: result.hpp:112
hashmap< FrameworkID, FrameworkState > frameworks
Definition: state.hpp:366
Definition: nothing.hpp:16
Definition: errorbase.hpp:36
Try< Nothing > rm(const std::string &path)
Definition: rm.hpp:26
Option< ResourcesState > resources
Definition: state.hpp:377
Try< Nothing > checkpoint(const std::string &path, const std::string &message, bool sync)
Definition: state.hpp:123
Option< process::UPID > libprocessPid
Definition: state.hpp:287
Definition: check.hpp:33
static Result< T > error(const std::string &message)
Definition: result.hpp:53
unsigned int errors
Definition: state.hpp:351
Option< FrameworkInfo > info
Definition: state.hpp:330
Option< Resources > target
Definition: state.hpp:350
hashmap< TaskID, TaskState > tasks
Definition: state.hpp:285
Try< std::string > mktemp(const std::string &path=path::join(os::temp(),"XXXXXX"))
Definition: mktemp.hpp:36
Result< Resources > read< Resources >(const std::string &path)
Definition: state.hpp:104
Option< ContainerID > id
Definition: state.hpp:284
unsigned int errors
Definition: state.hpp:268
Definition: resources.hpp:81
hashset< id::UUID > acks
Definition: state.hpp:267
event_base * base
std::vector< StatusUpdate > updates
Definition: state.hpp:266
std::string join(const std::string &path1, const std::string &path2, const char _separator=os::PATH_SEPARATOR)
Definition: path.hpp:113
Option< ContainerID > latest
Definition: state.hpp:313
Definition: check.hpp:30
Option< bool > http
Definition: state.hpp:291
void upgradeResources(google::protobuf::RepeatedPtrField< Resource > *resources)
Definition: hashmap.hpp:38
Try< Nothing > mkdir(const std::string &directory, bool recursive=true, bool sync=false)
Definition: mkdir.hpp:42
ExecutorID id
Definition: state.hpp:311
Resources resources
Definition: state.hpp:349
Represents a POSIX or Windows file system path and offers common path manipulations.
Definition: path.hpp:202
RunState()
Definition: state.hpp:274
Definition: spec.hpp:26
hashmap< ContainerID, RunState > runs
Definition: state.hpp:314
ExecutorState()
Definition: state.hpp:302
Option< pid_t > forkedPid
Definition: state.hpp:286
Option< ExecutorInfo > info
Definition: state.hpp:312
const T & get() const
Definition: result.hpp:115
unsigned int errors
Definition: state.hpp:384
State()
Definition: state.hpp:375
static Try error(const E &e)
Definition: try.hpp:42
Try< State > recover(const std::string &rootDir, bool strict)
unsigned int errors
Definition: state.hpp:337
Option< Task > info
Definition: state.hpp:265
std::string dirname() const
Extracts the component up to, but not including, the final '/'.
Definition: path.hpp:298
FrameworkID id
Definition: state.hpp:329
Result< T > read(const std::string &path)
Definition: state.hpp:81
Try< Nothing > rename(const std::string &from, const std::string &to, bool sync=false)
Definition: rename.hpp:40
Result< std::string > read(int_fd fd, size_t size)
Definition: read.hpp:55
Definition: none.hpp:27
Definition: attributes.hpp:24
bool isError() const
Definition: try.hpp:71
ResourcesState()
Definition: state.hpp:343
TaskState()
Definition: state.hpp:253
SlaveID id
Definition: state.hpp:364
Protocol< WriteRequest, WriteResponse > write
unsigned int errors
Definition: state.hpp:367
Try< uint32_t > type(const std::string &path)
std::string temp()
Definition: temp.hpp:27
bool isSome() const
Definition: result.hpp:111
bool isError() const
Definition: result.hpp:113
Option< SlaveState > slave
Definition: state.hpp:378
unsigned int errors
Definition: state.hpp:315
SlaveState()
Definition: state.hpp:357
bool completed
Definition: state.hpp:294
Option< process::UPID > pid
Definition: state.hpp:334
Option< SlaveInfo > info
Definition: state.hpp:365
TaskID id
Definition: state.hpp:264
hashmap< ExecutorID, ExecutorState > executors
Definition: state.hpp:336
Definition: state.hpp:373