Apache Mesos
master.hpp
Go to the documentation of this file.
1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements. See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership. The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License. You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 
17 #ifndef __MASTER_HPP__
18 #define __MASTER_HPP__
19 
20 #include <stdint.h>
21 
22 #include <functional>
23 #include <list>
24 #include <memory>
25 #include <set>
26 #include <string>
27 #include <vector>
28 
29 #include <mesos/mesos.hpp>
30 #include <mesos/resources.hpp>
31 #include <mesos/roles.hpp>
32 #include <mesos/type_utils.hpp>
33 
35 
39 #include <mesos/master/master.hpp>
40 
42 
43 #include <mesos/quota/quota.hpp>
44 
46 
47 #include <process/collect.hpp>
48 #include <process/future.hpp>
49 #include <process/limiter.hpp>
50 #include <process/http.hpp>
51 #include <process/owned.hpp>
52 #include <process/process.hpp>
53 #include <process/protobuf.hpp>
54 #include <process/sequence.hpp>
55 #include <process/timer.hpp>
56 
58 
59 #include <stout/boundedhashmap.hpp>
60 #include <stout/cache.hpp>
62 #include <stout/foreach.hpp>
63 #include <stout/hashmap.hpp>
64 #include <stout/hashset.hpp>
65 #include <stout/linkedhashmap.hpp>
66 #include <stout/multihashmap.hpp>
67 #include <stout/nothing.hpp>
68 #include <stout/option.hpp>
69 #include <stout/recordio.hpp>
70 #include <stout/try.hpp>
71 #include <stout/uuid.hpp>
72 
73 #include "common/heartbeater.hpp"
74 #include "common/http.hpp"
76 
77 #include "files/files.hpp"
78 
79 #include "internal/devolve.hpp"
80 #include "internal/evolve.hpp"
81 
82 #include "master/authorization.hpp"
83 #include "master/constants.hpp"
84 #include "master/flags.hpp"
85 #include "master/machine.hpp"
86 #include "master/metrics.hpp"
87 #include "master/validation.hpp"
88 
89 #include "messages/messages.hpp"
90 
91 namespace process {
92 class RateLimiter; // Forward declaration.
93 }
94 
95 namespace mesos {
96 
97 // Forward declarations.
98 class Authorizer;
99 class ObjectApprovers;
100 
101 namespace internal {
102 
103 // Forward declarations.
104 namespace registry {
105 class Slaves;
106 }
107 
108 class Registry;
109 class WhitelistWatcher;
110 
111 namespace master {
112 
113 class Master;
114 class Registrar;
115 class SlaveObserver;
116 
117 struct BoundedRateLimiter;
118 struct Framework;
119 struct Role;
120 
121 
// Master-side record of a single agent (slave): its identity (`id`, `info`,
// `machineId`), its connection/activation state (`connected`, `active`), and
// the tasks, executors, offers, inverse offers and operations that the
// master tracks for it (see the member comments below).
//
// Non-copyable: the copy constructor and copy assignment operator are
// declared private at the end of the struct.
//
// NOTE(review): this text is a rendered listing of the header and has lost
// source lines -- the embedded line numbers skip 192, 205, 211, 213-214,
// 230, 237, 243, 253, 257, 261, 274, 277, 280, 285, 287, 296, 303, 316,
// 322, 324, 326, 339, 343 and 346. As a result, many of the member comments
// below are not followed by the member they describe. Restore these lines
// from the upstream header before relying on this declaration.
122 struct Slave
123 {
124 Slave(Master* const _master,
125  SlaveInfo _info,
126  const process::UPID& _pid,
127  const MachineID& _machineId,
128  const std::string& _version,
129  std::vector<SlaveInfo::Capability> _capabilites,
130  const process::Time& _registeredTime,
131  std::vector<Resource> _checkpointedResources,
132  const Option<UUID>& _resourceVersion,
133  std::vector<ExecutorInfo> executorInfos = std::vector<ExecutorInfo>(),
134  std::vector<Task> tasks = std::vector<Task>());
135 
136  ~Slave();
137 
138  Task* getTask(
139  const FrameworkID& frameworkId,
140  const TaskID& taskId) const;
141 
142  void addTask(Task* task);
143 
144  // Update slave to recover the resources that were previously
145  // being used by `task`.
146  //
147  // TODO(bmahler): This is a hack for performance. We need to
148  // maintain resource counters because computing task resources
149  // functionally for all tasks is expensive, for now.
150  void recoverResources(Task* task);
151 
152  void removeTask(Task* task);
153 
154  void addOperation(Operation* operation);
155 
156  void recoverResources(Operation* operation);
157 
158  void removeOperation(Operation* operation);
159 
160  // Marks a non-speculative operation as an orphan when the originating
161  // framework is torn down by the master, or when an agent reregisters
162  // with operations from unknown frameworks. If the operation is
163  // non-terminal, this has the side effect of modifying the agent's
164  // total resources, and should therefore be followed by
165  // `allocator->updateSlave()`.
166  void markOperationAsOrphan(Operation* operation);
167 
168  Operation* getOperation(const UUID& uuid) const;
169 
170  void addOffer(Offer* offer);
171 
172  void removeOffer(Offer* offer);
173 
174  void addInverseOffer(InverseOffer* inverseOffer);
175 
176  void removeInverseOffer(InverseOffer* inverseOffer);
177 
178  bool hasExecutor(
179  const FrameworkID& frameworkId,
180  const ExecutorID& executorId) const;
181 
182  void addExecutor(
183  const FrameworkID& frameworkId,
184  const ExecutorInfo& executorInfo);
185 
186  void removeExecutor(
187  const FrameworkID& frameworkId,
188  const ExecutorID& executorId);
189 
190  void apply(const std::vector<ResourceConversion>& conversions);
191 
// NOTE(review): source line 192 -- the return type and name of the member
// function that takes the parameter list below -- is missing from this
// listing, so the declaration is incomplete as shown.
193  const SlaveInfo& info,
194  const std::string& _version,
195  const std::vector<SlaveInfo::Capability>& _capabilites,
196  const Resources& _checkpointedResources,
197  const Option<UUID>& resourceVersion);
198 
199  Master* const master;
200  const SlaveID id;
201  SlaveInfo info;
202 
203  const MachineID machineId;
204 
// NOTE(review): source line 205 is missing. The constructor takes `_pid`
// and `operator<<(std::ostream&, const Slave&)` reads `slave.pid`, so that
// line presumably declared the `pid` member -- confirm upstream.
206 
207  // TODO(bmahler): Use stout's Version when it can parse labels, etc.
208  std::string version;
209 
210  // Agent capabilities.
// NOTE(review): the member this comment describes (presumably on source
// line 211) and source lines 213-214 are missing from this listing.
212 
215 
216  // Slave becomes disconnected when the socket closes.
217  bool connected;
218 
219  // Slave becomes deactivated when it gets disconnected, or when the
220  // agent is deactivated via the DRAIN_AGENT or DEACTIVATE_AGENT calls.
221  // No offers will be made for a deactivated slave.
222  bool active;
223 
224  // Timer for marking slaves unreachable that become disconnected and
225  // don't reregister. This timeout is larger than the slave
226  // observer's timeout, so typically the slave observer will be the
227  // one to mark such slaves unreachable; this timer is a backup for
228  // when a slave responds to pings but does not reregister (e.g.,
229  // because agent recovery has hung).
231 
232  // Executors running on this slave.
233  //
234  // TODO(bmahler): Make this private to enforce that `addExecutor()`
235  // and `removeExecutor()` are used, and provide a const view into
236  // the executors.
238 
239  // Tasks that have not yet been launched because they are currently
240  // being authorized. This is similar to Framework's pendingTasks but we
241  // track pendingTasks per agent separately to determine if any offer
242  // operation for this agent would change resources requested by these tasks.
244 
245  // Tasks present on this slave.
246  //
247  // TODO(bmahler): Make this private to enforce that `addTask()` and
248  // `removeTask()` are used, and provide a const view into the tasks.
249  //
250  // TODO(bmahler): The task pointer ownership complexity arises from the fact
251  // that we own the pointer here, but it's shared with the Framework struct.
252  // We should find a way to eliminate this.
254 
255  // Tasks that were asked to kill by frameworks.
256  // This is used for reconciliation when the slave reregisters.
258 
259  // Pending operations or terminal operations that have
260  // unacknowledged status updates on this agent.
262 
263  // Pending operations whose originating framework is unknown.
264  // These operations could be pending, or terminal with unacknowledged
265  // status updates.
266  //
267  // This list can be populated whenever a framework is torn down in the
268  // lifetime of the master, or when an agent reregisters with an operation.
269  //
270  // If the originating framework is completed, the master will
271  // acknowledge any status updates instead of the framework.
272  // If an orphan does not belong to a completed framework, the master
273  // will only acknowledge status updates after a fixed delay.
275 
276  // Active offers on this slave.
278 
279  // Active inverse offers on this slave.
281 
282  // Resources for active task / executors / operations.
283  // Note that we maintain multiple copies of each shared resource in
284  // `usedResources` as they are used by multiple tasks.
286 
288 
289  // Resources that should be checkpointed by the slave (e.g.,
290  // persistent volumes, dynamic reservations, etc). These are either
291  // in use by a task/executor, or are available for use and will be
292  // re-offered to the framework.
293  // TODO(jieyu): `checkpointedResources` is only for agent default
294  // resources. Resources from resource providers are not included in
295  // this field. Consider removing this field.
297 
298  // The current total resources of the slave. Note that this is
299  // different from 'info.resources()' because this also considers
300  // operations (e.g., CREATE, RESERVE) that have been applied and
301  // includes revocable resources and resources from resource
302  // providers as well.
304 
305  // Used to establish the relationship between the operation and the
306  // resources that the operation is operating on. Each resource
307  // provider will keep a resource version UUID, and change it when it
308  // believes that the resources from this resource provider are out
309  // of sync from the master's view. The master will keep track of
310  // the last known resource version UUID for each resource provider,
311  // and attach the resource version UUID in each operation it sends
312  // out. The resource provider should reject operations that have a
313  // different resource version UUID than that it maintains, because
314  // this means the operation is operating on resources that might
315  // have already been invalidated.
317 
318  SlaveObserver* observer;
319 
320  // Time when this agent was last asked to drain. This field
321  // is empty if the agent is not currently draining or drained.
323 
// NOTE(review): source line 324 is missing. The `info` member below and the
// `};` at source line 344 imply that it opened a nested aggregate --
// apparently holding per-resource-provider state -- whose other members
// (source lines 326, 339 and 343) are also missing. Line 346, likely the
// member that uses that nested type, is missing too. Confirm upstream.
325  ResourceProviderInfo info;
327 
328  // Used to establish the relationship between the operation and the
329  // resources that the operation is operating on. Each resource
330  // provider will keep a resource version UUID, and change it when it
331  // believes that the resources from this resource provider are out
332  // of sync from the master's view. The master will keep track of
333  // the last known resource version UUID for each resource provider,
334  // and attach the resource version UUID in each operation it sends
335  // out. The resource provider should reject operations that have a
336  // different resource version UUID than that it maintains, because
337  // this means the operation is operating on resources that might
338  // have already been invalidated.
340 
341  // Pending operations or terminal operations that have
342  // unacknowledged status updates.
344  };
345 
347 
348 private:
349  Slave(const Slave&); // No copying.
350  Slave& operator=(const Slave&); // No assigning.
351 };
352 
353 
354 inline std::ostream& operator<<(std::ostream& stream, const Slave& slave)
355 {
356  return stream << slave.id << " at " << slave.pid
357  << " (" << slave.info.hostname() << ")";
358 }
359 
360 
361 class Master : public ProtobufProcess<Master>
362 {
363 public:
365  Registrar* registrar,
366  Files* files,
369  const Option<Authorizer*>& authorizer,
370  const Option<std::shared_ptr<process::RateLimiter>>&
371  slaveRemovalLimiter,
372  const Flags& flags = Flags());
373 
374  ~Master() override;
375 
376  // Compare this master's capabilities with registry's minimum capability.
377  // Return the set of capabilities missing from this master.
378  static hashset<std::string> missingMinimumCapabilities(
379  const MasterInfo& masterInfo, const Registry& registry);
380 
381  // Message handlers.
382  void submitScheduler(
383  const std::string& name);
384 
385  void registerFramework(
386  const process::UPID& from,
387  RegisterFrameworkMessage&& registerFrameworkMessage);
388 
389  void reregisterFramework(
390  const process::UPID& from,
391  ReregisterFrameworkMessage&& reregisterFrameworkMessage);
392 
393  void unregisterFramework(
394  const process::UPID& from,
395  const FrameworkID& frameworkId);
396 
397  void deactivateFramework(
398  const process::UPID& from,
399  const FrameworkID& frameworkId);
400 
401  // TODO(vinod): Remove this once the old driver is removed.
402  void resourceRequest(
403  const process::UPID& from,
404  const FrameworkID& frameworkId,
405  const std::vector<Request>& requests);
406 
407  void launchTasks(
408  const process::UPID& from,
409  LaunchTasksMessage&& launchTasksMessage);
410 
411  void reviveOffers(
412  const process::UPID& from,
413  const FrameworkID& frameworkId,
414  const std::vector<std::string>& role);
415 
416  void killTask(
417  const process::UPID& from,
418  const FrameworkID& frameworkId,
419  const TaskID& taskId);
420 
421  void statusUpdateAcknowledgement(
422  const process::UPID& from,
423  StatusUpdateAcknowledgementMessage&& statusUpdateAcknowledgementMessage);
424 
425  void schedulerMessage(
426  const process::UPID& from,
427  FrameworkToExecutorMessage&& frameworkToExecutorMessage);
428 
429  void executorMessage(
430  const process::UPID& from,
431  ExecutorToFrameworkMessage&& executorToFrameworkMessage);
432 
433  void registerSlave(
434  const process::UPID& from,
435  RegisterSlaveMessage&& registerSlaveMessage);
436 
437  void reregisterSlave(
438  const process::UPID& from,
439  ReregisterSlaveMessage&& incomingMessage);
440 
441  void unregisterSlave(
442  const process::UPID& from,
443  const SlaveID& slaveId);
444 
445  void statusUpdate(
446  StatusUpdateMessage&& statusUpdateMessage);
447 
448  void reconcileTasks(
449  const process::UPID& from,
450  ReconcileTasksMessage&& reconcileTasksMessage);
451 
452  void updateOperationStatus(
453  UpdateOperationStatusMessage&& update);
454 
455  void exitedExecutor(
456  const process::UPID& from,
457  const SlaveID& slaveId,
458  const FrameworkID& frameworkId,
459  const ExecutorID& executorId,
460  int32_t status);
461 
462  void updateSlave(UpdateSlaveMessage&& message);
463 
464  void updateUnavailability(
465  const MachineID& machineId,
467 
468  // Marks the agent unreachable and returns whether the agent was
469  // marked unreachable. Returns false if the agent is already
470  // in a transitioning state or has transitioned into another
471  // state (this includes already being marked unreachable).
472  // The `duringMasterFailover` parameter specifies whether this
473  // agent is transitioning from a recovered state (true) or a
474  // registered state (false).
475  //
476  // Discarding currently not supported.
477  //
478  // Will not return a failure (this will crash the master
479  // internally in the case of a registry failure).
480  process::Future<bool> markUnreachable(
481  const SlaveInfo& slave,
482  bool duringMasterFailover,
483  const std::string& message);
484 
485  void markGone(const SlaveID& slaveId, const TimeInfo& goneTime);
486 
487  void authenticate(
488  const process::UPID& from,
489  const process::UPID& pid);
490 
491  // TODO(bmahler): It would be preferred to use a unique libprocess
492  // Process identifier (PID is not sufficient) for identifying the
493  // framework instance, rather than relying on re-registration time.
494  void frameworkFailoverTimeout(
495  const FrameworkID& frameworkId,
496  const process::Time& reregisteredTime);
497 
498  void offer(
499  const FrameworkID& frameworkId,
500  const hashmap<std::string, hashmap<SlaveID, Resources>>& resources);
501 
502  void inverseOffer(
503  const FrameworkID& frameworkId,
504  const hashmap<SlaveID, UnavailableResources>& resources);
505 
506  // Invoked when there is a newly elected leading master.
507  // Made public for testing purposes.
508  void detected(const process::Future<Option<MasterInfo>>& _leader);
509 
510  // Invoked when the contender has lost the candidacy.
511  // Made public for testing purposes.
512  void lostCandidacy(const process::Future<Nothing>& lost);
513 
514  // Continuation of recover().
515  // Made public for testing purposes.
516  process::Future<Nothing> _recover(const Registry& registry);
517 
518  MasterInfo info() const
519  {
520  return info_;
521  }
522 
523 protected:
524  void initialize() override;
525  void finalize() override;
526 
527  void consume(process::MessageEvent&& event) override;
528  void consume(process::ExitedEvent&& event) override;
529 
530  void exited(const process::UPID& pid) override;
531  void exited(
532  const FrameworkID& frameworkId,
534 
535  void _exited(Framework* framework);
536 
537  // Invoked upon noticing a subscriber disconnection.
538  void exited(const id::UUID& id);
539 
540  void agentReregisterTimeout(const SlaveID& slaveId);
541  Nothing _agentReregisterTimeout(const SlaveID& slaveId);
542 
543  // Invoked when the message is ready to be executed after
544  // being throttled.
545  // 'principal' being None indicates it is throttled by
546  // 'defaultLimiter'.
547  void throttled(
548  process::MessageEvent&& event,
549  const Option<std::string>& principal);
550 
551  // Continuations of consume().
552  void _consume(process::MessageEvent&& event);
553  void _consume(process::ExitedEvent&& event);
554 
555  // Helper method invoked when the capacity for a framework
556  // principal is exceeded.
557  void exceededCapacity(
558  const process::MessageEvent& event,
559  const Option<std::string>& principal,
560  uint64_t capacity);
561 
562  // Recovers state from the registrar.
564  void recoveredSlavesTimeout(const Registry& registry);
565 
566  void _registerSlave(
567  const process::UPID& pid,
568  RegisterSlaveMessage&& registerSlaveMessage,
570  const process::Future<bool>& authorized);
571 
572  void __registerSlave(
573  const process::UPID& pid,
574  RegisterSlaveMessage&& registerSlaveMessage,
575  const process::Future<bool>& admit);
576 
577  void _reregisterSlave(
578  const process::UPID& pid,
579  ReregisterSlaveMessage&& incomingMessage,
581  const process::Future<bool>& authorized);
582 
583  void __reregisterSlave(
584  const process::UPID& pid,
585  ReregisterSlaveMessage&& incomingMessage,
586  const process::Future<bool>& readmit);
587 
588  void ___reregisterSlave(
589  const process::UPID& pid,
590  ReregisterSlaveMessage&& incomingMessage,
591  const process::Future<bool>& updated);
592 
593  void updateSlaveFrameworks(
594  Slave* slave,
595  const std::vector<FrameworkInfo>& frameworks);
596 
597  // 'future' is the future returned by the authenticator.
598  void _authenticate(
599  const process::UPID& pid,
600  const process::Future<Option<std::string>>& future);
601 
602  void authenticationTimeout(process::Future<Option<std::string>> future);
603 
604  void fileAttached(const process::Future<Nothing>& result,
605  const std::string& path);
606 
607  // Invoked when the contender has entered the contest.
608  void contended(const process::Future<process::Future<Nothing>>& candidacy);
609 
610  // When a slave that was previously registered with this master
611  // reregisters, we need to reconcile the master's view of the
612  // slave's tasks and executors. This function also sends the
613  // `SlaveReregisteredMessage`.
614  void reconcileKnownSlave(
615  Slave* slave,
616  const std::vector<ExecutorInfo>& executors,
617  const std::vector<Task>& tasks);
618 
619  // Add a framework.
620  void addFramework(
621  Framework* framework,
622  const std::set<std::string>& suppressedRoles);
623 
624  // Recover a framework from its `FrameworkInfo`. This happens after
625  // master failover, when an agent running one of the framework's
626  // tasks reregisters or when the framework itself reregisters,
627  // whichever happens first. The result of this function is a
628  // registered, inactive framework with state `RECOVERED`.
629  void recoverFramework(
630  const FrameworkInfo& info,
631  const std::set<std::string>& suppressedRoles);
632 
633  // Transition a framework from `RECOVERED` to `CONNECTED` state and
634  // activate it. This happens at most once after master failover, the
635  // first time that the framework reregisters with the new master.
636  // Exactly one of `newPid` or `http` must be provided.
637  void activateRecoveredFramework(
638  Framework* framework,
639  const FrameworkInfo& frameworkInfo,
640  const Option<process::UPID>& pid,
642  const std::set<std::string>& suppressedRoles);
643 
644  // Replace the scheduler for a framework with a new process ID, in
645  // the event of a scheduler failover.
646  void failoverFramework(Framework* framework, const process::UPID& newPid);
647 
648  // Replace the scheduler for a framework with a new HTTP connection,
649  // in the event of a scheduler failover.
650  void failoverFramework(
651  Framework* framework,
653 
654  void _failoverFramework(Framework* framework);
655 
656  // Kill all of a framework's tasks, delete the framework object, and
657  // reschedule offers that were assigned to this framework.
658  void removeFramework(Framework* framework);
659 
660  // Remove a framework from the slave, i.e., remove its tasks and
661  // executors and recover the resources.
662  void removeFramework(Slave* slave, Framework* framework);
663 
664  // Performs actions common for all the framework update paths.
665  //
666  // NOTE: the fields 'id', 'principal', 'name' and 'checkpoint' in the
667  // 'frameworkInfo' should have the same values as in 'framework->info',
668  // otherwise this method terminates the program.
669  //
670  // TODO(asekretenko): Make sending FrameworkInfo updates to slaves, API
671  // subscribers and anywhere else a responsibility of this method -
672  // currently is is not, see MESOS-9746. After that we can remove the
673  // 'sendFrameworkUpdates()' method.
674  void updateFramework(
675  Framework* framework,
676  const FrameworkInfo& frameworkInfo,
677  const std::set<std::string>& suppressedRoles);
678 
679  void sendFrameworkUpdates(const Framework& framework);
680 
681  void disconnect(Framework* framework);
682  void deactivate(Framework* framework, bool rescind);
683 
684  void disconnect(Slave* slave);
685 
686  // Removes the agent from the resource offer cycle (and rescinds active
687  // offers). Other aspects of the agent will continue to function normally.
688  void deactivate(Slave* slave);
689 
690  // Adds the agent back to the resource offer cycle.
691  // Must *NOT* be called if the agent is `deactivated`.
692  void reactivate(Slave* slave);
693 
694  // Add a slave.
695  void addSlave(
696  Slave* slave,
697  std::vector<Archive::Framework>&& completedFrameworks);
698 
699  void _markUnreachable(
700  const SlaveInfo& slave,
701  const TimeInfo& unreachableTime,
702  bool duringMasterFailover,
703  const std::string& message,
704  bool registrarResult);
705 
706  void sendSlaveLost(const SlaveInfo& slaveInfo);
707 
708  // Remove the slave from the registrar and from the master's state.
709  //
710  // TODO(bmahler): 'reason' is optional until MESOS-2317 is resolved.
711  void removeSlave(
712  Slave* slave,
713  const std::string& message,
715 
716  // Removes an agent from the master's state in the following cases:
717  // * When maintenance is started on an agent
718  // * When an agent registers with a new ID from a previously-known IP + port
719  // * When an agent unregisters itself with an `UnregisterSlaveMessage`
720  void _removeSlave(
721  Slave* slave,
722  const process::Future<bool>& registrarResult,
723  const std::string& removalCause,
725 
726  // Removes an agent from the master's state in the following cases:
727  // * When marking an agent unreachable
728  // * When marking an agent gone
729  //
730  // NOTE that in spite of the name `__removeSlave()`, this function is NOT a
731  // continuation of `_removeSlave()`. Rather, these two functions perform
732  // similar logic for slightly different cases.
733  //
734  // TODO(greggomann): refactor `_removeSlave` and `__removeSlave` into a single
735  // common helper function. (See MESOS-9550)
736  void __removeSlave(
737  Slave* slave,
738  const std::string& message,
739  const Option<TimeInfo>& unreachableTime);
740 
741  // Validates that the framework is authenticated, if required.
742  Option<Error> validateFrameworkAuthentication(
743  const FrameworkInfo& frameworkInfo,
744  const process::UPID& from);
745 
746  // Returns whether the principal is authorized for the specified
747  // action-object pair.
748  // Returns failure for transient authorization failures.
749  process::Future<bool> authorize(
751  authorization::ActionObject&& actionObject);
752 
753  // Overload of authorize() for cases which require multiple action-object
754  // pairs to be authorized simultaneously.
755  process::Future<bool> authorize(
757  std::vector<authorization::ActionObject>&& actionObjects);
758 
759  // Returns whether the framework is authorized.
760  // Returns failure for transient authorization failures.
761  process::Future<bool> authorizeFramework(
762  const FrameworkInfo& frameworkInfo);
763 
764  // Determine if a new executor needs to be launched.
765  bool isLaunchExecutor (
766  const ExecutorID& executorId,
767  Framework* framework,
768  Slave* slave) const;
769 
770  // Add executor to the framework and slave.
771  void addExecutor(
772  const ExecutorInfo& executorInfo,
773  Framework* framework,
774  Slave* slave);
775 
776  // Add task to the framework and slave.
777  void addTask(const TaskInfo& task, Framework* framework, Slave* slave);
778 
779  // Transitions the task, and recovers resources if the task becomes
780  // terminal.
781  void updateTask(Task* task, const StatusUpdate& update);
782 
783  // Removes the task. `unreachable` indicates whether the task is removed due
784  // to being unreachable. Note that we cannot rely on the task state because
785  // it may not reflect unreachability due to being set to TASK_LOST for
786  // backwards compatibility.
787  void removeTask(Task* task, bool unreachable = false);
788 
789  // Remove an executor and recover its resources.
790  void removeExecutor(
791  Slave* slave,
792  const FrameworkID& frameworkId,
793  const ExecutorID& executorId);
794 
795  // Adds the given operation to the framework and the agent.
796  void addOperation(
797  Framework* framework,
798  Slave* slave,
799  Operation* operation);
800 
801  // Transitions the operation, and updates and recovers resources if
802  // the operation becomes terminal. If `convertResources` is `false`
803  // only the consumed resources of terminal operations are recovered,
804  // but no resources are converted.
805  void updateOperation(
806  Operation* operation,
807  const UpdateOperationStatusMessage& update,
808  bool convertResources = true);
809 
810  // Remove the operation.
811  void removeOperation(Operation* operation);
812 
813  // Send operation update for all operations on the agent.
814  void sendBulkOperationFeedback(
815  Slave* slave,
816  OperationState operationState,
817  const std::string& message);
818 
819  // Attempts to update the allocator by applying the given operation.
820  // If successful, updates the slave's resources, sends a
821  // 'CheckpointResourcesMessage' to the slave with the updated
822  // checkpointed resources, and returns a 'Future' with 'Nothing'.
823  // Otherwise, no action is taken and returns a failed 'Future'.
825  Slave* slave,
826  const Offer::Operation& operation);
827 
828  // Forwards the update to the framework.
829  void forward(
830  const StatusUpdate& update,
831  const process::UPID& acknowledgee,
832  Framework* framework);
833 
834  // Remove an offer after specified timeout
835  void offerTimeout(const OfferID& offerId);
836 
837  // Methods for removing an offer and handling associated resources.
838  // Both recover the resources in the allocator (optionally setting offer
839  // filters) and remove the offer in the master. `rescindOffer` further
840  // notifies the framework about the rescind.
841  //
842  // NOTE: the `filters` field in `rescindOffers` is needed only as
843  // a workaround for the race between the master and the allocator
844  // which happens when the master tries to free up resources to satisfy
845  // operator initiated operations.
846  void rescindOffer(Offer* offer, const Option<Filters>& filters = None());
847  void discardOffer(Offer* offer, const Option<Filters>& filters = None());
848 
849  // Helper for rescindOffer() / discardOffer() / _accept().
850  // Do not use directly.
851  //
852  // The offer must belong to the framework.
853  void _removeOffer(Framework* framework, Offer* offer);
854 
855  // Remove an inverse offer after specified timeout
856  void inverseOfferTimeout(const OfferID& inverseOfferId);
857 
858  // Remove an inverse offer and optionally rescind it as well.
859  void removeInverseOffer(InverseOffer* inverseOffer, bool rescind = false);
860 
861  bool isCompletedFramework(const FrameworkID& frameworkId) const;
862 
863  Framework* getFramework(const FrameworkID& frameworkId) const;
864  Offer* getOffer(const OfferID& offerId) const;
865  InverseOffer* getInverseOffer(const OfferID& inverseOfferId) const;
866 
867  FrameworkID newFrameworkId();
868  OfferID newOfferId();
869  SlaveID newSlaveId();
870 
871 private:
872  // Updates the agent's resources by applying the given operation.
873  // Sends either `ApplyOperationMessage` or
874  // `CheckpointResourcesMessage` (with updated checkpointed
875  // resources) to the agent depending on if the agent has
876  // `RESOURCE_PROVIDER` capability.
877  void _apply(
878  Slave* slave,
879  Framework* framework,
880  const Offer::Operation& operationInfo);
881 
882  void drop(
883  const process::UPID& from,
884  const mesos::scheduler::Call& call,
885  const std::string& message);
886 
887  void drop(
888  Framework* framework,
889  const Offer::Operation& operation,
890  const std::string& message);
891 
892  void drop(
893  Framework* framework,
894  const mesos::scheduler::Call& call,
895  const std::string& message);
896 
897  void drop(
898  Framework* framework,
899  const mesos::scheduler::Call::Suppress& suppress,
900  const std::string& message);
901 
902  void drop(
903  Framework* framework,
904  const mesos::scheduler::Call::Revive& revive,
905  const std::string& message);
906 
907  // Call handlers.
908  void receive(
909  const process::UPID& from,
910  mesos::scheduler::Call&& call);
911 
912  void subscribe(
914  mesos::scheduler::Call::Subscribe&& subscribe);
915 
916  void _subscribe(
918  FrameworkInfo&& frameworkInfo,
919  bool force,
920  google::protobuf::RepeatedPtrField<std::string>&& suppressedRoles,
921  const process::Future<bool>& authorized);
922 
923  void subscribe(
924  const process::UPID& from,
925  mesos::scheduler::Call::Subscribe&& subscribe);
926 
927  void _subscribe(
928  const process::UPID& from,
929  FrameworkInfo&& frameworkInfo,
930  bool force,
931  google::protobuf::RepeatedPtrField<std::string>&& suppressedRoles,
932  const process::Future<bool>& authorized);
933 
934  // Update framework via SchedulerDriver (i.e. no response
935  // code feedback, FrameworkErrorMessage on error).
936  void updateFramework(
937  const process::UPID& from,
938  mesos::scheduler::Call::UpdateFramework&& call);
939 
940  // Update framework via HTTP API (i.e. returns 200 OK).
942  mesos::scheduler::Call::UpdateFramework&& call);
943 
945  mesos::scheduler::Call::UpdateFramework&& call,
946  const process::Future<bool>& authorized);
947 
948  // Subscribes a client to the 'api/vX' endpoint.
949  void subscribe(
952 
953  void teardown(Framework* framework);
954 
955  void accept(
956  Framework* framework,
957  mesos::scheduler::Call::Accept&& accept);
958 
959  void _accept(
960  const FrameworkID& frameworkId,
961  const SlaveID& slaveId,
962  mesos::scheduler::Call::Accept&& accept,
963  const process::Future<
964  std::vector<process::Future<bool>>>& authorizations);
965 
966  void acceptInverseOffers(
967  Framework* framework,
968  const mesos::scheduler::Call::AcceptInverseOffers& accept);
969 
970  void decline(
971  Framework* framework,
972  mesos::scheduler::Call::Decline&& decline);
973 
974  void declineInverseOffers(
975  Framework* framework,
976  const mesos::scheduler::Call::DeclineInverseOffers& decline);
977 
978  // Should be called after each terminal task status update acknowledgement
979  // or terminal operation acknowledgement. If an agent is draining, this
980  // checks if all pending tasks or operations have terminated and then
981  // transitions the DRAINING agent to DRAINED.
982  void checkAndTransitionDrainingAgent(Slave* slave);
983 
984  void revive(
985  Framework* framework,
986  const mesos::scheduler::Call::Revive& revive);
987 
988  void kill(
989  Framework* framework,
990  const mesos::scheduler::Call::Kill& kill);
991 
992  void shutdown(
993  Framework* framework,
994  const mesos::scheduler::Call::Shutdown& shutdown);
995 
996  void acknowledge(
997  Framework* framework,
998  mesos::scheduler::Call::Acknowledge&& acknowledge);
999 
1000  void acknowledgeOperationStatus(
1001  Framework* framework,
1002  mesos::scheduler::Call::AcknowledgeOperationStatus&& acknowledge);
1003 
1004  void reconcile(
1005  Framework* framework,
1006  mesos::scheduler::Call::Reconcile&& reconcile);
1007 
1008  void reconcileOperations(
1009  Framework* framework,
1010  mesos::scheduler::Call::ReconcileOperations&& reconcile);
1011 
1012  void message(
1013  Framework* framework,
1014  mesos::scheduler::Call::Message&& message);
1015 
1016  void request(
1017  Framework* framework,
1018  const mesos::scheduler::Call::Request& request);
1019 
1020  void suppress(
1021  Framework* framework,
1022  const mesos::scheduler::Call::Suppress& suppress);
1023 
1024  bool elected() const
1025  {
1026  return leader.isSome() && leader.get() == info_;
1027  }
1028 
1029  void scheduleRegistryGc();
1030 
1031  void doRegistryGc();
1032 
1033  void _doRegistryGc(
1034  const hashset<SlaveID>& toRemoveUnreachable,
1035  const hashset<SlaveID>& toRemoveGone,
1036  const process::Future<bool>& registrarResult);
1037 
1038  // Returns all roles known to the master. If roles are whitelisted,
1039  // this simply returns the whitelist and any ancestors of roles in
1040  // the whitelist. Otherwise, this returns:
1041  //
1042  // (1) Roles with configured weight or quota.
1043  // (2) Roles with reservations.
1044  // (3) Roles with frameworks subscribed or allocated resources.
1045  // (4) Ancestor roles of (1), (2), or (3).
1046  std::vector<std::string> knownRoles() const;
1047 
1055  bool isWhitelistedRole(const std::string& name) const;
1056 
1057  // TODO(bmahler): Store a role tree rather than the existing
1058  // `roles` map which does not track the tree correctly (it does
1059  // not insert ancestor entries, nor does it track roles if there
1060  // are reservations but no frameworks related to them).
1061  struct RoleResourceBreakdown
1062  {
1063  public:
1064  RoleResourceBreakdown(const Master* const master_, const std::string& role_)
1065  : master(master_), role(role_) {}
1066 
1067  ResourceQuantities offered() const;
1068  ResourceQuantities allocated() const;
1069  ResourceQuantities reserved() const;
1070  ResourceQuantities consumedQuota() const;
1071 
1072  private:
1073  const Master* const master;
1074  const std::string role;
1075  };
1076 
1077  // Performs validations of the FrameworkInfo and suppressed roles set
1078  // which do not depend on the current state of this framework.
1079  Option<Error> validateFramework(
1080  const FrameworkInfo& frameworkInfo,
1081  const google::protobuf::RepeatedPtrField<std::string>& suppressedRoles)
1082  const;
1083 
1091  class QuotaHandler
1092  {
1093  public:
1094  explicit QuotaHandler(Master* _master) : master(_master)
1095  {
1096  CHECK_NOTNULL(master);
1097  }
1098 
1099  // Returns a list of set quotas.
1101  const mesos::master::Call& call,
1103  ContentType contentType) const;
1104 
1106  const process::http::Request& request,
1108  principal) const;
1109 
1111  const mesos::master::Call& call,
1113  const;
1114 
1116  const mesos::master::Call& call,
1118  principal) const;
1119 
1123  principal) const;
1124 
1126  const mesos::master::Call& call,
1128  principal) const;
1129 
1133  principal) const;
1134 
1135  private:
1141  // Returns an error if the total quota guarantees overcommit
1142  // the cluster. This is not a quota satisfiability check: it's
1143  // possible that quota is unsatisfiable even if the quota
1144  // does not overcommit the cluster. Specifically, we verify that
1145  // the following inequality holds:
1146  //
1147  // total cluster capacity >= total quota w/ quota request applied
1148  //
1149  // Note, total cluster capacity accounts for the resources of all the
1150  // registered agents, including resources from resource providers
1151  // as well as reservations (both static and dynamic ones).
1152  static Option<Error> overcommitCheck(
1153  const std::vector<Resources>& agents,
1154  const hashmap<std::string, Quota>& quotas,
1155  const mesos::quota::QuotaInfo& request);
1156 
1157  // We always want to rescind offers after the capacity heuristic. The
1158  // reason for this is the race between the allocator and the master:
1159  // it can happen that there are not enough free resources at the
1160  // allocator's disposal when it is notified about the quota request,
1161  // but at this point it's too late to rescind.
1162  //
1163  // While rescinding, we adhere to the following rules:
1164  // * Rescind at least as many resources as there are in the quota request.
1165  // * Rescind all offers from an agent in order to make the potential
1166  // offer bigger, which increases the chances that a quota'ed framework
1167  // will be able to use the offer.
1168  // * Rescind offers from at least `numF` agents to make it possible
1169  // (but not guaranteed, due to fair sharing) that each framework in
1170  // the role for which quota is set gets an offer (`numF` is the
1171  // number of frameworks in the quota'ed role). Though this is not
1172  // strictly necessary, we think this will increase the debuggability
1173  // and will improve user experience.
1174  //
1175  // TODO(alexr): Consider removing this function once offer management
1176  // (including rescinding) is moved to allocator.
1177  void rescindOffers(const mesos::quota::QuotaInfo& request) const;
1178 
1179  process::Future<bool> authorizeGetQuota(
1181  const std::string& role) const;
1182 
1183  // This auth function is used for legacy `SET_QUOTA` and `REMOVE_QUOTA`
1184  // calls. Remove this function after the associated API calls are
1185  // no longer supported.
1186  process::Future<bool> authorizeUpdateQuota(
1188  const mesos::quota::QuotaInfo& quotaInfo) const;
1189 
1190  process::Future<bool> authorizeUpdateQuotaConfig(
1192  const mesos::quota::QuotaConfig& quotaConfig) const;
1193 
1196  principal) const;
1197 
1199  const google::protobuf::RepeatedPtrField<mesos::quota::QuotaConfig>&
1200  quotaConfigs) const;
1201 
1203  const mesos::quota::QuotaRequest& quotaRequest,
1205  principal) const;
1206 
1208  const mesos::quota::QuotaInfo& quotaInfo,
1209  bool forced) const;
1210 
1212  const std::string& role,
1214  principal) const;
1215 
1217  const std::string& role) const;
1218 
1219  // To perform actions related to quota management, we require access to the
1220  // master data structures. No synchronization primitives are needed here
1221  // since `QuotaHandler`'s functions are invoked in the Master's actor.
1222  Master* master;
1223  };
1224 
1232  class WeightsHandler
1233  {
1234  public:
1235  explicit WeightsHandler(Master* _master) : master(_master)
1236  {
1237  CHECK_NOTNULL(master);
1238  }
1239 
1243  principal) const;
1244 
1246  const mesos::master::Call& call,
1248  ContentType contentType) const;
1249 
1251  const process::http::Request& request,
1253  principal) const;
1254 
1256  const mesos::master::Call& call,
1258  ContentType contentType) const;
1259 
1260  private:
1261  process::Future<bool> authorizeGetWeight(
1263  const WeightInfo& weight) const;
1264 
1265  process::Future<bool> authorizeUpdateWeights(
1267  const std::vector<std::string>& roles) const;
1268 
1270  const std::vector<WeightInfo>& weightInfos,
1271  const std::vector<bool>& roleAuthorizations) const;
1272 
1275  principal) const;
1276 
1279  const google::protobuf::RepeatedPtrField<WeightInfo>& weightInfos)
1280  const;
1281 
1283  const std::vector<WeightInfo>& weightInfos) const;
1284 
1285  // Rescind all outstanding offers if any of the 'weightInfos' roles has
1286  // an active framework.
1287  void rescindOffers(const std::vector<WeightInfo>& weightInfos) const;
1288 
1289  Master* master;
1290  };
1291 
1292 public:
1293  // Inner class used to namespace HTTP handlers that do not change the
1294  // underlying master object.
1295  //
1296  // Endpoints served by this handler are only permitted to depend on
1297  // the request query parameters and the authorization filters to
1298  // make caching of responses possible.
1299  //
1300  // NOTE: Most member functions of this class are not routed directly but
1301  // dispatched from their corresponding handlers in the outer `Http` class.
1302  // This is because deciding whether an incoming request is read-only often
1303  // requires some inspection, e.g. distinguishing between "GET" and "POST"
1304  // requests to the same endpoint.
1306  {
1307  public:
1308  explicit ReadOnlyHandler(const Master* _master) : master(_master) {}
1309 
1310  // /frameworks
1311  process::http::Response frameworks(
1312  ContentType outputContentType,
1313  const hashmap<std::string, std::string>& queryParameters,
1314  const process::Owned<ObjectApprovers>& approvers) const;
1315 
1316  // /roles
1318  ContentType outputContentType,
1319  const hashmap<std::string, std::string>& queryParameters,
1320  const process::Owned<ObjectApprovers>& approvers) const;
1321 
1322  // /slaves
1323  process::http::Response slaves(
1324  ContentType outputContentType,
1325  const hashmap<std::string, std::string>& queryParameters,
1326  const process::Owned<ObjectApprovers>& approvers) const;
1327 
1328  // /state
1330  ContentType outputContentType,
1331  const hashmap<std::string, std::string>& queryParameters,
1332  const process::Owned<ObjectApprovers>& approvers) const;
1333 
1334  // /state-summary
1335  process::http::Response stateSummary(
1336  ContentType outputContentType,
1337  const hashmap<std::string, std::string>& queryParameters,
1338  const process::Owned<ObjectApprovers>& approvers) const;
1339 
1340  // /tasks
1342  ContentType outputContentType,
1343  const hashmap<std::string, std::string>& queryParameters,
1344  const process::Owned<ObjectApprovers>& approvers) const;
1345 
1346  // master::Call::GET_STATE
1347  process::http::Response getState(
1348  ContentType outputContentType,
1349  const hashmap<std::string, std::string>& queryParameters,
1350  const process::Owned<ObjectApprovers>& approvers) const;
1351 
1352  // master::Call::GET_AGENTS
1353  process::http::Response getAgents(
1354  ContentType outputContentType,
1355  const hashmap<std::string, std::string>& queryParameters,
1356  const process::Owned<ObjectApprovers>& approvers) const;
1357 
1358  // master::Call::GET_FRAMEWORKS
1359  process::http::Response getFrameworks(
1360  ContentType outputContentType,
1361  const hashmap<std::string, std::string>& queryParameters,
1362  const process::Owned<ObjectApprovers>& approvers) const;
1363 
1364  // master::Call::GET_EXECUTORS
1365  process::http::Response getExecutors(
1366  ContentType outputContentType,
1367  const hashmap<std::string, std::string>& queryParameters,
1368  const process::Owned<ObjectApprovers>& approvers) const;
1369 
1370  // master::Call::GET_OPERATIONS
1371  process::http::Response getOperations(
1372  ContentType outputContentType,
1373  const hashmap<std::string, std::string>& queryParameters,
1374  const process::Owned<ObjectApprovers>& approvers) const;
1375 
1376  // master::Call::GET_TASKS
1377  process::http::Response getTasks(
1378  ContentType outputContentType,
1379  const hashmap<std::string, std::string>& queryParameters,
1380  const process::Owned<ObjectApprovers>& approvers) const;
1381 
1382  // master::Call::GET_ROLES
1384  ContentType outputContentType,
1385  const hashmap<std::string, std::string>& queryParameters,
1386  const process::Owned<ObjectApprovers>& approvers) const;
1387 
1388  // TODO(bmahler): These could just live in the .cpp file,
1389  // however they are shared with SUBSCRIBE which currently
1390  // is not implemented as a read only handler here. Make these
1391  // private or only in the .cpp file once SUBSCRIBE is moved
1392  // into readonly_handler.cpp.
1393  std::string serializeGetState(
1394  const process::Owned<ObjectApprovers>& approvers) const;
1395  std::string serializeGetAgents(
1396  const process::Owned<ObjectApprovers>& approvers) const;
1397  std::string serializeGetFrameworks(
1398  const process::Owned<ObjectApprovers>& approvers) const;
1399  std::string serializeGetExecutors(
1400  const process::Owned<ObjectApprovers>& approvers) const;
1401  std::string serializeGetOperations(
1402  const process::Owned<ObjectApprovers>& approvers) const;
1403  std::string serializeGetTasks(
1404  const process::Owned<ObjectApprovers>& approvers) const;
1405  std::string serializeGetRoles(
1406  const process::Owned<ObjectApprovers>& approvers) const;
1407 
1408  std::function<void(JSON::ObjectWriter*)> jsonifyGetState(
1409  const process::Owned<ObjectApprovers>& approvers) const;
1410  std::function<void(JSON::ObjectWriter*)> jsonifyGetAgents(
1411  const process::Owned<ObjectApprovers>& approvers) const;
1412  std::function<void(JSON::ObjectWriter*)> jsonifyGetFrameworks(
1413  const process::Owned<ObjectApprovers>& approvers) const;
1414  std::function<void(JSON::ObjectWriter*)> jsonifyGetExecutors(
1415  const process::Owned<ObjectApprovers>& approvers) const;
1416  std::function<void(JSON::ObjectWriter*)> jsonifyGetOperations(
1417  const process::Owned<ObjectApprovers>& approvers) const;
1418  std::function<void(JSON::ObjectWriter*)> jsonifyGetTasks(
1419  const process::Owned<ObjectApprovers>& approvers) const;
1420  std::function<void(JSON::ObjectWriter*)> jsonifyGetRoles(
1421  const process::Owned<ObjectApprovers>& approvers) const;
1422 
1423  private:
1424  const Master* master;
1425  };
1426 
1427 private:
1428  // Inner class used to namespace HTTP route handlers (see
1429  // master/http.cpp for implementations).
1430  class Http
1431  {
1432  public:
1433  explicit Http(Master* _master) : master(_master),
1434  readonlyHandler(_master),
1435  quotaHandler(_master),
1436  weightsHandler(_master) {}
1437 
1438  // /api/v1
1440  const process::http::Request& request,
1442  principal) const;
1443 
1444  // /api/v1/scheduler
1446  const process::http::Request& request,
1448  principal) const;
1449 
1450  // /master/create-volumes
1452  const process::http::Request& request,
1454  principal) const;
1455 
1456  // /master/destroy-volumes
1458  const process::http::Request& request,
1460  principal) const;
1461 
1462  // /master/flags
1464  const process::http::Request& request,
1466  principal) const;
1467 
1468  // /master/frameworks
1469  //
1470  // NOTE: Requests to this endpoint are batched.
1472  const process::http::Request& request,
1474  principal) const;
1475 
1476  // /master/health
1478  const process::http::Request& request) const;
1479 
1480  // /master/redirect
1482  const process::http::Request& request) const;
1483 
1484  // /master/reserve
1486  const process::http::Request& request,
1488  principal) const;
1489 
1490  // /master/roles
1491  //
1492  // NOTE: Requests to this endpoint are batched.
1494  const process::http::Request& request,
1496  principal) const;
1497 
1498  // /master/teardown
1500  const process::http::Request& request,
1502  principal) const;
1503 
1504  // /master/slaves
1505  //
1506  // NOTE: Requests to this endpoint are batched.
1508  const process::http::Request& request,
1510  principal) const;
1511 
1512  // /master/state
1513  //
1514  // NOTE: Requests to this endpoint are batched.
1516  const process::http::Request& request,
1518  principal) const;
1519 
1520  // /master/state-summary
1521  //
1522  // NOTE: Requests to this endpoint are batched.
1524  const process::http::Request& request,
1526  principal) const;
1527 
1528  // /master/tasks
1529  //
1530  // NOTE: Requests to this endpoint are batched.
1532  const process::http::Request& request,
1534  principal) const;
1535 
1536  // /master/maintenance/schedule
1537  process::Future<process::http::Response> maintenanceSchedule(
1538  const process::http::Request& request,
1540  principal) const;
1541 
1542  // /master/maintenance/status
1543  process::Future<process::http::Response> maintenanceStatus(
1544  const process::http::Request& request,
1546  principal) const;
1547 
1548  // /master/machine/down
1550  const process::http::Request& request,
1552  principal) const;
1553 
1554  // /master/machine/up
1556  const process::http::Request& request,
1558  principal) const;
1559 
1560  // /master/unreserve
1562  const process::http::Request& request,
1564  principal) const;
1565 
1566  // /master/weights
1568  const process::http::Request& request,
1570  principal) const;
1571 
1572  // /master/quota (DEPRECATED).
1574  const process::http::Request& request,
1576  principal) const;
1577 
1578  static std::string API_HELP();
1579  static std::string SCHEDULER_HELP();
1580  static std::string FLAGS_HELP();
1581  static std::string FRAMEWORKS_HELP();
1582  static std::string HEALTH_HELP();
1583  static std::string REDIRECT_HELP();
1584  static std::string ROLES_HELP();
1585  static std::string TEARDOWN_HELP();
1586  static std::string SLAVES_HELP();
1587  static std::string STATE_HELP();
1588  static std::string STATESUMMARY_HELP();
1589  static std::string TASKS_HELP();
1590  static std::string MAINTENANCE_SCHEDULE_HELP();
1591  static std::string MAINTENANCE_STATUS_HELP();
1592  static std::string MACHINE_DOWN_HELP();
1593  static std::string MACHINE_UP_HELP();
1594  static std::string CREATE_VOLUMES_HELP();
1595  static std::string DESTROY_VOLUMES_HELP();
1596  static std::string RESERVE_HELP();
1597  static std::string UNRESERVE_HELP();
1598  static std::string QUOTA_HELP();
1599  static std::string WEIGHTS_HELP();
1600 
1601  private:
1602  JSON::Object __flags() const;
1603 
1604  class FlagsError; // Forward declaration.
1605 
1608  principal) const;
1609 
1611  const size_t limit,
1612  const size_t offset,
1613  const std::string& order,
1615  principal) const;
1616 
1618  const FrameworkID& id,
1620  principal) const;
1621 
1623  const FrameworkID& id) const;
1624 
1625  process::Future<process::http::Response> _updateMaintenanceSchedule(
1626  const mesos::maintenance::Schedule& schedule,
1628  principal) const;
1629 
1630  process::Future<process::http::Response> __updateMaintenanceSchedule(
1631  const mesos::maintenance::Schedule& schedule,
1632  const process::Owned<ObjectApprovers>& approvers) const;
1633 
1634  process::Future<process::http::Response> ___updateMaintenanceSchedule(
1635  const mesos::maintenance::Schedule& schedule,
1636  bool applied) const;
1637 
1638  mesos::maintenance::Schedule _getMaintenanceSchedule(
1639  const process::Owned<ObjectApprovers>& approvers) const;
1640 
1642  const process::Owned<ObjectApprovers>& approvers) const;
1643 
1644  process::Future<process::http::Response> _startMaintenance(
1645  const google::protobuf::RepeatedPtrField<MachineID>& machineIds,
1646  const process::Owned<ObjectApprovers>& approvers) const;
1647 
1649  const google::protobuf::RepeatedPtrField<MachineID>& machineIds,
1650  const process::Owned<ObjectApprovers>& approvers) const;
1651 
1653  const SlaveID& slaveId,
1654  const Option<DurationInfo>& maxGracePeriod,
1655  const bool markGone,
1656  const process::Owned<ObjectApprovers>& approvers) const;
1657 
1659  const SlaveID& slaveId,
1660  const process::Owned<ObjectApprovers>& approvers) const;
1661 
1663  const SlaveID& slaveId,
1664  const process::Owned<ObjectApprovers>& approvers) const;
1665 
1667  const SlaveID& slaveId,
1668  const google::protobuf::RepeatedPtrField<Resource>& source,
1669  const google::protobuf::RepeatedPtrField<Resource>& resources,
1671  principal) const;
1672 
1674  const SlaveID& slaveId,
1675  const google::protobuf::RepeatedPtrField<Resource>& resources,
1677  principal) const;
1678 
1680  const SlaveID& slaveId,
1681  const google::protobuf::RepeatedPtrField<Resource>& volumes,
1683  principal) const;
1684 
1686  const SlaveID& slaveId,
1687  const google::protobuf::RepeatedPtrField<Resource>& volumes,
1689  principal) const;
1690 
1708  const SlaveID& slaveId,
1709  const Offer::Operation& operation) const;
1710 
1711  // Master API handlers.
1712 
1714  const mesos::master::Call& call,
1716  ContentType contentType) const;
1717  mesos::master::Response::GetAgents _getAgents(
1718  const process::Owned<ObjectApprovers>& approvers) const;
1719 
1721  const mesos::master::Call& call,
1723  ContentType contentType) const;
1724 
1726  const mesos::master::Call& call,
1728  ContentType contentType) const;
1729 
1731  const mesos::master::Call& call,
1733  ContentType contentType) const;
1734 
1736  const mesos::master::Call& call,
1738  ContentType contentType) const;
1739 
1741  const mesos::master::Call& call,
1743  ContentType contentType) const;
1744 
1746  const mesos::master::Call& call,
1748  ContentType contentType) const;
1749 
1751  const mesos::master::Call& call,
1753  ContentType contentType) const;
1754 
1756  const mesos::master::Call& call,
1758  ContentType contentType) const;
1759 
1761  const mesos::master::Call& call,
1763  ContentType contentType) const;
1764 
1765  process::Future<process::http::Response> updateMaintenanceSchedule(
1766  const mesos::master::Call& call,
1768  ContentType contentType) const;
1769 
1770  process::Future<process::http::Response> getMaintenanceSchedule(
1771  const mesos::master::Call& call,
1773  ContentType contentType) const;
1774 
1775  process::Future<process::http::Response> getMaintenanceStatus(
1776  const mesos::master::Call& call,
1778  ContentType contentType) const;
1779 
1781  const mesos::master::Call& call,
1783  ContentType contentType) const;
1784 
1786  const mesos::master::Call& call,
1788  ContentType contentType) const;
1789 
1791  const mesos::master::Call& call,
1793  ContentType contentType) const;
1794 
1796  const mesos::master::Call& call,
1798  ContentType contentType) const;
1799 
1801  const mesos::master::Call& call,
1803  ContentType contentType) const;
1804 
1806  const mesos::master::Call& call,
1808  ContentType contentType) const;
1809 
1811  const mesos::master::Call& call,
1813  ContentType contentType) const;
1814  mesos::master::Response::GetTasks _getTasks(
1815  const process::Owned<ObjectApprovers>& approvers) const;
1816 
1818  const mesos::master::Call& call,
1820  ContentType contentType) const;
1821 
1823  const mesos::master::Call& call,
1825  ContentType contentType) const;
1826 
1828  const mesos::master::Call& call,
1830  ContentType contentType) const;
1831 
1833  const mesos::master::Call& call,
1835  ContentType contentType) const;
1836 
1838  const mesos::master::Call& call,
1840  ContentType contentType) const;
1841 
1842  process::Future<process::http::Response> unreserveResources(
1843  const mesos::master::Call& call,
1845  ContentType contentType) const;
1846 
1848  const mesos::master::Call& call,
1850  ContentType contentType) const;
1851  mesos::master::Response::GetFrameworks _getFrameworks(
1852  const process::Owned<ObjectApprovers>& approvers) const;
1853 
1855  const mesos::master::Call& call,
1857  ContentType contentType) const;
1858  mesos::master::Response::GetExecutors _getExecutors(
1859  const process::Owned<ObjectApprovers>& approvers) const;
1860 
1862  const mesos::master::Call& call,
1864  ContentType contentType) const;
1865  mesos::master::Response::GetState _getState(
1866  const process::Owned<ObjectApprovers>& approvers) const;
1867 
1868  static std::function<void(JSON::ObjectWriter*)> jsonifySubscribe(
1869  const Master* master,
1870  const process::Owned<ObjectApprovers>& approvers);
1871  std::string serializeSubscribe(
1872  const process::Owned<ObjectApprovers>& approvers) const;
1874  const mesos::master::Call& call,
1876  ContentType contentType) const;
1877 
1879  const mesos::master::Call& call,
1881  ContentType contentType) const;
1882 
1884  const mesos::master::Call& call,
1886  ContentType contentType) const;
1887 
1889  const mesos::master::Call& call,
1891  ContentType contentType) const;
1892 
1894  const SlaveID& slaveId) const;
1895 
1896  process::Future<process::http::Response> reconcileOperations(
1897  Framework* framework,
1898  const mesos::scheduler::Call::ReconcileOperations& call,
1899  ContentType contentType) const;
1900 
1901  Master* master;
1902 
1903  ReadOnlyHandler readonlyHandler;
1904 
1905  // NOTE: The quota specific pieces of the Operator API are factored
1906  // out into this separate class.
1907  QuotaHandler quotaHandler;
1908 
1909  // NOTE: The weights specific pieces of the Operator API are factored
1910  // out into this separate class.
1911  WeightsHandler weightsHandler;
1912 
1913  // Since the Master actor is one of the most loaded in a typical Mesos
1914  // installation, we take some extra care to keep the backlog small.
1915  // In particular, all read-only requests are batched and executed in
1916  // parallel, instead of going through the master queue separately.
1917 
1918  typedef process::http::Response
1919  (Master::ReadOnlyHandler::*ReadOnlyRequestHandler)(
1920  ContentType,
1922  const process::Owned<ObjectApprovers>&) const;
1923 
1924  process::Future<process::http::Response> deferBatchedRequest(
1925  ReadOnlyRequestHandler handler,
1927  ContentType outputContentType,
1928  const hashmap<std::string, std::string>& queryParameters,
1929  const process::Owned<ObjectApprovers>& approvers) const;
1930 
1931  void processRequestsBatch() const;
1932 
1933  struct BatchedRequest
1934  {
1935  ReadOnlyRequestHandler handler;
1936  ContentType outputContentType;
1937  hashmap<std::string, std::string> queryParameters;
1940 
1941  // NOTE: The returned response should be either of type
1942  // `BODY` or `PATH`, since `PIPE`-type responses would
1943  // break the deduplication mechanism.
1945  };
1946 
1947  mutable std::vector<BatchedRequest> batchedRequests;
1948  };
1949 
1950  Master(const Master&); // No copying.
1951  Master& operator=(const Master&); // No assigning.
1952 
1953  friend struct Framework;
1954  friend struct FrameworkMetrics;
1955  friend struct Metrics;
1956  friend struct Role;
1957  friend struct Slave;
1958  friend struct SlavesWriter;
1959  friend struct Subscriber;
1960 
1961  // NOTE: Since 'getOffer', 'getInverseOffer' and 'slaves' are
1962  // protected, we need to make the following functions friends.
1963  friend Offer* validation::offer::getOffer(
1964  Master* master, const OfferID& offerId);
1965 
1966  friend InverseOffer* validation::offer::getInverseOffer(
1967  Master* master, const OfferID& offerId);
1968 
1970  Master* master, const SlaveID& slaveId);
1971 
1972  const Flags flags;
1973 
1974  Http http;
1975 
1976  Option<MasterInfo> leader; // Current leading master.
1977 
1978  mesos::allocator::Allocator* allocator;
1979  WhitelistWatcher* whitelistWatcher;
1980  Registrar* registrar;
1981  Files* files;
1982 
1985 
1986  const Option<Authorizer*> authorizer;
1987 
1988  MasterInfo info_;
1989 
1990  // Holds some info which affects how a machine behaves, as well as state that
1991  // represents the master's view of this machine. See the `MachineInfo` protobuf
1992  // and `Machine` struct for more information.
1994 
1995  struct Maintenance
1996  {
1997  // Holds the maintenance schedule, as given by the operator.
1998  std::list<mesos::maintenance::Schedule> schedules;
1999  } maintenance;
2000 
2001  // Indicates when recovery is complete. Recovery begins once the
2002  // master is elected as a leader.
2004 
2005  // If this is the leading master, we periodically check whether we
2006  // should GC some information from the registry.
2007  Option<process::Timer> registryGcTimer;
2008 
2009  struct Slaves
2010  {
2011  Slaves() : removed(MAX_REMOVED_SLAVES) {}
2012 
2013  // Imposes a time limit for slaves that we recover from the
2014  // registry to reregister with the master.
2015  Option<process::Timer> recoveredTimer;
2016 
2017  // Slaves that have been recovered from the registrar after master
2018  // failover. Slaves are removed from this collection when they
2019  // either reregister with the master or are marked unreachable
2020  // because they do not reregister before `recoveredTimer` fires.
2021  // We must not answer questions related to these slaves (e.g.,
2022  // during task reconciliation) until we determine their fate
2023  // because they are in this transitioning state.
2024  hashmap<SlaveID, SlaveInfo> recovered;
2025 
2026  // Agents that are in the process of (re-)registering. They are
2027  // maintained here while the (re-)registration is in progress and
2028  // possibly pending in the authorizer or the registrar in order
2029  // to help deduplicate (re-)registration requests.
2030  hashset<process::UPID> registering;
2031  hashset<SlaveID> reregistering;
2032 
2033  // Registered slaves are indexed by SlaveID and UPID. Note that
2034  // iteration is supported but is exposed as iteration over a
2035  // hashmap<SlaveID, Slave*> since it is tedious to convert
2036  // the map's key/value iterator into a value iterator.
2037  //
2038  // TODO(bmahler): Consider pulling in boost's multi_index,
2039  // or creating a simpler indexing abstraction in stout.
2040  struct
2041  {
2042  bool contains(const SlaveID& slaveId) const
2043  {
2044  return ids.contains(slaveId);
2045  }
2046 
2047  bool contains(const process::UPID& pid) const
2048  {
2049  return pids.contains(pid);
2050  }
2051 
2052  Slave* get(const SlaveID& slaveId) const
2053  {
2054  return ids.get(slaveId).getOrElse(nullptr);
2055  }
2056 
2057  Slave* get(const process::UPID& pid) const
2058  {
2059  return pids.get(pid).getOrElse(nullptr);
2060  }
2061 
2062  void put(Slave* slave)
2063  {
2064  CHECK_NOTNULL(slave);
2065  ids[slave->id] = slave;
2066  pids[slave->pid] = slave;
2067  }
2068 
2069  void remove(Slave* slave)
2070  {
2071  CHECK_NOTNULL(slave);
2072  ids.erase(slave->id);
2073  pids.erase(slave->pid);
2074  }
2075 
2076  void clear()
2077  {
2078  ids.clear();
2079  pids.clear();
2080  }
2081 
2082  size_t size() const { return ids.size(); }
2083 
2084  typedef hashmap<SlaveID, Slave*>::iterator iterator;
2085  typedef hashmap<SlaveID, Slave*>::const_iterator const_iterator;
2086 
2087  iterator begin() { return ids.begin(); }
2088  iterator end() { return ids.end(); }
2089 
2090  const_iterator begin() const { return ids.begin(); }
2091  const_iterator end() const { return ids.end(); }
2092 
2093  private:
2096  } registered;
2097 
2098  // Slaves that are in the process of being removed from the
2099  // registrar.
2100  hashset<SlaveID> removing;
2101 
2102  // Slaves that are in the process of being marked unreachable.
2103  hashset<SlaveID> markingUnreachable;
2104 
2105  // Slaves that are in the process of being marked gone.
2106  hashset<SlaveID> markingGone;
2107 
2108  // Agents which have been marked for draining, including recovered,
2109  // admitted, and unreachable agents. All draining agents will also
2110  // be deactivated. If an agent in this set reregisters, the master
2111  // will send it a `DrainSlaveMessage`.
2112  //
2113  // These values are checkpointed to the registry.
2114  hashmap<SlaveID, DrainInfo> draining;
2115 
2116  // Agents which have been deactivated, including recovered, admitted,
2117  // and unreachable agents. Agents in this set will not have resource
2118  // offers generated and will thus be unable to launch new operations,
2119  // but existing operations will be unaffected.
2120  //
2121  // These values are checkpointed to the registry.
2122  hashset<SlaveID> deactivated;
2123 
2124  // This collection includes agents that have gracefully shutdown,
2125  // as well as those that have been marked unreachable or gone. We
2126  // keep a cache here to prevent this from growing in an unbounded
2127  // manner.
2128  //
2129  // TODO(bmahler): Ideally we could use a cache with set semantics.
2130  //
2131  // TODO(neilc): Consider storing all agent IDs that have been
2132  // marked unreachable by this master.
2134 
2135  // Slaves that have been marked unreachable. We recover this from
2136  // the registry, so it includes slaves marked as unreachable by
2137  // other instances of the master. Note that we use a LinkedHashMap
2138  // to ensure the order of elements here matches the order in the
2139  // registry's unreachable list, which matches the order in which
2140  // agents are marked unreachable. This list is garbage collected;
2141  // GC behavior is governed by the `registry_gc_interval`,
2142  // `registry_max_agent_age`, and `registry_max_agent_count` flags.
2144 
2145  // This helps us look up all unreachable tasks on an agent so we can remove
2146  // them from their primary storage `framework.unreachableTasks` when an
2147  // agent reregisters. This map is bounded by the same GC behavior as
2148  // `unreachable`. When the agent is GC'd from unreachable it's also
2149  // erased from `unreachableTasks`.
2151  unreachableTasks;
2152 
2153  // Slaves that have been marked gone. We recover this from the
2154  // registry, so it includes slaves marked as gone by other instances
2155  // of the master. Note that we use a LinkedHashMap to ensure the order
2156  // of elements here matches the order in the registry's gone list, which
2157  // matches the order in which agents are marked gone.
2159 
2160  // This rate limiter is used to limit the removal of slaves failing
2161  // health checks.
2162  // NOTE: Using a 'shared_ptr' here is OK because 'RateLimiter' is
2163  // a wrapper around libprocess process which is thread safe.
2165  } slaves;
2166 
2167  struct Frameworks
2168  {
2169  Frameworks(const Flags& masterFlags)
2170  : completed(masterFlags.max_completed_frameworks) {}
2171 
2173 
2175 
2176  // Principals of frameworks keyed by PID.
2177  // NOTE: Multiple PIDs can map to the same principal. The
2178  // principal is None when the framework doesn't specify it.
2179  // The differences between this map and 'authenticated' are:
2180  // 1) This map only includes *registered* frameworks. The mapping
2181  // is added when a framework (re-)registers.
2182  // 2) This map includes unauthenticated frameworks (when Master
2183  // allows them) if they have principals specified in
2184  // FrameworkInfo.
2186 
2187  // BoundedRateLimiters keyed by the framework principal.
2188  // Like Metrics::Frameworks, all frameworks of the same principal
2189  // are throttled together at a common rate limit.
2191 
2192  // The default limiter is for frameworks not specified in
2193  // 'flags.rate_limits'.
2195  } frameworks;
2196 
2197  struct Subscribers
2198  {
2199  Subscribers(Master* _master, size_t maxSubscribers)
2200  : master(_master),
2201  subscribed(maxSubscribers) {};
2202 
2203  // Represents a client subscribed to the 'api/vX' endpoint.
2204  //
2205  // TODO(anand): Add support for filtering. Some subscribers
2206  // might only be interested in a subset of events.
2207  struct Subscriber
2208  {
2212  : http(_http),
2213  heartbeater(
2214  "subscriber " + stringify(http.streamId),
2215  []() {
2216  mesos::master::Event event;
2217  event.set_type(mesos::master::Event::HEARTBEAT);
2218  return event;
2219  }(),
2220  http,
2223  principal(_principal) {}
2224 
2225  // Not copyable, not assignable.
2226  Subscriber(const Subscriber&) = delete;
2227  Subscriber& operator=(const Subscriber&) = delete;
2228 
2229  // Creates object approvers. The futures returned by this method will be
2230  // completed in the calling order.
2232  const Option<Authorizer*>& authorizer,
2233  std::initializer_list<authorization::Action> actions);
2234 
2235  // TODO(greggomann): Refactor this function into multiple event-specific
2236  // overloads. See MESOS-8475.
2237  void send(
2239  const process::Owned<ObjectApprovers>& approvers,
2240  const process::Shared<FrameworkInfo>& frameworkInfo,
2241  const process::Shared<Task>& task);
2242 
2244  {
2245  // TODO(anand): Refactor `HttpConnection` to being a RAII class instead.
2246  // It is possible that a caller might accidentally invoke `close()`
2247  // after passing ownership to the `Subscriber` object. See MESOS-5843
2248  // for more details.
2249  http.close();
2250  }
2251 
2255 
2256  // We maintain a sequence to coordinate the creation of object approvers
2257  // in order to sequentialize all events to the subscriber.
2259  };
2260 
2261  // Sends the event to all subscribers connected to the 'api/vX' endpoint.
2262  void send(
2263  mesos::master::Event&& event,
2264  const Option<FrameworkInfo>& frameworkInfo = None(),
2265  const Option<Task>& task = None());
2266 
2267  Master* master;
2268 
2269  // Active subscribers to the 'api/vX' endpoint keyed by the stream
2270  // identifier.
2272  };
2273 
2274  Subscribers subscribers;
2275 
2276  hashmap<OfferID, Offer*> offers;
2278 
2279  hashmap<OfferID, InverseOffer*> inverseOffers;
2280  hashmap<OfferID, process::Timer> inverseOfferTimers;
2281 
2282  // We track information about roles that we're aware of in the system.
2283  // Specifically, we keep track of the roles when a framework subscribes to
2284  // the role, and/or when there are resources allocated to the role
2285  // (e.g. some tasks and/or executors are consuming resources under the role).
2287 
2288  // Configured role whitelist if using the (deprecated) "explicit
2289  // roles" feature. If this is `None`, any role is allowed.
2290  Option<hashset<std::string>> roleWhitelist;
2291 
2292  // Configured weight for each role, if any. If a role does not
2293  // appear here, it has the default weight of 1.
2295 
2296  // Configured quota for each role, if any. We store quotas by role
2297  // because we set them at the role level.
2299 
2300  // Authenticator names as supplied via flags.
2301  std::vector<std::string> authenticatorNames;
2302 
2303  Option<Authenticator*> authenticator;
2304 
2305  // Frameworks/slaves that are currently in the process of authentication.
2306  // 'authenticating' future is completed when authenticator
2307  // completes authentication.
2308  // The future is removed from the map when master completes authentication.
2310 
2311  // Principals of authenticated frameworks/slaves keyed by PID.
2313 
2314  int64_t nextFrameworkId; // Used to give each framework a unique ID.
2315  int64_t nextOfferId; // Used to give each slot offer a unique ID.
2316  int64_t nextSlaveId; // Used to give each slave a unique ID.
2317 
2318  // NOTE: It is safe to use a 'shared_ptr' because 'Metrics' is
2319  // thread safe.
2320  // TODO(dhamon): This does not need to be a shared_ptr. Metrics contains
2321  // copyable metric types only.
2322  std::shared_ptr<Metrics> metrics;
2323 
2324  // PullGauge handlers.
2325  double _uptime_secs()
2326  {
2327  return (process::Clock::now() - startTime).secs();
2328  }
2329 
2330  double _elected()
2331  {
2332  return elected() ? 1 : 0;
2333  }
2334 
2335  double _slaves_connected();
2336  double _slaves_disconnected();
2337  double _slaves_active();
2338  double _slaves_inactive();
2339  double _slaves_unreachable();
2340 
2341  // TODO(bevers): Remove these and make the above functions
2342  // const instead after MESOS-4995 is resolved.
2343  double _const_slaves_connected() const;
2344  double _const_slaves_disconnected() const;
2345  double _const_slaves_active() const;
2346  double _const_slaves_inactive() const;
2347  double _const_slaves_unreachable() const;
2348 
2349  double _frameworks_connected();
2350  double _frameworks_disconnected();
2351  double _frameworks_active();
2352  double _frameworks_inactive();
2353 
2354  double _outstanding_offers()
2355  {
2356  return static_cast<double>(offers.size());
2357  }
2358 
2359  double _event_queue_messages()
2360  {
2361  return static_cast<double>(eventCount<process::MessageEvent>());
2362  }
2363 
2364  double _event_queue_dispatches()
2365  {
2366  return static_cast<double>(eventCount<process::DispatchEvent>());
2367  }
2368 
2369  double _event_queue_http_requests()
2370  {
2371  return static_cast<double>(eventCount<process::HttpEvent>());
2372  }
2373 
2374  double _tasks_staging();
2375  double _tasks_starting();
2376  double _tasks_running();
2377  double _tasks_unreachable();
2378  double _tasks_killing();
2379 
2380  double _resources_total(const std::string& name);
2381  double _resources_used(const std::string& name);
2382  double _resources_percent(const std::string& name);
2383 
2384  double _resources_revocable_total(const std::string& name);
2385  double _resources_revocable_used(const std::string& name);
2386  double _resources_revocable_percent(const std::string& name);
2387 
2388  process::Time startTime; // Start time used to calculate uptime.
2389 
2390  Option<process::Time> electedTime; // Time when this master is elected.
2391 };
2392 
2393 
2394 inline std::ostream& operator<<(
2395  std::ostream& stream,
2396  const Framework& framework);
2397 
2398 
2399 // TODO(bmahler): Keeping the task and executor information in sync
2400 // across the Slave and Framework structs is error prone!
2402 {
2403  enum State
2404  {
2405  // Framework has never connected to this master. This implies the
2406  // master failed over and the framework has not yet reregistered,
2407  // but some framework state has been recovered from reregistering
2408  // agents that are running tasks for the framework.
2410 
2411  // Framework was previously connected to this master. A framework
2412  // becomes disconnected when there is a socket error.
2414 
2415  // The framework is connected but not active.
2417 
2418  // Framework is connected and eligible to receive offers. No
2419  // offers will be made to frameworks that are not active.
2420  ACTIVE
2421  };
2422 
2423  Framework(Master* const master,
2424  const Flags& masterFlags,
2425  const FrameworkInfo& info,
2426  const process::UPID& _pid,
2428 
2429  Framework(Master* const master,
2430  const Flags& masterFlags,
2431  const FrameworkInfo& info,
2434 
2435  Framework(Master* const master,
2436  const Flags& masterFlags,
2437  const FrameworkInfo& info);
2438 
2439  ~Framework();
2440 
2441  Task* getTask(const TaskID& taskId);
2442 
2443  void addTask(Task* task);
2444 
2445  // Update framework to recover the resources that were previously
2446  // being used by `task`.
2447  //
2448  // TODO(bmahler): This is a hack for performance. We need to
2449  // maintain resource counters because computing task resources
2450  // functionally for all tasks is expensive, for now.
2451  void recoverResources(Task* task);
2452 
2453  // Sends a message to the connected framework.
2454  template <typename Message>
2455  void send(const Message& message);
2456 
2457  void addCompletedTask(Task&& task);
2458 
2459  void addUnreachableTask(const Task& task);
2460 
2461  // Removes the task. `unreachable` indicates whether the task is removed due
2462  // to being unreachable. Note that we cannot rely on the task state because
2463  // it may not reflect unreachability due to being set to TASK_LOST for
2464  // backwards compatibility.
2465  void removeTask(Task* task, bool unreachable);
2466 
2467  void addOffer(Offer* offer);
2468 
2469  void removeOffer(Offer* offer);
2470 
2471  void addInverseOffer(InverseOffer* inverseOffer);
2472 
2473  void removeInverseOffer(InverseOffer* inverseOffer);
2474 
2475  bool hasExecutor(const SlaveID& slaveId,
2476  const ExecutorID& executorId);
2477 
2478  void addExecutor(const SlaveID& slaveId,
2479  const ExecutorInfo& executorInfo);
2480 
2481  void removeExecutor(const SlaveID& slaveId,
2482  const ExecutorID& executorId);
2483 
2484  void addOperation(Operation* operation);
2485 
2486  Option<Operation*> getOperation(const OperationID& id);
2487 
2488  void recoverResources(Operation* operation);
2489 
2490  void removeOperation(Operation* operation);
2491 
2492  const FrameworkID id() const;
2493 
2494  // Update fields in 'info' using those in 'newInfo'. Currently this
2495  // only updates `role`/`roles`, 'name', 'failover_timeout', 'hostname',
2496  // 'webui_url', 'capabilities', and 'labels'.
2497  void update(const FrameworkInfo& newInfo);
2498 
2499  void updateConnection(const process::UPID& newPid);
2500 
2501  void updateConnection(
2503 
2504  // Closes the HTTP connection and stops the heartbeat.
2505  //
2506  // TODO(vinod): Currently `state` variable is set separately
2507  // from this method. We need to make sure these are in sync.
2508  void closeHttpConnection();
2509 
2510  void heartbeat();
2511 
2512  bool active() const;
2513  bool connected() const;
2514  bool recovered() const;
2515 
2516  bool isTrackedUnderRole(const std::string& role) const;
2517  void trackUnderRole(const std::string& role);
2518  void untrackUnderRole(const std::string& role);
2519 
2520  void setFrameworkState(const State& _state);
2521 
2522  Master* const master;
2523 
2524  FrameworkInfo info;
2525 
2526  std::set<std::string> roles;
2527 
2529 
2530  // Frameworks can either be connected via HTTP or by message passing
2531  // (scheduler driver). At most one of `http` and `pid` will be set
2532  // according to the last connection made by the framework; neither
2533  // field will be set if the framework is in state `RECOVERED`.
2536 
2538 
2542 
2543  // Tasks that have not yet been launched because they are currently
2544  // being authorized.
2546 
2547  // TODO(bmahler): Make this private to enforce that `addTask()` and
2548  // `removeTask()` are used, and provide a const view into the tasks.
2550 
2551  // Tasks launched by this framework that have reached a terminal
2552  // state and have had all their updates acknowledged. We only keep a
2553  // fixed-size cache to avoid consuming too much memory. We use
2554  // circular_buffer rather than BoundedHashMap because there
2555  // can be multiple completed tasks with the same task ID.
2556  circular_buffer<process::Owned<Task>> completedTasks;
2557 
2558  // When an agent is marked unreachable, tasks running on it are stored
2559  // here. We only keep a fixed-size cache to avoid consuming too much memory.
2560  // NOTE: Non-partition-aware unreachable tasks in this map are marked
2561  // TASK_LOST instead of TASK_UNREACHABLE for backward compatibility.
2563 
2564  hashset<Offer*> offers; // Active offers for framework.
2565 
2566  hashset<InverseOffer*> inverseOffers; // Active inverse offers for framework.
2567 
2568  // TODO(bmahler): Make this private to enforce that `addExecutor()`
2569  // and `removeExecutor()` are used, and provide a const view into
2570  // the executors.
2572 
2573  // Pending operations or terminal operations that have
2574  // unacknowledged status updates.
2576 
2577  // The map from the framework-specified operation ID to the
2578  // corresponding internal operation UUID.
2580 
2581  // NOTE: For the used and offered resources below, we keep the
2582  // total as well as partitioned by SlaveID.
2583  // We expose the total resources via the HTTP endpoint, and we
2584  // keep a running total of the resources because looping over the
2585  // slaves to sum the resources has led to perf issues (MESOS-1862).
2586  // We keep the resources partitioned by SlaveID because non-scalar
2587  // resources can be lost when summing them up across multiple
2588  // slaves (MESOS-2373).
2589  //
2590  // Also note that keeping the totals is safe even though it yields
2591  // incorrect results for non-scalar resources.
2592  // (1) For overlapping set items / ranges across slaves, these
2593  // will get added N times but only represented once.
2594  // (2) When an initial subtraction occurs (N-1), the resource is
2595  // no longer represented. (This is the source of the bug).
2596  // (3) When any further subtractions occur (N-(1+M)), the
2597  // Resources simply ignores the subtraction since there's
2598  // nothing to remove, so this is safe for now.
2599 
2600  // TODO(mpark): Strip the non-scalar resources out of the totals
2601  // in order to avoid reporting incorrect statistics (MESOS-2623).
2602 
2603  // Active task / executor / operation resources.
2605 
2606  // Note that we maintain multiple copies of each shared resource in
2607  // `usedResources` as they are used by multiple tasks.
2609 
2610  // Offered resources.
2613 
2614  // This is only set for HTTP frameworks.
2617 
2618  // This is used for per-framework metrics.
2620 
2621 private:
2622  Framework(Master* const _master,
2623  const Flags& masterFlags,
2624  const FrameworkInfo& _info,
2625  State state,
2626  const process::Time& time);
2627 
2628  Framework(const Framework&); // No copying.
2629  Framework& operator=(const Framework&); // No assigning.
2630 };
2631 
2632 
2633 // Sends a message to the connected framework.
2634 template <typename Message>
2635 void Framework::send(const Message& message)
2636 {
2637  metrics.incrementEvent(message);
2638 
2639  if (!connected()) {
2640  LOG(WARNING) << "Master attempting to send message to disconnected"
2641  << " framework " << *this;
2642 
2643  // NOTE: We proceed here without returning to support the case where a
2644  // "disconnected" framework is still talking to the master and the master
2645  // wants to shut it down by sending a `FrameworkErrorMessage`. This can
2646  // occur in a one-way network partition where the master -> framework link
2647  // is broken but the framework -> master link remains intact. Note that we
2648  // have no periodic heartbeats between the master and pid-based schedulers.
2649  //
2650  // TODO(chhsiao): Update the `FrameworkErrorMessage` call-sites that rely on
2651  // the lack of a `return` here to directly call `process::send` so that this
2652  // function doesn't need to deal with the special case. Then we can check
2653  // that one of `http` or `pid` is set if the framework is connected.
2654  }
2655 
2656  if (http.isSome()) {
2657  if (!http->send(message)) {
2658  LOG(WARNING) << "Unable to send message to framework " << *this << ":"
2659  << " connection closed";
2660  }
2661  } else if (pid.isSome()) {
2662  master->send(pid.get(), message);
2663  } else {
2664  LOG(WARNING) << "Unable to send message to framework " << *this << ":"
2665  << " framework is recovered but has not reregistered";
2666  }
2667 }
2668 
2669 
2670 // TODO(bevers): Check if there is anything preventing us from
2671 // returning a const reference here.
2672 inline const FrameworkID Framework::id() const
2673 {
2674  return info.id();
2675 }
2676 
2677 
2678 inline bool Framework::active() const
2679 {
2680  return state == ACTIVE;
2681 }
2682 
2683 
2684 inline bool Framework::connected() const
2685 {
2686  return state == ACTIVE || state == INACTIVE;
2687 }
2688 
2689 
2690 inline bool Framework::recovered() const
2691 {
2692  return state == RECOVERED;
2693 }
2694 
2695 
2696 inline std::ostream& operator<<(
2697  std::ostream& stream,
2698  const Framework& framework)
2699 {
2700  // TODO(vinod): Also log the hostname once FrameworkInfo is properly
2701  // updated on framework failover (MESOS-1784).
2702  stream << framework.id() << " (" << framework.info.name() << ")";
2703 
2704  if (framework.pid.isSome()) {
2705  stream << " at " << framework.pid.get();
2706  }
2707 
2708  return stream;
2709 }
2710 
2711 
2712 // Information about an active role.
2713 struct Role
2714 {
2715  Role() = delete;
2716 
2717  Role(const Master* _master,
2718  const std::string& _role)
2719  : master(_master), role(_role) {}
2720 
2721  void addFramework(Framework* framework)
2722  {
2723  frameworks[framework->id()] = framework;
2724  }
2725 
2726  void removeFramework(Framework* framework)
2727  {
2728  frameworks.erase(framework->id());
2729  }
2730 
2731  const Master* master;
2732  const std::string role;
2733 
2734  // NOTE: The dynamic role/quota relation is stored in and administrated
2735  // by the master. There is no direct representation of quota information
2736  // here to avoid duplication and to support that an operator can associate
2737  // quota with a role before the role is created. Such ordering of operator
2738  // requests prevents a race of premature unbounded allocation that setting
2739  // quota first is intended to contain.
2740 
2742 };
2743 
2744 
2745 mesos::master::Response::GetFrameworks::Framework model(
2746  const Framework& framework);
2747 
2748 
2749 } // namespace master {
2750 } // namespace internal {
2751 } // namespace mesos {
2752 
2753 #endif // __MASTER_HPP__
Protocol< RecoverRequest, RecoverResponse > recover
Definition: path.hpp:29
Option< StreamingHttpConnection< v1::scheduler::Event > > http
Definition: master.hpp:2534
ReadOnlyHandler(const Master *_master)
Definition: master.hpp:1308
hashmap< ResourceProviderID, ResourceProvider > resourceProviders
Definition: master.hpp:346
Definition: nothing.hpp:16
hashmap< TaskID, Task * > tasks
Definition: master.hpp:2549
Definition: master.hpp:2713
Master *const master
Definition: master.hpp:2522
ContentType
Definition: http.hpp:43
hashmap< UUID, Operation * > operations
Definition: master.hpp:261
std::ostream & operator<<(std::ostream &stream, const Future< T > &future)
Definition: future.hpp:1826
Try< bool > update(const std::string &link, const Handle &parent, uint16_t protocol, const action::Mirror &mirror)
void finalize(bool finalize_wsa=false)
Clean up the library.
Try< Bytes > size(const std::string &path, const FollowSymlink follow=FollowSymlink::FOLLOW_SYMLINK)
Definition: stat.hpp:130
Future< Response > request(const Request &request, bool streamedResponse=false)
Asynchronously sends an HTTP request to the process and returns the HTTP response once the entire res...
SlaveInfo info
Definition: master.hpp:201
Definition: master.hpp:27
Definition: check.hpp:33
const SlaveID id
Definition: master.hpp:200
hashset< Offer * > offers
Definition: master.hpp:277
Option< process::Timer > reregistrationTimer
Definition: master.hpp:230
bool connected
Definition: master.hpp:217
Definition: resource_quantities.hpp:63
bool initialize(const Option< std::string > &delegate=None(), const Option< std::string > &readwriteAuthenticationRealm=None(), const Option< std::string > &readonlyAuthenticationRealm=None())
Initialize the library.
Resources totalResources
Definition: master.hpp:303
constexpr Duration DEFAULT_HEARTBEAT_INTERVAL
Definition: constants.hpp:61
Definition: protobuf_utils.hpp:332
hashmap< SlaveID, Resources > offeredResources
Definition: master.hpp:2612
protobuf::framework::Capabilities capabilities
Definition: master.hpp:2528
std::set< std::string > getRoles(const FrameworkInfo &frameworkInfo)
Result< ProcessStatus > status(pid_t pid)
Definition: proc.hpp:166
InverseOffer * getInverseOffer(Master *master, const OfferID &offerId)
Try< Nothing > machines(const google::protobuf::RepeatedPtrField< MachineID > &ids)
Performs the following checks on a list of machines:
mesos::v1::scheduler::Call Call
Definition: mesos.hpp:2782
Definition: resources.hpp:83
Resources totalUsedResources
Definition: master.hpp:2604
Slave * getSlave(Master *master, const SlaveID &slaveId)
Definition: authorization.hpp:36
Definition: flags.hpp:42
Definition: registrar.hpp:88
void addFramework(Framework *framework)
Definition: master.hpp:2721
Option< Error > reregisterSlave(const ReregisterSlaveMessage &message)
Definition: files.hpp:73
Operation
Definition: cgroups.hpp:458
Future< Nothing > redirect(int_fd from, Option< int_fd > to, size_t chunk=4096, const std::vector< lambda::function< void(const std::string &)>> &hooks={})
Redirect output from the 'from' file descriptor to the 'to' file descriptor (or /dev/null if 'to' is ...
Option< process::Time > estimatedDrainStartTime
Definition: master.hpp:322
process::Sequence approversSequence
Definition: master.hpp:2258
hashmap< UUID, Operation * > operations
Definition: master.hpp:343
void send(const process::UPID &to, const google::protobuf::Message &message)
Definition: protobuf.hpp:118
bool isSome() const
Definition: option.hpp:116
Definition: event.hpp:209
Definition: http.hpp:533
Definition: json.hpp:158
Definition: sequence.hpp:33
hashmap< FrameworkID, hashmap< TaskID, Task * > > tasks
Definition: master.hpp:253
multihashmap< FrameworkID, TaskID > killedTasks
Definition: master.hpp:257
mesos::v1::scheduler::Event Event
Definition: mesos.hpp:2783
Definition: hashmap.hpp:38
Option< UUID > resourceVersion
Definition: master.hpp:316
FrameworkMetrics metrics
Definition: master.hpp:2619
Resources checkpointedResources
Definition: master.hpp:296
SlaveObserver * observer
Definition: master.hpp:318
Definition: owned.hpp:26
Try< Nothing > unavailability(const Unavailability &unavailability)
An "untyped" PID, used to encapsulate the process ID for lower-layer abstractions (eg...
Definition: pid.hpp:39
BoundedHashMap< TaskID, process::Owned< Task > > unreachableTasks
Definition: master.hpp:2562
Result< std::vector< Filter< Classifier > > > filters(const std::string &_link, const Handle &parent)
Definition: internal.hpp:769
process::Time registeredTime
Definition: master.hpp:213
bool active
Definition: master.hpp:222
An abstraction for contending to be a leading master.
Definition: contender.hpp:40
std::set< std::string > roles
Definition: master.hpp:2526
const Master * master
Definition: master.hpp:2731
hashset< UUID > orphanedOperations
Definition: master.hpp:274
process::UPID pid
Definition: master.hpp:205
Definition: uuid.hpp:35
Definition: protobuf_utils.hpp:624
Option< process::Time > reregisteredTime
Definition: master.hpp:214
Definition: agent.hpp:25
process::Time reregisteredTime
Definition: master.hpp:2540
Master *const master
Definition: master.hpp:199
Option< Error > quotaInfo(const mesos::quota::QuotaInfo &quotaInfo)
const T & get() const &
Definition: option.hpp:119
const std::string role
Definition: master.hpp:2732
Protocol< PromiseRequest, PromiseResponse > promise
Definition: protobuf.hpp:100
process::Time registeredTime
Definition: master.hpp:2539
Try< int_fd > accept(int_fd s)
Definition: network.hpp:31
Definition: whitelist_watcher.hpp:37
void removeFramework(Framework *framework)
Definition: master.hpp:2726
MasterInfo info() const
Definition: master.hpp:518
process::Owned< ResponseHeartbeater< scheduler::Event, v1::scheduler::Event > > heartbeater
Definition: master.hpp:2616
StreamingHttpConnection< v1::master::Event > http
Definition: master.hpp:2252
UUID resourceVersion
Definition: master.hpp:339
Definition: time.hpp:23
FrameworkInfo info
Definition: master.hpp:2524
ResponseHeartbeater< mesos::master::Event, v1::master::Event > heartbeater
Definition: master.hpp:2253
bool exited(const UPID &from, const UPID &to)
Simulates disconnection of the link between 'from' and 'to' by sending an ExitedEvent to 'to'...
Definition: boundedhashmap.hpp:27
Basic model of an allocator: resources are allocated to a framework in the form of offers...
Definition: allocator.hpp:82
#define flags
Definition: decoder.hpp:18
Definition: none.hpp:27
Definition: attributes.hpp:24
hashset< InverseOffer * > inverseOffers
Definition: master.hpp:2566
Definition: executor.hpp:48
Definition: http.hpp:612
const MachineID machineId
Definition: master.hpp:203
Definition: master.hpp:122
constexpr size_t MAX_REMOVED_SLAVES
Definition: constants.hpp:93
hashset< InverseOffer * > inverseOffers
Definition: master.hpp:280
An abstraction of a Master detector which can be used to detect the leading master from a group...
Definition: detector.hpp:38
static Time now()
The current clock time for either the current process that makes this call or the global clock time i...
Resources totalResources
Definition: master.hpp:326
Definition: event.hpp:103
hashmap< TaskID, TaskInfo > pendingTasks
Definition: master.hpp:2545
Option< process::UPID > pid
Definition: master.hpp:2535
Definition: metrics.hpp:41
State
Definition: master.hpp:2403
Offer * getOffer(Master *master, const OfferID &offerId)
Try< Nothing > kill(const std::string &hierarchy, const std::string &cgroup, int signal)
hashmap< FrameworkID, Framework * > frameworks
Definition: master.hpp:2741
mesos::master::Response::GetFrameworks::Framework model(const Framework &framework)
process::Time unregisteredTime
Definition: master.hpp:2541
mesos::v1::scheduler::Response Response
Definition: mesos.hpp:2785
std::string version
Definition: master.hpp:208
URI http(const std::string &host, const std::string &path="/", const Option< int > &port=None(), const Option< std::string > &query=None(), const Option< std::string > &fragment=None(), const Option< std::string > &user=None(), const Option< std::string > &password=None())
Creates an http URI with the given parameters.
Definition: http.hpp:35
ResourceProviderInfo info
Definition: master.hpp:325
hashmap< FrameworkID, hashmap< ExecutorID, ExecutorInfo > > executors
Definition: master.hpp:237
hashset< Offer * > offers
Definition: master.hpp:2564
std::string stringify(int flags)
Definition: owned.hpp:36
Definition: master.hpp:2401
protobuf::slave::Capabilities capabilities
Definition: master.hpp:211
hashmap< SlaveID, hashmap< ExecutorID, ExecutorInfo > > executors
Definition: master.hpp:2571
circular_buffer< process::Owned< Task > > completedTasks
Definition: master.hpp:2556
hashmap< OperationID, UUID > operationUUIDs
Definition: master.hpp:2579
hashmap< UUID, Operation * > operations
Definition: master.hpp:2575
Definition: parse.hpp:33
Try< std::set< pid_t > > pids()
Definition: freebsd.hpp:62
const Option< process::http::authentication::Principal > principal
Definition: master.hpp:2254
hashmap< SlaveID, Resources > usedResources
Definition: master.hpp:2608
Resources totalOfferedResources
Definition: master.hpp:2611
Definition: master.hpp:361
PID< MetricsProcess > metrics
Resources offeredResources
Definition: master.hpp:287
hashmap< FrameworkID, hashmap< TaskID, TaskInfo > > pendingTasks
Definition: master.hpp:243
const FrameworkID id() const
Definition: master.hpp:2672
constexpr const char * name
Definition: shell.hpp:43
hashmap< FrameworkID, Resources > usedResources
Definition: master.hpp:285
bool contains(const Resource &left, const Resource &right)
Try< std::vector< Value > > time(const std::string &hierarchy, const std::string &cgroup)
Subscriber(const StreamingHttpConnection< v1::master::Event > &_http, const Option< process::http::authentication::Principal > _principal)
Definition: master.hpp:2209
Try< Nothing > schedule(const mesos::maintenance::Schedule &schedule, const hashmap< MachineID, Machine > &machines)
Performs the following checks on the new maintenance schedule:
Option< Error > registerSlave(const RegisterSlaveMessage &message)
Role(const Master *_master, const std::string &_role)
Definition: master.hpp:2717
State state
Definition: master.hpp:2537
Future< size_t > send(const int_fd &fd, const void *buf, size_t size)