Apache Mesos
state.hpp
Go to the documentation of this file.
1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements. See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership. The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License. You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 
17 #ifndef __SLAVE_STATE_HPP__
18 #define __SLAVE_STATE_HPP__
19 
20 #ifndef __WINDOWS__
21 #include <unistd.h>
22 #endif // __WINDOWS__
23 
24 #include <vector>
25 
26 #include <mesos/resources.hpp>
27 #include <mesos/type_utils.hpp>
28 
29 #include <process/pid.hpp>
30 
31 #include <stout/hashmap.hpp>
32 #include <stout/hashset.hpp>
33 #include <stout/path.hpp>
34 #include <stout/protobuf.hpp>
35 #include <stout/try.hpp>
36 #include <stout/utils.hpp>
37 #include <stout/uuid.hpp>
38 
39 #include <stout/os/mkdir.hpp>
40 #include <stout/os/mktemp.hpp>
41 #include <stout/os/rename.hpp>
42 #include <stout/os/rm.hpp>
43 #include <stout/os/write.hpp>
44 
46 
47 #include "messages/messages.hpp"
48 
49 namespace mesos {
50 namespace internal {
51 namespace slave {
52 namespace state {
53 
54 // Forward declarations.
55 struct State;
56 struct SlaveState;
57 struct ResourcesState;
58 struct FrameworkState;
59 struct ExecutorState;
60 struct RunState;
61 struct TaskState;
62 
63 
64 // This function performs recovery from the state stored at 'rootDir'.
65 // If the 'strict' flag is set, any errors encountered while
66 // recovering a state are considered fatal and hence the recovery is
67 // short-circuited and returns an error. There might be orphaned
68 // executors that need to be manually cleaned up. If the 'strict' flag
69 // is not set, any errors encountered are considered non-fatal and the
70 // recovery continues by recovering as much of the state as possible,
71 // while increasing the 'errors' count. Note that 'errors' on a struct
72 // includes the 'errors' encountered recursively. In other words,
73 // 'State.errors' is the sum total of all recovery errors.
74 Try<State> recover(const std::string& rootDir, bool strict);
75 
76 
77 // Reads the protobuf message(s) from the given path.
78 // `T` may be either a single protobuf message or a sequence of messages
79 // if `T` is a specialization of `google::protobuf::RepeatedPtrField`.
80 template <typename T>
81 Try<T> read(const std::string& path)
82 {
83  Try<T> result = ::protobuf::read<T>(path);
84  if (result.isError()) {
85  return Error(result.error());
86  }
87 
88  upgradeResources(&result.get());
89  return result;
90 }
91 
92 template <>
93 inline Try<std::string> read<std::string>(const std::string& path)
94 {
95  return os::read(path);
96 }
97 
98 
99 template <>
100 inline Try<Resources> read<Resources>(const std::string& path)
101 {
103  read<google::protobuf::RepeatedPtrField<Resource>>(path);
104 
105  if (resources.isError()) {
106  return Error(resources.error());
107  }
108 
109  return std::move(resources.get());
110 }
111 
112 
113 namespace internal {
114 
116  const std::string& path,
117  const std::string& message)
118 {
119  return ::os::write(path, message);
120 }
121 
122 
123 template <
124  typename T,
125  typename std::enable_if<
126  std::is_convertible<T*, google::protobuf::Message*>::value,
127  int>::type = 0>
128 inline Try<Nothing> checkpoint(const std::string& path, T message)
129 {
130  // If the `Try` from `downgradeResources` returns an `Error`, we currently
131  // continue to checkpoint the resources in a partially downgraded state.
132  // This implies that an agent with refined reservations cannot be downgraded
133  // to versions before reservation refinement support, which was introduced
134  // in 1.4.0.
135  //
136  // TODO(mpark): Do something smarter with the result once
137  // something like an agent recovery capability is introduced.
138  downgradeResources(&message);
139  return ::protobuf::write(path, message);
140 }
141 
142 
144  const std::string& path,
145  google::protobuf::RepeatedPtrField<Resource> resources)
146 {
147  // If the `Try` from `downgradeResources` returns an `Error`, we currently
148  // continue to checkpoint the resources in a partially downgraded state.
149  // This implies that an agent with refined reservations cannot be downgraded
150  // to versions before reservation refinement support, which was introduced
151  // in 1.4.0.
152  //
153  // TODO(mpark): Do something smarter with the result once
154  // something like an agent recovery capability is introduced.
155  downgradeResources(&resources);
156  return ::protobuf::write(path, resources);
157 }
158 
159 
161  const std::string& path,
162  const Resources& resources)
163 {
164  const google::protobuf::RepeatedPtrField<Resource>& messages = resources;
165  return checkpoint(path, messages);
166 }
167 
168 } // namespace internal {
169 
170 
171 // Thin wrapper to checkpoint data to disk and perform the necessary
172 // error checking. It checkpoints an instance of T at the given path.
173 // We can checkpoint anything as long as T is supported by
174 // internal::checkpoint. Currently the list of supported Ts are:
175 // - std::string
176 // - google::protobuf::Message
177 // - google::protobuf::RepeatedPtrField<T>
178 // - mesos::Resources
179 //
180 // NOTE: We provide atomic (all-or-nothing) semantics here by always
181 // writing to a temporary file first then using os::rename to atomically
182 // move it to the desired path.
183 template <typename T>
184 Try<Nothing> checkpoint(const std::string& path, const T& t)
185 {
186  // Create the base directory.
187  std::string base = Path(path).dirname();
188 
189  Try<Nothing> mkdir = os::mkdir(base);
190  if (mkdir.isError()) {
191  return Error("Failed to create directory '" + base + "': " + mkdir.error());
192  }
193 
194  // NOTE: We create the temporary file at 'base/XXXXXX' to make sure
195  // rename below does not cross devices (MESOS-2319).
196  //
197  // TODO(jieyu): It's possible that the temporary file becomes
198  // dangling if slave crashes or restarts while checkpointing.
199  // Consider adding a way to garbage collect them.
200  Try<std::string> temp = os::mktemp(path::join(base, "XXXXXX"));
201  if (temp.isError()) {
202  return Error("Failed to create temporary file: " + temp.error());
203  }
204 
205  // Now checkpoint the instance of T to the temporary file.
207  if (checkpoint.isError()) {
208  // Try removing the temporary file on error.
209  os::rm(temp.get());
210 
211  return Error("Failed to write temporary file '" + temp.get() +
212  "': " + checkpoint.error());
213  }
214 
215  // Rename the temporary file to the path.
216  Try<Nothing> rename = os::rename(temp.get(), path);
217  if (rename.isError()) {
218  // Try removing the temporary file on error.
219  os::rm(temp.get());
220 
221  return Error("Failed to rename '" + temp.get() + "' to '" +
222  path + "': " + rename.error());
223  }
224 
225  return Nothing();
226 }
227 
228 
229 // NOTE: The *State structs (e.g., TaskState, RunState, etc) are
230 // defined in reverse dependency order because many of them have
231 // Option<*State> dependencies which means we need them declared in
232 // their entirety in order to compile because things like
233 // Option<*State> need to know the final size of the types.
234 
235 struct TaskState
236 {
237  TaskState() : errors(0) {}
238 
239  static Try<TaskState> recover(
240  const std::string& rootDir,
241  const SlaveID& slaveId,
242  const FrameworkID& frameworkId,
243  const ExecutorID& executorId,
244  const ContainerID& containerId,
245  const TaskID& taskId,
246  bool strict);
247 
248  TaskID id;
250  std::vector<StatusUpdate> updates;
252  unsigned int errors;
253 };
254 
255 
256 struct RunState
257 {
258  RunState() : completed(false), errors(0) {}
259 
260  static Try<RunState> recover(
261  const std::string& rootDir,
262  const SlaveID& slaveId,
263  const FrameworkID& frameworkId,
264  const ExecutorID& executorId,
265  const ContainerID& containerId,
266  bool strict);
267 
272 
273  // This represents if the executor is connected via HTTP. It can be None()
274  // when the connection type is unknown.
276 
277  // Executor terminated and all its updates acknowledged.
278  bool completed;
279 
280  unsigned int errors;
281 };
282 
283 
285 {
287 
289  const std::string& rootDir,
290  const SlaveID& slaveId,
291  const FrameworkID& frameworkId,
292  const ExecutorID& executorId,
293  bool strict);
294 
295  ExecutorID id;
299  unsigned int errors;
300 };
301 
302 
304 {
306 
308  const std::string& rootDir,
309  const SlaveID& slaveId,
310  const FrameworkID& frameworkId,
311  bool strict);
312 
313  FrameworkID id;
315 
316  // Note that HTTP frameworks (supported in 0.24.0) do not have a
317  // PID, in which case 'pid' is Some(UPID()) rather than None().
319 
321  unsigned int errors;
322 };
323 
324 
326 {
328 
330  const std::string& rootDir,
331  bool strict);
332 
335  unsigned int errors;
336 };
337 
338 
340 {
341  SlaveState() : errors(0) {}
342 
343  static Try<SlaveState> recover(
344  const std::string& rootDir,
345  const SlaveID& slaveId,
346  bool strict);
347 
348  SlaveID id;
351  unsigned int errors;
352 };
353 
354 
355 // The top level state. The members are child nodes in the tree. Each
356 // child node (recursively) recovers the checkpointed state.
357 struct State
358 {
359  State() : errors(0) {}
360 
363  bool rebooted = false;
364 
365  // TODO(jieyu): Consider using a vector of Option<Error> here so
366  // that we can print all the errors. This also applies to all the
367  // State structs above.
368  unsigned int errors;
369 };
370 
371 } // namespace state {
372 } // namespace slave {
373 } // namespace internal {
374 } // namespace mesos {
375 
376 #endif // __SLAVE_STATE_HPP__
FrameworkState()
Definition: state.hpp:305
unsigned int errors
Definition: state.hpp:280
Try< Nothing > downgradeResources(google::protobuf::RepeatedPtrField< Resource > *resources)
hashmap< FrameworkID, FrameworkState > frameworks
Definition: state.hpp:350
Definition: nothing.hpp:16
Definition: errorbase.hpp:35
Try< Nothing > rm(const std::string &path)
Definition: rm.hpp:26
Option< ResourcesState > resources
Definition: state.hpp:361
Option< process::UPID > libprocessPid
Definition: state.hpp:271
Definition: try.hpp:34
Try< Resources > read< Resources >(const std::string &path)
Definition: state.hpp:100
static Try< SlaveState > recover(const std::string &rootDir, const SlaveID &slaveId, bool strict)
unsigned int errors
Definition: state.hpp:335
Option< FrameworkInfo > info
Definition: state.hpp:314
Option< Resources > target
Definition: state.hpp:334
hashmap< TaskID, TaskState > tasks
Definition: state.hpp:269
Try< std::string > mktemp(const std::string &path=path::join(os::temp(),"XXXXXX"))
Definition: mktemp.hpp:36
Option< ContainerID > id
Definition: state.hpp:268
unsigned int errors
Definition: state.hpp:252
Definition: resources.hpp:79
Try< T > read(const std::string &path)
Definition: state.hpp:81
hashset< id::UUID > acks
Definition: state.hpp:251
static Try< RunState > recover(const std::string &rootDir, const SlaveID &slaveId, const FrameworkID &frameworkId, const ExecutorID &executorId, const ContainerID &containerId, bool strict)
static Try< ExecutorState > recover(const std::string &rootDir, const SlaveID &slaveId, const FrameworkID &frameworkId, const ExecutorID &executorId, bool strict)
event_base * base
std::vector< StatusUpdate > updates
Definition: state.hpp:250
std::string join(const std::string &path1, const std::string &path2, const char _separator=os::PATH_SEPARATOR)
Definition: path.hpp:56
Option< ContainerID > latest
Definition: state.hpp:297
Option< bool > http
Definition: state.hpp:275
void upgradeResources(google::protobuf::RepeatedPtrField< Resource > *resources)
Definition: hashmap.hpp:38
ExecutorID id
Definition: state.hpp:295
Resources resources
Definition: state.hpp:333
Represents a POSIX or Windows file system path and offers common path manipulations.
Definition: path.hpp:145
Try< Nothing > mkdir(const std::string &directory, bool recursive=true)
Definition: mkdir.hpp:31
RunState()
Definition: state.hpp:258
hashmap< ContainerID, RunState > runs
Definition: state.hpp:298
ExecutorState()
Definition: state.hpp:286
Option< pid_t > forkedPid
Definition: state.hpp:270
Option< ExecutorInfo > info
Definition: state.hpp:296
unsigned int errors
Definition: state.hpp:368
State()
Definition: state.hpp:359
static Try error(const E &e)
Definition: try.hpp:42
Try< State > recover(const std::string &rootDir, bool strict)
unsigned int errors
Definition: state.hpp:321
Option< Task > info
Definition: state.hpp:249
std::string dirname() const
Extracts the component up to, but not including, the final &#39;/&#39;.
Definition: path.hpp:238
FrameworkID id
Definition: state.hpp:313
Result< std::string > read(int_fd fd, size_t size)
Definition: read.hpp:50
Try< Nothing > checkpoint(const std::string &path, const std::string &message)
Definition: state.hpp:115
Try< Nothing > rename(const std::string &from, const std::string &to)
Definition: rename.hpp:27
bool isError() const
Definition: try.hpp:71
ResourcesState()
Definition: state.hpp:327
TaskState()
Definition: state.hpp:237
static Try< FrameworkState > recover(const std::string &rootDir, const SlaveID &slaveId, const FrameworkID &frameworkId, bool strict)
SlaveID id
Definition: state.hpp:348
Protocol< WriteRequest, WriteResponse > write
unsigned int errors
Definition: state.hpp:351
Try< uint32_t > type(const std::string &path)
std::string temp()
Definition: temp.hpp:27
Try< Nothing > checkpoint(const std::string &path, const T &t)
Definition: state.hpp:184
Option< SlaveState > slave
Definition: state.hpp:362
unsigned int errors
Definition: state.hpp:299
bool rebooted
Definition: state.hpp:363
SlaveState()
Definition: state.hpp:341
bool completed
Definition: state.hpp:278
static Try< TaskState > recover(const std::string &rootDir, const SlaveID &slaveId, const FrameworkID &frameworkId, const ExecutorID &executorId, const ContainerID &containerId, const TaskID &taskId, bool strict)
const T & get() const
Definition: try.hpp:73
Option< process::UPID > pid
Definition: state.hpp:318
Option< SlaveInfo > info
Definition: state.hpp:349
TaskID id
Definition: state.hpp:248
hashmap< ExecutorID, ExecutorState > executors
Definition: state.hpp:320
static Try< ResourcesState > recover(const std::string &rootDir, bool strict)
Definition: state.hpp:357