Apache Mesos
slave.hpp
Go to the documentation of this file.
1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements. See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership. The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License. You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 
17 #ifndef __SLAVE_HPP__
18 #define __SLAVE_HPP__
19 
20 #include <stdint.h>
21 
22 #include <list>
23 #include <memory>
24 #include <string>
25 #include <utility>
26 #include <vector>
27 
28 #include <mesos/attributes.hpp>
29 #include <mesos/resources.hpp>
30 #include <mesos/type_utils.hpp>
31 
32 #include <mesos/agent/agent.hpp>
33 
35 
37 
39 
41 
45 
47 
48 #include <process/http.hpp>
49 #include <process/future.hpp>
50 #include <process/owned.hpp>
51 #include <process/limiter.hpp>
52 #include <process/process.hpp>
53 #include <process/protobuf.hpp>
54 #include <process/shared.hpp>
55 #include <process/sequence.hpp>
56 
57 #include <stout/boundedhashmap.hpp>
58 #include <stout/bytes.hpp>
60 #include <stout/linkedhashmap.hpp>
61 #include <stout/hashmap.hpp>
62 #include <stout/hashset.hpp>
63 #include <stout/option.hpp>
64 #include <stout/os.hpp>
65 #include <stout/path.hpp>
66 #include <stout/recordio.hpp>
67 #include <stout/uuid.hpp>
68 
69 #include "common/heartbeater.hpp"
70 #include "common/http.hpp"
72 #include "common/recordio.hpp"
73 
74 #include "files/files.hpp"
75 
76 #include "internal/evolve.hpp"
77 
78 #include "messages/messages.hpp"
79 
82 
83 #include "slave/constants.hpp"
85 #include "slave/flags.hpp"
86 #include "slave/gc.hpp"
87 #include "slave/http.hpp"
88 #include "slave/metrics.hpp"
89 #include "slave/paths.hpp"
90 #include "slave/state.hpp"
91 
93 
// The Windows SDK defines `REGISTERING` as a constant, yet this header uses it
// as an enum value; `REGISTERED` is undefined alongside it, presumably for the
// same reason. Undefining a macro that was never defined is a no-op, so each
// `#undef` is guarded only to make that intent explicit.
#if defined(__WINDOWS__)
#ifdef REGISTERING
#undef REGISTERING
#endif
#ifdef REGISTERED
#undef REGISTERED
#endif
#endif // __WINDOWS__
100 
101 namespace mesos {
102 
// Forward declarations: this header only refers to these types through
// pointers or smart pointers (e.g. `Option<Authorizer*>`,
// `std::shared_ptr<DiskProfileAdaptor>`).
class DiskProfileAdaptor;
class Authorizer;
106 
107 namespace internal {
108 namespace slave {
109 
// Forward declarations. `Executor` is defined later in this header; the
// remaining types are presumably defined later in this file or in other
// agent headers.
class Executor;
class Framework;
class TaskStatusUpdateManager;

struct ResourceProvider;
116 
117 // Agent process: master/executor message handlers, recovery, GC, operations.
118 class Slave : public ProtobufProcess<Slave>
119 {
120 public:
121  Slave(const std::string& id,
122  const Flags& flags,
124  Containerizer* containerizer,
125  Files* files,
126  GarbageCollector* gc,
127  TaskStatusUpdateManager* taskStatusUpdateManager,
128  mesos::slave::ResourceEstimator* resourceEstimator,
129  mesos::slave::QoSController* qosController,
130  mesos::SecretGenerator* secretGenerator,
131  VolumeGidManager* volumeGidManager,
132  PendingFutureTracker* futureTracker,
133  process::Owned<CSIServer>&& csiServer,
134 #ifndef __WINDOWS__
135  const Option<process::network::unix::Socket>& executorSocket,
136 #endif // __WINDOWS__
137  const Option<Authorizer*>& authorizer);
138 
139  ~Slave() override;
140 
141  void shutdown(const process::UPID& from, const std::string& message);
142 
143  void registered(
144  const process::UPID& from,
145  const SlaveID& slaveId,
146  const MasterSlaveConnection& connection);
147 
148  void reregistered(
149  const process::UPID& from,
150  const SlaveID& slaveId,
151  const std::vector<ReconcileTasksMessage>& reconciliations,
152  const MasterSlaveConnection& connection);
153 
154  void doReliableRegistration(Duration maxBackoff);
155 
156  // TODO(mzhu): Combine this with `runTask()` and replace all `runTask()`
157  // mocks with `run()` mocks.
159  const process::UPID& from,
160  RunTaskMessage&& runTaskMessage);
161 
163  const std::vector<TaskInfo>& tasks);
164 
165  // Made 'virtual' for Slave mocking.
166  virtual void runTask(
167  const process::UPID& from,
168  const FrameworkInfo& frameworkInfo,
169  const FrameworkID& frameworkId,
170  const process::UPID& pid,
171  const TaskInfo& task,
172  const std::vector<ResourceVersionUUID>& resourceVersionUuids,
174 
175  void run(
176  const FrameworkInfo& frameworkInfo,
177  ExecutorInfo executorInfo,
178  Option<TaskInfo> task,
179  Option<TaskGroupInfo> taskGroup,
180  const std::vector<ResourceVersionUUID>& resourceVersionUuids,
181  const process::UPID& pid,
182  const Option<bool>& launchExecutor,
183  bool executorGeneratedForCommandTask);
184 
185  // Made 'virtual' for Slave mocking.
186  //
187  // This function returns a future so that we can encapsulate a task(group)
188  // launch operation (from agent receiving the run message to the completion
189  // of `_run()`) into a single future. This includes all the asynchronous
190  // steps (currently two: unschedule GC and task authorization) prior to the
191  // executor launch.
193  const FrameworkInfo& frameworkInfo,
194  const ExecutorInfo& executorInfo,
195  const Option<TaskInfo>& task,
196  const Option<TaskGroupInfo>& taskGroup,
197  const std::vector<ResourceVersionUUID>& resourceVersionUuids,
198  const Option<bool>& launchExecutor);
199 
200  // Made 'virtual' for Slave mocking.
201  virtual void __run(
202  const FrameworkInfo& frameworkInfo,
203  const ExecutorInfo& executorInfo,
204  const Option<TaskInfo>& task,
205  const Option<TaskGroupInfo>& taskGroup,
206  const std::vector<ResourceVersionUUID>& resourceVersionUuids,
207  const Option<bool>& launchExecutor,
208  bool executorGeneratedForCommandTask);
209 
210  // This is called when the resource limits of the container have
211  // been updated for the given tasks and task groups. If the update is
212  // successful, we flush the given tasks to the executor by sending
213  // RunTaskMessages or `LAUNCH_GROUP` events.
214  //
215  // Made 'virtual' for Slave mocking.
216  virtual void ___run(
217  const process::Future<Nothing>& future,
218  const FrameworkID& frameworkId,
219  const ExecutorID& executorId,
220  const ContainerID& containerId,
221  const std::vector<TaskInfo>& tasks,
222  const std::vector<TaskGroupInfo>& taskGroups);
223 
224  // TODO(mzhu): Combine this with `runTaskGroup()` and replace all
225  // `runTaskGroup()` mocks with `run()` mocks.
227  const process::UPID& from,
228  RunTaskGroupMessage&& runTaskGroupMessage);
229 
230  // Made 'virtual' for Slave mocking.
231  virtual void runTaskGroup(
232  const process::UPID& from,
233  const FrameworkInfo& frameworkInfo,
234  const ExecutorInfo& executorInfo,
235  const TaskGroupInfo& taskGroupInfo,
236  const std::vector<ResourceVersionUUID>& resourceVersionUuids,
237  const Option<bool>& launchExecutor);
238 
239  // Handler for the `KillTaskMessage`. Made 'virtual' for Slave mocking.
240  virtual void killTask(
241  const process::UPID& from,
242  const KillTaskMessage& killTaskMessage);
243 
244  // Helper to kill a pending task, which may or may not be associated with a
245  // valid `Executor` struct.
246  void killPendingTask(
247  const FrameworkID& frameworkId,
248  Framework* framework,
249  const TaskID& taskId);
250 
251  // Helper to kill a task belonging to a valid framework and executor. This
252  // function should be used to kill tasks which are queued or launched, but
253  // not tasks which are pending.
254  void kill(
255  const FrameworkID& frameworkId,
256  Framework* framework,
257  Executor* executor,
258  const TaskID& taskId,
259  const Option<KillPolicy>& killPolicy);
260 
261  // Made 'virtual' for Slave mocking.
262  virtual void shutdownExecutor(
263  const process::UPID& from,
264  const FrameworkID& frameworkId,
265  const ExecutorID& executorId);
266 
267  // Shut down an executor. This is a two-phase process. First, an
268  // executor receives a shutdown message (shutdown phase), then
269  // after a configurable timeout the slave actually forces a kill
270  // (kill phase, via the isolator) if the executor has not
271  // exited.
272  //
273  // Made 'virtual' for Slave mocking.
274  virtual void _shutdownExecutor(Framework* framework, Executor* executor);
275 
276  void shutdownFramework(
277  const process::UPID& from,
278  const FrameworkID& frameworkId);
279 
280  void schedulerMessage(
281  const SlaveID& slaveId,
282  const FrameworkID& frameworkId,
283  const ExecutorID& executorId,
284  const std::string& data);
285 
286  void updateFramework(
287  const UpdateFrameworkMessage& message);
288 
289  void checkpointResourceState(const Resources& resources, bool changeTotal);
290 
292  std::vector<Resource> resources,
293  bool changeTotal);
294 
296  const std::vector<Resource>& resources);
297 
298  // Made 'virtual' for Slave mocking.
299  virtual void applyOperation(const ApplyOperationMessage& message);
300 
301  // Reconciles pending operations with the master. This is necessary to handle
302  // cases in which operations were dropped in transit, or in which an agent's
303  // `UpdateSlaveMessage` was sent at the same time as an operation was en route
304  // from the master to the agent.
305  void reconcileOperations(const ReconcileOperationsMessage& message);
306 
307  void subscribe(
309  const executor::Call::Subscribe& subscribe,
310  Framework* framework,
311  Executor* executor);
312 
313  void registerExecutor(
314  const process::UPID& from,
315  const FrameworkID& frameworkId,
316  const ExecutorID& executorId);
317 
318  // Called when an executor reregisters with a recovering slave.
319  // 'tasks' : Unacknowledged tasks (i.e., tasks that the executor
320  // driver never received an ACK for.)
321  // 'updates' : Unacknowledged updates.
322  void reregisterExecutor(
323  const process::UPID& from,
324  const FrameworkID& frameworkId,
325  const ExecutorID& executorId,
326  const std::vector<TaskInfo>& tasks,
327  const std::vector<StatusUpdate>& updates);
328 
329  void _reregisterExecutor(
330  const process::Future<Nothing>& future,
331  const FrameworkID& frameworkId,
332  const ExecutorID& executorId,
333  const ContainerID& containerId);
334 
335  void executorMessage(
336  const SlaveID& slaveId,
337  const FrameworkID& frameworkId,
338  const ExecutorID& executorId,
339  const std::string& data);
340 
341  void ping(const process::UPID& from, bool connected);
342 
343  // Handles the task status update.
344  // NOTE: If 'pid' is a valid UPID an ACK is sent to this pid
345  // after the update is successfully handled. If pid == UPID()
346  // no ACK is sent. The latter is used by the slave to send
347  // status updates it generated (e.g., TASK_LOST). If pid == None()
348  // an ACK is sent to the corresponding HTTP based executor.
349  // NOTE: StatusUpdate is passed by value because it is modified
350  // to ensure source field is set.
351  void statusUpdate(StatusUpdate update, const Option<process::UPID>& pid);
352 
353  // Called when the slave receives a `StatusUpdate` from an executor
354  // and the slave needs to retrieve the container status for the
355  // container associated with the executor.
356  void _statusUpdate(
357  StatusUpdate update,
358  const Option<process::UPID>& pid,
359  const ExecutorID& executorId,
360  const Option<process::Future<ContainerStatus>>& containerStatus);
361 
362  // Continue handling the status update after optionally updating the
363  // container's resources.
364  void __statusUpdate(
365  const Option<process::Future<Nothing>>& future,
366  const StatusUpdate& update,
367  const Option<process::UPID>& pid,
368  const ExecutorID& executorId,
369  const ContainerID& containerId,
370  bool checkpoint);
371 
372  // This is called when the task status update manager finishes
373  // handling the update. If the handling is successful, an
374  // acknowledgment is sent to the executor.
375  void ___statusUpdate(
376  const process::Future<Nothing>& future,
377  const StatusUpdate& update,
378  const Option<process::UPID>& pid);
379 
380  // This is called by task status update manager to forward a status
381  // update to the master. Note that the latest state of the task is
382  // added to the update before forwarding.
383  void forward(StatusUpdate update);
384 
385  // This is called by the operation status update manager to forward operation
386  // status updates to the master.
387  void sendOperationStatusUpdate(const UpdateOperationStatusMessage& update);
388 
390  const process::UPID& from,
391  const SlaveID& slaveId,
392  const FrameworkID& frameworkId,
393  const TaskID& taskId,
394  const std::string& uuid);
395 
397  const process::Future<bool>& future,
398  const TaskID& taskId,
399  const FrameworkID& frameworkId,
400  const UUID& uuid);
401 
403  const process::UPID& from,
404  const AcknowledgeOperationStatusMessage& acknowledgement);
405 
406  void drain(
407  const process::UPID& from,
408  DrainSlaveMessage&& drainSlaveMessage);
409 
410  void executorLaunched(
411  const FrameworkID& frameworkId,
412  const ExecutorID& executorId,
413  const ContainerID& containerId,
415 
416  // Made 'virtual' for Slave mocking.
417  virtual void executorTerminated(
418  const FrameworkID& frameworkId,
419  const ExecutorID& executorId,
420  const process::Future<Option<
421  mesos::slave::ContainerTermination>>& termination);
422 
423  // NOTE: Pulled these to public to make them visible for testing.
424  // TODO(vinod): Make tests friends to this class instead.
425 
426  // Garbage collects the directories based on the current disk usage.
427  // TODO(vinod): Instead of making this function public, we need to
428  // mock both GarbageCollector (and pass it through slave's constructor)
429  // and os calls.
431 
432  // Garbage collect image layers based on the disk usage of image
433  // store.
435 
436  // Invoked whenever the detector detects a change in masters.
437  // Made public for testing purposes.
438  void detected(const process::Future<Option<MasterInfo>>& _master);
439 
440  enum State
441  {
442  RECOVERING, // Slave is doing recovery.
443  DISCONNECTED, // Slave is not connected to the master.
444  RUNNING, // Slave has (re-)registered.
445  TERMINATING, // Slave is shutting down.
446  } state;
447 
448  // Describes information about agent recovery.
450  {
451  // Flag to indicate if recovery, including reconciling
452  // (i.e., reconnect/kill) with executors is finished.
454 
455  // Flag to indicate that HTTP based executors can
456  // subscribe with the agent. We allow them to subscribe
457  // after the agent recovers the containerizer.
458  bool reconnect = false;
459  } recoveryInfo;
460 
461  // TODO(benh): Clang requires members to be public in order to take
462  // their address which we do in tests (for things like
463  // FUTURE_DISPATCH).
464 // protected:
465  void initialize() override;
466  void finalize() override;
467  void exited(const process::UPID& pid) override;
468 
469  // Generates a secret for executor authentication. Returns None if there is
470  // no secret generator.
472  const FrameworkID& frameworkId,
473  const ExecutorID& executorId,
474  const ContainerID& containerId);
475 
476  // `executorInfo` is a mutated executor info with some default fields and
477  // resources. If an executor is launched for a task group, `taskInfo` would
478  // not be set.
479  void launchExecutor(
480  const process::Future<Option<Secret>>& authorizationToken,
481  const FrameworkID& frameworkId,
482  const ExecutorInfo& executorInfo,
483  const google::protobuf::Map<std::string, Value::Scalar>& executorLimits,
484  const Option<TaskInfo>& taskInfo);
485 
486  void fileAttached(const process::Future<Nothing>& result,
487  const std::string& path,
488  const std::string& virtualPath);
489 
490  Nothing detachFile(const std::string& path);
491 
492  // TODO(qianzhang): This is a workaround to make the default executor task's
493  // volume directory visible in Mesos UI. It handles two cases:
494  // 1. The task has disk resources specified. In this case any disk resources
495  // specified for the task are mounted on the top level container since
496  // currently all resources of nested containers are merged in the top
497  // level executor container. To make sure the task can access any volumes
498  // specified in its disk resources from its sandbox, a workaround was
499  // introduced to the default executor in MESOS-7225, i.e., adding a
500  // `SANDBOX_PATH` volume with type `PARENT` to the corresponding nested
501  // container. This volume gets translated into a bind mount in the nested
502  // container's mount namespace, which is not visible in Mesos UI because
503  // it operates in the host namespace. See MESOS-8279 for details.
504  // 2. The executor has disk resources specified and the task's ContainerInfo
505  // has a `SANDBOX_PATH` volume with type `PARENT` specified to share the
506  // executor's disk volume. Similar to the first case, this `SANDBOX_PATH`
507  // volume gets translated into a bind mount which is not visible in Mesos
508  // UI. See MESOS-8565 for details.
509  //
510  // To make the task's volume directory visible in Mesos UI, here we attach the
511  // executor's volume directory to it so that it can be accessed via the /files
512  // endpoint. So when users browse task's volume directory in Mesos UI, what
513  // they actually browse is the executor's volume directory. Note when calling
514  // `Files::attach()`, the third argument `authorized` is not specified because
515  // it is already specified when we do the attach for the executor's sandbox
516  // and it also applies to the executor's tasks. Note that for the second case
517  // we cannot do the attach when the task's ContainerInfo has a `SANDBOX_PATH`
518  // volume with type `PARENT` but the executor has NO disk resources, because
519  // in such case the attach will fail due to the executor's volume directory
520  // not existing which will actually be created by the `volume/sandbox_path`
521  // isolator when launching the nested container. But if the executor has disk
522  // resources, then we will not have this issue since the executor's volume
523  // directory will be created by the `filesystem/linux` isolator when launching
524  // the executor before we do the attach.
526  const ExecutorInfo& executorInfo,
527  const ContainerID& executorContainerId,
528  const Task& task);
529 
530  // TODO(qianzhang): Remove the task's volume directory from the /files
531  // endpoint. This is a workaround for MESOS-8279 and MESOS-8565.
533  const ExecutorInfo& executorInfo,
534  const ContainerID& executorContainerId,
535  const std::vector<Task>& tasks);
536 
537  // Triggers a re-detection of the master when the slave does
538  // not receive a ping.
540 
541  // Made 'virtual' for testing purposes.
542  virtual void authenticate(Duration minTimeout, Duration maxTimeout);
543 
544  // Helper routines to look up a framework/executor.
545  Framework* getFramework(const FrameworkID& frameworkId) const;
546 
548  const FrameworkID& frameworkId,
549  const ExecutorID& executorId) const;
550 
551  Executor* getExecutor(const ContainerID& containerId) const;
552 
553  // Returns the ExecutorInfo associated with a TaskInfo. If the task has no
554  // ExecutorInfo, then we generate an ExecutorInfo corresponding to the
555  // command executor.
556  ExecutorInfo getExecutorInfo(
557  const FrameworkInfo& frameworkInfo,
558  const TaskInfo& task) const;
559 
560  // Shuts down the executor if it did not register yet.
562  const FrameworkID& frameworkId,
563  const ExecutorID& executorId,
564  const ContainerID& containerId);
565 
566  // Cleans up all un-reregistered executors during recovery.
568 
569  // This function returns the max age of executor/slave directories allowed,
570  // given a disk usage. This value could be used to tune gc.
571  Duration age(double usage);
572 
573  // Checks the current disk usage and schedules for gc as necessary.
574  void checkDiskUsage();
575 
576  // Checks the current container image disk usage and trigger image
577  // gc if necessary.
578  void checkImageDiskUsage();
579 
580  // Recovers the slave, task status update manager and isolator.
582 
583  // This is called after 'recover()'. If the recovery flag (presumably
584  // `flags.recover`, TODO confirm) is 'reconnect', the slave attempts to
585  // reconnect to any old live executors. Otherwise, it attempts to shut down/kill them.
587 
588  // This is a helper to call `recover()` on the volume gid manager.
590 
591  // This is a helper to call `recover()` on the task status update manager.
593  const Option<state::SlaveState>& slaveState);
594 
595  // This is a helper to call `recover()` on the containerizer at the end of
596  // `recover()` and before `__recover()`.
597  // TODO(idownes): Remove this when we support defers to objects.
599  const Option<state::SlaveState>& state);
600 
601  // This is called after `_recoverContainerizer()`. It will add all
602  // checkpointed operations affecting agent default resources and call
603  // `OperationStatusUpdateManager::recover()`.
605  const Option<state::SlaveState>& state);
606 
607  // This is called after `OperationStatusUpdateManager::recover()`
608  // completes.
609  //
610  // If necessary it will add any missing operation status updates
611  // that couldn't be checkpointed before the agent failed over.
614 
615  // This is called when recovery finishes.
616  // Made 'virtual' for Slave mocking.
617  virtual void __recover(const process::Future<Nothing>& future);
618 
619  // Helper to recover a framework from the specified state.
620  void recoverFramework(
621  const state::FrameworkState& state,
622  const hashset<ExecutorID>& executorsToRecheckpoint,
623  const hashmap<ExecutorID, hashset<TaskID>>& tasksToRecheckpoint);
624 
625  // Removes and garbage collects the executor.
626  void removeExecutor(Framework* framework, Executor* executor);
627 
628  // Removes and garbage collects the framework.
629  // Made 'virtual' for Slave mocking.
630  virtual void removeFramework(Framework* framework);
631 
632  // Schedules a 'path' for gc based on its modification time.
633  process::Future<Nothing> garbageCollect(const std::string& path);
634 
635  // Called when the slave was signaled from the specified user.
636  void signaled(int signal, int uid);
637 
638  // Made 'virtual' for Slave mocking.
639  virtual void qosCorrections();
640 
641  // Made 'virtual' for Slave mocking.
642  virtual void _qosCorrections(
644  mesos::slave::QoSCorrection>>& correction);
645 
646  // Returns the resource usage information for all executors.
648 
649  // Handle the second phase of shutting down an executor for those
650  // executors that have not properly shut down within a timeout.
652  const FrameworkID& frameworkId,
653  const ExecutorID& executorId,
654  const ContainerID& containerId);
655 
656 private:
657  friend class Executor;
658  friend class Framework;
659  friend class Http;
660 
661  friend struct Metrics;
662 
663  Slave(const Slave&) = delete;
664  Slave& operator=(const Slave&) = delete;
665 
666  void _authenticate(Duration currentMinTimeout, Duration currentMaxTimeout);
667 
668  // Process creation of persistent volumes (for CREATE) and/or deletion
669  // of persistent volumes (for DESTROY) as a part of handling
670  // checkpointed resources, and commit the checkpointed resources on
671  // successful completion of all the operations.
672  Try<Nothing> syncCheckpointedResources(
673  const Resources& newCheckpointedResources);
674 
675  process::Future<bool> authorizeTask(
676  const TaskInfo& task,
677  const FrameworkInfo& frameworkInfo);
678 
679  process::Future<bool> authorizeSandboxAccess(
681  const FrameworkID& frameworkId,
682  const ExecutorID& executorId);
683 
684  void sendExecutorTerminatedStatusUpdate(
685  const TaskID& taskId,
686  const process::Future<Option<
687  mesos::slave::ContainerTermination>>& termination,
688  const FrameworkID& frameworkId,
689  const Executor* executor);
690 
691  void sendExitedExecutorMessage(
692  const FrameworkID& frameworkId,
693  const ExecutorID& executorId,
694  const Option<int>& status = None());
695 
696  // Forwards the current total of oversubscribed resources.
697  void forwardOversubscribed();
698  void _forwardOversubscribed(
699  const process::Future<Resources>& oversubscribable);
700 
701  // Helper functions to generate `UpdateSlaveMessage` for either
702  // just updates to resource provider-related information, or both
703  // resource provider-related information and oversubscribed
704  // resources.
705  UpdateSlaveMessage generateResourceProviderUpdate() const;
706  UpdateSlaveMessage generateUpdateSlaveMessage() const;
707 
708  void handleResourceProviderMessage(
710 
711  void addOperation(Operation* operation);
712 
713  // Transitions the operation, and recovers resources if the operation becomes
714  // terminal.
715  void updateOperation(
716  Operation* operation,
717  const UpdateOperationStatusMessage& update);
718 
719  // Update the `latest_status` of the operation if it is not terminal.
720  void updateOperationLatestStatus(
721  Operation* operation,
722  const OperationStatus& status);
723 
724  void removeOperation(Operation* operation);
725 
726  process::Future<Nothing> markResourceProviderGone(
727  const ResourceProviderID& resourceProviderId) const;
728 
729  Operation* getOperation(const UUID& uuid) const;
730 
731  void addResourceProvider(ResourceProvider* resourceProvider);
732  ResourceProvider* getResourceProvider(const ResourceProviderID& id) const;
733 
734  void apply(Operation* operation);
735 
736  // Prepare all resources to be consumed by the specified container.
737  process::Future<Nothing> publishResources(
738  const ContainerID& containerId, const Resources& resources);
739 
740  // PullGauge methods.
741  double _frameworks_active()
742  {
743  return static_cast<double>(frameworks.size());
744  }
745 
746  double _uptime_secs()
747  {
748  return (process::Clock::now() - startTime).secs();
749  }
750 
751  double _registered()
752  {
753  return master.isSome() ? 1 : 0;
754  }
755 
756  double _tasks_staging();
757  double _tasks_starting();
758  double _tasks_running();
759  double _tasks_killing();
760 
761  double _executors_registering();
762  double _executors_running();
763  double _executors_terminating();
764 
765  double _executor_directory_max_allowed_age_secs();
766 
767  double _resources_total(const std::string& name);
768  double _resources_used(const std::string& name);
769  double _resources_percent(const std::string& name);
770 
771  double _resources_revocable_total(const std::string& name);
772  double _resources_revocable_used(const std::string& name);
773  double _resources_revocable_percent(const std::string& name);
774 
775  // Checks whether the two `SlaveInfo` objects are considered
776  // compatible based on the value of the `--configuration_policy`
777  // flag.
778  Try<Nothing> compatible(
779  const SlaveInfo& previous,
780  const SlaveInfo& current) const;
781 
782  void initializeResourceProviderManager(
783  const Flags& flags,
784  const SlaveID& slaveId);
785 
786  // This function is used to compute limits for executors before they
787  // are launched as well as when updating running executors, so we must
788  // accept both `TaskInfo` and `Task` types to handle both cases.
789  google::protobuf::Map<std::string, Value::Scalar> computeExecutorLimits(
790  const Resources& executorResources,
791  const std::vector<TaskInfo>& taskInfos,
792  const std::vector<Task*>& tasks = {}) const;
793 
794  protobuf::master::Capabilities requiredMasterCapabilities;
795 
796  const Flags flags;
797 
798  const Http http;
799 
800  SlaveInfo info;
801 
802  protobuf::slave::Capabilities capabilities;
803 
804  // Resources that are checkpointed by the slave.
805  Resources checkpointedResources;
806 
807  // The current total resources of the agent, i.e., `info.resources()` with
808  // checkpointed resources applied and resource provider resources.
809  Resources totalResources;
810 
812 
814 
815  // Note that these frameworks are "completed" only in that
816  // they no longer have any active tasks or executors on this
817  // particular agent.
818  //
819  // TODO(bmahler): Implement a more accurate framework lifecycle
820  // in the agent code, ideally the master can inform the agent
821  // when a framework is actually completed, and the agent can
822  // perhaps store a cache of "idle" frameworks. See MESOS-7890.
824 
826 
827  Containerizer* containerizer;
828 
829  Files* files;
830 
832 
833  process::Time startTime;
834 
835  GarbageCollector* gc;
836 
837  TaskStatusUpdateManager* taskStatusUpdateManager;
838 
839  OperationStatusUpdateManager operationStatusUpdateManager;
840 
841  // Master detection future.
843 
844  // Master's ping timeout value, updated on reregistration.
845  Duration masterPingTimeout;
846 
847  // Timer for triggering re-detection when no ping is received from
848  // the master.
849  process::Timer pingTimer;
850 
851  // Timer for triggering agent (re)registration after detecting a new master.
852  process::Timer agentRegistrationTimer;
853 
854  // Root meta directory containing checkpointed data.
855  const std::string metaDir;
856 
857  // Indicates the number of errors ignored in "--no-strict" recovery mode.
858  unsigned int recoveryErrors;
859 
860  Option<Credential> credential;
861 
862  // Authenticatee name as supplied via flags.
863  std::string authenticateeName;
864 
865  Authenticatee* authenticatee;
866 
867  // Indicates if an authentication attempt is in progress.
868  Option<process::Future<bool>> authenticating;
869 
870  // Indicates if the authentication is successful.
871  bool authenticated;
872 
873  // Indicates if a new authentication attempt should be enforced.
874  bool reauthenticate;
875 
876  // Maximum age of executor directories. Will be recomputed
877  // periodically every flags.disk_watch_interval.
878  Duration executorDirectoryMaxAllowedAge;
879 
880  mesos::slave::ResourceEstimator* resourceEstimator;
881 
882  mesos::slave::QoSController* qosController;
883 
884  std::shared_ptr<DiskProfileAdaptor> diskProfileAdaptor;
885 
886  mesos::SecretGenerator* secretGenerator;
887 
888  VolumeGidManager* volumeGidManager;
889 
890  PendingFutureTracker* futureTracker;
891 
892  process::Owned<CSIServer> csiServer;
893 
894 #ifndef __WINDOWS__
896 #endif // __WINDOWS__
897 
898  Option<process::http::Server> executorSocketServer;
899 
900  const Option<Authorizer*> authorizer;
901 
902  // The most recent estimate of the total amount of oversubscribed
903  // (allocated and oversubscribable) resources.
904  Option<Resources> oversubscribedResources;
905 
906  process::Owned<ResourceProviderManager> resourceProviderManager;
907  process::Owned<LocalResourceProviderDaemon> localResourceProviderDaemon;
908 
909  // Local resource providers known by the agent.
911 
912  // Used to establish the relationship between the operation and the
913  // resources that the operation is operating on. Each resource
914  // provider will keep a resource version UUID, and change it when it
915  // believes that the resources from this resource provider are out
916  // of sync from the master's view. The master will keep track of
917  // the last known resource version UUID for each resource provider,
918  // and attach the resource version UUID in each operation it sends
919  // out. The resource provider should reject operations that have a
920  // different resource version UUID than the one it maintains, because
921  // this means the operation is operating on resources that might
922  // have already been invalidated.
923  UUID resourceVersion;
924 
925  // Keeps track of pending operations or terminal operations that
926  // have unacknowledged status updates. These operations may affect
927  // either agent default resources or resources offered by a resource
928  // provider.
929  hashmap<UUID, Operation*> operations;
930 
931  // Maps framework-supplied operation IDs to the operation's internal UUID.
932  // This is used to satisfy some reconciliation requests which are forwarded
933  // from the master to the agent.
935 
936  // Operations that are checkpointed by the agent.
937  hashmap<UUID, Operation> checkpointedOperations;
938 
939  // If the agent is currently draining, contains the configuration used to
940  // drain the agent. If `None`, the agent is not currently draining.
941  Option<DrainConfig> drainConfig;
942 
943  // Time when this agent was last asked to drain. This field
944  // is empty if the agent is not currently draining.
945  Option<process::Time> estimatedDrainStartTime;
946 
947  // Check whether draining is finished and possibly remove
948  // both in-memory and persisted drain configuration.
949  void updateDrainStatus();
950 };
951 
952 // Declared before `Executor` is defined; `Executor::send()` streams `*this` in logs.
953 std::ostream& operator<<(std::ostream& stream, const Executor& executor);
954 
955 
956 // Agent-side bookkeeping for one executor: identity, sandbox, connection, tasks.
957 class Executor
958 {
959 public:
960  Executor(
961  Slave* slave,
962  const FrameworkID& frameworkId,
963  const ExecutorInfo& info,
964  const ContainerID& containerId,
965  const std::string& directory,
966  const Option<std::string>& user,
967  bool checkpoint,
968  bool isGeneratedForCommandTask);
969 
970  ~Executor();
971 
972  // Note that these tasks will also be tracked within `queuedTasks`.
973  void enqueueTaskGroup(const TaskGroupInfo& taskGroup);
974 
975  void enqueueTask(const TaskInfo& task);
976  Option<TaskInfo> dequeueTask(const TaskID& taskId);
977  Task* addLaunchedTask(const TaskInfo& task);
978  void completeTask(const TaskID& taskId);
979  void checkpointExecutor();
980  void checkpointTask(const TaskInfo& task);
981  void checkpointTask(const Task& task);
982 
983  void recoverTask(const state::TaskState& state, bool recheckpointTask);
984 
985  void addPendingTaskStatus(const TaskStatus& status);
986  void removePendingTaskStatus(const TaskStatus& status);
987 
988  Try<Nothing> updateTaskState(const TaskStatus& status);
989 
990  // Returns true if there are any queued/launched/terminated tasks.
991  bool incompleteTasks();
992 
993  // Returns true if the agent ever sent any tasks to this executor.
994  // More precisely, this function returns whether either:
995  //
996  // (1) There are terminated/completed tasks with a
997  // SOURCE_EXECUTOR status.
998  //
999  // (2) `launchedTasks` is not empty.
1000  //
1001  // If this function returns false and there are no queued tasks,
1002  // we probably (see TODO below) have killed or dropped all of its
1003  // initial tasks, in which case the agent will shut down the executor.
1004  //
1005  // TODO(mzhu): Note however, that since we look through the cache
1006  // of completed tasks, this can wrongly return false when a task
1007  // that was delivered to the executor has been evicted from the
1008  // completed task cache by tasks that have been killed by the
1009  // agent before delivery. This should be fixed.
1010  //
1011  // NOTE: This function checks whether any task has ever been sent;
1012  // this does not necessarily mean the executor has ever received
1013  // any tasks. Specifically, tasks in `launchedTasks` may be dropped
1014  // before being received by the executor if the agent restarts.
1015  bool everSentTask() const;
1016 
1017  // Sends `message` via `http` if set, otherwise via `pid`; failures are only logged.
1018  template <typename Message>
1019  void send(const Message& message)
1020  {
1021  if (state == REGISTERING || state == TERMINATED) {
1022  LOG(WARNING) << "Attempting to send message to disconnected"
1023  << " executor " << *this << " in state " << state;
1024  }
1025  // NOTE: The send below is still attempted after the warning above.
1026  if (http.isSome()) {
1027  if (!http->send(message)) {
1028  LOG(WARNING) << "Unable to send event to executor " << *this
1029  << ": connection closed";
1030  }
1031  } else if (pid.isSome()) {
1032  slave->send(pid.get(), message);
1033  } else {
1034  LOG(WARNING) << "Unable to send event to executor " << *this
1035  << ": unknown connection type";
1036  }
1037  }
1038 
1039  // Returns true if this executor is generated by Mesos for a command
1040  // task (either command executor for MesosContainerizer or docker
1041  // executor for DockerContainerizer).
1042  bool isGeneratedForCommandTask() const;
1043 
1044  // Closes the HTTP connection.
1045  void closeHttpConnection();
1046 
1047  // Returns the queued task group that contains the given task, if any.
1048  Option<TaskGroupInfo> getQueuedTaskGroup(const TaskID& taskId);
1049 
1050  Resources allocatedResources() const;
1051 
1052  enum State
1053  {
1054  REGISTERING, // Executor is launched but not (re-)registered yet.
1055  RUNNING, // Executor has (re-)registered.
1056  TERMINATING, // Executor is being shut down/killed.
1057  TERMINATED, // Executor has terminated but there might be pending updates.
1058  } state;
1059 
1060  // We store the pointer to 'Slave' to get access to its methods and
1061  // variables. One could imagine 'Executor' being an inner class
1062  // of the 'Slave' class.
1064 
1065  const ExecutorID id;
1066  const ExecutorInfo info;
1067 
1068  const FrameworkID frameworkId;
1069 
1070  const ContainerID containerId;
1071 
1072  const std::string directory;
1073 
1074  // The sandbox will be owned by this user and the executor will
1075  // run as this user. This can be set to None when --switch_user
1076  // is false or when compiled for Windows.
1078 
1079  const bool checkpoint;
1080 
1081  // An Executor can either be connected via HTTP or by libprocess
1082  // message passing. The following are the possible states:
1083  //
1084  // Agent State    Executor State    http    pid       Executor Type
1085  // -----------    --------------    ----    ----      -------------
1086  // RECOVERING     REGISTERING       None    UPID()    Unknown
1087  //                REGISTERING       None    Some      Libprocess
1088  //                REGISTERING       None    None      HTTP
1089  //
1090  // *              REGISTERING       None    None      Not known yet
1091  // *              *                 None    Some      Libprocess
1092  // *              *                 Some    None      HTTP
1096 
1098 
1099  // Tasks can be found in one of the following four data structures:
1100  //
1101  // TODO(bmahler): Make these private to enforce that the task
1102  // lifecycle helper functions are not being bypassed, and provide
1103  // public views into them.
1104 
1105  // Not yet launched tasks. This also includes tasks from `queuedTaskGroups`.
1107 
1108  // Not yet launched task groups. This is needed for correctly sending
1109  // TASK_KILLED status updates for all tasks in the group if any of the
1110  // tasks were killed before the executor could register with the agent.
1111  std::vector<TaskGroupInfo> queuedTaskGroups;
1112 
1113  // Running.
1115 
1116  // Terminated but pending updates.
1118 
1119  // Terminated and updates acked.
1120  // NOTE: We use a shared pointer for Task because clang doesn't like
1121  // stout's implementation of circular_buffer with Task (the Boost code
1122  // used internally by stout attempts to do some memsets which are unsafe).
1123  circular_buffer<std::shared_ptr<Task>> completedTasks;
1124 
1125  // When the slave initiates a destroy of the container, we expect a
1126  // termination to occur. The 'pendingTermination' indicates why the
1127  // slave initiated the destruction and will influence the
1128  // information sent in the status updates for any remaining
1129  // non-terminal tasks.
1131 
1132  // Task status updates that are being processed by the agent.
1134 
1135 private:
1136  Executor(const Executor&) = delete;
1137  Executor& operator=(const Executor&) = delete;
1138 
1139  bool isGeneratedForCommandTask_;
1140 };
1141 
1142 
1143 // Agent-side bookkeeping for a framework: its info, executors, and pending tasks.
1145 {
1146 public:
1147  Framework(
1148  Slave* slave,
1149  const Flags& slaveFlags,
1150  const FrameworkInfo& info,
1151  const Option<process::UPID>& pid);
1152 
1153  ~Framework();
1154 
1155  // Returns whether the framework is idle, where idle is
1156  // defined as having no activity:
1157  // (1) The framework has no non-terminal tasks and executors.
1158  // (2) All status updates have been acknowledged.
1159  //
1160  // TODO(bmahler): The framework should also not be considered
1161  // idle if there are unacknowledged updates for "pending" tasks.
1162  bool idle() const;
1163 
1164  void checkpointFramework() const;
1165 
1166  const FrameworkID id() const { return info.id(); }
1167 
1168  Try<Executor*> addExecutor(
1169  const ExecutorInfo& executorInfo,
1170  bool isGeneratedForCommandTask);
1171 
1172  Executor* getExecutor(const ExecutorID& executorId) const;
1173  Executor* getExecutor(const TaskID& taskId) const;
1174 
1175  void destroyExecutor(const ExecutorID& executorId);
1176 
1177  void recoverExecutor(
1178  const state::ExecutorState& state,
1179  bool recheckpointExecutor,
1180  const hashset<TaskID>& tasksToRecheckpoint);
1181 
1182  void addPendingTask(
1183  const ExecutorID& executorId,
1184  const TaskInfo& task);
1185 
1186  // Note that these tasks will also be tracked within `pendingTasks`.
1187  void addPendingTaskGroup(
1188  const ExecutorID& executorId,
1189  const TaskGroupInfo& taskGroup);
1190 
1191  bool hasTask(const TaskID& taskId) const;
1192  bool isPending(const TaskID& taskId) const;
1193 
1194  // Returns the pending task group that contains the given task, if any.
1195  Option<TaskGroupInfo> getTaskGroupForPendingTask(const TaskID& taskId);
1196 
1197  // Returns whether the pending task was removed.
1198  bool removePendingTask(const TaskID& taskId);
1199 
1200  Option<ExecutorID> getExecutorIdForPendingTask(const TaskID& taskId) const;
1201 
1202  Resources allocatedResources() const;
1203 
1204  enum State
1205  {
1206  RUNNING, // First state of a newly created framework.
1207  TERMINATING, // Framework is shutting down in the cluster.
1208  } state;
1209 
1210  // We store the pointer to 'Slave' to get access to its methods and
1211  // variables. One could imagine 'Framework' being an inner class of
1212  // the 'Slave' class.
1214 
1215  FrameworkInfo info;
1216 
1218 
1219  // Frameworks using the scheduler driver will have a 'pid',
1220  // which allows us to send executor messages directly to the
1221  // driver. Frameworks using the HTTP API (introduced in 0.24.0) will
1222  // not have a 'pid', in which case executor messages are
1223  // sent through the master.
1225 
1226  // Executors can be found in one of the following
1227  // data structures:
1228  //
1229  // TODO(bmahler): Make these private to enforce that
1230  // the executor lifecycle helper functions are not
1231  // being bypassed, and provide public views into them.
1232 
1233  // Executors with pending tasks.
1235 
1236  // Sequences in this map are used to enforce the order of tasks launched on
1237  // each executor.
1238  //
1239  // Note on the lifecycle of the sequence: if the corresponding executor struct
1240  // has not been created, we tie the lifecycle of the sequence to the first
1241  // task in the sequence (which must have the `launch_executor` flag set to
1242  // true modulo MESOS-3870). If the task fails to launch before creating the
1243  // executor struct, we will delete the sequence. Once the executor struct is
1244  // created, we tie the lifecycle of the sequence to the executor struct.
1245  //
1246  // TODO(mzhu): Create the executor struct early and put the sequence in it.
1248 
1249  // Pending task groups. This is needed for correctly sending
1250  // TASK_KILLED status updates for all tasks in the group if
1251  // any of the tasks are killed while pending.
1252  std::vector<TaskGroupInfo> pendingTaskGroups;
1253 
1254  // Currently running executors.
1256  // Completed executors; a circular_buffer, so the oldest entries are overwritten when full.
1257  circular_buffer<process::Owned<Executor>> completedExecutors;
1258 
1259 private:
1260  Framework(const Framework&) = delete;
1261  Framework& operator=(const Framework&) = delete;
1262 };
1263 
1264 // Agent-side record of a resource provider: its info, total resources, version, operations.
1266 {
1268  const ResourceProviderInfo& _info,
1269  const Resources& _totalResources,
1270  const Option<UUID>& _resourceVersion)
1271  : info(_info),
1272  totalResources(_totalResources),
1273  resourceVersion(_resourceVersion) {}
1274 
1275  void addOperation(Operation* operation);
1276  void removeOperation(Operation* operation);
1277 
1278  ResourceProviderInfo info;
1280 
1281  // Used to establish the relationship between the operation and the
1282  // resources that the operation is operating on. Each resource
1283  // provider will keep a resource version UUID, and change it when it
1284  // believes that the resources from this resource provider are out
1285  // of sync from the master's view. The master will keep track of
1286  // the last known resource version UUID for each resource provider,
1287  // and attach the resource version UUID in each operation it sends
1288  // out. The resource provider should reject operations that have a
1289  // different resource version UUID than the one it maintains, because
1290  // this means the operation is operating on resources that might
1291  // have already been invalidated.
1293 
1294  // Pending operations or terminal operations that have
1295  // unacknowledged status updates.
1297 };
1298 
1299 // Returns a map of environment variables necessary in order to launch an executor.
1313 std::map<std::string, std::string> executorEnvironment(
1314  const Flags& flags,
1315  const ExecutorInfo& executorInfo,
1316  const std::string& directory,
1317  const SlaveID& slaveId,
1318  const process::PID<Slave>& slavePid,
1319  const Option<Secret>& authenticationToken,
1320  bool checkpoint);
1321 
1322 // Stream operators for the `State` enums of Executor, Framework and Slave.
1323 std::ostream& operator<<(std::ostream& stream, Executor::State state);
1324 std::ostream& operator<<(std::ostream& stream, Framework::State state);
1325 std::ostream& operator<<(std::ostream& stream, Slave::State state);
1326 
1327 } // namespace slave {
1328 } // namespace internal {
1329 } // namespace mesos {
1330 
1331 #endif // __SLAVE_HPP__
void schedulerMessage(const SlaveID &slaveId, const FrameworkID &frameworkId, const ExecutorID &executorId, const std::string &data)
struct mesos::internal::slave::Slave::RecoveryInfo recoveryInfo
Definition: path.hpp:29
virtual process::Future< ResourceUsage > usage()
hashmap< TaskID, LinkedHashMap< id::UUID, TaskStatus > > pendingStatusUpdates
Definition: slave.hpp:1133
Try< Nothing > checkpoint(const std::string &path, const std::string &message, bool sync, bool downgradeResources)
Definition: state.hpp:123
void executorMessage(const SlaveID &slaveId, const FrameworkID &frameworkId, const ExecutorID &executorId, const std::string &data)
circular_buffer< process::Owned< Executor > > completedExecutors
Definition: slave.hpp:1257
void subscribe(StreamingHttpConnection< v1::executor::Event > http, const executor::Call::Subscribe &subscribe, Framework *framework, Executor *executor)
void sendOperationStatusUpdate(const UpdateOperationStatusMessage &update)
Try< uid_t > uid(const std::string &path, const FollowSymlink follow=FollowSymlink::FOLLOW_SYMLINK)
Definition: stat.hpp:224
Definition: nothing.hpp:16
Definition: http.hpp:42
Try< bool > update(const std::string &link, const Handle &parent, uint16_t protocol, const action::Mirror &mirror)
State
Definition: slave.hpp:1052
circular_buffer< std::shared_ptr< Task > > completedTasks
Definition: slave.hpp:1123
process::Owned< ResponseHeartbeater< executor::Event, v1::executor::Event > > heartbeater
Definition: slave.hpp:1095
void _reregisterExecutor(const process::Future< Nothing > &future, const FrameworkID &frameworkId, const ExecutorID &executorId, const ContainerID &containerId)
Duration age(double usage)
Definition: master.hpp:27
const Option< std::string > user
Definition: slave.hpp:1077
std::ostream & operator<<(std::ostream &stream, const MesosContainerizerProcess::State &state)
const ExecutorInfo info
Definition: slave.hpp:1066
virtual void removeFramework(Framework *framework)
Definition: check.hpp:33
void send(const Message &message)
Definition: slave.hpp:1019
virtual void shutdownExecutor(const process::UPID &from, const FrameworkID &frameworkId, const ExecutorID &executorId)
void detachTaskVolumeDirectories(const ExecutorInfo &executorInfo, const ContainerID &executorContainerId, const std::vector< Task > &tasks)
void _checkDiskUsage(const process::Future< double > &usage)
Framework * getFramework(const FrameworkID &frameworkId) const
Definition: hashset.hpp:53
virtual void ___run(const process::Future< Nothing > &future, const FrameworkID &frameworkId, const ExecutorID &executorId, const ContainerID &containerId, const std::vector< TaskInfo > &tasks, const std::vector< TaskGroupInfo > &taskGroups)
Definition: protobuf_utils.hpp:332
void _statusUpdateAcknowledgement(const process::Future< bool > &future, const TaskID &taskId, const FrameworkID &frameworkId, const UUID &uuid)
ResourceProvider(const ResourceProviderInfo &_info, const Resources &_totalResources, const Option< UUID > &_resourceVersion)
Definition: slave.hpp:1267
void removeExecutor(Framework *framework, Executor *executor)
process::Future< Nothing > _recoverContainerizer(const Option< state::SlaveState > &state)
process::Future< Nothing > _recoverVolumeGidManager(bool rebooted)
Result< std::string > user(Option< uid_t > uid=None())
Definition: su.hpp:284
hashmap< ExecutorID, process::Sequence > taskLaunchSequences
Definition: slave.hpp:1247
Result< ProcessStatus > status(pid_t pid)
Definition: proc.hpp:166
Option< UUID > resourceVersion
Definition: slave.hpp:1292
Option< StreamingHttpConnection< v1::executor::Event > > http
Definition: slave.hpp:1093
Definition: resources.hpp:83
friend class Executor
Definition: slave.hpp:657
void handleRunTaskGroupMessage(const process::UPID &from, RunTaskGroupMessage &&runTaskGroupMessage)
virtual void runTaskGroup(const process::UPID &from, const FrameworkInfo &frameworkInfo, const ExecutorInfo &executorInfo, const TaskGroupInfo &taskGroupInfo, const std::vector< ResourceVersionUUID > &resourceVersionUuids, const Option< bool > &launchExecutor)
bool reconnect
Definition: slave.hpp:458
virtual void executorTerminated(const FrameworkID &frameworkId, const ExecutorID &executorId, const process::Future< Option< mesos::slave::ContainerTermination >> &termination)
Definition: volume_gid_manager.hpp:42
Option< process::UPID > pid
Definition: slave.hpp:1224
Definition: files.hpp:73
Option< Error > validateResourceLimitsAndIsolators(const std::vector< TaskInfo > &tasks)
hashmap< ExecutorID, hashmap< TaskID, TaskInfo > > pendingTasks
Definition: slave.hpp:1234
enum mesos::internal::slave::Slave::State state
Slave(const std::string &id, const Flags &flags, mesos::master::detector::MasterDetector *detector, Containerizer *containerizer, Files *files, GarbageCollector *gc, TaskStatusUpdateManager *taskStatusUpdateManager, mesos::slave::ResourceEstimator *resourceEstimator, mesos::slave::QoSController *qosController, mesos::SecretGenerator *secretGenerator, VolumeGidManager *volumeGidManager, PendingFutureTracker *futureTracker, process::Owned< CSIServer > &&csiServer, const Option< process::network::unix::Socket > &executorSocket, const Option< Authorizer * > &authorizer)
Operation
Definition: cgroups.hpp:444
void _checkImageDiskUsage(const process::Future< double > &usage)
void reregisterExecutor(const process::UPID &from, const FrameworkID &frameworkId, const ExecutorID &executorId, const std::vector< TaskInfo > &tasks, const std::vector< StatusUpdate > &updates)
Definition: flags.hpp:39
Slave * slave
Definition: slave.hpp:1213
Definition: duration.hpp:32
Option< mesos::slave::ContainerTermination > pendingTermination
Definition: slave.hpp:1130
std::map< std::string, std::string > executorEnvironment(const Flags &flags, const ExecutorInfo &executorInfo, const std::string &directory, const SlaveID &slaveId, const process::PID< Slave > &slavePid, const Option< Secret > &authenticationToken, bool checkpoint)
Returns a map of environment variables necessary in order to launch an executor.
void recoverFramework(const state::FrameworkState &state, const hashset< ExecutorID > &executorsToRecheckpoint, const hashmap< ExecutorID, hashset< TaskID >> &tasksToRecheckpoint)
Definition: protobuf_utils.hpp:548
void send(const process::UPID &to, const google::protobuf::Message &message)
Definition: protobuf.hpp:126
bool isSome() const
Definition: option.hpp:116
process::Future< Option< Secret > > generateSecret(const FrameworkID &frameworkId, const ExecutorID &executorId, const ContainerID &containerId)
Definition: task_status_update_manager.hpp:58
LinkedHashMap< TaskID, TaskInfo > queuedTasks
Definition: slave.hpp:1106
void statusUpdateAcknowledgement(const process::UPID &from, const SlaveID &slaveId, const FrameworkID &frameworkId, const TaskID &taskId, const std::string &uuid)
const bool checkpoint
Definition: slave.hpp:1079
void signaled(int signal, int uid)
process::Future< Nothing > __recoverOperations(const process::Future< OperationStatusUpdateManagerState > &state)
virtual void _shutdownExecutor(Framework *framework, Executor *executor)
Definition: hashmap.hpp:38
process::Future< Nothing > garbageCollect(const std::string &path)
void finalize() override
Invoked when a process is terminated.
void operationStatusAcknowledgement(const process::UPID &from, const AcknowledgeOperationStatusMessage &acknowledgement)
Definition: resource_estimator.hpp:37
hashmap< std::string, MessageHandler > message
Definition: process.hpp:455
Resources allocatedResources(const Resources &resources, const std::string &role)
Definition: containerizer.hpp:64
void shutdownExecutorTimeout(const FrameworkID &frameworkId, const ExecutorID &executorId, const ContainerID &containerId)
void attachTaskVolumeDirectory(const ExecutorInfo &executorInfo, const ContainerID &executorContainerId, const Task &task)
Definition: slave.hpp:118
void registered(const process::UPID &from, const SlaveID &slaveId, const MasterSlaveConnection &connection)
An "untyped" PID, used to encapsulate the process ID for lower-layer abstractions (eg...
Definition: pid.hpp:39
Definition: future_tracker.hpp:84
void pingTimeout(process::Future< Option< MasterInfo >> future)
void handleRunTaskMessage(const process::UPID &from, RunTaskMessage &&runTaskMessage)
const ContainerID containerId
Definition: slave.hpp:1070
void kill(const FrameworkID &frameworkId, Framework *framework, Executor *executor, const TaskID &taskId, const Option< KillPolicy > &killPolicy)
Option< process::UPID > pid
Definition: slave.hpp:1097
virtual void __run(const FrameworkInfo &frameworkInfo, const ExecutorInfo &executorInfo, const Option< TaskInfo > &task, const Option< TaskGroupInfo > &taskGroup, const std::vector< ResourceVersionUUID > &resourceVersionUuids, const Option< bool > &launchExecutor, bool executorGeneratedForCommandTask)
bool send(const Message &message)
Definition: http.hpp:159
ExecutorInfo getExecutorInfo(const FrameworkInfo &frameworkInfo, const TaskInfo &task) const
void forward(StatusUpdate update)
Definition: protobuf_utils.hpp:631
void reconcileOperations(const ReconcileOperationsMessage &message)
Definition: agent.hpp:25
const FrameworkID id() const
Definition: slave.hpp:1166
void initialize() override
Invoked when a process gets spawned.
const T & get() const &
Definition: option.hpp:119
protobuf::framework::Capabilities capabilities
Definition: slave.hpp:1217
process::Future< Nothing > _recover()
virtual process::Future< Nothing > _run(const FrameworkInfo &frameworkInfo, const ExecutorInfo &executorInfo, const Option< TaskInfo > &task, const Option< TaskGroupInfo > &taskGroup, const std::vector< ResourceVersionUUID > &resourceVersionUuids, const Option< bool > &launchExecutor)
void exited(const process::UPID &pid) override
Invoked when a linked process has exited.
ResourceProviderInfo info
Definition: slave.hpp:1278
void checkpointResourcesMessage(const std::vector< Resource > &resources)
The SecretGenerator interface represents a mechanism to create a secret from a principal.
Definition: secret_generator.hpp:34
Definition: protobuf.hpp:108
const ExecutorID id
Definition: slave.hpp:1065
void run(const FrameworkInfo &frameworkInfo, ExecutorInfo executorInfo, Option< TaskInfo > task, Option< TaskGroupInfo > taskGroup, const std::vector< ResourceVersionUUID > &resourceVersionUuids, const process::UPID &pid, const Option< bool > &launchExecutor, bool executorGeneratedForCommandTask)
State
Definition: slave.hpp:1204
void executorLaunched(const FrameworkID &frameworkId, const ExecutorID &executorId, const ContainerID &containerId, const process::Future< Containerizer::LaunchResult > &future)
process::Future< Option< state::SlaveState > > _recoverTaskStatusUpdates(const Option< state::SlaveState > &slaveState)
Definition: time.hpp:23
void reregistered(const process::UPID &from, const SlaveID &slaveId, const std::vector< ReconcileTasksMessage > &reconciliations, const MasterSlaveConnection &connection)
void fileAttached(const process::Future< Nothing > &result, const std::string &path, const std::string &virtualPath)
std::vector< TaskGroupInfo > pendingTaskGroups
Definition: slave.hpp:1252
Try< std::vector< Entry > > list(const std::string &hierarchy, const std::string &cgroup)
void _statusUpdate(StatusUpdate update, const Option< process::UPID > &pid, const ExecutorID &executorId, const Option< process::Future< ContainerStatus >> &containerStatus)
void launchExecutor(const process::Future< Option< Secret >> &authorizationToken, const FrameworkID &frameworkId, const ExecutorInfo &executorInfo, const google::protobuf::Map< std::string, Value::Scalar > &executorLimits, const Option< TaskInfo > &taskInfo)
Definition: boundedhashmap.hpp:27
const FrameworkID frameworkId
Definition: slave.hpp:1068
void detected(const process::Future< Option< MasterInfo >> &_master)
#define flags
Definition: decoder.hpp:18
A "process identifier" used to uniquely identify a process when dispatching messages.
Definition: pid.hpp:289
hashmap< ExecutorID, Executor * > executors
Definition: slave.hpp:1255
Definition: none.hpp:27
Definition: attributes.hpp:24
Slave * slave
Definition: slave.hpp:1063
LinkedHashMap< TaskID, Task * > launchedTasks
Definition: slave.hpp:1114
process::Future< Nothing > recover(const Try< state::State > &state)
void ___statusUpdate(const process::Future< Nothing > &future, const StatusUpdate &update, const Option< process::UPID > &pid)
Definition: timer.hpp:30
void shutdownFramework(const process::UPID &from, const FrameworkID &frameworkId)
Nothing detachFile(const std::string &path)
virtual void killTask(const process::UPID &from, const KillTaskMessage &killTaskMessage)
friend class Framework
Definition: slave.hpp:658
virtual void __recover(const process::Future< Nothing > &future)
Resources totalResources
Definition: slave.hpp:1279
std::vector< TaskGroupInfo > queuedTaskGroups
Definition: slave.hpp:1111
An abstraction of a Master detector which can be used to detect the leading master from a group...
Definition: detector.hpp:38
static Time now()
The current clock time for either the current process that makes this call or the global clock time i...
hashmap< UUID, Operation * > operations
Definition: slave.hpp:1296
void ping(const process::UPID &from, bool connected)
Definition: metrics.hpp:32
void __statusUpdate(const Option< process::Future< Nothing >> &future, const StatusUpdate &update, const Option< process::UPID > &pid, const ExecutorID &executorId, const ContainerID &containerId, bool checkpoint)
void doReliableRegistration(Duration maxBackoff)
process::Promise< Nothing > recovered
Definition: slave.hpp:453
void drain(const process::UPID &from, DrainSlaveMessage &&drainSlaveMessage)
Definition: qos_controller.hpp:44
const std::string directory
Definition: slave.hpp:1072
void statusUpdate(StatusUpdate update, const Option< process::UPID > &pid)
void checkpointResourceState(const Resources &resources, bool changeTotal)
URI http(const std::string &host, const std::string &path="/", const Option< int > &port=None(), const Option< std::string > &query=None(), const Option< std::string > &fragment=None(), const Option< std::string > &user=None(), const Option< std::string > &password=None())
Creates an http URI with the given parameters.
Definition: http.hpp:35
State
Definition: slave.hpp:440
virtual void authenticate(Duration minTimeout, Duration maxTimeout)
virtual void runTask(const process::UPID &from, const FrameworkInfo &frameworkInfo, const FrameworkID &frameworkId, const process::UPID &pid, const TaskInfo &task, const std::vector< ResourceVersionUUID > &resourceVersionUuids, const Option< bool > &launchExecutor)
FrameworkInfo info
Definition: slave.hpp:1215
Definition: owned.hpp:36
void shutdown(const process::UPID &from, const std::string &message)
process::Future< Nothing > _recoverOperations(const Option< state::SlaveState > &state)
Definition: slave.hpp:957
virtual void _qosCorrections(const process::Future< std::list< mesos::slave::QoSCorrection >> &correction)
Executor * getExecutor(const FrameworkID &frameworkId, const ExecutorID &executorId) const
Definition: parse.hpp:33
void registerExecutor(const process::UPID &from, const FrameworkID &frameworkId, const ExecutorID &executorId)
PID< MetricsProcess > metrics
void updateFramework(const UpdateFrameworkMessage &message)
LinkedHashMap< TaskID, Task * > terminatedTasks
Definition: slave.hpp:1117
constexpr const char * name
Definition: shell.hpp:41
void killPendingTask(const FrameworkID &frameworkId, Framework *framework, const TaskID &taskId)
void registerExecutorTimeout(const FrameworkID &frameworkId, const ExecutorID &executorId, const ContainerID &containerId)
Definition: slave.hpp:1144
virtual void applyOperation(const ApplyOperationMessage &message)
Definition: authenticatee.hpp:29