Apache Mesos
master.hpp
Go to the documentation of this file.
1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements. See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership. The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License. You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 
17 #ifndef __MASTER_HPP__
18 #define __MASTER_HPP__
19 
20 #include <stdint.h>
21 
22 #include <list>
23 #include <memory>
24 #include <set>
25 #include <string>
26 #include <vector>
27 
28 #include <boost/circular_buffer.hpp>
29 
30 #include <mesos/mesos.hpp>
31 #include <mesos/resources.hpp>
32 #include <mesos/type_utils.hpp>
33 
35 
39 #include <mesos/master/master.hpp>
40 
42 
43 #include <mesos/quota/quota.hpp>
44 
46 
47 #include <process/limiter.hpp>
48 #include <process/http.hpp>
49 #include <process/owned.hpp>
50 #include <process/process.hpp>
51 #include <process/protobuf.hpp>
52 #include <process/timer.hpp>
53 
55 
56 #include <stout/boundedhashmap.hpp>
57 #include <stout/cache.hpp>
58 #include <stout/foreach.hpp>
59 #include <stout/hashmap.hpp>
60 #include <stout/hashset.hpp>
61 #include <stout/linkedhashmap.hpp>
62 #include <stout/multihashmap.hpp>
63 #include <stout/nothing.hpp>
64 #include <stout/option.hpp>
65 #include <stout/recordio.hpp>
66 #include <stout/try.hpp>
67 #include <stout/uuid.hpp>
68 
69 #include "common/http.hpp"
72 
73 #include "files/files.hpp"
74 
75 #include "internal/devolve.hpp"
76 #include "internal/evolve.hpp"
77 
78 #include "master/constants.hpp"
79 #include "master/flags.hpp"
80 #include "master/machine.hpp"
81 #include "master/metrics.hpp"
82 #include "master/validation.hpp"
83 
84 #include "messages/messages.hpp"
85 
86 namespace process {
87 class RateLimiter; // Forward declaration.
88 }
89 
90 namespace mesos {
91 
92 // Forward declarations.
93 class Authorizer;
94 class ObjectApprovers;
95 
96 namespace internal {
97 
98 // Forward declarations.
99 namespace registry {
100 class Slaves;
101 }
102 
103 class Registry;
104 class WhitelistWatcher;
105 
106 namespace master {
107 
108 class Master;
109 class Registrar;
110 class SlaveObserver;
111 
112 struct BoundedRateLimiter;
113 struct Framework;
114 struct Role;
115 
116 
117 struct Slave
118 {
119 Slave(Master* const _master,
120  SlaveInfo _info,
121  const process::UPID& _pid,
122  const MachineID& _machineId,
123  const std::string& _version,
124  std::vector<SlaveInfo::Capability> _capabilites,
125  const process::Time& _registeredTime,
126  std::vector<Resource> _checkpointedResources,
127  const Option<UUID>& _resourceVersion,
128  std::vector<ExecutorInfo> executorInfos = std::vector<ExecutorInfo>(),
129  std::vector<Task> tasks = std::vector<Task>());
130 
131  ~Slave();
132 
133  Task* getTask(
134  const FrameworkID& frameworkId,
135  const TaskID& taskId) const;
136 
137  void addTask(Task* task);
138 
139  // Update slave to recover the resources that were previously
140  // being used by `task`.
141  //
142  // TODO(bmahler): This is a hack for performance. We need to
143  // maintain resource counters because computing task resources
144  // functionally for all tasks is expensive, for now.
145  void recoverResources(Task* task);
146 
147  void removeTask(Task* task);
148 
149  void addOperation(Operation* operation);
150 
151  void recoverResources(Operation* operation);
152 
153  void removeOperation(Operation* operation);
154 
155  Operation* getOperation(const UUID& uuid) const;
156 
157  void addOffer(Offer* offer);
158 
159  void removeOffer(Offer* offer);
160 
161  void addInverseOffer(InverseOffer* inverseOffer);
162 
163  void removeInverseOffer(InverseOffer* inverseOffer);
164 
165  bool hasExecutor(
166  const FrameworkID& frameworkId,
167  const ExecutorID& executorId) const;
168 
169  void addExecutor(
170  const FrameworkID& frameworkId,
171  const ExecutorInfo& executorInfo);
172 
173  void removeExecutor(
174  const FrameworkID& frameworkId,
175  const ExecutorID& executorId);
176 
177  void apply(const std::vector<ResourceConversion>& conversions);
178 
180  const SlaveInfo& info,
181  const std::string& _version,
182  const std::vector<SlaveInfo::Capability>& _capabilites,
183  const Resources& _checkpointedResources,
184  const Option<UUID>& resourceVersion);
185 
186  Master* const master;
187  const SlaveID id;
188  SlaveInfo info;
189 
190  const MachineID machineId;
191 
193 
194  // TODO(bmahler): Use stout's Version when it can parse labels, etc.
195  std::string version;
196 
197  // Agent capabilities.
199 
202 
203  // Slave becomes disconnected when the socket closes.
204  bool connected;
205 
206  // Slave becomes deactivated when it gets disconnected. In the
207  // future this might also happen via an HTTP endpoint.
208  // No offers will be made for a deactivated slave.
209  bool active;
210 
211  // Timer for marking as unreachable those slaves that become disconnected
212  // and don't reregister. This timeout is larger than the slave
213  // observer's timeout, so typically the slave observer will be the
214  // one to mark such slaves unreachable; this timer is a backup for
215  // when a slave responds to pings but does not reregister (e.g.,
216  // because agent recovery has hung).
218 
219  // Executors running on this slave.
220  //
221  // TODO(bmahler): Make this private to enforce that `addExecutor()`
222  // and `removeExecutor()` are used, and provide a const view into
223  // the executors.
225 
226  // Tasks that have not yet been launched because they are currently
227  // being authorized. This is similar to Framework's pendingTasks but we
228  // track pendingTasks per agent separately to determine if any offer
229  // operation for this agent would change resources requested by these tasks.
231 
232  // Tasks present on this slave.
233  //
234  // TODO(bmahler): Make this private to enforce that `addTask()` and
235  // `removeTask()` are used, and provide a const view into the tasks.
236  //
237  // TODO(bmahler): The task pointer ownership complexity arises from the fact
238  // that we own the pointer here, but it's shared with the Framework struct.
239  // We should find a way to eliminate this.
241 
242  // Tasks that frameworks have asked to kill.
243  // This is used for reconciliation when the slave reregisters.
245 
246  // Pending operations or terminal operations that have
247  // unacknowledged status updates on this agent.
249 
250  // Active offers on this slave.
252 
253  // Active inverse offers on this slave.
255 
256  // Resources for active task / executors / operations.
257  // Note that we maintain multiple copies of each shared resource in
258  // `usedResources` as they are used by multiple tasks.
260 
262 
263  // Resources that should be checkpointed by the slave (e.g.,
264  // persistent volumes, dynamic reservations, etc). These are either
265  // in use by a task/executor, or are available for use and will be
266  // re-offered to the framework.
267  // TODO(jieyu): `checkpointedResources` is only for agent default
268  // resources. Resources from resource providers are not included in
269  // this field. Consider removing this field.
271 
272  // The current total resources of the slave. Note that this is
273  // different from 'info.resources()' because this also considers
274  // operations (e.g., CREATE, RESERVE) that have been applied and
275  // includes revocable resources and resources from resource
276  // providers as well.
278 
279  // Used to establish the relationship between the operation and the
280  // resources that the operation is operating on. Each resource
281  // provider will keep a resource version UUID, and change it when it
282  // believes that the resources from this resource provider are out
283  // of sync from the master's view. The master will keep track of
284  // the last known resource version UUID for each resource provider,
285  // and attach the resource version UUID in each operation it sends
286  // out. The resource provider should reject operations that have a
287  // different resource version UUID than the one it maintains, because
288  // this means the operation is operating on resources that might
289  // have already been invalidated.
291 
292  SlaveObserver* observer;
293 
295  ResourceProviderInfo info;
297 
298  // Used to establish the relationship between the operation and the
299  // resources that the operation is operating on. Each resource
300  // provider will keep a resource version UUID, and change it when it
301  // believes that the resources from this resource provider are out
302  // of sync from the master's view. The master will keep track of
303  // the last known resource version UUID for each resource provider,
304  // and attach the resource version UUID in each operation it sends
305  // out. The resource provider should reject operations that have a
306  // different resource version UUID than the one it maintains, because
307  // this means the operation is operating on resources that might
308  // have already been invalidated.
310 
311  // Pending operations or terminal operations that have
312  // unacknowledged status updates.
314  };
315 
317 
318 private:
319  Slave(const Slave&); // No copying.
320  Slave& operator=(const Slave&); // No assigning.
321 };
322 
323 
324 inline std::ostream& operator<<(std::ostream& stream, const Slave& slave)
325 {
326  return stream << slave.id << " at " << slave.pid
327  << " (" << slave.info.hostname() << ")";
328 }
329 
330 
331 // Represents the streaming HTTP connection to a framework or a client
332 // subscribed to the '/api/vX' endpoint.
334 {
336  ContentType _contentType,
337  id::UUID _streamId)
338  : writer(_writer),
339  contentType(_contentType),
340  streamId(_streamId) {}
341 
342  // We need to evolve the internal old style message/unversioned event into a
343  // versioned event e.g., `v1::scheduler::Event` or `v1::master::Event`.
344  template <typename Message, typename Event = v1::scheduler::Event>
345  bool send(const Message& message)
346  {
348  serialize, contentType, lambda::_1));
349 
350  return writer.write(encoder.encode(evolve(message)));
351  }
352 
353  bool close()
354  {
355  return writer.close();
356  }
357 
359  {
360  return writer.readerClosed();
361  }
362 
366 };
367 
368 
369 // This process periodically sends heartbeats to a given HTTP connection.
370 // The `Message` template parameter is the type of the heartbeat event passed
371 // into the heartbeater during construction, while the `Event` template
372 // parameter is the versioned event type which is sent to the client.
373 // The optional delay parameter is used to specify the delay period before it
374 // sends the first heartbeat.
375 template <typename Message, typename Event>
376 class Heartbeater : public process::Process<Heartbeater<Message, Event>>
377 {
378 public:
379  Heartbeater(const std::string& _logMessage,
380  const Message& _heartbeatMessage,
381  const HttpConnection& _http,
382  const Duration& _interval,
383  const Option<Duration>& _delay = None())
384  : process::ProcessBase(process::ID::generate("heartbeater")),
385  logMessage(_logMessage),
386  heartbeatMessage(_heartbeatMessage),
387  http(_http),
388  interval(_interval),
389  delay(_delay) {}
390 
391 protected:
392  virtual void initialize() override
393  {
394  if (delay.isSome()) {
396  delay.get(),
397  this,
399  } else {
400  heartbeat();
401  }
402  }
403 
404 private:
405  void heartbeat()
406  {
407  // Only send a heartbeat if the connection is not closed.
408  if (http.closed().isPending()) {
409  VLOG(2) << "Sending heartbeat to " << logMessage;
410 
411  Message message(heartbeatMessage);
412  http.send<Message, Event>(message);
413  }
414 
416  }
417 
418  const std::string logMessage;
419  const Message heartbeatMessage;
421  const Duration interval;
422  const Option<Duration> delay;
423 };
424 
425 
426 class Master : public ProtobufProcess<Master>
427 {
428 public:
430  Registrar* registrar,
431  Files* files,
434  const Option<Authorizer*>& authorizer,
435  const Option<std::shared_ptr<process::RateLimiter>>&
436  slaveRemovalLimiter,
437  const Flags& flags = Flags());
438 
439  virtual ~Master();
440 
441  // Message handlers.
442  void submitScheduler(
443  const std::string& name);
444 
445  void registerFramework(
446  const process::UPID& from,
447  const FrameworkInfo& frameworkInfo);
448 
449  void reregisterFramework(
450  const process::UPID& from,
451  const FrameworkInfo& frameworkInfo,
452  bool failover);
453 
454  void unregisterFramework(
455  const process::UPID& from,
456  const FrameworkID& frameworkId);
457 
458  void deactivateFramework(
459  const process::UPID& from,
460  const FrameworkID& frameworkId);
461 
462  // TODO(vinod): Remove this once the old driver is removed.
463  void resourceRequest(
464  const process::UPID& from,
465  const FrameworkID& frameworkId,
466  const std::vector<Request>& requests);
467 
468  void launchTasks(
469  const process::UPID& from,
470  LaunchTasksMessage&& launchTasksMessage);
471 
472  void reviveOffers(
473  const process::UPID& from,
474  const FrameworkID& frameworkId,
475  const std::vector<std::string>& role);
476 
477  void killTask(
478  const process::UPID& from,
479  const FrameworkID& frameworkId,
480  const TaskID& taskId);
481 
482  void statusUpdateAcknowledgement(
483  const process::UPID& from,
484  StatusUpdateAcknowledgementMessage&& statusUpdateAcknowledgementMessage);
485 
486  void schedulerMessage(
487  const process::UPID& from,
488  FrameworkToExecutorMessage&& frameworkToExecutorMessage);
489 
490  void executorMessage(
491  const process::UPID& from,
492  ExecutorToFrameworkMessage&& executorToFrameworkMessage);
493 
494  void registerSlave(
495  const process::UPID& from,
496  RegisterSlaveMessage&& registerSlaveMessage);
497 
498  void reregisterSlave(
499  const process::UPID& from,
500  ReregisterSlaveMessage&& incomingMessage);
501 
502  void unregisterSlave(
503  const process::UPID& from,
504  const SlaveID& slaveId);
505 
506  void statusUpdate(
507  StatusUpdateMessage&& statusUpdateMessage);
508 
509  void reconcileTasks(
510  const process::UPID& from,
511  ReconcileTasksMessage&& reconcileTasksMessage);
512 
513  void updateOperationStatus(
514  const UpdateOperationStatusMessage& update);
515 
516  void exitedExecutor(
517  const process::UPID& from,
518  const SlaveID& slaveId,
519  const FrameworkID& frameworkId,
520  const ExecutorID& executorId,
521  int32_t status);
522 
523  void updateSlave(UpdateSlaveMessage&& message);
524 
525  void updateUnavailability(
526  const MachineID& machineId,
528 
529  // Marks the agent unreachable and returns whether the agent was
530  // marked unreachable. Returns false if the agent is already
531  // in a transitioning state or has transitioned into another
532  // state (this includes already being marked unreachable).
533  // The `duringMasterFailover` parameter specifies whether this
534  // agent is transitioning from a recovered state (true) or a
535  // registered state (false).
536  //
537  // Discarding currently not supported.
538  //
539  // Will not return a failure (this will crash the master
540  // internally in the case of a registry failure).
541  process::Future<bool> markUnreachable(
542  const SlaveInfo& slave,
543  bool duringMasterFailover,
544  const std::string& message);
545 
546  void markGone(Slave* slave, const TimeInfo& goneTime);
547 
548  void authenticate(
549  const process::UPID& from,
550  const process::UPID& pid);
551 
552  // TODO(bmahler): It would be preferred to use a unique libprocess
553  // Process identifier (PID is not sufficient) for identifying the
554  // framework instance, rather than relying on re-registration time.
555  void frameworkFailoverTimeout(
556  const FrameworkID& frameworkId,
557  const process::Time& reregisteredTime);
558 
559  void offer(
560  const FrameworkID& frameworkId,
561  const hashmap<std::string, hashmap<SlaveID, Resources>>& resources);
562 
563  void inverseOffer(
564  const FrameworkID& frameworkId,
565  const hashmap<SlaveID, UnavailableResources>& resources);
566 
567  // Invoked when there is a newly elected leading master.
568  // Made public for testing purposes.
569  void detected(const process::Future<Option<MasterInfo>>& _leader);
570 
571  // Invoked when the contender has lost the candidacy.
572  // Made public for testing purposes.
573  void lostCandidacy(const process::Future<Nothing>& lost);
574 
575  // Continuation of recover().
576  // Made public for testing purposes.
577  process::Future<Nothing> _recover(const Registry& registry);
578 
579  MasterInfo info() const
580  {
581  return info_;
582  }
583 
584 protected:
585  void initialize() override;
586  void finalize() override;
587 
588  void consume(process::MessageEvent&& event) override;
589  void consume(process::ExitedEvent&& event) override;
590 
591  void exited(const process::UPID& pid) override;
592  void exited(const FrameworkID& frameworkId, const HttpConnection& http);
593  void _exited(Framework* framework);
594 
595  // Invoked upon noticing a subscriber disconnection.
596  void exited(const id::UUID& id);
597 
598  void agentReregisterTimeout(const SlaveID& slaveId);
599  Nothing _agentReregisterTimeout(const SlaveID& slaveId);
600 
601  // Invoked when the message is ready to be executed after
602  // being throttled.
603  // 'principal' being None indicates it is throttled by
604  // 'defaultLimiter'.
605  void throttled(
606  process::MessageEvent&& event,
607  const Option<std::string>& principal);
608 
609  // Continuations of consume().
610  void _consume(process::MessageEvent&& event);
611  void _consume(process::ExitedEvent&& event);
612 
613  // Helper method invoked when the capacity for a framework
614  // principal is exceeded.
615  void exceededCapacity(
616  const process::MessageEvent& event,
617  const Option<std::string>& principal,
618  uint64_t capacity);
619 
620  // Recovers state from the registrar.
622  void recoveredSlavesTimeout(const Registry& registry);
623 
624  void _registerSlave(
625  const process::UPID& pid,
626  RegisterSlaveMessage&& registerSlaveMessage,
628  const process::Future<bool>& authorized);
629 
630  void __registerSlave(
631  const process::UPID& pid,
632  RegisterSlaveMessage&& registerSlaveMessage,
633  const process::Future<bool>& admit);
634 
635  void _reregisterSlave(
636  const process::UPID& pid,
637  ReregisterSlaveMessage&& incomingMessage,
639  const process::Future<bool>& authorized);
640 
641  void __reregisterSlave(
642  const process::UPID& pid,
643  ReregisterSlaveMessage&& incomingMessage,
644  const process::Future<bool>& readmit);
645 
646  void ___reregisterSlave(
647  const process::UPID& pid,
648  ReregisterSlaveMessage&& incomingMessage,
649  const process::Future<bool>& updated);
650 
651  void updateSlaveFrameworks(
652  Slave* slave,
653  const std::vector<FrameworkInfo>& frameworks);
654 
655  // 'future' is the future returned by the authenticator.
656  void _authenticate(
657  const process::UPID& pid,
658  const process::Future<Option<std::string>>& future);
659 
660  void authenticationTimeout(process::Future<Option<std::string>> future);
661 
662  void fileAttached(const process::Future<Nothing>& result,
663  const std::string& path);
664 
665  // Invoked when the contender has entered the contest.
666  void contended(const process::Future<process::Future<Nothing>>& candidacy);
667 
668  // When a slave that was previously registered with this master
669  // reregisters, we need to reconcile the master's view of the
670  // slave's tasks and executors. This function also sends the
671  // `SlaveReregisteredMessage`.
672  void reconcileKnownSlave(
673  Slave* slave,
674  const std::vector<ExecutorInfo>& executors,
675  const std::vector<Task>& tasks);
676 
677  // Add a framework.
678  void addFramework(
679  Framework* framework,
680  const std::set<std::string>& suppressedRoles);
681 
682  // Recover a framework from its `FrameworkInfo`. This happens after
683  // master failover, when an agent running one of the framework's
684  // tasks reregisters or when the framework itself reregisters,
685  // whichever happens first. The result of this function is a
686  // registered, inactive framework with state `RECOVERED`.
687  void recoverFramework(
688  const FrameworkInfo& info,
689  const std::set<std::string>& suppressedRoles);
690 
691  // Transition a framework from `RECOVERED` to `CONNECTED` state and
692  // activate it. This happens at most once after master failover, the
693  // first time that the framework reregisters with the new master.
694  // Exactly one of `newPid` or `http` must be provided.
695  Try<Nothing> activateRecoveredFramework(
696  Framework* framework,
697  const FrameworkInfo& frameworkInfo,
698  const Option<process::UPID>& pid,
699  const Option<HttpConnection>& http,
700  const std::set<std::string>& suppressedRoles);
701 
702  // Replace the scheduler for a framework with a new process ID, in
703  // the event of a scheduler failover.
704  void failoverFramework(Framework* framework, const process::UPID& newPid);
705 
706  // Replace the scheduler for a framework with a new HTTP connection,
707  // in the event of a scheduler failover.
708  void failoverFramework(Framework* framework, const HttpConnection& http);
709 
710  void _failoverFramework(Framework* framework);
711 
712  // Kill all of a framework's tasks, delete the framework object, and
713  // reschedule offers that were assigned to this framework.
714  void removeFramework(Framework* framework);
715 
716  // Remove a framework from the slave, i.e., remove its tasks and
717  // executors and recover the resources.
718  void removeFramework(Slave* slave, Framework* framework);
719 
720  void updateFramework(
721  Framework* framework,
722  const FrameworkInfo& frameworkInfo,
723  const std::set<std::string>& suppressedRoles);
724 
725  void disconnect(Framework* framework);
726  void deactivate(Framework* framework, bool rescind);
727 
728  void disconnect(Slave* slave);
729  void deactivate(Slave* slave);
730 
731  // Add a slave.
732  void addSlave(
733  Slave* slave,
734  std::vector<Archive::Framework>&& completedFrameworks);
735 
736  void _markUnreachable(
737  const SlaveInfo& slave,
738  const TimeInfo& unreachableTime,
739  bool duringMasterFailover,
740  const std::string& message,
741  bool registrarResult);
742 
743  void sendSlaveLost(const SlaveInfo& slaveInfo);
744 
745  // Remove the slave from the registrar and from the master's state.
746  //
747  // TODO(bmahler): 'reason' is optional until MESOS-2317 is resolved.
748  void removeSlave(
749  Slave* slave,
750  const std::string& message,
752 
753  void _removeSlave(
754  Slave* slave,
755  const process::Future<bool>& registrarResult,
756  const std::string& removalCause,
758 
759  void __removeSlave(
760  Slave* slave,
761  const std::string& message,
762  const Option<TimeInfo>& unreachableTime);
763 
764  // Validates that the framework is authenticated, if required.
765  Option<Error> validateFrameworkAuthentication(
766  const FrameworkInfo& frameworkInfo,
767  const process::UPID& from);
768 
769  // Returns whether the framework is authorized.
770  // Returns failure for transient authorization failures.
771  process::Future<bool> authorizeFramework(
772  const FrameworkInfo& frameworkInfo);
773 
774  // Returns whether the principal is authorized to (re-)register an agent
775  // and whether the `SlaveInfo` is authorized.
776  process::Future<bool> authorizeSlave(
777  const SlaveInfo& slaveInfo,
779 
780  // Returns whether the task is authorized.
781  // Returns failure for transient authorization failures.
782  process::Future<bool> authorizeTask(
783  const TaskInfo& task,
784  Framework* framework);
785 
803  process::Future<bool> authorizeReserveResources(
804  const Offer::Operation::Reserve& reserve,
806 
807  // Authorizes whether the provided `principal` is allowed to reserve
808  // the specified `resources`.
809  process::Future<bool> authorizeReserveResources(
810  const Resources& resources,
812 
830  process::Future<bool> authorizeUnreserveResources(
831  const Offer::Operation::Unreserve& unreserve,
833 
851  process::Future<bool> authorizeCreateVolume(
852  const Offer::Operation::Create& create,
854 
872  process::Future<bool> authorizeDestroyVolume(
873  const Offer::Operation::Destroy& destroy,
875 
876  // Determine if a new executor needs to be launched.
877  bool isLaunchExecutor (
878  const ExecutorID& executorId,
879  Framework* framework,
880  Slave* slave) const;
881 
882  // Add executor to the framework and slave.
883  void addExecutor(
884  const ExecutorInfo& executorInfo,
885  Framework* framework,
886  Slave* slave);
887 
888  // Add task to the framework and slave.
889  void addTask(const TaskInfo& task, Framework* framework, Slave* slave);
890 
891  // Transitions the task, and recovers resources if the task becomes
892  // terminal.
893  void updateTask(Task* task, const StatusUpdate& update);
894 
895  // Removes the task. `unreachable` indicates whether the task is removed due
896  // to being unreachable. Note that we cannot rely on the task state because
897  // it may not reflect unreachability due to being set to TASK_LOST for
898  // backwards compatibility.
899  void removeTask(Task* task, bool unreachable = false);
900 
901  // Remove an executor and recover its resources.
902  void removeExecutor(
903  Slave* slave,
904  const FrameworkID& frameworkId,
905  const ExecutorID& executorId);
906 
907  // Adds the given operation to the framework and the agent.
908  void addOperation(
909  Framework* framework,
910  Slave* slave,
911  Operation* operation);
912 
913  // Transitions the operation, and updates and recovers resources if
914  // the operation becomes terminal. If `convertResources` is `false`
915  // only the consumed resources of terminal operations are recovered,
916  // but no resources are converted.
917  void updateOperation(
918  Operation* operation,
919  const UpdateOperationStatusMessage& update,
920  bool convertResources = true);
921 
922  // Remove the operation.
923  void removeOperation(Operation* operation);
924 
925  // Attempts to update the allocator by applying the given operation.
926  // If successful, updates the slave's resources, sends a
927  // 'CheckpointResourcesMessage' to the slave with the updated
928  // checkpointed resources, and returns a 'Future' with 'Nothing'.
929  // Otherwise, takes no action and returns a failed 'Future'.
931  Slave* slave,
932  const Offer::Operation& operation);
933 
934  // Forwards the update to the framework.
935  void forward(
936  const StatusUpdate& update,
937  const process::UPID& acknowledgee,
938  Framework* framework);
939 
940  // Remove an offer after the specified timeout.
941  void offerTimeout(const OfferID& offerId);
942 
943  // Remove an offer and optionally rescind the offer as well.
944  void removeOffer(Offer* offer, bool rescind = false);
945 
946  // Remove an inverse offer after the specified timeout.
947  void inverseOfferTimeout(const OfferID& inverseOfferId);
948 
949  // Remove an inverse offer and optionally rescind it as well.
950  void removeInverseOffer(InverseOffer* inverseOffer, bool rescind = false);
951 
952  bool isCompletedFramework(const FrameworkID& frameworkId);
953 
954  Framework* getFramework(const FrameworkID& frameworkId) const;
955  Offer* getOffer(const OfferID& offerId) const;
956  InverseOffer* getInverseOffer(const OfferID& inverseOfferId) const;
957 
958  FrameworkID newFrameworkId();
959  OfferID newOfferId();
960  SlaveID newSlaveId();
961 
962 private:
963  // Updates the agent's resources by applying the given operation.
964  // Sends either `ApplyOperationMessage` or
965  // `CheckpointResourcesMessage` (with updated checkpointed
966  // resources) to the agent depending on if the agent has
967  // `RESOURCE_PROVIDER` capability.
968  void _apply(
969  Slave* slave,
970  Framework* framework,
971  const Offer::Operation& operationInfo);
972 
973  void drop(
974  const process::UPID& from,
975  const scheduler::Call& call,
976  const std::string& message);
977 
978  void drop(
979  Framework* framework,
980  const Offer::Operation& operation,
981  const std::string& message);
982 
983  void drop(
984  Framework* framework,
985  const scheduler::Call& call,
986  const std::string& message);
987 
988  void drop(
989  Framework* framework,
990  const scheduler::Call::Suppress& suppress,
991  const std::string& message);
992 
993  void drop(
994  Framework* framework,
995  const scheduler::Call::Revive& revive,
996  const std::string& message);
997 
998  // Call handlers.
999  void receive(
1000  const process::UPID& from,
1001  scheduler::Call&& call);
1002 
1003  void subscribe(
1004  HttpConnection http,
1005  const scheduler::Call::Subscribe& subscribe);
1006 
1007  void _subscribe(
1008  HttpConnection http,
1009  const FrameworkInfo& frameworkInfo,
1010  bool force,
1011  const std::set<std::string>& suppressedRoles,
1012  const process::Future<bool>& authorized);
1013 
1014  void subscribe(
1015  const process::UPID& from,
1016  const scheduler::Call::Subscribe& subscribe);
1017 
1018  void _subscribe(
1019  const process::UPID& from,
1020  const FrameworkInfo& frameworkInfo,
1021  bool force,
1022  const std::set<std::string>& suppressedRoles,
1023  const process::Future<bool>& authorized);
1024 
1025  // Subscribes a client to the 'api/vX' endpoint.
1026  void subscribe(
1027  const HttpConnection& http,
1029 
1030  void teardown(Framework* framework);
1031 
1032  void accept(
1033  Framework* framework,
1034  scheduler::Call::Accept&& accept);
1035 
1036  void _accept(
1037  const FrameworkID& frameworkId,
1038  const SlaveID& slaveId,
1039  const Resources& offeredResources,
1040  scheduler::Call::Accept&& accept,
1041  const process::Future<std::list<process::Future<bool>>>& authorizations);
1042 
1043  void acceptInverseOffers(
1044  Framework* framework,
1045  const scheduler::Call::AcceptInverseOffers& accept);
1046 
1047  void decline(
1048  Framework* framework,
1049  scheduler::Call::Decline&& decline);
1050 
1051  void declineInverseOffers(
1052  Framework* framework,
1053  const scheduler::Call::DeclineInverseOffers& decline);
1054 
1055  void revive(
1056  Framework* framework,
1057  const scheduler::Call::Revive& revive);
1058 
1059  void kill(
1060  Framework* framework,
1061  const scheduler::Call::Kill& kill);
1062 
1063  void shutdown(
1064  Framework* framework,
1065  const scheduler::Call::Shutdown& shutdown);
1066 
1067  void acknowledge(
1068  Framework* framework,
1069  scheduler::Call::Acknowledge&& acknowledge);
1070 
1071  void acknowledgeOperationStatus(
1072  Framework* framework,
1073  scheduler::Call::AcknowledgeOperationStatus&& acknowledge);
1074 
1075  void reconcile(
1076  Framework* framework,
1077  scheduler::Call::Reconcile&& reconcile);
1078 
1079  void reconcileOperations(
1080  Framework* framework,
1081  const scheduler::Call::ReconcileOperations& reconcile);
1082 
1083  void message(
1084  Framework* framework,
1085  scheduler::Call::Message&& message);
1086 
1087  void request(
1088  Framework* framework,
1089  const scheduler::Call::Request& request);
1090 
1091  void suppress(
1092  Framework* framework,
1093  const scheduler::Call::Suppress& suppress);
1094 
1095  bool elected() const
1096  {
1097  return leader.isSome() && leader.get() == info_;
1098  }
1099 
1100  void scheduleRegistryGc();
1101 
1102  void doRegistryGc();
1103 
1104  void _doRegistryGc(
1105  const hashset<SlaveID>& toRemoveUnreachable,
1106  const hashset<SlaveID>& toRemoveGone,
1107  const process::Future<bool>& registrarResult);
1108 
1109  process::Future<bool> authorizeLogAccess(
1111 
1119  bool isWhitelistedRole(const std::string& name) const;
1120 
1128  class QuotaHandler
1129  {
1130  public:
1131  explicit QuotaHandler(Master* _master) : master(_master)
1132  {
1133  CHECK_NOTNULL(master);
1134  }
1135 
1136  // Returns a list of set quotas.
1138  const mesos::master::Call& call,
1140  ContentType contentType) const;
1141 
1143  const process::http::Request& request,
1145  principal) const;
1146 
1148  const mesos::master::Call& call,
1150  principal) const;
1151 
1155  principal) const;
1156 
1158  const mesos::master::Call& call,
1160  principal) const;
1161 
1165  principal) const;
1166 
1167  private:
1168  // Heuristically tries to determine whether a quota request could
1169  // reasonably be satisfied given the current cluster capacity. The
1170  // goal is to determine whether a user may accidentally request an
1171  // amount of resources that would prevent frameworks without quota
1172  // from getting any offers. A force flag will allow users to bypass
1173  // this check.
1174  //
1175  // The heuristic tests whether the total quota, including the new
1176  // request, does not exceed the sum of non-static cluster resources,
1177  // i.e. the following inequality holds:
1178  // total - statically reserved >= total quota + quota request
1179  //
1180  // Please be advised that:
1181  // * It is up to an allocator how to satisfy quota (for example,
1182  // what resources to account towards quota, as well as which
1183  // resources to consider allocatable for quota).
1184  // * Even if there are enough resources at the moment of this check,
1185  // agents may terminate at any time, rendering the cluster under
1186  // quota.
1187  Option<Error> capacityHeuristic(
1188  const mesos::quota::QuotaInfo& request) const;
1189 
1190  // We always want to rescind offers after the capacity heuristic. The
1191  // reason for this is the race between the allocator and the master:
1192  // it can happen that there are not enough free resources at the
1193  // allocator's disposal when it is notified about the quota request,
1194  // but at this point it's too late to rescind.
1195  //
1196  // While rescinding, we adhere to the following rules:
1197  // * Rescind at least as many resources as there are in the quota request.
1198  // * Rescind all offers from an agent in order to make the potential
1199  // offer bigger, which increases the chances that a quota'ed framework
1200  // will be able to use the offer.
1201  // * Rescind offers from at least `numF` agents to make it possible
1202  // (but not guaranteed, due to fair sharing) that each framework in
1203  // the role for which quota is set gets an offer (`numF` is the
1204  // number of frameworks in the quota'ed role). Though this is not
1205  // strictly necessary, we think this will increase the debugability
1206  // and will improve user experience.
1207  //
1208  // TODO(alexr): Consider removing this function once offer management
1209  // (including rescinding) is moved to allocator.
1210  void rescindOffers(const mesos::quota::QuotaInfo& request) const;
1211 
1212  process::Future<bool> authorizeGetQuota(
1214  const mesos::quota::QuotaInfo& quotaInfo) const;
1215 
1216  process::Future<bool> authorizeUpdateQuota(
1218  const mesos::quota::QuotaInfo& quotaInfo) const;
1219 
1222  principal) const;
1223 
1225  const mesos::quota::QuotaRequest& quotaRequest,
1227  principal) const;
1228 
1230  const mesos::quota::QuotaInfo& quotaInfo,
1231  bool forced) const;
1232 
1234  const std::string& role,
1236  principal) const;
1237 
1239  const std::string& role) const;
1240 
1241  // To perform actions related to quota management, we require access to the
1242  // master data structures. No synchronization primitives are needed here
1243  // since `QuotaHandler`'s functions are invoked in the Master's actor.
1244  Master* master;
1245  };
1246 
1254  class WeightsHandler
1255  {
1256  public:
1257  explicit WeightsHandler(Master* _master) : master(_master)
1258  {
1259  CHECK_NOTNULL(master);
1260  }
1261 
1265  principal) const;
1266 
1268  const mesos::master::Call& call,
1270  ContentType contentType) const;
1271 
1273  const process::http::Request& request,
1275  principal) const;
1276 
1278  const mesos::master::Call& call,
1280  ContentType contentType) const;
1281 
1282  private:
1283  process::Future<bool> authorizeGetWeight(
1285  const WeightInfo& weight) const;
1286 
1287  process::Future<bool> authorizeUpdateWeights(
1289  const std::vector<std::string>& roles) const;
1290 
1292  const std::vector<WeightInfo>& weightInfos,
1293  const std::list<bool>& roleAuthorizations) const;
1294 
1297  principal) const;
1298 
1301  const google::protobuf::RepeatedPtrField<WeightInfo>& weightInfos)
1302  const;
1303 
1305  const std::vector<WeightInfo>& weightInfos) const;
1306 
1307  // Rescind all outstanding offers if any of the 'weightInfos' roles has
1308  // an active framework.
1309  void rescindOffers(const std::vector<WeightInfo>& weightInfos) const;
1310 
1311  Master* master;
1312  };
1313 
1314  // Inner class used to namespace HTTP route handlers (see
1315  // master/http.cpp for implementations).
1316  class Http
1317  {
1318  public:
1319  explicit Http(Master* _master) : master(_master),
1320  quotaHandler(_master),
1321  weightsHandler(_master) {}
1322 
1323  // /api/v1
1325  const process::http::Request& request,
1327  principal) const;
1328 
1329  // /api/v1/scheduler
1331  const process::http::Request& request,
1333  principal) const;
1334 
1335  // /master/create-volumes
1337  const process::http::Request& request,
1339  principal) const;
1340 
1341  // /master/destroy-volumes
1343  const process::http::Request& request,
1345  principal) const;
1346 
1347  // /master/flags
1349  const process::http::Request& request,
1351  principal) const;
1352 
1353  // /master/frameworks
1355  const process::http::Request& request,
1357  principal) const;
1358 
1359  // /master/health
1361  const process::http::Request& request) const;
1362 
1363  // /master/redirect
1365  const process::http::Request& request) const;
1366 
1367  // /master/reserve
1369  const process::http::Request& request,
1371  principal) const;
1372 
1373  // /master/roles
1375  const process::http::Request& request,
1377  principal) const;
1378 
1379  // /master/teardown
1381  const process::http::Request& request,
1383  principal) const;
1384 
1385  // /master/slaves
1387  const process::http::Request& request,
1389  principal) const;
1390 
1391  // /master/state
1393  const process::http::Request& request,
1395  principal) const;
1396 
1397  // /master/state-summary
1399  const process::http::Request& request,
1401  principal) const;
1402 
1403  // /master/tasks
1405  const process::http::Request& request,
1407  principal) const;
1408 
1409  // /master/maintenance/schedule
1410  process::Future<process::http::Response> maintenanceSchedule(
1411  const process::http::Request& request,
1413  principal) const;
1414 
1415  // /master/maintenance/status
1416  process::Future<process::http::Response> maintenanceStatus(
1417  const process::http::Request& request,
1419  principal) const;
1420 
1421  // /master/machine/down
1423  const process::http::Request& request,
1425  principal) const;
1426 
1427  // /master/machine/up
1429  const process::http::Request& request,
1431  principal) const;
1432 
1433  // /master/unreserve
1435  const process::http::Request& request,
1437  principal) const;
1438 
1439  // /master/quota
1441  const process::http::Request& request,
1443  principal) const;
1444 
1445  // /master/weights
1447  const process::http::Request& request,
1449  principal) const;
1450 
1451  static std::string API_HELP();
1452  static std::string SCHEDULER_HELP();
1453  static std::string FLAGS_HELP();
1454  static std::string FRAMEWORKS_HELP();
1455  static std::string HEALTH_HELP();
1456  static std::string REDIRECT_HELP();
1457  static std::string ROLES_HELP();
1458  static std::string TEARDOWN_HELP();
1459  static std::string SLAVES_HELP();
1460  static std::string STATE_HELP();
1461  static std::string STATESUMMARY_HELP();
1462  static std::string TASKS_HELP();
1463  static std::string MAINTENANCE_SCHEDULE_HELP();
1464  static std::string MAINTENANCE_STATUS_HELP();
1465  static std::string MACHINE_DOWN_HELP();
1466  static std::string MACHINE_UP_HELP();
1467  static std::string CREATE_VOLUMES_HELP();
1468  static std::string DESTROY_VOLUMES_HELP();
1469  static std::string RESERVE_HELP();
1470  static std::string UNRESERVE_HELP();
1471  static std::string QUOTA_HELP();
1472  static std::string WEIGHTS_HELP();
1473 
1474  private:
1475  JSON::Object __flags() const;
1476 
1477  class FlagsError; // Forward declaration.
1478 
1481  principal) const;
1482 
1484  const size_t limit,
1485  const size_t offset,
1486  const std::string& order,
1488  principal) const;
1489 
1491  const FrameworkID& id,
1493  principal) const;
1494 
1496  const FrameworkID& id) const;
1497 
1498  process::Future<process::http::Response> _updateMaintenanceSchedule(
1499  const mesos::maintenance::Schedule& schedule,
1501  principal) const;
1502 
1503  process::Future<process::http::Response> __updateMaintenanceSchedule(
1504  const mesos::maintenance::Schedule& schedule,
1505  const process::Owned<ObjectApprovers>& approvers) const;
1506 
1507  process::Future<process::http::Response> ___updateMaintenanceSchedule(
1508  const mesos::maintenance::Schedule& schedule,
1509  bool applied) const;
1510 
1511  mesos::maintenance::Schedule _getMaintenanceSchedule(
1512  const process::Owned<ObjectApprovers>& approvers) const;
1513 
1515  const process::Owned<ObjectApprovers>& approvers) const;
1516 
1517  process::Future<process::http::Response> _startMaintenance(
1518  const google::protobuf::RepeatedPtrField<MachineID>& machineIds,
1519  const process::Owned<ObjectApprovers>& approvers) const;
1520 
1522  const google::protobuf::RepeatedPtrField<MachineID>& machineIds,
1523  const process::Owned<ObjectApprovers>& approvers) const;
1524 
1526  const SlaveID& slaveId,
1527  const google::protobuf::RepeatedPtrField<Resource>& resources,
1529  principal) const;
1530 
1532  const SlaveID& slaveId,
1533  const google::protobuf::RepeatedPtrField<Resource>& resources,
1535  principal) const;
1536 
1538  const SlaveID& slaveId,
1539  const google::protobuf::RepeatedPtrField<Resource>& volumes,
1541  principal) const;
1542 
1544  const SlaveID& slaveId,
1545  const google::protobuf::RepeatedPtrField<Resource>& volumes,
1547  principal) const;
1548 
1569  const SlaveID& slaveId,
1570  Resources required,
1571  const Offer::Operation& operation) const;
1572 
1575  principal) const;
1576 
1577  // Master API handlers.
1578 
1580  const mesos::master::Call& call,
1582  ContentType contentType) const;
1583 
1584  mesos::master::Response::GetAgents _getAgents(
1585  const process::Owned<ObjectApprovers>& approvers) const;
1586 
1588  const mesos::master::Call& call,
1590  ContentType contentType) const;
1591 
1593  const mesos::master::Call& call,
1595  ContentType contentType) const;
1596 
1598  const mesos::master::Call& call,
1600  ContentType contentType) const;
1601 
1603  const mesos::master::Call& call,
1605  ContentType contentType) const;
1606 
1608  const mesos::master::Call& call,
1610  ContentType contentType) const;
1611 
1613  const mesos::master::Call& call,
1615  ContentType contentType) const;
1616 
1618  const mesos::master::Call& call,
1620  ContentType contentType) const;
1621 
1623  const mesos::master::Call& call,
1625  ContentType contentType) const;
1626 
1628  const mesos::master::Call& call,
1630  ContentType contentType) const;
1631 
1632  process::Future<process::http::Response> updateMaintenanceSchedule(
1633  const mesos::master::Call& call,
1635  ContentType contentType) const;
1636 
1637  process::Future<process::http::Response> getMaintenanceSchedule(
1638  const mesos::master::Call& call,
1640  ContentType contentType) const;
1641 
1642  process::Future<process::http::Response> getMaintenanceStatus(
1643  const mesos::master::Call& call,
1645  ContentType contentType) const;
1646 
1648  const mesos::master::Call& call,
1650  ContentType contentType) const;
1651 
1653  const mesos::master::Call& call,
1655  ContentType contentType) const;
1656 
1658  const mesos::master::Call& call,
1660  ContentType contentType) const;
1661 
1663  const mesos::master::Call& call,
1665  ContentType contentType) const;
1666 
1667  mesos::master::Response::GetTasks _getTasks(
1668  const process::Owned<ObjectApprovers>& approvers) const;
1669 
1671  const mesos::master::Call& call,
1673  ContentType contentType) const;
1674 
1676  const mesos::master::Call& call,
1678  ContentType contentType) const;
1679 
1681  const mesos::master::Call& call,
1683  ContentType contentType) const;
1684 
1685  process::Future<process::http::Response> unreserveResources(
1686  const mesos::master::Call& call,
1688  ContentType contentType) const;
1689 
1691  const mesos::master::Call& call,
1693  ContentType contentType) const;
1694 
1695  mesos::master::Response::GetFrameworks _getFrameworks(
1696  const process::Owned<ObjectApprovers>& approvers) const;
1697 
1699  const mesos::master::Call& call,
1701  ContentType contentType) const;
1702 
1703  mesos::master::Response::GetExecutors _getExecutors(
1704  const process::Owned<ObjectApprovers>& approvers) const;
1705 
1707  const mesos::master::Call& call,
1709  ContentType contentType) const;
1710 
1711  mesos::master::Response::GetState _getState(
1712  const process::Owned<ObjectApprovers>& approvers) const;
1713 
1715  const mesos::master::Call& call,
1717  ContentType contentType) const;
1718 
1720  const mesos::master::Call& call,
1722  ContentType contentType) const;
1723 
1725  const mesos::master::Call& call,
1727  ContentType contentType) const;
1728 
1730  const mesos::master::Call& call,
1732  ContentType contentType) const;
1733 
1735  const SlaveID& slaveId) const;
1736 
1737  Master* master;
1738 
1739  // NOTE: The quota specific pieces of the Operator API are factored
1740  // out into this separate class.
1741  QuotaHandler quotaHandler;
1742 
1743  // NOTE: The weights specific pieces of the Operator API are factored
1744  // out into this separate class.
1745  WeightsHandler weightsHandler;
1746  };
1747 
1748  Master(const Master&); // No copying.
1749  Master& operator=(const Master&); // No assigning.
1750 
1751  friend struct Framework;
1752  friend struct Metrics;
1753  friend struct Slave;
1754  friend struct SlavesWriter;
1755  friend struct Subscriber;
1756 
1757  // NOTE: Since 'getOffer', 'getInverseOffer' and 'slaves' are
1758  // protected, we need to make the following functions friends.
1759  friend Offer* validation::offer::getOffer(
1760  Master* master, const OfferID& offerId);
1761 
1762  friend InverseOffer* validation::offer::getInverseOffer(
1763  Master* master, const OfferID& offerId);
1764 
1766  Master* master, const SlaveID& slaveId);
1767 
1768  const Flags flags;
1769 
1770  Http http;
1771 
1772  Option<MasterInfo> leader; // Current leading master.
1773 
1774  mesos::allocator::Allocator* allocator;
1775  WhitelistWatcher* whitelistWatcher;
1776  Registrar* registrar;
1777  Files* files;
1778 
1781 
1782  const Option<Authorizer*> authorizer;
1783 
1784  MasterInfo info_;
1785 
1786  // Holds some info which affects how a machine behaves, as well as state that
1787  // represents the master's view of this machine. See the `MachineInfo` protobuf
1788  // and `Machine` struct for more information.
1790 
1791  struct Maintenance
1792  {
1793  // Holds the maintenance schedule, as given by the operator.
1794  std::list<mesos::maintenance::Schedule> schedules;
1795  } maintenance;
1796 
1797  // Indicates when recovery is complete. Recovery begins once the
1798  // master is elected as a leader.
1800 
1801  // If this is the leading master, we periodically check whether we
1802  // should GC some information from the registry.
1803  Option<process::Timer> registryGcTimer;
1804 
1805  struct Slaves
1806  {
1807  Slaves() : removed(MAX_REMOVED_SLAVES) {}
1808 
1809  // Imposes a time limit for slaves that we recover from the
1810  // registry to reregister with the master.
1811  Option<process::Timer> recoveredTimer;
1812 
1813  // Slaves that have been recovered from the registrar after master
1814  // failover. Slaves are removed from this collection when they
1815  // either reregister with the master or are marked unreachable
1816  // because they do not reregister before `recoveredTimer` fires.
1817  // We must not answer questions related to these slaves (e.g.,
1818  // during task reconciliation) until we determine their fate
1819  // because they are in this transitioning state.
1820  hashmap<SlaveID, SlaveInfo> recovered;
1821 
1822  // Agents that are in the process of (re-)registering. They are
1823  // maintained here while the (re-)registration is in progress and
1824  // possibly pending in the authorizer or the registrar in order
1825  // to help deduplicate (re-)registration requests.
1826  hashset<process::UPID> registering;
1827  hashset<SlaveID> reregistering;
1828 
1829  // Registered slaves are indexed by SlaveID and UPID. Note that
1830  // iteration is supported but is exposed as iteration over a
1831  // hashmap<SlaveID, Slave*> since it is tedious to convert
1832  // the map's key/value iterator into a value iterator.
1833  //
1834  // TODO(bmahler): Consider pulling in boost's multi_index,
1835  // or creating a simpler indexing abstraction in stout.
1836  struct
1837  {
1838  bool contains(const SlaveID& slaveId) const
1839  {
1840  return ids.contains(slaveId);
1841  }
1842 
1843  bool contains(const process::UPID& pid) const
1844  {
1845  return pids.contains(pid);
1846  }
1847 
1848  Slave* get(const SlaveID& slaveId) const
1849  {
1850  return ids.get(slaveId).getOrElse(nullptr);
1851  }
1852 
1853  Slave* get(const process::UPID& pid) const
1854  {
1855  return pids.get(pid).getOrElse(nullptr);
1856  }
1857 
1858  void put(Slave* slave)
1859  {
1860  CHECK_NOTNULL(slave);
1861  ids[slave->id] = slave;
1862  pids[slave->pid] = slave;
1863  }
1864 
1865  void remove(Slave* slave)
1866  {
1867  CHECK_NOTNULL(slave);
1868  ids.erase(slave->id);
1869  pids.erase(slave->pid);
1870  }
1871 
1872  void clear()
1873  {
1874  ids.clear();
1875  pids.clear();
1876  }
1877 
1878  size_t size() const { return ids.size(); }
1879 
1880  typedef hashmap<SlaveID, Slave*>::iterator iterator;
1881  typedef hashmap<SlaveID, Slave*>::const_iterator const_iterator;
1882 
1883  iterator begin() { return ids.begin(); }
1884  iterator end() { return ids.end(); }
1885 
1886  const_iterator begin() const { return ids.begin(); }
1887  const_iterator end() const { return ids.end(); }
1888 
1889  private:
1892  } registered;
1893 
1894  // Slaves that are in the process of being removed from the
1895  // registrar.
1896  hashset<SlaveID> removing;
1897 
1898  // Slaves that are in the process of being marked unreachable.
1899  hashset<SlaveID> markingUnreachable;
1900 
1901  // Slaves that are in the process of being marked gone.
1902  hashset<SlaveID> markingGone;
1903 
1904  // This collection includes agents that have gracefully shutdown,
1905  // as well as those that have been marked unreachable or gone. We
1906  // keep a cache here to prevent this from growing in an unbounded
1907  // manner.
1908  //
1909  // TODO(bmahler): Ideally we could use a cache with set semantics.
1910  //
1911  // TODO(neilc): Consider storing all agent IDs that have been
1912  // marked unreachable by this master.
1914 
1915  // Slaves that have been marked unreachable. We recover this from
1916  // the registry, so it includes slaves marked as unreachable by
1917  // other instances of the master. Note that we use a LinkedHashMap
1918  // to ensure the order of elements here matches the order in the
1919  // registry's unreachable list, which matches the order in which
1920  // agents are marked unreachable. This list is garbage collected;
1921  // GC behavior is governed by the `registry_gc_interval`,
1922  // `registry_max_agent_age`, and `registry_max_agent_count` flags.
1924 
1925  // Slaves that have been marked gone. We recover this from the
1926  // registry, so it includes slaves marked as gone by other instances
1927  // of the master. Note that we use a LinkedHashMap to ensure the order
1928  // of elements here matches the order in the registry's gone list, which
1929  // matches the order in which agents are marked gone.
1931 
1932  // This rate limiter is used to limit the removal of slaves failing
1933  // health checks.
1934  // NOTE: Using a 'shared_ptr' here is OK because 'RateLimiter' is
1935  // a wrapper around libprocess process which is thread safe.
1937  } slaves;
1938 
1939  struct Frameworks
1940  {
1941  Frameworks(const Flags& masterFlags)
1942  : completed(masterFlags.max_completed_frameworks) {}
1943 
1945 
1947 
1948  // Principals of frameworks keyed by PID.
1949  // NOTE: Multiple PIDs can map to the same principal. The
1950  // principal is None when the framework doesn't specify it.
1951  // The differences between this map and 'authenticated' are:
1952  // 1) This map only includes *registered* frameworks. The mapping
1953  // is added when a framework (re-)registers.
1954  // 2) This map includes unauthenticated frameworks (when Master
1955  // allows them) if they have principals specified in
1956  // FrameworkInfo.
1958 
1959  // BoundedRateLimiters keyed by the framework principal.
1960  // Like Metrics::Frameworks, all frameworks of the same principal
1961  // are throttled together at a common rate limit.
1963 
1964  // The default limiter is for frameworks not specified in
1965  // 'flags.rate_limits'.
1967  } frameworks;
1968 
1969  struct Subscribers
1970  {
1971  Subscribers(Master* _master) : master(_master) {};
1972 
1973  // Represents a client subscribed to the 'api/vX' endpoint.
1974  //
1975  // TODO(anand): Add support for filtering. Some subscribers
1976  // might only be interested in a subset of events.
1977  struct Subscriber
1978  {
1980  const HttpConnection& _http,
1982  : http(_http),
1983  principal(_principal)
1984  {
1985  mesos::master::Event event;
1986  event.set_type(mesos::master::Event::HEARTBEAT);
1987 
1988  heartbeater =
1991  "subscriber " + stringify(http.streamId),
1992  event,
1993  http,
1996 
1997  process::spawn(heartbeater.get());
1998  }
1999 
2000  // Not copyable, not assignable.
2001  Subscriber(const Subscriber&) = delete;
2002  Subscriber& operator=(const Subscriber&) = delete;
2003 
2004  // TODO(greggomann): Refactor this function into multiple event-specific
2005  // overloads. See MESOS-8475.
2006  void send(
2008  const process::Owned<ObjectApprovers>& approvers,
2009  const process::Shared<FrameworkInfo>& frameworkInfo,
2010  const process::Shared<Task>& task);
2011 
2013  {
2014  // TODO(anand): Refactor `HttpConnection` to being a RAII class instead.
2015  // It is possible that a caller might accidentally invoke `close()`
2016  // after passing ownership to the `Subscriber` object. See MESOS-5843
2017  // for more details.
2018  http.close();
2019 
2020  terminate(heartbeater.get());
2021  wait(heartbeater.get());
2022  }
2023 
2028  };
2029 
2030  // Sends the event to all subscribers connected to the 'api/vX' endpoint.
2031  void send(
2032  mesos::master::Event&& event,
2033  const Option<FrameworkInfo>& frameworkInfo = None(),
2034  const Option<Task>& task = None());
2035 
2036  Master* master;
2037 
2038  // Active subscribers to the 'api/vX' endpoint keyed by the stream
2039  // identifier.
2041  };
2042 
2043  Subscribers subscribers;
2044 
2045  hashmap<OfferID, Offer*> offers;
2047 
2048  hashmap<OfferID, InverseOffer*> inverseOffers;
2049  hashmap<OfferID, process::Timer> inverseOfferTimers;
2050 
2051  // We track information about roles that we're aware of in the system.
2052  // Specifically, we keep track of the roles when a framework subscribes to
2053  // the role, and/or when there are resources allocated to the role
2054  // (e.g. some tasks and/or executors are consuming resources under the role).
2056 
2057  // Configured role whitelist if using the (deprecated) "explicit
2058  // roles" feature. If this is `None`, any role is allowed.
2059  Option<hashset<std::string>> roleWhitelist;
2060 
2061  // Configured weight for each role, if any. If a role does not
2062  // appear here, it has the default weight of 1.
2064 
2065  // Configured quota for each role, if any. We store quotas by role
2066  // because we set them at the role level.
2068 
2069  // Authenticator names as supplied via flags.
2070  std::vector<std::string> authenticatorNames;
2071 
2072  Option<Authenticator*> authenticator;
2073 
2074  // Frameworks/slaves that are currently in the process of authentication.
2075  // 'authenticating' future is completed when authenticator
2076  // completes authentication.
2077  // The future is removed from the map when master completes authentication.
2079 
2080  // Principals of authenticated frameworks/slaves keyed by PID.
2082 
2083  int64_t nextFrameworkId; // Used to give each framework a unique ID.
2084  int64_t nextOfferId; // Used to give each slot offer a unique ID.
2085  int64_t nextSlaveId; // Used to give each slave a unique ID.
2086 
2087  // NOTE: It is safe to use a 'shared_ptr' because 'Metrics' is
2088  // thread safe.
2089  // TODO(dhamon): This does not need to be a shared_ptr. Metrics contains
2090  // copyable metric types only.
2091  std::shared_ptr<Metrics> metrics;
2092 
2093  // Gauge handlers.
2094  double _uptime_secs()
2095  {
2096  return (process::Clock::now() - startTime).secs();
2097  }
2098 
2099  double _elected()
2100  {
2101  return elected() ? 1 : 0;
2102  }
2103 
2104  double _slaves_connected();
2105  double _slaves_disconnected();
2106  double _slaves_active();
2107  double _slaves_inactive();
2108  double _slaves_unreachable();
2109 
2110  double _frameworks_connected();
2111  double _frameworks_disconnected();
2112  double _frameworks_active();
2113  double _frameworks_inactive();
2114 
2115  double _outstanding_offers()
2116  {
2117  return static_cast<double>(offers.size());
2118  }
2119 
2120  double _event_queue_messages()
2121  {
2122  return static_cast<double>(eventCount<process::MessageEvent>());
2123  }
2124 
2125  double _event_queue_dispatches()
2126  {
2127  return static_cast<double>(eventCount<process::DispatchEvent>());
2128  }
2129 
2130  double _event_queue_http_requests()
2131  {
2132  return static_cast<double>(eventCount<process::HttpEvent>());
2133  }
2134 
2135  double _tasks_staging();
2136  double _tasks_starting();
2137  double _tasks_running();
2138  double _tasks_unreachable();
2139  double _tasks_killing();
2140 
2141  double _resources_total(const std::string& name);
2142  double _resources_used(const std::string& name);
2143  double _resources_percent(const std::string& name);
2144 
2145  double _resources_revocable_total(const std::string& name);
2146  double _resources_revocable_used(const std::string& name);
2147  double _resources_revocable_percent(const std::string& name);
2148 
2149  process::Time startTime; // Start time used to calculate uptime.
2150 
2151  Option<process::Time> electedTime; // Time when this master is elected.
2152 
2153  // Validates the framework including authorization.
2154  // Returns None if the framework is valid.
2155  // Returns Error if the framework is invalid.
2156  // Returns Failure if authorization returns 'Failure'.
2158  const FrameworkInfo& frameworkInfo,
2159  const process::UPID& from);
2160 };
2161 
2162 
2163 inline std::ostream& operator<<(
2164  std::ostream& stream,
2165  const Framework& framework);
2166 
2167 
2168 // TODO(bmahler): Keeping the task and executor information in sync
2169 // across the Slave and Framework structs is error prone!
2171 {
2172  enum State
2173  {
2174  // Framework has never connected to this master. This implies the
2175  // master failed over and the framework has not yet reregistered,
2176  // but some framework state has been recovered from reregistering
2177  // agents that are running tasks for the framework.
2179 
2180  // Framework was previously connected to this master. A framework
2181  // becomes disconnected when there is a socket error.
2183 
2184  // The framework is connected but not active.
2186 
2187  // Framework is connected and eligible to receive offers. No
2188  // offers will be made to frameworks that are not active.
2189  ACTIVE
2190  };
2191 
2193  const Flags& masterFlags,
2194  const FrameworkInfo& info,
2195  const process::UPID& _pid,
2197  : Framework(master, masterFlags, info, ACTIVE, time)
2198  {
2199  pid = _pid;
2200  }
2201 
2202  Framework(Master* const master,
2203  const Flags& masterFlags,
2204  const FrameworkInfo& info,
2205  const HttpConnection& _http,
2207  : Framework(master, masterFlags, info, ACTIVE, time)
2208  {
2209  http = _http;
2210  }
2211 
// Creates a framework in the `RECOVERED` state from `info` alone.
// Unlike the constructors above that take a `_pid` or `_http`, this one
// records no connection, and it passes a default-constructed
// `process::Time()` to the delegated constructor in place of a real
// time. Presumably used when framework state is rebuilt from
// reregistering agents (see the `RECOVERED` enum comment) — confirm
// against callers.
Framework(Master* const master,
          const Flags& masterFlags,
          const FrameworkInfo& info)
  : Framework(master, masterFlags, info, RECOVERED, process::Time()) {}
2216 
2218  {
2219  if (http.isSome()) {
2220  closeHttpConnection();
2221  }
2222  }
2223 
2224  Task* getTask(const TaskID& taskId)
2225  {
2226  if (tasks.count(taskId) > 0) {
2227  return tasks[taskId];
2228  }
2229 
2230  return nullptr;
2231  }
2232 
2233  void addTask(Task* task)
2234  {
2235  CHECK(!tasks.contains(task->task_id()))
2236  << "Duplicate task " << task->task_id()
2237  << " of framework " << task->framework_id();
2238 
2239  // Verify that Resource.AllocationInfo is set,
2240  // this should be guaranteed by the master.
2241  foreach (const Resource& resource, task->resources()) {
2242  CHECK(resource.has_allocation_info());
2243  }
2244 
2245  tasks[task->task_id()] = task;
2246 
2247  // Unreachable tasks should be added via `addUnreachableTask`.
2248  CHECK(task->state() != TASK_UNREACHABLE)
2249  << "Task '" << task->task_id() << "' of framework " << id()
2250  << " added in TASK_UNREACHABLE state";
2251 
2252  // Since we track terminal but unacknowledged tasks within
2253  // `tasks` rather than `completedTasks`, we need to handle
2254  // them here: don't count them as consuming resources.
2255  //
2256  // TODO(bmahler): Users currently get confused because
2257  // terminal tasks can show up as "active" tasks in the UI and
2258  // endpoints. Ideally, we show the terminal unacknowledged
2259  // tasks as "completed" as well.
2260  if (!protobuf::isTerminalState(task->state())) {
2261  // Note that we explicitly convert from protobuf to `Resources` once
2262  // and then use the result for calculations to avoid performance penalty
2263  // for multiple conversions and validations implied by `+=` with protobuf
2264  // arguments.
2265  // Conversion is safe, as resources have already passed validation.
2266  const Resources resources = task->resources();
2267  totalUsedResources += resources;
2268  usedResources[task->slave_id()] += resources;
2269 
2270  // It's possible that we're not tracking the task's role for
2271  // this framework if the role is absent from the framework's
2272  // set of roles. In this case, we track the role's allocation
2273  // for this framework.
2274  CHECK(!task->resources().empty());
2275  const std::string& role =
2276  task->resources().begin()->allocation_info().role();
2277 
2278  if (!isTrackedUnderRole(role)) {
2279  trackUnderRole(role);
2280  }
2281  }
2282 
2283  if (!master->subscribers.subscribed.empty()) {
2284  master->subscribers.send(
2286  info);
2287  }
2288  }
2289 
2290  // Update framework to recover the resources that were previously
2291  // being used by `task`.
2292  //
2293  // TODO(bmahler): This is a hack for performance. We need to
2294  // maintain resource counters because computing task resources
2295  // functionally for all tasks is expensive, for now.
2296  void recoverResources(Task* task)
2297  {
2298  CHECK(tasks.contains(task->task_id()))
2299  << "Unknown task " << task->task_id()
2300  << " of framework " << task->framework_id();
2301 
2302  totalUsedResources -= task->resources();
2303  usedResources[task->slave_id()] -= task->resources();
2304  if (usedResources[task->slave_id()].empty()) {
2305  usedResources.erase(task->slave_id());
2306  }
2307 
2308  // If we are no longer subscribed to the role to which these resources are
2309  // being returned to, and we have no more resources allocated to us for that
2310  // role, stop tracking the framework under the role.
2311  CHECK(!task->resources().empty());
2312  const std::string& role =
2313  task->resources().begin()->allocation_info().role();
2314 
2315  auto allocatedToRole = [&role](const Resource& resource) {
2316  return resource.allocation_info().role() == role;
2317  };
2318 
2319  if (roles.count(role) == 0 &&
2320  totalUsedResources.filter(allocatedToRole).empty()) {
2321  CHECK(totalOfferedResources.filter(allocatedToRole).empty());
2322  untrackUnderRole(role);
2323  }
2324  }
2325 
2326  // Sends a message to the connected framework.
2327  template <typename Message>
2328  void send(const Message& message)
2329  {
2330  if (!connected()) {
2331  LOG(WARNING) << "Master attempted to send message to disconnected"
2332  << " framework " << *this;
2333  }
2334 
2335  if (http.isSome()) {
2336  if (!http->send(message)) {
2337  LOG(WARNING) << "Unable to send event to framework " << *this << ":"
2338  << " connection closed";
2339  }
2340  } else {
2341  CHECK_SOME(pid);
2342  master->send(pid.get(), message);
2343  }
2344  }
2345 
2346  void addCompletedTask(Task&& task)
2347  {
2348  // TODO(neilc): We currently allow frameworks to reuse the task
2349  // IDs of completed tasks (although this is discouraged). This
2350  // means that there might be multiple completed tasks with the
2351  // same task ID. We should consider rejecting attempts to reuse
2352  // task IDs (MESOS-6779).
2353  completedTasks.push_back(process::Owned<Task>(new Task(std::move(task))));
2354  }
2355 
2356  void addUnreachableTask(const Task& task)
2357  {
2358  // TODO(adam-mesos): Check if unreachable task already exists.
2359  unreachableTasks.set(task.task_id(), process::Owned<Task>(new Task(task)));
2360  }
2361 
2362  // Removes the task. `unreachable` indicates whether the task is removed due
2363  // to being unreachable. Note that we cannot rely on the task state because
2364  // it may not reflect unreachability due to being set to TASK_LOST for
2365  // backwards compatibility.
2366  void removeTask(Task* task, bool unreachable)
2367  {
2368  CHECK(tasks.contains(task->task_id()))
2369  << "Unknown task " << task->task_id()
2370  << " of framework " << task->framework_id();
2371 
2372  // The invariant here is that the master will have already called
2373  // `recoverResources()` prior to removing terminal or unreachable tasks.
2374  if (!protobuf::isTerminalState(task->state()) &&
2375  task->state() != TASK_UNREACHABLE) {
2376  recoverResources(task);
2377  }
2378 
2379  if (unreachable) {
2380  addUnreachableTask(*task);
2381  } else {
2382  CHECK(task->state() != TASK_UNREACHABLE);
2383 
2384  // TODO(bmahler): This moves a potentially non-terminal task into
2385  // the completed list!
2386  addCompletedTask(Task(*task));
2387  }
2388 
2389  tasks.erase(task->task_id());
2390  }
2391 
2392  void addOffer(Offer* offer)
2393  {
2394  CHECK(!offers.contains(offer)) << "Duplicate offer " << offer->id();
2395  offers.insert(offer);
2396  totalOfferedResources += offer->resources();
2397  offeredResources[offer->slave_id()] += offer->resources();
2398  }
2399 
2400  void removeOffer(Offer* offer)
2401  {
2402  CHECK(offers.find(offer) != offers.end())
2403  << "Unknown offer " << offer->id();
2404 
2405  totalOfferedResources -= offer->resources();
2406  offeredResources[offer->slave_id()] -= offer->resources();
2407  if (offeredResources[offer->slave_id()].empty()) {
2408  offeredResources.erase(offer->slave_id());
2409  }
2410 
2411  offers.erase(offer);
2412  }
2413 
2414  void addInverseOffer(InverseOffer* inverseOffer)
2415  {
2416  CHECK(!inverseOffers.contains(inverseOffer))
2417  << "Duplicate inverse offer " << inverseOffer->id();
2418  inverseOffers.insert(inverseOffer);
2419  }
2420 
2421  void removeInverseOffer(InverseOffer* inverseOffer)
2422  {
2423  CHECK(inverseOffers.contains(inverseOffer))
2424  << "Unknown inverse offer " << inverseOffer->id();
2425 
2426  inverseOffers.erase(inverseOffer);
2427  }
2428 
2429  bool hasExecutor(const SlaveID& slaveId,
2430  const ExecutorID& executorId)
2431  {
2432  return executors.contains(slaveId) &&
2433  executors[slaveId].contains(executorId);
2434  }
2435 
2436  void addExecutor(const SlaveID& slaveId,
2437  const ExecutorInfo& executorInfo)
2438  {
2439  CHECK(!hasExecutor(slaveId, executorInfo.executor_id()))
2440  << "Duplicate executor '" << executorInfo.executor_id()
2441  << "' on agent " << slaveId;
2442 
2443  // Verify that Resource.AllocationInfo is set,
2444  // this should be guaranteed by the master.
2445  foreach (const Resource& resource, executorInfo.resources()) {
2446  CHECK(resource.has_allocation_info());
2447  }
2448 
2449  executors[slaveId][executorInfo.executor_id()] = executorInfo;
2450  totalUsedResources += executorInfo.resources();
2451  usedResources[slaveId] += executorInfo.resources();
2452 
2453  // It's possible that we're not tracking the task's role for
2454  // this framework if the role is absent from the framework's
2455  // set of roles. In this case, we track the role's allocation
2456  // for this framework.
2457  if (!executorInfo.resources().empty()) {
2458  const std::string& role =
2459  executorInfo.resources().begin()->allocation_info().role();
2460 
2461  if (!isTrackedUnderRole(role)) {
2462  trackUnderRole(role);
2463  }
2464  }
2465  }
2466 
2467  void removeExecutor(const SlaveID& slaveId,
2468  const ExecutorID& executorId)
2469  {
2470  CHECK(hasExecutor(slaveId, executorId))
2471  << "Unknown executor '" << executorId
2472  << "' of framework " << id()
2473  << " of agent " << slaveId;
2474 
2475  const ExecutorInfo& executorInfo = executors[slaveId][executorId];
2476 
2477  totalUsedResources -= executorInfo.resources();
2478  usedResources[slaveId] -= executorInfo.resources();
2479  if (usedResources[slaveId].empty()) {
2480  usedResources.erase(slaveId);
2481  }
2482 
2483  // If we are no longer subscribed to the role to which these resources are
2484  // being returned to, and we have no more resources allocated to us for that
2485  // role, stop tracking the framework under the role.
2486  if (!executorInfo.resources().empty()) {
2487  const std::string& role =
2488  executorInfo.resources().begin()->allocation_info().role();
2489 
2490  auto allocatedToRole = [&role](const Resource& resource) {
2491  return resource.allocation_info().role() == role;
2492  };
2493 
2494  if (roles.count(role) == 0 &&
2495  totalUsedResources.filter(allocatedToRole).empty()) {
2496  CHECK(totalOfferedResources.filter(allocatedToRole).empty());
2497  untrackUnderRole(role);
2498  }
2499  }
2500 
2501  executors[slaveId].erase(executorId);
2502  if (executors[slaveId].empty()) {
2503  executors.erase(slaveId);
2504  }
2505  }
2506 
2507  void addOperation(Operation* operation)
2508  {
2509  CHECK(operation->has_framework_id());
2510 
2511  const FrameworkID& frameworkId = operation->framework_id();
2512 
2513  const UUID& uuid = operation->uuid();
2514 
2515  CHECK(!operations.contains(uuid))
2516  << "Duplicate operation '" << operation->info().id()
2517  << "' (uuid: " << uuid << ") "
2518  << "of framework " << frameworkId;
2519 
2520  operations.put(uuid, operation);
2521 
2522  if (operation->info().has_id()) {
2523  operationUUIDs.put(operation->info().id(), uuid);
2524  }
2525 
2526  if (!protobuf::isSpeculativeOperation(operation->info()) &&
2527  !protobuf::isTerminalState(operation->latest_status().state())) {
2528  Try<Resources> consumed =
2529  protobuf::getConsumedResources(operation->info());
2530  CHECK_SOME(consumed);
2531 
2532  CHECK(operation->has_slave_id())
2533  << "External resource provider is not supported yet";
2534 
2535  const SlaveID& slaveId = operation->slave_id();
2536 
2537  totalUsedResources += consumed.get();
2538  usedResources[slaveId] += consumed.get();
2539 
2540  // It's possible that we're not tracking the role from the
2541  // resources in the operation for this framework if the role is
2542  // absent from the framework's set of roles. In this case, we
2543  // track the role's allocation for this framework.
2544  foreachkey (const std::string& role, consumed->allocations()) {
2545  if (!isTrackedUnderRole(role)) {
2546  trackUnderRole(role);
2547  }
2548  }
2549  }
2550  }
2551 
2552  void recoverResources(Operation* operation)
2553  {
2554  CHECK(operation->has_slave_id())
2555  << "External resource provider is not supported yet";
2556 
2557  const SlaveID& slaveId = operation->slave_id();
2558 
2559  if (protobuf::isSpeculativeOperation(operation->info())) {
2560  return;
2561  }
2562 
2563  Try<Resources> consumed = protobuf::getConsumedResources(operation->info());
2564  CHECK_SOME(consumed);
2565 
2566  CHECK(totalUsedResources.contains(consumed.get()))
2567  << "Tried to recover resources " << consumed.get()
2568  << " which do not seem used";
2569 
2570  CHECK(usedResources[slaveId].contains(consumed.get()))
2571  << "Tried to recover resources " << consumed.get() << " of agent "
2572  << slaveId << " which do not seem used";
2573 
2574  totalUsedResources -= consumed.get();
2575  usedResources[slaveId] -= consumed.get();
2576  if (usedResources[slaveId].empty()) {
2577  usedResources.erase(slaveId);
2578  }
2579 
2580  // If we are no longer subscribed to the role to which these
2581  // resources are being returned to, and we have no more resources
2582  // allocated to us for that role, stop tracking the framework
2583  // under the role.
2584  foreachkey (const std::string& role, consumed->allocations()) {
2585  auto allocatedToRole = [&role](const Resource& resource) {
2586  return resource.allocation_info().role() == role;
2587  };
2588 
2589  if (roles.count(role) == 0 &&
2590  totalUsedResources.filter(allocatedToRole).empty()) {
2591  CHECK(totalOfferedResources.filter(allocatedToRole).empty());
2592  untrackUnderRole(role);
2593  }
2594  }
2595  }
2596 
2597  void removeOperation(Operation* operation)
2598  {
2599  const UUID& uuid = operation->uuid();
2600 
2601  CHECK(operations.contains(uuid))
2602  << "Unknown operation '" << operation->info().id()
2603  << "' (uuid: " << uuid << ") "
2604  << "of framework " << operation->framework_id();
2605 
2606  if (!protobuf::isSpeculativeOperation(operation->info()) &&
2607  !protobuf::isTerminalState(operation->latest_status().state())) {
2608  recoverResources(operation);
2609  }
2610 
2611  if (operation->info().has_id()) {
2612  operationUUIDs.erase(operation->info().id());
2613  }
2614 
2615  operations.erase(uuid);
2616  }
2617 
2618  const FrameworkID id() const { return info.id(); }
2619 
2620  // Update fields in 'info' using those in 'newInfo'. Currently this
2621  // only updates `role`/`roles`, 'name', 'failover_timeout', 'hostname',
2622  // 'webui_url', 'capabilities', and 'labels'.
2623  void update(const FrameworkInfo& newInfo)
2624  {
2625  // We only merge 'info' from the same framework 'id'.
2626  CHECK_EQ(info.id(), newInfo.id());
2627 
2628  // Save the old list of roles for later.
2629  std::set<std::string> oldRoles = roles;
2630 
2631  // TODO(jmlvanre): Merge other fields as per design doc in
2632  // MESOS-703.
2633 
2634  info.clear_role();
2635  info.clear_roles();
2636 
2637  if (newInfo.has_role()) {
2638  info.set_role(newInfo.role());
2639  }
2640 
2641  if (newInfo.roles_size() > 0) {
2642  info.mutable_roles()->CopyFrom(newInfo.roles());
2643  }
2644 
2645  roles = protobuf::framework::getRoles(newInfo);
2646 
2647  if (newInfo.user() != info.user()) {
2648  LOG(WARNING) << "Cannot update FrameworkInfo.user to '" << newInfo.user()
2649  << "' for framework " << id() << ". Check MESOS-703";
2650  }
2651 
2652  info.set_name(newInfo.name());
2653 
2654  if (newInfo.has_failover_timeout()) {
2655  info.set_failover_timeout(newInfo.failover_timeout());
2656  } else {
2657  info.clear_failover_timeout();
2658  }
2659 
2660  if (newInfo.checkpoint() != info.checkpoint()) {
2661  LOG(WARNING) << "Cannot update FrameworkInfo.checkpoint to '"
2662  << stringify(newInfo.checkpoint()) << "' for framework "
2663  << id() << ". Check MESOS-703";
2664  }
2665 
2666  if (newInfo.has_hostname()) {
2667  info.set_hostname(newInfo.hostname());
2668  } else {
2669  info.clear_hostname();
2670  }
2671 
2672  if (newInfo.principal() != info.principal()) {
2673  LOG(WARNING) << "Cannot update FrameworkInfo.principal to '"
2674  << newInfo.principal() << "' for framework " << id()
2675  << ". Check MESOS-703";
2676  }
2677 
2678  if (newInfo.has_webui_url()) {
2679  info.set_webui_url(newInfo.webui_url());
2680  } else {
2681  info.clear_webui_url();
2682  }
2683 
2684  if (newInfo.capabilities_size() > 0) {
2685  info.mutable_capabilities()->CopyFrom(newInfo.capabilities());
2686  } else {
2687  info.clear_capabilities();
2688  }
2689  capabilities = protobuf::framework::Capabilities(info.capabilities());
2690 
2691  if (newInfo.has_labels()) {
2692  info.mutable_labels()->CopyFrom(newInfo.labels());
2693  } else {
2694  info.clear_labels();
2695  }
2696 
2697  const std::set<std::string>& newRoles = roles;
2698 
2699  const std::set<std::string> removedRoles = [&]() {
2700  std::set<std::string> result = oldRoles;
2701  foreach (const std::string& role, newRoles) {
2702  result.erase(role);
2703  }
2704  return result;
2705  }();
2706 
2707  foreach (const std::string& role, removedRoles) {
2708  auto allocatedToRole = [&role](const Resource& resource) {
2709  return resource.allocation_info().role() == role;
2710  };
2711 
2712  // Stop tracking the framework under this role if there are
2713  // no longer any resources allocated to it.
2714  if (totalUsedResources.filter(allocatedToRole).empty()) {
2715  CHECK(totalOfferedResources.filter(allocatedToRole).empty());
2716  untrackUnderRole(role);
2717  }
2718  }
2719 
2720  const std::set<std::string> addedRoles = [&]() {
2721  std::set<std::string> result = newRoles;
2722  foreach (const std::string& role, oldRoles) {
2723  result.erase(role);
2724  }
2725  return result;
2726  }();
2727 
2728  foreach (const std::string& role, addedRoles) {
2729  // NOTE: It's possible that we're already tracking this framework
2730  // under the role because a framework can unsubscribe from a role
2731  // while it still has resources allocated to the role.
2732  if (!isTrackedUnderRole(role)) {
2733  trackUnderRole(role);
2734  }
2735  }
2736  }
2737 
2738  void updateConnection(const process::UPID& newPid)
2739  {
2740  // Cleanup the HTTP connnection if this is a downgrade from HTTP
2741  // to PID. Note that the connection may already be closed.
2742  if (http.isSome()) {
2743  closeHttpConnection();
2744  }
2745 
2746  // TODO(benh): unlink(oldPid);
2747  pid = newPid;
2748  }
2749 
2750  void updateConnection(const HttpConnection& newHttp)
2751  {
2752  if (pid.isSome()) {
2753  // Wipe the PID if this is an upgrade from PID to HTTP.
2754  // TODO(benh): unlink(oldPid);
2755  pid = None();
2756  } else if (http.isSome()) {
2757  // Cleanup the old HTTP connection.
2758  // Note that master creates a new HTTP connection for every
2759  // subscribe request, so 'newHttp' should always be different
2760  // from 'http'.
2761  closeHttpConnection();
2762  }
2763 
2764  CHECK_NONE(http);
2765 
2766  http = newHttp;
2767  }
2768 
2769  // Closes the HTTP connection and stops the heartbeat.
2770  //
2771  // TODO(vinod): Currently `state` variable is set separately
2772  // from this method. We need to make sure these are in sync.
2774  {
2775  CHECK_SOME(http);
2776 
2777  if (connected() && !http->close()) {
2778  LOG(WARNING) << "Failed to close HTTP pipe for " << *this;
2779  }
2780 
2781  http = None();
2782 
2783  CHECK_SOME(heartbeater);
2784 
2785  terminate(heartbeater->get());
2786  wait(heartbeater->get());
2787 
2788  heartbeater = None();
2789  }
2790 
2791  void heartbeat()
2792  {
2793  CHECK_NONE(heartbeater);
2794  CHECK_SOME(http);
2795 
2796  // TODO(vinod): Make heartbeat interval configurable and include
2797  // this information in the SUBSCRIBED response.
2798  scheduler::Event event;
2799  event.set_type(scheduler::Event::HEARTBEAT);
2800 
2801  heartbeater =
2803  "framework " + stringify(info.id()),
2804  event,
2805  http.get(),
2807 
2808  process::spawn(heartbeater->get());
2809  }
2810 
2811  bool active() const { return state == ACTIVE; }
2812  bool connected() const { return state == ACTIVE || state == INACTIVE; }
2813  bool recovered() const { return state == RECOVERED; }
2814 
2815  bool isTrackedUnderRole(const std::string& role) const;
2816  void trackUnderRole(const std::string& role);
2817  void untrackUnderRole(const std::string& role);
2818 
2819  Master* const master;
2820 
2821  FrameworkInfo info;
2822 
2823  std::set<std::string> roles;
2824 
2826 
2827  // Frameworks can either be connected via HTTP or by message passing
2828  // (scheduler driver). At most one of `http` and `pid` will be set
2829  // according to the last connection made by the framework; neither
2830  // field will be set if the framework is in state `RECOVERED`.
2833 
2835 
2839 
2840  // Tasks that have not yet been launched because they are currently
2841  // being authorized.
2843 
2844  // TODO(bmahler): Make this private to enforce that `addTask()` and
2845  // `removeTask()` are used, and provide a const view into the tasks.
2847 
2848  // Tasks launched by this framework that have reached a terminal
2849  // state and have had all their updates acknowledged. We only keep a
2850  // fixed-size cache to avoid consuming too much memory. We use
2851  // boost::circular_buffer rather than BoundedHashMap because there
2852  // can be multiple completed tasks with the same task ID.
2853  boost::circular_buffer<process::Owned<Task>> completedTasks;
2854 
2855  // When an agent is marked unreachable, tasks running on it are stored
2856  // here. We only keep a fixed-size cache to avoid consuming too much memory.
2857  // NOTE: Non-partition-aware unreachable tasks in this map are marked
2858  // TASK_LOST instead of TASK_UNREACHABLE for backward compatibility.
2860 
2861  hashset<Offer*> offers; // Active offers for framework.
2862 
2863  hashset<InverseOffer*> inverseOffers; // Active inverse offers for framework.
2864 
2865  // TODO(bmahler): Make this private to enforce that `addExecutor()`
2866  // and `removeExecutor()` are used, and provide a const view into
2867  // the executors.
2869 
2870  // Pending operations or terminal operations that have
2871  // unacknowledged status updates.
2873 
2874  // The map from the framework-specified operation ID to the
2875  // corresponding internal operation UUID.
2877 
2878  // NOTE: For the used and offered resources below, we keep the
2879  // total as well as partitioned by SlaveID.
2880  // We expose the total resources via the HTTP endpoint, and we
2881  // keep a running total of the resources because looping over the
2882  // slaves to sum the resources has led to perf issues (MESOS-1862).
2883  // We keep the resources partitioned by SlaveID because non-scalar
2884  // resources can be lost when summing them up across multiple
2885  // slaves (MESOS-2373).
2886  //
2887  // Also note that keeping the totals is safe even though it yields
2888  // incorrect results for non-scalar resources.
2889  // (1) For overlapping set items / ranges across slaves, these
2890  // will get added N times but only represented once.
2891  // (2) When an initial subtraction occurs (N-1), the resource is
2892  // no longer represented. (This is the source of the bug).
2893  // (3) When any further subtractions occur (N-(1+M)), the
2894  // Resources simply ignores the subtraction since there's
2895  // nothing to remove, so this is safe for now.
2896 
2897  // TODO(mpark): Strip the non-scalar resources out of the totals
2898  // in order to avoid reporting incorrect statistics (MESOS-2623).
2899 
2900  // Active task / executor / operation resources.
2902 
2903  // Note that we maintain multiple copies of each shared resource in
2904  // `usedResources` as they are used by multiple tasks.
2906 
2907  // Offered resources.
2910 
2911  // This is only set for HTTP frameworks.
2914 
2915 private:
2916  Framework(Master* const _master,
2917  const Flags& masterFlags,
2918  const FrameworkInfo& _info,
2919  State state,
2920  const process::Time& time)
2921  : master(_master),
2922  info(_info),
2923  roles(protobuf::framework::getRoles(_info)),
2924  capabilities(_info.capabilities()),
2925  state(state),
2926  registeredTime(time),
2927  reregisteredTime(time),
2928  completedTasks(masterFlags.max_completed_tasks_per_framework),
2929  unreachableTasks(masterFlags.max_unreachable_tasks_per_framework)
2930  {
2931  foreach (const std::string& role, roles) {
2932  // NOTE: It's possible that we're already being tracked under the role
2933  // because a framework can unsubscribe from a role while it still has
2934  // resources allocated to the role.
2935  if (!isTrackedUnderRole(role)) {
2936  trackUnderRole(role);
2937  }
2938  }
2939  }
2940 
2941  Framework(const Framework&); // No copying.
2942  Framework& operator=(const Framework&); // No assigning.
2943 };
2944 
2945 
2946 inline std::ostream& operator<<(
2947  std::ostream& stream,
2948  const Framework& framework)
2949 {
2950  // TODO(vinod): Also log the hostname once FrameworkInfo is properly
2951  // updated on framework failover (MESOS-1784).
2952  stream << framework.id() << " (" << framework.info.name() << ")";
2953 
2954  if (framework.pid.isSome()) {
2955  stream << " at " << framework.pid.get();
2956  }
2957 
2958  return stream;
2959 }
2960 
2961 
2962 // Information about an active role.
2963 struct Role
2964 {
2965  Role() = delete;
2966 
2967  Role(const std::string& _role) : role(_role) {}
2968 
2969  void addFramework(Framework* framework)
2970  {
2971  frameworks[framework->id()] = framework;
2972  }
2973 
2974  void removeFramework(Framework* framework)
2975  {
2976  frameworks.erase(framework->id());
2977  }
2978 
2980  {
2981  Resources resources;
2982 
2983  auto allocatedTo = [](const std::string& role) {
2984  return [role](const Resource& resource) {
2985  CHECK(resource.has_allocation_info());
2986  return resource.allocation_info().role() == role;
2987  };
2988  };
2989 
2990  foreachvalue (Framework* framework, frameworks) {
2991  resources += framework->totalUsedResources.filter(allocatedTo(role));
2992  resources += framework->totalOfferedResources.filter(allocatedTo(role));
2993  }
2994 
2995  return resources;
2996  }
2997 
2998  const std::string role;
2999 
3000  // NOTE: The dynamic role/quota relation is stored in and administrated
3001  // by the master. There is no direct representation of quota information
3002  // here to avoid duplication and to support that an operator can associate
3003  // quota with a role before the role is created. Such ordering of operator
3004  // requests prevents a race of premature unbounded allocation that setting
3005  // quota first is intended to contain.
3006 
3008 };
3009 
3010 } // namespace master {
3011 } // namespace internal {
3012 } // namespace mesos {
3013 
3014 #endif // __MASTER_HPP__
Protocol< RecoverRequest, RecoverResponse > recover
Definition: path.hpp:26
std::string generate(const std::string &prefix="")
Returns 'prefix(N)' where N represents the number of instances where the same prefix (wrt...
Try< Resources > getConsumedResources(const Offer::Operation &operation)
bool send(const Message &message)
Definition: master.hpp:345
Framework(Master *const master, const Flags &masterFlags, const FrameworkInfo &info, const process::UPID &_pid, const process::Time &time=process::Clock::now())
Definition: master.hpp:2192
void recoverResources(Operation *operation)
Definition: master.hpp:2552
hashmap< ResourceProviderID, ResourceProvider > resourceProviders
Definition: master.hpp:316
Definition: nothing.hpp:16
hashmap< TaskID, Task * > tasks
Definition: master.hpp:2846
Definition: master.hpp:2963
Master *const master
Definition: master.hpp:2819
ContentType
Definition: http.hpp:43
Option< Error > validate(const std::string &imageDir)
hashmap< UUID, Operation * > operations
Definition: master.hpp:248
std::ostream & operator<<(std::ostream &stream, const Future< T > &future)
Definition: future.hpp:1831
Try< bool > update(const std::string &link, const Handle &parent, uint16_t protocol, const action::Mirror &mirror)
void finalize(bool finalize_wsa=false)
Clean up the library.
Try< Bytes > size(const std::string &path, const FollowSymlink follow=FollowSymlink::FOLLOW_SYMLINK)
Definition: stat.hpp:100
Future< Response > request(const Request &request, bool streamedResponse=false)
Asynchronously sends an HTTP request to the process and returns the HTTP response once the entire res...
Subscriber(const HttpConnection &_http, const Option< process::http::authentication::Principal > _principal)
Definition: master.hpp:1979
T & get()&
Definition: try.hpp:73
SlaveInfo info
Definition: master.hpp:188
bool connected() const
Definition: master.hpp:2812
Definition: master.hpp:27
~Framework()
Definition: master.hpp:2217
Role(const std::string &_role)
Definition: master.hpp:2967
Definition: check.hpp:33
const SlaveID id
Definition: master.hpp:187
hashset< Offer * > offers
Definition: master.hpp:251
Option< process::Timer > reregistrationTimer
Definition: master.hpp:217
bool connected
Definition: master.hpp:204
#define CHECK_NONE(expression)
Definition: check.hpp:54
bool initialize(const Option< std::string > &delegate=None(), const Option< std::string > &readwriteAuthenticationRealm=None(), const Option< std::string > &readonlyAuthenticationRealm=None())
Initialize the library.
Task * getTask(const TaskID &taskId)
Definition: master.hpp:2224
Resources totalResources
Definition: master.hpp:277
constexpr Duration DEFAULT_HEARTBEAT_INTERVAL
Definition: constants.hpp:52
Definition: protobuf_utils.hpp:261
hashmap< SlaveID, Resources > offeredResources
Definition: master.hpp:2909
protobuf::framework::Capabilities capabilities
Definition: master.hpp:2825
bool hasExecutor(const SlaveID &slaveId, const ExecutorID &executorId)
Definition: master.hpp:2429
std::set< std::string > getRoles(const FrameworkInfo &frameworkInfo)
Resources filter(const lambda::function< bool(const Resource &)> &predicate) const
v1::AgentID evolve(const SlaveID &slaveId)
Result< ProcessStatus > status(pid_t pid)
Definition: proc.hpp:166
InverseOffer * getInverseOffer(Master *master, const OfferID &offerId)
Try< Nothing > machines(const google::protobuf::RepeatedPtrField< MachineID > &ids)
Performs the following checks on a list of machines:
void addUnreachableTask(const Task &task)
Definition: master.hpp:2356
mesos::v1::scheduler::Call Call
Definition: mesos.hpp:2555
Definition: resources.hpp:79
Resources totalUsedResources
Definition: master.hpp:2901
Slave * getSlave(Master *master, const SlaveID &slaveId)
void removeExecutor(const SlaveID &slaveId, const ExecutorID &executorId)
Definition: master.hpp:2467
Option< HttpConnection > http
Definition: master.hpp:2831
void update(const FrameworkInfo &newInfo)
Definition: master.hpp:2623
Definition: flags.hpp:42
Definition: registrar.hpp:91
void addFramework(Framework *framework)
Definition: master.hpp:2969
Option< Error > reregisterSlave(const ReregisterSlaveMessage &message)
Definition: files.hpp:73
void addCompletedTask(Task &&task)
Definition: master.hpp:2346
Operation
Definition: cgroups.hpp:441
Future< Nothing > redirect(int_fd from, Option< int_fd > to, size_t chunk=4096, const std::vector< lambda::function< void(const std::string &)>> &hooks={})
Redirect output from the 'from' file descriptor to the 'to' file descriptor (or /dev/null if 'to' is ...
void updateConnection(const HttpConnection &newHttp)
Definition: master.hpp:2750
UPID spawn(ProcessBase *process, bool manage=false)
Spawn a new process.
Framework(Master *const master, const Flags &masterFlags, const FrameworkInfo &info)
Definition: master.hpp:2212
Definition: duration.hpp:32
void removeOperation(Operation *operation)
Definition: master.hpp:2597
void terminate(const UPID &pid, bool inject=true)
Sends a TerminateEvent to the given process.
hashmap< UUID, Operation * > operations
Definition: master.hpp:313
void heartbeat()
Definition: master.hpp:2791
void send(const process::UPID &to, const google::protobuf::Message &message)
Definition: protobuf.hpp:118
bool isSome() const
Definition: option.hpp:115
Definition: event.hpp:214
process::Future< Nothing > closed() const
Definition: master.hpp:358
Definition: http.hpp:518
Definition: json.hpp:154
hashmap< FrameworkID, hashmap< TaskID, Task * > > tasks
Definition: master.hpp:240
void updateConnection(const process::UPID &newPid)
Definition: master.hpp:2738
void removeOffer(Offer *offer)
Definition: master.hpp:2400
multihashmap< FrameworkID, TaskID > killedTasks
Definition: master.hpp:244
mesos::v1::scheduler::Event Event
Definition: mesos.hpp:2556
ssize_t send(const os::WindowsFD &fd, const void *buf, size_t len, int flags)
Definition: socket.hpp:162
Definition: hashmap.hpp:38
Option< UUID > resourceVersion
Definition: master.hpp:290
mesos::master::Event createTaskAdded(const Task &task)
bool contains(const std::string &s, const std::string &substr)
Definition: strings.hpp:406
#define CHECK_SOME(expression)
Definition: check.hpp:50
Definition: master.hpp:333
bool active() const
Definition: master.hpp:2811
Resources checkpointedResources
Definition: master.hpp:270
SlaveObserver * observer
Definition: master.hpp:292
bool isTerminalState(const TaskState &state)
Definition: owned.hpp:26
Try< Nothing > unavailability(const Unavailability &unavailability)
An "untyped" PID, used to encapsulate the process ID for lower-layer abstractions (eg...
Definition: pid.hpp:39
BoundedHashMap< TaskID, process::Owned< Task > > unreachableTasks
Definition: master.hpp:2859
bool recovered() const
Definition: master.hpp:2813
process::Time registeredTime
Definition: master.hpp:200
bool active
Definition: master.hpp:209
Definition: http.hpp:340
An abstraction for contending to be a leading master.
Definition: contender.hpp:40
std::set< std::string > roles
Definition: master.hpp:2823
process::UPID pid
Definition: master.hpp:192
Definition: uuid.hpp:35
Definition: protobuf_utils.hpp:448
process::http::Pipe::Writer writer
Definition: master.hpp:363
Option< process::Time > reregisteredTime
Definition: master.hpp:201
Definition: spec.hpp:30
process::Time reregisteredTime
Definition: master.hpp:2837
Master *const master
Definition: master.hpp:186
void addOffer(Offer *offer)
Definition: master.hpp:2392
Timer delay(const Duration &duration, const PID< T > &pid, void(T::*method)())
Definition: delay.hpp:31
std::string encode(const T &record) const
Returns the "Record-IO" encoded record.
Definition: recordio.hpp:66
Option< Error > quotaInfo(const mesos::quota::QuotaInfo &quotaInfo)
const T & get() const &
Definition: option.hpp:118
const std::string role
Definition: master.hpp:2998
Framework(Master *const master, const Flags &masterFlags, const FrameworkInfo &info, const HttpConnection &_http, const process::Time &time=process::Clock::now())
Definition: master.hpp:2202
Definition: protobuf.hpp:100
process::Time registeredTime
Definition: master.hpp:2836
process::Future< Nothing > destroy(const std::string &hierarchy, const std::string &cgroup="/")
#define foreachvalue(VALUE, ELEMS)
Definition: foreach.hpp:77
Try< int_fd > accept(int_fd s)
Definition: network.hpp:31
void removeInverseOffer(InverseOffer *inverseOffer)
Definition: master.hpp:2421
Definition: whitelist_watcher.hpp:37
Definition: protobuf.hpp:55
void removeFramework(Framework *framework)
Definition: master.hpp:2974
MasterInfo info() const
Definition: master.hpp:579
bool wait(const UPID &pid, const Duration &duration=Seconds(-1))
Wait for the process to exit for no more than the specified seconds.
UUID resourceVersion
Definition: master.hpp:309
Definition: time.hpp:23
virtual void initialize() override
Invoked when a process gets spawned.
Definition: master.hpp:392
FrameworkInfo info
Definition: master.hpp:2821
ContentType contentType
Definition: master.hpp:364
Try< std::vector< Entry > > list(const std::string &hierarchy, const std::string &cgroup)
bool exited(const UPID &from, const UPID &to)
Simulates disconnection of the link between &#39;from&#39; and &#39;to&#39; by sending an ExitedEvent to &#39;to&#39;...
id::UUID streamId
Definition: master.hpp:365
HttpConnection http
Definition: master.hpp:2024
void send(const Message &message)
Definition: master.hpp:2328
Definition: boundedhashmap.hpp:27
Basic model of an allocator: resources are allocated to a framework in the form of offers...
Definition: allocator.hpp:55
void recoverResources(Task *task)
Definition: master.hpp:2296
#define flags
Definition: decoder.hpp:18
bool empty() const
Definition: resources.hpp:388
Definition: none.hpp:27
Definition: attributes.hpp:24
hashset< InverseOffer * > inverseOffers
Definition: master.hpp:2863
void removeTask(Task *task, bool unreachable)
Definition: master.hpp:2366
Definition: executor.hpp:47
const MachineID machineId
Definition: master.hpp:190
boost::circular_buffer< process::Owned< Task > > completedTasks
Definition: master.hpp:2853
Definition: master.hpp:117
Resources allocatedResources() const
Definition: master.hpp:2979
#define foreachkey(KEY, ELEMS)
Definition: foreach.hpp:74
HttpConnection(const process::http::Pipe::Writer &_writer, ContentType _contentType, id::UUID _streamId)
Definition: master.hpp:335
constexpr size_t MAX_REMOVED_SLAVES
Definition: constants.hpp:84
hashset< InverseOffer * > inverseOffers
Definition: master.hpp:254
bool close()
Definition: master.hpp:353
An abstraction of a Master detector which can be used to detect the leading master from a group...
Definition: detector.hpp:38
static Time now()
The current clock time for either the current process that makes this call or the global clock time i...
Resources totalResources
Definition: master.hpp:296
void addExecutor(const SlaveID &slaveId, const ExecutorInfo &executorInfo)
Definition: master.hpp:2436
Definition: event.hpp:103
hashmap< TaskID, TaskInfo > pendingTasks
Definition: master.hpp:2842
Heartbeater(const std::string &_logMessage, const Message &_heartbeatMessage, const HttpConnection &_http, const Duration &_interval, const Option< Duration > &_delay=None())
Definition: master.hpp:379
Definition: master.hpp:376
Given an encoding function for individual records, this provides encoding from typed records into "Re...
Definition: recordio.hpp:57
Option< process::Owned< Heartbeater< scheduler::Event, v1::scheduler::Event > > > heartbeater
Definition: master.hpp:2913
Option< process::UPID > pid
Definition: master.hpp:2832
Definition: metrics.hpp:38
bool isSpeculativeOperation(const Offer::Operation &operation)
State
Definition: master.hpp:2172
Offer * getOffer(Master *master, const OfferID &offerId)
Try< Nothing > create(const std::string &hierarchy, const std::string &cgroup, bool recursive=false)
Try< Nothing > kill(const std::string &hierarchy, const std::string &cgroup, int signal)
hashmap< FrameworkID, Framework * > frameworks
Definition: master.hpp:3007
Try< Nothing > bind(int_fd s, const Address &address)
Definition: network.hpp:46
process::Time unregisteredTime
Definition: master.hpp:2838
std::string serialize(ContentType contentType, const google::protobuf::Message &message)
void addOperation(Operation *operation)
Definition: master.hpp:2507
std::string version
Definition: master.hpp:195
URI http(const std::string &host, const std::string &path="/", const Option< int > &port=None(), const Option< std::string > &query=None(), const Option< std::string > &fragment=None(), const Option< std::string > &user=None(), const Option< std::string > &password=None())
Creates an http URI with the given parameters.
Definition: http.hpp:35
ResourceProviderInfo info
Definition: master.hpp:295
hashmap< FrameworkID, hashmap< ExecutorID, ExecutorInfo > > executors
Definition: master.hpp:224
hashset< Offer * > offers
Definition: master.hpp:2861
std::string stringify(int flags)
Definition: owned.hpp:36
void addTask(Task *task)
Definition: master.hpp:2233
void closeHttpConnection()
Definition: master.hpp:2773
Definition: master.hpp:2170
protobuf::slave::Capabilities capabilities
Definition: master.hpp:198
hashmap< SlaveID, hashmap< ExecutorID, ExecutorInfo > > executors
Definition: master.hpp:2868
Definition: process.hpp:501
process::Owned< Heartbeater< mesos::master::Event, v1::master::Event > > heartbeater
Definition: master.hpp:2026
bool contains(const Key &key) const
Definition: hashmap.hpp:86
hashmap< OperationID, UUID > operationUUIDs
Definition: master.hpp:2876
hashmap< UUID, Operation * > operations
Definition: master.hpp:2872
Definition: parse.hpp:33
Try< std::set< pid_t > > pids()
Definition: freebsd.hpp:62
const Option< process::http::authentication::Principal > principal
Definition: master.hpp:2027
hashmap< SlaveID, Resources > usedResources
Definition: master.hpp:2905
Resources totalOfferedResources
Definition: master.hpp:2908
Definition: master.hpp:426
PID< MetricsProcess > metrics
Resources offeredResources
Definition: master.hpp:261
hashmap< FrameworkID, hashmap< TaskID, TaskInfo > > pendingTasks
Definition: master.hpp:230
const FrameworkID id() const
Definition: master.hpp:2618
constexpr const char * name
Definition: shell.hpp:43
hashmap< FrameworkID, Resources > usedResources
Definition: master.hpp:259
Try< std::vector< Value > > time(const std::string &hierarchy, const std::string &cgroup)
Try< Nothing > schedule(const mesos::maintenance::Schedule &schedule, const hashmap< MachineID, Machine > &machines)
Performs the following checks on the new maintenance schedule:
Option< Error > registerSlave(const RegisterSlaveMessage &message)
void addInverseOffer(InverseOffer *inverseOffer)
Definition: master.hpp:2414
State state
Definition: master.hpp:2834