Apache Mesos
master.hpp
Go to the documentation of this file.
1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements. See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership. The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License. You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 
17 #ifndef __MASTER_HPP__
18 #define __MASTER_HPP__
19 
20 #include <stdint.h>
21 
22 #include <list>
23 #include <memory>
24 #include <set>
25 #include <string>
26 #include <vector>
27 
28 #include <mesos/mesos.hpp>
29 #include <mesos/resources.hpp>
30 #include <mesos/type_utils.hpp>
31 
33 
37 #include <mesos/master/master.hpp>
38 
40 
41 #include <mesos/quota/quota.hpp>
42 
44 
45 #include <process/future.hpp>
46 #include <process/limiter.hpp>
47 #include <process/http.hpp>
48 #include <process/owned.hpp>
49 #include <process/process.hpp>
50 #include <process/protobuf.hpp>
51 #include <process/timer.hpp>
52 
54 
55 #include <stout/boundedhashmap.hpp>
56 #include <stout/cache.hpp>
58 #include <stout/foreach.hpp>
59 #include <stout/hashmap.hpp>
60 #include <stout/hashset.hpp>
61 #include <stout/linkedhashmap.hpp>
62 #include <stout/multihashmap.hpp>
63 #include <stout/nothing.hpp>
64 #include <stout/option.hpp>
65 #include <stout/recordio.hpp>
66 #include <stout/try.hpp>
67 #include <stout/uuid.hpp>
68 
69 #include "common/http.hpp"
71 
72 #include "files/files.hpp"
73 
74 #include "internal/devolve.hpp"
75 #include "internal/evolve.hpp"
76 
77 #include "master/constants.hpp"
78 #include "master/flags.hpp"
79 #include "master/machine.hpp"
80 #include "master/metrics.hpp"
81 #include "master/validation.hpp"
82 
83 #include "messages/messages.hpp"
84 
85 namespace process {
86 class RateLimiter; // Forward declaration.
87 }
88 
89 namespace mesos {
90 
91 // Forward declarations.
92 class Authorizer;
93 class ObjectApprovers;
94 
95 namespace internal {
96 
97 // Forward declarations.
98 namespace registry {
99 class Slaves;
100 }
101 
102 class Registry;
103 class WhitelistWatcher;
104 
105 namespace master {
106 
107 class Master;
108 class Registrar;
109 class SlaveObserver;
110 
111 struct BoundedRateLimiter;
112 struct Framework;
113 struct Role;
114 
115 
// The master's in-memory record of a registered agent ("slave"): its
// identity (`id`, `info`, `machineId`, `version`), connection state
// (`connected`, `active`), and the bookkeeping helpers for the tasks,
// operations, offers, inverse offers and executors associated with it.
//
// NOTE(review): this listing is a rendered copy of the header and several
// member declarations are missing from it (e.g. the fields that the
// comments below describe for executors, tasks, offers and resources), so
// some comments here are not followed by their declaration. Confirm against
// the real header before relying on this view.
116 struct Slave
117 {
118 Slave(Master* const _master,
119  SlaveInfo _info,
120  const process::UPID& _pid,
121  const MachineID& _machineId,
122  const std::string& _version,
123  std::vector<SlaveInfo::Capability> _capabilites,
124  const process::Time& _registeredTime,
125  std::vector<Resource> _checkpointedResources,
126  const Option<UUID>& _resourceVersion,
127  std::vector<ExecutorInfo> executorInfos = std::vector<ExecutorInfo>(),
128  std::vector<Task> tasks = std::vector<Task>());
129
130  ~Slave();
131
// Looks up a task on this agent by framework and task ID.
132  Task* getTask(
133  const FrameworkID& frameworkId,
134  const TaskID& taskId) const;
135
136  void addTask(Task* task);
137
138  // Update slave to recover the resources that were previously
139  // being used by `task`.
140  //
141  // TODO(bmahler): This is a hack for performance. We need to
142  // maintain resource counters because computing task resources
143  // functionally for all tasks is expensive, for now.
144  void recoverResources(Task* task);
145
146  void removeTask(Task* task);
147
148  void addOperation(Operation* operation);
149
// Operation counterpart of `recoverResources(Task*)` (presumably releases
// the resources the operation was holding; TODO confirm in master.cpp).
150  void recoverResources(Operation* operation);
151
152  void removeOperation(Operation* operation);
153
// Looks up an operation on this agent by its UUID.
154  Operation* getOperation(const UUID& uuid) const;
155
156  void addOffer(Offer* offer);
157
158  void removeOffer(Offer* offer);
159
160  void addInverseOffer(InverseOffer* inverseOffer);
161
162  void removeInverseOffer(InverseOffer* inverseOffer);
163
164  bool hasExecutor(
165  const FrameworkID& frameworkId,
166  const ExecutorID& executorId) const;
167
168  void addExecutor(
169  const FrameworkID& frameworkId,
170  const ExecutorInfo& executorInfo);
171
172  void removeExecutor(
173  const FrameworkID& frameworkId,
174  const ExecutorID& executorId);
175
// Applies the given resource conversions to this agent's resources.
176  void apply(const std::vector<ResourceConversion>& conversions);
177
// NOTE(review): the line naming the following declaration is missing from
// this listing; only its parameter list is visible.
179  const SlaveInfo& info,
180  const std::string& _version,
181  const std::vector<SlaveInfo::Capability>& _capabilites,
182  const Resources& _checkpointedResources,
183  const Option<UUID>& resourceVersion);
184
// Owning master; set once at construction.
185  Master* const master;
186  const SlaveID id;
187  SlaveInfo info;
188
189  const MachineID machineId;
190
192
193  // TODO(bmahler): Use stout's Version when it can parse labels, etc.
194  std::string version;
195
196  // Agent capabilities.
198
201
202  // Slave becomes disconnected when the socket closes.
203  bool connected;
204
205  // Slave becomes deactivated when it gets disconnected. In the
206  // future this might also happen via an HTTP endpoint.
207  // No offers will be made for a deactivated slave.
208  bool active;
209
210  // Timer for marking slaves unreachable that become disconnected and
211  // don't reregister. This timeout is larger than the slave
212  // observer's timeout, so typically the slave observer will be the
213  // one to mark such slaves unreachable; this timer is a backup for
214  // when a slave responds to pings but does not reregister (e.g.,
215  // because agent recovery has hung).
217
218  // Executors running on this slave.
219  //
220  // TODO(bmahler): Make this private to enforce that `addExecutor()`
221  // and `removeExecutor()` are used, and provide a const view into
222  // the executors.
224
225  // Tasks that have not yet been launched because they are currently
226  // being authorized. This is similar to Framework's pendingTasks but we
227  // track pendingTasks per agent separately to determine if any offer
228  // operation for this agent would change resources requested by these tasks.
230
231  // Tasks present on this slave.
232  //
233  // TODO(bmahler): Make this private to enforce that `addTask()` and
234  // `removeTask()` are used, and provide a const view into the tasks.
235  //
236  // TODO(bmahler): The task pointer ownership complexity arises from the fact
237  // that we own the pointer here, but it's shared with the Framework struct.
238  // We should find a way to eliminate this.
240
241  // Tasks that frameworks have asked to be killed.
242  // This is used for reconciliation when the slave reregisters.
244
245  // Pending operations or terminal operations that have
246  // unacknowledged status updates on this agent.
248
249  // Active offers on this slave.
251
252  // Active inverse offers on this slave.
254
255  // Resources used by active tasks, executors, and operations.
256  // Note that we maintain multiple copies of each shared resource in
257  // `usedResources` as they are used by multiple tasks.
259
261
262  // Resources that should be checkpointed by the slave (e.g.,
263  // persistent volumes, dynamic reservations, etc). These are either
264  // in use by a task/executor, or are available for use and will be
265  // re-offered to the framework.
266  // TODO(jieyu): `checkpointedResources` is only for agent default
267  // resources. Resources from resource providers are not included in
268  // this field. Consider removing this field.
270
271  // The current total resources of the slave. Note that this is
272  // different from 'info.resources()' because this also considers
273  // operations (e.g., CREATE, RESERVE) that have been applied and
274  // includes revocable resources and resources from resource
275  // providers as well.
277
278  // Used to establish the relationship between the operation and the
279  // resources that the operation is operating on. Each resource
280  // provider will keep a resource version UUID, and change it when it
281  // believes that the resources from this resource provider are out
282  // of sync from the master's view. The master will keep track of
283  // the last known resource version UUID for each resource provider,
284  // and attach the resource version UUID in each operation it sends
285  // out. The resource provider should reject operations that have a
286  // different resource version UUID than that it maintains, because
287  // this means the operation is operating on resources that might
288  // have already been invalidated.
290
291  SlaveObserver* observer;
292
// NOTE(review): the header of the nested per-resource-provider struct that
// owns the following members is missing from this listing.
294  ResourceProviderInfo info;
296
297  // Used to establish the relationship between the operation and the
298  // resources that the operation is operating on. Each resource
299  // provider will keep a resource version UUID, and change it when it
300  // believes that the resources from this resource provider are out
301  // of sync from the master's view. The master will keep track of
302  // the last known resource version UUID for each resource provider,
303  // and attach the resource version UUID in each operation it sends
304  // out. The resource provider should reject operations that have a
305  // different resource version UUID than that it maintains, because
306  // this means the operation is operating on resources that might
307  // have already been invalidated.
309
310  // Pending operations or terminal operations that have
311  // unacknowledged status updates.
313  };
314
316
317 private:
// Copy construction and copy assignment are declared private so that a
// `Slave` cannot be copied. TODO(review): consider `= delete` instead.
318  Slave(const Slave&); // No copying.
319  Slave& operator=(const Slave&); // No assigning.
320 };
321 
322 
323 inline std::ostream& operator<<(std::ostream& stream, const Slave& slave)
324 {
325  return stream << slave.id << " at " << slave.pid
326  << " (" << slave.info.hostname() << ")";
327 }
328 
329 
330 // Represents the streaming HTTP connection to a framework or a client
331 // subscribed to the '/api/vX' endpoint. It wraps the pipe writer, the
// content type used to serialize events, and the stream ID.
//
// NOTE(review): the struct header and the first constructor parameter are
// missing from this listing; the initializer list shows members `writer`,
// `contentType` and `streamId`.
333 {
335  ContentType _contentType,
336  id::UUID _streamId)
337  : writer(_writer),
338  contentType(_contentType),
339  streamId(_streamId) {}
340
341  // We need to evolve the internal old style message/unversioned event into a
342  // versioned event e.g., `v1::scheduler::Event` or `v1::master::Event`.
  //
  // Encodes `evolve(message)` with a recordio encoder (its construction is
  // only partially visible here) and writes it to the pipe. Returns the
  // result of `writer.write(...)`.
343  template <typename Message, typename Event = v1::scheduler::Event>
344  bool send(const Message& message)
345  {
347  serialize, contentType, lambda::_1));
348
349  return writer.write(encoder.encode(evolve(message)));
350  }
351
  // Closes the write end of the pipe; returns `writer.close()`.
352  bool close()
353  {
354  return writer.close();
355  }
356
  // Returns `writer.readerClosed()`. NOTE(review): the signature line is
  // missing here; `Heartbeater` calls `http.closed()`, so this is presumably
  // `closed()` returning a future. TODO confirm.
358  {
359  return writer.readerClosed();
360  }
361
365 };
366 
367 
368 // This process periodically sends heartbeats to a given HTTP connection.
369 // The `Message` template parameter is the type of the heartbeat event passed
370 // into the heartbeater during construction, while the `Event` template
371 // parameter is the versioned event type which is sent to the client.
372 // The optional delay parameter is used to specify the delay period before it
373 // sends the first heartbeat.
// `_logMessage` only identifies the peer in the verbose log line, and the
// optional `_callback` is invoked with the heartbeat message before each send.
//
// NOTE(review): some lines of this class (the delay scheduling calls and the
// `http`/`callback` member declarations) are missing from this listing.
374 template <typename Message, typename Event>
375 class Heartbeater : public process::Process<Heartbeater<Message, Event>>
376 {
377 public:
378  Heartbeater(const std::string& _logMessage,
379  const Message& _heartbeatMessage,
380  const HttpConnection& _http,
381  const Duration& _interval,
382  const Option<Duration>& _delay = None(),
383  const Option<lambda::function<void(const Message&)>>&
384  _callback = None())
385  : process::ProcessBase(process::ID::generate("heartbeater")),
386  logMessage(_logMessage),
387  heartbeatMessage(_heartbeatMessage),
388  http(_http),
389  interval(_interval),
390  delay(_delay),
391  callback(_callback) {}
392
393 protected:
  // If a delay was configured, the first heartbeat is deferred by `delay`;
  // otherwise the first heartbeat is sent immediately.
394  void initialize() override
395  {
396  if (delay.isSome()) {
398  delay.get(),
399  this,
401  } else {
402  heartbeat();
403  }
404  }
405
406 private:
  // Sends one heartbeat (running the optional callback first) if the
  // connection's `closed()` future is still pending.
407  void heartbeat()
408  {
409  // Only send a heartbeat if the connection is not closed.
410  if (http.closed().isPending()) {
411  VLOG(2) << "Sending heartbeat to " << logMessage;
412
413  if (callback.isSome()) {
414  callback.get()(heartbeatMessage);
415  }
416
    // A copy of the stored heartbeat message is what gets sent.
417  Message message(heartbeatMessage);
418  http.send<Message, Event>(message);
419  }
420
422  }
423
424  const std::string logMessage;
425  const Message heartbeatMessage;
427  const Duration interval;
428  const Option<Duration> delay;
430 };
431 
432 
433 class Master : public ProtobufProcess<Master>
434 {
435 public:
437  Registrar* registrar,
438  Files* files,
441  const Option<Authorizer*>& authorizer,
442  const Option<std::shared_ptr<process::RateLimiter>>&
443  slaveRemovalLimiter,
444  const Flags& flags = Flags());
445 
446  ~Master() override;
447 
448  // Message handlers.
449  void submitScheduler(
450  const std::string& name);
451 
452  void registerFramework(
453  const process::UPID& from,
454  RegisterFrameworkMessage&& registerFrameworkMessage);
455 
456  void reregisterFramework(
457  const process::UPID& from,
458  ReregisterFrameworkMessage&& reregisterFrameworkMessage);
459 
460  void unregisterFramework(
461  const process::UPID& from,
462  const FrameworkID& frameworkId);
463 
464  void deactivateFramework(
465  const process::UPID& from,
466  const FrameworkID& frameworkId);
467 
468  // TODO(vinod): Remove this once the old driver is removed.
469  void resourceRequest(
470  const process::UPID& from,
471  const FrameworkID& frameworkId,
472  const std::vector<Request>& requests);
473 
474  void launchTasks(
475  const process::UPID& from,
476  LaunchTasksMessage&& launchTasksMessage);
477 
478  void reviveOffers(
479  const process::UPID& from,
480  const FrameworkID& frameworkId,
481  const std::vector<std::string>& role);
482 
483  void killTask(
484  const process::UPID& from,
485  const FrameworkID& frameworkId,
486  const TaskID& taskId);
487 
488  void statusUpdateAcknowledgement(
489  const process::UPID& from,
490  StatusUpdateAcknowledgementMessage&& statusUpdateAcknowledgementMessage);
491 
492  void schedulerMessage(
493  const process::UPID& from,
494  FrameworkToExecutorMessage&& frameworkToExecutorMessage);
495 
496  void executorMessage(
497  const process::UPID& from,
498  ExecutorToFrameworkMessage&& executorToFrameworkMessage);
499 
500  void registerSlave(
501  const process::UPID& from,
502  RegisterSlaveMessage&& registerSlaveMessage);
503 
504  void reregisterSlave(
505  const process::UPID& from,
506  ReregisterSlaveMessage&& incomingMessage);
507 
508  void unregisterSlave(
509  const process::UPID& from,
510  const SlaveID& slaveId);
511 
512  void statusUpdate(
513  StatusUpdateMessage&& statusUpdateMessage);
514 
515  void reconcileTasks(
516  const process::UPID& from,
517  ReconcileTasksMessage&& reconcileTasksMessage);
518 
519  void updateOperationStatus(
520  UpdateOperationStatusMessage&& update);
521 
522  void exitedExecutor(
523  const process::UPID& from,
524  const SlaveID& slaveId,
525  const FrameworkID& frameworkId,
526  const ExecutorID& executorId,
527  int32_t status);
528 
529  void updateSlave(UpdateSlaveMessage&& message);
530 
531  void updateUnavailability(
532  const MachineID& machineId,
534 
535  // Marks the agent unreachable and returns whether the agent was
536  // marked unreachable. Returns false if the agent is already
537  // in a transitioning state or has transitioned into another
538  // state (this includes already being marked unreachable).
539  // The `duringMasterFailover` parameter specifies whether this
540  // agent is transitioning from a recovered state (true) or a
541  // registered state (false).
542  //
543  // Discarding currently not supported.
544  //
545  // Will not return a failure (this will crash the master
546  // internally in the case of a registry failure).
547  process::Future<bool> markUnreachable(
548  const SlaveInfo& slave,
549  bool duringMasterFailover,
550  const std::string& message);
551 
552  void markGone(Slave* slave, const TimeInfo& goneTime);
553 
554  void authenticate(
555  const process::UPID& from,
556  const process::UPID& pid);
557 
558  // TODO(bmahler): It would be preferred to use a unique libprocess
559  // Process identifier (PID is not sufficient) for identifying the
560  // framework instance, rather than relying on re-registration time.
561  void frameworkFailoverTimeout(
562  const FrameworkID& frameworkId,
563  const process::Time& reregisteredTime);
564 
565  void offer(
566  const FrameworkID& frameworkId,
567  const hashmap<std::string, hashmap<SlaveID, Resources>>& resources);
568 
569  void inverseOffer(
570  const FrameworkID& frameworkId,
571  const hashmap<SlaveID, UnavailableResources>& resources);
572 
573  // Invoked when there is a newly elected leading master.
574  // Made public for testing purposes.
575  void detected(const process::Future<Option<MasterInfo>>& _leader);
576 
577  // Invoked when the contender has lost the candidacy.
578  // Made public for testing purposes.
579  void lostCandidacy(const process::Future<Nothing>& lost);
580 
581  // Continuation of recover().
582  // Made public for testing purposes.
583  process::Future<Nothing> _recover(const Registry& registry);
584 
// Returns a copy of this master's own `MasterInfo` (the `info_` member).
585  MasterInfo info() const
586  {
587  return info_;
588  }
589 
590 protected:
591  void initialize() override;
592  void finalize() override;
593 
594  void consume(process::MessageEvent&& event) override;
595  void consume(process::ExitedEvent&& event) override;
596 
597  void exited(const process::UPID& pid) override;
598  void exited(const FrameworkID& frameworkId, const HttpConnection& http);
599  void _exited(Framework* framework);
600 
601  // Invoked upon noticing a subscriber disconnection.
602  void exited(const id::UUID& id);
603 
604  void agentReregisterTimeout(const SlaveID& slaveId);
605  Nothing _agentReregisterTimeout(const SlaveID& slaveId);
606 
607  // Invoked when the message is ready to be executed after
608  // being throttled.
609  // 'principal' being None indicates it is throttled by
610  // 'defaultLimiter'.
611  void throttled(
612  process::MessageEvent&& event,
613  const Option<std::string>& principal);
614 
615  // Continuations of consume().
616  void _consume(process::MessageEvent&& event);
617  void _consume(process::ExitedEvent&& event);
618 
619  // Helper method invoked when the capacity for a framework
620  // principal is exceeded.
621  void exceededCapacity(
622  const process::MessageEvent& event,
623  const Option<std::string>& principal,
624  uint64_t capacity);
625 
626  // Recovers state from the registrar.
628  void recoveredSlavesTimeout(const Registry& registry);
629 
630  void _registerSlave(
631  const process::UPID& pid,
632  RegisterSlaveMessage&& registerSlaveMessage,
634  const process::Future<bool>& authorized);
635 
636  void __registerSlave(
637  const process::UPID& pid,
638  RegisterSlaveMessage&& registerSlaveMessage,
639  const process::Future<bool>& admit);
640 
641  void _reregisterSlave(
642  const process::UPID& pid,
643  ReregisterSlaveMessage&& incomingMessage,
645  const process::Future<bool>& authorized);
646 
647  void __reregisterSlave(
648  const process::UPID& pid,
649  ReregisterSlaveMessage&& incomingMessage,
650  const process::Future<bool>& readmit);
651 
652  void ___reregisterSlave(
653  const process::UPID& pid,
654  ReregisterSlaveMessage&& incomingMessage,
655  const process::Future<bool>& updated);
656 
657  void updateSlaveFrameworks(
658  Slave* slave,
659  const std::vector<FrameworkInfo>& frameworks);
660 
661  // 'future' is the future returned by the authenticator.
662  void _authenticate(
663  const process::UPID& pid,
664  const process::Future<Option<std::string>>& future);
665 
666  void authenticationTimeout(process::Future<Option<std::string>> future);
667 
668  void fileAttached(const process::Future<Nothing>& result,
669  const std::string& path);
670 
671  // Invoked when the contender has entered the contest.
672  void contended(const process::Future<process::Future<Nothing>>& candidacy);
673 
674  // When a slave that was previously registered with this master
675  // reregisters, we need to reconcile the master's view of the
676  // slave's tasks and executors. This function also sends the
677  // `SlaveReregisteredMessage`.
678  void reconcileKnownSlave(
679  Slave* slave,
680  const std::vector<ExecutorInfo>& executors,
681  const std::vector<Task>& tasks);
682 
683  // Add a framework.
684  void addFramework(
685  Framework* framework,
686  const std::set<std::string>& suppressedRoles);
687 
688  // Recover a framework from its `FrameworkInfo`. This happens after
689  // master failover, when an agent running one of the framework's
690  // tasks reregisters or when the framework itself reregisters,
691  // whichever happens first. The result of this function is a
692  // registered, inactive framework with state `RECOVERED`.
693  void recoverFramework(
694  const FrameworkInfo& info,
695  const std::set<std::string>& suppressedRoles);
696 
697  // Transition a framework from `RECOVERED` to `CONNECTED` state and
698  // activate it. This happens at most once after master failover, the
699  // first time that the framework reregisters with the new master.
700  // Exactly one of `newPid` or `http` must be provided.
701  Try<Nothing> activateRecoveredFramework(
702  Framework* framework,
703  const FrameworkInfo& frameworkInfo,
704  const Option<process::UPID>& pid,
705  const Option<HttpConnection>& http,
706  const std::set<std::string>& suppressedRoles);
707 
708  // Replace the scheduler for a framework with a new process ID, in
709  // the event of a scheduler failover.
710  void failoverFramework(Framework* framework, const process::UPID& newPid);
711 
712  // Replace the scheduler for a framework with a new HTTP connection,
713  // in the event of a scheduler failover.
714  void failoverFramework(Framework* framework, const HttpConnection& http);
715 
716  void _failoverFramework(Framework* framework);
717 
718  // Kill all of a framework's tasks, delete the framework object, and
719  // reschedule offers that were assigned to this framework.
720  void removeFramework(Framework* framework);
721 
722  // Remove a framework from the slave, i.e., remove its tasks and
723  // executors and recover the resources.
724  void removeFramework(Slave* slave, Framework* framework);
725 
726  void updateFramework(
727  Framework* framework,
728  const FrameworkInfo& frameworkInfo,
729  const std::set<std::string>& suppressedRoles);
730 
731  void disconnect(Framework* framework);
732  void deactivate(Framework* framework, bool rescind);
733 
734  void disconnect(Slave* slave);
735  void deactivate(Slave* slave);
736 
737  // Add a slave.
738  void addSlave(
739  Slave* slave,
740  std::vector<Archive::Framework>&& completedFrameworks);
741 
742  void _markUnreachable(
743  const SlaveInfo& slave,
744  const TimeInfo& unreachableTime,
745  bool duringMasterFailover,
746  const std::string& message,
747  bool registrarResult);
748 
749  void sendSlaveLost(const SlaveInfo& slaveInfo);
750 
751  // Remove the slave from the registrar and from the master's state.
752  //
753  // TODO(bmahler): 'reason' is optional until MESOS-2317 is resolved.
754  void removeSlave(
755  Slave* slave,
756  const std::string& message,
758 
759  void _removeSlave(
760  Slave* slave,
761  const process::Future<bool>& registrarResult,
762  const std::string& removalCause,
764 
765  void __removeSlave(
766  Slave* slave,
767  const std::string& message,
768  const Option<TimeInfo>& unreachableTime);
769 
770  // Validates that the framework is authenticated, if required.
771  Option<Error> validateFrameworkAuthentication(
772  const FrameworkInfo& frameworkInfo,
773  const process::UPID& from);
774 
775  // Returns whether the framework is authorized.
776  // Returns failure for transient authorization failures.
777  process::Future<bool> authorizeFramework(
778  const FrameworkInfo& frameworkInfo);
779 
780  // Returns whether the principal is authorized to (re-)register an agent
781  // and whether the `SlaveInfo` is authorized.
782  process::Future<bool> authorizeSlave(
783  const SlaveInfo& slaveInfo,
785 
786  // Returns whether the task is authorized.
787  // Returns failure for transient authorization failures.
788  process::Future<bool> authorizeTask(
789  const TaskInfo& task,
790  Framework* framework);
791 
809  process::Future<bool> authorizeReserveResources(
810  const Offer::Operation::Reserve& reserve,
812 
813  // Authorizes whether the provided `principal` is allowed to reserve
814  // the specified `resources`.
815  process::Future<bool> authorizeReserveResources(
816  const Resources& resources,
818 
836  process::Future<bool> authorizeUnreserveResources(
837  const Offer::Operation::Unreserve& unreserve,
839 
857  process::Future<bool> authorizeCreateVolume(
858  const Offer::Operation::Create& create,
860 
878  process::Future<bool> authorizeDestroyVolume(
879  const Offer::Operation::Destroy& destroy,
881 
900  process::Future<bool> authorizeResizeVolume(
901  const Resource& volume,
903 
904 
921  process::Future<bool> authorizeCreateDisk(
922  const Offer::Operation::CreateDisk& createDisk,
924 
925 
942  process::Future<bool> authorizeDestroyDisk(
943  const Offer::Operation::DestroyDisk& destroyDisk,
945 
946 
947  // Determine if a new executor needs to be launched.
948  bool isLaunchExecutor (
949  const ExecutorID& executorId,
950  Framework* framework,
951  Slave* slave) const;
952 
953  // Add executor to the framework and slave.
954  void addExecutor(
955  const ExecutorInfo& executorInfo,
956  Framework* framework,
957  Slave* slave);
958 
959  // Add task to the framework and slave.
960  void addTask(const TaskInfo& task, Framework* framework, Slave* slave);
961 
962  // Transitions the task, and recovers resources if the task becomes
963  // terminal.
964  void updateTask(Task* task, const StatusUpdate& update);
965 
966  // Removes the task. `unreachable` indicates whether the task is removed due
967  // to being unreachable. Note that we cannot rely on the task state because
968  // it may not reflect unreachability due to being set to TASK_LOST for
969  // backwards compatibility.
970  void removeTask(Task* task, bool unreachable = false);
971 
972  // Remove an executor and recover its resources.
973  void removeExecutor(
974  Slave* slave,
975  const FrameworkID& frameworkId,
976  const ExecutorID& executorId);
977 
978  // Adds the given operation to the framework and the agent.
979  void addOperation(
980  Framework* framework,
981  Slave* slave,
982  Operation* operation);
983 
984  // Transitions the operation, and updates and recovers resources if
985  // the operation becomes terminal. If `convertResources` is `false`
986  // only the consumed resources of terminal operations are recovered,
987  // but no resources are converted.
988  void updateOperation(
989  Operation* operation,
990  const UpdateOperationStatusMessage& update,
991  bool convertResources = true);
992 
993  // Remove the operation.
994  void removeOperation(Operation* operation);
995 
996  // Attempts to update the allocator by applying the given operation.
997  // If successful, updates the slave's resources, sends a
998  // 'CheckpointResourcesMessage' to the slave with the updated
999  // checkpointed resources, and returns a 'Future' with 'Nothing'.
1000  // Otherwise, no action is taken and returns a failed 'Future'.
1002  Slave* slave,
1003  const Offer::Operation& operation);
1004 
1005  // Forwards the update to the framework.
1006  void forward(
1007  const StatusUpdate& update,
1008  const process::UPID& acknowledgee,
1009  Framework* framework);
1010 
1011  // Remove an offer after specified timeout
1012  void offerTimeout(const OfferID& offerId);
1013 
1014  // Remove an offer and optionally rescind the offer as well.
1015  void removeOffer(Offer* offer, bool rescind = false);
1016 
1017  // Remove an inverse offer after specified timeout
1018  void inverseOfferTimeout(const OfferID& inverseOfferId);
1019 
1020  // Remove an inverse offer and optionally rescind it as well.
1021  void removeInverseOffer(InverseOffer* inverseOffer, bool rescind = false);
1022 
1023  bool isCompletedFramework(const FrameworkID& frameworkId);
1024 
1025  Framework* getFramework(const FrameworkID& frameworkId) const;
1026  Offer* getOffer(const OfferID& offerId) const;
1027  InverseOffer* getInverseOffer(const OfferID& inverseOfferId) const;
1028 
1029  FrameworkID newFrameworkId();
1030  OfferID newOfferId();
1031  SlaveID newSlaveId();
1032 
1033 private:
1034  // Updates the agent's resources by applying the given operation.
1035  // Sends either `ApplyOperationMessage` or
1036  // `CheckpointResourcesMessage` (with updated checkpointed
1037  // resources) to the agent depending on if the agent has
1038  // `RESOURCE_PROVIDER` capability.
1039  void _apply(
1040  Slave* slave,
1041  Framework* framework,
1042  const Offer::Operation& operationInfo);
1043 
1044  void drop(
1045  const process::UPID& from,
1046  const mesos::scheduler::Call& call,
1047  const std::string& message);
1048 
1049  void drop(
1050  Framework* framework,
1051  const Offer::Operation& operation,
1052  const std::string& message);
1053 
1054  void drop(
1055  Framework* framework,
1056  const mesos::scheduler::Call& call,
1057  const std::string& message);
1058 
1059  void drop(
1060  Framework* framework,
1061  const mesos::scheduler::Call::Suppress& suppress,
1062  const std::string& message);
1063 
1064  void drop(
1065  Framework* framework,
1066  const mesos::scheduler::Call::Revive& revive,
1067  const std::string& message);
1068 
1069  // Call handlers.
1070  void receive(
1071  const process::UPID& from,
1072  mesos::scheduler::Call&& call);
1073 
1074  void subscribe(
1075  HttpConnection http,
1076  const mesos::scheduler::Call::Subscribe& subscribe);
1077 
1078  void _subscribe(
1079  HttpConnection http,
1080  const FrameworkInfo& frameworkInfo,
1081  bool force,
1082  const std::set<std::string>& suppressedRoles,
1083  const process::Future<bool>& authorized);
1084 
1085  void subscribe(
1086  const process::UPID& from,
1087  const mesos::scheduler::Call::Subscribe& subscribe);
1088 
1089  void _subscribe(
1090  const process::UPID& from,
1091  const FrameworkInfo& frameworkInfo,
1092  bool force,
1093  const std::set<std::string>& suppressedRoles,
1094  const process::Future<bool>& authorized);
1095 
1096  // Subscribes a client to the 'api/vX' endpoint.
1097  void subscribe(
1098  const HttpConnection& http,
1100 
1101  void teardown(Framework* framework);
1102 
1103  void accept(
1104  Framework* framework,
1105  mesos::scheduler::Call::Accept&& accept);
1106 
1107  void _accept(
1108  const FrameworkID& frameworkId,
1109  const SlaveID& slaveId,
1110  const Resources& offeredResources,
1111  mesos::scheduler::Call::Accept&& accept,
1112  const process::Future<
1113  std::vector<process::Future<bool>>>& authorizations);
1114 
1115  void acceptInverseOffers(
1116  Framework* framework,
1117  const mesos::scheduler::Call::AcceptInverseOffers& accept);
1118 
1119  void decline(
1120  Framework* framework,
1121  mesos::scheduler::Call::Decline&& decline);
1122 
1123  void declineInverseOffers(
1124  Framework* framework,
1125  const mesos::scheduler::Call::DeclineInverseOffers& decline);
1126 
1127  void revive(
1128  Framework* framework,
1129  const mesos::scheduler::Call::Revive& revive);
1130 
1131  void kill(
1132  Framework* framework,
1133  const mesos::scheduler::Call::Kill& kill);
1134 
1135  void shutdown(
1136  Framework* framework,
1137  const mesos::scheduler::Call::Shutdown& shutdown);
1138 
1139  void acknowledge(
1140  Framework* framework,
1141  mesos::scheduler::Call::Acknowledge&& acknowledge);
1142 
1143  void acknowledgeOperationStatus(
1144  Framework* framework,
1145  mesos::scheduler::Call::AcknowledgeOperationStatus&& acknowledge);
1146 
1147  void reconcile(
1148  Framework* framework,
1149  mesos::scheduler::Call::Reconcile&& reconcile);
1150 
1151  mesos::scheduler::Response::ReconcileOperations reconcileOperations(
1152  Framework* framework,
1153  const mesos::scheduler::Call::ReconcileOperations& reconcile);
1154 
1155  void message(
1156  Framework* framework,
1157  mesos::scheduler::Call::Message&& message);
1158 
1159  void request(
1160  Framework* framework,
1161  const mesos::scheduler::Call::Request& request);
1162 
1163  void suppress(
1164  Framework* framework,
1165  const mesos::scheduler::Call::Suppress& suppress);
1166 
1167  bool elected() const
1168  {
1169  return leader.isSome() && leader.get() == info_;
1170  }
1171 
1172  void scheduleRegistryGc();
1173 
1174  void doRegistryGc();
1175 
1176  void _doRegistryGc(
1177  const hashset<SlaveID>& toRemoveUnreachable,
1178  const hashset<SlaveID>& toRemoveGone,
1179  const process::Future<bool>& registrarResult);
1180 
1181  process::Future<bool> authorizeLogAccess(
1183 
1184  std::vector<std::string> filterRoles(
1185  const process::Owned<ObjectApprovers>& approvers) const;
1186 
1194  bool isWhitelistedRole(const std::string& name) const;
1195 
1203  class QuotaHandler
1204  {
1205  public:
1206  explicit QuotaHandler(Master* _master) : master(_master)
1207  {
1208  CHECK_NOTNULL(master);
1209  }
1210 
1211  // Returns a list of set quotas.
1213  const mesos::master::Call& call,
1215  ContentType contentType) const;
1216 
1218  const process::http::Request& request,
1220  principal) const;
1221 
1223  const mesos::master::Call& call,
1225  principal) const;
1226 
1230  principal) const;
1231 
1233  const mesos::master::Call& call,
1235  principal) const;
1236 
1240  principal) const;
1241 
1242  private:
1243  // Heuristically tries to determine whether a quota request could
1244  // reasonably be satisfied given the current cluster capacity. The
1245  // goal is to determine whether a user may accidentally request an
1246  // amount of resources that would prevent frameworks without quota
1247  // from getting any offers. A force flag will allow users to bypass
1248  // this check.
1249  //
1250  // The heuristic tests whether the total quota, including the new
1251  // request, does not exceed the sum of non-static cluster resources,
1252  // i.e. the following inequality holds:
1253  // total - statically reserved >= total quota + quota request
1254  //
1255  // Please be advised that:
1256  // * It is up to an allocator how to satisfy quota (for example,
1257  // what resources to account towards quota, as well as which
1258  // resources to consider allocatable for quota).
1259  // * Even if there are enough resources at the moment of this check,
1260  // agents may terminate at any time, rendering the cluster under
1261  // quota.
1262  Option<Error> capacityHeuristic(
1263  const mesos::quota::QuotaInfo& request) const;
1264 
1265  // We always want to rescind offers after the capacity heuristic. The
1266  // reason for this is the race between the allocator and the master:
1267  // it can happen that there are not enough free resources at the
1268  // allocator's disposal when it is notified about the quota request,
1269  // but at this point it's too late to rescind.
1270  //
1271  // While rescinding, we adhere to the following rules:
1272  // * Rescind at least as many resources as there are in the quota request.
1273  // * Rescind all offers from an agent in order to make the potential
1274  // offer bigger, which increases the chances that a quota'ed framework
1275  // will be able to use the offer.
1276  // * Rescind offers from at least `numF` agents to make it possible
1277  // (but not guaranteed, due to fair sharing) that each framework in
1278  // the role for which quota is set gets an offer (`numF` is the
1279  // number of frameworks in the quota'ed role). Though this is not
1280  // strictly necessary, we think this will increase the debuggability
1281  // and will improve user experience.
1282  //
1283  // TODO(alexr): Consider removing this function once offer management
1284  // (including rescinding) is moved to allocator.
1285  void rescindOffers(const mesos::quota::QuotaInfo& request) const;
1286 
1287  process::Future<bool> authorizeGetQuota(
1289  const mesos::quota::QuotaInfo& quotaInfo) const;
1290 
1291  process::Future<bool> authorizeUpdateQuota(
1293  const mesos::quota::QuotaInfo& quotaInfo) const;
1294 
1297  principal) const;
1298 
1300  const mesos::quota::QuotaRequest& quotaRequest,
1302  principal) const;
1303 
1305  const mesos::quota::QuotaInfo& quotaInfo,
1306  bool forced) const;
1307 
1309  const std::string& role,
1311  principal) const;
1312 
1314  const std::string& role) const;
1315 
1316  // To perform actions related to quota management, we require access to the
1317  // master data structures. No synchronization primitives are needed here
1318  // since `QuotaHandler`'s functions are invoked in the Master's actor.
1319  Master* master;
1320  };
1321 
1329  class WeightsHandler
1330  {
1331  public:
1332  explicit WeightsHandler(Master* _master) : master(_master)
1333  {
1334  CHECK_NOTNULL(master);
1335  }
1336 
1340  principal) const;
1341 
1343  const mesos::master::Call& call,
1345  ContentType contentType) const;
1346 
1348  const process::http::Request& request,
1350  principal) const;
1351 
1353  const mesos::master::Call& call,
1355  ContentType contentType) const;
1356 
1357  private:
1358  process::Future<bool> authorizeGetWeight(
1360  const WeightInfo& weight) const;
1361 
1362  process::Future<bool> authorizeUpdateWeights(
1364  const std::vector<std::string>& roles) const;
1365 
1367  const std::vector<WeightInfo>& weightInfos,
1368  const std::vector<bool>& roleAuthorizations) const;
1369 
1372  principal) const;
1373 
1376  const google::protobuf::RepeatedPtrField<WeightInfo>& weightInfos)
1377  const;
1378 
1380  const std::vector<WeightInfo>& weightInfos) const;
1381 
1382  // Rescind all outstanding offers if any of the 'weightInfos' roles has
1383  // an active framework.
1384  void rescindOffers(const std::vector<WeightInfo>& weightInfos) const;
1385 
1386  Master* master;
1387  };
1388 
1389  // Inner class used to namespace HTTP handlers that do not change the
1390  // underlying master object.
1391  //
1392  // NOTE: Most member functions of this class are not routed directly but
1393  // dispatched from their corresponding handlers in the outer `Http` class.
1394  // This is because deciding whether an incoming request is read-only often
1395  // requires some inspection, e.g. distinguishing between "GET" and "POST"
1396  // requests to the same endpoint.
1397  class ReadOnlyHandler
1398  {
1399  public:
1400  explicit ReadOnlyHandler(const Master* _master) : master(_master) {}
1401 
1402  // /frameworks
1403  process::http::Response frameworks(
1404  const process::http::Request& request,
1405  const process::Owned<ObjectApprovers>& approvers) const;
1406 
1407  // /roles
1409  const process::http::Request& request,
1410  const process::Owned<ObjectApprovers>& approvers) const;
1411 
1412  // /slaves
1413  process::http::Response slaves(
1414  const process::http::Request& request,
1415  const process::Owned<ObjectApprovers>& approvers) const;
1416 
1417  // /state
1419  const process::http::Request& request,
1420  const process::Owned<ObjectApprovers>& approvers) const;
1421 
1422  // /state-summary
1423  process::http::Response stateSummary(
1424  const process::http::Request& request,
1425  const process::Owned<ObjectApprovers>& approvers) const;
1426 
1427  // /tasks
1429  const process::http::Request& request,
1430  const process::Owned<ObjectApprovers>& approvers) const;
1431 
1432  private:
1433  const Master* master;
1434  };
1435 
1436  // Inner class used to namespace HTTP route handlers (see
1437  // master/http.cpp for implementations).
1438  class Http
1439  {
1440  public:
1441  explicit Http(Master* _master) : master(_master),
1442  readonlyHandler(_master),
1443  quotaHandler(_master),
1444  weightsHandler(_master) {}
1445 
1446  // /api/v1
1448  const process::http::Request& request,
1450  principal) const;
1451 
1452  // /api/v1/scheduler
1454  const process::http::Request& request,
1456  principal) const;
1457 
1458  // /master/create-volumes
1460  const process::http::Request& request,
1462  principal) const;
1463 
1464  // /master/destroy-volumes
1466  const process::http::Request& request,
1468  principal) const;
1469 
1470  // /master/flags
1472  const process::http::Request& request,
1474  principal) const;
1475 
1476  // /master/frameworks
1477  //
1478  // NOTE: Requests to this endpoint are batched.
1480  const process::http::Request& request,
1482  principal) const;
1483 
1484  // /master/health
1486  const process::http::Request& request) const;
1487 
1488  // /master/redirect
1490  const process::http::Request& request) const;
1491 
1492  // /master/reserve
1494  const process::http::Request& request,
1496  principal) const;
1497 
1498  // /master/roles
1499  //
1500  // NOTE: Requests to this endpoint are batched.
1502  const process::http::Request& request,
1504  principal) const;
1505 
1506  // /master/teardown
1508  const process::http::Request& request,
1510  principal) const;
1511 
1512  // /master/slaves
1513  //
1514  // NOTE: Requests to this endpoint are batched.
1516  const process::http::Request& request,
1518  principal) const;
1519 
1520  // /master/state
1521  //
1522  // NOTE: Requests to this endpoint are batched.
1524  const process::http::Request& request,
1526  principal) const;
1527 
1528  // /master/state-summary
1529  //
1530  // NOTE: Requests to this endpoint are batched.
1532  const process::http::Request& request,
1534  principal) const;
1535 
1536  // /master/tasks
1537  //
1538  // NOTE: Requests to this endpoint are batched.
1540  const process::http::Request& request,
1542  principal) const;
1543 
1544  // /master/maintenance/schedule
1545  process::Future<process::http::Response> maintenanceSchedule(
1546  const process::http::Request& request,
1548  principal) const;
1549 
1550  // /master/maintenance/status
1551  process::Future<process::http::Response> maintenanceStatus(
1552  const process::http::Request& request,
1554  principal) const;
1555 
1556  // /master/machine/down
1558  const process::http::Request& request,
1560  principal) const;
1561 
1562  // /master/machine/up
1564  const process::http::Request& request,
1566  principal) const;
1567 
1568  // /master/unreserve
1570  const process::http::Request& request,
1572  principal) const;
1573 
1574  // /master/quota
1576  const process::http::Request& request,
1578  principal) const;
1579 
1580  // /master/weights
1582  const process::http::Request& request,
1584  principal) const;
1585 
1586  static std::string API_HELP();
1587  static std::string SCHEDULER_HELP();
1588  static std::string FLAGS_HELP();
1589  static std::string FRAMEWORKS_HELP();
1590  static std::string HEALTH_HELP();
1591  static std::string REDIRECT_HELP();
1592  static std::string ROLES_HELP();
1593  static std::string TEARDOWN_HELP();
1594  static std::string SLAVES_HELP();
1595  static std::string STATE_HELP();
1596  static std::string STATESUMMARY_HELP();
1597  static std::string TASKS_HELP();
1598  static std::string MAINTENANCE_SCHEDULE_HELP();
1599  static std::string MAINTENANCE_STATUS_HELP();
1600  static std::string MACHINE_DOWN_HELP();
1601  static std::string MACHINE_UP_HELP();
1602  static std::string CREATE_VOLUMES_HELP();
1603  static std::string DESTROY_VOLUMES_HELP();
1604  static std::string RESERVE_HELP();
1605  static std::string UNRESERVE_HELP();
1606  static std::string QUOTA_HELP();
1607  static std::string WEIGHTS_HELP();
1608 
1609  private:
1610  JSON::Object __flags() const;
1611 
1612  class FlagsError; // Forward declaration.
1613 
1616  principal) const;
1617 
1619  const size_t limit,
1620  const size_t offset,
1621  const std::string& order,
1623  principal) const;
1624 
1626  const FrameworkID& id,
1628  principal) const;
1629 
1631  const FrameworkID& id) const;
1632 
1633  process::Future<process::http::Response> _updateMaintenanceSchedule(
1634  const mesos::maintenance::Schedule& schedule,
1636  principal) const;
1637 
1638  process::Future<process::http::Response> __updateMaintenanceSchedule(
1639  const mesos::maintenance::Schedule& schedule,
1640  const process::Owned<ObjectApprovers>& approvers) const;
1641 
1642  process::Future<process::http::Response> ___updateMaintenanceSchedule(
1643  const mesos::maintenance::Schedule& schedule,
1644  bool applied) const;
1645 
1646  mesos::maintenance::Schedule _getMaintenanceSchedule(
1647  const process::Owned<ObjectApprovers>& approvers) const;
1648 
1650  const process::Owned<ObjectApprovers>& approvers) const;
1651 
1652  process::Future<process::http::Response> _startMaintenance(
1653  const google::protobuf::RepeatedPtrField<MachineID>& machineIds,
1654  const process::Owned<ObjectApprovers>& approvers) const;
1655 
1657  const google::protobuf::RepeatedPtrField<MachineID>& machineIds,
1658  const process::Owned<ObjectApprovers>& approvers) const;
1659 
1661  const SlaveID& slaveId,
1662  const google::protobuf::RepeatedPtrField<Resource>& resources,
1664  principal) const;
1665 
1667  const SlaveID& slaveId,
1668  const google::protobuf::RepeatedPtrField<Resource>& resources,
1670  principal) const;
1671 
1673  const SlaveID& slaveId,
1674  const google::protobuf::RepeatedPtrField<Resource>& volumes,
1676  principal) const;
1677 
1679  const SlaveID& slaveId,
1680  const google::protobuf::RepeatedPtrField<Resource>& volumes,
1682  principal) const;
1683 
1704  const SlaveID& slaveId,
1705  Resources required,
1706  const Offer::Operation& operation) const;
1707 
1708  // Master API handlers.
1709 
1711  const mesos::master::Call& call,
1713  ContentType contentType) const;
1714 
1715  mesos::master::Response::GetAgents _getAgents(
1716  const process::Owned<ObjectApprovers>& approvers) const;
1717 
1719  const mesos::master::Call& call,
1721  ContentType contentType) const;
1722 
1724  const mesos::master::Call& call,
1726  ContentType contentType) const;
1727 
1729  const mesos::master::Call& call,
1731  ContentType contentType) const;
1732 
1734  const mesos::master::Call& call,
1736  ContentType contentType) const;
1737 
1739  const mesos::master::Call& call,
1741  ContentType contentType) const;
1742 
1744  const mesos::master::Call& call,
1746  ContentType contentType) const;
1747 
1749  const mesos::master::Call& call,
1751  ContentType contentType) const;
1752 
1754  const mesos::master::Call& call,
1756  ContentType contentType) const;
1757 
1759  const mesos::master::Call& call,
1761  ContentType contentType) const;
1762 
1763  process::Future<process::http::Response> updateMaintenanceSchedule(
1764  const mesos::master::Call& call,
1766  ContentType contentType) const;
1767 
1768  process::Future<process::http::Response> getMaintenanceSchedule(
1769  const mesos::master::Call& call,
1771  ContentType contentType) const;
1772 
1773  process::Future<process::http::Response> getMaintenanceStatus(
1774  const mesos::master::Call& call,
1776  ContentType contentType) const;
1777 
1779  const mesos::master::Call& call,
1781  ContentType contentType) const;
1782 
1784  const mesos::master::Call& call,
1786  ContentType contentType) const;
1787 
1789  const mesos::master::Call& call,
1791  ContentType contentType) const;
1792 
1794  const mesos::master::Call& call,
1796  ContentType contentType) const;
1797 
1798  mesos::master::Response::GetTasks _getTasks(
1799  const process::Owned<ObjectApprovers>& approvers) const;
1800 
1802  const mesos::master::Call& call,
1804  ContentType contentType) const;
1805 
1807  const mesos::master::Call& call,
1809  ContentType contentType) const;
1810 
1812  const mesos::master::Call& call,
1814  ContentType contentType) const;
1815 
1817  const mesos::master::Call& call,
1819  ContentType contentType) const;
1820 
1822  const mesos::master::Call& call,
1824  ContentType contentType) const;
1825 
1826  process::Future<process::http::Response> unreserveResources(
1827  const mesos::master::Call& call,
1829  ContentType contentType) const;
1830 
1832  const mesos::master::Call& call,
1834  ContentType contentType) const;
1835 
1836  mesos::master::Response::GetFrameworks _getFrameworks(
1837  const process::Owned<ObjectApprovers>& approvers) const;
1838 
1840  const mesos::master::Call& call,
1842  ContentType contentType) const;
1843 
1844  mesos::master::Response::GetExecutors _getExecutors(
1845  const process::Owned<ObjectApprovers>& approvers) const;
1846 
1848  const mesos::master::Call& call,
1850  ContentType contentType) const;
1851 
1852  mesos::master::Response::GetState _getState(
1853  const process::Owned<ObjectApprovers>& approvers) const;
1854 
1856  const mesos::master::Call& call,
1858  ContentType contentType) const;
1859 
1861  const mesos::master::Call& call,
1863  ContentType contentType) const;
1864 
1866  const mesos::master::Call& call,
1868  ContentType contentType) const;
1869 
1871  const mesos::master::Call& call,
1873  ContentType contentType) const;
1874 
1876  const SlaveID& slaveId) const;
1877 
1878  process::Future<process::http::Response> reconcileOperations(
1879  Framework* framework,
1880  const mesos::scheduler::Call::ReconcileOperations& call,
1881  ContentType contentType) const;
1882 
1883  Master* master;
1884 
1885  ReadOnlyHandler readonlyHandler;
1886 
1887  // NOTE: The quota specific pieces of the Operator API are factored
1888  // out into this separate class.
1889  QuotaHandler quotaHandler;
1890 
1891  // NOTE: The weights specific pieces of the Operator API are factored
1892  // out into this separate class.
1893  WeightsHandler weightsHandler;
1894 
1895  // Since the Master actor is one of the most loaded in a typical Mesos
1896  // installation, we take some extra care to keep the backlog small.
1897  // In particular, all read-only requests are batched and executed in
1898  // parallel, instead of going through the master queue separately.
1899 
1900  typedef process::http::Response
1901  (Master::ReadOnlyHandler::*ReadOnlyRequestHandler)(
1902  const process::http::Request&,
1903  const process::Owned<ObjectApprovers>&) const;
1904 
1905  process::Future<process::http::Response> deferBatchedRequest(
1906  ReadOnlyRequestHandler handler,
1907  const process::http::Request& request,
1908  const process::Owned<ObjectApprovers>& approvers) const;
1909 
1910  void processRequestsBatch() const;
1911 
1912  struct BatchedRequest
1913  {
1914  ReadOnlyRequestHandler handler;
1915  process::http::Request request;
1918  };
1919 
1920  mutable std::vector<BatchedRequest> batchedRequests;
1921  };
1922 
1923  Master(const Master&); // No copying.
1924  Master& operator=(const Master&); // No assigning.
1925 
1926  friend struct Framework;
1927  friend struct FrameworkMetrics;
1928  friend struct Metrics;
1929  friend struct Slave;
1930  friend struct SlavesWriter;
1931  friend struct Subscriber;
1932 
1933  // NOTE: Since 'getOffer', 'getInverseOffer' and 'slaves' are
1934  // protected, we need to make the following functions friends.
1935  friend Offer* validation::offer::getOffer(
1936  Master* master, const OfferID& offerId);
1937 
1938  friend InverseOffer* validation::offer::getInverseOffer(
1939  Master* master, const OfferID& offerId);
1940 
1942  Master* master, const SlaveID& slaveId);
1943 
1944  const Flags flags;
1945 
1946  Http http;
1947 
1948  Option<MasterInfo> leader; // Current leading master.
1949 
1950  mesos::allocator::Allocator* allocator;
1951  WhitelistWatcher* whitelistWatcher;
1952  Registrar* registrar;
1953  Files* files;
1954 
1957 
1958  const Option<Authorizer*> authorizer;
1959 
1960  MasterInfo info_;
1961 
1962  // Holds some info which affects how a machine behaves, as well as state that
1963  // represent the master's view of this machine. See the `MachineInfo` protobuf
1964  // and `Machine` struct for more information.
1966 
1967  struct Maintenance
1968  {
1969  // Holds the maintenance schedule, as given by the operator.
1970  std::list<mesos::maintenance::Schedule> schedules;
1971  } maintenance;
1972 
1973  // Indicates when recovery is complete. Recovery begins once the
1974  // master is elected as a leader.
1976 
1977  // If this is the leading master, we periodically check whether we
1978  // should GC some information from the registry.
1979  Option<process::Timer> registryGcTimer;
1980 
1981  struct Slaves
1982  {
1983  Slaves() : removed(MAX_REMOVED_SLAVES) {}
1984 
1985  // Imposes a time limit for slaves that we recover from the
1986  // registry to reregister with the master.
1987  Option<process::Timer> recoveredTimer;
1988 
1989  // Slaves that have been recovered from the registrar after master
1990  // failover. Slaves are removed from this collection when they
1991  // either reregister with the master or are marked unreachable
1992  // because they do not reregister before `recoveredTimer` fires.
1993  // We must not answer questions related to these slaves (e.g.,
1994  // during task reconciliation) until we determine their fate
1995  // because they are in this transitioning state.
1996  hashmap<SlaveID, SlaveInfo> recovered;
1997 
1998  // Agents that are in the process of (re-)registering. They are
1999  // maintained here while the (re-)registration is in progress and
2000  // possibly pending in the authorizer or the registrar in order
2001  // to help deduplicate (re-)registration requests.
2002  hashset<process::UPID> registering;
2003  hashset<SlaveID> reregistering;
2004 
2005  // Registered slaves are indexed by SlaveID and UPID. Note that
2006  // iteration is supported but is exposed as iteration over a
2007  // hashmap<SlaveID, Slave*> since it is tedious to convert
2008  // the map's key/value iterator into a value iterator.
2009  //
2010  // TODO(bmahler): Consider pulling in boost's multi_index,
2011  // or creating a simpler indexing abstraction in stout.
2012  struct
2013  {
2014  bool contains(const SlaveID& slaveId) const
2015  {
2016  return ids.contains(slaveId);
2017  }
2018 
2019  bool contains(const process::UPID& pid) const
2020  {
2021  return pids.contains(pid);
2022  }
2023 
2024  Slave* get(const SlaveID& slaveId) const
2025  {
2026  return ids.get(slaveId).getOrElse(nullptr);
2027  }
2028 
2029  Slave* get(const process::UPID& pid) const
2030  {
2031  return pids.get(pid).getOrElse(nullptr);
2032  }
2033 
2034  void put(Slave* slave)
2035  {
2036  CHECK_NOTNULL(slave);
2037  ids[slave->id] = slave;
2038  pids[slave->pid] = slave;
2039  }
2040 
2041  void remove(Slave* slave)
2042  {
2043  CHECK_NOTNULL(slave);
2044  ids.erase(slave->id);
2045  pids.erase(slave->pid);
2046  }
2047 
2048  void clear()
2049  {
2050  ids.clear();
2051  pids.clear();
2052  }
2053 
2054  size_t size() const { return ids.size(); }
2055 
2056  typedef hashmap<SlaveID, Slave*>::iterator iterator;
2057  typedef hashmap<SlaveID, Slave*>::const_iterator const_iterator;
2058 
2059  iterator begin() { return ids.begin(); }
2060  iterator end() { return ids.end(); }
2061 
2062  const_iterator begin() const { return ids.begin(); }
2063  const_iterator end() const { return ids.end(); }
2064 
2065  private:
2068  } registered;
2069 
2070  // Slaves that are in the process of being removed from the
2071  // registrar.
2072  hashset<SlaveID> removing;
2073 
2074  // Slaves that are in the process of being marked unreachable.
2075  hashset<SlaveID> markingUnreachable;
2076 
2077  // Slaves that are in the process of being marked gone.
2078  hashset<SlaveID> markingGone;
2079 
2080  // This collection includes agents that have gracefully shutdown,
2081  // as well as those that have been marked unreachable or gone. We
2082  // keep a cache here to prevent this from growing in an unbounded
2083  // manner.
2084  //
2085  // TODO(bmahler): Ideally we could use a cache with set semantics.
2086  //
2087  // TODO(neilc): Consider storing all agent IDs that have been
2088  // marked unreachable by this master.
2090 
2091  // Slaves that have been marked unreachable. We recover this from
2092  // the registry, so it includes slaves marked as unreachable by
2093  // other instances of the master. Note that we use a LinkedHashMap
2094  // to ensure the order of elements here matches the order in the
2095  // registry's unreachable list, which matches the order in which
2096  // agents are marked unreachable. This list is garbage collected;
2097  // GC behavior is governed by the `registry_gc_interval`,
2098  // `registry_max_agent_age`, and `registry_max_agent_count` flags.
2100 
2101  // This helps us look up all unreachable tasks on an agent so we can remove
2102  // them from their primary storage `framework.unreachableTasks` when an
2103  // agent reregisters. This map is bounded by the same GC behavior as
2104  // `unreachable`. When the agent is GC'd from unreachable it's also
2105  // erased from `unreachableTasks`.
2107 
2108  // Slaves that have been marked gone. We recover this from the
2109  // registry, so it includes slaves marked as gone by other instances
2110  // of the master. Note that we use a LinkedHashMap to ensure the order
2111  // of elements here matches the order in the registry's gone list, which
2112  // matches the order in which agents are marked gone.
2114 
2115  // This rate limiter is used to limit the removal of slaves failing
2116  // health checks.
2117  // NOTE: Using a 'shared_ptr' here is OK because 'RateLimiter' is
2118  // a wrapper around libprocess process which is thread safe.
2120  } slaves;
2121 
2122  struct Frameworks
2123  {
2124  Frameworks(const Flags& masterFlags)
2125  : completed(masterFlags.max_completed_frameworks) {}
2126 
2128 
2130 
2131  // Principals of frameworks keyed by PID.
2132  // NOTE: Multiple PIDs can map to the same principal. The
2133  // principal is None when the framework doesn't specify it.
2134  // The differences between this map and 'authenticated' are:
2135  // 1) This map only includes *registered* frameworks. The mapping
2136  // is added when a framework (re-)registers.
2137  // 2) This map includes unauthenticated frameworks (when Master
2138  // allows them) if they have principals specified in
2139  // FrameworkInfo.
2141 
2142  // BoundedRateLimiters keyed by the framework principal.
2143  // Like Metrics::Frameworks, all frameworks of the same principal
2144  // are throttled together at a common rate limit.
2146 
2147  // The default limiter is for frameworks not specified in
2148  // 'flags.rate_limits'.
2150  } frameworks;
2151 
2152  struct Subscribers
2153  {
2154  Subscribers(Master* _master) : master(_master) {};
2155 
2156  // Represents a client subscribed to the 'api/vX' endpoint.
2157  //
2158  // TODO(anand): Add support for filtering. Some subscribers
2159  // might only be interested in a subset of events.
2160  struct Subscriber
2161  {
2163  const HttpConnection& _http,
2165  : http(_http),
2166  principal(_principal)
2167  {
2168  mesos::master::Event event;
2169  event.set_type(mesos::master::Event::HEARTBEAT);
2170 
2171  heartbeater =
2174  "subscriber " + stringify(http.streamId),
2175  event,
2176  http,
2179 
2180  process::spawn(heartbeater.get());
2181  }
2182 
2183  // Not copyable, not assignable.
2184  Subscriber(const Subscriber&) = delete;
2185  Subscriber& operator=(const Subscriber&) = delete;
2186 
2187  // TODO(greggomann): Refactor this function into multiple event-specific
2188  // overloads. See MESOS-8475.
2189  void send(
2191  const process::Owned<ObjectApprovers>& approvers,
2192  const process::Shared<FrameworkInfo>& frameworkInfo,
2193  const process::Shared<Task>& task);
2194 
2196  {
2197  // TODO(anand): Refactor `HttpConnection` to being a RAII class instead.
2198  // It is possible that a caller might accidentally invoke `close()`
2199  // after passing ownership to the `Subscriber` object. See MESOS-5843
2200  // for more details.
2201  http.close();
2202 
2203  terminate(heartbeater.get());
2204  wait(heartbeater.get());
2205  }
2206 
2211  };
2212 
2213  // Sends the event to all subscribers connected to the 'api/vX' endpoint.
2214  void send(
2215  mesos::master::Event&& event,
2216  const Option<FrameworkInfo>& frameworkInfo = None(),
2217  const Option<Task>& task = None());
2218 
2219  Master* master;
2220 
2221  // Active subscribers to the 'api/vX' endpoint keyed by the stream
2222  // identifier.
2224  };
2225 
2226  Subscribers subscribers;
2227 
2228  hashmap<OfferID, Offer*> offers;
2230 
2231  hashmap<OfferID, InverseOffer*> inverseOffers;
2232  hashmap<OfferID, process::Timer> inverseOfferTimers;
2233 
2234  // We track information about roles that we're aware of in the system.
2235  // Specifically, we keep track of the roles when a framework subscribes to
2236  // the role, and/or when there are resources allocated to the role
2237  // (e.g. some tasks and/or executors are consuming resources under the role).
2239 
2240  // Configured role whitelist if using the (deprecated) "explicit
2241  // roles" feature. If this is `None`, any role is allowed.
2242  Option<hashset<std::string>> roleWhitelist;
2243 
2244  // Configured weight for each role, if any. If a role does not
2245  // appear here, it has the default weight of 1.
2247 
2248  // Configured quota for each role, if any. We store quotas by role
2249  // because we set them at the role level.
2251 
2252  // Authenticator names as supplied via flags.
2253  std::vector<std::string> authenticatorNames;
2254 
2255  Option<Authenticator*> authenticator;
2256 
2257  // Frameworks/slaves that are currently in the process of authentication.
2258  // 'authenticating' future is completed when authenticator
2259  // completes authentication.
2260  // The future is removed from the map when master completes authentication.
2262 
2263  // Principals of authenticated frameworks/slaves keyed by PID.
2265 
2266  int64_t nextFrameworkId; // Used to give each framework a unique ID.
2267  int64_t nextOfferId; // Used to give each slot offer a unique ID.
2268  int64_t nextSlaveId; // Used to give each slave a unique ID.
2269 
2270  // NOTE: It is safe to use a 'shared_ptr' because 'Metrics' is
2271  // thread safe.
2272  // TODO(dhamon): This does not need to be a shared_ptr. Metrics contains
2273  // copyable metric types only.
2274  std::shared_ptr<Metrics> metrics;
2275 
2276  // PullGauge handlers.
2277  double _uptime_secs()
2278  {
2279  return (process::Clock::now() - startTime).secs();
2280  }
2281 
2282  double _elected()
2283  {
2284  return elected() ? 1 : 0;
2285  }
2286 
2287  double _slaves_connected();
2288  double _slaves_disconnected();
2289  double _slaves_active();
2290  double _slaves_inactive();
2291  double _slaves_unreachable();
2292 
2293  // TODO(bevers): Remove these and make the above functions
2294  // const instead after MESOS-4995 is resolved.
2295  double _const_slaves_connected() const;
2296  double _const_slaves_disconnected() const;
2297  double _const_slaves_active() const;
2298  double _const_slaves_inactive() const;
2299  double _const_slaves_unreachable() const;
2300 
2301  double _frameworks_connected();
2302  double _frameworks_disconnected();
2303  double _frameworks_active();
2304  double _frameworks_inactive();
2305 
2306  double _outstanding_offers()
2307  {
2308  return static_cast<double>(offers.size());
2309  }
2310 
2311  double _event_queue_messages()
2312  {
2313  return static_cast<double>(eventCount<process::MessageEvent>());
2314  }
2315 
2316  double _event_queue_dispatches()
2317  {
2318  return static_cast<double>(eventCount<process::DispatchEvent>());
2319  }
2320 
2321  double _event_queue_http_requests()
2322  {
2323  return static_cast<double>(eventCount<process::HttpEvent>());
2324  }
2325 
2326  double _tasks_staging();
2327  double _tasks_starting();
2328  double _tasks_running();
2329  double _tasks_unreachable();
2330  double _tasks_killing();
2331 
2332  double _resources_total(const std::string& name);
2333  double _resources_used(const std::string& name);
2334  double _resources_percent(const std::string& name);
2335 
2336  double _resources_revocable_total(const std::string& name);
2337  double _resources_revocable_used(const std::string& name);
2338  double _resources_revocable_percent(const std::string& name);
2339 
2340  process::Time startTime; // Start time used to calculate uptime.
2341 
2342  Option<process::Time> electedTime; // Time when this master is elected.
2343 
2344  // Validates the framework including authorization.
2345  // Returns None if the framework is valid.
2346  // Returns Error if the framework is invalid.
2347  // Returns Failure if authorization returns 'Failure'.
2349  const FrameworkInfo& frameworkInfo,
2350  const process::UPID& from);
2351 };
2352 
2353 
// Forward declaration of the `Framework` stream operator. It must be
// visible before `Framework::send()` (defined later in this header),
// which streams `*this` into its warning log messages; the definition
// follows the inline `Framework` accessors below.
inline std::ostream& operator<<(
    std::ostream& stream,
    const Framework& framework);
2357 
2358 
2359 // TODO(bmahler): Keeping the task and executor information in sync
2360 // across the Slave and Framework structs is error prone!
2362 {
2363  enum State
2364  {
2365  // Framework has never connected to this master. This implies the
2366  // master failed over and the framework has not yet reregistered,
2367  // but some framework state has been recovered from reregistering
2368  // agents that are running tasks for the framework.
2370 
2371  // Framework was previously connected to this master. A framework
2372  // becomes disconnected when there is a socket error.
2374 
2375  // The framework is connected but not active.
2377 
2378  // Framework is connected and eligible to receive offers. No
2379  // offers will be made to frameworks that are not active.
2380  ACTIVE
2381  };
2382 
2383  Framework(Master* const master,
2384  const Flags& masterFlags,
2385  const FrameworkInfo& info,
2386  const process::UPID& _pid,
2388 
2389  Framework(Master* const master,
2390  const Flags& masterFlags,
2391  const FrameworkInfo& info,
2392  const HttpConnection& _http,
2394 
2395  Framework(Master* const master,
2396  const Flags& masterFlags,
2397  const FrameworkInfo& info);
2398 
2399  ~Framework();
2400 
2401  Task* getTask(const TaskID& taskId);
2402 
2403  void addTask(Task* task);
2404 
2405  // Update framework to recover the resources that were previously
2406  // being used by `task`.
2407  //
2408  // TODO(bmahler): This is a hack for performance. We need to
2409  // maintain resource counters because computing task resources
2410  // functionally for all tasks is expensive, for now.
2411  void recoverResources(Task* task);
2412 
2413  // Sends a message to the connected framework.
2414  template <typename Message>
2415  void send(const Message& message);
2416 
2417  void addCompletedTask(Task&& task);
2418 
2419  void addUnreachableTask(const Task& task);
2420 
2421  // Removes the task. `unreachable` indicates whether the task is removed due
2422  // to being unreachable. Note that we cannot rely on the task state because
2423  // it may not reflect unreachability due to being set to TASK_LOST for
2424  // backwards compatibility.
2425  void removeTask(Task* task, bool unreachable);
2426 
2427  void addOffer(Offer* offer);
2428 
2429  void removeOffer(Offer* offer);
2430 
2431  void addInverseOffer(InverseOffer* inverseOffer);
2432 
2433  void removeInverseOffer(InverseOffer* inverseOffer);
2434 
2435  bool hasExecutor(const SlaveID& slaveId,
2436  const ExecutorID& executorId);
2437 
2438  void addExecutor(const SlaveID& slaveId,
2439  const ExecutorInfo& executorInfo);
2440 
2441  void removeExecutor(const SlaveID& slaveId,
2442  const ExecutorID& executorId);
2443 
2444  void addOperation(Operation* operation);
2445 
2446  Option<Operation*> getOperation(const OperationID& id);
2447 
2448  void recoverResources(Operation* operation);
2449 
2450  void removeOperation(Operation* operation);
2451 
2452  const FrameworkID id() const;
2453 
2454  // Update fields in 'info' using those in 'newInfo'. Currently this
2455  // only updates `role`/`roles`, 'name', 'failover_timeout', 'hostname',
2456  // 'webui_url', 'capabilities', and 'labels'.
2457  void update(const FrameworkInfo& newInfo);
2458 
2459  void updateConnection(const process::UPID& newPid);
2460 
2461  void updateConnection(const HttpConnection& newHttp);
2462 
2463  // Closes the HTTP connection and stops the heartbeat.
2464  //
2465  // TODO(vinod): Currently `state` variable is set separately
2466  // from this method. We need to make sure these are in sync.
2467  void closeHttpConnection();
2468 
2469  void heartbeat();
2470 
2471  bool active() const;
2472  bool connected() const;
2473  bool recovered() const;
2474 
2475  bool isTrackedUnderRole(const std::string& role) const;
2476  void trackUnderRole(const std::string& role);
2477  void untrackUnderRole(const std::string& role);
2478 
2479  void setFrameworkState(const State& _state);
2480 
2481  Master* const master;
2482 
2483  FrameworkInfo info;
2484 
2485  std::set<std::string> roles;
2486 
2488 
2489  // Frameworks can either be connected via HTTP or by message passing
2490  // (scheduler driver). At most one of `http` and `pid` will be set
2491  // according to the last connection made by the framework; neither
2492  // field will be set if the framework is in state `RECOVERED`.
2495 
2497 
2501 
2502  // Tasks that have not yet been launched because they are currently
2503  // being authorized.
2505 
2506  // TODO(bmahler): Make this private to enforce that `addTask()` and
2507  // `removeTask()` are used, and provide a const view into the tasks.
2509 
2510  // Tasks launched by this framework that have reached a terminal
2511  // state and have had all their updates acknowledged. We only keep a
2512  // fixed-size cache to avoid consuming too much memory. We use
2513  // circular_buffer rather than BoundedHashMap because there
2514  // can be multiple completed tasks with the same task ID.
2515  circular_buffer<process::Owned<Task>> completedTasks;
2516 
2517  // When an agent is marked unreachable, tasks running on it are stored
2518  // here. We only keep a fixed-size cache to avoid consuming too much memory.
2519  // NOTE: Non-partition-aware unreachable tasks in this map are marked
2520  // TASK_LOST instead of TASK_UNREACHABLE for backward compatibility.
2522 
2523  hashset<Offer*> offers; // Active offers for framework.
2524 
2525  hashset<InverseOffer*> inverseOffers; // Active inverse offers for framework.
2526 
2527  // TODO(bmahler): Make this private to enforce that `addExecutor()`
2528  // and `removeExecutor()` are used, and provide a const view into
2529  // the executors.
2531 
2532  // Pending operations or terminal operations that have
2533  // unacknowledged status updates.
2535 
2536  // The map from the framework-specified operation ID to the
2537  // corresponding internal operation UUID.
2539 
2540  // NOTE: For the used and offered resources below, we keep the
2541  // total as well as partitioned by SlaveID.
2542  // We expose the total resources via the HTTP endpoint, and we
2543  // keep a running total of the resources because looping over the
2544  // slaves to sum the resources has led to perf issues (MESOS-1862).
2545  // We keep the resources partitioned by SlaveID because non-scalar
2546  // resources can be lost when summing them up across multiple
2547  // slaves (MESOS-2373).
2548  //
2549  // Also note that keeping the totals is safe even though it yields
2550  // incorrect results for non-scalar resources.
2551  // (1) For overlapping set items / ranges across slaves, these
2552  // will get added N times but only represented once.
2553  // (2) When an initial subtraction occurs (N-1), the resource is
2554  // no longer represented. (This is the source of the bug).
2555  // (3) When any further subtractions occur (N-(1+M)), the
2556  // Resources simply ignores the subtraction since there's
2557  // nothing to remove, so this is safe for now.
2558 
2559  // TODO(mpark): Strip the non-scalar resources out of the totals
2560  // in order to avoid reporting incorrect statistics (MESOS-2623).
2561 
2562  // Active task / executor / operation resources.
2564 
2565  // Note that we maintain multiple copies of each shared resource in
2566  // `usedResources` as they are used by multiple tasks.
2568 
2569  // Offered resources.
2572 
2573  // This is only set for HTTP frameworks.
2576 
  // This is used for per-framework metrics.
2579 
2580 private:
2581  Framework(Master* const _master,
2582  const Flags& masterFlags,
2583  const FrameworkInfo& _info,
2584  State state,
2585  const process::Time& time);
2586 
2587  Framework(const Framework&); // No copying.
2588  Framework& operator=(const Framework&); // No assigning.
2589 };
2590 
2591 
2592 // Sends a message to the connected framework.
2593 template <typename Message>
2594 void Framework::send(const Message& message)
2595 {
2596  if (!connected()) {
2597  LOG(WARNING) << "Master attempted to send message to disconnected"
2598  << " framework " << *this;
2599  }
2600 
2601  metrics.incrementEvent(message);
2602 
2603  if (http.isSome()) {
2604  if (!http->send(message)) {
2605  LOG(WARNING) << "Unable to send event to framework " << *this << ":"
2606  << " connection closed";
2607  }
2608  } else {
2609  CHECK_SOME(pid);
2610  master->send(pid.get(), message);
2611  }
2612 }
2613 
2614 
2615 // TODO(bevers): Check if there is anything preventing us from
2616 // returning a const reference here.
2617 inline const FrameworkID Framework::id() const
2618 {
2619  return info.id();
2620 }
2621 
2622 
2623 inline bool Framework::active() const
2624 {
2625  return state == ACTIVE;
2626 }
2627 
2628 
2629 inline bool Framework::connected() const
2630 {
2631  return state == ACTIVE || state == INACTIVE;
2632 }
2633 
2634 
2635 inline bool Framework::recovered() const
2636 {
2637  return state == RECOVERED;
2638 }
2639 
2640 
2641 inline std::ostream& operator<<(
2642  std::ostream& stream,
2643  const Framework& framework)
2644 {
2645  // TODO(vinod): Also log the hostname once FrameworkInfo is properly
2646  // updated on framework failover (MESOS-1784).
2647  stream << framework.id() << " (" << framework.info.name() << ")";
2648 
2649  if (framework.pid.isSome()) {
2650  stream << " at " << framework.pid.get();
2651  }
2652 
2653  return stream;
2654 }
2655 
2656 
2657 // Information about an active role.
2658 struct Role
2659 {
2660  Role() = delete;
2661 
2662  Role(const std::string& _role) : role(_role) {}
2663 
2664  void addFramework(Framework* framework)
2665  {
2666  frameworks[framework->id()] = framework;
2667  }
2668 
2669  void removeFramework(Framework* framework)
2670  {
2671  frameworks.erase(framework->id());
2672  }
2673 
2675  {
2676  Resources resources;
2677 
2678  auto allocatedTo = [](const std::string& role) {
2679  return [role](const Resource& resource) {
2680  CHECK(resource.has_allocation_info());
2681  return resource.allocation_info().role() == role;
2682  };
2683  };
2684 
2685  foreachvalue (Framework* framework, frameworks) {
2686  resources += framework->totalUsedResources.filter(allocatedTo(role));
2687  resources += framework->totalOfferedResources.filter(allocatedTo(role));
2688  }
2689 
2690  return resources;
2691  }
2692 
2693  const std::string role;
2694 
2695  // NOTE: The dynamic role/quota relation is stored in and administrated
2696  // by the master. There is no direct representation of quota information
2697  // here to avoid duplication and to support that an operator can associate
2698  // quota with a role before the role is created. Such ordering of operator
2699  // requests prevents a race of premature unbounded allocation that setting
2700  // quota first is intended to contain.
2701 
2703 };
2704 
2705 } // namespace master {
2706 } // namespace internal {
2707 } // namespace mesos {
2708 
2709 #endif // __MASTER_HPP__
Protocol< RecoverRequest, RecoverResponse > recover
Definition: path.hpp:26
void initialize() override
Invoked when a process gets spawned.
Definition: master.hpp:394
std::string generate(const std::string &prefix="")
Returns 'prefix(N)' where N represents the number of instances where the same prefix (wrt...
bool send(const Message &message)
Definition: master.hpp:344
hashmap< ResourceProviderID, ResourceProvider > resourceProviders
Definition: master.hpp:315
Definition: nothing.hpp:16
hashmap< TaskID, Task * > tasks
Definition: master.hpp:2508
Definition: master.hpp:2658
Master *const master
Definition: master.hpp:2481
ContentType
Definition: http.hpp:43
Option< Error > validate(const std::string &imageDir)
hashmap< UUID, Operation * > operations
Definition: master.hpp:247
std::ostream & operator<<(std::ostream &stream, const Future< T > &future)
Definition: future.hpp:1826
Try< bool > update(const std::string &link, const Handle &parent, uint16_t protocol, const action::Mirror &mirror)
void finalize(bool finalize_wsa=false)
Clean up the library.
Try< Bytes > size(const std::string &path, const FollowSymlink follow=FollowSymlink::FOLLOW_SYMLINK)
Definition: stat.hpp:121
Future< Response > request(const Request &request, bool streamedResponse=false)
Asynchronously sends an HTTP request to the process and returns the HTTP response once the entire res...
Subscriber(const HttpConnection &_http, const Option< process::http::authentication::Principal > _principal)
Definition: master.hpp:2162
Option< process::Owned< Heartbeater< mesos::scheduler::Event, v1::scheduler::Event > > > heartbeater
Definition: master.hpp:2575
SlaveInfo info
Definition: master.hpp:187
Definition: master.hpp:27
Role(const std::string &_role)
Definition: master.hpp:2662
Definition: check.hpp:33
const SlaveID id
Definition: master.hpp:186
hashset< Offer * > offers
Definition: master.hpp:250
Option< process::Timer > reregistrationTimer
Definition: master.hpp:216
bool connected
Definition: master.hpp:203
bool initialize(const Option< std::string > &delegate=None(), const Option< std::string > &readwriteAuthenticationRealm=None(), const Option< std::string > &readonlyAuthenticationRealm=None())
Initialize the library.
Resources totalResources
Definition: master.hpp:276
constexpr Duration DEFAULT_HEARTBEAT_INTERVAL
Definition: constants.hpp:60
Definition: protobuf_utils.hpp:261
hashmap< SlaveID, Resources > offeredResources
Definition: master.hpp:2571
Heartbeater(const std::string &_logMessage, const Message &_heartbeatMessage, const HttpConnection &_http, const Duration &_interval, const Option< Duration > &_delay=None(), const Option< lambda::function< void(const Message &)>> &_callback=None())
Definition: master.hpp:378
protobuf::framework::Capabilities capabilities
Definition: master.hpp:2487
std::set< std::string > getRoles(const FrameworkInfo &frameworkInfo)
Resources filter(const lambda::function< bool(const Resource &)> &predicate) const
v1::AgentID evolve(const SlaveID &slaveId)
Result< ProcessStatus > status(pid_t pid)
Definition: proc.hpp:166
InverseOffer * getInverseOffer(Master *master, const OfferID &offerId)
Try< Nothing > machines(const google::protobuf::RepeatedPtrField< MachineID > &ids)
Performs the following checks on a list of machines:
mesos::v1::scheduler::Call Call
Definition: mesos.hpp:2616
Definition: resources.hpp:81
Resources totalUsedResources
Definition: master.hpp:2563
Slave * getSlave(Master *master, const SlaveID &slaveId)
Option< HttpConnection > http
Definition: master.hpp:2493
Definition: flags.hpp:42
Definition: registrar.hpp:88
void addFramework(Framework *framework)
Definition: master.hpp:2664
Option< Error > reregisterSlave(const ReregisterSlaveMessage &message)
Definition: files.hpp:73
Operation
Definition: cgroups.hpp:458
Future< Nothing > redirect(int_fd from, Option< int_fd > to, size_t chunk=4096, const std::vector< lambda::function< void(const std::string &)>> &hooks={})
Redirect output from the 'from' file descriptor to the 'to' file descriptor (or /dev/null if 'to' is ...
UPID spawn(ProcessBase *process, bool manage=false)
Spawn a new process.
Definition: duration.hpp:32
void terminate(const UPID &pid, bool inject=true)
Sends a TerminateEvent to the given process.
hashmap< UUID, Operation * > operations
Definition: master.hpp:312
void send(const process::UPID &to, const google::protobuf::Message &message)
Definition: protobuf.hpp:118
bool isSome() const
Definition: option.hpp:115
Definition: event.hpp:209
process::Future< Nothing > closed() const
Definition: master.hpp:357
Definition: http.hpp:520
Definition: json.hpp:158
hashmap< FrameworkID, hashmap< TaskID, Task * > > tasks
Definition: master.hpp:239
multihashmap< FrameworkID, TaskID > killedTasks
Definition: master.hpp:243
mesos::v1::scheduler::Event Event
Definition: mesos.hpp:2617
Definition: hashmap.hpp:38
Option< UUID > resourceVersion
Definition: master.hpp:289
FrameworkMetrics metrics
Definition: master.hpp:2578
bool contains(const std::string &s, const std::string &substr)
Definition: strings.hpp:423
#define CHECK_SOME(expression)
Definition: check.hpp:50
Definition: master.hpp:332
Resources checkpointedResources
Definition: master.hpp:269
SlaveObserver * observer
Definition: master.hpp:291
Definition: owned.hpp:26
Try< Nothing > unavailability(const Unavailability &unavailability)
An "untyped" PID, used to encapsulate the process ID for lower-layer abstractions (eg...
Definition: pid.hpp:39
BoundedHashMap< TaskID, process::Owned< Task > > unreachableTasks
Definition: master.hpp:2521
process::Time registeredTime
Definition: master.hpp:199
bool active
Definition: master.hpp:208
Definition: http.hpp:341
An abstraction for contending to be a leading master.
Definition: contender.hpp:40
std::set< std::string > roles
Definition: master.hpp:2485
process::UPID pid
Definition: master.hpp:191
Definition: uuid.hpp:35
Definition: protobuf_utils.hpp:455
process::http::Pipe::Writer writer
Definition: master.hpp:362
Option< process::Time > reregisteredTime
Definition: master.hpp:200
Definition: spec.hpp:26
process::Time reregisteredTime
Definition: master.hpp:2499
Master *const master
Definition: master.hpp:185
Timer delay(const Duration &duration, const PID< T > &pid, void(T::*method)())
Definition: delay.hpp:31
std::string encode(const T &record) const
Returns the "Record-IO" encoded record.
Definition: recordio.hpp:66
Option< Error > quotaInfo(const mesos::quota::QuotaInfo &quotaInfo)
const T & get() const &
Definition: option.hpp:118
const std::string role
Definition: master.hpp:2693
Protocol< PromiseRequest, PromiseResponse > promise
Definition: protobuf.hpp:100
process::Time registeredTime
Definition: master.hpp:2498
process::Future< Nothing > destroy(const std::string &hierarchy, const std::string &cgroup="/")
#define foreachvalue(VALUE, ELEMS)
Definition: foreach.hpp:77
Try< int_fd > accept(int_fd s)
Definition: network.hpp:31
Definition: whitelist_watcher.hpp:37
void removeFramework(Framework *framework)
Definition: master.hpp:2669
MasterInfo info() const
Definition: master.hpp:585
bool wait(const UPID &pid, const Duration &duration=Seconds(-1))
Wait for the process to exit for no more than the specified seconds.
UUID resourceVersion
Definition: master.hpp:308
Definition: time.hpp:23
FrameworkInfo info
Definition: master.hpp:2483
ContentType contentType
Definition: master.hpp:363
bool exited(const UPID &from, const UPID &to)
Simulates disconnection of the link between 'from' and 'to' by sending an ExitedEvent to 'to'...
id::UUID streamId
Definition: master.hpp:364
HttpConnection http
Definition: master.hpp:2207
Definition: boundedhashmap.hpp:27
Basic model of an allocator: resources are allocated to a framework in the form of offers...
Definition: allocator.hpp:55
#define flags
Definition: decoder.hpp:18
Definition: none.hpp:27
Definition: attributes.hpp:24
hashset< InverseOffer * > inverseOffers
Definition: master.hpp:2525
Definition: executor.hpp:48
Definition: http.hpp:599
const MachineID machineId
Definition: master.hpp:189
Definition: master.hpp:116
Resources allocatedResources() const
Definition: master.hpp:2674
HttpConnection(const process::http::Pipe::Writer &_writer, ContentType _contentType, id::UUID _streamId)
Definition: master.hpp:334
constexpr size_t MAX_REMOVED_SLAVES
Definition: constants.hpp:92
hashset< InverseOffer * > inverseOffers
Definition: master.hpp:253
bool close()
Definition: master.hpp:352
An abstraction of a Master detector which can be used to detect the leading master from a group...
Definition: detector.hpp:38
static Time now()
The current clock time for either the current process that makes this call or the global clock time i...
Resources totalResources
Definition: master.hpp:295
Definition: event.hpp:103
hashmap< TaskID, TaskInfo > pendingTasks
Definition: master.hpp:2504
Definition: master.hpp:375
Given an encoding function for individual records, this provides encoding from typed records into "Re...
Definition: recordio.hpp:57
Option< process::UPID > pid
Definition: master.hpp:2494
Definition: metrics.hpp:41
State
Definition: master.hpp:2363
Offer * getOffer(Master *master, const OfferID &offerId)
Try< Nothing > create(const std::string &hierarchy, const std::string &cgroup, bool recursive=false)
Try< Nothing > kill(const std::string &hierarchy, const std::string &cgroup, int signal)
hashmap< FrameworkID, Framework * > frameworks
Definition: master.hpp:2702
Try< Nothing > bind(int_fd s, const Address &address)
Definition: network.hpp:46
process::Time unregisteredTime
Definition: master.hpp:2500
std::string serialize(ContentType contentType, const google::protobuf::Message &message)
mesos::v1::scheduler::Response Response
Definition: mesos.hpp:2619
std::string version
Definition: master.hpp:194
URI http(const std::string &host, const std::string &path="/", const Option< int > &port=None(), const Option< std::string > &query=None(), const Option< std::string > &fragment=None(), const Option< std::string > &user=None(), const Option< std::string > &password=None())
Creates an http URI with the given parameters.
Definition: http.hpp:35
ResourceProviderInfo info
Definition: master.hpp:294
hashmap< FrameworkID, hashmap< ExecutorID, ExecutorInfo > > executors
Definition: master.hpp:223
hashset< Offer * > offers
Definition: master.hpp:2523
std::string stringify(int flags)
Definition: owned.hpp:36
Definition: master.hpp:2361
protobuf::slave::Capabilities capabilities
Definition: master.hpp:197
hashmap< SlaveID, hashmap< ExecutorID, ExecutorInfo > > executors
Definition: master.hpp:2530
Definition: process.hpp:501
process::Owned< Heartbeater< mesos::master::Event, v1::master::Event > > heartbeater
Definition: master.hpp:2209
circular_buffer< process::Owned< Task > > completedTasks
Definition: master.hpp:2515
hashmap< OperationID, UUID > operationUUIDs
Definition: master.hpp:2538
hashmap< UUID, Operation * > operations
Definition: master.hpp:2534
Definition: parse.hpp:33
Try< std::set< pid_t > > pids()
Definition: freebsd.hpp:62
const Option< process::http::authentication::Principal > principal
Definition: master.hpp:2210
hashmap< SlaveID, Resources > usedResources
Definition: master.hpp:2567
Resources totalOfferedResources
Definition: master.hpp:2570
Definition: master.hpp:433
PID< MetricsProcess > metrics
Resources offeredResources
Definition: master.hpp:260
hashmap< FrameworkID, hashmap< TaskID, TaskInfo > > pendingTasks
Definition: master.hpp:229
const FrameworkID id() const
Definition: master.hpp:2617
constexpr const char * name
Definition: shell.hpp:43
hashmap< FrameworkID, Resources > usedResources
Definition: master.hpp:258
Try< std::vector< Value > > time(const std::string &hierarchy, const std::string &cgroup)
Try< Nothing > schedule(const mesos::maintenance::Schedule &schedule, const hashmap< MachineID, Machine > &machines)
Performs the following checks on the new maintenance schedule:
Option< Error > registerSlave(const RegisterSlaveMessage &message)
State state
Definition: master.hpp:2496
Future< size_t > send(const int_fd &fd, const void *buf, size_t size)