Apache Mesos
master.hpp
Go to the documentation of this file.
1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements. See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership. The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License. You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 
17 #ifndef __MASTER_HPP__
18 #define __MASTER_HPP__
19 
20 #include <stdint.h>
21 
22 #include <list>
23 #include <memory>
24 #include <set>
25 #include <string>
26 #include <vector>
27 
28 #include <boost/circular_buffer.hpp>
29 
30 #include <mesos/mesos.hpp>
31 #include <mesos/resources.hpp>
32 #include <mesos/type_utils.hpp>
33 
35 
39 #include <mesos/master/master.hpp>
40 
42 
43 #include <mesos/quota/quota.hpp>
44 
46 
47 #include <process/limiter.hpp>
48 #include <process/http.hpp>
49 #include <process/owned.hpp>
50 #include <process/process.hpp>
51 #include <process/protobuf.hpp>
52 #include <process/timer.hpp>
53 
55 
56 #include <stout/boundedhashmap.hpp>
57 #include <stout/cache.hpp>
58 #include <stout/foreach.hpp>
59 #include <stout/hashmap.hpp>
60 #include <stout/hashset.hpp>
61 #include <stout/linkedhashmap.hpp>
62 #include <stout/multihashmap.hpp>
63 #include <stout/nothing.hpp>
64 #include <stout/option.hpp>
65 #include <stout/recordio.hpp>
66 #include <stout/try.hpp>
67 #include <stout/uuid.hpp>
68 
69 #include "common/http.hpp"
72 
73 #include "files/files.hpp"
74 
75 #include "internal/devolve.hpp"
76 #include "internal/evolve.hpp"
77 
78 #include "master/constants.hpp"
79 #include "master/flags.hpp"
80 #include "master/machine.hpp"
81 #include "master/metrics.hpp"
82 #include "master/validation.hpp"
83 
84 #include "messages/messages.hpp"
85 
86 namespace process {
87 class RateLimiter; // Forward declaration.
88 }
89 
90 namespace mesos {
91 
92 // Forward declarations.
93 class Authorizer;
94 class ObjectApprover;
95 
96 namespace internal {
97 
98 // Forward declarations.
99 namespace registry {
100 class Slaves;
101 }
102 
103 class Registry;
104 class WhitelistWatcher;
105 
106 namespace master {
107 
108 class Master;
109 class Registrar;
110 class SlaveObserver;
111 
112 struct BoundedRateLimiter;
113 struct Framework;
114 struct Role;
115 
116 
117 struct Slave
118 {
119 Slave(Master* const _master,
120  SlaveInfo _info,
121  const process::UPID& _pid,
122  const MachineID& _machineId,
123  const std::string& _version,
124  std::vector<SlaveInfo::Capability> _capabilites,
125  const process::Time& _registeredTime,
126  std::vector<Resource> _checkpointedResources,
127  const Option<id::UUID>& resourceVersion,
128  std::vector<ExecutorInfo> executorInfos = std::vector<ExecutorInfo>(),
129  std::vector<Task> tasks = std::vector<Task>());
130 
131  ~Slave();
132 
133  Task* getTask(
134  const FrameworkID& frameworkId,
135  const TaskID& taskId) const;
136 
137  void addTask(Task* task);
138 
139  // Update slave to recover the resources that were previously
140  // being used by `task`.
141  //
142  // TODO(bmahler): This is a hack for performance. We need to
143  // maintain resource counters because computing task resources
144  // functionally for all tasks is expensive, for now.
145  void recoverResources(Task* task);
146 
147  void removeTask(Task* task);
148 
149  void addOperation(Operation* operation);
150 
151  void recoverResources(Operation* operation);
152 
153  void removeOperation(Operation* operation);
154 
155  Operation* getOperation(const id::UUID& uuid) const;
156 
157  void addOffer(Offer* offer);
158 
159  void removeOffer(Offer* offer);
160 
161  void addInverseOffer(InverseOffer* inverseOffer);
162 
163  void removeInverseOffer(InverseOffer* inverseOffer);
164 
165  bool hasExecutor(
166  const FrameworkID& frameworkId,
167  const ExecutorID& executorId) const;
168 
169  void addExecutor(
170  const FrameworkID& frameworkId,
171  const ExecutorInfo& executorInfo);
172 
173  void removeExecutor(
174  const FrameworkID& frameworkId,
175  const ExecutorID& executorId);
176 
177  void apply(const std::vector<ResourceConversion>& conversions);
178 
180  const SlaveInfo& info,
181  const std::string& _version,
182  const std::vector<SlaveInfo::Capability>& _capabilites,
183  const Resources& _checkpointedResources,
184  const Option<id::UUID>& resourceVersion);
185 
186  Master* const master;
187  const SlaveID id;
188  SlaveInfo info;
189 
190  const MachineID machineId;
191 
193 
194  // TODO(bmahler): Use stout's Version when it can parse labels, etc.
195  std::string version;
196 
197  // Agent capabilities.
199 
202 
203  // Slave becomes disconnected when the socket closes.
204  bool connected;
205 
206  // Slave becomes deactivated when it gets disconnected. In the
207  // future this might also happen via HTTP endpoint.
208  // No offers will be made for a deactivated slave.
209  bool active;
210 
211  // Timer for marking slaves unreachable that become disconnected and
212  // don't re-register. This timeout is larger than the slave
213  // observer's timeout, so typically the slave observer will be the
214  // one to mark such slaves unreachable; this timer is a backup for
215  // when a slave responds to pings but does not re-register (e.g.,
216  // because agent recovery has hung).
218 
219  // Executors running on this slave.
220  //
221  // TODO(bmahler): Make this private to enforce that `addExecutor()`
222  // and `removeExecutor()` are used, and provide a const view into
223  // the executors.
225 
226  // Tasks that have not yet been launched because they are currently
227  // being authorized. This is similar to Framework's pendingTasks but we
228  // track pendingTasks per agent separately to determine if any offer
229  // operation for this agent would change resources requested by these tasks.
231 
232  // Tasks present on this slave.
233  //
234  // TODO(bmahler): Make this private to enforce that `addTask()` and
235  // `removeTask()` are used, and provide a const view into the tasks.
236  //
237  // TODO(bmahler): The task pointer ownership complexity arises from the fact
238  // that we own the pointer here, but it's shared with the Framework struct.
239  // We should find a way to eliminate this.
241 
242  // Tasks that frameworks have asked to be killed.
243  // This is used for reconciliation when the slave re-registers.
245 
246  // Pending operations or terminal operations that have
247  // unacknowledged status updates on this agent.
249 
250  // Active offers on this slave.
252 
253  // Active inverse offers on this slave.
255 
256  // Resources for active task / executors / operations.
257  // Note that we maintain multiple copies of each shared resource in
258  // `usedResources` as they are used by multiple tasks.
260 
262 
263  // Resources that should be checkpointed by the slave (e.g.,
264  // persistent volumes, dynamic reservations, etc). These are either
265  // in use by a task/executor, or are available for use and will be
266  // re-offered to the framework.
267  // TODO(jieyu): `checkpointedResources` is only for agent default
268  // resources. Resources from resource providers are not included in
269  // this field. Consider removing this field.
271 
272  // The current total resources of the slave. Note that this is
273  // different from 'info.resources()' because this also considers
274  // operations (e.g., CREATE, RESERVE) that have been applied and
275  // includes revocable resources and resources from resource
276  // providers as well.
278 
279  SlaveObserver* observer;
280 
283 
284 private:
285  Slave(const Slave&); // No copying.
286  Slave& operator=(const Slave&); // No assigning.
287 };
288 
289 
290 inline std::ostream& operator<<(std::ostream& stream, const Slave& slave)
291 {
292  return stream << slave.id << " at " << slave.pid
293  << " (" << slave.info.hostname() << ")";
294 }
295 
296 
297 // Represents the streaming HTTP connection to a framework or a client
298 // subscribed to the '/api/vX' endpoint.
300 {
302  ContentType _contentType,
303  id::UUID _streamId)
304  : writer(_writer),
305  contentType(_contentType),
306  streamId(_streamId) {}
307 
308  // We need to evolve the internal old style message/unversioned event into a
309  // versioned event e.g., `v1::scheduler::Event` or `v1::master::Event`.
// Evolves `message` into a versioned `Event`, encodes it with `encoder`
// and writes the encoded bytes to `writer`. Returns whatever
// `writer.write()` returns.
// NOTE(review): upstream line 313, which declares `encoder`, is missing from
// this copy. Only its trailing arguments (`serialize, contentType,
// lambda::_1`) survive, on the next line. It is presumably a recordio encoder
// for `Event` bound to `serialize` and this connection's `contentType`;
// confirm against upstream.
310  template <typename Message, typename Event = v1::scheduler::Event>
311  bool send(const Message& message)
312  {
314  serialize, contentType, lambda::_1));
315 
316  return writer.write(encoder.encode(evolve(message)));
317  }
318 
319  bool close()
320  {
321  return writer.close();
322  }
323 
// Forwards to `writer.readerClosed()`.
// NOTE(review): the signature line of this method (upstream line 324) is
// missing from this copy. `Heartbeater::heartbeat()` calls it as
// `http.closed().isPending()`, so it presumably returns a future that stays
// pending while the reader side is open. Confirm against upstream.
325  {
326  return writer.readerClosed();
327  }
328 
332 };
333 
334 
335 // This process periodically sends heartbeats to a given HTTP connection.
336 // The `Message` template parameter is the type of the heartbeat event passed
337 // into the heartbeater during construction, while the `Event` template
338 // parameter is the versioned event type which is sent to the client.
339 // The optional delay parameter is used to specify the delay period before it
340 // sends the first heartbeat.
341 template <typename Message, typename Event>
342 class Heartbeater : public process::Process<Heartbeater<Message, Event>>
343 {
344 public:
345  Heartbeater(const std::string& _logMessage,
346  const Message& _heartbeatMessage,
347  const HttpConnection& _http,
348  const Duration& _interval,
349  const Option<Duration>& _delay = None())
350  : process::ProcessBase(process::ID::generate("heartbeater")),
351  logMessage(_logMessage),
352  heartbeatMessage(_heartbeatMessage),
353  http(_http),
354  interval(_interval),
355  delay(_delay) {}
356 
357 protected:
// Process start hook. When no `delay` is configured, the first heartbeat is
// sent right away. Otherwise `delay.get()` and `this` are handed to a call
// whose first and last lines are missing from this copy.
// NOTE(review): upstream lines 361 and 364 were dropped by the documentation
// extraction. By analogy with the rescheduling call at the end of
// `heartbeat()`, this branch was presumably
// `process::delay(delay.get(), this, &Heartbeater<Message, Event>::heartbeat);`
// Confirm against upstream before relying on it.
358  virtual void initialize() override
359  {
360  if (delay.isSome()) {
362  delay.get(),
363  this,
365  } else {
// No initial delay: send the first heartbeat immediately.
366  heartbeat();
367  }
368  }
369 
370 private:
371  void heartbeat()
372  {
373  // Only send a heartbeat if the connection is not closed.
374  if (http.closed().isPending()) {
375  VLOG(2) << "Sending heartbeat to " << logMessage;
376 
377  Message message(heartbeatMessage);
378  http.send<Message, Event>(message);
379  }
380 
381  process::delay(interval, this, &Heartbeater<Message, Event>::heartbeat);
382  }
383 
// Appended to the VLOG(2) "Sending heartbeat to " line in `heartbeat()`.
384  const std::string logMessage;
// Payload of every heartbeat; passed to `http.send<Message, Event>()`.
385  const Message heartbeatMessage;
// Connection the heartbeats are written to. Sends are skipped once
// `http.closed()` is no longer pending.
386  HttpConnection http;
// Period between heartbeats; passed to `process::delay` in `heartbeat()`.
387  const Duration interval;
// Optional wait before the first heartbeat; checked in `initialize()`.
388  const Option<Duration> delay;
389 };
390 
391 
392 class Master : public ProtobufProcess<Master>
393 {
394 public:
396  Registrar* registrar,
397  Files* files,
400  const Option<Authorizer*>& authorizer,
401  const Option<std::shared_ptr<process::RateLimiter>>&
402  slaveRemovalLimiter,
403  const Flags& flags = Flags());
404 
405  virtual ~Master();
406 
407  // Message handlers.
408  void submitScheduler(
409  const std::string& name);
410 
411  void registerFramework(
412  const process::UPID& from,
413  const FrameworkInfo& frameworkInfo);
414 
415  void reregisterFramework(
416  const process::UPID& from,
417  const FrameworkInfo& frameworkInfo,
418  bool failover);
419 
420  void unregisterFramework(
421  const process::UPID& from,
422  const FrameworkID& frameworkId);
423 
424  void deactivateFramework(
425  const process::UPID& from,
426  const FrameworkID& frameworkId);
427 
428  // TODO(vinod): Remove this once the old driver is removed.
429  void resourceRequest(
430  const process::UPID& from,
431  const FrameworkID& frameworkId,
432  const std::vector<Request>& requests);
433 
434  void launchTasks(
435  const process::UPID& from,
436  const FrameworkID& frameworkId,
437  const std::vector<TaskInfo>& tasks,
438  const Filters& filters,
439  const std::vector<OfferID>& offerIds);
440 
441  void reviveOffers(
442  const process::UPID& from,
443  const FrameworkID& frameworkId,
444  const std::vector<std::string>& role);
445 
446  void killTask(
447  const process::UPID& from,
448  const FrameworkID& frameworkId,
449  const TaskID& taskId);
450 
452  const process::UPID& from,
453  const SlaveID& slaveId,
454  const FrameworkID& frameworkId,
455  const TaskID& taskId,
456  const std::string& uuid);
457 
458  void schedulerMessage(
459  const process::UPID& from,
460  const SlaveID& slaveId,
461  const FrameworkID& frameworkId,
462  const ExecutorID& executorId,
463  const std::string& data);
464 
465  void executorMessage(
466  const process::UPID& from,
467  const SlaveID& slaveId,
468  const FrameworkID& frameworkId,
469  const ExecutorID& executorId,
470  const std::string& data);
471 
472  void registerSlave(
473  const process::UPID& from,
474  RegisterSlaveMessage&& registerSlaveMessage);
475 
476  void reregisterSlave(
477  const process::UPID& from,
478  ReregisterSlaveMessage&& incomingMessage);
479 
480  void unregisterSlave(
481  const process::UPID& from,
482  const SlaveID& slaveId);
483 
484  void statusUpdate(
485  StatusUpdate update,
486  const process::UPID& pid);
487 
488  void reconcileTasks(
489  const process::UPID& from,
490  const FrameworkID& frameworkId,
491  const std::vector<TaskStatus>& statuses);
492 
494  const UpdateOperationStatusMessage& update);
495 
496  void exitedExecutor(
497  const process::UPID& from,
498  const SlaveID& slaveId,
499  const FrameworkID& frameworkId,
500  const ExecutorID& executorId,
501  int32_t status);
502 
503  void updateSlave(UpdateSlaveMessage&& message);
504 
506  const MachineID& machineId,
508 
509  // Marks the agent unreachable and returns whether the agent was
510  // marked unreachable. Returns false if the agent is already
511  // in a transitioning state or has transitioned into another
512  // state (this includes already being marked unreachable).
513  // The `duringMasterFailover` parameter specifies whether this
514  // agent is transitioning from a recovered state (true) or a
515  // registered state (false).
516  //
517  // Discarding currently not supported.
518  //
519  // Will not return a failure (this will crash the master
520  // internally in the case of a registry failure).
522  const SlaveInfo& slave,
523  bool duringMasterFailover,
524  const std::string& message);
525 
526  void markGone(Slave* slave, const TimeInfo& goneTime);
527 
528  void authenticate(
529  const process::UPID& from,
530  const process::UPID& pid);
531 
532  // TODO(bmahler): It would be preferred to use a unique libprocess
533  // Process identifier (PID is not sufficient) for identifying the
534  // framework instance, rather than relying on re-registration time.
536  const FrameworkID& frameworkId,
537  const process::Time& reregisteredTime);
538 
539  void offer(
540  const FrameworkID& frameworkId,
541  const hashmap<std::string, hashmap<SlaveID, Resources>>& resources);
542 
543  void inverseOffer(
544  const FrameworkID& frameworkId,
545  const hashmap<SlaveID, UnavailableResources>& resources);
546 
547  // Invoked when there is a newly elected leading master.
548  // Made public for testing purposes.
549  void detected(const process::Future<Option<MasterInfo>>& _leader);
550 
551  // Invoked when the contender has lost the candidacy.
552  // Made public for testing purposes.
554 
555  // Continuation of recover().
556  // Made public for testing purposes.
557  process::Future<Nothing> _recover(const Registry& registry);
558 
559  MasterInfo info() const
560  {
561  return info_;
562  }
563 
564 protected:
565  void initialize() override;
566  void finalize() override;
567 
568  void consume(process::MessageEvent&& event) override;
569  void consume(process::ExitedEvent&& event) override;
570 
571  void exited(const process::UPID& pid) override;
572  void exited(const FrameworkID& frameworkId, const HttpConnection& http);
573  void _exited(Framework* framework);
574 
575  // Invoked upon noticing a subscriber disconnection.
576  void exited(const id::UUID& id);
577 
578  void agentReregisterTimeout(const SlaveID& slaveId);
579  Nothing _agentReregisterTimeout(const SlaveID& slaveId);
580 
581  // Invoked when the message is ready to be executed after
582  // being throttled.
583  // 'principal' being None indicates it is throttled by
584  // 'defaultLimiter'.
585  void throttled(
586  process::MessageEvent&& event,
587  const Option<std::string>& principal);
588 
589  // Continuations of consume().
590  void _consume(process::MessageEvent&& event);
591  void _consume(process::ExitedEvent&& event);
592 
593  // Helper method invoked when the capacity for a framework
594  // principal is exceeded.
595  void exceededCapacity(
596  const process::MessageEvent& event,
597  const Option<std::string>& principal,
598  uint64_t capacity);
599 
600  // Recovers state from the registrar.
602  void recoveredSlavesTimeout(const Registry& registry);
603 
604  void _registerSlave(
605  const process::UPID& pid,
606  RegisterSlaveMessage&& registerSlaveMessage,
608  const process::Future<bool>& authorized);
609 
610  void __registerSlave(
611  const process::UPID& pid,
612  RegisterSlaveMessage&& registerSlaveMessage,
613  const process::Future<bool>& admit);
614 
615  void _reregisterSlave(
616  const process::UPID& pid,
617  ReregisterSlaveMessage&& incomingMessage,
619  const process::Future<bool>& authorized);
620 
621  void __reregisterSlave(
622  const process::UPID& pid,
623  ReregisterSlaveMessage&& incomingMessage,
624  const process::Future<bool>& readmit);
625 
626  void ___reregisterSlave(
627  const process::UPID& pid,
628  ReregisterSlaveMessage&& incomingMessage,
629  const process::Future<bool>& updated);
630 
632  Slave* slave,
633  const std::vector<FrameworkInfo>& frameworks);
634 
635  // 'future' is the future returned by the authenticator.
636  void _authenticate(
637  const process::UPID& pid,
638  const process::Future<Option<std::string>>& future);
639 
641 
642  void fileAttached(const process::Future<Nothing>& result,
643  const std::string& path);
644 
645  // Invoked when the contender has entered the contest.
646  void contended(const process::Future<process::Future<Nothing>>& candidacy);
647 
648  // Task reconciliation, split from the message handler
649  // to allow re-use.
650  void _reconcileTasks(
651  Framework* framework,
652  const std::vector<TaskStatus>& statuses);
653 
654  // When a slave that was previously registered with this master
655  // re-registers, we need to reconcile the master's view of the
656  // slave's tasks and executors. This function also sends the
657  // `SlaveReregisteredMessage`.
658  void reconcileKnownSlave(
659  Slave* slave,
660  const std::vector<ExecutorInfo>& executors,
661  const std::vector<Task>& tasks);
662 
663  // Add a framework.
664  void addFramework(
665  Framework* framework,
666  const std::set<std::string>& suppressedRoles);
667 
668  // Recover a framework from its `FrameworkInfo`. This happens after
669  // master failover, when an agent running one of the framework's
670  // tasks re-registers or when the framework itself re-registers,
671  // whichever happens first. The result of this function is a
672  // registered, inactive framework with state `RECOVERED`.
673  void recoverFramework(
674  const FrameworkInfo& info,
675  const std::set<std::string>& suppressedRoles);
676 
677  // Transition a framework from `RECOVERED` to `CONNECTED` state and
678  // activate it. This happens at most once after master failover, the
679  // first time that the framework re-registers with the new master.
680  // Exactly one of `pid` or `http` must be provided.
682  Framework* framework,
683  const FrameworkInfo& frameworkInfo,
684  const Option<process::UPID>& pid,
685  const Option<HttpConnection>& http,
686  const std::set<std::string>& suppressedRoles);
687 
688  // Replace the scheduler for a framework with a new process ID, in
689  // the event of a scheduler failover.
690  void failoverFramework(Framework* framework, const process::UPID& newPid);
691 
692  // Replace the scheduler for a framework with a new HTTP connection,
693  // in the event of a scheduler failover.
694  void failoverFramework(Framework* framework, const HttpConnection& http);
695 
696  void _failoverFramework(Framework* framework);
697 
698  // Kill all of a framework's tasks, delete the framework object, and
699  // reschedule offers that were assigned to this framework.
700  void removeFramework(Framework* framework);
701 
702  // Remove a framework from the slave, i.e., remove its tasks and
703  // executors and recover the resources.
704  void removeFramework(Slave* slave, Framework* framework);
705 
706  void updateFramework(
707  Framework* framework,
708  const FrameworkInfo& frameworkInfo,
709  const std::set<std::string>& suppressedRoles);
710 
711  void disconnect(Framework* framework);
712  void deactivate(Framework* framework, bool rescind);
713 
714  void disconnect(Slave* slave);
715  void deactivate(Slave* slave);
716 
717  // Add a slave.
718  void addSlave(
719  Slave* slave,
720  std::vector<Archive::Framework>&& completedFrameworks);
721 
722  void _markUnreachable(
723  const SlaveInfo& slave,
724  const TimeInfo& unreachableTime,
725  bool duringMasterFailover,
726  const std::string& message,
727  bool registrarResult);
728 
729  void sendSlaveLost(const SlaveInfo& slaveInfo);
730 
731  // Remove the slave from the registrar and from the master's state.
732  //
733  // TODO(bmahler): 'reason' is optional until MESOS-2317 is resolved.
734  void removeSlave(
735  Slave* slave,
736  const std::string& message,
738 
739  void _removeSlave(
740  Slave* slave,
741  const process::Future<bool>& registrarResult,
742  const std::string& removalCause,
744 
745  void __removeSlave(
746  Slave* slave,
747  const std::string& message,
748  const Option<TimeInfo>& unreachableTime);
749 
750  // Validates that the framework is authenticated, if required.
752  const FrameworkInfo& frameworkInfo,
753  const process::UPID& from);
754 
755  // Returns whether the framework is authorized.
756  // Returns failure for transient authorization failures.
758  const FrameworkInfo& frameworkInfo);
759 
760  // Returns whether the principal is authorized to (re-)register an agent
761  // and whether the `SlaveInfo` is authorized.
763  const SlaveInfo& slaveInfo,
765 
766  // Returns whether the task is authorized.
767  // Returns failure for transient authorization failures.
769  const TaskInfo& task,
770  Framework* framework);
771 
790  const Offer::Operation::Reserve& reserve,
792 
793  // Authorizes whether the provided `principal` is allowed to reserve
794  // the specified `resources`.
796  const Resources& resources,
798 
817  const Offer::Operation::Unreserve& unreserve,
819 
838  const Offer::Operation::Create& create,
840 
859  const Offer::Operation::Destroy& destroy,
861 
862  // Add the task and its executor (if not already running) to the
863  // framework and slave. Returns the resources consumed as a result,
864  // which includes resources for the task and its executor
865  // (if not already running).
866  Resources addTask(const TaskInfo& task, Framework* framework, Slave* slave);
867 
868  // Transitions the task, and recovers resources if the task becomes
869  // terminal.
870  void updateTask(Task* task, const StatusUpdate& update);
871 
872  // Removes the task. `unreachable` indicates whether the task is removed due
873  // to being unreachable. Note that we cannot rely on the task state because
874  // it may not reflect unreachability due to being set to TASK_LOST for
875  // backwards compatibility.
876  void removeTask(Task* task, bool unreachable = false);
877 
878  // Remove an executor and recover its resources.
879  void removeExecutor(
880  Slave* slave,
881  const FrameworkID& frameworkId,
882  const ExecutorID& executorId);
883 
884  // Adds the given operation to the framework and the agent.
885  void addOperation(
886  Framework* framework,
887  Slave* slave,
888  Operation* operation);
889 
890  // Transitions the operation, and updates and recovers resources if
891  // the operation becomes terminal. If `convertResources` is `false`
892  // only the consumed resources of terminal operations are recovered,
893  // but no resources are converted.
894  void updateOperation(
895  Operation* operation,
896  const UpdateOperationStatusMessage& update,
897  bool convertResources = true);
898 
899  // Remove the operation.
900  void removeOperation(Operation* operation);
901 
902  // Attempts to update the allocator by applying the given operation.
903  // If successful, updates the slave's resources, sends a
904  // 'CheckpointResourcesMessage' to the slave with the updated
905  // checkpointed resources, and returns a 'Future' with 'Nothing'.
906  // Otherwise, no action is taken and returns a failed 'Future'.
908  Slave* slave,
909  const Offer::Operation& operation);
910 
911  // Forwards the update to the framework.
912  void forward(
913  const StatusUpdate& update,
914  const process::UPID& acknowledgee,
915  Framework* framework);
916 
917  // Remove an offer after the specified timeout.
918  void offerTimeout(const OfferID& offerId);
919 
920  // Remove an offer and optionally rescind the offer as well.
921  void removeOffer(Offer* offer, bool rescind = false);
922 
923  // Remove an inverse offer after the specified timeout.
924  void inverseOfferTimeout(const OfferID& inverseOfferId);
925 
926  // Remove an inverse offer and optionally rescind it as well.
927  void removeInverseOffer(InverseOffer* inverseOffer, bool rescind = false);
928 
929  bool isCompletedFramework(const FrameworkID& frameworkId);
930 
931  Framework* getFramework(const FrameworkID& frameworkId) const;
932  Offer* getOffer(const OfferID& offerId) const;
933  InverseOffer* getInverseOffer(const OfferID& inverseOfferId) const;
934 
935  FrameworkID newFrameworkId();
936  OfferID newOfferId();
937  SlaveID newSlaveId();
938 
939 private:
940  // Updates the agent's resources by applying the given operation.
941  // Sends either `ApplyOperationMessage` or
942  // `CheckpointResourcesMessage` (with updated checkpointed
943  // resources) to the agent depending on if the agent has
944  // `RESOURCE_PROVIDER` capability.
945  void _apply(
946  Slave* slave,
947  Framework* framework,
948  const Offer::Operation& operationInfo);
949 
950  void drop(
951  const process::UPID& from,
952  const scheduler::Call& call,
953  const std::string& message);
954 
955  void drop(
956  Framework* framework,
957  const Offer::Operation& operation,
958  const std::string& message);
959 
960  void drop(
961  Framework* framework,
962  const scheduler::Call& call,
963  const std::string& message);
964 
965  void drop(
966  Framework* framework,
967  const scheduler::Call::Suppress& suppress,
968  const std::string& message);
969 
970  void drop(
971  Framework* framework,
972  const scheduler::Call::Revive& revive,
973  const std::string& message);
974 
975  // Call handlers.
976  void receive(
977  const process::UPID& from,
978  const scheduler::Call& call);
979 
980  void subscribe(
981  HttpConnection http,
982  const scheduler::Call::Subscribe& subscribe);
983 
984  void _subscribe(
985  HttpConnection http,
986  const FrameworkInfo& frameworkInfo,
987  bool force,
988  const std::set<std::string>& suppressedRoles,
989  const process::Future<bool>& authorized);
990 
991  void subscribe(
992  const process::UPID& from,
993  const scheduler::Call::Subscribe& subscribe);
994 
995  void _subscribe(
996  const process::UPID& from,
997  const FrameworkInfo& frameworkInfo,
998  bool force,
999  const std::set<std::string>& suppressedRoles,
1000  const process::Future<bool>& authorized);
1001 
1002  // Subscribes a client to the 'api/vX' endpoint.
1003  void subscribe(
1004  const HttpConnection& http,
1006 
1007  void teardown(Framework* framework);
1008 
1009  void accept(
1010  Framework* framework,
1011  scheduler::Call::Accept accept);
1012 
1013  void _accept(
1014  const FrameworkID& frameworkId,
1015  const SlaveID& slaveId,
1016  const Resources& offeredResources,
1017  const scheduler::Call::Accept& accept,
1018  const process::Future<std::list<process::Future<bool>>>& authorizations);
1019 
1020  void acceptInverseOffers(
1021  Framework* framework,
1022  const scheduler::Call::AcceptInverseOffers& accept);
1023 
1024  void decline(
1025  Framework* framework,
1026  const scheduler::Call::Decline& decline);
1027 
1028  void declineInverseOffers(
1029  Framework* framework,
1030  const scheduler::Call::DeclineInverseOffers& decline);
1031 
1032  void revive(
1033  Framework* framework,
1034  const scheduler::Call::Revive& revive);
1035 
1036  void kill(
1037  Framework* framework,
1038  const scheduler::Call::Kill& kill);
1039 
1040  void shutdown(
1041  Framework* framework,
1042  const scheduler::Call::Shutdown& shutdown);
1043 
1044  void acknowledge(
1045  Framework* framework,
1046  const scheduler::Call::Acknowledge& acknowledge);
1047 
1048  void acknowledgeOperationStatus(
1049  Framework* framework,
1050  const scheduler::Call::AcknowledgeOperationStatus& acknowledge);
1051 
1052  void reconcile(
1053  Framework* framework,
1054  const scheduler::Call::Reconcile& reconcile);
1055 
1056  void reconcileOperations(
1057  Framework* framework,
1058  const scheduler::Call::ReconcileOperations& reconcile);
1059 
1060  void message(
1061  Framework* framework,
1062  const scheduler::Call::Message& message);
1063 
1064  void request(
1065  Framework* framework,
1066  const scheduler::Call::Request& request);
1067 
1068  void suppress(
1069  Framework* framework,
1070  const scheduler::Call::Suppress& suppress);
1071 
1072  bool elected() const
1073  {
1074  return leader.isSome() && leader.get() == info_;
1075  }
1076 
1077  void scheduleRegistryGc();
1078 
1079  void doRegistryGc();
1080 
1081  void _doRegistryGc(
1082  const hashset<SlaveID>& toRemoveUnreachable,
1083  const hashset<SlaveID>& toRemoveGone,
1084  const process::Future<bool>& registrarResult);
1085 
1086  process::Future<bool> authorizeLogAccess(
1088 
1096  bool isWhitelistedRole(const std::string& name) const;
1097 
1105  static bool isRemovable(const TaskState& state)
1106  {
1107  if (state == TASK_UNREACHABLE) {
1108  return true;
1109  }
1110 
1111  return protobuf::isTerminalState(state);
1112  }
1113 
1121  class QuotaHandler
1122  {
1123  public:
1124  explicit QuotaHandler(Master* _master) : master(_master)
1125  {
1126  CHECK_NOTNULL(master);
1127  }
1128 
1129  // Returns a list of set quotas.
1131  const mesos::master::Call& call,
1133  ContentType contentType) const;
1134 
1136  const process::http::Request& request,
1138  principal) const;
1139 
1141  const mesos::master::Call& call,
1143  principal) const;
1144 
1146  const process::http::Request& request,
1148  principal) const;
1149 
1151  const mesos::master::Call& call,
1153  principal) const;
1154 
1158  principal) const;
1159 
1160  private:
1161  // Heuristically tries to determine whether a quota request could
1162  // reasonably be satisfied given the current cluster capacity. The
1163  // goal is to determine whether a user may accidentally request an
1164  // amount of resources that would prevent frameworks without quota
1165  // from getting any offers. A force flag will allow users to bypass
1166  // this check.
1167  //
1168  // The heuristic tests whether the total quota, including the new
1169  // request, does not exceed the sum of non-static cluster resources,
1170  // i.e. the following inequality holds:
1171  // total - statically reserved >= total quota + quota request
1172  //
1173  // Please be advised that:
1174  // * It is up to an allocator how to satisfy quota (for example,
1175  // what resources to account towards quota, as well as which
1176  // resources to consider allocatable for quota).
1177  // * Even if there are enough resources at the moment of this check,
1178  // agents may terminate at any time, rendering the cluster under
1179  // quota.
1180  Option<Error> capacityHeuristic(
1181  const mesos::quota::QuotaInfo& request) const;
1182 
1183  // We always want to rescind offers after the capacity heuristic. The
1184  // reason for this is the race between the allocator and the master:
1185  // it can happen that there are not enough free resources at the
1186  // allocator's disposal when it is notified about the quota request,
1187  // but at this point it's too late to rescind.
1188  //
1189  // While rescinding, we adhere to the following rules:
1190  // * Rescind at least as many resources as there are in the quota request.
1191  // * Rescind all offers from an agent in order to make the potential
1192  // offer bigger, which increases the chances that a quota'ed framework
1193  // will be able to use the offer.
1194  // * Rescind offers from at least `numF` agents to make it possible
1195  // (but not guaranteed, due to fair sharing) that each framework in
1196  // the role for which quota is set gets an offer (`numF` is the
1197  // number of frameworks in the quota'ed role). Though this is not
    //   strictly necessary, we think this will increase the debuggability
1199  // and will improve user experience.
1200  //
1201  // TODO(alexr): Consider removing this function once offer management
1202  // (including rescinding) is moved to allocator.
1203  void rescindOffers(const mesos::quota::QuotaInfo& request) const;
1204 
1205  process::Future<bool> authorizeGetQuota(
1207  const mesos::quota::QuotaInfo& quotaInfo) const;
1208 
1209  process::Future<bool> authorizeUpdateQuota(
1211  const mesos::quota::QuotaInfo& quotaInfo) const;
1212 
1215  principal) const;
1216 
1218  const mesos::quota::QuotaRequest& quotaRequest,
1220  principal) const;
1221 
1223  const mesos::quota::QuotaInfo& quotaInfo,
1224  bool forced) const;
1225 
1227  const std::string& role,
1229  principal) const;
1230 
1232  const std::string& role) const;
1233 
1234  // To perform actions related to quota management, we require access to the
1235  // master data structures. No synchronization primitives are needed here
1236  // since `QuotaHandler`'s functions are invoked in the Master's actor.
1237  Master* master;
1238  };
1239 
1247  class WeightsHandler
1248  {
1249  public:
1250  explicit WeightsHandler(Master* _master) : master(_master)
1251  {
1252  CHECK_NOTNULL(master);
1253  }
1254 
1258  principal) const;
1259 
1261  const mesos::master::Call& call,
1263  ContentType contentType) const;
1264 
1266  const process::http::Request& request,
1268  principal) const;
1269 
1271  const mesos::master::Call& call,
1273  ContentType contentType) const;
1274 
1275  private:
1276  process::Future<bool> authorizeGetWeight(
1278  const WeightInfo& weight) const;
1279 
1280  process::Future<bool> authorizeUpdateWeights(
1282  const std::vector<std::string>& roles) const;
1283 
1285  const std::vector<WeightInfo>& weightInfos,
1286  const std::list<bool>& roleAuthorizations) const;
1287 
1290  principal) const;
1291 
1294  const google::protobuf::RepeatedPtrField<WeightInfo>& weightInfos)
1295  const;
1296 
1298  const std::vector<WeightInfo>& weightInfos) const;
1299 
1300  // Rescind all outstanding offers if any of the 'weightInfos' roles has
1301  // an active framework.
1302  void rescindOffers(const std::vector<WeightInfo>& weightInfos) const;
1303 
1304  Master* master;
1305  };
1306 
1307  // Inner class used to namespace HTTP route handlers (see
1308  // master/http.cpp for implementations).
1309  class Http
1310  {
1311  public:
1312  explicit Http(Master* _master) : master(_master),
1313  quotaHandler(_master),
1314  weightsHandler(_master) {}
1315 
1316  // /api/v1
1318  const process::http::Request& request,
1320  principal) const;
1321 
1322  // /api/v1/scheduler
1324  const process::http::Request& request,
1326  principal) const;
1327 
1328  // /master/create-volumes
1330  const process::http::Request& request,
1332  principal) const;
1333 
1334  // /master/destroy-volumes
1336  const process::http::Request& request,
1338  principal) const;
1339 
1340  // /master/flags
1342  const process::http::Request& request,
1344  principal) const;
1345 
1346  // /master/frameworks
1348  const process::http::Request& request,
1350  principal) const;
1351 
1352  // /master/health
1354  const process::http::Request& request) const;
1355 
1356  // /master/redirect
1358  const process::http::Request& request) const;
1359 
1360  // /master/reserve
1362  const process::http::Request& request,
1364  principal) const;
1365 
1366  // /master/roles
1368  const process::http::Request& request,
1370  principal) const;
1371 
1372  // /master/teardown
1374  const process::http::Request& request,
1376  principal) const;
1377 
1378  // /master/slaves
1380  const process::http::Request& request,
1382  principal) const;
1383 
1384  // /master/state
1386  const process::http::Request& request,
1388  principal) const;
1389 
1390  // /master/state-summary
1392  const process::http::Request& request,
1394  principal) const;
1395 
1396  // /master/tasks
1398  const process::http::Request& request,
1400  principal) const;
1401 
1402  // /master/maintenance/schedule
1403  process::Future<process::http::Response> maintenanceSchedule(
1404  const process::http::Request& request,
1406  principal) const;
1407 
1408  // /master/maintenance/status
1409  process::Future<process::http::Response> maintenanceStatus(
1410  const process::http::Request& request,
1412  principal) const;
1413 
1414  // /master/machine/down
1416  const process::http::Request& request,
1418  principal) const;
1419 
1420  // /master/machine/up
1422  const process::http::Request& request,
1424  principal) const;
1425 
1426  // /master/unreserve
1428  const process::http::Request& request,
1430  principal) const;
1431 
1432  // /master/quota
1434  const process::http::Request& request,
1436  principal) const;
1437 
1438  // /master/weights
1440  const process::http::Request& request,
1442  principal) const;
1443 
1444  static std::string API_HELP();
1445  static std::string SCHEDULER_HELP();
1446  static std::string FLAGS_HELP();
1447  static std::string FRAMEWORKS_HELP();
1448  static std::string HEALTH_HELP();
1449  static std::string REDIRECT_HELP();
1450  static std::string ROLES_HELP();
1451  static std::string TEARDOWN_HELP();
1452  static std::string SLAVES_HELP();
1453  static std::string STATE_HELP();
1454  static std::string STATESUMMARY_HELP();
1455  static std::string TASKS_HELP();
1456  static std::string MAINTENANCE_SCHEDULE_HELP();
1457  static std::string MAINTENANCE_STATUS_HELP();
1458  static std::string MACHINE_DOWN_HELP();
1459  static std::string MACHINE_UP_HELP();
1460  static std::string CREATE_VOLUMES_HELP();
1461  static std::string DESTROY_VOLUMES_HELP();
1462  static std::string RESERVE_HELP();
1463  static std::string UNRESERVE_HELP();
1464  static std::string QUOTA_HELP();
1465  static std::string WEIGHTS_HELP();
1466 
1467  private:
1468  JSON::Object __flags() const;
1469 
1470  class FlagsError; // Forward declaration.
1471 
1474  principal) const;
1475 
1477  const size_t limit,
1478  const size_t offset,
1479  const std::string& order,
1481  principal) const;
1482 
1484  const FrameworkID& id,
1486  principal) const;
1487 
1489  const FrameworkID& id) const;
1490 
1491  process::Future<process::http::Response> _updateMaintenanceSchedule(
1492  const mesos::maintenance::Schedule& schedule,
1494  principal) const;
1495 
1496  process::Future<process::http::Response> __updateMaintenanceSchedule(
1497  const mesos::maintenance::Schedule& schedule,
1498  const process::Owned<ObjectApprover>& approver) const;
1499 
1500  process::Future<process::http::Response> ___updateMaintenanceSchedule(
1501  const mesos::maintenance::Schedule& schedule,
1502  bool applied) const;
1503 
1504  mesos::maintenance::Schedule _getMaintenanceSchedule(
1505  const process::Owned<ObjectApprover>& approver) const;
1506 
1508  const process::Owned<ObjectApprover>& approver) const;
1509 
1510  process::Future<process::http::Response> _startMaintenance(
1511  const google::protobuf::RepeatedPtrField<MachineID>& machineIds,
1512  const process::Owned<ObjectApprover>& approver) const;
1513 
1515  const google::protobuf::RepeatedPtrField<MachineID>& machineIds,
1516  const process::Owned<ObjectApprover>& approver) const;
1517 
1519  const SlaveID& slaveId,
1520  const google::protobuf::RepeatedPtrField<Resource>& resources,
1522  principal) const;
1523 
1525  const SlaveID& slaveId,
1526  const google::protobuf::RepeatedPtrField<Resource>& resources,
1528  principal) const;
1529 
1531  const SlaveID& slaveId,
1532  const google::protobuf::RepeatedPtrField<Resource>& volumes,
1534  principal) const;
1535 
1537  const SlaveID& slaveId,
1538  const google::protobuf::RepeatedPtrField<Resource>& volumes,
1540  principal) const;
1541 
1562  const SlaveID& slaveId,
1563  Resources required,
1564  const Offer::Operation& operation) const;
1565 
1568  principal) const;
1569 
1570  // Master API handlers.
1571 
1573  const mesos::master::Call& call,
1575  ContentType contentType) const;
1576 
1577  mesos::master::Response::GetAgents _getAgents(
1578  const process::Owned<AuthorizationAcceptor>& rolesAcceptor) const;
1579 
1581  const mesos::master::Call& call,
1583  ContentType contentType) const;
1584 
1586  const mesos::master::Call& call,
1588  ContentType contentType) const;
1589 
1591  const mesos::master::Call& call,
1593  ContentType contentType) const;
1594 
1596  const mesos::master::Call& call,
1598  ContentType contentType) const;
1599 
1601  const mesos::master::Call& call,
1603  ContentType contentType) const;
1604 
1606  const mesos::master::Call& call,
1608  ContentType contentType) const;
1609 
1611  const mesos::master::Call& call,
1613  ContentType contentType) const;
1614 
1616  const mesos::master::Call& call,
1618  ContentType contentType) const;
1619 
1621  const mesos::master::Call& call,
1623  ContentType contentType) const;
1624 
1625  process::Future<process::http::Response> updateMaintenanceSchedule(
1626  const mesos::master::Call& call,
1628  ContentType contentType) const;
1629 
1630  process::Future<process::http::Response> getMaintenanceSchedule(
1631  const mesos::master::Call& call,
1633  ContentType contentType) const;
1634 
1635  process::Future<process::http::Response> getMaintenanceStatus(
1636  const mesos::master::Call& call,
1638  ContentType contentType) const;
1639 
1641  const mesos::master::Call& call,
1643  ContentType contentType) const;
1644 
1646  const mesos::master::Call& call,
1648  ContentType contentType) const;
1649 
1651  const mesos::master::Call& call,
1653  ContentType contentType) const;
1654 
1655  mesos::master::Response::GetTasks _getTasks(
1656  const process::Owned<ObjectApprover>& frameworksApprover,
1657  const process::Owned<ObjectApprover>& tasksApprover) const;
1658 
1660  const mesos::master::Call& call,
1662  ContentType contentType) const;
1663 
1665  const mesos::master::Call& call,
1667  ContentType contentType) const;
1668 
1670  const mesos::master::Call& call,
1672  ContentType contentType) const;
1673 
1674  process::Future<process::http::Response> unreserveResources(
1675  const mesos::master::Call& call,
1677  ContentType contentType) const;
1678 
1680  const mesos::master::Call& call,
1682  ContentType contentType) const;
1683 
1684  mesos::master::Response::GetFrameworks _getFrameworks(
1685  const process::Owned<ObjectApprover>& frameworksApprover) const;
1686 
1688  const mesos::master::Call& call,
1690  ContentType contentType) const;
1691 
1692  mesos::master::Response::GetExecutors _getExecutors(
1693  const process::Owned<ObjectApprover>& frameworksApprover,
1694  const process::Owned<ObjectApprover>& executorsApprover) const;
1695 
1697  const mesos::master::Call& call,
1699  ContentType contentType) const;
1700 
1701  mesos::master::Response::GetState _getState(
1702  const process::Owned<ObjectApprover>& frameworksApprover,
1703  const process::Owned<ObjectApprover>& taskApprover,
1704  const process::Owned<ObjectApprover>& executorsApprover,
1705  const process::Owned<AuthorizationAcceptor>& rolesAcceptor) const;
1706 
1708  const mesos::master::Call& call,
1710  ContentType contentType) const;
1711 
1713  const mesos::master::Call& call,
1715  ContentType contentType) const;
1716 
1718  const mesos::master::Call& call,
1720  ContentType contentType) const;
1721 
1723  const mesos::master::Call& call,
1725  ContentType contentType) const;
1726 
1728  const SlaveID& slaveId) const;
1729 
1730  Master* master;
1731 
1732  // NOTE: The quota specific pieces of the Operator API are factored
1733  // out into this separate class.
1734  QuotaHandler quotaHandler;
1735 
1736  // NOTE: The weights specific pieces of the Operator API are factored
1737  // out into this separate class.
1738  WeightsHandler weightsHandler;
1739  };
1740 
1741  Master(const Master&); // No copying.
1742  Master& operator=(const Master&); // No assigning.
1743 
1744  friend struct Framework;
1745  friend struct Metrics;
1746  friend struct Slave;
1747  friend struct SlavesWriter;
1748  friend struct Subscriber;
1749 
1750  // NOTE: Since 'getOffer', 'getInverseOffer' and 'slaves' are
1751  // protected, we need to make the following functions friends.
1752  friend Offer* validation::offer::getOffer(
1753  Master* master, const OfferID& offerId);
1754 
1755  friend InverseOffer* validation::offer::getInverseOffer(
1756  Master* master, const OfferID& offerId);
1757 
1759  Master* master, const SlaveID& slaveId);
1760 
1761  const Flags flags;
1762 
1763  Http http;
1764 
1765  Option<MasterInfo> leader; // Current leading master.
1766 
1767  mesos::allocator::Allocator* allocator;
1768  WhitelistWatcher* whitelistWatcher;
1769  Registrar* registrar;
1770  Files* files;
1771 
1774 
1775  const Option<Authorizer*> authorizer;
1776 
1777  MasterInfo info_;
1778 
1779  // Holds some info which affects how a machine behaves, as well as state that
  // represents the master's view of this machine. See the `MachineInfo` protobuf
1781  // and `Machine` struct for more information.
1783 
1784  struct Maintenance
1785  {
1786  // Holds the maintenance schedule, as given by the operator.
1787  std::list<mesos::maintenance::Schedule> schedules;
1788  } maintenance;
1789 
1790  // Indicates when recovery is complete. Recovery begins once the
1791  // master is elected as a leader.
1793 
1794  // If this is the leading master, we periodically check whether we
1795  // should GC some information from the registry.
1796  Option<process::Timer> registryGcTimer;
1797 
1798  struct Slaves
1799  {
1800  Slaves() : removed(MAX_REMOVED_SLAVES) {}
1801 
1802  // Imposes a time limit for slaves that we recover from the
1803  // registry to re-register with the master.
1804  Option<process::Timer> recoveredTimer;
1805 
1806  // Slaves that have been recovered from the registrar after master
1807  // failover. Slaves are removed from this collection when they
1808  // either re-register with the master or are marked unreachable
1809  // because they do not re-register before `recoveredTimer` fires.
1810  // We must not answer questions related to these slaves (e.g.,
1811  // during task reconciliation) until we determine their fate
    // because they are in this transitioning state.
1813  hashmap<SlaveID, SlaveInfo> recovered;
1814 
1815  // Agents that are in the process of (re-)registering. They are
1816  // maintained here while the (re-)registration is in progress and
1817  // possibly pending in the authorizer or the registrar in order
1818  // to help deduplicate (re-)registration requests.
1819  hashset<process::UPID> registering;
1820  hashset<SlaveID> reregistering;
1821 
1822  // Registered slaves are indexed by SlaveID and UPID. Note that
1823  // iteration is supported but is exposed as iteration over a
1824  // hashmap<SlaveID, Slave*> since it is tedious to convert
1825  // the map's key/value iterator into a value iterator.
1826  //
1827  // TODO(bmahler): Consider pulling in boost's multi_index,
1828  // or creating a simpler indexing abstraction in stout.
1829  struct
1830  {
1831  bool contains(const SlaveID& slaveId) const
1832  {
1833  return ids.contains(slaveId);
1834  }
1835 
1836  bool contains(const process::UPID& pid) const
1837  {
1838  return pids.contains(pid);
1839  }
1840 
1841  Slave* get(const SlaveID& slaveId) const
1842  {
1843  return ids.get(slaveId).getOrElse(nullptr);
1844  }
1845 
1846  Slave* get(const process::UPID& pid) const
1847  {
1848  return pids.get(pid).getOrElse(nullptr);
1849  }
1850 
1851  void put(Slave* slave)
1852  {
1853  CHECK_NOTNULL(slave);
1854  ids[slave->id] = slave;
1855  pids[slave->pid] = slave;
1856  }
1857 
1858  void remove(Slave* slave)
1859  {
1860  CHECK_NOTNULL(slave);
1861  ids.erase(slave->id);
1862  pids.erase(slave->pid);
1863  }
1864 
1865  void clear()
1866  {
1867  ids.clear();
1868  pids.clear();
1869  }
1870 
1871  size_t size() const { return ids.size(); }
1872 
1873  typedef hashmap<SlaveID, Slave*>::iterator iterator;
1874  typedef hashmap<SlaveID, Slave*>::const_iterator const_iterator;
1875 
1876  iterator begin() { return ids.begin(); }
1877  iterator end() { return ids.end(); }
1878 
1879  const_iterator begin() const { return ids.begin(); }
1880  const_iterator end() const { return ids.end(); }
1881 
1882  private:
1885  } registered;
1886 
1887  // Slaves that are in the process of being removed from the
1888  // registrar.
1889  hashset<SlaveID> removing;
1890 
1891  // Slaves that are in the process of being marked unreachable.
1892  hashset<SlaveID> markingUnreachable;
1893 
1894  // Slaves that are in the process of being marked gone.
1895  hashset<SlaveID> markingGone;
1896 
1897  // This collection includes agents that have gracefully shutdown,
1898  // as well as those that have been marked unreachable or gone. We
1899  // keep a cache here to prevent this from growing in an unbounded
1900  // manner.
1901  //
1902  // TODO(bmahler): Ideally we could use a cache with set semantics.
1903  //
1904  // TODO(neilc): Consider storing all agent IDs that have been
1905  // marked unreachable by this master.
1907 
1908  // Slaves that have been marked unreachable. We recover this from
1909  // the registry, so it includes slaves marked as unreachable by
1910  // other instances of the master. Note that we use a LinkedHashMap
1911  // to ensure the order of elements here matches the order in the
1912  // registry's unreachable list, which matches the order in which
1913  // agents are marked unreachable. This list is garbage collected;
1914  // GC behavior is governed by the `registry_gc_interval`,
1915  // `registry_max_agent_age`, and `registry_max_agent_count` flags.
1917 
1918  // Slaves that have been marked gone. We recover this from the
1919  // registry, so it includes slaves marked as gone by other instances
1920  // of the master. Note that we use a LinkedHashMap to ensure the order
1921  // of elements here matches the order in the registry's gone list, which
1922  // matches the order in which agents are marked gone.
1924 
1925  // This rate limiter is used to limit the removal of slaves failing
1926  // health checks.
1927  // NOTE: Using a 'shared_ptr' here is OK because 'RateLimiter' is
1928  // a wrapper around libprocess process which is thread safe.
1930  } slaves;
1931 
1932  struct Frameworks
1933  {
1934  Frameworks(const Flags& masterFlags)
1935  : completed(masterFlags.max_completed_frameworks) {}
1936 
1938 
1940 
1941  // Principals of frameworks keyed by PID.
1942  // NOTE: Multiple PIDs can map to the same principal. The
1943  // principal is None when the framework doesn't specify it.
1944  // The differences between this map and 'authenticated' are:
1945  // 1) This map only includes *registered* frameworks. The mapping
1946  // is added when a framework (re-)registers.
1947  // 2) This map includes unauthenticated frameworks (when Master
1948  // allows them) if they have principals specified in
1949  // FrameworkInfo.
1951 
1952  // BoundedRateLimiters keyed by the framework principal.
1953  // Like Metrics::Frameworks, all frameworks of the same principal
1954  // are throttled together at a common rate limit.
1956 
1957  // The default limiter is for frameworks not specified in
1958  // 'flags.rate_limits'.
1960  } frameworks;
1961 
1962  struct Subscribers
1963  {
1964  // Represents a client subscribed to the 'api/vX' endpoint.
1965  //
1966  // TODO(anand): Add support for filtering. Some subscribers
1967  // might only be interested in a subset of events.
1968  struct Subscriber
1969  {
1971  Master* _master,
1972  const HttpConnection& _http,
1974  : master(_master),
1975  http(_http),
1976  principal(_principal)
1977  {
1978  mesos::master::Event event;
1979  event.set_type(mesos::master::Event::HEARTBEAT);
1980 
1981  heartbeater =
1984  "subscriber " + stringify(http.streamId),
1985  event,
1986  http,
1989 
1990  process::spawn(heartbeater.get());
1991  }
1992 
1993  // Not copyable, not assignable.
1994  Subscriber(const Subscriber&) = delete;
1995  Subscriber& operator=(const Subscriber&) = delete;
1996 
1997  void send(const mesos::master::Event& event,
1998  const process::Owned<AuthorizationAcceptor>& authorizeRole,
2001  const process::Owned<AuthorizationAcceptor>& authorizeExecutor);
2002 
2004  {
2005  // TODO(anand): Refactor `HttpConnection` to being a RAII class instead.
2006  // It is possible that a caller might accidentally invoke `close()`
2007  // after passing ownership to the `Subscriber` object. See MESOS-5843
2008  // for more details.
2009  http.close();
2010 
2011  terminate(heartbeater.get());
2012  wait(heartbeater.get());
2013  }
2014 
2020  };
2021 
2022  // Sends the event to all subscribers connected to the 'api/vX' endpoint.
2023  void send(const mesos::master::Event& event);
2024 
2025  // Active subscribers to the 'api/vX' endpoint keyed by the stream
2026  // identifier.
2028  } subscribers;
2029 
2030  hashmap<OfferID, Offer*> offers;
2032 
2033  hashmap<OfferID, InverseOffer*> inverseOffers;
2034  hashmap<OfferID, process::Timer> inverseOfferTimers;
2035 
2036  // We track information about roles that we're aware of in the system.
2037  // Specifically, we keep track of the roles when a framework subscribes to
2038  // the role, and/or when there are resources allocated to the role
2039  // (e.g. some tasks and/or executors are consuming resources under the role).
2041 
2042  // Configured role whitelist if using the (deprecated) "explicit
2043  // roles" feature. If this is `None`, any role is allowed.
2044  Option<hashset<std::string>> roleWhitelist;
2045 
2046  // Configured weight for each role, if any. If a role does not
2047  // appear here, it has the default weight of 1.
2049 
2050  // Configured quota for each role, if any. We store quotas by role
2051  // because we set them at the role level.
2053 
2054  // Authenticator names as supplied via flags.
2055  std::vector<std::string> authenticatorNames;
2056 
2057  Option<Authenticator*> authenticator;
2058 
2059  // Frameworks/slaves that are currently in the process of authentication.
2060  // 'authenticating' future is completed when authenticator
2061  // completes authentication.
2062  // The future is removed from the map when master completes authentication.
2064 
2065  // Principals of authenticated frameworks/slaves keyed by PID.
2067 
2068  int64_t nextFrameworkId; // Used to give each framework a unique ID.
2069  int64_t nextOfferId; // Used to give each slot offer a unique ID.
2070  int64_t nextSlaveId; // Used to give each slave a unique ID.
2071 
2072  // NOTE: It is safe to use a 'shared_ptr' because 'Metrics' is
2073  // thread safe.
2074  // TODO(dhamon): This does not need to be a shared_ptr. Metrics contains
2075  // copyable metric types only.
2076  std::shared_ptr<Metrics> metrics;
2077 
2078  // Gauge handlers.
2079  double _uptime_secs()
2080  {
2081  return (process::Clock::now() - startTime).secs();
2082  }
2083 
2084  double _elected()
2085  {
2086  return elected() ? 1 : 0;
2087  }
2088 
2089  double _slaves_connected();
2090  double _slaves_disconnected();
2091  double _slaves_active();
2092  double _slaves_inactive();
2093  double _slaves_unreachable();
2094 
2095  double _frameworks_connected();
2096  double _frameworks_disconnected();
2097  double _frameworks_active();
2098  double _frameworks_inactive();
2099 
2100  double _outstanding_offers()
2101  {
2102  return static_cast<double>(offers.size());
2103  }
2104 
2105  double _event_queue_messages()
2106  {
2107  return static_cast<double>(eventCount<process::MessageEvent>());
2108  }
2109 
2110  double _event_queue_dispatches()
2111  {
2112  return static_cast<double>(eventCount<process::DispatchEvent>());
2113  }
2114 
2115  double _event_queue_http_requests()
2116  {
2117  return static_cast<double>(eventCount<process::HttpEvent>());
2118  }
2119 
2120  double _tasks_staging();
2121  double _tasks_starting();
2122  double _tasks_running();
2123  double _tasks_unreachable();
2124  double _tasks_killing();
2125 
2126  double _resources_total(const std::string& name);
2127  double _resources_used(const std::string& name);
2128  double _resources_percent(const std::string& name);
2129 
2130  double _resources_revocable_total(const std::string& name);
2131  double _resources_revocable_used(const std::string& name);
2132  double _resources_revocable_percent(const std::string& name);
2133 
2134  process::Time startTime; // Start time used to calculate uptime.
2135 
2136  Option<process::Time> electedTime; // Time when this master is elected.
2137 
2138  // Validates the framework including authorization.
2139  // Returns None if the framework is valid.
2140  // Returns Error if the framework is invalid.
2141  // Returns Failure if authorization returns 'Failure'.
2143  const FrameworkInfo& frameworkInfo,
2144  const process::UPID& from);
2145 };
2146 
2147 
2148 inline std::ostream& operator<<(
2149  std::ostream& stream,
2150  const Framework& framework);
2151 
2152 
2153 // TODO(bmahler): Keeping the task and executor information in sync
2154 // across the Slave and Framework structs is error prone!
2156 {
2157  enum State
2158  {
2159  // Framework has never connected to this master. This implies the
2160  // master failed over and the framework has not yet re-registered,
2161  // but some framework state has been recovered from re-registering
2162  // agents that are running tasks for the framework.
2164 
2165  // Framework was previously connected to this master. A framework
2166  // becomes disconnected when there is a socket error.
2168 
2169  // The framework is connected but not active.
2171 
2172  // Framework is connected and eligible to receive offers. No
2173  // offers will be made to frameworks that are not active.
2175  };
2176 
2178  const Flags& masterFlags,
2179  const FrameworkInfo& info,
2180  const process::UPID& _pid,
2182  : Framework(master, masterFlags, info, ACTIVE, time)
2183  {
2184  pid = _pid;
2185  }
2186 
2187  Framework(Master* const master,
2188  const Flags& masterFlags,
2189  const FrameworkInfo& info,
2190  const HttpConnection& _http,
2192  : Framework(master, masterFlags, info, ACTIVE, time)
2193  {
2194  http = _http;
2195  }
2196 
2197  Framework(Master* const master,
2198  const Flags& masterFlags,
2199  const FrameworkInfo& info)
2200  : Framework(master, masterFlags, info, RECOVERED, process::Time()) {}
2201 
2203  {
2204  if (http.isSome()) {
2206  }
2207  }
2208 
2209  Task* getTask(const TaskID& taskId)
2210  {
2211  if (tasks.count(taskId) > 0) {
2212  return tasks[taskId];
2213  }
2214 
2215  return nullptr;
2216  }
2217 
2218  void addTask(Task* task)
2219  {
2220  CHECK(!tasks.contains(task->task_id()))
2221  << "Duplicate task " << task->task_id()
2222  << " of framework " << task->framework_id();
2223 
2224  // Verify that Resource.AllocationInfo is set,
2225  // this should be guaranteed by the master.
2226  foreach (const Resource& resource, task->resources()) {
2227  CHECK(resource.has_allocation_info());
2228  }
2229 
2230  tasks[task->task_id()] = task;
2231 
2232  if (!Master::isRemovable(task->state())) {
2233  // Note that we explicitly convert from protobuf to `Resources` once
2234  // and then use the result for calculations to avoid performance penalty
2235  // for multiple conversions and validations implied by `+=` with protobuf
2236  // arguments.
2237  // Conversion is safe, as resources have already passed validation.
2238  const Resources resources = task->resources();
2239  totalUsedResources += resources;
2240  usedResources[task->slave_id()] += resources;
2241 
2242  // It's possible that we're not tracking the task's role for
2243  // this framework if the role is absent from the framework's
2244  // set of roles. In this case, we track the role's allocation
2245  // for this framework.
2246  CHECK(!task->resources().empty());
2247  const std::string& role =
2248  task->resources().begin()->allocation_info().role();
2249 
2250  if (!isTrackedUnderRole(role)) {
2251  trackUnderRole(role);
2252  }
2253  }
2254  }
2255 
2256  // Update framework to recover the resources that were previously
2257  // being used by `task`.
2258  //
2259  // TODO(bmahler): This is a hack for performance. We need to
2260  // maintain resource counters because computing task resources
2261  // functionally for all tasks is expensive, for now.
2262  void recoverResources(Task* task)
2263  {
2264  CHECK(tasks.contains(task->task_id()))
2265  << "Unknown task " << task->task_id()
2266  << " of framework " << task->framework_id();
2267 
2268  totalUsedResources -= task->resources();
2269  usedResources[task->slave_id()] -= task->resources();
2270  if (usedResources[task->slave_id()].empty()) {
2271  usedResources.erase(task->slave_id());
2272  }
2273 
2274  // If we are no longer subscribed to the role to which these resources are
2275  // being returned to, and we have no more resources allocated to us for that
2276  // role, stop tracking the framework under the role.
2277  CHECK(!task->resources().empty());
2278  const std::string& role =
2279  task->resources().begin()->allocation_info().role();
2280 
2281  auto allocatedToRole = [&role](const Resource& resource) {
2282  return resource.allocation_info().role() == role;
2283  };
2284 
2285  if (roles.count(role) == 0 &&
2286  totalUsedResources.filter(allocatedToRole).empty()) {
2287  CHECK(totalOfferedResources.filter(allocatedToRole).empty());
2288  untrackUnderRole(role);
2289  }
2290  }
2291 
2292  // Sends a message to the connected framework.
2293  template <typename Message>
2294  void send(const Message& message)
2295  {
2296  if (!connected()) {
2297  LOG(WARNING) << "Master attempted to send message to disconnected"
2298  << " framework " << *this;
2299  }
2300 
2301  if (http.isSome()) {
2302  if (!http.get().send(message)) {
2303  LOG(WARNING) << "Unable to send event to framework " << *this << ":"
2304  << " connection closed";
2305  }
2306  } else {
2307  CHECK_SOME(pid);
2308  master->send(pid.get(), message);
2309  }
2310  }
2311 
2312  void addCompletedTask(Task&& task)
2313  {
2314  // TODO(neilc): We currently allow frameworks to reuse the task
2315  // IDs of completed tasks (although this is discouraged). This
2316  // means that there might be multiple completed tasks with the
2317  // same task ID. We should consider rejecting attempts to reuse
2318  // task IDs (MESOS-6779).
2319  completedTasks.push_back(process::Owned<Task>(new Task(std::move(task))));
2320  }
2321 
2322  void addUnreachableTask(const Task& task)
2323  {
2324  // TODO(adam-mesos): Check if unreachable task already exists.
2325  unreachableTasks.set(task.task_id(), process::Owned<Task>(new Task(task)));
2326  }
2327 
2328  // Removes the task. `unreachable` indicates whether the task is removed due
2329  // to being unreachable. Note that we cannot rely on the task state because
2330  // it may not reflect unreachability due to being set to TASK_LOST for
2331  // backwards compatibility.
2332  void removeTask(Task* task, bool unreachable)
2333  {
2334  CHECK(tasks.contains(task->task_id()))
2335  << "Unknown task " << task->task_id()
2336  << " of framework " << task->framework_id();
2337 
2338  if (!Master::isRemovable(task->state())) {
2339  recoverResources(task);
2340  }
2341 
2342  if (unreachable) {
2343  addUnreachableTask(*task);
2344  } else {
2345  addCompletedTask(Task(*task));
2346  }
2347 
2348  tasks.erase(task->task_id());
2349  }
2350 
2351  void addOffer(Offer* offer)
2352  {
2353  CHECK(!offers.contains(offer)) << "Duplicate offer " << offer->id();
2354  offers.insert(offer);
2355  totalOfferedResources += offer->resources();
2356  offeredResources[offer->slave_id()] += offer->resources();
2357  }
2358 
2359  void removeOffer(Offer* offer)
2360  {
2361  CHECK(offers.find(offer) != offers.end())
2362  << "Unknown offer " << offer->id();
2363 
2364  totalOfferedResources -= offer->resources();
2365  offeredResources[offer->slave_id()] -= offer->resources();
2366  if (offeredResources[offer->slave_id()].empty()) {
2367  offeredResources.erase(offer->slave_id());
2368  }
2369 
2370  offers.erase(offer);
2371  }
2372 
2373  void addInverseOffer(InverseOffer* inverseOffer)
2374  {
2375  CHECK(!inverseOffers.contains(inverseOffer))
2376  << "Duplicate inverse offer " << inverseOffer->id();
2377  inverseOffers.insert(inverseOffer);
2378  }
2379 
2380  void removeInverseOffer(InverseOffer* inverseOffer)
2381  {
2382  CHECK(inverseOffers.contains(inverseOffer))
2383  << "Unknown inverse offer " << inverseOffer->id();
2384 
2385  inverseOffers.erase(inverseOffer);
2386  }
2387 
2388  bool hasExecutor(const SlaveID& slaveId,
2389  const ExecutorID& executorId)
2390  {
2391  return executors.contains(slaveId) &&
2392  executors[slaveId].contains(executorId);
2393  }
2394 
2395  void addExecutor(const SlaveID& slaveId,
2396  const ExecutorInfo& executorInfo)
2397  {
2398  CHECK(!hasExecutor(slaveId, executorInfo.executor_id()))
2399  << "Duplicate executor '" << executorInfo.executor_id()
2400  << "' on agent " << slaveId;
2401 
2402  // Verify that Resource.AllocationInfo is set,
2403  // this should be guaranteed by the master.
2404  foreach (const Resource& resource, executorInfo.resources()) {
2405  CHECK(resource.has_allocation_info());
2406  }
2407 
2408  executors[slaveId][executorInfo.executor_id()] = executorInfo;
2409  totalUsedResources += executorInfo.resources();
2410  usedResources[slaveId] += executorInfo.resources();
2411 
2412  // It's possible that we're not tracking the task's role for
2413  // this framework if the role is absent from the framework's
2414  // set of roles. In this case, we track the role's allocation
2415  // for this framework.
2416  if (!executorInfo.resources().empty()) {
2417  const std::string& role =
2418  executorInfo.resources().begin()->allocation_info().role();
2419 
2420  if (!isTrackedUnderRole(role)) {
2421  trackUnderRole(role);
2422  }
2423  }
2424  }
2425 
2426  void removeExecutor(const SlaveID& slaveId,
2427  const ExecutorID& executorId)
2428  {
2429  CHECK(hasExecutor(slaveId, executorId))
2430  << "Unknown executor '" << executorId
2431  << "' of framework " << id()
2432  << " of agent " << slaveId;
2433 
2434  const ExecutorInfo& executorInfo = executors[slaveId][executorId];
2435 
2436  totalUsedResources -= executorInfo.resources();
2437  usedResources[slaveId] -= executorInfo.resources();
2438  if (usedResources[slaveId].empty()) {
2439  usedResources.erase(slaveId);
2440  }
2441 
2442  // If we are no longer subscribed to the role to which these resources are
2443  // being returned to, and we have no more resources allocated to us for that
2444  // role, stop tracking the framework under the role.
2445  if (!executorInfo.resources().empty()) {
2446  const std::string& role =
2447  executorInfo.resources().begin()->allocation_info().role();
2448 
2449  auto allocatedToRole = [&role](const Resource& resource) {
2450  return resource.allocation_info().role() == role;
2451  };
2452 
2453  if (roles.count(role) == 0 &&
2454  totalUsedResources.filter(allocatedToRole).empty()) {
2455  CHECK(totalOfferedResources.filter(allocatedToRole).empty());
2456  untrackUnderRole(role);
2457  }
2458  }
2459 
2460  executors[slaveId].erase(executorId);
2461  if (executors[slaveId].empty()) {
2462  executors.erase(slaveId);
2463  }
2464  }
2465 
2466  void addOperation(Operation* operation)
2467  {
2468  CHECK(operation->has_framework_id());
2469 
2470  const FrameworkID& frameworkId = operation->framework_id();
2471 
2472  Try<id::UUID> uuid = id::UUID::fromBytes(operation->uuid().value());
2473  CHECK_SOME(uuid);
2474 
2475  CHECK(!operations.contains(uuid.get()))
2476  << "Duplicate operation '" << operation->info().id()
2477  << "' (uuid: " << uuid->toString() << ") "
2478  << "of framework " << frameworkId;
2479 
2480  operations.put(uuid.get(), operation);
2481 
2482  if (operation->info().has_id()) {
2483  operationUUIDs.put(operation->info().id(), uuid.get());
2484  }
2485 
2486  if (!protobuf::isSpeculativeOperation(operation->info()) &&
2487  !protobuf::isTerminalState(operation->latest_status().state())) {
2488  Try<Resources> consumed =
2489  protobuf::getConsumedResources(operation->info());
2490  CHECK_SOME(consumed);
2491 
2492  CHECK(operation->has_slave_id())
2493  << "External resource provider is not supported yet";
2494 
2495  const SlaveID& slaveId = operation->slave_id();
2496 
2497  totalUsedResources += consumed.get();
2498  usedResources[slaveId] += consumed.get();
2499 
2500  // It's possible that we're not tracking the role from the
2501  // resources in the operation for this framework if the role is
2502  // absent from the framework's set of roles. In this case, we
2503  // track the role's allocation for this framework.
2504  foreachkey (const std::string& role, consumed->allocations()) {
2505  if (!isTrackedUnderRole(role)) {
2506  trackUnderRole(role);
2507  }
2508  }
2509  }
2510  }
2511 
2512  void recoverResources(Operation* operation)
2513  {
2514  CHECK(operation->has_slave_id())
2515  << "External resource provider is not supported yet";
2516 
2517  const SlaveID& slaveId = operation->slave_id();
2518 
2519  if (protobuf::isSpeculativeOperation(operation->info())) {
2520  return;
2521  }
2522 
2523  Try<Resources> consumed = protobuf::getConsumedResources(operation->info());
2524  CHECK_SOME(consumed);
2525 
2526  CHECK(totalUsedResources.contains(consumed.get()))
2527  << "Tried to recover resources " << consumed.get()
2528  << " which do not seem used";
2529 
2530  CHECK(usedResources[slaveId].contains(consumed.get()))
2531  << "Tried to recover resources " << consumed.get() << " of agent "
2532  << slaveId << " which do not seem used";
2533 
2534  totalUsedResources -= consumed.get();
2535  usedResources[slaveId] -= consumed.get();
2536  if (usedResources[slaveId].empty()) {
2537  usedResources.erase(slaveId);
2538  }
2539 
2540  // If we are no longer subscribed to the role to which these
2541  // resources are being returned to, and we have no more resources
2542  // allocated to us for that role, stop tracking the framework
2543  // under the role.
2544  foreachkey (const std::string& role, consumed->allocations()) {
2545  auto allocatedToRole = [&role](const Resource& resource) {
2546  return resource.allocation_info().role() == role;
2547  };
2548 
2549  if (roles.count(role) == 0 &&
2550  totalUsedResources.filter(allocatedToRole).empty()) {
2551  CHECK(totalOfferedResources.filter(allocatedToRole).empty());
2552  untrackUnderRole(role);
2553  }
2554  }
2555  }
2556 
2557  void removeOperation(Operation* operation)
2558  {
2559  Try<id::UUID> uuid = id::UUID::fromBytes(operation->uuid().value());
2560  CHECK_SOME(uuid);
2561 
2562  CHECK(operations.contains(uuid.get()))
2563  << "Unknown operation '" << operation->info().id()
2564  << "' (uuid: " << uuid->toString() << ") "
2565  << "of framework " << operation->framework_id();
2566 
2567  if (!protobuf::isSpeculativeOperation(operation->info()) &&
2568  !protobuf::isTerminalState(operation->latest_status().state())) {
2569  recoverResources(operation);
2570  }
2571 
2572  operations.erase(uuid.get());
2573  }
2574 
2575  const FrameworkID id() const { return info.id(); }
2576 
2577  // Update fields in 'info' using those in 'newInfo'. Currently this
2578  // only updates `role`/`roles`, 'name', 'failover_timeout', 'hostname',
2579  // 'webui_url', 'capabilities', and 'labels'.
2580  void update(const FrameworkInfo& newInfo)
2581  {
2582  // We only merge 'info' from the same framework 'id'.
2583  CHECK_EQ(info.id(), newInfo.id());
2584 
2585  // Save the old list of roles for later.
2586  std::set<std::string> oldRoles = roles;
2587 
2588  // TODO(jmlvanre): Merge other fields as per design doc in
2589  // MESOS-703.
2590 
2591  info.clear_role();
2592  info.clear_roles();
2593 
2594  if (newInfo.has_role()) {
2595  info.set_role(newInfo.role());
2596  }
2597 
2598  if (newInfo.roles_size() > 0) {
2599  info.mutable_roles()->CopyFrom(newInfo.roles());
2600  }
2601 
2603 
2604  if (newInfo.user() != info.user()) {
2605  LOG(WARNING) << "Cannot update FrameworkInfo.user to '" << newInfo.user()
2606  << "' for framework " << id() << ". Check MESOS-703";
2607  }
2608 
2609  info.set_name(newInfo.name());
2610 
2611  if (newInfo.has_failover_timeout()) {
2612  info.set_failover_timeout(newInfo.failover_timeout());
2613  } else {
2614  info.clear_failover_timeout();
2615  }
2616 
2617  if (newInfo.checkpoint() != info.checkpoint()) {
2618  LOG(WARNING) << "Cannot update FrameworkInfo.checkpoint to '"
2619  << stringify(newInfo.checkpoint()) << "' for framework "
2620  << id() << ". Check MESOS-703";
2621  }
2622 
2623  if (newInfo.has_hostname()) {
2624  info.set_hostname(newInfo.hostname());
2625  } else {
2626  info.clear_hostname();
2627  }
2628 
2629  if (newInfo.principal() != info.principal()) {
2630  LOG(WARNING) << "Cannot update FrameworkInfo.principal to '"
2631  << newInfo.principal() << "' for framework " << id()
2632  << ". Check MESOS-703";
2633  }
2634 
2635  if (newInfo.has_webui_url()) {
2636  info.set_webui_url(newInfo.webui_url());
2637  } else {
2638  info.clear_webui_url();
2639  }
2640 
2641  if (newInfo.capabilities_size() > 0) {
2642  info.mutable_capabilities()->CopyFrom(newInfo.capabilities());
2643  } else {
2644  info.clear_capabilities();
2645  }
2646  capabilities = protobuf::framework::Capabilities(info.capabilities());
2647 
2648  if (newInfo.has_labels()) {
2649  info.mutable_labels()->CopyFrom(newInfo.labels());
2650  } else {
2651  info.clear_labels();
2652  }
2653 
2654  const std::set<std::string>& newRoles = roles;
2655 
2656  const std::set<std::string> removedRoles = [&]() {
2657  std::set<std::string> result = oldRoles;
2658  foreach (const std::string& role, newRoles) {
2659  result.erase(role);
2660  }
2661  return result;
2662  }();
2663 
2664  foreach (const std::string& role, removedRoles) {
2665  auto allocatedToRole = [&role](const Resource& resource) {
2666  return resource.allocation_info().role() == role;
2667  };
2668 
2669  // Stop tracking the framework under this role if there are
2670  // no longer any resources allocated to it.
2671  if (totalUsedResources.filter(allocatedToRole).empty()) {
2672  CHECK(totalOfferedResources.filter(allocatedToRole).empty());
2673  untrackUnderRole(role);
2674  }
2675  }
2676 
2677  const std::set<std::string> addedRoles = [&]() {
2678  std::set<std::string> result = newRoles;
2679  foreach (const std::string& role, oldRoles) {
2680  result.erase(role);
2681  }
2682  return result;
2683  }();
2684 
2685  foreach (const std::string& role, addedRoles) {
2686  // NOTE: It's possible that we're already tracking this framework
2687  // under the role because a framework can unsubscribe from a role
2688  // while it still has resources allocated to the role.
2689  if (!isTrackedUnderRole(role)) {
2690  trackUnderRole(role);
2691  }
2692  }
2693  }
2694 
2695  void updateConnection(const process::UPID& newPid)
2696  {
2697  // Cleanup the HTTP connnection if this is a downgrade from HTTP
2698  // to PID. Note that the connection may already be closed.
2699  if (http.isSome()) {
2701  }
2702 
2703  // TODO(benh): unlink(oldPid);
2704  pid = newPid;
2705  }
2706 
2707  void updateConnection(const HttpConnection& newHttp)
2708  {
2709  if (pid.isSome()) {
2710  // Wipe the PID if this is an upgrade from PID to HTTP.
2711  // TODO(benh): unlink(oldPid);
2712  pid = None();
2713  } else if (http.isSome()) {
2714  // Cleanup the old HTTP connection.
2715  // Note that master creates a new HTTP connection for every
2716  // subscribe request, so 'newHttp' should always be different
2717  // from 'http'.
2719  }
2720 
2721  CHECK_NONE(http);
2722 
2723  http = newHttp;
2724  }
2725 
2726  // Closes the HTTP connection and stops the heartbeat.
2727  //
2728  // TODO(vinod): Currently `state` variable is set separately
2729  // from this method. We need to make sure these are in sync.
2731  {
2732  CHECK_SOME(http);
2733 
2734  if (connected() && !http.get().close()) {
2735  LOG(WARNING) << "Failed to close HTTP pipe for " << *this;
2736  }
2737 
2738  http = None();
2739 
2741 
2742  terminate(heartbeater.get().get());
2743  wait(heartbeater.get().get());
2744 
2745  heartbeater = None();
2746  }
2747 
2748  void heartbeat()
2749  {
2751  CHECK_SOME(http);
2752 
2753  // TODO(vinod): Make heartbeat interval configurable and include
2754  // this information in the SUBSCRIBED response.
2755  scheduler::Event event;
2756  event.set_type(scheduler::Event::HEARTBEAT);
2757 
2758  heartbeater =
2760  "framework " + stringify(info.id()),
2761  event,
2762  http.get(),
2764 
2765  process::spawn(heartbeater.get().get());
2766  }
2767 
2768  bool active() const { return state == ACTIVE; }
2769  bool connected() const { return state == ACTIVE || state == INACTIVE; }
2770  bool recovered() const { return state == RECOVERED; }
2771 
2772  bool isTrackedUnderRole(const std::string& role) const;
2773  void trackUnderRole(const std::string& role);
2774  void untrackUnderRole(const std::string& role);
2775 
2776  Master* const master;
2777 
2778  FrameworkInfo info;
2779 
2780  std::set<std::string> roles;
2781 
2783 
2784  // Frameworks can either be connected via HTTP or by message passing
2785  // (scheduler driver). At most one of `http` and `pid` will be set
2786  // according to the last connection made by the framework; neither
2787  // field will be set if the framework is in state `RECOVERED`.
2790 
2792 
2796 
2797  // Tasks that have not yet been launched because they are currently
2798  // being authorized.
2800 
2801  // TODO(bmahler): Make this private to enforce that `addTask()` and
2802  // `removeTask()` are used, and provide a const view into the tasks.
2804 
2805  // Tasks launched by this framework that have reached a terminal
2806  // state and have had all their updates acknowledged. We only keep a
2807  // fixed-size cache to avoid consuming too much memory. We use
2808  // boost::circular_buffer rather than BoundedHashMap because there
2809  // can be multiple completed tasks with the same task ID.
2810  boost::circular_buffer<process::Owned<Task>> completedTasks;
2811 
2812  // When an agent is marked unreachable, tasks running on it are stored
2813  // here. We only keep a fixed-size cache to avoid consuming too much memory.
2814  // NOTE: Non-partition-aware unreachable tasks in this map are marked
2815  // TASK_LOST instead of TASK_UNREACHABLE for backward compatibility.
2817 
2818  hashset<Offer*> offers; // Active offers for framework.
2819 
2820  hashset<InverseOffer*> inverseOffers; // Active inverse offers for framework.
2821 
2822  // TODO(bmahler): Make this private to enforce that `addExecutor()`
2823  // and `removeExecutor()` are used, and provide a const view into
2824  // the executors.
2826 
2827  // Pending operations or terminal operations that have
2828  // unacknowledged status updates.
2830 
2831  // The map from the framework-specified operation ID to the
2832  // corresponding internal operation UUID.
2834 
2835  // NOTE: For the used and offered resources below, we keep the
2836  // total as well as partitioned by SlaveID.
2837  // We expose the total resources via the HTTP endpoint, and we
2838  // keep a running total of the resources because looping over the
2839  // slaves to sum the resources has led to perf issues (MESOS-1862).
2840  // We keep the resources partitioned by SlaveID because non-scalar
2841  // resources can be lost when summing them up across multiple
2842  // slaves (MESOS-2373).
2843  //
2844  // Also note that keeping the totals is safe even though it yields
2845  // incorrect results for non-scalar resources.
2846  // (1) For overlapping set items / ranges across slaves, these
2847  // will get added N times but only represented once.
2848  // (2) When an initial subtraction occurs (N-1), the resource is
2849  // no longer represented. (This is the source of the bug).
2850  // (3) When any further subtractions occur (N-(1+M)), the
2851  // Resources simply ignores the subtraction since there's
2852  // nothing to remove, so this is safe for now.
2853 
2854  // TODO(mpark): Strip the non-scalar resources out of the totals
2855  // in order to avoid reporting incorrect statistics (MESOS-2623).
2856 
2857  // Active task / executor / operation resources.
2859 
2860  // Note that we maintain multiple copies of each shared resource in
2861  // `usedResources` as they are used by multiple tasks.
2863 
2864  // Offered resources.
2867 
2868  // This is only set for HTTP frameworks.
2871 
2872 private:
2873  Framework(Master* const _master,
2874  const Flags& masterFlags,
2875  const FrameworkInfo& _info,
2876  State state,
2877  const process::Time& time)
2878  : master(_master),
2879  info(_info),
2880  roles(protobuf::framework::getRoles(_info)),
2881  capabilities(_info.capabilities()),
2882  state(state),
2883  registeredTime(time),
2884  reregisteredTime(time),
2885  completedTasks(masterFlags.max_completed_tasks_per_framework),
2886  unreachableTasks(masterFlags.max_unreachable_tasks_per_framework)
2887  {
2888  foreach (const std::string& role, roles) {
2889  // NOTE: It's possible that we're already being tracked under the role
2890  // because a framework can unsubscribe from a role while it still has
2891  // resources allocated to the role.
2892  if (!isTrackedUnderRole(role)) {
2893  trackUnderRole(role);
2894  }
2895  }
2896  }
2897 
2898  Framework(const Framework&); // No copying.
2899  Framework& operator=(const Framework&); // No assigning.
2900 };
2901 
2902 
2903 inline std::ostream& operator<<(
2904  std::ostream& stream,
2905  const Framework& framework)
2906 {
2907  // TODO(vinod): Also log the hostname once FrameworkInfo is properly
2908  // updated on framework failover (MESOS-1784).
2909  stream << framework.id() << " (" << framework.info.name() << ")";
2910 
2911  if (framework.pid.isSome()) {
2912  stream << " at " << framework.pid.get();
2913  }
2914 
2915  return stream;
2916 }
2917 
2918 
2919 // Information about an active role.
2920 struct Role
2921 {
2922  Role() = delete;
2923 
2924  Role(const std::string& _role) : role(_role) {}
2925 
2926  void addFramework(Framework* framework)
2927  {
2928  frameworks[framework->id()] = framework;
2929  }
2930 
2931  void removeFramework(Framework* framework)
2932  {
2933  frameworks.erase(framework->id());
2934  }
2935 
2937  {
2938  Resources resources;
2939 
2940  auto allocatedTo = [](const std::string& role) {
2941  return [role](const Resource& resource) {
2942  CHECK(resource.has_allocation_info());
2943  return resource.allocation_info().role() == role;
2944  };
2945  };
2946 
2947  foreachvalue (Framework* framework, frameworks) {
2948  resources += framework->totalUsedResources.filter(allocatedTo(role));
2949  resources += framework->totalOfferedResources.filter(allocatedTo(role));
2950  }
2951 
2952  return resources;
2953  }
2954 
2955  const std::string role;
2956 
2957  // NOTE: The dynamic role/quota relation is stored in and administrated
2958  // by the master. There is no direct representation of quota information
2959  // here to avoid duplication and to support that an operator can associate
2960  // quota with a role before the role is created. Such ordering of operator
2961  // requests prevents a race of premature unbounded allocation that setting
2962  // quota first is intended to contain.
2963 
2965 };
2966 
2967 } // namespace master {
2968 } // namespace internal {
2969 } // namespace mesos {
2970 
2971 #endif // __MASTER_HPP__
void _consume(process::MessageEvent &&event)
void recoverFramework(const FrameworkInfo &info, const std::set< std::string > &suppressedRoles)
std::string generate(const std::string &prefix="")
Returns &#39;prefix(N)&#39; where N represents the number of instances where the same prefix (wrt...
Try< Resources > getConsumedResources(const Offer::Operation &operation)
bool send(const Message &message)
Definition: master.hpp:311
Framework(Master *const master, const Flags &masterFlags, const FrameworkInfo &info, const process::UPID &_pid, const process::Time &time=process::Clock::now())
Definition: master.hpp:2177
void recoverResources(Operation *operation)
Definition: master.hpp:2512
Definition: nothing.hpp:16
hashmap< TaskID, Task * > tasks
Definition: master.hpp:2803
Definition: master.hpp:2920
Master *const master
Definition: master.hpp:2776
void removeOperation(Operation *operation)
ContentType
Definition: http.hpp:43
void removeExecutor(const FrameworkID &frameworkId, const ExecutorID &executorId)
Try< bool > update(const std::string &link, const Handle &parent, uint16_t protocol, const action::Mirror &mirror)
void _removeSlave(Slave *slave, const process::Future< bool > &registrarResult, const std::string &removalCause, Option< process::metrics::Counter > reason=None())
Try< Bytes > size(const std::string &path, const FollowSymlink follow=FollowSymlink::FOLLOW_SYMLINK)
Definition: stat.hpp:100
Future< Response > request(const Request &request, bool streamedResponse=false)
Asynchronously sends an HTTP request to the process and returns the HTTP response once the entire res...
ProcessBase(const std::string &id="")
void finalize() override
Invoked when a process is terminated.
void launchTasks(const process::UPID &from, const FrameworkID &frameworkId, const std::vector< TaskInfo > &tasks, const Filters &filters, const std::vector< OfferID > &offerIds)
SlaveInfo info
Definition: master.hpp:188
bool connected() const
Definition: master.hpp:2769
void removeSlave(Slave *slave, const std::string &message, Option< process::metrics::Counter > reason=None())
~Framework()
Definition: master.hpp:2202
Role(const std::string &_role)
Definition: master.hpp:2924
Definition: try.hpp:34
Slave(Master *const _master, SlaveInfo _info, const process::UPID &_pid, const MachineID &_machineId, const std::string &_version, std::vector< SlaveInfo::Capability > _capabilites, const process::Time &_registeredTime, std::vector< Resource > _checkpointedResources, const Option< id::UUID > &resourceVersion, std::vector< ExecutorInfo > executorInfos=std::vector< ExecutorInfo >(), std::vector< Task > tasks=std::vector< Task >())
const SlaveID id
Definition: master.hpp:187
void exceededCapacity(const process::MessageEvent &event, const Option< std::string > &principal, uint64_t capacity)
hashset< Offer * > offers
Definition: master.hpp:251
process::Future< Nothing > recover()
friend struct Subscriber
Definition: master.hpp:1748
Option< process::Timer > reregistrationTimer
Definition: master.hpp:217
bool connected
Definition: master.hpp:204
#define CHECK_NONE(expression)
Definition: check.hpp:48
void updateSlave(UpdateSlaveMessage &&message)
Task * getTask(const TaskID &taskId)
Definition: master.hpp:2209
Resources totalResources
Definition: master.hpp:277
void addFramework(Framework *framework, const std::set< std::string > &suppressedRoles)
constexpr Duration DEFAULT_HEARTBEAT_INTERVAL
Definition: constants.hpp:52
Definition: protobuf_utils.hpp:246
hashmap< SlaveID, Resources > offeredResources
Definition: master.hpp:2866
protobuf::framework::Capabilities capabilities
Definition: master.hpp:2782
bool hasExecutor(const SlaveID &slaveId, const ExecutorID &executorId)
Definition: master.hpp:2388
std::set< std::string > getRoles(const FrameworkInfo &frameworkInfo)
void deactivateFramework(const process::UPID &from, const FrameworkID &frameworkId)
Try< Nothing > activateRecoveredFramework(Framework *framework, const FrameworkInfo &frameworkInfo, const Option< process::UPID > &pid, const Option< HttpConnection > &http, const std::set< std::string > &suppressedRoles)
Resources filter(const lambda::function< bool(const Resource &)> &predicate) const
v1::AgentID evolve(const SlaveID &slaveId)
Result< ProcessStatus > status(pid_t pid)
Definition: proc.hpp:166
InverseOffer * getInverseOffer(Master *master, const OfferID &offerId)
Try< Nothing > machines(const google::protobuf::RepeatedPtrField< MachineID > &ids)
Performs the following checks on a list of machines:
void addUnreachableTask(const Task &task)
Definition: master.hpp:2322
process::Future< bool > authorizeTask(const TaskInfo &task, Framework *framework)
void offer(const FrameworkID &frameworkId, const hashmap< std::string, hashmap< SlaveID, Resources >> &resources)
mesos::v1::scheduler::Call Call
Definition: mesos.hpp:2583
Definition: resources.hpp:79
Resources totalUsedResources
Definition: master.hpp:2858
void exited(const process::UPID &pid) override
Invoked when a linked process has exited.
Slave * getSlave(Master *master, const SlaveID &slaveId)
void _registerSlave(const process::UPID &pid, RegisterSlaveMessage &&registerSlaveMessage, const Option< process::http::authentication::Principal > &principal, const process::Future< bool > &authorized)
void removeExecutor(const SlaveID &slaveId, const ExecutorID &executorId)
Definition: master.hpp:2426
Option< HttpConnection > http
Definition: master.hpp:2788
bool contains(const Elem &elem) const
Definition: hashset.hpp:102
void unregisterFramework(const process::UPID &from, const FrameworkID &frameworkId)
void update(const FrameworkInfo &newInfo)
Definition: master.hpp:2580
Definition: flags.hpp:42
Definition: registrar.hpp:91
void addFramework(Framework *framework)
Definition: master.hpp:2926
void executorMessage(const process::UPID &from, const SlaveID &slaveId, const FrameworkID &frameworkId, const ExecutorID &executorId, const std::string &data)
Definition: files.hpp:73
void unregisterSlave(const process::UPID &from, const SlaveID &slaveId)
void addCompletedTask(Task &&task)
Definition: master.hpp:2312
void addOperation(Operation *operation)
Operation
Definition: cgroups.hpp:441
Future< Nothing > redirect(int_fd from, Option< int_fd > to, size_t chunk=4096, const std::vector< lambda::function< void(const std::string &)>> &hooks={})
Redirect output from the &#39;from&#39; file descriptor to the &#39;to&#39; file descriptor (or /dev/null if &#39;to&#39; is ...
void updateConnection(const HttpConnection &newHttp)
Definition: master.hpp:2707
UPID spawn(ProcessBase *process, bool manage=false)
Spawn a new process.
void __reregisterSlave(const process::UPID &pid, ReregisterSlaveMessage &&incomingMessage, const process::Future< bool > &readmit)
Framework(Master *const master, const Flags &masterFlags, const FrameworkInfo &info)
Definition: master.hpp:2197
Definition: duration.hpp:32
void removeOperation(Operation *operation)
Definition: master.hpp:2557
void killTask(const process::UPID &from, const FrameworkID &frameworkId, const TaskID &taskId)
void resourceRequest(const process::UPID &from, const FrameworkID &frameworkId, const std::vector< Request > &requests)
void terminate(const UPID &pid, bool inject=true)
Sends a TerminateEvent to the given process.
bool isPending() const
Definition: future.hpp:1224
Try< Nothing > update(const SlaveInfo &info, const std::string &_version, const std::vector< SlaveInfo::Capability > &_capabilites, const Resources &_checkpointedResources, const Option< id::UUID > &resourceVersion)
void lostCandidacy(const process::Future< Nothing > &lost)
void heartbeat()
Definition: master.hpp:2748
std::ostream & operator<<(std::ostream &stream, const Slave &slave)
Definition: master.hpp:290
void send(const process::UPID &to, const google::protobuf::Message &message)
Definition: protobuf.hpp:118
void removeInverseOffer(InverseOffer *inverseOffer, bool rescind=false)
void _exited(Framework *framework)
bool isSome() const
Definition: option.hpp:115
Definition: event.hpp:214
process::Future< Nothing > closed() const
Definition: master.hpp:324
Definition: http.hpp:518
Definition: json.hpp:154
void trackUnderRole(const std::string &role)
hashmap< FrameworkID, hashmap< TaskID, Task * > > tasks
Definition: master.hpp:240
void updateConnection(const process::UPID &newPid)
Definition: master.hpp:2695
void removeOffer(Offer *offer)
Definition: master.hpp:2359
void detected(const process::Future< Option< MasterInfo >> &_leader)
multihashmap< FrameworkID, TaskID > killedTasks
Definition: master.hpp:244
mesos::v1::scheduler::Event Event
Definition: mesos.hpp:2584
void __registerSlave(const process::UPID &pid, RegisterSlaveMessage &&registerSlaveMessage, const process::Future< bool > &admit)
void registerSlave(const process::UPID &from, RegisterSlaveMessage &&registerSlaveMessage)
void updateUnavailability(const MachineID &machineId, const Option< Unavailability > &unavailability)
Task * getTask(const FrameworkID &frameworkId, const TaskID &taskId) const
Definition: hashmap.hpp:38
void failoverFramework(Framework *framework, const process::UPID &newPid)
process::Future< Nothing > _recover(const Registry &registry)
hashmap< ResourceProviderID, ResourceProviderInfo > resourceProviders
Definition: master.hpp:282
void removeTask(Task *task, bool unreachable=false)
bool contains(const std::string &s, const std::string &substr)
Definition: strings.hpp:406
#define CHECK_SOME(expression)
Definition: check.hpp:44
hashmap< std::string, MessageHandler > message
Definition: process.hpp:443
Definition: master.hpp:299
bool active() const
Definition: master.hpp:2768
void exitedExecutor(const process::UPID &from, const SlaveID &slaveId, const FrameworkID &frameworkId, const ExecutorID &executorId, int32_t status)
Resources checkpointedResources
Definition: master.hpp:270
SlaveObserver * observer
Definition: master.hpp:279
bool isTerminalState(const TaskState &state)
Try< Nothing > unavailability(const Unavailability &unavailability)
An &quot;untyped&quot; PID, used to encapsulate the process ID for lower-layer abstractions (eg...
Definition: pid.hpp:39
void addOffer(Offer *offer)
BoundedHashMap< TaskID, process::Owned< Task > > unreachableTasks
Definition: master.hpp:2816
Option< Error > validateFrameworkAuthentication(const FrameworkInfo &frameworkInfo, const process::UPID &from)
void markGone(Slave *slave, const TimeInfo &goneTime)
bool recovered() const
Definition: master.hpp:2770
Result< std::vector< Filter< Classifier > > > filters(const std::string &_link, const Handle &parent)
Definition: internal.hpp:776
process::Time registeredTime
Definition: master.hpp:200
bool active
Definition: master.hpp:209
hashmap< id::UUID, Operation * > operations
Definition: master.hpp:2829
Definition: http.hpp:340
An abstraction for contending to be a leading master.
Definition: contender.hpp:40
std::set< std::string > roles
Definition: master.hpp:2780
void reregisterSlave(const process::UPID &from, ReregisterSlaveMessage &&incomingMessage)
process::UPID pid
Definition: master.hpp:192
Definition: uuid.hpp:35
process::Future< Nothing > apply(Slave *slave, const Offer::Operation &operation)
Definition: protobuf_utils.hpp:434
void addSlave(Slave *slave, std::vector< Archive::Framework > &&completedFrameworks)
process::http::Pipe::Writer writer
Definition: master.hpp:329
bool write(std::string s)
Option< process::Time > reregisteredTime
Definition: master.hpp:201
Future< Nothing > readerClosed() const
void apply(const std::vector< ResourceConversion > &conversions)
void consume(process::MessageEvent &&event) override
process::Time reregisteredTime
Definition: master.hpp:2794
Master *const master
Definition: master.hpp:186
friend struct SlavesWriter
Definition: master.hpp:1747
void addOffer(Offer *offer)
Definition: master.hpp:2351
Timer delay(const Duration &duration, const PID< T > &pid, void(T::*method)())
Definition: delay.hpp:31
void put(const Key &key, const Value &value)
Definition: hashmap.hpp:104
void disconnect(Framework *framework)
Subscriber(Master *_master, const HttpConnection &_http, const Option< process::http::authentication::Principal > _principal)
Definition: master.hpp:1970
Option< Error > quotaInfo(const mesos::quota::QuotaInfo &quotaInfo)
const T & get() const &
Definition: option.hpp:118
void removeExecutor(Slave *slave, const FrameworkID &frameworkId, const ExecutorID &executorId)
const std::string role
Definition: master.hpp:2955
Framework(Master *const master, const Flags &masterFlags, const FrameworkInfo &info, const HttpConnection &_http, const process::Time &time=process::Clock::now())
Definition: master.hpp:2187
void removeOffer(Offer *offer, bool rescind=false)
process::Future< bool > authorizeDestroyVolume(const Offer::Operation::Destroy &destroy, const Option< process::http::authentication::Principal > &principal)
Authorizes a DESTROY operation.
void updateSlaveFrameworks(Slave *slave, const std::vector< FrameworkInfo > &frameworks)
Definition: protobuf.hpp:100
process::Time registeredTime
Definition: master.hpp:2793
process::Future< Nothing > destroy(const std::string &hierarchy, const std::string &cgroup="/")
#define foreachvalue(VALUE, ELEMS)
Definition: foreach.hpp:77
bool isTrackedUnderRole(const std::string &role) const
void removeInverseOffer(InverseOffer *inverseOffer)
Definition: master.hpp:2380
void addExecutor(const FrameworkID &frameworkId, const ExecutorInfo &executorInfo)
Operation * getOperation(const id::UUID &uuid) const
Definition: whitelist_watcher.hpp:37
void removeFramework(Framework *framework)
Definition: master.hpp:2931
MasterInfo info() const
Definition: master.hpp:559
void inverseOfferTimeout(const OfferID &inverseOfferId)
bool isCompletedFramework(const FrameworkID &frameworkId)
hashmap< Option< ResourceProviderID >, id::UUID > resourceVersions
Definition: master.hpp:281
bool wait(const UPID &pid, const Duration &duration=Seconds(-1))
Wait for the process to exit for no more than the specified seconds.
Definition: time.hpp:23
void offerTimeout(const OfferID &offerId)
void schedulerMessage(const process::UPID &from, const SlaveID &slaveId, const FrameworkID &frameworkId, const ExecutorID &executorId, const std::string &data)
virtual void initialize() override
Invoked when a process gets spawned.
Definition: master.hpp:358
FrameworkInfo info
Definition: master.hpp:2778
ContentType contentType
Definition: master.hpp:330
void statusUpdateAcknowledgement(const process::UPID &from, const SlaveID &slaveId, const FrameworkID &frameworkId, const TaskID &taskId, const std::string &uuid)
Try< std::vector< Entry > > list(const std::string &hierarchy, const std::string &cgroup)
void initialize() override
Invoked when a process gets spawned.
id::UUID streamId
Definition: master.hpp:331
HttpConnection http
Definition: master.hpp:2016
void send(const Message &message)
Definition: master.hpp:2294
Definition: boundedhashmap.hpp:27
Basic model of an allocator: resources are allocated to a framework in the form of offers...
Definition: allocator.hpp:55
void _markUnreachable(const SlaveInfo &slave, const TimeInfo &unreachableTime, bool duringMasterFailover, const std::string &message, bool registrarResult)
void removeInverseOffer(InverseOffer *inverseOffer)
void agentReregisterTimeout(const SlaveID &slaveId)
void registerFramework(const process::UPID &from, const FrameworkInfo &frameworkInfo)
Offer * getOffer(const OfferID &offerId) const
void recoverResources(Task *task)
Definition: master.hpp:2262
Result< Process > process(pid_t pid)
Definition: freebsd.hpp:30
#define flags
Definition: decoder.hpp:18
process::Future< bool > markUnreachable(const SlaveInfo &slave, bool duringMasterFailover, const std::string &message)
bool empty() const
Definition: resources.hpp:388
Resources addTask(const TaskInfo &task, Framework *framework, Slave *slave)
void contended(const process::Future< process::Future< Nothing >> &candidacy)
Definition: none.hpp:27
InverseOffer * getInverseOffer(const OfferID &inverseOfferId) const
friend void * schedule(void *)
static Try< UUID > fromBytes(const std::string &s)
Definition: uuid.hpp:49
void updateOperation(Operation *operation, const UpdateOperationStatusMessage &update, bool convertResources=true)
hashset< InverseOffer * > inverseOffers
Definition: master.hpp:2820
void updateTask(Task *task, const StatusUpdate &update)
void removeTask(Task *task, bool unreachable)
Definition: master.hpp:2332
void send(const mesos::master::Event &event, const process::Owned< AuthorizationAcceptor > &authorizeRole, const process::Owned< AuthorizationAcceptor > &authorizeFramework, const process::Owned< AuthorizationAcceptor > &authorizeTask, const process::Owned< AuthorizationAcceptor > &authorizeExecutor)
friend struct Slave
Definition: master.hpp:1746
void updateFramework(Framework *framework, const FrameworkInfo &frameworkInfo, const std::set< std::string > &suppressedRoles)
Object protobuf(const google::protobuf::Message &message)
Definition: protobuf.hpp:836
void addInverseOffer(InverseOffer *inverseOffer)
void _failoverFramework(Framework *framework)
const MachineID machineId
Definition: master.hpp:190
void set(const Key &key, const Value &value)
Definition: boundedhashmap.hpp:39
void frameworkFailoverTimeout(const FrameworkID &frameworkId, const process::Time &reregisteredTime)
void ___reregisterSlave(const process::UPID &pid, ReregisterSlaveMessage &&incomingMessage, const process::Future< bool > &updated)
hashmap< OperationID, id::UUID > operationUUIDs
Definition: master.hpp:2833
void reregisterFramework(const process::UPID &from, const FrameworkInfo &frameworkInfo, bool failover)
boost::circular_buffer< process::Owned< Task > > completedTasks
Definition: master.hpp:2810
Definition: master.hpp:117
hashmap< uint16_t, std::string > * statuses
Resources allocatedResources() const
Definition: master.hpp:2936
#define foreachkey(KEY, ELEMS)
Definition: foreach.hpp:74
HttpConnection(const process::http::Pipe::Writer &_writer, ContentType _contentType, id::UUID _streamId)
Definition: master.hpp:301
void recoverResources(Task *task)
constexpr size_t MAX_REMOVED_SLAVES
Definition: constants.hpp:84
hashset< InverseOffer * > inverseOffers
Definition: master.hpp:254
bool close()
Definition: master.hpp:319
void authenticate(const process::UPID &from, const process::UPID &pid)
An abstraction of a Master detector which can be used to detect the leading master from a group...
Definition: detector.hpp:38
static Time now()
The current clock time for either the current process that makes this call or the global clock time i...
void addExecutor(const SlaveID &slaveId, const ExecutorInfo &executorInfo)
Definition: master.hpp:2395
process::Future< bool > authorizeUnreserveResources(const Offer::Operation::Unreserve &unreserve, const Option< process::http::authentication::Principal > &principal)
Authorizes an UNRESERVE operation.
void _reconcileTasks(Framework *framework, const std::vector< TaskStatus > &statuses)
Definition: event.hpp:103
void statusUpdate(StatusUpdate update, const process::UPID &pid)
process::Future< bool > authorizeSlave(const SlaveInfo &slaveInfo, const Option< process::http::authentication::Principal > &principal)
void inverseOffer(const FrameworkID &frameworkId, const hashmap< SlaveID, UnavailableResources > &resources)
hashmap< TaskID, TaskInfo > pendingTasks
Definition: master.hpp:2799
process::Future< bool > authorizeFramework(const FrameworkInfo &frameworkInfo)
Heartbeater(const std::string &_logMessage, const Message &_heartbeatMessage, const HttpConnection &_http, const Duration &_interval, const Option< Duration > &_delay=None())
Definition: master.hpp:345
void recoveredSlavesTimeout(const Registry &registry)
virtual void lost(const UPID &)
Invoked when a linked process can no longer be monitored.
Definition: process.hpp:133
Definition: master.hpp:342
bool hasExecutor(const FrameworkID &frameworkId, const ExecutorID &executorId) const
Given an encoding function for individual records, this provides encoding from typed records into &quot;Re...
Definition: recordio.hpp:57
Option< process::Owned< Heartbeater< scheduler::Event, v1::scheduler::Event > > > heartbeater
Definition: master.hpp:2870
Option< process::UPID > pid
Definition: master.hpp:2789
Master(mesos::allocator::Allocator *allocator, Registrar *registrar, Files *files, mesos::master::contender::MasterContender *contender, mesos::master::detector::MasterDetector *detector, const Option< Authorizer * > &authorizer, const Option< std::shared_ptr< process::RateLimiter >> &slaveRemovalLimiter, const Flags &flags=Flags())
void _reregisterSlave(const process::UPID &pid, ReregisterSlaveMessage &&incomingMessage, const Option< process::http::authentication::Principal > &principal, const process::Future< bool > &authorized)
void authenticationTimeout(process::Future< Option< std::string >> future)
Definition: metrics.hpp:38
std::string toString() const
Definition: uuid.hpp:87
bool isSpeculativeOperation(const Offer::Operation &operation)
State
Definition: master.hpp:2157
Offer * getOffer(Master *master, const OfferID &offerId)
Try< Nothing > create(const std::string &hierarchy, const std::string &cgroup, bool recursive=false)
hashmap< FrameworkID, Framework * > frameworks
Definition: master.hpp:2964
Subscriber & operator=(const Subscriber &)=delete
Try< Nothing > bind(int_fd s, const Address &address)
Definition: network.hpp:46
process::Time unregisteredTime
Definition: master.hpp:2795
std::string serialize(ContentType contentType, const google::protobuf::Message &message)
void addOperation(Operation *operation)
Definition: master.hpp:2466
std::string version
Definition: master.hpp:195
URI http(const std::string &host, const std::string &path="/", const Option< int > &port=None(), const Option< std::string > &query=None(), const Option< std::string > &fragment=None(), const Option< std::string > &user=None(), const Option< std::string > &password=None())
Creates an http URI with the given parameters.
Definition: http.hpp:35
hashmap< FrameworkID, hashmap< ExecutorID, ExecutorInfo > > executors
Definition: master.hpp:224
void reviveOffers(const process::UPID &from, const FrameworkID &frameworkId, const std::vector< std::string > &role)
hashset< Offer * > offers
Definition: master.hpp:2818
std::string stringify(int flags)
Definition: owned.hpp:35
void fileAttached(const process::Future< Nothing > &result, const std::string &path)
void addTask(Task *task)
Definition: master.hpp:2218
void reconcileTasks(const process::UPID &from, const FrameworkID &frameworkId, const std::vector< TaskStatus > &statuses)
void closeHttpConnection()
Definition: master.hpp:2730
Definition: master.hpp:2155
void untrackUnderRole(const std::string &role)
bool contains(const Resources &that) const
protobuf::slave::Capabilities capabilities
Definition: master.hpp:198
hashmap< SlaveID, hashmap< ExecutorID, ExecutorInfo > > executors
Definition: master.hpp:2825
Definition: process.hpp:493
void sendSlaveLost(const SlaveInfo &slaveInfo)
process::Owned< Heartbeater< mesos::master::Event, v1::master::Event > > heartbeater
Definition: master.hpp:2018
hashmap< id::UUID, Operation * > operations
Definition: master.hpp:248
void reconcileKnownSlave(Slave *slave, const std::vector< ExecutorInfo > &executors, const std::vector< Task > &tasks)
bool contains(const Key &key) const
Definition: hashmap.hpp:86
void forward(const StatusUpdate &update, const process::UPID &acknowledgee, Framework *framework)
Nothing _agentReregisterTimeout(const SlaveID &slaveId)
void removeFramework(Framework *framework)
Try< std::set< pid_t > > pids()
Definition: freebsd.hpp:62
const Option< process::http::authentication::Principal > principal
Definition: master.hpp:2019
hashmap< SlaveID, Resources > usedResources
Definition: master.hpp:2862
Resources totalOfferedResources
Definition: master.hpp:2865
Definition: master.hpp:392
Resources offeredResources
Definition: master.hpp:261
hashmap< FrameworkID, hashmap< TaskID, TaskInfo > > pendingTasks
Definition: master.hpp:230
const FrameworkID id() const
Definition: master.hpp:2575
process::Future< bool > authorizeReserveResources(const Offer::Operation::Reserve &reserve, const Option< process::http::authentication::Principal > &principal)
Authorizes a RESERVE operation.
void submitScheduler(const std::string &name)
void removeOperation(Operation *operation)
void deactivate(Framework *framework, bool rescind)
const T & get() const
Definition: try.hpp:73
constexpr const char * name
Definition: shell.hpp:41
hashmap< FrameworkID, Resources > usedResources
Definition: master.hpp:259
void throttled(process::MessageEvent &&event, const Option< std::string > &principal)
void addOperation(Framework *framework, Slave *slave, Operation *operation)
void __removeSlave(Slave *slave, const std::string &message, const Option< TimeInfo > &unreachableTime)
process::Future< bool > authorizeCreateVolume(const Offer::Operation::Create &create, const Option< process::http::authentication::Principal > &principal)
Authorizes a CREATE operation.
void removeOffer(Offer *offer)
void updateOperationStatus(const UpdateOperationStatusMessage &update)
Try< std::vector< Value > > time(const std::string &hierarchy, const std::string &cgroup)
Framework * getFramework(const FrameworkID &frameworkId) const
void addInverseOffer(InverseOffer *inverseOffer)
Definition: master.hpp:2373
State state
Definition: master.hpp:2791
void _authenticate(const process::UPID &pid, const process::Future< Option< std::string >> &future)