Apache Mesos
master.hpp
Go to the documentation of this file.
1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements. See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership. The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License. You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 
17 #ifndef __MASTER_HPP__
18 #define __MASTER_HPP__
19 
20 #include <stdint.h>
21 
22 #include <functional>
23 #include <list>
24 #include <memory>
25 #include <set>
26 #include <string>
27 #include <utility>
28 #include <vector>
29 
30 #include <mesos/mesos.hpp>
31 #include <mesos/resources.hpp>
32 #include <mesos/roles.hpp>
33 #include <mesos/type_utils.hpp>
34 
36 
40 #include <mesos/master/master.hpp>
41 
43 
44 #include <mesos/quota/quota.hpp>
45 
47 
48 #include <process/collect.hpp>
49 #include <process/future.hpp>
50 #include <process/limiter.hpp>
51 #include <process/http.hpp>
52 #include <process/owned.hpp>
53 #include <process/process.hpp>
54 #include <process/protobuf.hpp>
55 #include <process/timer.hpp>
56 
58 
59 #include <stout/boundedhashmap.hpp>
60 #include <stout/cache.hpp>
62 #include <stout/foreach.hpp>
63 #include <stout/hashmap.hpp>
64 #include <stout/hashset.hpp>
65 #include <stout/linkedhashmap.hpp>
66 #include <stout/multihashmap.hpp>
67 #include <stout/nothing.hpp>
68 #include <stout/option.hpp>
69 #include <stout/recordio.hpp>
70 #include <stout/try.hpp>
71 #include <stout/uuid.hpp>
72 
73 #include "common/heartbeater.hpp"
74 #include "common/http.hpp"
76 
77 #include "files/files.hpp"
78 
79 #include "internal/devolve.hpp"
80 #include "internal/evolve.hpp"
81 
82 #include "master/authorization.hpp"
83 #include "master/constants.hpp"
84 #include "master/flags.hpp"
85 #include "master/machine.hpp"
86 #include "master/metrics.hpp"
87 #include "master/validation.hpp"
88 
89 #include "messages/messages.hpp"
90 
91 namespace process {
92 class RateLimiter; // Forward declaration.
93 }
94 
95 namespace mesos {
96 
97 // Forward declarations.
98 class Authorizer;
99 class ObjectApprovers;
100 
101 namespace internal {
102 
103 // Forward declarations.
104 namespace registry {
105 class Slaves;
106 }
107 
108 class Registry;
109 class WhitelistWatcher;
110 
111 namespace master {
112 
113 class Master;
114 class Registrar;
115 class SlaveObserver;
116 
117 struct BoundedRateLimiter;
118 struct Framework;
119 struct Role;
120 
121 
// The master's in-memory record of a registered agent ("slave"): its
// identity, connection/activation state, and the executors, tasks,
// operations, offers and resources the master tracks for it.
//
// NOTE(review): this listing is missing several original source lines
// (e.g. 192, 205, 211, 318), so some member declarations do not appear
// here -- for instance `pid`, which this file reads via `slave.pid`.
122 struct Slave
123 {
  // NOTE(review): `_capabilites` (here and in the second declaration
  // below) is a misspelling of `_capabilities`. Parameter names in a
  // declaration do not affect callers, so it is harmless but worth fixing
  // together with the out-of-line definition.
124 Slave(Master* const _master,
125  SlaveInfo _info,
126  const process::UPID& _pid,
127  const MachineID& _machineId,
128  const std::string& _version,
129  std::vector<SlaveInfo::Capability> _capabilites,
130  const process::Time& _registeredTime,
131  std::vector<Resource> _checkpointedResources,
132  const Option<UUID>& _resourceVersion,
133  std::vector<ExecutorInfo> executorInfos = std::vector<ExecutorInfo>(),
134  std::vector<Task> tasks = std::vector<Task>());
135 
136  ~Slave();
137 
138  Task* getTask(
139  const FrameworkID& frameworkId,
140  const TaskID& taskId) const;
141 
142  void addTask(Task* task);
143 
144  // Update slave to recover the resources that were previously
145  // being used by `task`.
146  //
147  // TODO(bmahler): This is a hack for performance. We need to
148  // maintain resource counters because computing task resources
149  // functionally for all tasks is expensive, for now.
150  void recoverResources(Task* task);
151 
152  void removeTask(Task* task);
153 
154  void addOperation(Operation* operation);
155 
156  void recoverResources(Operation* operation);
157 
158  void removeOperation(Operation* operation);
159 
160  // Marks a non-speculative operation as an orphan when the originating
161  // framework is torn down by the master, or when an agent reregisters
162  // with operations from unknown frameworks. If the operation is
163  // non-terminal, this has the side effect of modifying the agent's
164  // total resources, and should therefore be followed by
165  // `allocator->updateSlave()`.
166  void markOperationAsOrphan(Operation* operation);
167 
168  Operation* getOperation(const UUID& uuid) const;
169 
170  void addOffer(Offer* offer);
171 
172  void removeOffer(Offer* offer);
173 
174  void addInverseOffer(InverseOffer* inverseOffer);
175 
176  void removeInverseOffer(InverseOffer* inverseOffer);
177 
178  bool hasExecutor(
179  const FrameworkID& frameworkId,
180  const ExecutorID& executorId) const;
181 
182  void addExecutor(
183  const FrameworkID& frameworkId,
184  const ExecutorInfo& executorInfo);
185 
186  void removeExecutor(
187  const FrameworkID& frameworkId,
188  const ExecutorID& executorId);
189 
190  void apply(const std::vector<ResourceConversion>& conversions);
191 
  // NOTE(review): the first line of the following declaration (original
  // line 192) is missing from this listing; only its parameter list remains.
193  const SlaveInfo& info,
194  const std::string& _version,
195  const std::vector<SlaveInfo::Capability>& _capabilites,
196  const Resources& _checkpointedResources,
197  const Option<UUID>& resourceVersion);
198 
199  Master* const master;
200  const SlaveID id;
201  SlaveInfo info;
202 
203  const MachineID machineId;
204 
206 
207  // TODO(bmahler): Use stout's Version when it can parse labels, etc.
208  std::string version;
209 
210  // Agent capabilities.
212 
215 
216  // Slave becomes disconnected when the socket closes.
217  bool connected;
218 
219  // Slave becomes deactivated when it gets disconnected, or when the
220  // agent is deactivated via the DRAIN_AGENT or DEACTIVATE_AGENT calls.
221  // No offers will be made for a deactivated slave.
222  bool active;
223 
224  // Timer for marking slaves unreachable that become disconnected and
225  // don't reregister. This timeout is larger than the slave
226  // observer's timeout, so typically the slave observer will be the
227  // one to mark such slaves unreachable; this timer is a backup for
228  // when a slave responds to pings but does not reregister (e.g.,
229  // because agent recovery has hung).
231 
232  // Executors running on this slave.
233  //
234  // TODO(bmahler): Make this private to enforce that `addExecutor()`
235  // and `removeExecutor()` are used, and provide a const view into
236  // the executors.
238 
239  // Tasks present on this slave.
240  //
241  // TODO(bmahler): Make this private to enforce that `addTask()` and
242  // `removeTask()` are used, and provide a const view into the tasks.
243  //
244  // TODO(bmahler): The task pointer ownership complexity arises from the fact
245  // that we own the pointer here, but it's shared with the Framework struct.
246  // We should find a way to eliminate this.
248 
249  // Tasks that frameworks have asked to kill.
250  // This is used for reconciliation when the slave reregisters.
252 
253  // Pending operations or terminal operations that have
254  // unacknowledged status updates on this agent.
256 
257  // Pending operations whose originating framework is unknown.
258  // These operations could be pending, or terminal with unacknowledged
259  // status updates.
260  //
261  // This list can be populated whenever a framework is torn down in the
262  // lifetime of the master, or when an agent reregisters with an operation.
263  //
264  // If the originating framework is completed, the master will
265  // acknowledge any status updates instead of the framework.
266  // If an orphan does not belong to a completed framework, the master
267  // will only acknowledge status updates after a fixed delay.
269 
270  // Active offers on this slave.
272 
273  // Active inverse offers on this slave.
275 
276  // Resources for active tasks / executors / operations.
277  // Note that we maintain multiple copies of each shared resource in
278  // `usedResources` as they are used by multiple tasks.
280 
282 
283  // Resources that should be checkpointed by the slave (e.g.,
284  // persistent volumes, dynamic reservations, etc). These are either
285  // in use by a task/executor, or are available for use and will be
286  // re-offered to the framework.
287  // TODO(jieyu): `checkpointedResources` is only for agent default
288  // resources. Resources from resource providers are not included in
289  // this field. Consider removing this field.
291 
292  // The current total resources of the slave. Note that this is
293  // different from 'info.resources()' because this also considers
294  // operations (e.g., CREATE, RESERVE) that have been applied and
295  // includes revocable resources and resources from resource
296  // providers as well.
298 
299  // Used to establish the relationship between the operation and the
300  // resources that the operation is operating on. Each resource
301  // provider will keep a resource version UUID, and change it when it
302  // believes that the resources from this resource provider are out
303  // of sync from the master's view. The master will keep track of
304  // the last known resource version UUID for each resource provider,
305  // and attach the resource version UUID in each operation it sends
306  // out. The resource provider should reject operations that have a
307  // different resource version UUID than the one it maintains, because
308  // this means the operation is operating on resources that might
309  // have already been invalidated.
311 
312  SlaveObserver* observer;
313 
314  // Time when this agent was last asked to drain. This field
315  // is empty if the agent is not currently draining or drained.
317 
  // NOTE(review): the header of the nested struct that owns the fields from
  // here down to the `};` at original line 338 (it was original line 318)
  // is missing from this listing.
319  ResourceProviderInfo info;
321 
322  // Used to establish the relationship between the operation and the
323  // resources that the operation is operating on. Each resource
324  // provider will keep a resource version UUID, and change it when it
325  // believes that the resources from this resource provider are out
326  // of sync from the master's view. The master will keep track of
327  // the last known resource version UUID for each resource provider,
328  // and attach the resource version UUID in each operation it sends
329  // out. The resource provider should reject operations that have a
330  // different resource version UUID than the one it maintains, because
331  // this means the operation is operating on resources that might
332  // have already been invalidated.
334 
335  // Pending operations or terminal operations that have
336  // unacknowledged status updates.
338  };
339 
341 
342 private:
  // Copying is disabled by declaring these private. NOTE(review): presumably
  // they are never defined; `= delete` would state that explicitly.
343  Slave(const Slave&); // No copying.
344  Slave& operator=(const Slave&); // No assigning.
345 };
346 
347 
348 inline std::ostream& operator<<(std::ostream& stream, const Slave& slave)
349 {
350  return stream << slave.id << " at " << slave.pid
351  << " (" << slave.info.hostname() << ")";
352 }
353 
354 
355 class Master : public ProtobufProcess<Master>
356 {
357 public:
359  Registrar* registrar,
360  Files* files,
363  const Option<Authorizer*>& authorizer,
364  const Option<std::shared_ptr<process::RateLimiter>>&
365  slaveRemovalLimiter,
366  const Flags& flags = Flags());
367 
368  ~Master() override;
369 
370  // Compare this master's capabilities with registry's minimum capability.
371  // Return the set of capabilities missing from this master.
372  static hashset<std::string> missingMinimumCapabilities(
373  const MasterInfo& masterInfo, const Registry& registry);
374 
375  // Message handlers.
376  void submitScheduler(
377  const std::string& name);
378 
379  void registerFramework(
380  const process::UPID& from,
381  RegisterFrameworkMessage&& registerFrameworkMessage);
382 
383  void reregisterFramework(
384  const process::UPID& from,
385  ReregisterFrameworkMessage&& reregisterFrameworkMessage);
386 
387  void unregisterFramework(
388  const process::UPID& from,
389  const FrameworkID& frameworkId);
390 
391  void deactivateFramework(
392  const process::UPID& from,
393  const FrameworkID& frameworkId);
394 
395  // TODO(vinod): Remove this once the old driver is removed.
396  void resourceRequest(
397  const process::UPID& from,
398  const FrameworkID& frameworkId,
399  const std::vector<Request>& requests);
400 
401  void launchTasks(
402  const process::UPID& from,
403  LaunchTasksMessage&& launchTasksMessage);
404 
405  void reviveOffers(
406  const process::UPID& from,
407  const FrameworkID& frameworkId,
408  const std::vector<std::string>& role);
409 
410  void killTask(
411  const process::UPID& from,
412  const FrameworkID& frameworkId,
413  const TaskID& taskId);
414 
415  void statusUpdateAcknowledgement(
416  const process::UPID& from,
417  StatusUpdateAcknowledgementMessage&& statusUpdateAcknowledgementMessage);
418 
419  void schedulerMessage(
420  const process::UPID& from,
421  FrameworkToExecutorMessage&& frameworkToExecutorMessage);
422 
423  void executorMessage(
424  const process::UPID& from,
425  ExecutorToFrameworkMessage&& executorToFrameworkMessage);
426 
427  void registerSlave(
428  const process::UPID& from,
429  RegisterSlaveMessage&& registerSlaveMessage);
430 
431  void reregisterSlave(
432  const process::UPID& from,
433  ReregisterSlaveMessage&& incomingMessage);
434 
435  void unregisterSlave(
436  const process::UPID& from,
437  const SlaveID& slaveId);
438 
439  void statusUpdate(
440  StatusUpdateMessage&& statusUpdateMessage);
441 
442  void reconcileTasks(
443  const process::UPID& from,
444  ReconcileTasksMessage&& reconcileTasksMessage);
445 
446  void updateOperationStatus(
447  UpdateOperationStatusMessage&& update);
448 
449  void exitedExecutor(
450  const process::UPID& from,
451  const SlaveID& slaveId,
452  const FrameworkID& frameworkId,
453  const ExecutorID& executorId,
454  int32_t status);
455 
456  void updateSlave(UpdateSlaveMessage&& message);
457 
458  void updateUnavailability(
459  const MachineID& machineId,
461 
462  // Marks the agent unreachable and returns whether the agent was
463  // marked unreachable. Returns false if the agent is already
464  // in a transitioning state or has transitioned into another
465  // state (this includes already being marked unreachable).
466  // The `duringMasterFailover` parameter specifies whether this
467  // agent is transitioning from a recovered state (true) or a
468  // registered state (false).
469  //
470  // Discarding currently not supported.
471  //
472  // Will not return a failure (this will crash the master
473  // internally in the case of a registry failure).
474  process::Future<bool> markUnreachable(
475  const SlaveInfo& slave,
476  bool duringMasterFailover,
477  const std::string& message);
478 
479  void markGone(const SlaveID& slaveId, const TimeInfo& goneTime);
480 
481  void authenticate(
482  const process::UPID& from,
483  const process::UPID& pid);
484 
485  // TODO(bmahler): It would be preferred to use a unique libprocess
486  // Process identifier (PID is not sufficient) for identifying the
487  // framework instance, rather than relying on re-registration time.
488  void frameworkFailoverTimeout(
489  const FrameworkID& frameworkId,
490  const process::Time& reregisteredTime);
491 
492  void offer(
493  const FrameworkID& frameworkId,
494  const hashmap<std::string, hashmap<SlaveID, Resources>>& resources);
495 
496  void inverseOffer(
497  const FrameworkID& frameworkId,
498  const hashmap<SlaveID, UnavailableResources>& resources);
499 
500  // Invoked when there is a newly elected leading master.
501  // Made public for testing purposes.
502  void detected(const process::Future<Option<MasterInfo>>& _leader);
503 
504  // Invoked when the contender has lost the candidacy.
505  // Made public for testing purposes.
506  void lostCandidacy(const process::Future<Nothing>& lost);
507 
508  // Continuation of recover().
509  // Made public for testing purposes.
510  process::Future<Nothing> _recover(const Registry& registry);
511 
512  MasterInfo info() const
513  {
514  return info_;
515  }
516 
517 protected:
518  void initialize() override;
519  void finalize() override;
520 
521  void consume(process::MessageEvent&& event) override;
522  void consume(process::ExitedEvent&& event) override;
523 
524  void exited(const process::UPID& pid) override;
525  void exited(
526  const FrameworkID& frameworkId,
528 
529  void _exited(Framework* framework);
530 
531  // Invoked upon noticing a subscriber disconnection.
532  void exited(const id::UUID& id);
533 
534  void agentReregisterTimeout(const SlaveID& slaveId);
535  Nothing _agentReregisterTimeout(const SlaveID& slaveId);
536 
537  // Invoked when the message is ready to be executed after
538  // being throttled.
539  // 'principal' being None indicates it is throttled by
540  // 'defaultLimiter'.
541  void throttled(
542  process::MessageEvent&& event,
543  const Option<std::string>& principal);
544 
545  // Continuations of consume().
546  void _consume(process::MessageEvent&& event);
547  void _consume(process::ExitedEvent&& event);
548 
549  // Helper method invoked when the capacity for a framework
550  // principal is exceeded.
551  void exceededCapacity(
552  const process::MessageEvent& event,
553  const Option<std::string>& principal,
554  uint64_t capacity);
555 
556  // Recovers state from the registrar.
558  void recoveredSlavesTimeout(const Registry& registry);
559 
560  void _registerSlave(
561  const process::UPID& pid,
562  RegisterSlaveMessage&& registerSlaveMessage,
564  const process::Future<bool>& authorized);
565 
566  void __registerSlave(
567  const process::UPID& pid,
568  RegisterSlaveMessage&& registerSlaveMessage,
569  const process::Future<bool>& admit);
570 
571  void _reregisterSlave(
572  const process::UPID& pid,
573  ReregisterSlaveMessage&& incomingMessage,
575  const process::Future<bool>& authorized);
576 
577  void __reregisterSlave(
578  const process::UPID& pid,
579  ReregisterSlaveMessage&& incomingMessage,
580  const process::Future<bool>& readmit);
581 
582  void ___reregisterSlave(
583  const process::UPID& pid,
584  ReregisterSlaveMessage&& incomingMessage,
585  const process::Future<bool>& updated);
586 
587  void updateSlaveFrameworks(
588  Slave* slave,
589  const std::vector<FrameworkInfo>& frameworks);
590 
591  // 'future' is the future returned by the authenticator.
592  void _authenticate(
593  const process::UPID& pid,
594  const process::Future<Option<std::string>>& future);
595 
596  void authenticationTimeout(process::Future<Option<std::string>> future);
597 
598  void fileAttached(const process::Future<Nothing>& result,
599  const std::string& path);
600 
601  // Invoked when the contender has entered the contest.
602  void contended(const process::Future<process::Future<Nothing>>& candidacy);
603 
604  // When a slave that was previously registered with this master
605  // reregisters, we need to reconcile the master's view of the
606  // slave's tasks and executors. This function also sends the
607  // `SlaveReregisteredMessage`.
608  void reconcileKnownSlave(
609  Slave* slave,
610  const std::vector<ExecutorInfo>& executors,
611  const std::vector<Task>& tasks);
612 
613  // Add a framework.
614  void addFramework(
615  Framework* framework,
616  ::mesos::allocator::FrameworkOptions&& allocatorOptions);
617 
618  // Recover a framework from its `FrameworkInfo`. This happens after
619  // master failover, when an agent running one of the framework's
620  // tasks reregisters or when the framework itself reregisters,
621  // whichever happens first. The result of this function is a
622  // registered, inactive framework with state `RECOVERED` and empty
623  // FrameworkOptions in the allocator.
624  void recoverFramework(const FrameworkInfo& info);
625 
626  // Transition a framework from `RECOVERED` to `CONNECTED` state and
627  // activate it. This happens at most once after master failover, the
628  // first time that the framework reregisters with the new master.
629 // Exactly one of `pid` or `http` must be provided.
630  void connectAndActivateRecoveredFramework(
631  Framework* framework,
632  const Option<process::UPID>& pid,
634  const process::Owned<ObjectApprovers>& objectApprovers);
635 
636  // Replace the scheduler for a framework with a new process ID, in
637  // the event of a scheduler failover.
638  void failoverFramework(
639  Framework* framework,
640  const process::UPID& newPid,
641  const process::Owned<ObjectApprovers>& objectApprovers);
642 
643  // Replace the scheduler for a framework with a new HTTP connection,
644  // in the event of a scheduler failover.
645  void failoverFramework(
646  Framework* framework,
648  const process::Owned<ObjectApprovers>& objectApprovers);
649 
650  void _failoverFramework(Framework* framework);
651 
652  // Kill all of a framework's tasks, delete the framework object, and
653  // reschedule offers that were assigned to this framework.
654  void removeFramework(Framework* framework);
655 
656  // Remove a framework from the slave, i.e., remove its tasks and
657  // executors and recover the resources.
658  void removeFramework(Slave* slave, Framework* framework);
659 
660  // Performs actions common for all the framework update paths.
661  //
662  // NOTE: the fields 'id', 'principal', 'name' and 'checkpoint' in the
663  // 'frameworkInfo' should have the same values as in 'framework->info',
664  // otherwise this method terminates the program.
665  //
666  // TODO(asekretenko): Make sending FrameworkInfo updates to slaves, API
667  // subscribers and anywhere else a responsibility of this method -
668 // currently it is not, see MESOS-9746. After that we can remove the
669  // 'sendFrameworkUpdates()' method.
670  void updateFramework(
671  Framework* framework,
672  const FrameworkInfo& frameworkInfo,
673  ::mesos::scheduler::OfferConstraints&& offerConstraints,
674  ::mesos::allocator::FrameworkOptions&& allocatorOptions);
675 
676  void sendFrameworkUpdates(const Framework& framework);
677 
678  void disconnect(Framework* framework);
679  void deactivate(Framework* framework, bool rescind);
680 
681  void disconnect(Slave* slave);
682 
683  // Removes the agent from the resource offer cycle (and rescinds active
684  // offers). Other aspects of the agent will continue to function normally.
685  void deactivate(Slave* slave);
686 
687  // Add a slave.
688  void addSlave(
689  Slave* slave,
690  std::vector<Archive::Framework>&& completedFrameworks);
691 
692  void _markUnreachable(
693  const SlaveInfo& slave,
694  const TimeInfo& unreachableTime,
695  bool duringMasterFailover,
696  const std::string& message,
697  bool registrarResult);
698 
699  void sendSlaveLost(const SlaveInfo& slaveInfo);
700 
701  // Remove the slave from the registrar and from the master's state.
702  //
703  // TODO(bmahler): 'reason' is optional until MESOS-2317 is resolved.
704  void removeSlave(
705  Slave* slave,
706  const std::string& message,
708 
709  // Removes an agent from the master's state in the following cases:
710  // * When maintenance is started on an agent
711  // * When an agent registers with a new ID from a previously-known IP + port
712  // * When an agent unregisters itself with an `UnregisterSlaveMessage`
713  void _removeSlave(
714  Slave* slave,
715  const process::Future<bool>& registrarResult,
716  const std::string& removalCause,
718 
719  // Removes an agent from the master's state in the following cases:
720  // * When marking an agent unreachable
721  // * When marking an agent gone
722  //
723  // NOTE that in spite of the name `__removeSlave()`, this function is NOT a
724  // continuation of `_removeSlave()`. Rather, these two functions perform
725  // similar logic for slightly different cases.
726  //
727  // TODO(greggomann): refactor `_removeSlave` and `__removeSlave` into a single
728  // common helper function. (See MESOS-9550)
729  void __removeSlave(
730  Slave* slave,
731  const std::string& message,
732  const Option<TimeInfo>& unreachableTime);
733 
734  // Validates that the framework is authenticated, if required.
735  Option<Error> validateFrameworkAuthentication(
736  const FrameworkInfo& frameworkInfo,
737  const process::UPID& from);
738 
739  // Returns whether the principal is authorized for the specified
740  // action-object pair.
741  // Returns failure for transient authorization failures.
742  process::Future<bool> authorize(
744  authorization::ActionObject&& actionObject);
745 
746  // Overload of authorize() for cases which require multiple action-object
747  // pairs to be authorized simultaneously.
748  process::Future<bool> authorize(
750  std::vector<authorization::ActionObject>&& actionObjects);
751 
752  // Determine if a new executor needs to be launched.
753  bool isLaunchExecutor (
754  const ExecutorID& executorId,
755  Framework* framework,
756  Slave* slave) const;
757 
758  // Add executor to the framework and slave.
759  void addExecutor(
760  const ExecutorInfo& executorInfo,
761  Framework* framework,
762  Slave* slave);
763 
764  // Add task to the framework and slave.
765  void addTask(const TaskInfo& task, Framework* framework, Slave* slave);
766 
767  // Transitions the task, and recovers resources if the task becomes
768  // terminal.
769  void updateTask(Task* task, const StatusUpdate& update);
770 
771  // Removes the task. `unreachable` indicates whether the task is removed due
772  // to being unreachable. Note that we cannot rely on the task state because
773  // it may not reflect unreachability due to being set to TASK_LOST for
774  // backwards compatibility.
775  void removeTask(Task* task, bool unreachable = false);
776 
777  // Remove an executor and recover its resources.
778  void removeExecutor(
779  Slave* slave,
780  const FrameworkID& frameworkId,
781  const ExecutorID& executorId);
782 
783  // Adds the given operation to the framework and the agent.
784  void addOperation(
785  Framework* framework,
786  Slave* slave,
787  Operation* operation);
788 
789  // Transitions the operation, and updates and recovers resources if
790  // the operation becomes terminal. If `convertResources` is `false`
791  // only the consumed resources of terminal operations are recovered,
792  // but no resources are converted.
793  void updateOperation(
794  Operation* operation,
795  const UpdateOperationStatusMessage& update,
796  bool convertResources = true);
797 
798  // Remove the operation.
799  void removeOperation(Operation* operation);
800 
801  // Send operation update for all operations on the agent.
802  void sendBulkOperationFeedback(
803  Slave* slave,
804  OperationState operationState,
805  const std::string& message);
806 
807  // Attempts to update the allocator by applying the given operation.
808  // If successful, updates the slave's resources, sends a
809  // 'CheckpointResourcesMessage' to the slave with the updated
810  // checkpointed resources, and returns a 'Future' with 'Nothing'.
811  // Otherwise, no action is taken and returns a failed 'Future'.
813  Slave* slave,
814  const Offer::Operation& operation);
815 
816  // Forwards the update to the framework.
817  void forward(
818  const StatusUpdate& update,
819  const process::UPID& acknowledgee,
820  Framework* framework);
821 
822 // Remove an offer after the specified timeout.
823  void offerTimeout(const OfferID& offerId);
824 
825  // Methods for removing an offer and handling associated resources.
826  // Both recover the resources in the allocator (optionally setting offer
827  // filters) and remove the offer in the master. `rescindOffer` further
828  // notifies the framework about the rescind.
829  //
830 // NOTE: the `filters` parameter of `rescindOffer` is needed only as
831  // a workaround for the race between the master and the allocator
832  // which happens when the master tries to free up resources to satisfy
833  // operator initiated operations.
834  void rescindOffer(Offer* offer, const Option<Filters>& filters = None());
835  void discardOffer(Offer* offer, const Option<Filters>& filters = None());
836 
837  // Helper for rescindOffer() / discardOffer() / _accept().
838  // Do not use directly.
839  //
840  // The offer must belong to the framework.
841  void _removeOffer(Framework* framework, Offer* offer);
842 
843 // Remove an inverse offer after the specified timeout.
844  void inverseOfferTimeout(const OfferID& inverseOfferId);
845 
846  // Remove an inverse offer and optionally rescind it as well.
847  void removeInverseOffer(InverseOffer* inverseOffer, bool rescind = false);
848 
849  bool isCompletedFramework(const FrameworkID& frameworkId) const;
850 
851  Framework* getFramework(const FrameworkID& frameworkId) const;
852  Offer* getOffer(const OfferID& offerId) const;
853  InverseOffer* getInverseOffer(const OfferID& inverseOfferId) const;
854 
855  FrameworkID newFrameworkId();
856  OfferID newOfferId();
857  SlaveID newSlaveId();
858 
859 private:
860  // Updates the agent's resources by applying the given operation.
861  // Sends either `ApplyOperationMessage` or
862  // `CheckpointResourcesMessage` (with updated checkpointed
863  // resources) to the agent depending on if the agent has
864  // `RESOURCE_PROVIDER` capability.
865  void _apply(
866  Slave* slave,
867  Framework* framework,
868  const Offer::Operation& operationInfo);
869 
870  void drop(
871  const process::UPID& from,
872  const mesos::scheduler::Call& call,
873  const std::string& message);
874 
875  void drop(
876  Framework* framework,
877  const Offer::Operation& operation,
878  const std::string& message);
879 
880  void drop(
881  Framework* framework,
882  const mesos::scheduler::Call& call,
883  const std::string& message);
884 
885  void drop(
886  Framework* framework,
887  const mesos::scheduler::Call::Suppress& suppress,
888  const std::string& message);
889 
890  void drop(
891  Framework* framework,
892  const mesos::scheduler::Call::Revive& revive,
893  const std::string& message);
894 
895  // Call handlers.
896  void receive(
897  const process::UPID& from,
898  mesos::scheduler::Call&& call);
899 
900  void subscribe(
902  mesos::scheduler::Call::Subscribe&& subscribe);
903 
904  void _subscribe(
906  FrameworkInfo&& frameworkInfo,
907  scheduler::OfferConstraints&& offerConstraints,
908  bool force,
909  ::mesos::allocator::FrameworkOptions&& allocatorOptions,
910  const process::Future<process::Owned<ObjectApprovers>>& objectApprovers);
911 
912  void subscribe(
913  const process::UPID& from,
914  mesos::scheduler::Call::Subscribe&& subscribe);
915 
916  void _subscribe(
917  const process::UPID& from,
918  FrameworkInfo&& frameworkInfo,
919  scheduler::OfferConstraints&& offerConstraints,
920  bool force,
921  ::mesos::allocator::FrameworkOptions&& allocatorOptions,
922  const process::Future<process::Owned<ObjectApprovers>>& objectApprovers);
923 
924  // Update framework via SchedulerDriver (i.e. no response
925  // code feedback, FrameworkErrorMessage on error).
926  void updateFramework(
927  const process::UPID& from,
928  mesos::scheduler::Call::UpdateFramework&& call);
929 
930  // Update framework via HTTP API (i.e. returns 200 OK).
932  mesos::scheduler::Call::UpdateFramework&& call);
933 
934  // Subscribes a client to the 'api/vX' endpoint.
935  void subscribe(
937  const process::Owned<ObjectApprovers>& approvers);
938 
939  void teardown(Framework* framework);
940 
941  void accept(
942  Framework* framework,
943  mesos::scheduler::Call::Accept&& accept);
944 
945  void _accept(
946  const FrameworkID& frameworkId,
947  const SlaveID& slaveId,
948  mesos::scheduler::Call::Accept&& accept);
949 
950  void acceptInverseOffers(
951  Framework* framework,
952  const mesos::scheduler::Call::AcceptInverseOffers& accept);
953 
954  void decline(
955  Framework* framework,
956  mesos::scheduler::Call::Decline&& decline);
957 
958  void declineInverseOffers(
959  Framework* framework,
960  const mesos::scheduler::Call::DeclineInverseOffers& decline);
961 
962  // Should be called after each terminal task status update acknowledgement
963  // or terminal operation acknowledgement. If an agent is draining, this
964  // checks if all pending tasks or operations have terminated and then
965  // transitions the DRAINING agent to DRAINED.
966  void checkAndTransitionDrainingAgent(Slave* slave);
967 
968  void revive(
969  Framework* framework,
970  const mesos::scheduler::Call::Revive& revive);
971 
972  void kill(
973  Framework* framework,
974  const mesos::scheduler::Call::Kill& kill);
975 
976  void shutdown(
977  Framework* framework,
978  const mesos::scheduler::Call::Shutdown& shutdown);
979 
980  void acknowledge(
981  Framework* framework,
982  mesos::scheduler::Call::Acknowledge&& acknowledge);
983 
984  void acknowledgeOperationStatus(
985  Framework* framework,
986  mesos::scheduler::Call::AcknowledgeOperationStatus&& acknowledge);
987 
988  void reconcile(
989  Framework* framework,
990  mesos::scheduler::Call::Reconcile&& reconcile);
991 
992  void reconcileOperations(
993  Framework* framework,
994  mesos::scheduler::Call::ReconcileOperations&& reconcile);
995 
996  void message(
997  Framework* framework,
998  mesos::scheduler::Call::Message&& message);
999 
1000  void request(
1001  Framework* framework,
1002  const mesos::scheduler::Call::Request& request);
1003 
1004  void suppress(
1005  Framework* framework,
1006  const mesos::scheduler::Call::Suppress& suppress);
1007 
1008  bool elected() const
1009  {
1010  return leader.isSome() && leader.get() == info_;
1011  }
1012 
1013  void scheduleRegistryGc();
1014 
1015  void doRegistryGc();
1016 
1017  void _doRegistryGc(
1018  const hashset<SlaveID>& toRemoveUnreachable,
1019  const hashset<SlaveID>& toRemoveGone,
1020  const process::Future<bool>& registrarResult);
1021 
// Returns all roles known to the master. If roles are whitelisted,
// this simply returns the whitelist and any ancestors of roles in
// the whitelist. Otherwise, this returns:
1025  //
1026  // (1) Roles with configured weight or quota.
1027  // (2) Roles with reservations.
1028  // (3) Roles with frameworks subscribed or allocated resources.
1029  // (4) Ancestor roles of (1), (2), or (3).
1030  std::vector<std::string> knownRoles() const;
1031 
1039  bool isWhitelistedRole(const std::string& name) const;
1040 
1041  // TODO(bmahler): Store a role tree rather than the existing
1042  // `roles` map which does not track the tree correctly (it does
1043  // not insert ancestor entries, nor does it track roles if there
1044  // are reservations but no frameworks related to them).
1045  struct RoleResourceBreakdown
1046  {
1047  public:
1048  RoleResourceBreakdown(const Master* const master_, const std::string& role_)
1049  : master(master_), role(role_) {}
1050 
1051  ResourceQuantities offered() const;
1052  ResourceQuantities allocated() const;
1053  ResourceQuantities reserved() const;
1054  ResourceQuantities consumedQuota() const;
1055 
1056  private:
1057  const Master* const master;
1058  const std::string role;
1059  };
1060 
1061  // Performs stateless and stateful validation of the FrameworkInfo.
1062  // The stateful validation only uses the master flags and whether
1063  // the framework is completed.
1064 
1065  // TODO(asekretenko): Pass in the state explicitly to move this function into
1066  // validation.hpp for unit testability and better separation of concerns.
1067  Option<Error> validateFramework(const FrameworkInfo& frameworkInfo) const;
1068 
1076  class QuotaHandler
1077  {
1078  public:
1079  explicit QuotaHandler(Master* _master) : master(_master)
1080  {
1081  CHECK_NOTNULL(master);
1082  }
1083 
1084  // Returns a list of set quotas.
1086  const mesos::master::Call& call,
1088  ContentType contentType) const;
1089 
1091  const process::http::Request& request,
1093  principal) const;
1094 
1096  const mesos::master::Call& call,
1098  const;
1099 
1101  const mesos::master::Call& call,
1103  principal) const;
1104 
1108  principal) const;
1109 
1111  const mesos::master::Call& call,
1113  principal) const;
1114 
1118  principal) const;
1119 
1120  private:
// Returns an error if the total quota guarantees overcommit
// the cluster. This is not a quota satisfiability check: it's
// possible that quota is unsatisfiable even if the quota
// does not overcommit the cluster. Specifically, we verify that
// the following inequality holds:
//
//   total cluster capacity >= total quota w/ quota request applied
//
// Note, total cluster capacity accounts for the resources of all the
// registered agents, including resources from resource providers
// as well as reservations (both static and dynamic ones).
1137  static Option<Error> overcommitCheck(
1138  const std::vector<Resources>& agents,
1139  const hashmap<std::string, Quota>& quotas,
1140  const mesos::quota::QuotaInfo& request);
1141 
1142  // We always want to rescind offers after the capacity heuristic. The
1143  // reason for this is the race between the allocator and the master:
1144  // it can happen that there are not enough free resources at the
1145  // allocator's disposal when it is notified about the quota request,
1146  // but at this point it's too late to rescind.
1147  //
1148  // While rescinding, we adhere to the following rules:
1149  // * Rescind at least as many resources as there are in the quota request.
1150  // * Rescind all offers from an agent in order to make the potential
1151  // offer bigger, which increases the chances that a quota'ed framework
1152  // will be able to use the offer.
// * Rescind offers from at least `numF` agents to make it possible
//   (but not guaranteed, due to fair sharing) that each framework in
//   the role for which quota is set gets an offer (`numF` is the
//   number of frameworks in the quota'ed role). Though this is not
//   strictly necessary, we think it will improve debuggability
//   and the user experience.
1159  //
1160  // TODO(alexr): Consider removing this function once offer management
1161  // (including rescinding) is moved to allocator.
1162  void rescindOffers(const mesos::quota::QuotaInfo& request) const;
1163 
1164  process::Future<bool> authorizeGetQuota(
1166  const std::string& role) const;
1167 
1168  // This auth function is used for legacy `SET_QUOTA` and `REMOVE_QUOTA`
1169  // calls. Remove this function after the associated API calls are
1170  // no longer supported.
1171  process::Future<bool> authorizeUpdateQuota(
1173  const mesos::quota::QuotaInfo& quotaInfo) const;
1174 
1175  process::Future<bool> authorizeUpdateQuotaConfig(
1177  const mesos::quota::QuotaConfig& quotaConfig) const;
1178 
1181  principal) const;
1182 
1184  const google::protobuf::RepeatedPtrField<mesos::quota::QuotaConfig>&
1185  quotaConfigs) const;
1186 
1188  const mesos::quota::QuotaRequest& quotaRequest,
1190  principal) const;
1191 
1193  const mesos::quota::QuotaInfo& quotaInfo,
1194  bool forced) const;
1195 
1197  const std::string& role,
1199  principal) const;
1200 
1202  const std::string& role) const;
1203 
1204  // To perform actions related to quota management, we require access to the
1205  // master data structures. No synchronization primitives are needed here
1206  // since `QuotaHandler`'s functions are invoked in the Master's actor.
1207  Master* master;
1208  };
1209 
1217  class WeightsHandler
1218  {
1219  public:
1220  explicit WeightsHandler(Master* _master) : master(_master)
1221  {
1222  CHECK_NOTNULL(master);
1223  }
1224 
1228  principal) const;
1229 
1231  const mesos::master::Call& call,
1233  ContentType contentType) const;
1234 
1236  const process::http::Request& request,
1238  principal) const;
1239 
1241  const mesos::master::Call& call,
1243  ContentType contentType) const;
1244 
1245  private:
1246  process::Future<bool> authorizeGetWeight(
1248  const WeightInfo& weight) const;
1249 
1250  process::Future<bool> authorizeUpdateWeights(
1252  const std::vector<std::string>& roles) const;
1253 
1255  const std::vector<WeightInfo>& weightInfos,
1256  const std::vector<bool>& roleAuthorizations) const;
1257 
1260  principal) const;
1261 
1264  const google::protobuf::RepeatedPtrField<WeightInfo>& weightInfos)
1265  const;
1266 
1268  const std::vector<WeightInfo>& weightInfos) const;
1269 
1270  // Rescind all outstanding offers if any of the 'weightInfos' roles has
1271  // an active framework.
1272  void rescindOffers(const std::vector<WeightInfo>& weightInfos) const;
1273 
1274  Master* master;
1275  };
1276 
1277 public:
1278  // Inner class used to namespace read-only HTTP handlers; these handlers
1279  // may be executed in parallel.
1280  //
1281  // A synchronously executed post-processing step is provided for any
1282  // cases where the handler is not purely read-only and requires a
1283  // synchronous write (i.e. it's not feasible to perform the write
1284  // asynchronously (e.g. SUBSCRIBE cannot have a gap between serving
1285  // the initial state and registering the subscriber, or else events
1286  // will be missed in the interim)).
1287  //
1288  // The handlers are only permitted to depend on the output content
1289  // type (derived from the request headers), the request query
1290  // parameters and the authorization filters to de-duplicate identical
1291  // responses (this does not de-duplicate all identical responses, e.g.
1292  // different authz principal but same permissions).
1293  //
1294  // NOTE: Most member functions of this class are not routed directly but
1295  // dispatched from their corresponding handlers in the outer `Http` class.
1296  // This is because deciding whether an incoming request is read-only often
1297  // requires some inspection, e.g. distinguishing between "GET" and "POST"
1298  // requests to the same endpoint.
1300  {
1301  public:
1303  {
1304  struct Subscribe
1305  {
1308  };
1309 
1310  // Any additional post-processing cases will add additional
1311  // cases into this variant.
1312  Variant<Subscribe> state;
1313  };
1314 
1315  explicit ReadOnlyHandler(const Master* _master) : master(_master) {}
1316 
1317  // /frameworks
1318  std::pair<process::http::Response, Option<PostProcessing>> frameworks(
1319  ContentType outputContentType,
1320  const hashmap<std::string, std::string>& queryParameters,
1321  const process::Owned<ObjectApprovers>& approvers) const;
1322 
1323  // /roles
1324  std::pair<process::http::Response, Option<PostProcessing>> roles(
1325  ContentType outputContentType,
1326  const hashmap<std::string, std::string>& queryParameters,
1327  const process::Owned<ObjectApprovers>& approvers) const;
1328 
1329  // /slaves
1330  std::pair<process::http::Response, Option<PostProcessing>> slaves(
1331  ContentType outputContentType,
1332  const hashmap<std::string, std::string>& queryParameters,
1333  const process::Owned<ObjectApprovers>& approvers) const;
1334 
1335  // /state
1336  std::pair<process::http::Response, Option<PostProcessing>> state(
1337  ContentType outputContentType,
1338  const hashmap<std::string, std::string>& queryParameters,
1339  const process::Owned<ObjectApprovers>& approvers) const;
1340 
1341  // /state-summary
1342  std::pair<process::http::Response, Option<PostProcessing>> stateSummary(
1343  ContentType outputContentType,
1344  const hashmap<std::string, std::string>& queryParameters,
1345  const process::Owned<ObjectApprovers>& approvers) const;
1346 
1347  // /tasks
1348  std::pair<process::http::Response, Option<PostProcessing>> tasks(
1349  ContentType outputContentType,
1350  const hashmap<std::string, std::string>& queryParameters,
1351  const process::Owned<ObjectApprovers>& approvers) const;
1352 
1353  // master::Call::GET_STATE
1354  std::pair<process::http::Response, Option<PostProcessing>> getState(
1355  ContentType outputContentType,
1356  const hashmap<std::string, std::string>& queryParameters,
1357  const process::Owned<ObjectApprovers>& approvers) const;
1358 
1359  // master::Call::GET_AGENTS
1360  std::pair<process::http::Response, Option<PostProcessing>> getAgents(
1361  ContentType outputContentType,
1362  const hashmap<std::string, std::string>& queryParameters,
1363  const process::Owned<ObjectApprovers>& approvers) const;
1364 
1365  // master::Call::GET_FRAMEWORKS
1366  std::pair<process::http::Response, Option<PostProcessing>> getFrameworks(
1367  ContentType outputContentType,
1368  const hashmap<std::string, std::string>& queryParameters,
1369  const process::Owned<ObjectApprovers>& approvers) const;
1370 
1371  // master::Call::GET_EXECUTORS
1372  std::pair<process::http::Response, Option<PostProcessing>> getExecutors(
1373  ContentType outputContentType,
1374  const hashmap<std::string, std::string>& queryParameters,
1375  const process::Owned<ObjectApprovers>& approvers) const;
1376 
1377  // master::Call::GET_OPERATIONS
1378  std::pair<process::http::Response, Option<PostProcessing>> getOperations(
1379  ContentType outputContentType,
1380  const hashmap<std::string, std::string>& queryParameters,
1381  const process::Owned<ObjectApprovers>& approvers) const;
1382 
1383  // master::Call::GET_TASKS
1384  std::pair<process::http::Response, Option<PostProcessing>> getTasks(
1385  ContentType outputContentType,
1386  const hashmap<std::string, std::string>& queryParameters,
1387  const process::Owned<ObjectApprovers>& approvers) const;
1388 
1389  // master::Call::GET_ROLES
1390  std::pair<process::http::Response, Option<PostProcessing>> getRoles(
1391  ContentType outputContentType,
1392  const hashmap<std::string, std::string>& queryParameters,
1393  const process::Owned<ObjectApprovers>& approvers) const;
1394 
1395  // master::Call::SUBSCRIBE
1396  std::pair<process::http::Response, Option<PostProcessing>> subscribe(
1397  ContentType outputContentType,
1398  const hashmap<std::string, std::string>& queryParameters,
1399  const process::Owned<ObjectApprovers>& approvers) const;
1400 
1401  private:
1402  std::string serializeGetState(
1403  const process::Owned<ObjectApprovers>& approvers) const;
1404  std::string serializeGetAgents(
1405  const process::Owned<ObjectApprovers>& approvers) const;
1406  std::string serializeGetFrameworks(
1407  const process::Owned<ObjectApprovers>& approvers) const;
1408  std::string serializeGetExecutors(
1409  const process::Owned<ObjectApprovers>& approvers) const;
1410  std::string serializeGetOperations(
1411  const process::Owned<ObjectApprovers>& approvers) const;
1412  std::string serializeGetTasks(
1413  const process::Owned<ObjectApprovers>& approvers) const;
1414  std::string serializeGetRoles(
1415  const process::Owned<ObjectApprovers>& approvers) const;
1416  std::string serializeSubscribe(
1417  const process::Owned<ObjectApprovers>& approvers) const;
1418 
1419  std::function<void(JSON::ObjectWriter*)> jsonifyGetState(
1420  const process::Owned<ObjectApprovers>& approvers) const;
1421  std::function<void(JSON::ObjectWriter*)> jsonifyGetAgents(
1422  const process::Owned<ObjectApprovers>& approvers) const;
1423  std::function<void(JSON::ObjectWriter*)> jsonifyGetFrameworks(
1424  const process::Owned<ObjectApprovers>& approvers) const;
1425  std::function<void(JSON::ObjectWriter*)> jsonifyGetExecutors(
1426  const process::Owned<ObjectApprovers>& approvers) const;
1427  std::function<void(JSON::ObjectWriter*)> jsonifyGetOperations(
1428  const process::Owned<ObjectApprovers>& approvers) const;
1429  std::function<void(JSON::ObjectWriter*)> jsonifyGetTasks(
1430  const process::Owned<ObjectApprovers>& approvers) const;
1431  std::function<void(JSON::ObjectWriter*)> jsonifyGetRoles(
1432  const process::Owned<ObjectApprovers>& approvers) const;
1433  std::function<void(JSON::ObjectWriter*)> jsonifySubscribe(
1434  const process::Owned<ObjectApprovers>& approvers) const;
1435 
1436  const Master* master;
1437  };
1438 
1439 private:
1440  // Inner class used to namespace HTTP route handlers (see
1441  // master/http.cpp for implementations).
1442  class Http
1443  {
1444  public:
1445  explicit Http(Master* _master) : master(_master),
1446  readonlyHandler(_master),
1447  quotaHandler(_master),
1448  weightsHandler(_master) {}
1449 
1450  // /api/v1
1452  const process::http::Request& request,
1454  principal) const;
1455 
1456  // /api/v1/scheduler
1458  const process::http::Request& request,
1460  principal) const;
1461 
1462  // /master/create-volumes
1464  const process::http::Request& request,
1466  principal) const;
1467 
1468  // /master/destroy-volumes
1470  const process::http::Request& request,
1472  principal) const;
1473 
1474  // /master/flags
1476  const process::http::Request& request,
1478  principal) const;
1479 
1480  // /master/frameworks
1481  //
1482  // NOTE: Requests to this endpoint are batched.
1484  const process::http::Request& request,
1486  principal) const;
1487 
1488  // /master/health
1490  const process::http::Request& request) const;
1491 
1492  // /master/redirect
1494  const process::http::Request& request) const;
1495 
1496  // /master/reserve
1498  const process::http::Request& request,
1500  principal) const;
1501 
1502  // /master/roles
1503  //
1504  // NOTE: Requests to this endpoint are batched.
1506  const process::http::Request& request,
1508  principal) const;
1509 
1510  // /master/teardown
1512  const process::http::Request& request,
1514  principal) const;
1515 
1516  // /master/slaves
1517  //
1518  // NOTE: Requests to this endpoint are batched.
1520  const process::http::Request& request,
1522  principal) const;
1523 
1524  // /master/state
1525  //
1526  // NOTE: Requests to this endpoint are batched.
1528  const process::http::Request& request,
1530  principal) const;
1531 
1532  // /master/state-summary
1533  //
1534  // NOTE: Requests to this endpoint are batched.
1536  const process::http::Request& request,
1538  principal) const;
1539 
1540  // /master/tasks
1541  //
1542  // NOTE: Requests to this endpoint are batched.
1544  const process::http::Request& request,
1546  principal) const;
1547 
1548  // /master/maintenance/schedule
1549  process::Future<process::http::Response> maintenanceSchedule(
1550  const process::http::Request& request,
1552  principal) const;
1553 
1554  // /master/maintenance/status
1555  process::Future<process::http::Response> maintenanceStatus(
1556  const process::http::Request& request,
1558  principal) const;
1559 
1560  // /master/machine/down
1562  const process::http::Request& request,
1564  principal) const;
1565 
1566  // /master/machine/up
1568  const process::http::Request& request,
1570  principal) const;
1571 
1572  // /master/unreserve
1574  const process::http::Request& request,
1576  principal) const;
1577 
1578  // /master/weights
1580  const process::http::Request& request,
1582  principal) const;
1583 
1584  // /master/quota (DEPRECATED).
1586  const process::http::Request& request,
1588  principal) const;
1589 
1590  static std::string API_HELP();
1591  static std::string SCHEDULER_HELP();
1592  static std::string FLAGS_HELP();
1593  static std::string FRAMEWORKS_HELP();
1594  static std::string HEALTH_HELP();
1595  static std::string REDIRECT_HELP();
1596  static std::string ROLES_HELP();
1597  static std::string TEARDOWN_HELP();
1598  static std::string SLAVES_HELP();
1599  static std::string STATE_HELP();
1600  static std::string STATESUMMARY_HELP();
1601  static std::string TASKS_HELP();
1602  static std::string MAINTENANCE_SCHEDULE_HELP();
1603  static std::string MAINTENANCE_STATUS_HELP();
1604  static std::string MACHINE_DOWN_HELP();
1605  static std::string MACHINE_UP_HELP();
1606  static std::string CREATE_VOLUMES_HELP();
1607  static std::string DESTROY_VOLUMES_HELP();
1608  static std::string RESERVE_HELP();
1609  static std::string UNRESERVE_HELP();
1610  static std::string QUOTA_HELP();
1611  static std::string WEIGHTS_HELP();
1612 
1613  private:
1614  JSON::Object __flags() const;
1615 
1616  class FlagsError; // Forward declaration.
1617 
1620  principal) const;
1621 
1623  const size_t limit,
1624  const size_t offset,
1625  const std::string& order,
1627  principal) const;
1628 
1630  const FrameworkID& id,
1632  principal) const;
1633 
1635  const FrameworkID& id) const;
1636 
1637  process::Future<process::http::Response> _updateMaintenanceSchedule(
1638  const mesos::maintenance::Schedule& schedule,
1640  principal) const;
1641 
1642  process::Future<process::http::Response> __updateMaintenanceSchedule(
1643  const mesos::maintenance::Schedule& schedule,
1644  const process::Owned<ObjectApprovers>& approvers) const;
1645 
1646  process::Future<process::http::Response> ___updateMaintenanceSchedule(
1647  const mesos::maintenance::Schedule& schedule,
1648  bool applied) const;
1649 
1650  mesos::maintenance::Schedule _getMaintenanceSchedule(
1651  const process::Owned<ObjectApprovers>& approvers) const;
1652 
1654  const process::Owned<ObjectApprovers>& approvers) const;
1655 
1656  process::Future<process::http::Response> _startMaintenance(
1657  const google::protobuf::RepeatedPtrField<MachineID>& machineIds,
1658  const process::Owned<ObjectApprovers>& approvers) const;
1659 
1661  const google::protobuf::RepeatedPtrField<MachineID>& machineIds,
1662  const process::Owned<ObjectApprovers>& approvers) const;
1663 
1665  const SlaveID& slaveId,
1666  const Option<DurationInfo>& maxGracePeriod,
1667  const bool markGone,
1668  const process::Owned<ObjectApprovers>& approvers) const;
1669 
1671  const SlaveID& slaveId,
1672  const process::Owned<ObjectApprovers>& approvers) const;
1673 
1675  const SlaveID& slaveId,
1676  const process::Owned<ObjectApprovers>& approvers) const;
1677 
1679  const SlaveID& slaveId,
1680  const google::protobuf::RepeatedPtrField<Resource>& source,
1681  const google::protobuf::RepeatedPtrField<Resource>& resources,
1683  principal) const;
1684 
1686  const SlaveID& slaveId,
1687  const google::protobuf::RepeatedPtrField<Resource>& resources,
1689  principal) const;
1690 
1692  const SlaveID& slaveId,
1693  const google::protobuf::RepeatedPtrField<Resource>& volumes,
1695  principal) const;
1696 
1698  const SlaveID& slaveId,
1699  const google::protobuf::RepeatedPtrField<Resource>& volumes,
1701  principal) const;
1702 
1720  const SlaveID& slaveId,
1721  const Offer::Operation& operation) const;
1722 
1723  // Master API handlers.
1724 
1726  const mesos::master::Call& call,
1728  ContentType contentType) const;
1729 
1731  const mesos::master::Call& call,
1733  ContentType contentType) const;
1734 
1736  const mesos::master::Call& call,
1738  ContentType contentType) const;
1739 
1741  const mesos::master::Call& call,
1743  ContentType contentType) const;
1744 
1746  const mesos::master::Call& call,
1748  ContentType contentType) const;
1749 
1751  const mesos::master::Call& call,
1753  ContentType contentType) const;
1754 
1756  const mesos::master::Call& call,
1758  ContentType contentType) const;
1759 
1761  const mesos::master::Call& call,
1763  ContentType contentType) const;
1764 
1766  const mesos::master::Call& call,
1768  ContentType contentType) const;
1769 
1771  const mesos::master::Call& call,
1773  ContentType contentType) const;
1774 
1775  process::Future<process::http::Response> updateMaintenanceSchedule(
1776  const mesos::master::Call& call,
1778  ContentType contentType) const;
1779 
1780  process::Future<process::http::Response> getMaintenanceSchedule(
1781  const mesos::master::Call& call,
1783  ContentType contentType) const;
1784 
1785  process::Future<process::http::Response> getMaintenanceStatus(
1786  const mesos::master::Call& call,
1788  ContentType contentType) const;
1789 
1791  const mesos::master::Call& call,
1793  ContentType contentType) const;
1794 
1796  const mesos::master::Call& call,
1798  ContentType contentType) const;
1799 
1801  const mesos::master::Call& call,
1803  ContentType contentType) const;
1804 
1806  const mesos::master::Call& call,
1808  ContentType contentType) const;
1809 
1811  const mesos::master::Call& call,
1813  ContentType contentType) const;
1814 
1816  const mesos::master::Call& call,
1818  ContentType contentType) const;
1819 
1821  const mesos::master::Call& call,
1823  ContentType contentType) const;
1824 
1826  const mesos::master::Call& call,
1828  ContentType contentType) const;
1829 
1831  const mesos::master::Call& call,
1833  ContentType contentType) const;
1834 
1836  const mesos::master::Call& call,
1838  ContentType contentType) const;
1839 
1841  const mesos::master::Call& call,
1843  ContentType contentType) const;
1844 
1846  const mesos::master::Call& call,
1848  ContentType contentType) const;
1849 
1850  process::Future<process::http::Response> unreserveResources(
1851  const mesos::master::Call& call,
1853  ContentType contentType) const;
1854 
1856  const mesos::master::Call& call,
1858  ContentType contentType) const;
1859 
1861  const mesos::master::Call& call,
1863  ContentType contentType) const;
1864 
1866  const mesos::master::Call& call,
1868  ContentType contentType) const;
1869 
1870  static std::function<void(JSON::ObjectWriter*)> jsonifySubscribe(
1871  const Master* master,
1872  const process::Owned<ObjectApprovers>& approvers);
1873  std::string serializeSubscribe(
1874  const process::Owned<ObjectApprovers>& approvers) const;
1876  const mesos::master::Call& call,
1878  ContentType contentType) const;
1879 
1881  const mesos::master::Call& call,
1883  ContentType contentType) const;
1884 
1886  const mesos::master::Call& call,
1888  ContentType contentType) const;
1889 
1891  const mesos::master::Call& call,
1893  ContentType contentType) const;
1894 
1896  const SlaveID& slaveId) const;
1897 
1898  process::Future<process::http::Response> reconcileOperations(
1899  Framework* framework,
1900  const mesos::scheduler::Call::ReconcileOperations& call,
1901  ContentType contentType) const;
1902 
1903  Master* master;
1904 
1905  ReadOnlyHandler readonlyHandler;
1906 
1907  // NOTE: The quota specific pieces of the Operator API are factored
1908  // out into this separate class.
1909  QuotaHandler quotaHandler;
1910 
1911  // NOTE: The weights specific pieces of the Operator API are factored
1912  // out into this separate class.
1913  WeightsHandler weightsHandler;
1914 
1915  // Since the Master actor is one of the most loaded in a typical Mesos
1916  // installation, we take some extra care to keep the backlog small.
1917  // In particular, all read-only requests are batched and executed in
1918  // parallel, instead of going through the master queue separately.
1919  // The post-processing step, that depends on the handler, will be
1920  // executed synchronously and serially after the parallel executions
1921  // complete.
1922 
1923  typedef std::pair<
1926  (Master::ReadOnlyHandler::*ReadOnlyRequestHandler)(
1927  ContentType,
1929  const process::Owned<ObjectApprovers>&) const;
1930 
1931  process::Future<process::http::Response> deferBatchedRequest(
1932  ReadOnlyRequestHandler handler,
1934  ContentType outputContentType,
1935  const hashmap<std::string, std::string>& queryParameters,
1936  const process::Owned<ObjectApprovers>& approvers) const;
1937 
1938  void processRequestsBatch() const;
1939 
1940  struct BatchedRequest
1941  {
1942  ReadOnlyRequestHandler handler;
1943  ContentType outputContentType;
1944  hashmap<std::string, std::string> queryParameters;
1948  };
1949 
1950  mutable std::vector<BatchedRequest> batchedRequests;
1951  };
1952 
1953  Master(const Master&); // No copying.
1954  Master& operator=(const Master&); // No assigning.
1955 
1956  friend struct Framework;
1957  friend struct FrameworkMetrics;
1958  friend struct Metrics;
1959  friend struct Role;
1960  friend struct Slave;
1961  friend struct SlavesWriter;
1962  friend struct Subscriber;
1963 
1964  // NOTE: Since 'getOffer', 'getInverseOffer' and 'slaves' are
1965  // protected, we need to make the following functions friends.
1966  friend Offer* validation::offer::getOffer(
1967  Master* master, const OfferID& offerId);
1968 
1969  friend InverseOffer* validation::offer::getInverseOffer(
1970  Master* master, const OfferID& offerId);
1971 
1973  Master* master, const SlaveID& slaveId);
1974 
1975  const Flags flags;
1976 
1977  Http http;
1978 
1979  Option<MasterInfo> leader; // Current leading master.
1980 
1981  mesos::allocator::Allocator* allocator;
1982  WhitelistWatcher* whitelistWatcher;
1983  Registrar* registrar;
1984  Files* files;
1985 
1988 
1989  const Option<Authorizer*> authorizer;
1990 
1991  MasterInfo info_;
1992 
// Holds some info which affects how a machine behaves, as well as state that
// represents the master's view of this machine. See the `MachineInfo` protobuf
// and `Machine` struct for more information.
1997 
1998  struct Maintenance
1999  {
2000  // Holds the maintenance schedule, as given by the operator.
2001  std::list<mesos::maintenance::Schedule> schedules;
2002  } maintenance;
2003 
2004  // Indicates when recovery is complete. Recovery begins once the
2005  // master is elected as a leader.
2007 
2008  // If this is the leading master, we periodically check whether we
2009  // should GC some information from the registry.
2010  Option<process::Timer> registryGcTimer;
2011 
2012  struct Slaves
2013  {
2014  Slaves() : removed(MAX_REMOVED_SLAVES) {}
2015 
2016  // Imposes a time limit for slaves that we recover from the
2017  // registry to reregister with the master.
2018  Option<process::Timer> recoveredTimer;
2019 
// Slaves that have been recovered from the registrar after master
// failover. Slaves are removed from this collection when they
// either reregister with the master or are marked unreachable
// because they do not reregister before `recoveredTimer` fires.
// We must not answer questions related to these slaves (e.g.,
// during task reconciliation) until we determine their fate,
// because they are in this transitional state.
2027  hashmap<SlaveID, SlaveInfo> recovered;
2028 
2029  // Agents that are in the process of (re-)registering. They are
2030  // maintained here while the (re-)registration is in progress and
2031  // possibly pending in the authorizer or the registrar in order
2032  // to help deduplicate (re-)registration requests.
2033  hashset<process::UPID> registering;
2034  hashset<SlaveID> reregistering;
2035 
2036  // Registered slaves are indexed by SlaveID and UPID. Note that
2037  // iteration is supported but is exposed as iteration over a
2038  // hashmap<SlaveID, Slave*> since it is tedious to convert
2039  // the map's key/value iterator into a value iterator.
2040  //
2041  // TODO(bmahler): Consider pulling in boost's multi_index,
2042  // or creating a simpler indexing abstraction in stout.
2043  struct
2044  {
2045  bool contains(const SlaveID& slaveId) const
2046  {
2047  return ids.contains(slaveId);
2048  }
2049 
2050  bool contains(const process::UPID& pid) const
2051  {
2052  return pids.contains(pid);
2053  }
2054 
2055  Slave* get(const SlaveID& slaveId) const
2056  {
2057  return ids.get(slaveId).getOrElse(nullptr);
2058  }
2059 
2060  Slave* get(const process::UPID& pid) const
2061  {
2062  return pids.get(pid).getOrElse(nullptr);
2063  }
2064 
2065  void put(Slave* slave)
2066  {
2067  CHECK_NOTNULL(slave);
2068  ids[slave->id] = slave;
2069  pids[slave->pid] = slave;
2070  }
2071 
2072  void remove(Slave* slave)
2073  {
2074  CHECK_NOTNULL(slave);
2075  ids.erase(slave->id);
2076  pids.erase(slave->pid);
2077  }
2078 
2079  void clear()
2080  {
2081  ids.clear();
2082  pids.clear();
2083  }
2084 
2085  size_t size() const { return ids.size(); }
2086 
2087  typedef hashmap<SlaveID, Slave*>::iterator iterator;
2088  typedef hashmap<SlaveID, Slave*>::const_iterator const_iterator;
2089 
2090  iterator begin() { return ids.begin(); }
2091  iterator end() { return ids.end(); }
2092 
2093  const_iterator begin() const { return ids.begin(); }
2094  const_iterator end() const { return ids.end(); }
2095 
2096  private:
2099  } registered;
2100 
2101  // Slaves that are in the process of being removed from the
2102  // registrar.
2103  hashset<SlaveID> removing;
2104 
2105  // Slaves that are in the process of being marked unreachable.
2106  hashset<SlaveID> markingUnreachable;
2107 
2108  // Slaves that are in the process of being marked gone.
2109  hashset<SlaveID> markingGone;
2110 
2111  // Agents which have been marked for draining, including recovered,
2112  // admitted, and unreachable agents. All draining agents will also
2113  // be deactivated. If an agent in this set reregisters, the master
2114  // will send it a `DrainSlaveMessage`.
2115  //
2116  // These values are checkpointed to the registry.
2117  hashmap<SlaveID, DrainInfo> draining;
2118 
2119  // Agents which have been deactivated, including recovered, admitted,
2120  // and unreachable agents. Agents in this set will not have resource
2121  // offers generated and will thus be unable to launch new operations,
2122  // but existing operations will be unaffected.
2123  //
2124  // These values are checkpointed to the registry.
2125  hashset<SlaveID> deactivated;
2126 
2127  // This collection includes agents that have gracefully shutdown,
2128  // as well as those that have been marked unreachable or gone. We
2129  // keep a cache here to prevent this from growing in an unbounded
2130  // manner.
2131  //
2132  // TODO(bmahler): Ideally we could use a cache with set semantics.
2133  //
2134  // TODO(neilc): Consider storing all agent IDs that have been
2135  // marked unreachable by this master.
2137 
2138  // Slaves that have been marked unreachable. We recover this from
2139  // the registry, so it includes slaves marked as unreachable by
2140  // other instances of the master. Note that we use a LinkedHashMap
2141  // to ensure the order of elements here matches the order in the
2142  // registry's unreachable list, which matches the order in which
2143  // agents are marked unreachable. This list is garbage collected;
2144  // GC behavior is governed by the `registry_gc_interval`,
2145  // `registry_max_agent_age`, and `registry_max_agent_count` flags.
2147 
2148  // This helps us look up all unreachable tasks on an agent so we can remove
2149  // them from their primary storage `framework.unreachableTasks` when an
2150  // agent reregisters. This map is bounded by the same GC behavior as
2151  // `unreachable`. When the agent is GC'd from unreachable it's also
2152  // erased from `unreachableTasks`.
2154  unreachableTasks;
2155 
2156  // Slaves that have been marked gone. We recover this from the
2157  // registry, so it includes slaves marked as gone by other instances
2158  // of the master. Note that we use a LinkedHashMap to ensure the order
2159  // of elements here matches the order in the registry's gone list, which
2160  // matches the order in which agents are marked gone.
2162 
2163  // This rate limiter is used to limit the removal of slaves failing
2164  // health checks.
2165  // NOTE: Using a 'shared_ptr' here is OK because 'RateLimiter' is
2166  // a wrapper around libprocess process which is thread safe.
2168  } slaves;
2169 
2170  struct Frameworks
2171  {
2172  Frameworks(const Flags& masterFlags)
2173  : completed(masterFlags.max_completed_frameworks) {}
2174 
2176 
2178 
2179  // Principals of frameworks keyed by PID.
2180  // NOTE: Multiple PIDs can map to the same principal. The
2181  // principal is None when the framework doesn't specify it.
2182  // The differences between this map and 'authenticated' are:
2183  // 1) This map only includes *registered* frameworks. The mapping
2184  // is added when a framework (re-)registers.
2185  // 2) This map includes unauthenticated frameworks (when Master
2186  // allows them) if they have principals specified in
2187  // FrameworkInfo.
2189 
2190  // BoundedRateLimiters keyed by the framework principal.
2191  // Like Metrics::Frameworks, all frameworks of the same principal
2192  // are throttled together at a common rate limit.
2194 
2195  // The default limiter is for frameworks not specified in
2196  // 'flags.rate_limits'.
2198  } frameworks;
2199 
2200  struct Subscribers
2201  {
2202  Subscribers(Master* _master, size_t maxSubscribers)
2203  : master(_master),
2204  subscribed(maxSubscribers) {};
2205 
2206  // Represents a client subscribed to the 'api/vX' endpoint.
2207  //
2208  // TODO(anand): Add support for filtering. Some subscribers
2209  // might only be interested in a subset of events.
2210  struct Subscriber
2211  {
2214  const process::Owned<ObjectApprovers>& _approvers)
2215  : http(_http),
2216  heartbeater(
2217  "subscriber " + stringify(http.streamId),
2218  []() {
2219  mesos::master::Event event;
2220  event.set_type(mesos::master::Event::HEARTBEAT);
2221  return event;
2222  }(),
2223  http,
2226  approvers(_approvers) {}
2227 
2228 
2229  // Not copyable, not assignable.
2230  Subscriber(const Subscriber&) = delete;
2231  Subscriber& operator=(const Subscriber&) = delete;
2232 
2233  // TODO(greggomann): Refactor this function into multiple event-specific
2234  // overloads. See MESOS-8475.
2235  void send(
2236  const mesos::master::Event& event,
2237  const Option<FrameworkInfo>& frameworkInfo,
2238  const Option<Task>& task);
2239 
2241  {
2242  // TODO(anand): Refactor `HttpConnection` to being a RAII class instead.
2243  // It is possible that a caller might accidentally invoke `close()`
2244  // after passing ownership to the `Subscriber` object. See MESOS-5843
2245  // for more details.
2246  http.close();
2247  }
2248 
2252  };
2253 
2254  // Sends the event to all subscribers connected to the 'api/vX' endpoint.
2255  void send(
2256  const mesos::master::Event& event,
2257  const Option<FrameworkInfo>& frameworkInfo = None(),
2258  const Option<Task>& task = None());
2259 
2260  Master* master;
2261 
2262  // Active subscribers to the 'api/vX' endpoint keyed by the stream
2263  // identifier.
2265  };
2266 
2267  Subscribers subscribers;
2268 
2269  hashmap<OfferID, Offer*> offers;
2271 
2272  hashmap<OfferID, InverseOffer*> inverseOffers;
2273  hashmap<OfferID, process::Timer> inverseOfferTimers;
2274 
2275  // We track information about roles that we're aware of in the system.
2276  // Specifically, we keep track of the roles when a framework subscribes to
2277  // the role, and/or when there are resources allocated to the role
2278  // (e.g. some tasks and/or executors are consuming resources under the role).
2280 
2281  // Configured role whitelist if using the (deprecated) "explicit
2282  // roles" feature. If this is `None`, any role is allowed.
2283  Option<hashset<std::string>> roleWhitelist;
2284 
2285  // Configured weight for each role, if any. If a role does not
2286  // appear here, it has the default weight of 1.
2288 
2289  // Configured quota for each role, if any. We store quotas by role
2290  // because we set them at the role level.
2292 
2293  // Authenticator names as supplied via flags.
2294  std::vector<std::string> authenticatorNames;
2295 
2296  Option<Authenticator*> authenticator;
2297 
2298  // Frameworks/slaves that are currently in the process of authentication.
2299  // 'authenticating' future is completed when authenticator
2300  // completes authentication.
2301  // The future is removed from the map when master completes authentication.
2303 
2304  // Principals of authenticated frameworks/slaves keyed by PID.
2306 
2307  int64_t nextFrameworkId; // Used to give each framework a unique ID.
2308  int64_t nextOfferId; // Used to give each slot offer a unique ID.
2309  int64_t nextSlaveId; // Used to give each slave a unique ID.
2310 
2311  // NOTE: It is safe to use a 'shared_ptr' because 'Metrics' is
2312  // thread safe.
2313  // TODO(dhamon): This does not need to be a shared_ptr. Metrics contains
2314  // copyable metric types only.
2315  std::shared_ptr<Metrics> metrics;
2316 
2317  // PullGauge handlers.
2318  double _uptime_secs()
2319  {
2320  return (process::Clock::now() - startTime).secs();
2321  }
2322 
2323  double _elected()
2324  {
2325  return elected() ? 1 : 0;
2326  }
2327 
2328  double _slaves_connected();
2329  double _slaves_disconnected();
2330  double _slaves_active();
2331  double _slaves_inactive();
2332  double _slaves_unreachable();
2333 
2334  // TODO(bevers): Remove these and make the above functions
2335  // const instead after MESOS-4995 is resolved.
2336  double _const_slaves_connected() const;
2337  double _const_slaves_disconnected() const;
2338  double _const_slaves_active() const;
2339  double _const_slaves_inactive() const;
2340  double _const_slaves_unreachable() const;
2341 
2342  double _frameworks_connected();
2343  double _frameworks_disconnected();
2344  double _frameworks_active();
2345  double _frameworks_inactive();
2346 
2347  double _outstanding_offers()
2348  {
2349  return static_cast<double>(offers.size());
2350  }
2351 
2352  double _event_queue_messages()
2353  {
2354  return static_cast<double>(eventCount<process::MessageEvent>());
2355  }
2356 
2357  double _event_queue_dispatches()
2358  {
2359  return static_cast<double>(eventCount<process::DispatchEvent>());
2360  }
2361 
2362  double _event_queue_http_requests()
2363  {
2364  return static_cast<double>(eventCount<process::HttpEvent>());
2365  }
2366 
2367  double _tasks_staging();
2368  double _tasks_starting();
2369  double _tasks_running();
2370  double _tasks_unreachable();
2371  double _tasks_killing();
2372 
2373  double _resources_total(const std::string& name);
2374  double _resources_used(const std::string& name);
2375  double _resources_percent(const std::string& name);
2376 
2377  double _resources_revocable_total(const std::string& name);
2378  double _resources_revocable_used(const std::string& name);
2379  double _resources_revocable_percent(const std::string& name);
2380 
2381  process::Time startTime; // Start time used to calculate uptime.
2382 
2383  Option<process::Time> electedTime; // Time when this master is elected.
2384 
2386  offerConstraintsFilterOptions;
2387 };
2388 
2389 
2390 inline std::ostream& operator<<(
2391  std::ostream& stream,
2392  const Framework& framework);
2393 
2394 
2395 // TODO(bmahler): Keeping the task and executor information in sync
2396 // across the Slave and Framework structs is error prone!
2398 {
2399  enum State
2400  {
2401  // Framework has never connected to this master. This implies the
2402  // master failed over and the framework has not yet reregistered,
2403  // but some framework state has been recovered from reregistering
2404  // agents that are running tasks for the framework.
2406 
2407  // The framework is connected. The framework may or may not be eligible to
2408  // receive offers; this property is tracked separately.
2410 
2411  // Framework was previously connected to this master,
2412  // but is not connected now.
2413  DISCONNECTED
2414  };
2415 
2416  Framework(
2417  Master* const master,
2418  const Flags& masterFlags,
2419  const FrameworkInfo& info,
2420  ::mesos::scheduler::OfferConstraints&& offerConstraints,
2421  const process::UPID& _pid,
2422  const process::Owned<ObjectApprovers>& objectApprovers,
2424 
2425  Framework(Master* const master,
2426  const Flags& masterFlags,
2427  const FrameworkInfo& info,
2428  ::mesos::scheduler::OfferConstraints&& offerConstraints,
2430  const process::Owned<ObjectApprovers>& objectApprovers,
2432 
2433  Framework(Master* const master,
2434  const Flags& masterFlags,
2435  const FrameworkInfo& info);
2436 
2437  ~Framework();
2438 
2439  Task* getTask(const TaskID& taskId);
2440 
2441  void addTask(Task* task);
2442 
2443  // Update framework to recover the resources that were previously
2444  // being used by `task`.
2445  //
2446  // TODO(bmahler): This is a hack for performance. We need to
2447  // maintain resource counters because computing task resources
2448  // functionally for all tasks is expensive, for now.
2449  void recoverResources(Task* task);
2450 
2451  // Sends a message to the connected framework.
2452  template <typename Message>
2453  void send(const Message& message);
2454 
2455  void addCompletedTask(Task&& task);
2456 
2457  void addUnreachableTask(const Task& task);
2458 
2459  // Removes the task. `unreachable` indicates whether the task is removed due
2460  // to being unreachable. Note that we cannot rely on the task state because
2461  // it may not reflect unreachability due to being set to TASK_LOST for
2462  // backwards compatibility.
2463  void removeTask(Task* task, bool unreachable);
2464 
2465  void addOffer(Offer* offer);
2466 
2467  void removeOffer(Offer* offer);
2468 
2469  void addInverseOffer(InverseOffer* inverseOffer);
2470 
2471  void removeInverseOffer(InverseOffer* inverseOffer);
2472 
2473  bool hasExecutor(const SlaveID& slaveId,
2474  const ExecutorID& executorId);
2475 
2476  void addExecutor(const SlaveID& slaveId,
2477  const ExecutorInfo& executorInfo);
2478 
2479  void removeExecutor(const SlaveID& slaveId,
2480  const ExecutorID& executorId);
2481 
2482  void addOperation(Operation* operation);
2483 
2484  Option<Operation*> getOperation(const OperationID& id);
2485 
2486  void recoverResources(Operation* operation);
2487 
2488  void removeOperation(Operation* operation);
2489 
2490  const FrameworkID id() const;
2491 
2492  // Update fields in 'info' using those in 'newInfo'. Currently this
2493  // only updates `role`/`roles`, 'name', 'failover_timeout', 'hostname',
2494  // 'webui_url', 'capabilities', and 'labels'.
2495  void update(
2496  const FrameworkInfo& newInfo,
2497  ::mesos::scheduler::OfferConstraints&& offerConstraints);
2498 
2499  // Reactivate framework with new connection: update connection-related state
2500  // and mark the framework as CONNECTED, regardless of the previous state.
2501  void updateConnection(
2502  const process::UPID& newPid,
2503  const process::Owned<ObjectApprovers>& objectApprovers);
2504 
2505  void updateConnection(
2507  const process::Owned<ObjectApprovers>& objectApprovers);
2508 
2509  // If the framework is CONNECTED, clear all state associated with
2510  // the scheduler being connected (close http connection, stop heartbeater,
2511  // clear object approvers, etc.), mark the framework DISCONNECTED and return
2512  // `true`. Otherwise, return `false`.
2513  bool disconnect();
2514 
2515  // Mark the framework as active (eligible to receive offers if connected)
2516  // or inactive. Returns true if this property changed, false otherwise.
2517  bool activate();
2518  bool deactivate();
2519 
2520  void heartbeat();
2521 
2522  bool active() const { return active_; }
2523 
2524  bool connected() const {return state == State::CONNECTED;}
2525  bool recovered() const {return state == State::RECOVERED;}
2526 
2527  bool isTrackedUnderRole(const std::string& role) const;
2528  void trackUnderRole(const std::string& role);
2529  void untrackUnderRole(const std::string& role);
2530 
2532  {
2533  return http_;
2534  }
2535 
2536  const Option<process::UPID>& pid() const { return pid_; }
2537 
2538  // Returns ObjectApprovers for all actions
2539  // needed to authorize scheduler API calls.
2540  static process::Future<process::Owned<ObjectApprovers>> createObjectApprovers(
2541  const Option<Authorizer*>& _authorizer,
2542  const FrameworkInfo& frameworkInfo);
2543 
2544  // Returns whether the framework principal is authorized to perform
2545  // action on object.
2546  Try<bool> approved(const authorization::ActionObject& actionObject) const;
2547 
2548  const ::mesos::scheduler::OfferConstraints& offerConstraints() const
2549  {
2550  return offerConstraints_;
2551  }
2552 
2553  Master* const master;
2554 
2555  FrameworkInfo info;
2556 
2557  std::set<std::string> roles;
2558 
2560 
2564 
2565  // TODO(bmahler): Make this private to enforce that `addTask()` and
2566  // `removeTask()` are used, and provide a const view into the tasks.
2568 
2569  // Tasks launched by this framework that have reached a terminal
2570  // state and have had all their updates acknowledged. We only keep a
2571  // fixed-size cache to avoid consuming too much memory. We use
2572  // circular_buffer rather than BoundedHashMap because there
2573  // can be multiple completed tasks with the same task ID.
2574  circular_buffer<process::Owned<Task>> completedTasks;
2575 
2576  // When an agent is marked unreachable, tasks running on it are stored
2577  // here. We only keep a fixed-size cache to avoid consuming too much memory.
2578  // NOTE: Non-partition-aware unreachable tasks in this map are marked
2579  // TASK_LOST instead of TASK_UNREACHABLE for backward compatibility.
2581 
2582  hashset<Offer*> offers; // Active offers for framework.
2583 
2584  hashset<InverseOffer*> inverseOffers; // Active inverse offers for framework.
2585 
2586  // TODO(bmahler): Make this private to enforce that `addExecutor()`
2587  // and `removeExecutor()` are used, and provide a const view into
2588  // the executors.
2590 
2591  // Pending operations or terminal operations that have
2592  // unacknowledged status updates.
2594 
2595  // The map from the framework-specified operation ID to the
2596  // corresponding internal operation UUID.
2598 
2599  // NOTE: For the used and offered resources below, we keep the
2600  // total as well as partitioned by SlaveID.
2601  // We expose the total resources via the HTTP endpoint, and we
2602  // keep a running total of the resources because looping over the
2603  // slaves to sum the resources has led to perf issues (MESOS-1862).
2604  // We keep the resources partitioned by SlaveID because non-scalar
2605  // resources can be lost when summing them up across multiple
2606  // slaves (MESOS-2373).
2607  //
2608  // Also note that keeping the totals is safe even though it yields
2609  // incorrect results for non-scalar resources.
2610  // (1) For overlapping set items / ranges across slaves, these
2611  // will get added N times but only represented once.
2612  // (2) When an initial subtraction occurs (N-1), the resource is
2613  // no longer represented. (This is the source of the bug).
2614  // (3) When any further subtractions occur (N-(1+M)), the
2615  // Resources simply ignores the subtraction since there's
2616  // nothing to remove, so this is safe for now.
2617 
2618  // TODO(mpark): Strip the non-scalar resources out of the totals
2619  // in order to avoid reporting incorrect statistics (MESOS-2623).
2620 
2621  // Active task / executor / operation resources.
2623 
2624  // Note that we maintain multiple copies of each shared resource in
2625  // `usedResources` as they are used by multiple tasks.
2627 
2628  // Offered resources.
2631 
2632  // This is used for per-framework metrics.
2634 
2635 private:
2636  Framework(Master* const _master,
2637  const Flags& masterFlags,
2638  const FrameworkInfo& _info,
2639  ::mesos::scheduler::OfferConstraints&& offerConstraints,
2640  State state,
2641  bool active,
2642  const process::Owned<ObjectApprovers>& objectApprovers,
2643  const process::Time& time);
2644 
2645  Framework(const Framework&); // No copying.
2646  Framework& operator=(const Framework&); // No assigning.
2647 
2648  // Indicates whether this framework should be receiving offers
2649  // when it is connected.
2650  bool active_;
2651 
2652  // NOTE: `state` should never be modified by means other than `setState()`.
2653  //
2654  // TODO(asekretenko): Encapsulate `state` to ensure that `metrics.subscribed`
2655  // is updated together with any `state` change.
2656  State state;
2657 
2658  void setState(State state_);
2659 
2660  // Frameworks can either be connected via HTTP or by message passing
2661  // (scheduler driver). At most one of `http` and `pid` will be set
2662  // according to the last connection made by the framework; neither
2663  // field will be set if the framework is in state `RECOVERED`.
2665  Option<process::UPID> pid_;
2666 
2667  // This is only set for HTTP frameworks.
2669  heartbeater;
2670 
2671  // ObjectApprovers for the framework's principal.
2672  process::Owned<ObjectApprovers> objectApprovers;
2673 
2674  // The last offer constraints with which the framework has been subscribed.
2675  ::mesos::scheduler::OfferConstraints offerConstraints_;
2676 };
2677 
2678 
2679 // Sends a message to the connected framework.
2680 template <typename Message>
2681 void Framework::send(const Message& message)
2682 {
2683  metrics.incrementEvent(message);
2684 
2685  if (!connected()) {
2686  LOG(WARNING) << "Master attempting to send message to disconnected"
2687  << " framework " << *this;
2688 
2689  // NOTE: We proceed here without returning to support the case where a
2690  // "disconnected" framework is still talking to the master and the master
2691  // wants to shut it down by sending a `FrameworkErrorMessage`. This can
2692  // occur in a one-way network partition where the master -> framework link
2693  // is broken but the framework -> master link remains intact. Note that we
2694  // have no periodic heartbeats between the master and pid-based schedulers.
2695  //
2696  // TODO(chhsiao): Update the `FrameworkErrorMessage` call-sites that rely on
2697  // the lack of a `return` here to directly call `process::send` so that this
2698  // function doesn't need to deal with the special case. Then we can check
2699  // that one of `http` or `pid` is set if the framework is connected.
2700  }
2701 
2702  if (http_.isSome()) {
2703  if (!http_->send(message)) {
2704  LOG(WARNING) << "Unable to send message to framework " << *this << ":"
2705  << " connection closed";
2706  }
2707  } else if (pid().isSome()) {
2708  master->send(pid().get(), message);
2709  } else {
2710  LOG(WARNING) << "Unable to send message to framework " << *this << ":"
2711  << " framework is recovered but has not reregistered";
2712  }
2713 }
2714 
2715 
2716 // TODO(bevers): Check if there is anything preventing us from
2717 // returning a const reference here.
2718 inline const FrameworkID Framework::id() const
2719 {
2720  return info.id();
2721 }
2722 
2723 
2724 
2725 inline std::ostream& operator<<(
2726  std::ostream& stream,
2727  const Framework& framework)
2728 {
2729  // TODO(vinod): Also log the hostname once FrameworkInfo is properly
2730  // updated on framework failover (MESOS-1784).
2731  stream << framework.id() << " (" << framework.info.name() << ")";
2732 
2733  if (framework.pid().isSome()) {
2734  stream << " at " << framework.pid().get();
2735  }
2736 
2737  return stream;
2738 }
2739 
2740 
2741 // Information about an active role.
2742 struct Role
2743 {
2744  Role() = delete;
2745 
2746  Role(const Master* _master,
2747  const std::string& _role)
2748  : master(_master), role(_role) {}
2749 
2750  void addFramework(Framework* framework)
2751  {
2752  frameworks[framework->id()] = framework;
2753  }
2754 
2755  void removeFramework(Framework* framework)
2756  {
2757  frameworks.erase(framework->id());
2758  }
2759 
2760  const Master* master;
2761  const std::string role;
2762 
2763  // NOTE: The dynamic role/quota relation is stored in and administrated
2764  // by the master. There is no direct representation of quota information
2765  // here to avoid duplication and to support that an operator can associate
2766  // quota with a role before the role is created. Such ordering of operator
2767  // requests prevents a race of premature unbounded allocation that setting
2768  // quota first is intended to contain.
2769 
2771 };
2772 
2773 
2774 mesos::master::Response::GetFrameworks::Framework model(
2775  const Framework& framework);
2776 
2777 
2778 } // namespace master {
2779 } // namespace internal {
2780 } // namespace mesos {
2781 
2782 #endif // __MASTER_HPP__
Protocol< RecoverRequest, RecoverResponse > recover
Definition: path.hpp:29
ReadOnlyHandler(const Master *_master)
Definition: master.hpp:1315
hashmap< ResourceProviderID, ResourceProvider > resourceProviders
Definition: master.hpp:340
Per-framework allocator-specific options that are not part of FrameworkInfo.
Definition: allocator.hpp:147
Definition: nothing.hpp:16
hashmap< TaskID, Task * > tasks
Definition: master.hpp:2567
Definition: master.hpp:2742
Master *const master
Definition: master.hpp:2553
ContentType
Definition: http.hpp:43
hashmap< UUID, Operation * > operations
Definition: master.hpp:255
std::ostream & operator<<(std::ostream &stream, const Future< T > &future)
Definition: future.hpp:1826
Try< bool > update(const std::string &link, const Handle &parent, uint16_t protocol, const action::Mirror &mirror)
void finalize(bool finalize_wsa=false)
Clean up the library.
Try< Bytes > size(const std::string &path, const FollowSymlink follow=FollowSymlink::FOLLOW_SYMLINK)
Definition: stat.hpp:130
Future< Response > request(const Request &request, bool streamedResponse=false)
Asynchronously sends an HTTP request to the process and returns the HTTP response once the entire res...
SlaveInfo info
Definition: master.hpp:201
bool connected() const
Definition: master.hpp:2524
Definition: master.hpp:27
process::Owned< ObjectApprovers > approvers
Definition: master.hpp:1306
Definition: check.hpp:33
const SlaveID id
Definition: master.hpp:200
hashset< Offer * > offers
Definition: master.hpp:271
Option< process::Timer > reregistrationTimer
Definition: master.hpp:230
bool connected
Definition: master.hpp:217
const Option< StreamingHttpConnection< v1::scheduler::Event > > & http() const
Definition: master.hpp:2531
Definition: resource_quantities.hpp:63
bool initialize(const Option< std::string > &delegate=None(), const Option< std::string > &readwriteAuthenticationRealm=None(), const Option< std::string > &readonlyAuthenticationRealm=None())
Initialize the library.
Resources totalResources
Definition: master.hpp:297
constexpr Duration DEFAULT_HEARTBEAT_INTERVAL
Definition: constants.hpp:61
Definition: protobuf_utils.hpp:332
hashmap< SlaveID, Resources > offeredResources
Definition: master.hpp:2630
protobuf::framework::Capabilities capabilities
Definition: master.hpp:2559
const Option< process::UPID > & pid() const
Definition: master.hpp:2536
std::set< std::string > getRoles(const FrameworkInfo &frameworkInfo)
Result< ProcessStatus > status(pid_t pid)
Definition: proc.hpp:166
InverseOffer * getInverseOffer(Master *master, const OfferID &offerId)
Try< Nothing > machines(const google::protobuf::RepeatedPtrField< MachineID > &ids)
Performs the following checks on a list of machines:
mesos::v1::scheduler::Call Call
Definition: mesos.hpp:2851
Definition: resources.hpp:83
Resources totalUsedResources
Definition: master.hpp:2622
Slave * getSlave(Master *master, const SlaveID &slaveId)
Definition: authorization.hpp:36
Definition: flags.hpp:42
Definition: registrar.hpp:88
void addFramework(Framework *framework)
Definition: master.hpp:2750
Option< Error > reregisterSlave(const ReregisterSlaveMessage &message)
Definition: files.hpp:73
Operation
Definition: cgroups.hpp:444
Future< Nothing > redirect(int_fd from, Option< int_fd > to, size_t chunk=4096, const std::vector< lambda::function< void(const std::string &)>> &hooks={})
Redirect output from the &#39;from&#39; file descriptor to the &#39;to&#39; file descriptor (or /dev/null if &#39;to&#39; is ...
Option< process::Time > estimatedDrainStartTime
Definition: master.hpp:316
hashmap< UUID, Operation * > operations
Definition: master.hpp:337
void send(const process::UPID &to, const google::protobuf::Message &message)
Definition: protobuf.hpp:126
bool isSome() const
Definition: option.hpp:116
Definition: event.hpp:209
Definition: http.hpp:533
Definition: json.hpp:158
hashmap< FrameworkID, hashmap< TaskID, Task * > > tasks
Definition: master.hpp:247
multihashmap< FrameworkID, TaskID > killedTasks
Definition: master.hpp:251
mesos::v1::scheduler::Event Event
Definition: mesos.hpp:2852
Definition: hashmap.hpp:38
Option< UUID > resourceVersion
Definition: master.hpp:310
FrameworkMetrics metrics
Definition: master.hpp:2633
bool active() const
Definition: master.hpp:2522
Resources checkpointedResources
Definition: master.hpp:290
SlaveObserver * observer
Definition: master.hpp:312
Try< Nothing > unavailability(const Unavailability &unavailability)
An "untyped" PID, used to encapsulate the process ID for lower-layer abstractions (eg...
Definition: pid.hpp:39
BoundedHashMap< TaskID, process::Owned< Task > > unreachableTasks
Definition: master.hpp:2580
Variant< Subscribe > state
Definition: master.hpp:1312
bool recovered() const
Definition: master.hpp:2525
Result< std::vector< Filter< Classifier > > > filters(const std::string &_link, const Handle &parent)
Definition: internal.hpp:769
process::Time registeredTime
Definition: master.hpp:213
bool active
Definition: master.hpp:222
An abstraction for contending to be a leading master.
Definition: contender.hpp:40
std::set< std::string > roles
Definition: master.hpp:2557
const Master * master
Definition: master.hpp:2760
hashset< UUID > orphanedOperations
Definition: master.hpp:268
process::UPID pid
Definition: master.hpp:205
Definition: uuid.hpp:35
Definition: protobuf_utils.hpp:631
Option< process::Time > reregisteredTime
Definition: master.hpp:214
Definition: agent.hpp:25
process::Time reregisteredTime
Definition: master.hpp:2562
Master *const master
Definition: master.hpp:199
Option< Error > quotaInfo(const mesos::quota::QuotaInfo &quotaInfo)
const T & get() const &
Definition: option.hpp:119
const std::string role
Definition: master.hpp:2761
Protocol< PromiseRequest, PromiseResponse > promise
Definition: protobuf.hpp:108
process::Time registeredTime
Definition: master.hpp:2561
Try< int_fd > accept(int_fd s)
Definition: network.hpp:31
Definition: whitelist_watcher.hpp:37
void removeFramework(Framework *framework)
Definition: master.hpp:2755
MasterInfo info() const
Definition: master.hpp:512
StreamingHttpConnection< v1::master::Event > http
Definition: master.hpp:2249
UUID resourceVersion
Definition: master.hpp:333
Definition: time.hpp:23
FrameworkInfo info
Definition: master.hpp:2555
ResponseHeartbeater< mesos::master::Event, v1::master::Event > heartbeater
Definition: master.hpp:2250
bool exited(const UPID &from, const UPID &to)
Simulates disconnection of the link between &#39;from&#39; and &#39;to&#39; by sending an ExitedEvent to &#39;to&#39;...
Definition: boundedhashmap.hpp:27
Basic model of an allocator: resources are allocated to a framework in the form of offers...
Definition: allocator.hpp:172
const process::Owned< ObjectApprovers > approvers
Definition: master.hpp:2251
#define flags
Definition: decoder.hpp:18
Definition: none.hpp:27
Definition: attributes.hpp:24
hashset< InverseOffer * > inverseOffers
Definition: master.hpp:2584
Subscriber(const StreamingHttpConnection< v1::master::Event > &_http, const process::Owned< ObjectApprovers > &_approvers)
Definition: master.hpp:2212
Definition: executor.hpp:48
const MachineID machineId
Definition: master.hpp:203
Definition: master.hpp:122
constexpr size_t MAX_REMOVED_SLAVES
Definition: constants.hpp:93
hashset< InverseOffer * > inverseOffers
Definition: master.hpp:274
An abstraction of a Master detector which can be used to detect the leading master from a group...
Definition: detector.hpp:38
static Time now()
The current clock time for either the current process that makes this call or the global clock time i...
Resources totalResources
Definition: master.hpp:320
Definition: event.hpp:103
Definition: metrics.hpp:41
State
Definition: master.hpp:2399
Offer * getOffer(Master *master, const OfferID &offerId)
Try< Nothing > kill(const std::string &hierarchy, const std::string &cgroup, int signal)
hashmap< FrameworkID, Framework * > frameworks
Definition: master.hpp:2770
mesos::master::Response::GetFrameworks::Framework model(const Framework &framework)
process::Time unregisteredTime
Definition: master.hpp:2563
mesos::v1::scheduler::Response Response
Definition: mesos.hpp:2854
std::string version
Definition: master.hpp:208
URI http(const std::string &host, const std::string &path="/", const Option< int > &port=None(), const Option< std::string > &query=None(), const Option< std::string > &fragment=None(), const Option< std::string > &user=None(), const Option< std::string > &password=None())
Creates an http URI with the given parameters.
Definition: http.hpp:35
ResourceProviderInfo info
Definition: master.hpp:319
hashmap< FrameworkID, hashmap< ExecutorID, ExecutorInfo > > executors
Definition: master.hpp:237
hashset< Offer * > offers
Definition: master.hpp:2582
std::string stringify(int flags)
Definition: owned.hpp:36
Definition: master.hpp:2397
protobuf::slave::Capabilities capabilities
Definition: master.hpp:211
hashmap< SlaveID, hashmap< ExecutorID, ExecutorInfo > > executors
Definition: master.hpp:2589
circular_buffer< process::Owned< Task > > completedTasks
Definition: master.hpp:2574
hashmap< OperationID, UUID > operationUUIDs
Definition: master.hpp:2597
hashmap< UUID, Operation * > operations
Definition: master.hpp:2593
Definition: parse.hpp:33
Try< std::set< pid_t > > pids()
Definition: freebsd.hpp:62
hashmap< SlaveID, Resources > usedResources
Definition: master.hpp:2626
Resources totalOfferedResources
Definition: master.hpp:2629
Definition: master.hpp:355
PID< MetricsProcess > metrics
Resources offeredResources
Definition: master.hpp:281
const FrameworkID id() const
Definition: master.hpp:2718
constexpr const char * name
Definition: shell.hpp:41
hashmap< FrameworkID, Resources > usedResources
Definition: master.hpp:279
bool contains(const Resource &left, const Resource &right)
Try< std::vector< Value > > time(const std::string &hierarchy, const std::string &cgroup)
Try< Nothing > schedule(const mesos::maintenance::Schedule &schedule, const hashmap< MachineID, Machine > &machines)
Performs the following checks on the new maintenance schedule:
const ::mesos::scheduler::OfferConstraints & offerConstraints() const
Definition: master.hpp:2548
Option< Error > registerSlave(const RegisterSlaveMessage &message)
StreamingHttpConnection< v1::master::Event > connection
Definition: master.hpp:1307
Role(const Master *_master, const std::string &_role)
Definition: master.hpp:2746
Future< size_t > send(const int_fd &fd, const void *buf, size_t size)