Apache Mesos
master.hpp
Go to the documentation of this file.
1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements. See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership. The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License. You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 
17 #ifndef __MASTER_HPP__
18 #define __MASTER_HPP__
19 
20 #include <stdint.h>
21 
22 #include <functional>
23 #include <list>
24 #include <memory>
25 #include <set>
26 #include <string>
27 #include <utility>
28 #include <vector>
29 
30 #include <mesos/mesos.hpp>
31 #include <mesos/resources.hpp>
32 #include <mesos/roles.hpp>
33 #include <mesos/type_utils.hpp>
34 
36 
40 #include <mesos/master/master.hpp>
41 
43 
44 #include <mesos/quota/quota.hpp>
45 
47 
48 #include <process/collect.hpp>
49 #include <process/future.hpp>
50 #include <process/limiter.hpp>
51 #include <process/http.hpp>
52 #include <process/owned.hpp>
53 #include <process/process.hpp>
54 #include <process/protobuf.hpp>
55 #include <process/timer.hpp>
56 
58 
59 #include <stout/boundedhashmap.hpp>
60 #include <stout/cache.hpp>
62 #include <stout/foreach.hpp>
63 #include <stout/hashmap.hpp>
64 #include <stout/hashset.hpp>
65 #include <stout/linkedhashmap.hpp>
66 #include <stout/multihashmap.hpp>
67 #include <stout/nothing.hpp>
68 #include <stout/option.hpp>
69 #include <stout/recordio.hpp>
70 #include <stout/try.hpp>
71 #include <stout/uuid.hpp>
72 
73 #include "common/heartbeater.hpp"
74 #include "common/http.hpp"
76 
77 #include "files/files.hpp"
78 
79 #include "internal/devolve.hpp"
80 #include "internal/evolve.hpp"
81 
82 #include "master/authorization.hpp"
83 #include "master/constants.hpp"
84 #include "master/flags.hpp"
85 #include "master/machine.hpp"
86 #include "master/metrics.hpp"
87 #include "master/validation.hpp"
88 
89 #include "messages/messages.hpp"
90 
91 namespace process {
92 class RateLimiter; // Forward declaration.
93 }
94 
95 namespace mesos {
96 
97 // Forward declarations.
98 class Authorizer;
99 class ObjectApprovers;
100 
101 namespace internal {
102 
103 // Forward declarations.
104 namespace registry {
105 class Slaves;
106 }
107 
108 class Registry;
109 class WhitelistWatcher;
110 
111 namespace master {
112 
113 class Master;
114 class Registrar;
115 class SlaveObserver;
116 
117 struct BoundedRateLimiter;
118 struct Framework;
119 struct Role;
120 
121 
// Master-side bookkeeping for one agent ("slave"): its identity (`id`,
// `info`, `machineId`, `version`), whether it is `connected` / `active`,
// and the executors, tasks, operations and offers the master tracks for
// it (see the per-member comments below).
//
// NOTE(review): this listing looks incomplete. Several member
// declarations that the comments below describe are absent (e.g. the
// executors/tasks/offers containers, and `pid`, which the stream
// operator following this struct reads), the parameter list after
// original line 191 has lost its function name, and the nested struct
// closed on original line 338 has lost its opening line. Restore these
// from the upstream header before relying on this text.
122 struct Slave
123 {
  // NOTE(review): `_capabilites` (here and in the parameter list after
  // original line 191) is a misspelling of `_capabilities`. Parameter
  // names in a declaration are not part of the interface, so the name
  // can be corrected safely.
124 Slave(Master* const _master,
125  SlaveInfo _info,
126  const process::UPID& _pid,
127  const MachineID& _machineId,
128  const std::string& _version,
129  std::vector<SlaveInfo::Capability> _capabilites,
130  const process::Time& _registeredTime,
131  std::vector<Resource> _checkpointedResources,
132  const Option<UUID>& _resourceVersion,
133  std::vector<ExecutorInfo> executorInfos = std::vector<ExecutorInfo>(),
134  std::vector<Task> tasks = std::vector<Task>());
135
136  ~Slave();
137
138  Task* getTask(
139  const FrameworkID& frameworkId,
140  const TaskID& taskId) const;
141
142  void addTask(Task* task);
143
144  // Update slave to recover the resources that were previously
145  // being used by `task`.
146  //
147  // TODO(bmahler): This is a hack for performance. We need to
148  // maintain resource counters because computing task resources
149  // functionally for all tasks is expensive, for now.
150  void recoverResources(Task* task);
151
152  void removeTask(Task* task);
153
154  void addOperation(Operation* operation);
155
156  void recoverResources(Operation* operation);
157
158  void removeOperation(Operation* operation);
159
160  // Marks a non-speculative operation as an orphan when the originating
161  // framework is torn down by the master, or when an agent reregisters
162  // with operations from unknown frameworks. If the operation is
163  // non-terminal, this has the side effect of modifying the agent's
164  // total resources, and should therefore be followed by
165  // `allocator->updateSlave()`.
166  void markOperationAsOrphan(Operation* operation);
167
168  Operation* getOperation(const UUID& uuid) const;
169
170  void addOffer(Offer* offer);
171
172  void removeOffer(Offer* offer);
173
174  void addInverseOffer(InverseOffer* inverseOffer);
175
176  void removeInverseOffer(InverseOffer* inverseOffer);
177
178  bool hasExecutor(
179  const FrameworkID& frameworkId,
180  const ExecutorID& executorId) const;
181
182  void addExecutor(
183  const FrameworkID& frameworkId,
184  const ExecutorInfo& executorInfo);
185
186  void removeExecutor(
187  const FrameworkID& frameworkId,
188  const ExecutorID& executorId);
189
190  void apply(const std::vector<ResourceConversion>& conversions);
191
  // NOTE(review): the name of the member function taking the parameters
  // below is missing from this listing (presumably an `update`-style
  // method refreshing info, version, capabilities, checkpointed
  // resources and resource version) -- TODO confirm against upstream.
193  const SlaveInfo& info,
194  const std::string& _version,
195  const std::vector<SlaveInfo::Capability>& _capabilites,
196  const Resources& _checkpointedResources,
197  const Option<UUID>& resourceVersion);
198
199  Master* const master;
200  const SlaveID id;
201  SlaveInfo info;
202
203  const MachineID machineId;
204
206
207  // TODO(bmahler): Use stout's Version when it can parse labels, etc.
208  std::string version;
209
210  // Agent capabilities.
212
215
216  // Slave becomes disconnected when the socket closes.
217  bool connected;
218
219  // Slave becomes deactivated when it gets disconnected, or when the
220  // agent is deactivated via the DRAIN_AGENT or DEACTIVATE_AGENT calls.
221  // No offers will be made for a deactivated slave.
222  bool active;
223
224  // Timer for marking slaves unreachable that become disconnected and
225  // don't reregister. This timeout is larger than the slave
226  // observer's timeout, so typically the slave observer will be the
227  // one to mark such slaves unreachable; this timer is a backup for
228  // when a slave responds to pings but does not reregister (e.g.,
229  // because agent recovery has hung).
231
232  // Executors running on this slave.
233  //
234  // TODO(bmahler): Make this private to enforce that `addExecutor()`
235  // and `removeExecutor()` are used, and provide a const view into
236  // the executors.
238
239  // Tasks present on this slave.
240  //
241  // TODO(bmahler): Make this private to enforce that `addTask()` and
242  // `removeTask()` are used, and provide a const view into the tasks.
243  //
244  // TODO(bmahler): The task pointer ownership complexity arises from the fact
245  // that we own the pointer here, but it's shared with the Framework struct.
246  // We should find a way to eliminate this.
248
249  // Tasks that frameworks have asked to kill.
250  // This is used for reconciliation when the slave reregisters.
252
253  // Pending operations or terminal operations that have
254  // unacknowledged status updates on this agent.
256
257  // Operations whose originating framework is unknown.
258  // These operations could be pending, or terminal with unacknowledged
259  // status updates.
260  //
261  // This list can be populated whenever a framework is torn down in the
262  // lifetime of the master, or when an agent reregisters with an operation.
263  //
264  // If the originating framework is completed, the master will
265  // acknowledge any status updates instead of the framework.
266  // If an orphan does not belong to a completed framework, the master
267  // will only acknowledge status updates after a fixed delay.
269
270  // Active offers on this slave.
272
273  // Active inverse offers on this slave.
275
276  // Resources used by active tasks / executors / operations.
277  // Note that we maintain multiple copies of each shared resource in
278  // `usedResources` as they are used by multiple tasks.
280
282
283  // Resources that should be checkpointed by the slave (e.g.,
284  // persistent volumes, dynamic reservations, etc.). These are either
285  // in use by a task/executor, or are available for use and will be
286  // re-offered to the framework.
287  // TODO(jieyu): `checkpointedResources` is only for agent default
288  // resources. Resources from resource providers are not included in
289  // this field. Consider removing this field.
291
292  // The current total resources of the slave. Note that this is
293  // different from 'info.resources()' because this also considers
294  // operations (e.g., CREATE, RESERVE) that have been applied and
295  // includes revocable resources and resources from resource
296  // providers as well.
298
299  // Used to establish the relationship between the operation and the
300  // resources that the operation is operating on. Each resource
301  // provider will keep a resource version UUID, and change it when it
302  // believes that the resources from this resource provider are out
303  // of sync with the master's view. The master will keep track of
304  // the last known resource version UUID for each resource provider,
305  // and attach the resource version UUID in each operation it sends
306  // out. The resource provider should reject operations that have a
307  // different resource version UUID than the one it maintains, because
308  // this means the operation is operating on resources that might
309  // have already been invalidated.
311
312  SlaveObserver* observer;
313
314  // Time when this agent was last asked to drain. This field
315  // is empty if the agent is not currently draining or drained.
317
  // NOTE(review): the opening line of the nested struct that the next
  // members belong to (closed by the `};` below) is missing from this
  // listing.
319  ResourceProviderInfo info;
321
322  // Used to establish the relationship between the operation and the
323  // resources that the operation is operating on. Each resource
324  // provider will keep a resource version UUID, and change it when it
325  // believes that the resources from this resource provider are out
326  // of sync with the master's view. The master will keep track of
327  // the last known resource version UUID for each resource provider,
328  // and attach the resource version UUID in each operation it sends
329  // out. The resource provider should reject operations that have a
330  // different resource version UUID than the one it maintains, because
331  // this means the operation is operating on resources that might
332  // have already been invalidated.
334
335  // Pending operations or terminal operations that have
336  // unacknowledged status updates.
338  };
339
341
  // Copying and assignment are blocked by declaring the copy constructor
  // and copy-assignment operator private.
  // NOTE(review): this file already uses C++11 (`override`), so these
  // could be written idiomatically as public `= delete` declarations.
342 private:
343  Slave(const Slave&); // No copying.
344  Slave& operator=(const Slave&); // No assigning.
345 };
346 
347 
348 inline std::ostream& operator<<(std::ostream& stream, const Slave& slave)
349 {
350  return stream << slave.id << " at " << slave.pid
351  << " (" << slave.info.hostname() << ")";
352 }
353 
354 
355 class Master : public ProtobufProcess<Master>
356 {
357 public:
359  Registrar* registrar,
360  Files* files,
363  const Option<Authorizer*>& authorizer,
364  const Option<std::shared_ptr<process::RateLimiter>>&
365  slaveRemovalLimiter,
366  const Flags& flags = Flags());
367 
368  ~Master() override;
369 
370  // Compare this master's capabilities with registry's minimum capability.
371  // Return the set of capabilities missing from this master.
372  static hashset<std::string> missingMinimumCapabilities(
373  const MasterInfo& masterInfo, const Registry& registry);
374 
375  // Message handlers.
376  void submitScheduler(
377  const std::string& name);
378 
379  void registerFramework(
380  const process::UPID& from,
381  RegisterFrameworkMessage&& registerFrameworkMessage);
382 
383  void reregisterFramework(
384  const process::UPID& from,
385  ReregisterFrameworkMessage&& reregisterFrameworkMessage);
386 
387  void unregisterFramework(
388  const process::UPID& from,
389  const FrameworkID& frameworkId);
390 
391  void deactivateFramework(
392  const process::UPID& from,
393  const FrameworkID& frameworkId);
394 
395  // TODO(vinod): Remove this once the old driver is removed.
396  void resourceRequest(
397  const process::UPID& from,
398  const FrameworkID& frameworkId,
399  const std::vector<Request>& requests);
400 
401  void launchTasks(
402  const process::UPID& from,
403  LaunchTasksMessage&& launchTasksMessage);
404 
405  void reviveOffers(
406  const process::UPID& from,
407  const FrameworkID& frameworkId,
408  const std::vector<std::string>& role);
409 
410  void killTask(
411  const process::UPID& from,
412  const FrameworkID& frameworkId,
413  const TaskID& taskId);
414 
415  void statusUpdateAcknowledgement(
416  const process::UPID& from,
417  StatusUpdateAcknowledgementMessage&& statusUpdateAcknowledgementMessage);
418 
419  void schedulerMessage(
420  const process::UPID& from,
421  FrameworkToExecutorMessage&& frameworkToExecutorMessage);
422 
423  void executorMessage(
424  const process::UPID& from,
425  ExecutorToFrameworkMessage&& executorToFrameworkMessage);
426 
427  void registerSlave(
428  const process::UPID& from,
429  RegisterSlaveMessage&& registerSlaveMessage);
430 
431  void reregisterSlave(
432  const process::UPID& from,
433  ReregisterSlaveMessage&& incomingMessage);
434 
435  void unregisterSlave(
436  const process::UPID& from,
437  const SlaveID& slaveId);
438 
439  void statusUpdate(
440  StatusUpdateMessage&& statusUpdateMessage);
441 
442  void reconcileTasks(
443  const process::UPID& from,
444  ReconcileTasksMessage&& reconcileTasksMessage);
445 
446  void updateOperationStatus(
447  UpdateOperationStatusMessage&& update);
448 
449  void exitedExecutor(
450  const process::UPID& from,
451  const SlaveID& slaveId,
452  const FrameworkID& frameworkId,
453  const ExecutorID& executorId,
454  int32_t status);
455 
456  void updateSlave(UpdateSlaveMessage&& message);
457 
458  void updateUnavailability(
459  const MachineID& machineId,
461 
462  // Marks the agent unreachable and returns whether the agent was
463  // marked unreachable. Returns false if the agent is already
464  // in a transitioning state or has transitioned into another
465  // state (this includes already being marked unreachable).
466  // The `duringMasterFailover` parameter specifies whether this
467  // agent is transitioning from a recovered state (true) or a
468  // registered state (false).
469  //
470  // Discarding currently not supported.
471  //
472  // Will not return a failure (this will crash the master
473  // internally in the case of a registry failure).
474  process::Future<bool> markUnreachable(
475  const SlaveInfo& slave,
476  bool duringMasterFailover,
477  const std::string& message);
478 
479  void markGone(const SlaveID& slaveId, const TimeInfo& goneTime);
480 
481  void authenticate(
482  const process::UPID& from,
483  const process::UPID& pid);
484 
485  // TODO(bmahler): It would be preferred to use a unique libprocess
486  // Process identifier (PID is not sufficient) for identifying the
487  // framework instance, rather than relying on re-registration time.
488  void frameworkFailoverTimeout(
489  const FrameworkID& frameworkId,
490  const process::Time& reregisteredTime);
491 
492  void offer(
493  const FrameworkID& frameworkId,
494  const hashmap<std::string, hashmap<SlaveID, Resources>>& resources);
495 
496  void inverseOffer(
497  const FrameworkID& frameworkId,
498  const hashmap<SlaveID, UnavailableResources>& resources);
499 
500  // Invoked when there is a newly elected leading master.
501  // Made public for testing purposes.
502  void detected(const process::Future<Option<MasterInfo>>& _leader);
503 
504  // Invoked when the contender has lost the candidacy.
505  // Made public for testing purposes.
506  void lostCandidacy(const process::Future<Nothing>& lost);
507 
508  // Continuation of recover().
509  // Made public for testing purposes.
510  process::Future<Nothing> _recover(const Registry& registry);
511 
512  MasterInfo info() const
513  {
514  return info_;
515  }
516 
517 protected:
518  void initialize() override;
519  void finalize() override;
520 
521  void consume(process::MessageEvent&& event) override;
522  void consume(process::ExitedEvent&& event) override;
523 
524  void exited(const process::UPID& pid) override;
525  void exited(
526  const FrameworkID& frameworkId,
528 
529  void _exited(Framework* framework);
530 
531  // Invoked upon noticing a subscriber disconnection.
532  void exited(const id::UUID& id);
533 
534  void agentReregisterTimeout(const SlaveID& slaveId);
535  Nothing _agentReregisterTimeout(const SlaveID& slaveId);
536 
537  // Invoked when the message is ready to be executed after
538  // being throttled.
539  // 'principal' being None indicates it is throttled by
540  // 'defaultLimiter'.
541  void throttled(
542  process::MessageEvent&& event,
543  const Option<std::string>& principal);
544 
545  // Continuations of consume().
546  void _consume(process::MessageEvent&& event);
547  void _consume(process::ExitedEvent&& event);
548 
549  // Helper method invoked when the capacity for a framework
550  // principal is exceeded.
551  void exceededCapacity(
552  const process::MessageEvent& event,
553  const Option<std::string>& principal,
554  uint64_t capacity);
555 
556  // Recovers state from the registrar.
558  void recoveredSlavesTimeout(const Registry& registry);
559 
560  void _registerSlave(
561  const process::UPID& pid,
562  RegisterSlaveMessage&& registerSlaveMessage,
564  const process::Future<bool>& authorized);
565 
566  void __registerSlave(
567  const process::UPID& pid,
568  RegisterSlaveMessage&& registerSlaveMessage,
569  const process::Future<bool>& admit);
570 
571  void _reregisterSlave(
572  const process::UPID& pid,
573  ReregisterSlaveMessage&& incomingMessage,
575  const process::Future<bool>& authorized);
576 
577  void __reregisterSlave(
578  const process::UPID& pid,
579  ReregisterSlaveMessage&& incomingMessage,
580  const process::Future<bool>& readmit);
581 
582  void ___reregisterSlave(
583  const process::UPID& pid,
584  ReregisterSlaveMessage&& incomingMessage,
585  const process::Future<bool>& updated);
586 
587  void updateSlaveFrameworks(
588  Slave* slave,
589  const std::vector<FrameworkInfo>& frameworks);
590 
591  // 'future' is the future returned by the authenticator.
592  void _authenticate(
593  const process::UPID& pid,
594  const process::Future<Option<std::string>>& future);
595 
596  void authenticationTimeout(process::Future<Option<std::string>> future);
597 
598  void fileAttached(const process::Future<Nothing>& result,
599  const std::string& path);
600 
601  // Invoked when the contender has entered the contest.
602  void contended(const process::Future<process::Future<Nothing>>& candidacy);
603 
604  // When a slave that was previously registered with this master
605  // reregisters, we need to reconcile the master's view of the
606  // slave's tasks and executors. This function also sends the
607  // `SlaveReregisteredMessage`.
608  void reconcileKnownSlave(
609  Slave* slave,
610  const std::vector<ExecutorInfo>& executors,
611  const std::vector<Task>& tasks);
612 
613  // Add a framework.
614  void addFramework(
615  Framework* framework,
616  const std::set<std::string>& suppressedRoles);
617 
618  // Recover a framework from its `FrameworkInfo`. This happens after
619  // master failover, when an agent running one of the framework's
620  // tasks reregisters or when the framework itself reregisters,
621  // whichever happens first. The result of this function is a
622  // registered, inactive framework with state `RECOVERED`.
623  void recoverFramework(
624  const FrameworkInfo& info,
625  const std::set<std::string>& suppressedRoles);
626 
627  // Transition a framework from `RECOVERED` to `CONNECTED` state and
628  // activate it. This happens at most once after master failover, the
629  // first time that the framework reregisters with the new master.
630  // Exactly one of `pid` or `http` must be provided.
631  void connectAndActivateRecoveredFramework(
632  Framework* framework,
633  const FrameworkInfo& frameworkInfo,
634  const Option<process::UPID>& pid,
636  const process::Owned<ObjectApprovers>& objectApprovers,
637  const std::set<std::string>& suppressedRoles);
638 
639  // Replace the scheduler for a framework with a new process ID, in
640  // the event of a scheduler failover.
641  void failoverFramework(
642  Framework* framework,
643  const process::UPID& newPid,
644  const process::Owned<ObjectApprovers>& objectApprovers);
645 
646  // Replace the scheduler for a framework with a new HTTP connection,
647  // in the event of a scheduler failover.
648  void failoverFramework(
649  Framework* framework,
651  const process::Owned<ObjectApprovers>& objectApprovers);
652 
653  void _failoverFramework(Framework* framework);
654 
655  // Kill all of a framework's tasks, delete the framework object, and
656  // reschedule offers that were assigned to this framework.
657  void removeFramework(Framework* framework);
658 
659  // Remove a framework from the slave, i.e., remove its tasks and
660  // executors and recover the resources.
661  void removeFramework(Slave* slave, Framework* framework);
662 
663  // Performs actions common for all the framework update paths.
664  //
665  // NOTE: the fields 'id', 'principal', 'name' and 'checkpoint' in the
666  // 'frameworkInfo' should have the same values as in 'framework->info',
667  // otherwise this method terminates the program.
668  //
669  // TODO(asekretenko): Make sending FrameworkInfo updates to slaves, API
670  // subscribers and anywhere else a responsibility of this method -
671  // currently it is not, see MESOS-9746. After that we can remove the
672  // 'sendFrameworkUpdates()' method.
673  void updateFramework(
674  Framework* framework,
675  const FrameworkInfo& frameworkInfo,
676  const std::set<std::string>& suppressedRoles);
677 
678  void sendFrameworkUpdates(const Framework& framework);
679 
680  void disconnect(Framework* framework);
681  void deactivate(Framework* framework, bool rescind);
682 
683  void disconnect(Slave* slave);
684 
685  // Removes the agent from the resource offer cycle (and rescinds active
686  // offers). Other aspects of the agent will continue to function normally.
687  void deactivate(Slave* slave);
688 
689  // Adds the agent back to the resource offer cycle.
690  // Must *NOT* be called if the agent is `deactivated`.
691  void reactivate(Slave* slave);
692 
693  // Add a slave.
694  void addSlave(
695  Slave* slave,
696  std::vector<Archive::Framework>&& completedFrameworks);
697 
698  void _markUnreachable(
699  const SlaveInfo& slave,
700  const TimeInfo& unreachableTime,
701  bool duringMasterFailover,
702  const std::string& message,
703  bool registrarResult);
704 
705  void sendSlaveLost(const SlaveInfo& slaveInfo);
706 
707  // Remove the slave from the registrar and from the master's state.
708  //
709  // TODO(bmahler): 'reason' is optional until MESOS-2317 is resolved.
710  void removeSlave(
711  Slave* slave,
712  const std::string& message,
714 
715  // Removes an agent from the master's state in the following cases:
716  // * When maintenance is started on an agent
717  // * When an agent registers with a new ID from a previously-known IP + port
718  // * When an agent unregisters itself with an `UnregisterSlaveMessage`
719  void _removeSlave(
720  Slave* slave,
721  const process::Future<bool>& registrarResult,
722  const std::string& removalCause,
724 
725  // Removes an agent from the master's state in the following cases:
726  // * When marking an agent unreachable
727  // * When marking an agent gone
728  //
729  // NOTE that in spite of the name `__removeSlave()`, this function is NOT a
730  // continuation of `_removeSlave()`. Rather, these two functions perform
731  // similar logic for slightly different cases.
732  //
733  // TODO(greggomann): refactor `_removeSlave` and `__removeSlave` into a single
734  // common helper function. (See MESOS-9550)
735  void __removeSlave(
736  Slave* slave,
737  const std::string& message,
738  const Option<TimeInfo>& unreachableTime);
739 
740  // Validates that the framework is authenticated, if required.
741  Option<Error> validateFrameworkAuthentication(
742  const FrameworkInfo& frameworkInfo,
743  const process::UPID& from);
744 
745  // Returns whether the principal is authorized for the specified
746  // action-object pair.
747  // Returns failure for transient authorization failures.
748  process::Future<bool> authorize(
750  authorization::ActionObject&& actionObject);
751 
752  // Overload of authorize() for cases which require multiple action-object
753  // pairs to be authorized simultaneously.
754  process::Future<bool> authorize(
756  std::vector<authorization::ActionObject>&& actionObjects);
757 
758  // Determine if a new executor needs to be launched.
759  bool isLaunchExecutor (
760  const ExecutorID& executorId,
761  Framework* framework,
762  Slave* slave) const;
763 
764  // Add executor to the framework and slave.
765  void addExecutor(
766  const ExecutorInfo& executorInfo,
767  Framework* framework,
768  Slave* slave);
769 
770  // Add task to the framework and slave.
771  void addTask(const TaskInfo& task, Framework* framework, Slave* slave);
772 
773  // Transitions the task, and recovers resources if the task becomes
774  // terminal.
775  void updateTask(Task* task, const StatusUpdate& update);
776 
777  // Removes the task. `unreachable` indicates whether the task is removed due
778  // to being unreachable. Note that we cannot rely on the task state because
779  // it may not reflect unreachability due to being set to TASK_LOST for
780  // backwards compatibility.
781  void removeTask(Task* task, bool unreachable = false);
782 
783  // Remove an executor and recover its resources.
784  void removeExecutor(
785  Slave* slave,
786  const FrameworkID& frameworkId,
787  const ExecutorID& executorId);
788 
789  // Adds the given operation to the framework and the agent.
790  void addOperation(
791  Framework* framework,
792  Slave* slave,
793  Operation* operation);
794 
795  // Transitions the operation, and updates and recovers resources if
796  // the operation becomes terminal. If `convertResources` is `false`
797  // only the consumed resources of terminal operations are recovered,
798  // but no resources are converted.
799  void updateOperation(
800  Operation* operation,
801  const UpdateOperationStatusMessage& update,
802  bool convertResources = true);
803 
804  // Remove the operation.
805  void removeOperation(Operation* operation);
806 
807  // Send operation update for all operations on the agent.
808  void sendBulkOperationFeedback(
809  Slave* slave,
810  OperationState operationState,
811  const std::string& message);
812 
813  // Attempts to update the allocator by applying the given operation.
814  // If successful, updates the slave's resources, sends a
815  // 'CheckpointResourcesMessage' to the slave with the updated
816  // checkpointed resources, and returns a 'Future' with 'Nothing'.
817  // Otherwise, no action is taken and returns a failed 'Future'.
819  Slave* slave,
820  const Offer::Operation& operation);
821 
822  // Forwards the update to the framework.
823  void forward(
824  const StatusUpdate& update,
825  const process::UPID& acknowledgee,
826  Framework* framework);
827 
828  // Remove an offer after the specified timeout.
829  void offerTimeout(const OfferID& offerId);
830 
831  // Methods for removing an offer and handling associated resources.
832  // Both recover the resources in the allocator (optionally setting offer
833  // filters) and remove the offer in the master. `rescindOffer` further
834  // notifies the framework about the rescind.
835  //
836  // NOTE: the `filters` field in `rescindOffers` is needed only as
837  // a workaround for the race between the master and the allocator
838  // which happens when the master tries to free up resources to satisfy
839  // operator initiated operations.
840  void rescindOffer(Offer* offer, const Option<Filters>& filters = None());
841  void discardOffer(Offer* offer, const Option<Filters>& filters = None());
842 
843  // Helper for rescindOffer() / discardOffer() / _accept().
844  // Do not use directly.
845  //
846  // The offer must belong to the framework.
847  void _removeOffer(Framework* framework, Offer* offer);
848 
849  // Remove an inverse offer after the specified timeout.
850  void inverseOfferTimeout(const OfferID& inverseOfferId);
851 
852  // Remove an inverse offer and optionally rescind it as well.
853  void removeInverseOffer(InverseOffer* inverseOffer, bool rescind = false);
854 
855  bool isCompletedFramework(const FrameworkID& frameworkId) const;
856 
857  Framework* getFramework(const FrameworkID& frameworkId) const;
858  Offer* getOffer(const OfferID& offerId) const;
859  InverseOffer* getInverseOffer(const OfferID& inverseOfferId) const;
860 
861  FrameworkID newFrameworkId();
862  OfferID newOfferId();
863  SlaveID newSlaveId();
864 
865 private:
866  // Updates the agent's resources by applying the given operation.
867  // Sends either `ApplyOperationMessage` or
868  // `CheckpointResourcesMessage` (with updated checkpointed
869  // resources) to the agent depending on if the agent has
870  // `RESOURCE_PROVIDER` capability.
871  void _apply(
872  Slave* slave,
873  Framework* framework,
874  const Offer::Operation& operationInfo);
875 
876  void drop(
877  const process::UPID& from,
878  const mesos::scheduler::Call& call,
879  const std::string& message);
880 
881  void drop(
882  Framework* framework,
883  const Offer::Operation& operation,
884  const std::string& message);
885 
886  void drop(
887  Framework* framework,
888  const mesos::scheduler::Call& call,
889  const std::string& message);
890 
891  void drop(
892  Framework* framework,
893  const mesos::scheduler::Call::Suppress& suppress,
894  const std::string& message);
895 
896  void drop(
897  Framework* framework,
898  const mesos::scheduler::Call::Revive& revive,
899  const std::string& message);
900 
901  // Call handlers.
902  void receive(
903  const process::UPID& from,
904  mesos::scheduler::Call&& call);
905 
906  void subscribe(
908  mesos::scheduler::Call::Subscribe&& subscribe);
909 
910  void _subscribe(
912  FrameworkInfo&& frameworkInfo,
913  bool force,
914  google::protobuf::RepeatedPtrField<std::string>&& suppressedRoles,
915  const process::Future<process::Owned<ObjectApprovers>>& objectApprovers);
916 
917  void subscribe(
918  const process::UPID& from,
919  mesos::scheduler::Call::Subscribe&& subscribe);
920 
921  void _subscribe(
922  const process::UPID& from,
923  FrameworkInfo&& frameworkInfo,
924  bool force,
925  google::protobuf::RepeatedPtrField<std::string>&& suppressedRoles,
926  const process::Future<process::Owned<ObjectApprovers>>& objectApprovers);
927 
928  // Update framework via SchedulerDriver (i.e. no response
929  // code feedback, FrameworkErrorMessage on error).
930  void updateFramework(
931  const process::UPID& from,
932  mesos::scheduler::Call::UpdateFramework&& call);
933 
934  // Update framework via HTTP API (i.e. returns 200 OK).
936  mesos::scheduler::Call::UpdateFramework&& call);
937 
938  // Subscribes a client to the 'api/vX' endpoint.
939  void subscribe(
941  const process::Owned<ObjectApprovers>& approvers);
942 
943  void teardown(Framework* framework);
944 
945  void accept(
946  Framework* framework,
947  mesos::scheduler::Call::Accept&& accept);
948 
949  void _accept(
950  const FrameworkID& frameworkId,
951  const SlaveID& slaveId,
952  mesos::scheduler::Call::Accept&& accept);
953 
954  void acceptInverseOffers(
955  Framework* framework,
956  const mesos::scheduler::Call::AcceptInverseOffers& accept);
957 
958  void decline(
959  Framework* framework,
960  mesos::scheduler::Call::Decline&& decline);
961 
962  void declineInverseOffers(
963  Framework* framework,
964  const mesos::scheduler::Call::DeclineInverseOffers& decline);
965 
966  // Should be called after each terminal task status update acknowledgement
967  // or terminal operation acknowledgement. If an agent is draining, this
968  // checks if all pending tasks or operations have terminated and then
969  // transitions the DRAINING agent to DRAINED.
970  void checkAndTransitionDrainingAgent(Slave* slave);
971 
972  void revive(
973  Framework* framework,
974  const mesos::scheduler::Call::Revive& revive);
975 
976  void kill(
977  Framework* framework,
978  const mesos::scheduler::Call::Kill& kill);
979 
980  void shutdown(
981  Framework* framework,
982  const mesos::scheduler::Call::Shutdown& shutdown);
983 
984  void acknowledge(
985  Framework* framework,
986  mesos::scheduler::Call::Acknowledge&& acknowledge);
987 
988  void acknowledgeOperationStatus(
989  Framework* framework,
990  mesos::scheduler::Call::AcknowledgeOperationStatus&& acknowledge);
991 
992  void reconcile(
993  Framework* framework,
994  mesos::scheduler::Call::Reconcile&& reconcile);
995 
996  void reconcileOperations(
997  Framework* framework,
998  mesos::scheduler::Call::ReconcileOperations&& reconcile);
999 
1000  void message(
1001  Framework* framework,
1002  mesos::scheduler::Call::Message&& message);
1003 
1004  void request(
1005  Framework* framework,
1006  const mesos::scheduler::Call::Request& request);
1007 
1008  void suppress(
1009  Framework* framework,
1010  const mesos::scheduler::Call::Suppress& suppress);
1011 
1012  bool elected() const
1013  {
1014  return leader.isSome() && leader.get() == info_;
1015  }
1016 
1017  void scheduleRegistryGc();
1018 
1019  void doRegistryGc();
1020 
1021  void _doRegistryGc(
1022  const hashset<SlaveID>& toRemoveUnreachable,
1023  const hashset<SlaveID>& toRemoveGone,
1024  const process::Future<bool>& registrarResult);
1025 
  // Returns all roles known to the master. If roles are whitelisted,
  // this simply returns the whitelist and any ancestors of roles in
  // the whitelist. Otherwise, this returns:
1029  //
1030  // (1) Roles with configured weight or quota.
1031  // (2) Roles with reservations.
1032  // (3) Roles with frameworks subscribed or allocated resources.
1033  // (4) Ancestor roles of (1), (2), or (3).
1034  std::vector<std::string> knownRoles() const;
1035 
1043  bool isWhitelistedRole(const std::string& name) const;
1044 
1045  // TODO(bmahler): Store a role tree rather than the existing
1046  // `roles` map which does not track the tree correctly (it does
1047  // not insert ancestor entries, nor does it track roles if there
1048  // are reservations but no frameworks related to them).
1049  struct RoleResourceBreakdown
1050  {
1051  public:
1052  RoleResourceBreakdown(const Master* const master_, const std::string& role_)
1053  : master(master_), role(role_) {}
1054 
1055  ResourceQuantities offered() const;
1056  ResourceQuantities allocated() const;
1057  ResourceQuantities reserved() const;
1058  ResourceQuantities consumedQuota() const;
1059 
1060  private:
1061  const Master* const master;
1062  const std::string role;
1063  };
1064 
1065  // Performs validations of the FrameworkInfo and suppressed roles set
1066  // which do not depend on the current state of this framework.
1067  Option<Error> validateFramework(
1068  const FrameworkInfo& frameworkInfo,
1069  const google::protobuf::RepeatedPtrField<std::string>& suppressedRoles)
1070  const;
1071 
1079  class QuotaHandler
1080  {
1081  public:
1082  explicit QuotaHandler(Master* _master) : master(_master)
1083  {
1084  CHECK_NOTNULL(master);
1085  }
1086 
1087  // Returns a list of set quotas.
1089  const mesos::master::Call& call,
1091  ContentType contentType) const;
1092 
1094  const process::http::Request& request,
1096  principal) const;
1097 
1099  const mesos::master::Call& call,
1101  const;
1102 
1104  const mesos::master::Call& call,
1106  principal) const;
1107 
1111  principal) const;
1112 
1114  const mesos::master::Call& call,
1116  principal) const;
1117 
1121  principal) const;
1122 
1123  private:
    // Returns an error if the total quota guarantees overcommit
    // the cluster. This is not a quota satisfiability check: it's
    // possible that quota is unsatisfiable even if the quota
    // does not overcommit the cluster. Specifically, we verify that
    // the following inequality holds:
    //
    //   total cluster capacity >= total quota w/ quota request applied
    //
    // Note, total cluster capacity accounts for resources of all the
    // registered agents, including resources from resource providers
    // as well as reservations (both static and dynamic ones).
1140  static Option<Error> overcommitCheck(
1141  const std::vector<Resources>& agents,
1142  const hashmap<std::string, Quota>& quotas,
1143  const mesos::quota::QuotaInfo& request);
1144 
1145  // We always want to rescind offers after the capacity heuristic. The
1146  // reason for this is the race between the allocator and the master:
1147  // it can happen that there are not enough free resources at the
1148  // allocator's disposal when it is notified about the quota request,
1149  // but at this point it's too late to rescind.
1150  //
1151  // While rescinding, we adhere to the following rules:
1152  // * Rescind at least as many resources as there are in the quota request.
1153  // * Rescind all offers from an agent in order to make the potential
1154  // offer bigger, which increases the chances that a quota'ed framework
1155  // will be able to use the offer.
1156  // * Rescind offers from at least `numF` agents to make it possible
1157  // (but not guaranteed, due to fair sharing) that each framework in
1158  // the role for which quota is set gets an offer (`numF` is the
1159  // number of frameworks in the quota'ed role). Though this is not
    // strictly necessary, we think this will increase the debuggability
1161  // and will improve user experience.
1162  //
1163  // TODO(alexr): Consider removing this function once offer management
1164  // (including rescinding) is moved to allocator.
1165  void rescindOffers(const mesos::quota::QuotaInfo& request) const;
1166 
1167  process::Future<bool> authorizeGetQuota(
1169  const std::string& role) const;
1170 
1171  // This auth function is used for legacy `SET_QUOTA` and `REMOVE_QUOTA`
1172  // calls. Remove this function after the associated API calls are
1173  // no longer supported.
1174  process::Future<bool> authorizeUpdateQuota(
1176  const mesos::quota::QuotaInfo& quotaInfo) const;
1177 
1178  process::Future<bool> authorizeUpdateQuotaConfig(
1180  const mesos::quota::QuotaConfig& quotaConfig) const;
1181 
1184  principal) const;
1185 
1187  const google::protobuf::RepeatedPtrField<mesos::quota::QuotaConfig>&
1188  quotaConfigs) const;
1189 
1191  const mesos::quota::QuotaRequest& quotaRequest,
1193  principal) const;
1194 
1196  const mesos::quota::QuotaInfo& quotaInfo,
1197  bool forced) const;
1198 
1200  const std::string& role,
1202  principal) const;
1203 
1205  const std::string& role) const;
1206 
1207  // To perform actions related to quota management, we require access to the
1208  // master data structures. No synchronization primitives are needed here
1209  // since `QuotaHandler`'s functions are invoked in the Master's actor.
1210  Master* master;
1211  };
1212 
1220  class WeightsHandler
1221  {
1222  public:
1223  explicit WeightsHandler(Master* _master) : master(_master)
1224  {
1225  CHECK_NOTNULL(master);
1226  }
1227 
1231  principal) const;
1232 
1234  const mesos::master::Call& call,
1236  ContentType contentType) const;
1237 
1239  const process::http::Request& request,
1241  principal) const;
1242 
1244  const mesos::master::Call& call,
1246  ContentType contentType) const;
1247 
1248  private:
1249  process::Future<bool> authorizeGetWeight(
1251  const WeightInfo& weight) const;
1252 
1253  process::Future<bool> authorizeUpdateWeights(
1255  const std::vector<std::string>& roles) const;
1256 
1258  const std::vector<WeightInfo>& weightInfos,
1259  const std::vector<bool>& roleAuthorizations) const;
1260 
1263  principal) const;
1264 
1267  const google::protobuf::RepeatedPtrField<WeightInfo>& weightInfos)
1268  const;
1269 
1271  const std::vector<WeightInfo>& weightInfos) const;
1272 
1273  // Rescind all outstanding offers if any of the 'weightInfos' roles has
1274  // an active framework.
1275  void rescindOffers(const std::vector<WeightInfo>& weightInfos) const;
1276 
1277  Master* master;
1278  };
1279 
1280 public:
1281  // Inner class used to namespace read-only HTTP handlers; these handlers
1282  // may be executed in parallel.
1283  //
  // A synchronously executed post-processing step is provided for any
  // cases where the handler is not purely read-only and requires a
  // synchronous write, i.e. where it's not feasible to perform the write
  // asynchronously. For example, SUBSCRIBE cannot have a gap between
  // serving the initial state and registering the subscriber, or else
  // events will be missed in the interim.
1290  //
1291  // The handlers are only permitted to depend on the output content
1292  // type (derived from the request headers), the request query
1293  // parameters and the authorization filters to de-duplicate identical
1294  // responses (this does not de-duplicate all identical responses, e.g.
1295  // different authz principal but same permissions).
1296  //
1297  // NOTE: Most member functions of this class are not routed directly but
1298  // dispatched from their corresponding handlers in the outer `Http` class.
1299  // This is because deciding whether an incoming request is read-only often
1300  // requires some inspection, e.g. distinguishing between "GET" and "POST"
1301  // requests to the same endpoint.
1303  {
1304  public:
1306  {
1307  struct Subscribe
1308  {
1311  };
1312 
1313  // Any additional post-processing cases will add additional
1314  // cases into this variant.
1315  Variant<Subscribe> state;
1316  };
1317 
1318  explicit ReadOnlyHandler(const Master* _master) : master(_master) {}
1319 
1320  // /frameworks
1321  std::pair<process::http::Response, Option<PostProcessing>> frameworks(
1322  ContentType outputContentType,
1323  const hashmap<std::string, std::string>& queryParameters,
1324  const process::Owned<ObjectApprovers>& approvers) const;
1325 
1326  // /roles
1327  std::pair<process::http::Response, Option<PostProcessing>> roles(
1328  ContentType outputContentType,
1329  const hashmap<std::string, std::string>& queryParameters,
1330  const process::Owned<ObjectApprovers>& approvers) const;
1331 
1332  // /slaves
1333  std::pair<process::http::Response, Option<PostProcessing>> slaves(
1334  ContentType outputContentType,
1335  const hashmap<std::string, std::string>& queryParameters,
1336  const process::Owned<ObjectApprovers>& approvers) const;
1337 
1338  // /state
1339  std::pair<process::http::Response, Option<PostProcessing>> state(
1340  ContentType outputContentType,
1341  const hashmap<std::string, std::string>& queryParameters,
1342  const process::Owned<ObjectApprovers>& approvers) const;
1343 
1344  // /state-summary
1345  std::pair<process::http::Response, Option<PostProcessing>> stateSummary(
1346  ContentType outputContentType,
1347  const hashmap<std::string, std::string>& queryParameters,
1348  const process::Owned<ObjectApprovers>& approvers) const;
1349 
1350  // /tasks
1351  std::pair<process::http::Response, Option<PostProcessing>> tasks(
1352  ContentType outputContentType,
1353  const hashmap<std::string, std::string>& queryParameters,
1354  const process::Owned<ObjectApprovers>& approvers) const;
1355 
1356  // master::Call::GET_STATE
1357  std::pair<process::http::Response, Option<PostProcessing>> getState(
1358  ContentType outputContentType,
1359  const hashmap<std::string, std::string>& queryParameters,
1360  const process::Owned<ObjectApprovers>& approvers) const;
1361 
1362  // master::Call::GET_AGENTS
1363  std::pair<process::http::Response, Option<PostProcessing>> getAgents(
1364  ContentType outputContentType,
1365  const hashmap<std::string, std::string>& queryParameters,
1366  const process::Owned<ObjectApprovers>& approvers) const;
1367 
1368  // master::Call::GET_FRAMEWORKS
1369  std::pair<process::http::Response, Option<PostProcessing>> getFrameworks(
1370  ContentType outputContentType,
1371  const hashmap<std::string, std::string>& queryParameters,
1372  const process::Owned<ObjectApprovers>& approvers) const;
1373 
1374  // master::Call::GET_EXECUTORS
1375  std::pair<process::http::Response, Option<PostProcessing>> getExecutors(
1376  ContentType outputContentType,
1377  const hashmap<std::string, std::string>& queryParameters,
1378  const process::Owned<ObjectApprovers>& approvers) const;
1379 
1380  // master::Call::GET_OPERATIONS
1381  std::pair<process::http::Response, Option<PostProcessing>> getOperations(
1382  ContentType outputContentType,
1383  const hashmap<std::string, std::string>& queryParameters,
1384  const process::Owned<ObjectApprovers>& approvers) const;
1385 
1386  // master::Call::GET_TASKS
1387  std::pair<process::http::Response, Option<PostProcessing>> getTasks(
1388  ContentType outputContentType,
1389  const hashmap<std::string, std::string>& queryParameters,
1390  const process::Owned<ObjectApprovers>& approvers) const;
1391 
1392  // master::Call::GET_ROLES
1393  std::pair<process::http::Response, Option<PostProcessing>> getRoles(
1394  ContentType outputContentType,
1395  const hashmap<std::string, std::string>& queryParameters,
1396  const process::Owned<ObjectApprovers>& approvers) const;
1397 
1398  // master::Call::SUBSCRIBE
1399  std::pair<process::http::Response, Option<PostProcessing>> subscribe(
1400  ContentType outputContentType,
1401  const hashmap<std::string, std::string>& queryParameters,
1402  const process::Owned<ObjectApprovers>& approvers) const;
1403 
1404  private:
1405  std::string serializeGetState(
1406  const process::Owned<ObjectApprovers>& approvers) const;
1407  std::string serializeGetAgents(
1408  const process::Owned<ObjectApprovers>& approvers) const;
1409  std::string serializeGetFrameworks(
1410  const process::Owned<ObjectApprovers>& approvers) const;
1411  std::string serializeGetExecutors(
1412  const process::Owned<ObjectApprovers>& approvers) const;
1413  std::string serializeGetOperations(
1414  const process::Owned<ObjectApprovers>& approvers) const;
1415  std::string serializeGetTasks(
1416  const process::Owned<ObjectApprovers>& approvers) const;
1417  std::string serializeGetRoles(
1418  const process::Owned<ObjectApprovers>& approvers) const;
1419  std::string serializeSubscribe(
1420  const process::Owned<ObjectApprovers>& approvers) const;
1421 
1422  std::function<void(JSON::ObjectWriter*)> jsonifyGetState(
1423  const process::Owned<ObjectApprovers>& approvers) const;
1424  std::function<void(JSON::ObjectWriter*)> jsonifyGetAgents(
1425  const process::Owned<ObjectApprovers>& approvers) const;
1426  std::function<void(JSON::ObjectWriter*)> jsonifyGetFrameworks(
1427  const process::Owned<ObjectApprovers>& approvers) const;
1428  std::function<void(JSON::ObjectWriter*)> jsonifyGetExecutors(
1429  const process::Owned<ObjectApprovers>& approvers) const;
1430  std::function<void(JSON::ObjectWriter*)> jsonifyGetOperations(
1431  const process::Owned<ObjectApprovers>& approvers) const;
1432  std::function<void(JSON::ObjectWriter*)> jsonifyGetTasks(
1433  const process::Owned<ObjectApprovers>& approvers) const;
1434  std::function<void(JSON::ObjectWriter*)> jsonifyGetRoles(
1435  const process::Owned<ObjectApprovers>& approvers) const;
1436  std::function<void(JSON::ObjectWriter*)> jsonifySubscribe(
1437  const process::Owned<ObjectApprovers>& approvers) const;
1438 
1439  const Master* master;
1440  };
1441 
1442 private:
1443  // Inner class used to namespace HTTP route handlers (see
1444  // master/http.cpp for implementations).
1445  class Http
1446  {
1447  public:
1448  explicit Http(Master* _master) : master(_master),
1449  readonlyHandler(_master),
1450  quotaHandler(_master),
1451  weightsHandler(_master) {}
1452 
1453  // /api/v1
1455  const process::http::Request& request,
1457  principal) const;
1458 
1459  // /api/v1/scheduler
1461  const process::http::Request& request,
1463  principal) const;
1464 
1465  // /master/create-volumes
1467  const process::http::Request& request,
1469  principal) const;
1470 
1471  // /master/destroy-volumes
1473  const process::http::Request& request,
1475  principal) const;
1476 
1477  // /master/flags
1479  const process::http::Request& request,
1481  principal) const;
1482 
1483  // /master/frameworks
1484  //
1485  // NOTE: Requests to this endpoint are batched.
1487  const process::http::Request& request,
1489  principal) const;
1490 
1491  // /master/health
1493  const process::http::Request& request) const;
1494 
1495  // /master/redirect
1497  const process::http::Request& request) const;
1498 
1499  // /master/reserve
1501  const process::http::Request& request,
1503  principal) const;
1504 
1505  // /master/roles
1506  //
1507  // NOTE: Requests to this endpoint are batched.
1509  const process::http::Request& request,
1511  principal) const;
1512 
1513  // /master/teardown
1515  const process::http::Request& request,
1517  principal) const;
1518 
1519  // /master/slaves
1520  //
1521  // NOTE: Requests to this endpoint are batched.
1523  const process::http::Request& request,
1525  principal) const;
1526 
1527  // /master/state
1528  //
1529  // NOTE: Requests to this endpoint are batched.
1531  const process::http::Request& request,
1533  principal) const;
1534 
1535  // /master/state-summary
1536  //
1537  // NOTE: Requests to this endpoint are batched.
1539  const process::http::Request& request,
1541  principal) const;
1542 
1543  // /master/tasks
1544  //
1545  // NOTE: Requests to this endpoint are batched.
1547  const process::http::Request& request,
1549  principal) const;
1550 
1551  // /master/maintenance/schedule
1552  process::Future<process::http::Response> maintenanceSchedule(
1553  const process::http::Request& request,
1555  principal) const;
1556 
1557  // /master/maintenance/status
1558  process::Future<process::http::Response> maintenanceStatus(
1559  const process::http::Request& request,
1561  principal) const;
1562 
1563  // /master/machine/down
1565  const process::http::Request& request,
1567  principal) const;
1568 
1569  // /master/machine/up
1571  const process::http::Request& request,
1573  principal) const;
1574 
1575  // /master/unreserve
1577  const process::http::Request& request,
1579  principal) const;
1580 
1581  // /master/weights
1583  const process::http::Request& request,
1585  principal) const;
1586 
1587  // /master/quota (DEPRECATED).
1589  const process::http::Request& request,
1591  principal) const;
1592 
1593  static std::string API_HELP();
1594  static std::string SCHEDULER_HELP();
1595  static std::string FLAGS_HELP();
1596  static std::string FRAMEWORKS_HELP();
1597  static std::string HEALTH_HELP();
1598  static std::string REDIRECT_HELP();
1599  static std::string ROLES_HELP();
1600  static std::string TEARDOWN_HELP();
1601  static std::string SLAVES_HELP();
1602  static std::string STATE_HELP();
1603  static std::string STATESUMMARY_HELP();
1604  static std::string TASKS_HELP();
1605  static std::string MAINTENANCE_SCHEDULE_HELP();
1606  static std::string MAINTENANCE_STATUS_HELP();
1607  static std::string MACHINE_DOWN_HELP();
1608  static std::string MACHINE_UP_HELP();
1609  static std::string CREATE_VOLUMES_HELP();
1610  static std::string DESTROY_VOLUMES_HELP();
1611  static std::string RESERVE_HELP();
1612  static std::string UNRESERVE_HELP();
1613  static std::string QUOTA_HELP();
1614  static std::string WEIGHTS_HELP();
1615 
1616  private:
1617  JSON::Object __flags() const;
1618 
1619  class FlagsError; // Forward declaration.
1620 
1623  principal) const;
1624 
1626  const size_t limit,
1627  const size_t offset,
1628  const std::string& order,
1630  principal) const;
1631 
1633  const FrameworkID& id,
1635  principal) const;
1636 
1638  const FrameworkID& id) const;
1639 
1640  process::Future<process::http::Response> _updateMaintenanceSchedule(
1641  const mesos::maintenance::Schedule& schedule,
1643  principal) const;
1644 
1645  process::Future<process::http::Response> __updateMaintenanceSchedule(
1646  const mesos::maintenance::Schedule& schedule,
1647  const process::Owned<ObjectApprovers>& approvers) const;
1648 
1649  process::Future<process::http::Response> ___updateMaintenanceSchedule(
1650  const mesos::maintenance::Schedule& schedule,
1651  bool applied) const;
1652 
1653  mesos::maintenance::Schedule _getMaintenanceSchedule(
1654  const process::Owned<ObjectApprovers>& approvers) const;
1655 
1657  const process::Owned<ObjectApprovers>& approvers) const;
1658 
1659  process::Future<process::http::Response> _startMaintenance(
1660  const google::protobuf::RepeatedPtrField<MachineID>& machineIds,
1661  const process::Owned<ObjectApprovers>& approvers) const;
1662 
1664  const google::protobuf::RepeatedPtrField<MachineID>& machineIds,
1665  const process::Owned<ObjectApprovers>& approvers) const;
1666 
1668  const SlaveID& slaveId,
1669  const Option<DurationInfo>& maxGracePeriod,
1670  const bool markGone,
1671  const process::Owned<ObjectApprovers>& approvers) const;
1672 
1674  const SlaveID& slaveId,
1675  const process::Owned<ObjectApprovers>& approvers) const;
1676 
1678  const SlaveID& slaveId,
1679  const process::Owned<ObjectApprovers>& approvers) const;
1680 
1682  const SlaveID& slaveId,
1683  const google::protobuf::RepeatedPtrField<Resource>& source,
1684  const google::protobuf::RepeatedPtrField<Resource>& resources,
1686  principal) const;
1687 
1689  const SlaveID& slaveId,
1690  const google::protobuf::RepeatedPtrField<Resource>& resources,
1692  principal) const;
1693 
1695  const SlaveID& slaveId,
1696  const google::protobuf::RepeatedPtrField<Resource>& volumes,
1698  principal) const;
1699 
1701  const SlaveID& slaveId,
1702  const google::protobuf::RepeatedPtrField<Resource>& volumes,
1704  principal) const;
1705 
1723  const SlaveID& slaveId,
1724  const Offer::Operation& operation) const;
1725 
1726  // Master API handlers.
1727 
1729  const mesos::master::Call& call,
1731  ContentType contentType) const;
1732 
1734  const mesos::master::Call& call,
1736  ContentType contentType) const;
1737 
1739  const mesos::master::Call& call,
1741  ContentType contentType) const;
1742 
1744  const mesos::master::Call& call,
1746  ContentType contentType) const;
1747 
1749  const mesos::master::Call& call,
1751  ContentType contentType) const;
1752 
1754  const mesos::master::Call& call,
1756  ContentType contentType) const;
1757 
1759  const mesos::master::Call& call,
1761  ContentType contentType) const;
1762 
1764  const mesos::master::Call& call,
1766  ContentType contentType) const;
1767 
1769  const mesos::master::Call& call,
1771  ContentType contentType) const;
1772 
1774  const mesos::master::Call& call,
1776  ContentType contentType) const;
1777 
1778  process::Future<process::http::Response> updateMaintenanceSchedule(
1779  const mesos::master::Call& call,
1781  ContentType contentType) const;
1782 
1783  process::Future<process::http::Response> getMaintenanceSchedule(
1784  const mesos::master::Call& call,
1786  ContentType contentType) const;
1787 
1788  process::Future<process::http::Response> getMaintenanceStatus(
1789  const mesos::master::Call& call,
1791  ContentType contentType) const;
1792 
1794  const mesos::master::Call& call,
1796  ContentType contentType) const;
1797 
1799  const mesos::master::Call& call,
1801  ContentType contentType) const;
1802 
1804  const mesos::master::Call& call,
1806  ContentType contentType) const;
1807 
1809  const mesos::master::Call& call,
1811  ContentType contentType) const;
1812 
1814  const mesos::master::Call& call,
1816  ContentType contentType) const;
1817 
1819  const mesos::master::Call& call,
1821  ContentType contentType) const;
1822 
1824  const mesos::master::Call& call,
1826  ContentType contentType) const;
1827 
1829  const mesos::master::Call& call,
1831  ContentType contentType) const;
1832 
1834  const mesos::master::Call& call,
1836  ContentType contentType) const;
1837 
1839  const mesos::master::Call& call,
1841  ContentType contentType) const;
1842 
1844  const mesos::master::Call& call,
1846  ContentType contentType) const;
1847 
1849  const mesos::master::Call& call,
1851  ContentType contentType) const;
1852 
1853  process::Future<process::http::Response> unreserveResources(
1854  const mesos::master::Call& call,
1856  ContentType contentType) const;
1857 
1859  const mesos::master::Call& call,
1861  ContentType contentType) const;
1862 
1864  const mesos::master::Call& call,
1866  ContentType contentType) const;
1867 
1869  const mesos::master::Call& call,
1871  ContentType contentType) const;
1872 
1873  static std::function<void(JSON::ObjectWriter*)> jsonifySubscribe(
1874  const Master* master,
1875  const process::Owned<ObjectApprovers>& approvers);
1876  std::string serializeSubscribe(
1877  const process::Owned<ObjectApprovers>& approvers) const;
1879  const mesos::master::Call& call,
1881  ContentType contentType) const;
1882 
1884  const mesos::master::Call& call,
1886  ContentType contentType) const;
1887 
1889  const mesos::master::Call& call,
1891  ContentType contentType) const;
1892 
1894  const mesos::master::Call& call,
1896  ContentType contentType) const;
1897 
1899  const SlaveID& slaveId) const;
1900 
1901  process::Future<process::http::Response> reconcileOperations(
1902  Framework* framework,
1903  const mesos::scheduler::Call::ReconcileOperations& call,
1904  ContentType contentType) const;
1905 
1906  Master* master;
1907 
1908  ReadOnlyHandler readonlyHandler;
1909 
1910  // NOTE: The quota specific pieces of the Operator API are factored
1911  // out into this separate class.
1912  QuotaHandler quotaHandler;
1913 
1914  // NOTE: The weights specific pieces of the Operator API are factored
1915  // out into this separate class.
1916  WeightsHandler weightsHandler;
1917 
1918  // Since the Master actor is one of the most loaded in a typical Mesos
1919  // installation, we take some extra care to keep the backlog small.
1920  // In particular, all read-only requests are batched and executed in
1921  // parallel, instead of going through the master queue separately.
1922  // The post-processing step, that depends on the handler, will be
1923  // executed synchronously and serially after the parallel executions
1924  // complete.
1925 
1926  typedef std::pair<
1929  (Master::ReadOnlyHandler::*ReadOnlyRequestHandler)(
1930  ContentType,
1932  const process::Owned<ObjectApprovers>&) const;
1933 
1934  process::Future<process::http::Response> deferBatchedRequest(
1935  ReadOnlyRequestHandler handler,
1937  ContentType outputContentType,
1938  const hashmap<std::string, std::string>& queryParameters,
1939  const process::Owned<ObjectApprovers>& approvers) const;
1940 
1941  void processRequestsBatch() const;
1942 
1943  struct BatchedRequest
1944  {
1945  ReadOnlyRequestHandler handler;
1946  ContentType outputContentType;
1947  hashmap<std::string, std::string> queryParameters;
1951  };
1952 
1953  mutable std::vector<BatchedRequest> batchedRequests;
1954  };
1955 
1956  Master(const Master&); // No copying.
1957  Master& operator=(const Master&); // No assigning.
1958 
1959  friend struct Framework;
1960  friend struct FrameworkMetrics;
1961  friend struct Metrics;
1962  friend struct Role;
1963  friend struct Slave;
1964  friend struct SlavesWriter;
1965  friend struct Subscriber;
1966 
1967  // NOTE: Since 'getOffer', 'getInverseOffer' and 'slaves' are
1968  // protected, we need to make the following functions friends.
1969  friend Offer* validation::offer::getOffer(
1970  Master* master, const OfferID& offerId);
1971 
1972  friend InverseOffer* validation::offer::getInverseOffer(
1973  Master* master, const OfferID& offerId);
1974 
1976  Master* master, const SlaveID& slaveId);
1977 
1978  const Flags flags;
1979 
1980  Http http;
1981 
1982  Option<MasterInfo> leader; // Current leading master.
1983 
1984  mesos::allocator::Allocator* allocator;
1985  WhitelistWatcher* whitelistWatcher;
1986  Registrar* registrar;
1987  Files* files;
1988 
1991 
1992  const Option<Authorizer*> authorizer;
1993 
1994  MasterInfo info_;
1995 
  // Holds some info which affects how a machine behaves, as well as state
  // that represents the master's view of this machine. See the
  // `MachineInfo` protobuf and `Machine` struct for more information.
2000 
2001  struct Maintenance
2002  {
2003  // Holds the maintenance schedule, as given by the operator.
2004  std::list<mesos::maintenance::Schedule> schedules;
2005  } maintenance;
2006 
2007  // Indicates when recovery is complete. Recovery begins once the
2008  // master is elected as a leader.
2010 
2011  // If this is the leading master, we periodically check whether we
2012  // should GC some information from the registry.
2013  Option<process::Timer> registryGcTimer;
2014 
2015  struct Slaves
2016  {
2017  Slaves() : removed(MAX_REMOVED_SLAVES) {}
2018 
2019  // Imposes a time limit for slaves that we recover from the
2020  // registry to reregister with the master.
2021  Option<process::Timer> recoveredTimer;
2022 
2023  // Slaves that have been recovered from the registrar after master
2024  // failover. Slaves are removed from this collection when they
2025  // either reregister with the master or are marked unreachable
2026  // because they do not reregister before `recoveredTimer` fires.
    // We must not answer questions related to these slaves (e.g.,
    // during task reconciliation) until we determine their fate
    // because they are in this transitioning state.
2030  hashmap<SlaveID, SlaveInfo> recovered;
2031 
2032  // Agents that are in the process of (re-)registering. They are
2033  // maintained here while the (re-)registration is in progress and
2034  // possibly pending in the authorizer or the registrar in order
2035  // to help deduplicate (re-)registration requests.
2036  hashset<process::UPID> registering;
2037  hashset<SlaveID> reregistering;
2038 
2039  // Registered slaves are indexed by SlaveID and UPID. Note that
2040  // iteration is supported but is exposed as iteration over a
2041  // hashmap<SlaveID, Slave*> since it is tedious to convert
2042  // the map's key/value iterator into a value iterator.
2043  //
2044  // TODO(bmahler): Consider pulling in boost's multi_index,
2045  // or creating a simpler indexing abstraction in stout.
2046  struct
2047  {
2048  bool contains(const SlaveID& slaveId) const
2049  {
2050  return ids.contains(slaveId);
2051  }
2052 
2053  bool contains(const process::UPID& pid) const
2054  {
2055  return pids.contains(pid);
2056  }
2057 
2058  Slave* get(const SlaveID& slaveId) const
2059  {
2060  return ids.get(slaveId).getOrElse(nullptr);
2061  }
2062 
2063  Slave* get(const process::UPID& pid) const
2064  {
2065  return pids.get(pid).getOrElse(nullptr);
2066  }
2067 
2068  void put(Slave* slave)
2069  {
2070  CHECK_NOTNULL(slave);
2071  ids[slave->id] = slave;
2072  pids[slave->pid] = slave;
2073  }
2074 
2075  void remove(Slave* slave)
2076  {
2077  CHECK_NOTNULL(slave);
2078  ids.erase(slave->id);
2079  pids.erase(slave->pid);
2080  }
2081 
2082  void clear()
2083  {
2084  ids.clear();
2085  pids.clear();
2086  }
2087 
2088  size_t size() const { return ids.size(); }
2089 
2090  typedef hashmap<SlaveID, Slave*>::iterator iterator;
2091  typedef hashmap<SlaveID, Slave*>::const_iterator const_iterator;
2092 
2093  iterator begin() { return ids.begin(); }
2094  iterator end() { return ids.end(); }
2095 
2096  const_iterator begin() const { return ids.begin(); }
2097  const_iterator end() const { return ids.end(); }
2098 
2099  private:
2102  } registered;
2103 
2104  // Slaves that are in the process of being removed from the
2105  // registrar.
2106  hashset<SlaveID> removing;
2107 
2108  // Slaves that are in the process of being marked unreachable.
2109  hashset<SlaveID> markingUnreachable;
2110 
2111  // Slaves that are in the process of being marked gone.
2112  hashset<SlaveID> markingGone;
2113 
2114  // Agents which have been marked for draining, including recovered,
2115  // admitted, and unreachable agents. All draining agents will also
2116  // be deactivated. If an agent in this set reregisters, the master
2117  // will send it a `DrainSlaveMessage`.
2118  //
2119  // These values are checkpointed to the registry.
2120  hashmap<SlaveID, DrainInfo> draining;
2121 
2122  // Agents which have been deactivated, including recovered, admitted,
2123  // and unreachable agents. Agents in this set will not have resource
2124  // offers generated and will thus be unable to launch new operations,
2125  // but existing operations will be unaffected.
2126  //
2127  // These values are checkpointed to the registry.
2128  hashset<SlaveID> deactivated;
2129 
2130  // This collection includes agents that have gracefully shut down,
2131  // as well as those that have been marked unreachable or gone. We
2132  // keep a cache here to prevent this from growing in an unbounded
2133  // manner.
2134  //
2135  // TODO(bmahler): Ideally we could use a cache with set semantics.
2136  //
2137  // TODO(neilc): Consider storing all agent IDs that have been
2138  // marked unreachable by this master.
2140 
2141  // Slaves that have been marked unreachable. We recover this from
2142  // the registry, so it includes slaves marked as unreachable by
2143  // other instances of the master. Note that we use a LinkedHashMap
2144  // to ensure the order of elements here matches the order in the
2145  // registry's unreachable list, which matches the order in which
2146  // agents are marked unreachable. This list is garbage collected;
2147  // GC behavior is governed by the `registry_gc_interval`,
2148  // `registry_max_agent_age`, and `registry_max_agent_count` flags.
2150 
2151  // This helps us look up all unreachable tasks on an agent so we can remove
2152  // them from their primary storage `framework.unreachableTasks` when an
2153  // agent reregisters. This map is bounded by the same GC behavior as
2154  // `unreachable`. When the agent is GC'd from unreachable it's also
2155  // erased from `unreachableTasks`.
2157  unreachableTasks;
2158 
2159  // Slaves that have been marked gone. We recover this from the
2160  // registry, so it includes slaves marked as gone by other instances
2161  // of the master. Note that we use a LinkedHashMap to ensure the order
2162  // of elements here matches the order in the registry's gone list, which
2163  // matches the order in which agents are marked gone.
2165 
2166  // This rate limiter is used to limit the removal of slaves failing
2167  // health checks.
2168  // NOTE: Using a 'shared_ptr' here is OK because 'RateLimiter' is
2169  // a wrapper around libprocess process which is thread safe.
2171  } slaves;
2172 
2173  struct Frameworks
2174  {
2175  Frameworks(const Flags& masterFlags)
2176  : completed(masterFlags.max_completed_frameworks) {}
2177 
2179 
2181 
2182  // Principals of frameworks keyed by PID.
2183  // NOTE: Multiple PIDs can map to the same principal. The
2184  // principal is None when the framework doesn't specify it.
2185  // The differences between this map and 'authenticated' are:
2186  // 1) This map only includes *registered* frameworks. The mapping
2187  // is added when a framework (re-)registers.
2188  // 2) This map includes unauthenticated frameworks (when Master
2189  // allows them) if they have principals specified in
2190  // FrameworkInfo.
2192 
2193  // BoundedRateLimiters keyed by the framework principal.
2194  // Like Metrics::Frameworks, all frameworks of the same principal
2195  // are throttled together at a common rate limit.
2197 
2198  // The default limiter is for frameworks not specified in
2199  // 'flags.rate_limits'.
2201  } frameworks;
2202 
2203  struct Subscribers
2204  {
2205  Subscribers(Master* _master, size_t maxSubscribers)
2206  : master(_master),
2207  subscribed(maxSubscribers) {};
2208 
2209  // Represents a client subscribed to the 'api/vX' endpoint.
2210  //
2211  // TODO(anand): Add support for filtering. Some subscribers
2212  // might only be interested in a subset of events.
2213  struct Subscriber
2214  {
2217  const process::Owned<ObjectApprovers>& _approvers)
2218  : http(_http),
2219  heartbeater(
2220  "subscriber " + stringify(http.streamId),
2221  []() {
2222  mesos::master::Event event;
2223  event.set_type(mesos::master::Event::HEARTBEAT);
2224  return event;
2225  }(),
2226  http,
2229  approvers(_approvers) {}
2230 
2231 
2232  // Not copyable, not assignable.
2233  Subscriber(const Subscriber&) = delete;
2234  Subscriber& operator=(const Subscriber&) = delete;
2235 
2236  // TODO(greggomann): Refactor this function into multiple event-specific
2237  // overloads. See MESOS-8475.
2238  void send(
2239  const mesos::master::Event& event,
2240  const Option<FrameworkInfo>& frameworkInfo,
2241  const Option<Task>& task);
2242 
2244  {
2245  // TODO(anand): Refactor `HttpConnection` to being a RAII class instead.
2246  // It is possible that a caller might accidentally invoke `close()`
2247  // after passing ownership to the `Subscriber` object. See MESOS-5843
2248  // for more details.
2249  http.close();
2250  }
2251 
2255  };
2256 
2257  // Sends the event to all subscribers connected to the 'api/vX' endpoint.
2258  void send(
2259  const mesos::master::Event& event,
2260  const Option<FrameworkInfo>& frameworkInfo = None(),
2261  const Option<Task>& task = None());
2262 
2263  Master* master;
2264 
2265  // Active subscribers to the 'api/vX' endpoint keyed by the stream
2266  // identifier.
2268  };
2269 
2270  Subscribers subscribers;
2271 
2272  hashmap<OfferID, Offer*> offers;
2274 
2275  hashmap<OfferID, InverseOffer*> inverseOffers;
2276  hashmap<OfferID, process::Timer> inverseOfferTimers;
2277 
2278  // We track information about roles that we're aware of in the system.
2279  // Specifically, we keep track of the roles when a framework subscribes to
2280  // the role, and/or when there are resources allocated to the role
2281  // (e.g. some tasks and/or executors are consuming resources under the role).
2283 
2284  // Configured role whitelist if using the (deprecated) "explicit
2285  // roles" feature. If this is `None`, any role is allowed.
2286  Option<hashset<std::string>> roleWhitelist;
2287 
2288  // Configured weight for each role, if any. If a role does not
2289  // appear here, it has the default weight of 1.
2291 
2292  // Configured quota for each role, if any. We store quotas by role
2293  // because we set them at the role level.
2295 
2296  // Authenticator names as supplied via flags.
2297  std::vector<std::string> authenticatorNames;
2298 
2299  Option<Authenticator*> authenticator;
2300 
2301  // Frameworks/slaves that are currently in the process of authentication.
2302  // 'authenticating' future is completed when authenticator
2303  // completes authentication.
2304  // The future is removed from the map when master completes authentication.
2306 
2307  // Principals of authenticated frameworks/slaves keyed by PID.
2309 
2310  int64_t nextFrameworkId; // Used to give each framework a unique ID.
2311  int64_t nextOfferId; // Used to give each slot offer a unique ID.
2312  int64_t nextSlaveId; // Used to give each slave a unique ID.
2313 
2314  // NOTE: It is safe to use a 'shared_ptr' because 'Metrics' is
2315  // thread safe.
2316  // TODO(dhamon): This does not need to be a shared_ptr. Metrics contains
2317  // copyable metric types only.
2318  std::shared_ptr<Metrics> metrics;
2319 
2320  // PullGauge handlers.
2321  double _uptime_secs()
2322  {
2323  return (process::Clock::now() - startTime).secs();
2324  }
2325 
2326  double _elected()
2327  {
2328  return elected() ? 1 : 0;
2329  }
2330 
2331  double _slaves_connected();
2332  double _slaves_disconnected();
2333  double _slaves_active();
2334  double _slaves_inactive();
2335  double _slaves_unreachable();
2336 
2337  // TODO(bevers): Remove these and make the above functions
2338  // const instead after MESOS-4995 is resolved.
2339  double _const_slaves_connected() const;
2340  double _const_slaves_disconnected() const;
2341  double _const_slaves_active() const;
2342  double _const_slaves_inactive() const;
2343  double _const_slaves_unreachable() const;
2344 
2345  double _frameworks_connected();
2346  double _frameworks_disconnected();
2347  double _frameworks_active();
2348  double _frameworks_inactive();
2349 
2350  double _outstanding_offers()
2351  {
2352  return static_cast<double>(offers.size());
2353  }
2354 
2355  double _event_queue_messages()
2356  {
2357  return static_cast<double>(eventCount<process::MessageEvent>());
2358  }
2359 
2360  double _event_queue_dispatches()
2361  {
2362  return static_cast<double>(eventCount<process::DispatchEvent>());
2363  }
2364 
2365  double _event_queue_http_requests()
2366  {
2367  return static_cast<double>(eventCount<process::HttpEvent>());
2368  }
2369 
2370  double _tasks_staging();
2371  double _tasks_starting();
2372  double _tasks_running();
2373  double _tasks_unreachable();
2374  double _tasks_killing();
2375 
2376  double _resources_total(const std::string& name);
2377  double _resources_used(const std::string& name);
2378  double _resources_percent(const std::string& name);
2379 
2380  double _resources_revocable_total(const std::string& name);
2381  double _resources_revocable_used(const std::string& name);
2382  double _resources_revocable_percent(const std::string& name);
2383 
2384  process::Time startTime; // Start time used to calculate uptime.
2385 
2386  Option<process::Time> electedTime; // Time when this master is elected.
2387 };
2388 
2389 
2390 inline std::ostream& operator<<(
2391  std::ostream& stream,
2392  const Framework& framework);
2393 
2394 
2395 // TODO(bmahler): Keeping the task and executor information in sync
2396 // across the Slave and Framework structs is error prone!
2398 {
2399  enum State
2400  {
2401  // Framework has never connected to this master. This implies the
2402  // master failed over and the framework has not yet reregistered,
2403  // but some framework state has been recovered from reregistering
2404  // agents that are running tasks for the framework.
2406 
2407  // The framework is connected. The framework may or may not be eligible to
2408  // receive offers; this property is tracked separately.
2410 
2411  // Framework was previously connected to this master,
2412  // but is not connected now.
2413  DISCONNECTED
2414  };
2415 
2416  Framework(
2417  Master* const master,
2418  const Flags& masterFlags,
2419  const FrameworkInfo& info,
2420  const process::UPID& _pid,
2421  const process::Owned<ObjectApprovers>& objectApprovers,
2423 
2424  Framework(Master* const master,
2425  const Flags& masterFlags,
2426  const FrameworkInfo& info,
2428  const process::Owned<ObjectApprovers>& objectApprovers,
2430 
2431  Framework(Master* const master,
2432  const Flags& masterFlags,
2433  const FrameworkInfo& info);
2434 
2435  ~Framework();
2436 
2437  Task* getTask(const TaskID& taskId);
2438 
2439  void addTask(Task* task);
2440 
2441  // Update framework to recover the resources that were previously
2442  // being used by `task`.
2443  //
2444  // TODO(bmahler): This is a hack for performance. We need to
2445  // maintain resource counters because computing task resources
2446  // functionally for all tasks is expensive, for now.
2447  void recoverResources(Task* task);
2448 
2449  // Sends a message to the connected framework.
2450  template <typename Message>
2451  void send(const Message& message);
2452 
2453  void addCompletedTask(Task&& task);
2454 
2455  void addUnreachableTask(const Task& task);
2456 
2457  // Removes the task. `unreachable` indicates whether the task is removed due
2458  // to being unreachable. Note that we cannot rely on the task state because
2459  // it may not reflect unreachability due to being set to TASK_LOST for
2460  // backwards compatibility.
2461  void removeTask(Task* task, bool unreachable);
2462 
2463  void addOffer(Offer* offer);
2464 
2465  void removeOffer(Offer* offer);
2466 
2467  void addInverseOffer(InverseOffer* inverseOffer);
2468 
2469  void removeInverseOffer(InverseOffer* inverseOffer);
2470 
2471  bool hasExecutor(const SlaveID& slaveId,
2472  const ExecutorID& executorId);
2473 
2474  void addExecutor(const SlaveID& slaveId,
2475  const ExecutorInfo& executorInfo);
2476 
2477  void removeExecutor(const SlaveID& slaveId,
2478  const ExecutorID& executorId);
2479 
2480  void addOperation(Operation* operation);
2481 
2482  Option<Operation*> getOperation(const OperationID& id);
2483 
2484  void recoverResources(Operation* operation);
2485 
2486  void removeOperation(Operation* operation);
2487 
2488  const FrameworkID id() const;
2489 
2490  // Update fields in 'info' using those in 'newInfo'. Currently this
2491  // only updates `role`/`roles`, 'name', 'failover_timeout', 'hostname',
2492  // 'webui_url', 'capabilities', and 'labels'.
2493  void update(const FrameworkInfo& newInfo);
2494 
2495  // Reactivate framework with new connection: update connection-related state
2496  // and mark the framework as CONNECTED, regardless of the previous state.
2497  void updateConnection(
2498  const process::UPID& newPid,
2499  const process::Owned<ObjectApprovers>& objectApprovers);
2500 
2501  void updateConnection(
2503  const process::Owned<ObjectApprovers>& objectApprovers);
2504 
2505  // If the framework is CONNECTED, clear all state associated with
2506  // the scheduler being connected (close http connection, stop heartbeater,
2507  // clear object approvers, etc.), mark the framework DISCONNECTED and return
2508  // `true`. Otherwise, return `false`.
2509  bool disconnect();
2510 
2511  // Mark the framework as active (eligible to receive offers if connected)
2512  // or inactive. Returns true if this property changed, false otherwise.
2513  bool activate();
2514  bool deactivate();
2515 
2516  void heartbeat();
2517 
2518  bool active() const { return active_; }
2519 
2520  bool connected() const {return state == State::CONNECTED;}
2521  bool recovered() const {return state == State::RECOVERED;}
2522 
2523  bool isTrackedUnderRole(const std::string& role) const;
2524  void trackUnderRole(const std::string& role);
2525  void untrackUnderRole(const std::string& role);
2526 
2528  {
2529  return http_;
2530  }
2531 
2532  const Option<process::UPID>& pid() const { return pid_; }
2533 
2534  // Returns ObjectApprovers for all actions
2535  // needed to authorize scheduler API calls.
2536  static process::Future<process::Owned<ObjectApprovers>> createObjectApprovers(
2537  const Option<Authorizer*>& _authorizer,
2538  const FrameworkInfo& frameworkInfo);
2539 
2540  // Returns whether the framework principal is authorized to perform
2541  // an action on an object.
2542  Try<bool> approved(const authorization::ActionObject& actionObject) const;
2543 
2544  Master* const master;
2545 
2546  FrameworkInfo info;
2547 
2548  std::set<std::string> roles;
2549 
2551 
2555 
2556  // TODO(bmahler): Make this private to enforce that `addTask()` and
2557  // `removeTask()` are used, and provide a const view into the tasks.
2559 
2560  // Tasks launched by this framework that have reached a terminal
2561  // state and have had all their updates acknowledged. We only keep a
2562  // fixed-size cache to avoid consuming too much memory. We use
2563  // circular_buffer rather than BoundedHashMap because there
2564  // can be multiple completed tasks with the same task ID.
2565  circular_buffer<process::Owned<Task>> completedTasks;
2566 
2567  // When an agent is marked unreachable, tasks running on it are stored
2568  // here. We only keep a fixed-size cache to avoid consuming too much memory.
2569  // NOTE: Non-partition-aware unreachable tasks in this map are marked
2570  // TASK_LOST instead of TASK_UNREACHABLE for backward compatibility.
2572 
2573  hashset<Offer*> offers; // Active offers for framework.
2574 
2575  hashset<InverseOffer*> inverseOffers; // Active inverse offers for framework.
2576 
2577  // TODO(bmahler): Make this private to enforce that `addExecutor()`
2578  // and `removeExecutor()` are used, and provide a const view into
2579  // the executors.
2581 
2582  // Pending operations or terminal operations that have
2583  // unacknowledged status updates.
2585 
2586  // The map from the framework-specified operation ID to the
2587  // corresponding internal operation UUID.
2589 
2590  // NOTE: For the used and offered resources below, we keep the
2591  // total as well as partitioned by SlaveID.
2592  // We expose the total resources via the HTTP endpoint, and we
2593  // keep a running total of the resources because looping over the
2594  // slaves to sum the resources has led to perf issues (MESOS-1862).
2595  // We keep the resources partitioned by SlaveID because non-scalar
2596  // resources can be lost when summing them up across multiple
2597  // slaves (MESOS-2373).
2598  //
2599  // Also note that keeping the totals is safe even though it yields
2600  // incorrect results for non-scalar resources.
2601  // (1) For overlapping set items / ranges across slaves, these
2602  // will get added N times but only represented once.
2603  // (2) When an initial subtraction occurs (N-1), the resource is
2604  // no longer represented. (This is the source of the bug).
2605  // (3) When any further subtractions occur (N-(1+M)), the
2606  // Resources simply ignores the subtraction since there's
2607  // nothing to remove, so this is safe for now.
2608 
2609  // TODO(mpark): Strip the non-scalar resources out of the totals
2610  // in order to avoid reporting incorrect statistics (MESOS-2623).
2611 
2612  // Active task / executor / operation resources.
2614 
2615  // Note that we maintain multiple copies of each shared resource in
2616  // `usedResources` as they are used by multiple tasks.
2618 
2619  // Offered resources.
2622 
2623  // This is used for per-framework metrics.
2625 
2626 private:
2627  Framework(Master* const _master,
2628  const Flags& masterFlags,
2629  const FrameworkInfo& _info,
2630  State state,
2631  bool active,
2632  const process::Owned<ObjectApprovers>& objectApprovers,
2633  const process::Time& time);
2634 
2635  Framework(const Framework&); // No copying.
2636  Framework& operator=(const Framework&); // No assigning.
2637 
2638  // Indicates whether this framework should be receiving offers
2639  // when it is connected.
2640  bool active_;
2641 
2642  // NOTE: `state` should never be modified by means other than `setState()`.
2643  //
2644  // TODO(asekretenko): Encapsulate `state` to ensure that `metrics.subscribed`
2645  // is updated together with any `state` change.
2646  State state;
2647 
2648  void setState(State state_);
2649 
2650  // Frameworks can either be connected via HTTP or by message passing
2651  // (scheduler driver). At most one of `http` and `pid` will be set
2652  // according to the last connection made by the framework; neither
2653  // field will be set if the framework is in state `RECOVERED`.
2655  Option<process::UPID> pid_;
2656 
2657  // This is only set for HTTP frameworks.
2659  heartbeater;
2660 
2661  // ObjectApprovers for the framework's principal.
2662  process::Owned<ObjectApprovers> objectApprovers;
2663 };
2664 
2665 
2666 // Sends a message to the connected framework.
2667 template <typename Message>
2668 void Framework::send(const Message& message)
2669 {
2670  metrics.incrementEvent(message);
2671 
2672  if (!connected()) {
2673  LOG(WARNING) << "Master attempting to send message to disconnected"
2674  << " framework " << *this;
2675 
2676  // NOTE: We proceed here without returning to support the case where a
2677  // "disconnected" framework is still talking to the master and the master
2678  // wants to shut it down by sending a `FrameworkErrorMessage`. This can
2679  // occur in a one-way network partition where the master -> framework link
2680  // is broken but the framework -> master link remains intact. Note that we
2681  // have no periodic heartbeats between the master and pid-based schedulers.
2682  //
2683  // TODO(chhsiao): Update the `FrameworkErrorMessage` call-sites that rely on
2684  // the lack of a `return` here to directly call `process::send` so that this
2685  // function doesn't need to deal with the special case. Then we can check
2686  // that one of `http` or `pid` is set if the framework is connected.
2687  }
2688 
2689  if (http_.isSome()) {
2690  if (!http_->send(message)) {
2691  LOG(WARNING) << "Unable to send message to framework " << *this << ":"
2692  << " connection closed";
2693  }
2694  } else if (pid().isSome()) {
2695  master->send(pid().get(), message);
2696  } else {
2697  LOG(WARNING) << "Unable to send message to framework " << *this << ":"
2698  << " framework is recovered but has not reregistered";
2699  }
2700 }
2701 
2702 
2703 // TODO(bevers): Check if there is anything preventing us from
2704 // returning a const reference here.
2705 inline const FrameworkID Framework::id() const
2706 {
2707  return info.id();
2708 }
2709 
2710 
2711 
2712 inline std::ostream& operator<<(
2713  std::ostream& stream,
2714  const Framework& framework)
2715 {
2716  // TODO(vinod): Also log the hostname once FrameworkInfo is properly
2717  // updated on framework failover (MESOS-1784).
2718  stream << framework.id() << " (" << framework.info.name() << ")";
2719 
2720  if (framework.pid().isSome()) {
2721  stream << " at " << framework.pid().get();
2722  }
2723 
2724  return stream;
2725 }
2726 
2727 
2728 // Information about an active role.
2729 struct Role
2730 {
2731  Role() = delete;
2732 
2733  Role(const Master* _master,
2734  const std::string& _role)
2735  : master(_master), role(_role) {}
2736 
2737  void addFramework(Framework* framework)
2738  {
2739  frameworks[framework->id()] = framework;
2740  }
2741 
2742  void removeFramework(Framework* framework)
2743  {
2744  frameworks.erase(framework->id());
2745  }
2746 
2747  const Master* master;
2748  const std::string role;
2749 
2750  // NOTE: The dynamic role/quota relation is stored in and administrated
2751  // by the master. There is no direct representation of quota information
2752  // here to avoid duplication and to support that an operator can associate
2753  // quota with a role before the role is created. Such ordering of operator
2754  // requests prevents a race of premature unbounded allocation that setting
2755  // quota first is intended to contain.
2756 
2758 };
2759 
2760 
2761 mesos::master::Response::GetFrameworks::Framework model(
2762  const Framework& framework);
2763 
2764 
2765 } // namespace master {
2766 } // namespace internal {
2767 } // namespace mesos {
2768 
2769 #endif // __MASTER_HPP__
Protocol< RecoverRequest, RecoverResponse > recover
Definition: path.hpp:29
ReadOnlyHandler(const Master *_master)
Definition: master.hpp:1318
hashmap< ResourceProviderID, ResourceProvider > resourceProviders
Definition: master.hpp:340
Definition: nothing.hpp:16
hashmap< TaskID, Task * > tasks
Definition: master.hpp:2558
Definition: master.hpp:2729
Master *const master
Definition: master.hpp:2544
ContentType
Definition: http.hpp:43
hashmap< UUID, Operation * > operations
Definition: master.hpp:255
std::ostream & operator<<(std::ostream &stream, const Future< T > &future)
Definition: future.hpp:1826
Try< bool > update(const std::string &link, const Handle &parent, uint16_t protocol, const action::Mirror &mirror)
void finalize(bool finalize_wsa=false)
Clean up the library.
Try< Bytes > size(const std::string &path, const FollowSymlink follow=FollowSymlink::FOLLOW_SYMLINK)
Definition: stat.hpp:130
Future< Response > request(const Request &request, bool streamedResponse=false)
Asynchronously sends an HTTP request to the process and returns the HTTP response once the entire res...
SlaveInfo info
Definition: master.hpp:201
bool connected() const
Definition: master.hpp:2520
Definition: master.hpp:27
process::Owned< ObjectApprovers > approvers
Definition: master.hpp:1309
Definition: check.hpp:33
const SlaveID id
Definition: master.hpp:200
hashset< Offer * > offers
Definition: master.hpp:271
Option< process::Timer > reregistrationTimer
Definition: master.hpp:230
bool connected
Definition: master.hpp:217
const Option< StreamingHttpConnection< v1::scheduler::Event > > & http() const
Definition: master.hpp:2527
Definition: resource_quantities.hpp:63
bool initialize(const Option< std::string > &delegate=None(), const Option< std::string > &readwriteAuthenticationRealm=None(), const Option< std::string > &readonlyAuthenticationRealm=None())
Initialize the library.
Resources totalResources
Definition: master.hpp:297
constexpr Duration DEFAULT_HEARTBEAT_INTERVAL
Definition: constants.hpp:61
Definition: protobuf_utils.hpp:332
hashmap< SlaveID, Resources > offeredResources
Definition: master.hpp:2621
protobuf::framework::Capabilities capabilities
Definition: master.hpp:2550
const Option< process::UPID > & pid() const
Definition: master.hpp:2532
std::set< std::string > getRoles(const FrameworkInfo &frameworkInfo)
Result< ProcessStatus > status(pid_t pid)
Definition: proc.hpp:166
InverseOffer * getInverseOffer(Master *master, const OfferID &offerId)
Try< Nothing > machines(const google::protobuf::RepeatedPtrField< MachineID > &ids)
Performs the following checks on a list of machines:
mesos::v1::scheduler::Call Call
Definition: mesos.hpp:2797
Definition: resources.hpp:83
Resources totalUsedResources
Definition: master.hpp:2613
Slave * getSlave(Master *master, const SlaveID &slaveId)
Definition: authorization.hpp:36
Definition: flags.hpp:42
Definition: registrar.hpp:88
void addFramework(Framework *framework)
Definition: master.hpp:2737
Option< Error > reregisterSlave(const ReregisterSlaveMessage &message)
Definition: files.hpp:73
Operation
Definition: cgroups.hpp:458
Future< Nothing > redirect(int_fd from, Option< int_fd > to, size_t chunk=4096, const std::vector< lambda::function< void(const std::string &)>> &hooks={})
Redirect output from the 'from' file descriptor to the 'to' file descriptor (or /dev/null if 'to' is ...
Option< process::Time > estimatedDrainStartTime
Definition: master.hpp:316
hashmap< UUID, Operation * > operations
Definition: master.hpp:337
void send(const process::UPID &to, const google::protobuf::Message &message)
Definition: protobuf.hpp:126
bool isSome() const
Definition: option.hpp:116
Definition: event.hpp:209
Definition: http.hpp:533
Definition: json.hpp:158
hashmap< FrameworkID, hashmap< TaskID, Task * > > tasks
Definition: master.hpp:247
multihashmap< FrameworkID, TaskID > killedTasks
Definition: master.hpp:251
mesos::v1::scheduler::Event Event
Definition: mesos.hpp:2798
Definition: hashmap.hpp:38
Option< UUID > resourceVersion
Definition: master.hpp:310
FrameworkMetrics metrics
Definition: master.hpp:2624
bool active() const
Definition: master.hpp:2518
Resources checkpointedResources
Definition: master.hpp:290
SlaveObserver * observer
Definition: master.hpp:312
Try< Nothing > unavailability(const Unavailability &unavailability)
An "untyped" PID, used to encapsulate the process ID for lower-layer abstractions (eg...
Definition: pid.hpp:39
BoundedHashMap< TaskID, process::Owned< Task > > unreachableTasks
Definition: master.hpp:2571
Variant< Subscribe > state
Definition: master.hpp:1315
bool recovered() const
Definition: master.hpp:2521
Result< std::vector< Filter< Classifier > > > filters(const std::string &_link, const Handle &parent)
Definition: internal.hpp:769
process::Time registeredTime
Definition: master.hpp:213
bool active
Definition: master.hpp:222
An abstraction for contending to be a leading master.
Definition: contender.hpp:40
std::set< std::string > roles
Definition: master.hpp:2548
const Master * master
Definition: master.hpp:2747
hashset< UUID > orphanedOperations
Definition: master.hpp:268
process::UPID pid
Definition: master.hpp:205
Definition: uuid.hpp:35
Definition: protobuf_utils.hpp:631
Option< process::Time > reregisteredTime
Definition: master.hpp:214
Definition: agent.hpp:25
process::Time reregisteredTime
Definition: master.hpp:2553
Master *const master
Definition: master.hpp:199
Option< Error > quotaInfo(const mesos::quota::QuotaInfo &quotaInfo)
const T & get() const &
Definition: option.hpp:119
const std::string role
Definition: master.hpp:2748
Protocol< PromiseRequest, PromiseResponse > promise
Definition: protobuf.hpp:108
process::Time registeredTime
Definition: master.hpp:2552
Try< int_fd > accept(int_fd s)
Definition: network.hpp:31
Definition: whitelist_watcher.hpp:37
void removeFramework(Framework *framework)
Definition: master.hpp:2742
MasterInfo info() const
Definition: master.hpp:512
StreamingHttpConnection< v1::master::Event > http
Definition: master.hpp:2252
UUID resourceVersion
Definition: master.hpp:333
Definition: time.hpp:23
FrameworkInfo info
Definition: master.hpp:2546
ResponseHeartbeater< mesos::master::Event, v1::master::Event > heartbeater
Definition: master.hpp:2253
bool exited(const UPID &from, const UPID &to)
Simulates disconnection of the link between 'from' and 'to' by sending an ExitedEvent to 'to'...
Definition: boundedhashmap.hpp:27
Basic model of an allocator: resources are allocated to a framework in the form of offers...
Definition: allocator.hpp:82
const process::Owned< ObjectApprovers > approvers
Definition: master.hpp:2254
#define flags
Definition: decoder.hpp:18
Definition: none.hpp:27
Definition: attributes.hpp:24
hashset< InverseOffer * > inverseOffers
Definition: master.hpp:2575
Subscriber(const StreamingHttpConnection< v1::master::Event > &_http, const process::Owned< ObjectApprovers > &_approvers)
Definition: master.hpp:2215
Definition: executor.hpp:48
const MachineID machineId
Definition: master.hpp:203
Definition: master.hpp:122
constexpr size_t MAX_REMOVED_SLAVES
Definition: constants.hpp:93
hashset< InverseOffer * > inverseOffers
Definition: master.hpp:274
An abstraction of a Master detector which can be used to detect the leading master from a group...
Definition: detector.hpp:38
static Time now()
The current clock time for either the current process that makes this call or the global clock time i...
Resources totalResources
Definition: master.hpp:320
Definition: event.hpp:103
Definition: metrics.hpp:41
State
Definition: master.hpp:2399
Offer * getOffer(Master *master, const OfferID &offerId)
Try< Nothing > kill(const std::string &hierarchy, const std::string &cgroup, int signal)
hashmap< FrameworkID, Framework * > frameworks
Definition: master.hpp:2757
mesos::master::Response::GetFrameworks::Framework model(const Framework &framework)
process::Time unregisteredTime
Definition: master.hpp:2554
mesos::v1::scheduler::Response Response
Definition: mesos.hpp:2800
std::string version
Definition: master.hpp:208
URI http(const std::string &host, const std::string &path="/", const Option< int > &port=None(), const Option< std::string > &query=None(), const Option< std::string > &fragment=None(), const Option< std::string > &user=None(), const Option< std::string > &password=None())
Creates an http URI with the given parameters.
Definition: http.hpp:35
ResourceProviderInfo info
Definition: master.hpp:319
hashmap< FrameworkID, hashmap< ExecutorID, ExecutorInfo > > executors
Definition: master.hpp:237
hashset< Offer * > offers
Definition: master.hpp:2573
std::string stringify(int flags)
Definition: owned.hpp:36
Definition: master.hpp:2397
protobuf::slave::Capabilities capabilities
Definition: master.hpp:211
hashmap< SlaveID, hashmap< ExecutorID, ExecutorInfo > > executors
Definition: master.hpp:2580
circular_buffer< process::Owned< Task > > completedTasks
Definition: master.hpp:2565
hashmap< OperationID, UUID > operationUUIDs
Definition: master.hpp:2588
hashmap< UUID, Operation * > operations
Definition: master.hpp:2584
Definition: parse.hpp:33
Try< std::set< pid_t > > pids()
Definition: freebsd.hpp:62
hashmap< SlaveID, Resources > usedResources
Definition: master.hpp:2617
Resources totalOfferedResources
Definition: master.hpp:2620
Definition: master.hpp:355
PID< MetricsProcess > metrics
Resources offeredResources
Definition: master.hpp:281
const FrameworkID id() const
Definition: master.hpp:2705
constexpr const char * name
Definition: shell.hpp:43
hashmap< FrameworkID, Resources > usedResources
Definition: master.hpp:279
bool contains(const Resource &left, const Resource &right)
Try< std::vector< Value > > time(const std::string &hierarchy, const std::string &cgroup)
Try< Nothing > schedule(const mesos::maintenance::Schedule &schedule, const hashmap< MachineID, Machine > &machines)
Performs the following checks on the new maintenance schedule:
Option< Error > registerSlave(const RegisterSlaveMessage &message)
StreamingHttpConnection< v1::master::Event > connection
Definition: master.hpp:1310
Role(const Master *_master, const std::string &_role)
Definition: master.hpp:2733
Future< size_t > send(const int_fd &fd, const void *buf, size_t size)