Apache Mesos
master.hpp
Go to the documentation of this file.
1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements. See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership. The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License. You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 
17 #ifndef __MASTER_HPP__
18 #define __MASTER_HPP__
19 
20 #include <stdint.h>
21 
22 #include <list>
23 #include <memory>
24 #include <set>
25 #include <string>
26 #include <vector>
27 
28 #include <boost/circular_buffer.hpp>
29 
30 #include <mesos/mesos.hpp>
31 #include <mesos/resources.hpp>
32 #include <mesos/type_utils.hpp>
33 
35 
39 #include <mesos/master/master.hpp>
40 
42 
43 #include <mesos/quota/quota.hpp>
44 
46 
47 #include <process/limiter.hpp>
48 #include <process/http.hpp>
49 #include <process/owned.hpp>
50 #include <process/process.hpp>
51 #include <process/protobuf.hpp>
52 #include <process/timer.hpp>
53 
55 
56 #include <stout/boundedhashmap.hpp>
57 #include <stout/cache.hpp>
58 #include <stout/foreach.hpp>
59 #include <stout/hashmap.hpp>
60 #include <stout/hashset.hpp>
61 #include <stout/linkedhashmap.hpp>
62 #include <stout/multihashmap.hpp>
63 #include <stout/nothing.hpp>
64 #include <stout/option.hpp>
65 #include <stout/recordio.hpp>
66 #include <stout/try.hpp>
67 #include <stout/uuid.hpp>
68 
69 #include "common/http.hpp"
72 
73 #include "files/files.hpp"
74 
75 #include "internal/devolve.hpp"
76 #include "internal/evolve.hpp"
77 
78 #include "master/constants.hpp"
79 #include "master/flags.hpp"
80 #include "master/machine.hpp"
81 #include "master/metrics.hpp"
82 #include "master/validation.hpp"
83 
84 #include "messages/messages.hpp"
85 
86 namespace process {
87 class RateLimiter; // Forward declaration.
88 }
89 
90 namespace mesos {
91 
92 // Forward declarations.
93 class Authorizer;
94 class ObjectApprovers;
95 
96 namespace internal {
97 
98 // Forward declarations.
99 namespace registry {
100 class Slaves;
101 }
102 
103 class Registry;
104 class WhitelistWatcher;
105 
106 namespace master {
107 
108 class Master;
109 class Registrar;
110 class SlaveObserver;
111 
112 struct BoundedRateLimiter;
113 struct Framework;
114 struct Role;
115 
116 
// Master-side record of a single agent (historically "slave"): its
// identity (`id`, `info`, `machineId`), connection state (`connected`,
// `active`), and the tasks, executors, operations, offers and resources
// the master attributes to it. Instances are non-copyable (see the
// private declarations at the bottom).
//
// NOTE(review): this copy of the header was extracted from a rendered
// source listing, and several member declarations were dropped; the
// embedded original line numbers skip over them. Each gap is marked
// below. Restore them from the upstream header before relying on this
// definition.
117 struct Slave
118 {
// Constructs the record for a newly (re-)registered agent.
// `executorInfos` and `tasks` default to empty vectors.
// NOTE(review): `_capabilites` (here and in the second parameter list
// below) is a misspelling of `_capabilities`; it is a parameter name
// only, so callers are unaffected.
119 Slave(Master* const _master,
120  SlaveInfo _info,
121  const process::UPID& _pid,
122  const MachineID& _machineId,
123  const std::string& _version,
124  std::vector<SlaveInfo::Capability> _capabilites,
125  const process::Time& _registeredTime,
126  std::vector<Resource> _checkpointedResources,
127  const Option<UUID>& _resourceVersion,
128  std::vector<ExecutorInfo> executorInfos = std::vector<ExecutorInfo>(),
129  std::vector<Task> tasks = std::vector<Task>());
130 
131  ~Slave();
132 
133  Task* getTask(
134  const FrameworkID& frameworkId,
135  const TaskID& taskId) const;
136 
137  void addTask(Task* task);
138 
139  // Update slave to recover the resources that were previously
140  // being used by `task`.
141  //
142  // TODO(bmahler): This is a hack for performance. We need to
143  // maintain resource counters because computing task resources
144  // functionally for all tasks is expensive, for now.
145  void recoverResources(Task* task);
146 
147  void removeTask(Task* task);
148 
149  void addOperation(Operation* operation);
150 
151  void recoverResources(Operation* operation);
152 
153  void removeOperation(Operation* operation);
154 
155  Operation* getOperation(const UUID& uuid) const;
156 
157  void addOffer(Offer* offer);
158 
159  void removeOffer(Offer* offer);
160 
161  void addInverseOffer(InverseOffer* inverseOffer);
162 
163  void removeInverseOffer(InverseOffer* inverseOffer);
164 
165  bool hasExecutor(
166  const FrameworkID& frameworkId,
167  const ExecutorID& executorId) const;
168 
169  void addExecutor(
170  const FrameworkID& frameworkId,
171  const ExecutorInfo& executorInfo);
172 
173  void removeExecutor(
174  const FrameworkID& frameworkId,
175  const ExecutorID& executorId);
176 
177  void apply(const std::vector<ResourceConversion>& conversions);
178 
  // NOTE(review): original line 179 is missing here. It held the return
  // type and name of the member function whose parameter list follows.
180  const SlaveInfo& info,
181  const std::string& _version,
182  const std::vector<SlaveInfo::Capability>& _capabilites,
183  const Resources& _checkpointedResources,
184  const Option<UUID>& resourceVersion);
185 
  // Back-pointer to the owning master; the pointer itself is const.
186  Master* const master;
187  const SlaveID id;
188  SlaveInfo info;
189 
190  const MachineID machineId;
191 
  // NOTE(review): original line 192 (a member declaration) is missing.
  // `operator<<` below reads `slave.pid`, which is presumably declared here.
193 
194  // TODO(bmahler): Use stout's Version when it can parse labels, etc.
195  std::string version;
196 
197  // Agent capabilities.
  // NOTE(review): original line 198 is missing. It is presumably the
  // declaration the comment above describes.
199 
  // NOTE(review): original lines 200-201 (member declarations) are missing.
202 
203  // Slave becomes disconnected when the socket closes.
204  bool connected;
205 
206  // Slave becomes deactivated when it gets disconnected. In the
207  // future this might also happen via HTTP endpoint.
208  // No offers will be made for a deactivated slave.
209  bool active;
210 
211  // Timer for marking slaves unreachable that become disconnected and
212  // don't reregister. This timeout is larger than the slave
213  // observer's timeout, so typically the slave observer will be the
214  // one to mark such slaves unreachable; this timer is a backup for
215  // when a slave responds to pings but does not reregister (e.g.,
216  // because agent recovery has hung).
  // NOTE(review): original line 217 (the timer declaration described
  // above) is missing.
218 
219  // Executors running on this slave.
220  //
221  // TODO(bmahler): Make this private to enforce that `addExecutor()`
222  // and `removeExecutor()` are used, and provide a const view into
223  // the executors.
  // NOTE(review): original line 224 (the executors container) is missing.
225 
226  // Tasks that have not yet been launched because they are currently
227  // being authorized. This is similar to Framework's pendingTasks but we
228  // track pendingTasks per agent separately to determine if any offer
229  // operation for this agent would change resources requested by these tasks.
  // NOTE(review): original line 230 (the pending-tasks container) is missing.
231 
232  // Tasks present on this slave.
233  //
234  // TODO(bmahler): Make this private to enforce that `addTask()` and
235  // `removeTask()` are used, and provide a const view into the tasks.
236  //
237  // TODO(bmahler): The task pointer ownership complexity arises from the fact
238  // that we own the pointer here, but it's shared with the Framework struct.
239  // We should find a way to eliminate this.
  // NOTE(review): original line 240 (the tasks container) is missing.
241 
242  // Tasks that were asked to kill by frameworks.
243  // This is used for reconciliation when the slave reregisters.
  // NOTE(review): original line 244 (the killed-tasks container) is missing.
245 
246  // Pending operations or terminal operations that have
247  // unacknowledged status updates on this agent.
  // NOTE(review): original line 248 (the operations container) is missing.
249 
250  // Active offers on this slave.
  // NOTE(review): original line 251 (the offers container) is missing.
252 
253  // Active inverse offers on this slave.
  // NOTE(review): original line 254 (the inverse-offers container) is missing.
255 
256  // Resources for active task / executors / operations.
257  // Note that we maintain multiple copies of each shared resource in
258  // `usedResources` as they are used by multiple tasks.
  // NOTE(review): original line 259 (presumably `usedResources`) is missing.
260 
  // NOTE(review): original line 261 (a member declaration) is missing.
262 
263  // Resources that should be checkpointed by the slave (e.g.,
264  // persistent volumes, dynamic reservations, etc). These are either
265  // in use by a task/executor, or are available for use and will be
266  // re-offered to the framework.
267  // TODO(jieyu): `checkpointedResources` is only for agent default
268  // resources. Resources from resource providers are not included in
269  // this field. Consider removing this field.
  // NOTE(review): original line 270 (presumably `checkpointedResources`)
  // is missing.
271 
272  // The current total resources of the slave. Note that this is
273  // different from 'info.resources()' because this also considers
274  // operations (e.g., CREATE, RESERVE) that have been applied and
275  // includes revocable resources and resources from resource
276  // providers as well.
  // NOTE(review): original line 277 (the total-resources member) is missing.
278 
279  // Used to establish the relationship between the operation and the
280  // resources that the operation is operating on. Each resource
281  // provider will keep a resource version UUID, and change it when it
282  // believes that the resources from this resource provider are out
283  // of sync from the master's view. The master will keep track of
284  // the last known resource version UUID for each resource provider,
285  // and attach the resource version UUID in each operation it sends
286  // out. The resource provider should reject operations that have a
287  // different resource version UUID than that it maintains, because
288  // this means the operation is operating on resources that might
289  // have already been invalidated.
  // NOTE(review): original line 290 (the agent-level resource version
  // member) is missing.
291 
292  SlaveObserver* observer;
293 
  // NOTE(review): original line 294 is missing. The `};` at original line
  // 314 shows it opened a nested struct/class holding the per-resource-
  // provider fields below.
295  ResourceProviderInfo info;
  // NOTE(review): original line 296 (a member declaration) is missing.
297 
298  // Used to establish the relationship between the operation and the
299  // resources that the operation is operating on. Each resource
300  // provider will keep a resource version UUID, and change it when it
301  // believes that the resources from this resource provider are out
302  // of sync from the master's view. The master will keep track of
303  // the last known resource version UUID for each resource provider,
304  // and attach the resource version UUID in each operation it sends
305  // out. The resource provider should reject operations that have a
306  // different resource version UUID than that it maintains, because
307  // this means the operation is operating on resources that might
308  // have already been invalidated.
  // NOTE(review): original line 309 (the per-provider resource version
  // member) is missing.
310 
311  // Pending operations or terminal operations that have
312  // unacknowledged status updates.
  // NOTE(review): original line 313 (the per-provider operations
  // container) is missing.
314  };
315 
  // NOTE(review): original line 316 (presumably the container of the nested
  // per-provider records) is missing.
317 
318 private:
  // Copy construction/assignment are declared private (and presumably left
  // undefined) to forbid copies; `= delete` would state this directly.
319  Slave(const Slave&); // No copying.
320  Slave& operator=(const Slave&); // No assigning.
321 };
322 
323 
324 inline std::ostream& operator<<(std::ostream& stream, const Slave& slave)
325 {
326  return stream << slave.id << " at " << slave.pid
327  << " (" << slave.info.hostname() << ")";
328 }
329 
330 
331 // Represents the streaming HTTP connection to a framework or a client
332 // subscribed to the '/api/vX' endpoint.
// Each event is evolved to its versioned form, encoded with a RecordIO
// encoder using `contentType`, and written to `writer`. `streamId`
// identifies the stream.
//
// NOTE(review): this listing is missing original line 333, the struct
// header. Other code here names the type `HttpConnection` (e.g. the
// `Heartbeater` constructor), so it is presumably `struct HttpConnection`.
334 {
  // NOTE(review): original line 335 is missing. It opened the constructor
  // and declared the first parameter, `_writer`, used to initialize `writer`.
336  ContentType _contentType,
337  id::UUID _streamId)
338  : writer(_writer),
339  contentType(_contentType),
340  streamId(_streamId) {}
341 
342  // We need to evolve the internal old style message/unversioned event into a
343  // versioned event e.g., `v1::scheduler::Event` or `v1::master::Event`.
  //
  // Returns the result of `writer.write(...)`; presumably false once the
  // stream can no longer be written to (TODO confirm against the writer API).
344  template <typename Message, typename Event = v1::scheduler::Event>
345  bool send(const Message& message)
346  {
  // NOTE(review): original line 347 is missing. It began the declaration of
  // `encoder`, whose remaining constructor arguments follow on the next line.
348  serialize, contentType, lambda::_1));
349 
350  return writer.write(encoder.encode(evolve(message)));
351  }
352 
  // Closes the write side of the stream; returns `writer.close()`'s result.
353  bool close()
354  {
355  return writer.close();
356  }
357 
  // NOTE(review): original line 358 (this accessor's signature) is missing.
  // Callers use `http.closed()` (see `Heartbeater::heartbeat`), so it is
  // presumably `closed()`. It forwards `writer.readerClosed()`.
359  {
360  return writer.readerClosed();
361  }
362 
  // NOTE(review): original lines 363-365 are missing. They presumably
  // declared the `writer`, `contentType` and `streamId` data members that
  // the constructor initializes.
366 };
367 
368 
369 // This process periodically sends heartbeats to a given HTTP connection.
370 // The `Message` template parameter is the type of the heartbeat event passed
371 // into the heartbeater during construction, while the `Event` template
372 // parameter is the versioned event type which is sent to the client.
373 // The optional delay parameter is used to specify the delay period before it
374 // sends the first heartbeat.
//
// Each heartbeat sends a copy of `heartbeatMessage` only while the
// connection's `closed()` future is still pending.
//
// NOTE(review): this listing is missing original lines 395, 398, 415 and
// 420; see the notes inside the class.
375 template <typename Message, typename Event>
376 class Heartbeater : public process::Process<Heartbeater<Message, Event>>
377 {
378 public:
  // `_logMessage` is only used in the VLOG line identifying the recipient.
  // `_delay` defaults to None(), meaning the first heartbeat is sent
  // immediately from `initialize()`.
379  Heartbeater(const std::string& _logMessage,
380  const Message& _heartbeatMessage,
381  const HttpConnection& _http,
382  const Duration& _interval,
383  const Option<Duration>& _delay = None())
384  : process::ProcessBase(process::ID::generate("heartbeater")),
385  logMessage(_logMessage),
386  heartbeatMessage(_heartbeatMessage),
387  http(_http),
388  interval(_interval),
389  delay(_delay) {}
390 
391 protected:
  // Sends the first heartbeat now, or schedules it after `delay` if set.
392  void initialize() override
393  {
394  if (delay.isSome()) {
  // NOTE(review): original line 395 (the callee of this call) is missing.
  // Because the member `delay` shadows any unqualified `delay` function
  // here, it is presumably a qualified `process::delay(` call.
396  delay.get(),
397  this,
  // NOTE(review): original line 398 (the final argument, presumably the
  // member to invoke, and the closing parenthesis) is missing.
399  } else {
400  heartbeat();
401  }
402  }
403 
404 private:
405  void heartbeat()
406  {
407  // Only send a heartbeat if the connection is not closed.
408  if (http.closed().isPending()) {
409  VLOG(2) << "Sending heartbeat to " << logMessage;
410 
411  Message message(heartbeatMessage);
412  http.send<Message, Event>(message);
413  }
414 
  // NOTE(review): original line 415 is missing. `interval` is otherwise
  // unused in the visible code, so this is presumably where the next
  // heartbeat is scheduled after `interval` (TODO confirm upstream).
416  }
417 
418  const std::string logMessage;
419  const Message heartbeatMessage;
  // NOTE(review): original line 420 is missing. It declared the `http`
  // member, which the constructor initializes from `_http` and
  // `heartbeat()` uses.
421  const Duration interval;
422  const Option<Duration> delay;
423 };
424 
425 
426 class Master : public ProtobufProcess<Master>
427 {
428 public:
430  Registrar* registrar,
431  Files* files,
434  const Option<Authorizer*>& authorizer,
435  const Option<std::shared_ptr<process::RateLimiter>>&
436  slaveRemovalLimiter,
437  const Flags& flags = Flags());
438 
439  ~Master() override;
440 
441  // Message handlers.
442  void submitScheduler(
443  const std::string& name);
444 
445  void registerFramework(
446  const process::UPID& from,
447  RegisterFrameworkMessage&& registerFrameworkMessage);
448 
449  void reregisterFramework(
450  const process::UPID& from,
451  ReregisterFrameworkMessage&& reregisterFrameworkMessage);
452 
453  void unregisterFramework(
454  const process::UPID& from,
455  const FrameworkID& frameworkId);
456 
457  void deactivateFramework(
458  const process::UPID& from,
459  const FrameworkID& frameworkId);
460 
461  // TODO(vinod): Remove this once the old driver is removed.
462  void resourceRequest(
463  const process::UPID& from,
464  const FrameworkID& frameworkId,
465  const std::vector<Request>& requests);
466 
467  void launchTasks(
468  const process::UPID& from,
469  LaunchTasksMessage&& launchTasksMessage);
470 
471  void reviveOffers(
472  const process::UPID& from,
473  const FrameworkID& frameworkId,
474  const std::vector<std::string>& role);
475 
476  void killTask(
477  const process::UPID& from,
478  const FrameworkID& frameworkId,
479  const TaskID& taskId);
480 
481  void statusUpdateAcknowledgement(
482  const process::UPID& from,
483  StatusUpdateAcknowledgementMessage&& statusUpdateAcknowledgementMessage);
484 
485  void schedulerMessage(
486  const process::UPID& from,
487  FrameworkToExecutorMessage&& frameworkToExecutorMessage);
488 
489  void executorMessage(
490  const process::UPID& from,
491  ExecutorToFrameworkMessage&& executorToFrameworkMessage);
492 
493  void registerSlave(
494  const process::UPID& from,
495  RegisterSlaveMessage&& registerSlaveMessage);
496 
497  void reregisterSlave(
498  const process::UPID& from,
499  ReregisterSlaveMessage&& incomingMessage);
500 
501  void unregisterSlave(
502  const process::UPID& from,
503  const SlaveID& slaveId);
504 
505  void statusUpdate(
506  StatusUpdateMessage&& statusUpdateMessage);
507 
508  void reconcileTasks(
509  const process::UPID& from,
510  ReconcileTasksMessage&& reconcileTasksMessage);
511 
512  void updateOperationStatus(
513  UpdateOperationStatusMessage&& update);
514 
515  void exitedExecutor(
516  const process::UPID& from,
517  const SlaveID& slaveId,
518  const FrameworkID& frameworkId,
519  const ExecutorID& executorId,
520  int32_t status);
521 
522  void updateSlave(UpdateSlaveMessage&& message);
523 
524  void updateUnavailability(
525  const MachineID& machineId,
527 
528  // Marks the agent unreachable and returns whether the agent was
529  // marked unreachable. Returns false if the agent is already
530  // in a transitioning state or has transitioned into another
531  // state (this includes already being marked unreachable).
532  // The `duringMasterFailover` parameter specifies whether this
533  // agent is transitioning from a recovered state (true) or a
534  // registered state (false).
535  //
536  // Discarding currently not supported.
537  //
538  // Will not return a failure (this will crash the master
539  // internally in the case of a registry failure).
540  process::Future<bool> markUnreachable(
541  const SlaveInfo& slave,
542  bool duringMasterFailover,
543  const std::string& message);
544 
545  void markGone(Slave* slave, const TimeInfo& goneTime);
546 
547  void authenticate(
548  const process::UPID& from,
549  const process::UPID& pid);
550 
551  // TODO(bmahler): It would be preferred to use a unique libprocess
552  // Process identifier (PID is not sufficient) for identifying the
553  // framework instance, rather than relying on re-registration time.
554  void frameworkFailoverTimeout(
555  const FrameworkID& frameworkId,
556  const process::Time& reregisteredTime);
557 
558  void offer(
559  const FrameworkID& frameworkId,
560  const hashmap<std::string, hashmap<SlaveID, Resources>>& resources);
561 
562  void inverseOffer(
563  const FrameworkID& frameworkId,
564  const hashmap<SlaveID, UnavailableResources>& resources);
565 
566  // Invoked when there is a newly elected leading master.
567  // Made public for testing purposes.
568  void detected(const process::Future<Option<MasterInfo>>& _leader);
569 
570  // Invoked when the contender has lost the candidacy.
571  // Made public for testing purposes.
572  void lostCandidacy(const process::Future<Nothing>& lost);
573 
574  // Continuation of recover().
575  // Made public for testing purposes.
576  process::Future<Nothing> _recover(const Registry& registry);
577 
578  MasterInfo info() const
579  {
580  return info_;
581  }
582 
583 protected:
584  void initialize() override;
585  void finalize() override;
586 
587  void consume(process::MessageEvent&& event) override;
588  void consume(process::ExitedEvent&& event) override;
589 
590  void exited(const process::UPID& pid) override;
591  void exited(const FrameworkID& frameworkId, const HttpConnection& http);
592  void _exited(Framework* framework);
593 
594  // Invoked upon noticing a subscriber disconnection.
595  void exited(const id::UUID& id);
596 
597  void agentReregisterTimeout(const SlaveID& slaveId);
598  Nothing _agentReregisterTimeout(const SlaveID& slaveId);
599 
600  // Invoked when the message is ready to be executed after
601  // being throttled.
602  // 'principal' being None indicates it is throttled by
603  // 'defaultLimiter'.
604  void throttled(
605  process::MessageEvent&& event,
606  const Option<std::string>& principal);
607 
608  // Continuations of consume().
609  void _consume(process::MessageEvent&& event);
610  void _consume(process::ExitedEvent&& event);
611 
612  // Helper method invoked when the capacity for a framework
613  // principal is exceeded.
614  void exceededCapacity(
615  const process::MessageEvent& event,
616  const Option<std::string>& principal,
617  uint64_t capacity);
618 
619  // Recovers state from the registrar.
621  void recoveredSlavesTimeout(const Registry& registry);
622 
623  void _registerSlave(
624  const process::UPID& pid,
625  RegisterSlaveMessage&& registerSlaveMessage,
627  const process::Future<bool>& authorized);
628 
629  void __registerSlave(
630  const process::UPID& pid,
631  RegisterSlaveMessage&& registerSlaveMessage,
632  const process::Future<bool>& admit);
633 
634  void _reregisterSlave(
635  const process::UPID& pid,
636  ReregisterSlaveMessage&& incomingMessage,
638  const process::Future<bool>& authorized);
639 
640  void __reregisterSlave(
641  const process::UPID& pid,
642  ReregisterSlaveMessage&& incomingMessage,
643  const process::Future<bool>& readmit);
644 
645  void ___reregisterSlave(
646  const process::UPID& pid,
647  ReregisterSlaveMessage&& incomingMessage,
648  const process::Future<bool>& updated);
649 
650  void updateSlaveFrameworks(
651  Slave* slave,
652  const std::vector<FrameworkInfo>& frameworks);
653 
654  // 'future' is the future returned by the authenticator.
655  void _authenticate(
656  const process::UPID& pid,
657  const process::Future<Option<std::string>>& future);
658 
659  void authenticationTimeout(process::Future<Option<std::string>> future);
660 
661  void fileAttached(const process::Future<Nothing>& result,
662  const std::string& path);
663 
664  // Invoked when the contender has entered the contest.
665  void contended(const process::Future<process::Future<Nothing>>& candidacy);
666 
667  // When a slave that was previously registered with this master
668  // reregisters, we need to reconcile the master's view of the
669  // slave's tasks and executors. This function also sends the
670  // `SlaveReregisteredMessage`.
671  void reconcileKnownSlave(
672  Slave* slave,
673  const std::vector<ExecutorInfo>& executors,
674  const std::vector<Task>& tasks);
675 
676  // Add a framework.
677  void addFramework(
678  Framework* framework,
679  const std::set<std::string>& suppressedRoles);
680 
681  // Recover a framework from its `FrameworkInfo`. This happens after
682  // master failover, when an agent running one of the framework's
683  // tasks reregisters or when the framework itself reregisters,
684  // whichever happens first. The result of this function is a
685  // registered, inactive framework with state `RECOVERED`.
686  void recoverFramework(
687  const FrameworkInfo& info,
688  const std::set<std::string>& suppressedRoles);
689 
690  // Transition a framework from `RECOVERED` to `CONNECTED` state and
691  // activate it. This happens at most once after master failover, the
692  // first time that the framework reregisters with the new master.
693  // Exactly one of `newPid` or `http` must be provided.
694  Try<Nothing> activateRecoveredFramework(
695  Framework* framework,
696  const FrameworkInfo& frameworkInfo,
697  const Option<process::UPID>& pid,
698  const Option<HttpConnection>& http,
699  const std::set<std::string>& suppressedRoles);
700 
701  // Replace the scheduler for a framework with a new process ID, in
702  // the event of a scheduler failover.
703  void failoverFramework(Framework* framework, const process::UPID& newPid);
704 
705  // Replace the scheduler for a framework with a new HTTP connection,
706  // in the event of a scheduler failover.
707  void failoverFramework(Framework* framework, const HttpConnection& http);
708 
709  void _failoverFramework(Framework* framework);
710 
711  // Kill all of a framework's tasks, delete the framework object, and
712  // reschedule offers that were assigned to this framework.
713  void removeFramework(Framework* framework);
714 
715  // Remove a framework from the slave, i.e., remove its tasks and
716  // executors and recover the resources.
717  void removeFramework(Slave* slave, Framework* framework);
718 
719  void updateFramework(
720  Framework* framework,
721  const FrameworkInfo& frameworkInfo,
722  const std::set<std::string>& suppressedRoles);
723 
724  void disconnect(Framework* framework);
725  void deactivate(Framework* framework, bool rescind);
726 
727  void disconnect(Slave* slave);
728  void deactivate(Slave* slave);
729 
730  // Add a slave.
731  void addSlave(
732  Slave* slave,
733  std::vector<Archive::Framework>&& completedFrameworks);
734 
735  void _markUnreachable(
736  const SlaveInfo& slave,
737  const TimeInfo& unreachableTime,
738  bool duringMasterFailover,
739  const std::string& message,
740  bool registrarResult);
741 
742  void sendSlaveLost(const SlaveInfo& slaveInfo);
743 
744  // Remove the slave from the registrar and from the master's state.
745  //
746  // TODO(bmahler): 'reason' is optional until MESOS-2317 is resolved.
747  void removeSlave(
748  Slave* slave,
749  const std::string& message,
751 
752  void _removeSlave(
753  Slave* slave,
754  const process::Future<bool>& registrarResult,
755  const std::string& removalCause,
757 
758  void __removeSlave(
759  Slave* slave,
760  const std::string& message,
761  const Option<TimeInfo>& unreachableTime);
762 
763  // Validates that the framework is authenticated, if required.
764  Option<Error> validateFrameworkAuthentication(
765  const FrameworkInfo& frameworkInfo,
766  const process::UPID& from);
767 
768  // Returns whether the framework is authorized.
769  // Returns failure for transient authorization failures.
770  process::Future<bool> authorizeFramework(
771  const FrameworkInfo& frameworkInfo);
772 
773  // Returns whether the principal is authorized to (re-)register an agent
774  // and whether the `SlaveInfo` is authorized.
775  process::Future<bool> authorizeSlave(
776  const SlaveInfo& slaveInfo,
778 
779  // Returns whether the task is authorized.
780  // Returns failure for transient authorization failures.
781  process::Future<bool> authorizeTask(
782  const TaskInfo& task,
783  Framework* framework);
784 
802  process::Future<bool> authorizeReserveResources(
803  const Offer::Operation::Reserve& reserve,
805 
806  // Authorizes whether the provided `principal` is allowed to reserve
807  // the specified `resources`.
808  process::Future<bool> authorizeReserveResources(
809  const Resources& resources,
811 
829  process::Future<bool> authorizeUnreserveResources(
830  const Offer::Operation::Unreserve& unreserve,
832 
850  process::Future<bool> authorizeCreateVolume(
851  const Offer::Operation::Create& create,
853 
871  process::Future<bool> authorizeDestroyVolume(
872  const Offer::Operation::Destroy& destroy,
874 
893  process::Future<bool> authorizeResizeVolume(
894  const Resource& volume,
896 
897 
914  process::Future<bool> authorizeCreateDisk(
915  const Offer::Operation::CreateDisk& createDisk,
917 
918 
935  process::Future<bool> authorizeDestroyDisk(
936  const Offer::Operation::DestroyDisk& destroyDisk,
938 
939 
940  // Determine if a new executor needs to be launched.
941  bool isLaunchExecutor (
942  const ExecutorID& executorId,
943  Framework* framework,
944  Slave* slave) const;
945 
946  // Add executor to the framework and slave.
947  void addExecutor(
948  const ExecutorInfo& executorInfo,
949  Framework* framework,
950  Slave* slave);
951 
952  // Add task to the framework and slave.
953  void addTask(const TaskInfo& task, Framework* framework, Slave* slave);
954 
955  // Transitions the task, and recovers resources if the task becomes
956  // terminal.
957  void updateTask(Task* task, const StatusUpdate& update);
958 
959  // Removes the task. `unreachable` indicates whether the task is removed due
960  // to being unreachable. Note that we cannot rely on the task state because
961  // it may not reflect unreachability due to being set to TASK_LOST for
962  // backwards compatibility.
963  void removeTask(Task* task, bool unreachable = false);
964 
965  // Remove an executor and recover its resources.
966  void removeExecutor(
967  Slave* slave,
968  const FrameworkID& frameworkId,
969  const ExecutorID& executorId);
970 
971  // Adds the given operation to the framework and the agent.
972  void addOperation(
973  Framework* framework,
974  Slave* slave,
975  Operation* operation);
976 
977  // Transitions the operation, and updates and recovers resources if
978  // the operation becomes terminal. If `convertResources` is `false`
979  // only the consumed resources of terminal operations are recovered,
980  // but no resources are converted.
981  void updateOperation(
982  Operation* operation,
983  const UpdateOperationStatusMessage& update,
984  bool convertResources = true);
985 
986  // Remove the operation.
987  void removeOperation(Operation* operation);
988 
989  // Attempts to update the allocator by applying the given operation.
990  // If successful, updates the slave's resources, sends a
991  // 'CheckpointResourcesMessage' to the slave with the updated
992  // checkpointed resources, and returns a 'Future' with 'Nothing'.
993  // Otherwise, no action is taken and returns a failed 'Future'.
995  Slave* slave,
996  const Offer::Operation& operation);
997 
998  // Forwards the update to the framework.
999  void forward(
1000  const StatusUpdate& update,
1001  const process::UPID& acknowledgee,
1002  Framework* framework);
1003 
1004  // Remove an offer after specified timeout
1005  void offerTimeout(const OfferID& offerId);
1006 
1007  // Remove an offer and optionally rescind the offer as well.
1008  void removeOffer(Offer* offer, bool rescind = false);
1009 
1010  // Remove an inverse offer after specified timeout
1011  void inverseOfferTimeout(const OfferID& inverseOfferId);
1012 
1013  // Remove an inverse offer and optionally rescind it as well.
1014  void removeInverseOffer(InverseOffer* inverseOffer, bool rescind = false);
1015 
1016  bool isCompletedFramework(const FrameworkID& frameworkId);
1017 
1018  Framework* getFramework(const FrameworkID& frameworkId) const;
1019  Offer* getOffer(const OfferID& offerId) const;
1020  InverseOffer* getInverseOffer(const OfferID& inverseOfferId) const;
1021 
1022  FrameworkID newFrameworkId();
1023  OfferID newOfferId();
1024  SlaveID newSlaveId();
1025 
1026 private:
1027  // Updates the agent's resources by applying the given operation.
1028  // Sends either `ApplyOperationMessage` or
1029  // `CheckpointResourcesMessage` (with updated checkpointed
1030  // resources) to the agent depending on if the agent has
1031  // `RESOURCE_PROVIDER` capability.
1032  void _apply(
1033  Slave* slave,
1034  Framework* framework,
1035  const Offer::Operation& operationInfo);
1036 
1037  void drop(
1038  const process::UPID& from,
1039  const scheduler::Call& call,
1040  const std::string& message);
1041 
1042  void drop(
1043  Framework* framework,
1044  const Offer::Operation& operation,
1045  const std::string& message);
1046 
1047  void drop(
1048  Framework* framework,
1049  const scheduler::Call& call,
1050  const std::string& message);
1051 
1052  void drop(
1053  Framework* framework,
1054  const scheduler::Call::Suppress& suppress,
1055  const std::string& message);
1056 
1057  void drop(
1058  Framework* framework,
1059  const scheduler::Call::Revive& revive,
1060  const std::string& message);
1061 
1062  // Call handlers.
1063  void receive(
1064  const process::UPID& from,
1065  scheduler::Call&& call);
1066 
1067  void subscribe(
1068  HttpConnection http,
1069  const scheduler::Call::Subscribe& subscribe);
1070 
1071  void _subscribe(
1072  HttpConnection http,
1073  const FrameworkInfo& frameworkInfo,
1074  bool force,
1075  const std::set<std::string>& suppressedRoles,
1076  const process::Future<bool>& authorized);
1077 
1078  void subscribe(
1079  const process::UPID& from,
1080  const scheduler::Call::Subscribe& subscribe);
1081 
1082  void _subscribe(
1083  const process::UPID& from,
1084  const FrameworkInfo& frameworkInfo,
1085  bool force,
1086  const std::set<std::string>& suppressedRoles,
1087  const process::Future<bool>& authorized);
1088 
1089  // Subscribes a client to the 'api/vX' endpoint.
1090  void subscribe(
1091  const HttpConnection& http,
1093 
1094  void teardown(Framework* framework);
1095 
1096  void accept(
1097  Framework* framework,
1098  scheduler::Call::Accept&& accept);
1099 
1100  void _accept(
1101  const FrameworkID& frameworkId,
1102  const SlaveID& slaveId,
1103  const Resources& offeredResources,
1104  scheduler::Call::Accept&& accept,
1105  const process::Future<
1106  std::vector<process::Future<bool>>>& authorizations);
1107 
1108  void acceptInverseOffers(
1109  Framework* framework,
1110  const scheduler::Call::AcceptInverseOffers& accept);
1111 
1112  void decline(
1113  Framework* framework,
1114  scheduler::Call::Decline&& decline);
1115 
1116  void declineInverseOffers(
1117  Framework* framework,
1118  const scheduler::Call::DeclineInverseOffers& decline);
1119 
1120  void revive(
1121  Framework* framework,
1122  const scheduler::Call::Revive& revive);
1123 
1124  void kill(
1125  Framework* framework,
1126  const scheduler::Call::Kill& kill);
1127 
1128  void shutdown(
1129  Framework* framework,
1130  const scheduler::Call::Shutdown& shutdown);
1131 
1132  void acknowledge(
1133  Framework* framework,
1134  scheduler::Call::Acknowledge&& acknowledge);
1135 
1136  void acknowledgeOperationStatus(
1137  Framework* framework,
1138  scheduler::Call::AcknowledgeOperationStatus&& acknowledge);
1139 
1140  void reconcile(
1141  Framework* framework,
1142  scheduler::Call::Reconcile&& reconcile);
1143 
1144  scheduler::Response::ReconcileOperations reconcileOperations(
1145  Framework* framework,
1146  const scheduler::Call::ReconcileOperations& reconcile);
1147 
1148  void message(
1149  Framework* framework,
1150  scheduler::Call::Message&& message);
1151 
1152  void request(
1153  Framework* framework,
1154  const scheduler::Call::Request& request);
1155 
1156  void suppress(
1157  Framework* framework,
1158  const scheduler::Call::Suppress& suppress);
1159 
1160  bool elected() const
1161  {
1162  return leader.isSome() && leader.get() == info_;
1163  }
1164 
1165  void scheduleRegistryGc();
1166 
1167  void doRegistryGc();
1168 
1169  void _doRegistryGc(
1170  const hashset<SlaveID>& toRemoveUnreachable,
1171  const hashset<SlaveID>& toRemoveGone,
1172  const process::Future<bool>& registrarResult);
1173 
1174  process::Future<bool> authorizeLogAccess(
1176 
1184  bool isWhitelistedRole(const std::string& name) const;
1185 
1193  class QuotaHandler
1194  {
1195  public:
1196  explicit QuotaHandler(Master* _master) : master(_master)
1197  {
1198  CHECK_NOTNULL(master);
1199  }
1200 
1201  // Returns a list of set quotas.
1203  const mesos::master::Call& call,
1205  ContentType contentType) const;
1206 
1208  const process::http::Request& request,
1210  principal) const;
1211 
1213  const mesos::master::Call& call,
1215  principal) const;
1216 
1220  principal) const;
1221 
1223  const mesos::master::Call& call,
1225  principal) const;
1226 
1230  principal) const;
1231 
1232  private:
1233  // Heuristically tries to determine whether a quota request could
1234  // reasonably be satisfied given the current cluster capacity. The
1235  // goal is to determine whether a user may accidentally request an
1236  // amount of resources that would prevent frameworks without quota
1237  // from getting any offers. A force flag will allow users to bypass
1238  // this check.
1239  //
1240  // The heuristic tests whether the total quota, including the new
1241  // request, does not exceed the sum of non-static cluster resources,
1242  // i.e. the following inequality holds:
1243  // total - statically reserved >= total quota + quota request
1244  //
1245  // Please be advised that:
1246  // * It is up to an allocator how to satisfy quota (for example,
1247  // what resources to account towards quota, as well as which
1248  // resources to consider allocatable for quota).
1249  // * Even if there are enough resources at the moment of this check,
1250  // agents may terminate at any time, rendering the cluster under
1251  // quota.
1252  Option<Error> capacityHeuristic(
1253  const mesos::quota::QuotaInfo& request) const;
1254 
1255  // We always want to rescind offers after the capacity heuristic. The
1256  // reason for this is the race between the allocator and the master:
1257  // it can happen that there are not enough free resources at the
1258  // allocator's disposal when it is notified about the quota request,
1259  // but at this point it's too late to rescind.
1260  //
1261  // While rescinding, we adhere to the following rules:
1262  // * Rescind at least as many resources as there are in the quota request.
1263  // * Rescind all offers from an agent in order to make the potential
1264  // offer bigger, which increases the chances that a quota'ed framework
1265  // will be able to use the offer.
1266  // * Rescind offers from at least `numF` agents to make it possible
1267  // (but not guaranteed, due to fair sharing) that each framework in
1268  // the role for which quota is set gets an offer (`numF` is the
1269  // number of frameworks in the quota'ed role). Though this is not
1270  // strictly necessary, we think this will increase the debugability
1271  // and will improve user experience.
1272  //
1273  // TODO(alexr): Consider removing this function once offer management
1274  // (including rescinding) is moved to allocator.
1275  void rescindOffers(const mesos::quota::QuotaInfo& request) const;
1276 
1277  process::Future<bool> authorizeGetQuota(
1279  const mesos::quota::QuotaInfo& quotaInfo) const;
1280 
1281  process::Future<bool> authorizeUpdateQuota(
1283  const mesos::quota::QuotaInfo& quotaInfo) const;
1284 
1287  principal) const;
1288 
1290  const mesos::quota::QuotaRequest& quotaRequest,
1292  principal) const;
1293 
1295  const mesos::quota::QuotaInfo& quotaInfo,
1296  bool forced) const;
1297 
1299  const std::string& role,
1301  principal) const;
1302 
1304  const std::string& role) const;
1305 
1306  // To perform actions related to quota management, we require access to the
1307  // master data structures. No synchronization primitives are needed here
1308  // since `QuotaHandler`'s functions are invoked in the Master's actor.
1309  Master* master;
1310  };
1311 
1319  class WeightsHandler
1320  {
1321  public:
1322  explicit WeightsHandler(Master* _master) : master(_master)
1323  {
1324  CHECK_NOTNULL(master);
1325  }
1326 
1330  principal) const;
1331 
1333  const mesos::master::Call& call,
1335  ContentType contentType) const;
1336 
1338  const process::http::Request& request,
1340  principal) const;
1341 
1343  const mesos::master::Call& call,
1345  ContentType contentType) const;
1346 
1347  private:
1348  process::Future<bool> authorizeGetWeight(
1350  const WeightInfo& weight) const;
1351 
1352  process::Future<bool> authorizeUpdateWeights(
1354  const std::vector<std::string>& roles) const;
1355 
1357  const std::vector<WeightInfo>& weightInfos,
1358  const std::vector<bool>& roleAuthorizations) const;
1359 
1362  principal) const;
1363 
1366  const google::protobuf::RepeatedPtrField<WeightInfo>& weightInfos)
1367  const;
1368 
1370  const std::vector<WeightInfo>& weightInfos) const;
1371 
1372  // Rescind all outstanding offers if any of the 'weightInfos' roles has
1373  // an active framework.
1374  void rescindOffers(const std::vector<WeightInfo>& weightInfos) const;
1375 
1376  Master* master;
1377  };
1378 
1379  // Inner class used to namespace HTTP route handlers (see
1380  // master/http.cpp for implementations).
1381  class Http
1382  {
1383  public:
1384  explicit Http(Master* _master) : master(_master),
1385  quotaHandler(_master),
1386  weightsHandler(_master) {}
1387 
1388  // /api/v1
1390  const process::http::Request& request,
1392  principal) const;
1393 
1394  // /api/v1/scheduler
1396  const process::http::Request& request,
1398  principal) const;
1399 
1400  // /master/create-volumes
1402  const process::http::Request& request,
1404  principal) const;
1405 
1406  // /master/destroy-volumes
1408  const process::http::Request& request,
1410  principal) const;
1411 
1412  // /master/flags
1414  const process::http::Request& request,
1416  principal) const;
1417 
1418  // /master/frameworks
1420  const process::http::Request& request,
1422  principal) const;
1423 
1424  // /master/health
1426  const process::http::Request& request) const;
1427 
1428  // /master/redirect
1430  const process::http::Request& request) const;
1431 
1432  // /master/reserve
1434  const process::http::Request& request,
1436  principal) const;
1437 
1438  // /master/roles
1440  const process::http::Request& request,
1442  principal) const;
1443 
1444  // /master/teardown
1446  const process::http::Request& request,
1448  principal) const;
1449 
1450  // /master/slaves
1452  const process::http::Request& request,
1454  principal) const;
1455 
1456  // /master/state
1458  const process::http::Request& request,
1460  principal) const;
1461 
1462  // /master/state-summary
1464  const process::http::Request& request,
1466  principal) const;
1467 
1468  // /master/tasks
1470  const process::http::Request& request,
1472  principal) const;
1473 
1474  // /master/maintenance/schedule
1475  process::Future<process::http::Response> maintenanceSchedule(
1476  const process::http::Request& request,
1478  principal) const;
1479 
1480  // /master/maintenance/status
1481  process::Future<process::http::Response> maintenanceStatus(
1482  const process::http::Request& request,
1484  principal) const;
1485 
1486  // /master/machine/down
1488  const process::http::Request& request,
1490  principal) const;
1491 
1492  // /master/machine/up
1494  const process::http::Request& request,
1496  principal) const;
1497 
1498  // /master/unreserve
1500  const process::http::Request& request,
1502  principal) const;
1503 
1504  // /master/quota
1506  const process::http::Request& request,
1508  principal) const;
1509 
1510  // /master/weights
1512  const process::http::Request& request,
1514  principal) const;
1515 
1516  static std::string API_HELP();
1517  static std::string SCHEDULER_HELP();
1518  static std::string FLAGS_HELP();
1519  static std::string FRAMEWORKS_HELP();
1520  static std::string HEALTH_HELP();
1521  static std::string REDIRECT_HELP();
1522  static std::string ROLES_HELP();
1523  static std::string TEARDOWN_HELP();
1524  static std::string SLAVES_HELP();
1525  static std::string STATE_HELP();
1526  static std::string STATESUMMARY_HELP();
1527  static std::string TASKS_HELP();
1528  static std::string MAINTENANCE_SCHEDULE_HELP();
1529  static std::string MAINTENANCE_STATUS_HELP();
1530  static std::string MACHINE_DOWN_HELP();
1531  static std::string MACHINE_UP_HELP();
1532  static std::string CREATE_VOLUMES_HELP();
1533  static std::string DESTROY_VOLUMES_HELP();
1534  static std::string RESERVE_HELP();
1535  static std::string UNRESERVE_HELP();
1536  static std::string QUOTA_HELP();
1537  static std::string WEIGHTS_HELP();
1538 
1539  private:
1540  JSON::Object __flags() const;
1541 
1542  class FlagsError; // Forward declaration.
1543 
1546  principal) const;
1547 
1549  const size_t limit,
1550  const size_t offset,
1551  const std::string& order,
1553  principal) const;
1554 
1556  const FrameworkID& id,
1558  principal) const;
1559 
1561  const FrameworkID& id) const;
1562 
1563  process::Future<process::http::Response> _updateMaintenanceSchedule(
1564  const mesos::maintenance::Schedule& schedule,
1566  principal) const;
1567 
1568  process::Future<process::http::Response> __updateMaintenanceSchedule(
1569  const mesos::maintenance::Schedule& schedule,
1570  const process::Owned<ObjectApprovers>& approvers) const;
1571 
1572  process::Future<process::http::Response> ___updateMaintenanceSchedule(
1573  const mesos::maintenance::Schedule& schedule,
1574  bool applied) const;
1575 
1576  mesos::maintenance::Schedule _getMaintenanceSchedule(
1577  const process::Owned<ObjectApprovers>& approvers) const;
1578 
1580  const process::Owned<ObjectApprovers>& approvers) const;
1581 
1582  process::Future<process::http::Response> _startMaintenance(
1583  const google::protobuf::RepeatedPtrField<MachineID>& machineIds,
1584  const process::Owned<ObjectApprovers>& approvers) const;
1585 
1587  const google::protobuf::RepeatedPtrField<MachineID>& machineIds,
1588  const process::Owned<ObjectApprovers>& approvers) const;
1589 
1591  const SlaveID& slaveId,
1592  const google::protobuf::RepeatedPtrField<Resource>& resources,
1594  principal) const;
1595 
1597  const SlaveID& slaveId,
1598  const google::protobuf::RepeatedPtrField<Resource>& resources,
1600  principal) const;
1601 
1603  const SlaveID& slaveId,
1604  const google::protobuf::RepeatedPtrField<Resource>& volumes,
1606  principal) const;
1607 
1609  const SlaveID& slaveId,
1610  const google::protobuf::RepeatedPtrField<Resource>& volumes,
1612  principal) const;
1613 
1634  const SlaveID& slaveId,
1635  Resources required,
1636  const Offer::Operation& operation) const;
1637 
1640  principal) const;
1641 
1642  // Master API handlers.
1643 
1645  const mesos::master::Call& call,
1647  ContentType contentType) const;
1648 
1649  mesos::master::Response::GetAgents _getAgents(
1650  const process::Owned<ObjectApprovers>& approvers) const;
1651 
1653  const mesos::master::Call& call,
1655  ContentType contentType) const;
1656 
1658  const mesos::master::Call& call,
1660  ContentType contentType) const;
1661 
1663  const mesos::master::Call& call,
1665  ContentType contentType) const;
1666 
1668  const mesos::master::Call& call,
1670  ContentType contentType) const;
1671 
1673  const mesos::master::Call& call,
1675  ContentType contentType) const;
1676 
1678  const mesos::master::Call& call,
1680  ContentType contentType) const;
1681 
1683  const mesos::master::Call& call,
1685  ContentType contentType) const;
1686 
1688  const mesos::master::Call& call,
1690  ContentType contentType) const;
1691 
1693  const mesos::master::Call& call,
1695  ContentType contentType) const;
1696 
1697  process::Future<process::http::Response> updateMaintenanceSchedule(
1698  const mesos::master::Call& call,
1700  ContentType contentType) const;
1701 
1702  process::Future<process::http::Response> getMaintenanceSchedule(
1703  const mesos::master::Call& call,
1705  ContentType contentType) const;
1706 
1707  process::Future<process::http::Response> getMaintenanceStatus(
1708  const mesos::master::Call& call,
1710  ContentType contentType) const;
1711 
1713  const mesos::master::Call& call,
1715  ContentType contentType) const;
1716 
1718  const mesos::master::Call& call,
1720  ContentType contentType) const;
1721 
1723  const mesos::master::Call& call,
1725  ContentType contentType) const;
1726 
1728  const mesos::master::Call& call,
1730  ContentType contentType) const;
1731 
1732  mesos::master::Response::GetTasks _getTasks(
1733  const process::Owned<ObjectApprovers>& approvers) const;
1734 
1736  const mesos::master::Call& call,
1738  ContentType contentType) const;
1739 
1741  const mesos::master::Call& call,
1743  ContentType contentType) const;
1744 
1746  const mesos::master::Call& call,
1748  ContentType contentType) const;
1749 
1751  const mesos::master::Call& call,
1753  ContentType contentType) const;
1754 
1756  const mesos::master::Call& call,
1758  ContentType contentType) const;
1759 
1760  process::Future<process::http::Response> unreserveResources(
1761  const mesos::master::Call& call,
1763  ContentType contentType) const;
1764 
1766  const mesos::master::Call& call,
1768  ContentType contentType) const;
1769 
1770  mesos::master::Response::GetFrameworks _getFrameworks(
1771  const process::Owned<ObjectApprovers>& approvers) const;
1772 
1774  const mesos::master::Call& call,
1776  ContentType contentType) const;
1777 
1778  mesos::master::Response::GetExecutors _getExecutors(
1779  const process::Owned<ObjectApprovers>& approvers) const;
1780 
1782  const mesos::master::Call& call,
1784  ContentType contentType) const;
1785 
1786  mesos::master::Response::GetState _getState(
1787  const process::Owned<ObjectApprovers>& approvers) const;
1788 
1790  const mesos::master::Call& call,
1792  ContentType contentType) const;
1793 
1795  const mesos::master::Call& call,
1797  ContentType contentType) const;
1798 
1800  const mesos::master::Call& call,
1802  ContentType contentType) const;
1803 
1805  const mesos::master::Call& call,
1807  ContentType contentType) const;
1808 
1810  const SlaveID& slaveId) const;
1811 
1812  process::Future<process::http::Response> reconcileOperations(
1813  Framework* framework,
1814  const scheduler::Call::ReconcileOperations& call,
1815  ContentType contentType) const;
1816 
1817  Master* master;
1818 
1819  // NOTE: The quota specific pieces of the Operator API are factored
1820  // out into this separate class.
1821  QuotaHandler quotaHandler;
1822 
1823  // NOTE: The weights specific pieces of the Operator API are factored
1824  // out into this separate class.
1825  WeightsHandler weightsHandler;
1826  };
1827 
1828  Master(const Master&); // No copying.
1829  Master& operator=(const Master&); // No assigning.
1830 
1831  friend struct Framework;
1832  friend struct Metrics;
1833  friend struct Slave;
1834  friend struct SlavesWriter;
1835  friend struct Subscriber;
1836 
1837  // NOTE: Since 'getOffer', 'getInverseOffer' and 'slaves' are
1838  // protected, we need to make the following functions friends.
1839  friend Offer* validation::offer::getOffer(
1840  Master* master, const OfferID& offerId);
1841 
1842  friend InverseOffer* validation::offer::getInverseOffer(
1843  Master* master, const OfferID& offerId);
1844 
1846  Master* master, const SlaveID& slaveId);
1847 
1848  const Flags flags;
1849 
1850  Http http;
1851 
1852  Option<MasterInfo> leader; // Current leading master.
1853 
1854  mesos::allocator::Allocator* allocator;
1855  WhitelistWatcher* whitelistWatcher;
1856  Registrar* registrar;
1857  Files* files;
1858 
1861 
1862  const Option<Authorizer*> authorizer;
1863 
1864  MasterInfo info_;
1865 
1866  // Holds some info which affects how a machine behaves, as well as state that
1867  // represent the master's view of this machine. See the `MachineInfo` protobuf
1868  // and `Machine` struct for more information.
1870 
1871  struct Maintenance
1872  {
1873  // Holds the maintenance schedule, as given by the operator.
1874  std::list<mesos::maintenance::Schedule> schedules;
1875  } maintenance;
1876 
1877  // Indicates when recovery is complete. Recovery begins once the
1878  // master is elected as a leader.
1880 
1881  // If this is the leading master, we periodically check whether we
1882  // should GC some information from the registry.
1883  Option<process::Timer> registryGcTimer;
1884 
1885  struct Slaves
1886  {
1887  Slaves() : removed(MAX_REMOVED_SLAVES) {}
1888 
1889  // Imposes a time limit for slaves that we recover from the
1890  // registry to reregister with the master.
1891  Option<process::Timer> recoveredTimer;
1892 
1893  // Slaves that have been recovered from the registrar after master
1894  // failover. Slaves are removed from this collection when they
1895  // either reregister with the master or are marked unreachable
1896  // because they do not reregister before `recoveredTimer` fires.
1897  // We must not answer questions related to these slaves (e.g.,
1898  // during task reconciliation) until we determine their fate
1899  // because their are in this transitioning state.
1900  hashmap<SlaveID, SlaveInfo> recovered;
1901 
1902  // Agents that are in the process of (re-)registering. They are
1903  // maintained here while the (re-)registration is in progress and
1904  // possibly pending in the authorizer or the registrar in order
1905  // to help deduplicate (re-)registration requests.
1906  hashset<process::UPID> registering;
1907  hashset<SlaveID> reregistering;
1908 
1909  // Registered slaves are indexed by SlaveID and UPID. Note that
1910  // iteration is supported but is exposed as iteration over a
1911  // hashmap<SlaveID, Slave*> since it is tedious to convert
1912  // the map's key/value iterator into a value iterator.
1913  //
1914  // TODO(bmahler): Consider pulling in boost's multi_index,
1915  // or creating a simpler indexing abstraction in stout.
1916  struct
1917  {
1918  bool contains(const SlaveID& slaveId) const
1919  {
1920  return ids.contains(slaveId);
1921  }
1922 
1923  bool contains(const process::UPID& pid) const
1924  {
1925  return pids.contains(pid);
1926  }
1927 
1928  Slave* get(const SlaveID& slaveId) const
1929  {
1930  return ids.get(slaveId).getOrElse(nullptr);
1931  }
1932 
1933  Slave* get(const process::UPID& pid) const
1934  {
1935  return pids.get(pid).getOrElse(nullptr);
1936  }
1937 
1938  void put(Slave* slave)
1939  {
1940  CHECK_NOTNULL(slave);
1941  ids[slave->id] = slave;
1942  pids[slave->pid] = slave;
1943  }
1944 
1945  void remove(Slave* slave)
1946  {
1947  CHECK_NOTNULL(slave);
1948  ids.erase(slave->id);
1949  pids.erase(slave->pid);
1950  }
1951 
1952  void clear()
1953  {
1954  ids.clear();
1955  pids.clear();
1956  }
1957 
1958  size_t size() const { return ids.size(); }
1959 
1960  typedef hashmap<SlaveID, Slave*>::iterator iterator;
1961  typedef hashmap<SlaveID, Slave*>::const_iterator const_iterator;
1962 
1963  iterator begin() { return ids.begin(); }
1964  iterator end() { return ids.end(); }
1965 
1966  const_iterator begin() const { return ids.begin(); }
1967  const_iterator end() const { return ids.end(); }
1968 
1969  private:
1972  } registered;
1973 
1974  // Slaves that are in the process of being removed from the
1975  // registrar.
1976  hashset<SlaveID> removing;
1977 
1978  // Slaves that are in the process of being marked unreachable.
1979  hashset<SlaveID> markingUnreachable;
1980 
1981  // Slaves that are in the process of being marked gone.
1982  hashset<SlaveID> markingGone;
1983 
1984  // This collection includes agents that have gracefully shutdown,
1985  // as well as those that have been marked unreachable or gone. We
1986  // keep a cache here to prevent this from growing in an unbounded
1987  // manner.
1988  //
1989  // TODO(bmahler): Ideally we could use a cache with set semantics.
1990  //
1991  // TODO(neilc): Consider storing all agent IDs that have been
1992  // marked unreachable by this master.
1994 
1995  // Slaves that have been marked unreachable. We recover this from
1996  // the registry, so it includes slaves marked as unreachable by
1997  // other instances of the master. Note that we use a LinkedHashMap
1998  // to ensure the order of elements here matches the order in the
1999  // registry's unreachable list, which matches the order in which
2000  // agents are marked unreachable. This list is garbage collected;
2001  // GC behavior is governed by the `registry_gc_interval`,
2002  // `registry_max_agent_age`, and `registry_max_agent_count` flags.
2004 
2005  // This helps us look up all unreachable tasks on an agent so we can remove
2006  // them from their primary storage `framework.unreachableTasks` when an
2007  // agent reregisters. This map is bounded by the same GC behavior as
2008  // `unreachable`. When the agent is GC'd from unreachable it's also
2009  // erased from `unreachableTasks`.
2011 
2012  // Slaves that have been marked gone. We recover this from the
2013  // registry, so it includes slaves marked as gone by other instances
2014  // of the master. Note that we use a LinkedHashMap to ensure the order
2015  // of elements here matches the order in the registry's gone list, which
2016  // matches the order in which agents are marked gone.
2018 
2019  // This rate limiter is used to limit the removal of slaves failing
2020  // health checks.
2021  // NOTE: Using a 'shared_ptr' here is OK because 'RateLimiter' is
2022  // a wrapper around libprocess process which is thread safe.
2024  } slaves;
2025 
2026  struct Frameworks
2027  {
2028  Frameworks(const Flags& masterFlags)
2029  : completed(masterFlags.max_completed_frameworks) {}
2030 
2032 
2034 
2035  // Principals of frameworks keyed by PID.
2036  // NOTE: Multiple PIDs can map to the same principal. The
2037  // principal is None when the framework doesn't specify it.
2038  // The differences between this map and 'authenticated' are:
2039  // 1) This map only includes *registered* frameworks. The mapping
2040  // is added when a framework (re-)registers.
2041  // 2) This map includes unauthenticated frameworks (when Master
2042  // allows them) if they have principals specified in
2043  // FrameworkInfo.
2045 
2046  // BoundedRateLimiters keyed by the framework principal.
2047  // Like Metrics::Frameworks, all frameworks of the same principal
2048  // are throttled together at a common rate limit.
2050 
2051  // The default limiter is for frameworks not specified in
2052  // 'flags.rate_limits'.
2054  } frameworks;
2055 
2056  struct Subscribers
2057  {
2058  Subscribers(Master* _master) : master(_master) {};
2059 
2060  // Represents a client subscribed to the 'api/vX' endpoint.
2061  //
2062  // TODO(anand): Add support for filtering. Some subscribers
2063  // might only be interested in a subset of events.
2064  struct Subscriber
2065  {
2067  const HttpConnection& _http,
2069  : http(_http),
2070  principal(_principal)
2071  {
2072  mesos::master::Event event;
2073  event.set_type(mesos::master::Event::HEARTBEAT);
2074 
2075  heartbeater =
2078  "subscriber " + stringify(http.streamId),
2079  event,
2080  http,
2083 
2084  process::spawn(heartbeater.get());
2085  }
2086 
2087  // Not copyable, not assignable.
2088  Subscriber(const Subscriber&) = delete;
2089  Subscriber& operator=(const Subscriber&) = delete;
2090 
2091  // TODO(greggomann): Refactor this function into multiple event-specific
2092  // overloads. See MESOS-8475.
2093  void send(
2095  const process::Owned<ObjectApprovers>& approvers,
2096  const process::Shared<FrameworkInfo>& frameworkInfo,
2097  const process::Shared<Task>& task);
2098 
2100  {
2101  // TODO(anand): Refactor `HttpConnection` to being a RAII class instead.
2102  // It is possible that a caller might accidentally invoke `close()`
2103  // after passing ownership to the `Subscriber` object. See MESOS-5843
2104  // for more details.
2105  http.close();
2106 
2107  terminate(heartbeater.get());
2108  wait(heartbeater.get());
2109  }
2110 
2115  };
2116 
2117  // Sends the event to all subscribers connected to the 'api/vX' endpoint.
2118  void send(
2119  mesos::master::Event&& event,
2120  const Option<FrameworkInfo>& frameworkInfo = None(),
2121  const Option<Task>& task = None());
2122 
2123  Master* master;
2124 
2125  // Active subscribers to the 'api/vX' endpoint keyed by the stream
2126  // identifier.
2128  };
2129 
2130  Subscribers subscribers;
2131 
2132  hashmap<OfferID, Offer*> offers;
2134 
2135  hashmap<OfferID, InverseOffer*> inverseOffers;
2136  hashmap<OfferID, process::Timer> inverseOfferTimers;
2137 
2138  // We track information about roles that we're aware of in the system.
2139  // Specifically, we keep track of the roles when a framework subscribes to
2140  // the role, and/or when there are resources allocated to the role
2141  // (e.g. some tasks and/or executors are consuming resources under the role).
2143 
2144  // Configured role whitelist if using the (deprecated) "explicit
2145  // roles" feature. If this is `None`, any role is allowed.
2146  Option<hashset<std::string>> roleWhitelist;
2147 
2148  // Configured weight for each role, if any. If a role does not
2149  // appear here, it has the default weight of 1.
2151 
2152  // Configured quota for each role, if any. We store quotas by role
2153  // because we set them at the role level.
2155 
2156  // Authenticator names as supplied via flags.
2157  std::vector<std::string> authenticatorNames;
2158 
2159  Option<Authenticator*> authenticator;
2160 
2161  // Frameworks/slaves that are currently in the process of authentication.
2162  // 'authenticating' future is completed when authenticator
2163  // completes authentication.
2164  // The future is removed from the map when master completes authentication.
2166 
2167  // Principals of authenticated frameworks/slaves keyed by PID.
2169 
2170  int64_t nextFrameworkId; // Used to give each framework a unique ID.
2171  int64_t nextOfferId; // Used to give each slot offer a unique ID.
2172  int64_t nextSlaveId; // Used to give each slave a unique ID.
2173 
2174  // NOTE: It is safe to use a 'shared_ptr' because 'Metrics' is
2175  // thread safe.
2176  // TODO(dhamon): This does not need to be a shared_ptr. Metrics contains
2177  // copyable metric types only.
2178  std::shared_ptr<Metrics> metrics;
2179 
2180  // PullGauge handlers.
2181  double _uptime_secs()
2182  {
2183  return (process::Clock::now() - startTime).secs();
2184  }
2185 
2186  double _elected()
2187  {
2188  return elected() ? 1 : 0;
2189  }
2190 
2191  double _slaves_connected();
2192  double _slaves_disconnected();
2193  double _slaves_active();
2194  double _slaves_inactive();
2195  double _slaves_unreachable();
2196 
2197  double _frameworks_connected();
2198  double _frameworks_disconnected();
2199  double _frameworks_active();
2200  double _frameworks_inactive();
2201 
2202  double _outstanding_offers()
2203  {
2204  return static_cast<double>(offers.size());
2205  }
2206 
2207  double _event_queue_messages()
2208  {
2209  return static_cast<double>(eventCount<process::MessageEvent>());
2210  }
2211 
2212  double _event_queue_dispatches()
2213  {
2214  return static_cast<double>(eventCount<process::DispatchEvent>());
2215  }
2216 
2217  double _event_queue_http_requests()
2218  {
2219  return static_cast<double>(eventCount<process::HttpEvent>());
2220  }
2221 
2222  double _tasks_staging();
2223  double _tasks_starting();
2224  double _tasks_running();
2225  double _tasks_unreachable();
2226  double _tasks_killing();
2227 
2228  double _resources_total(const std::string& name);
2229  double _resources_used(const std::string& name);
2230  double _resources_percent(const std::string& name);
2231 
2232  double _resources_revocable_total(const std::string& name);
2233  double _resources_revocable_used(const std::string& name);
2234  double _resources_revocable_percent(const std::string& name);
2235 
2236  process::Time startTime; // Start time used to calculate uptime.
2237 
2238  Option<process::Time> electedTime; // Time when this master is elected.
2239 
2240  // Validates the framework including authorization.
2241  // Returns None if the framework is valid.
2242  // Returns Error if the framework is invalid.
2243  // Returns Failure if authorization returns 'Failure'.
2245  const FrameworkInfo& frameworkInfo,
2246  const process::UPID& from);
2247 };
2248 
2249 
2250 inline std::ostream& operator<<(
2251  std::ostream& stream,
2252  const Framework& framework);
2253 
2254 
2255 // TODO(bmahler): Keeping the task and executor information in sync
2256 // across the Slave and Framework structs is error prone!
2258 {
2259  enum State
2260  {
2261  // Framework has never connected to this master. This implies the
2262  // master failed over and the framework has not yet reregistered,
2263  // but some framework state has been recovered from reregistering
2264  // agents that are running tasks for the framework.
2266 
2267  // Framework was previously connected to this master. A framework
2268  // becomes disconnected when there is a socket error.
2270 
2271  // The framework is connected but not active.
2273 
2274  // Framework is connected and eligible to receive offers. No
2275  // offers will be made to frameworks that are not active.
2276  ACTIVE
2277  };
2278 
2280  const Flags& masterFlags,
2281  const FrameworkInfo& info,
2282  const process::UPID& _pid,
2284  : Framework(master, masterFlags, info, ACTIVE, time)
2285  {
2286  pid = _pid;
2287  }
2288 
2289  Framework(Master* const master,
2290  const Flags& masterFlags,
2291  const FrameworkInfo& info,
2292  const HttpConnection& _http,
2294  : Framework(master, masterFlags, info, ACTIVE, time)
2295  {
2296  http = _http;
2297  }
2298 
2299  Framework(Master* const master,
2300  const Flags& masterFlags,
2301  const FrameworkInfo& info)
2302  : Framework(master, masterFlags, info, RECOVERED, process::Time()) {}
2303 
2305  {
2306  if (http.isSome()) {
2307  closeHttpConnection();
2308  }
2309  }
2310 
2311  Task* getTask(const TaskID& taskId)
2312  {
2313  if (tasks.count(taskId) > 0) {
2314  return tasks[taskId];
2315  }
2316 
2317  return nullptr;
2318  }
2319 
2320  void addTask(Task* task)
2321  {
2322  CHECK(!tasks.contains(task->task_id()))
2323  << "Duplicate task " << task->task_id()
2324  << " of framework " << task->framework_id();
2325 
2326  // Verify that Resource.AllocationInfo is set,
2327  // this should be guaranteed by the master.
2328  foreach (const Resource& resource, task->resources()) {
2329  CHECK(resource.has_allocation_info());
2330  }
2331 
2332  tasks[task->task_id()] = task;
2333 
2334  // Unreachable tasks should be added via `addUnreachableTask`.
2335  CHECK(task->state() != TASK_UNREACHABLE)
2336  << "Task '" << task->task_id() << "' of framework " << id()
2337  << " added in TASK_UNREACHABLE state";
2338 
2339  // Since we track terminal but unacknowledged tasks within
2340  // `tasks` rather than `completedTasks`, we need to handle
2341  // them here: don't count them as consuming resources.
2342  //
2343  // TODO(bmahler): Users currently get confused because
2344  // terminal tasks can show up as "active" tasks in the UI and
2345  // endpoints. Ideally, we show the terminal unacknowledged
2346  // tasks as "completed" as well.
2347  if (!protobuf::isTerminalState(task->state())) {
2348  // Note that we explicitly convert from protobuf to `Resources` once
2349  // and then use the result for calculations to avoid performance penalty
2350  // for multiple conversions and validations implied by `+=` with protobuf
2351  // arguments.
2352  // Conversion is safe, as resources have already passed validation.
2353  const Resources resources = task->resources();
2354  totalUsedResources += resources;
2355  usedResources[task->slave_id()] += resources;
2356 
2357  // It's possible that we're not tracking the task's role for
2358  // this framework if the role is absent from the framework's
2359  // set of roles. In this case, we track the role's allocation
2360  // for this framework.
2361  CHECK(!task->resources().empty());
2362  const std::string& role =
2363  task->resources().begin()->allocation_info().role();
2364 
2365  if (!isTrackedUnderRole(role)) {
2366  trackUnderRole(role);
2367  }
2368  }
2369 
2370  if (!master->subscribers.subscribed.empty()) {
2371  master->subscribers.send(
2373  info);
2374  }
2375  }
2376 
2377  // Update framework to recover the resources that were previously
2378  // being used by `task`.
2379  //
2380  // TODO(bmahler): This is a hack for performance. We need to
2381  // maintain resource counters because computing task resources
2382  // functionally for all tasks is expensive, for now.
2383  void recoverResources(Task* task)
2384  {
2385  CHECK(tasks.contains(task->task_id()))
2386  << "Unknown task " << task->task_id()
2387  << " of framework " << task->framework_id();
2388 
2389  totalUsedResources -= task->resources();
2390  usedResources[task->slave_id()] -= task->resources();
2391  if (usedResources[task->slave_id()].empty()) {
2392  usedResources.erase(task->slave_id());
2393  }
2394 
2395  // If we are no longer subscribed to the role to which these resources are
2396  // being returned to, and we have no more resources allocated to us for that
2397  // role, stop tracking the framework under the role.
2398  CHECK(!task->resources().empty());
2399  const std::string& role =
2400  task->resources().begin()->allocation_info().role();
2401 
2402  auto allocatedToRole = [&role](const Resource& resource) {
2403  return resource.allocation_info().role() == role;
2404  };
2405 
2406  if (roles.count(role) == 0 &&
2407  totalUsedResources.filter(allocatedToRole).empty()) {
2408  CHECK(totalOfferedResources.filter(allocatedToRole).empty());
2409  untrackUnderRole(role);
2410  }
2411  }
2412 
2413  // Sends a message to the connected framework.
2414  template <typename Message>
2415  void send(const Message& message)
2416  {
2417  if (!connected()) {
2418  LOG(WARNING) << "Master attempted to send message to disconnected"
2419  << " framework " << *this;
2420  }
2421 
2422  if (http.isSome()) {
2423  if (!http->send(message)) {
2424  LOG(WARNING) << "Unable to send event to framework " << *this << ":"
2425  << " connection closed";
2426  }
2427  } else {
2428  CHECK_SOME(pid);
2429  master->send(pid.get(), message);
2430  }
2431  }
2432 
2433  void addCompletedTask(Task&& task)
2434  {
2435  // TODO(neilc): We currently allow frameworks to reuse the task
2436  // IDs of completed tasks (although this is discouraged). This
2437  // means that there might be multiple completed tasks with the
2438  // same task ID. We should consider rejecting attempts to reuse
2439  // task IDs (MESOS-6779).
2440  completedTasks.push_back(process::Owned<Task>(new Task(std::move(task))));
2441  }
2442 
2443  void addUnreachableTask(const Task& task)
2444  {
2445  // TODO(adam-mesos): Check if unreachable task already exists.
2446  unreachableTasks.set(task.task_id(), process::Owned<Task>(new Task(task)));
2447  }
2448 
2449  // Removes the task. `unreachable` indicates whether the task is removed due
2450  // to being unreachable. Note that we cannot rely on the task state because
2451  // it may not reflect unreachability due to being set to TASK_LOST for
2452  // backwards compatibility.
2453  void removeTask(Task* task, bool unreachable)
2454  {
2455  CHECK(tasks.contains(task->task_id()))
2456  << "Unknown task " << task->task_id()
2457  << " of framework " << task->framework_id();
2458 
2459  // The invariant here is that the master will have already called
2460  // `recoverResources()` prior to removing terminal or unreachable tasks.
2461  if (!protobuf::isTerminalState(task->state()) &&
2462  task->state() != TASK_UNREACHABLE) {
2463  recoverResources(task);
2464  }
2465 
2466  if (unreachable) {
2467  addUnreachableTask(*task);
2468  } else {
2469  CHECK(task->state() != TASK_UNREACHABLE);
2470 
2471  // TODO(bmahler): This moves a potentially non-terminal task into
2472  // the completed list!
2473  addCompletedTask(Task(*task));
2474  }
2475 
2476  tasks.erase(task->task_id());
2477  }
2478 
2479  void addOffer(Offer* offer)
2480  {
2481  CHECK(!offers.contains(offer)) << "Duplicate offer " << offer->id();
2482  offers.insert(offer);
2483  totalOfferedResources += offer->resources();
2484  offeredResources[offer->slave_id()] += offer->resources();
2485  }
2486 
2487  void removeOffer(Offer* offer)
2488  {
2489  CHECK(offers.find(offer) != offers.end())
2490  << "Unknown offer " << offer->id();
2491 
2492  totalOfferedResources -= offer->resources();
2493  offeredResources[offer->slave_id()] -= offer->resources();
2494  if (offeredResources[offer->slave_id()].empty()) {
2495  offeredResources.erase(offer->slave_id());
2496  }
2497 
2498  offers.erase(offer);
2499  }
2500 
2501  void addInverseOffer(InverseOffer* inverseOffer)
2502  {
2503  CHECK(!inverseOffers.contains(inverseOffer))
2504  << "Duplicate inverse offer " << inverseOffer->id();
2505  inverseOffers.insert(inverseOffer);
2506  }
2507 
2508  void removeInverseOffer(InverseOffer* inverseOffer)
2509  {
2510  CHECK(inverseOffers.contains(inverseOffer))
2511  << "Unknown inverse offer " << inverseOffer->id();
2512 
2513  inverseOffers.erase(inverseOffer);
2514  }
2515 
2516  bool hasExecutor(const SlaveID& slaveId,
2517  const ExecutorID& executorId)
2518  {
2519  return executors.contains(slaveId) &&
2520  executors[slaveId].contains(executorId);
2521  }
2522 
2523  void addExecutor(const SlaveID& slaveId,
2524  const ExecutorInfo& executorInfo)
2525  {
2526  CHECK(!hasExecutor(slaveId, executorInfo.executor_id()))
2527  << "Duplicate executor '" << executorInfo.executor_id()
2528  << "' on agent " << slaveId;
2529 
2530  // Verify that Resource.AllocationInfo is set,
2531  // this should be guaranteed by the master.
2532  foreach (const Resource& resource, executorInfo.resources()) {
2533  CHECK(resource.has_allocation_info());
2534  }
2535 
2536  executors[slaveId][executorInfo.executor_id()] = executorInfo;
2537  totalUsedResources += executorInfo.resources();
2538  usedResources[slaveId] += executorInfo.resources();
2539 
2540  // It's possible that we're not tracking the task's role for
2541  // this framework if the role is absent from the framework's
2542  // set of roles. In this case, we track the role's allocation
2543  // for this framework.
2544  if (!executorInfo.resources().empty()) {
2545  const std::string& role =
2546  executorInfo.resources().begin()->allocation_info().role();
2547 
2548  if (!isTrackedUnderRole(role)) {
2549  trackUnderRole(role);
2550  }
2551  }
2552  }
2553 
2554  void removeExecutor(const SlaveID& slaveId,
2555  const ExecutorID& executorId)
2556  {
2557  CHECK(hasExecutor(slaveId, executorId))
2558  << "Unknown executor '" << executorId
2559  << "' of framework " << id()
2560  << " of agent " << slaveId;
2561 
2562  const ExecutorInfo& executorInfo = executors[slaveId][executorId];
2563 
2564  totalUsedResources -= executorInfo.resources();
2565  usedResources[slaveId] -= executorInfo.resources();
2566  if (usedResources[slaveId].empty()) {
2567  usedResources.erase(slaveId);
2568  }
2569 
2570  // If we are no longer subscribed to the role to which these resources are
2571  // being returned to, and we have no more resources allocated to us for that
2572  // role, stop tracking the framework under the role.
2573  if (!executorInfo.resources().empty()) {
2574  const std::string& role =
2575  executorInfo.resources().begin()->allocation_info().role();
2576 
2577  auto allocatedToRole = [&role](const Resource& resource) {
2578  return resource.allocation_info().role() == role;
2579  };
2580 
2581  if (roles.count(role) == 0 &&
2582  totalUsedResources.filter(allocatedToRole).empty()) {
2583  CHECK(totalOfferedResources.filter(allocatedToRole).empty());
2584  untrackUnderRole(role);
2585  }
2586  }
2587 
2588  executors[slaveId].erase(executorId);
2589  if (executors[slaveId].empty()) {
2590  executors.erase(slaveId);
2591  }
2592  }
2593 
2594  void addOperation(Operation* operation)
2595  {
2596  CHECK(operation->has_framework_id());
2597 
2598  const FrameworkID& frameworkId = operation->framework_id();
2599 
2600  const UUID& uuid = operation->uuid();
2601 
2602  CHECK(!operations.contains(uuid))
2603  << "Duplicate operation '" << operation->info().id()
2604  << "' (uuid: " << uuid << ") "
2605  << "of framework " << frameworkId;
2606 
2607  operations.put(uuid, operation);
2608 
2609  if (operation->info().has_id()) {
2610  operationUUIDs.put(operation->info().id(), uuid);
2611  }
2612 
2613  if (!protobuf::isSpeculativeOperation(operation->info()) &&
2614  !protobuf::isTerminalState(operation->latest_status().state())) {
2615  Try<Resources> consumed =
2616  protobuf::getConsumedResources(operation->info());
2617  CHECK_SOME(consumed);
2618 
2619  CHECK(operation->has_slave_id())
2620  << "External resource provider is not supported yet";
2621 
2622  const SlaveID& slaveId = operation->slave_id();
2623 
2624  totalUsedResources += consumed.get();
2625  usedResources[slaveId] += consumed.get();
2626 
2627  // It's possible that we're not tracking the role from the
2628  // resources in the operation for this framework if the role is
2629  // absent from the framework's set of roles. In this case, we
2630  // track the role's allocation for this framework.
2631  foreachkey (const std::string& role, consumed->allocations()) {
2632  if (!isTrackedUnderRole(role)) {
2633  trackUnderRole(role);
2634  }
2635  }
2636  }
2637  }
2638 
2639  Option<Operation*> getOperation(const OperationID& id) {
2640  Option<UUID> uuid = operationUUIDs.get(id);
2641 
2642  if (uuid.isNone()) {
2643  return None();
2644  }
2645 
2646  Option<Operation*> operation = operations.get(uuid.get());
2647 
2648  CHECK_SOME(operation);
2649 
2650  return operation;
2651  }
2652 
2653  void recoverResources(Operation* operation)
2654  {
2655  CHECK(operation->has_slave_id())
2656  << "External resource provider is not supported yet";
2657 
2658  const SlaveID& slaveId = operation->slave_id();
2659 
2660  if (protobuf::isSpeculativeOperation(operation->info())) {
2661  return;
2662  }
2663 
2664  Try<Resources> consumed = protobuf::getConsumedResources(operation->info());
2665  CHECK_SOME(consumed);
2666 
2667  CHECK(totalUsedResources.contains(consumed.get()))
2668  << "Tried to recover resources " << consumed.get()
2669  << " which do not seem used";
2670 
2671  CHECK(usedResources[slaveId].contains(consumed.get()))
2672  << "Tried to recover resources " << consumed.get() << " of agent "
2673  << slaveId << " which do not seem used";
2674 
2675  totalUsedResources -= consumed.get();
2676  usedResources[slaveId] -= consumed.get();
2677  if (usedResources[slaveId].empty()) {
2678  usedResources.erase(slaveId);
2679  }
2680 
2681  // If we are no longer subscribed to the role to which these
2682  // resources are being returned to, and we have no more resources
2683  // allocated to us for that role, stop tracking the framework
2684  // under the role.
2685  foreachkey (const std::string& role, consumed->allocations()) {
2686  auto allocatedToRole = [&role](const Resource& resource) {
2687  return resource.allocation_info().role() == role;
2688  };
2689 
2690  if (roles.count(role) == 0 &&
2691  totalUsedResources.filter(allocatedToRole).empty()) {
2692  CHECK(totalOfferedResources.filter(allocatedToRole).empty());
2693  untrackUnderRole(role);
2694  }
2695  }
2696  }
2697 
2698  void removeOperation(Operation* operation)
2699  {
2700  const UUID& uuid = operation->uuid();
2701 
2702  CHECK(operations.contains(uuid))
2703  << "Unknown operation '" << operation->info().id()
2704  << "' (uuid: " << uuid << ") "
2705  << "of framework " << operation->framework_id();
2706 
2707  if (!protobuf::isSpeculativeOperation(operation->info()) &&
2708  !protobuf::isTerminalState(operation->latest_status().state())) {
2709  recoverResources(operation);
2710  }
2711 
2712  if (operation->info().has_id()) {
2713  operationUUIDs.erase(operation->info().id());
2714  }
2715 
2716  operations.erase(uuid);
2717  }
2718 
2719  const FrameworkID id() const { return info.id(); }
2720 
2721  // Update fields in 'info' using those in 'newInfo'. Currently this
2722  // only updates `role`/`roles`, 'name', 'failover_timeout', 'hostname',
2723  // 'webui_url', 'capabilities', and 'labels'.
2724  void update(const FrameworkInfo& newInfo)
2725  {
2726  // We only merge 'info' from the same framework 'id'.
2727  CHECK_EQ(info.id(), newInfo.id());
2728 
2729  // Save the old list of roles for later.
2730  std::set<std::string> oldRoles = roles;
2731 
2732  // TODO(jmlvanre): Merge other fields as per design doc in
2733  // MESOS-703.
2734 
2735  info.clear_role();
2736  info.clear_roles();
2737 
2738  if (newInfo.has_role()) {
2739  info.set_role(newInfo.role());
2740  }
2741 
2742  if (newInfo.roles_size() > 0) {
2743  info.mutable_roles()->CopyFrom(newInfo.roles());
2744  }
2745 
2746  roles = protobuf::framework::getRoles(newInfo);
2747 
2748  if (newInfo.user() != info.user()) {
2749  LOG(WARNING) << "Cannot update FrameworkInfo.user to '" << newInfo.user()
2750  << "' for framework " << id() << ". Check MESOS-703";
2751  }
2752 
2753  info.set_name(newInfo.name());
2754 
2755  if (newInfo.has_failover_timeout()) {
2756  info.set_failover_timeout(newInfo.failover_timeout());
2757  } else {
2758  info.clear_failover_timeout();
2759  }
2760 
2761  if (newInfo.checkpoint() != info.checkpoint()) {
2762  LOG(WARNING) << "Cannot update FrameworkInfo.checkpoint to '"
2763  << stringify(newInfo.checkpoint()) << "' for framework "
2764  << id() << ". Check MESOS-703";
2765  }
2766 
2767  if (newInfo.has_hostname()) {
2768  info.set_hostname(newInfo.hostname());
2769  } else {
2770  info.clear_hostname();
2771  }
2772 
2773  if (newInfo.principal() != info.principal()) {
2774  LOG(WARNING) << "Cannot update FrameworkInfo.principal to '"
2775  << newInfo.principal() << "' for framework " << id()
2776  << ". Check MESOS-703";
2777  }
2778 
2779  if (newInfo.has_webui_url()) {
2780  info.set_webui_url(newInfo.webui_url());
2781  } else {
2782  info.clear_webui_url();
2783  }
2784 
2785  if (newInfo.capabilities_size() > 0) {
2786  info.mutable_capabilities()->CopyFrom(newInfo.capabilities());
2787  } else {
2788  info.clear_capabilities();
2789  }
2790  capabilities = protobuf::framework::Capabilities(info.capabilities());
2791 
2792  if (newInfo.has_labels()) {
2793  info.mutable_labels()->CopyFrom(newInfo.labels());
2794  } else {
2795  info.clear_labels();
2796  }
2797 
2798  const std::set<std::string>& newRoles = roles;
2799 
2800  const std::set<std::string> removedRoles = [&]() {
2801  std::set<std::string> result = oldRoles;
2802  foreach (const std::string& role, newRoles) {
2803  result.erase(role);
2804  }
2805  return result;
2806  }();
2807 
2808  foreach (const std::string& role, removedRoles) {
2809  auto allocatedToRole = [&role](const Resource& resource) {
2810  return resource.allocation_info().role() == role;
2811  };
2812 
2813  // Stop tracking the framework under this role if there are
2814  // no longer any resources allocated to it.
2815  if (totalUsedResources.filter(allocatedToRole).empty()) {
2816  CHECK(totalOfferedResources.filter(allocatedToRole).empty());
2817  untrackUnderRole(role);
2818  }
2819  }
2820 
2821  const std::set<std::string> addedRoles = [&]() {
2822  std::set<std::string> result = newRoles;
2823  foreach (const std::string& role, oldRoles) {
2824  result.erase(role);
2825  }
2826  return result;
2827  }();
2828 
2829  foreach (const std::string& role, addedRoles) {
2830  // NOTE: It's possible that we're already tracking this framework
2831  // under the role because a framework can unsubscribe from a role
2832  // while it still has resources allocated to the role.
2833  if (!isTrackedUnderRole(role)) {
2834  trackUnderRole(role);
2835  }
2836  }
2837  }
2838 
2839  void updateConnection(const process::UPID& newPid)
2840  {
2841  // Cleanup the HTTP connnection if this is a downgrade from HTTP
2842  // to PID. Note that the connection may already be closed.
2843  if (http.isSome()) {
2844  closeHttpConnection();
2845  }
2846 
2847  // TODO(benh): unlink(oldPid);
2848  pid = newPid;
2849  }
2850 
2851  void updateConnection(const HttpConnection& newHttp)
2852  {
2853  if (pid.isSome()) {
2854  // Wipe the PID if this is an upgrade from PID to HTTP.
2855  // TODO(benh): unlink(oldPid);
2856  pid = None();
2857  } else if (http.isSome()) {
2858  // Cleanup the old HTTP connection.
2859  // Note that master creates a new HTTP connection for every
2860  // subscribe request, so 'newHttp' should always be different
2861  // from 'http'.
2862  closeHttpConnection();
2863  }
2864 
2865  CHECK_NONE(http);
2866 
2867  http = newHttp;
2868  }
2869 
2870  // Closes the HTTP connection and stops the heartbeat.
2871  //
2872  // TODO(vinod): Currently `state` variable is set separately
2873  // from this method. We need to make sure these are in sync.
2875  {
2876  CHECK_SOME(http);
2877 
2878  if (connected() && !http->close()) {
2879  LOG(WARNING) << "Failed to close HTTP pipe for " << *this;
2880  }
2881 
2882  http = None();
2883 
2884  CHECK_SOME(heartbeater);
2885 
2886  terminate(heartbeater->get());
2887  wait(heartbeater->get());
2888 
2889  heartbeater = None();
2890  }
2891 
2892  void heartbeat()
2893  {
2894  CHECK_NONE(heartbeater);
2895  CHECK_SOME(http);
2896 
2897  // TODO(vinod): Make heartbeat interval configurable and include
2898  // this information in the SUBSCRIBED response.
2899  scheduler::Event event;
2900  event.set_type(scheduler::Event::HEARTBEAT);
2901 
2902  heartbeater =
2904  "framework " + stringify(info.id()),
2905  event,
2906  http.get(),
2908 
2909  process::spawn(heartbeater->get());
2910  }
2911 
2912  bool active() const { return state == ACTIVE; }
2913  bool connected() const { return state == ACTIVE || state == INACTIVE; }
2914  bool recovered() const { return state == RECOVERED; }
2915 
2916  bool isTrackedUnderRole(const std::string& role) const;
2917  void trackUnderRole(const std::string& role);
2918  void untrackUnderRole(const std::string& role);
2919 
2920  Master* const master;
2921 
2922  FrameworkInfo info;
2923 
2924  std::set<std::string> roles;
2925 
2927 
2928  // Frameworks can either be connected via HTTP or by message passing
2929  // (scheduler driver). At most one of `http` and `pid` will be set
2930  // according to the last connection made by the framework; neither
2931  // field will be set if the framework is in state `RECOVERED`.
2934 
2936 
2940 
2941  // Tasks that have not yet been launched because they are currently
2942  // being authorized.
2944 
2945  // TODO(bmahler): Make this private to enforce that `addTask()` and
2946  // `removeTask()` are used, and provide a const view into the tasks.
2948 
2949  // Tasks launched by this framework that have reached a terminal
2950  // state and have had all their updates acknowledged. We only keep a
2951  // fixed-size cache to avoid consuming too much memory. We use
2952  // boost::circular_buffer rather than BoundedHashMap because there
2953  // can be multiple completed tasks with the same task ID.
2954  boost::circular_buffer<process::Owned<Task>> completedTasks;
2955 
2956  // When an agent is marked unreachable, tasks running on it are stored
2957  // here. We only keep a fixed-size cache to avoid consuming too much memory.
2958  // NOTE: Non-partition-aware unreachable tasks in this map are marked
2959  // TASK_LOST instead of TASK_UNREACHABLE for backward compatibility.
2961 
2962  hashset<Offer*> offers; // Active offers for framework.
2963 
2964  hashset<InverseOffer*> inverseOffers; // Active inverse offers for framework.
2965 
2966  // TODO(bmahler): Make this private to enforce that `addExecutor()`
2967  // and `removeExecutor()` are used, and provide a const view into
2968  // the executors.
2970 
2971  // Pending operations or terminal operations that have
2972  // unacknowledged status updates.
2974 
2975  // The map from the framework-specified operation ID to the
2976  // corresponding internal operation UUID.
2978 
2979  // NOTE: For the used and offered resources below, we keep the
2980  // total as well as partitioned by SlaveID.
2981  // We expose the total resources via the HTTP endpoint, and we
2982  // keep a running total of the resources because looping over the
2983  // slaves to sum the resources has led to perf issues (MESOS-1862).
2984  // We keep the resources partitioned by SlaveID because non-scalar
2985  // resources can be lost when summing them up across multiple
2986  // slaves (MESOS-2373).
2987  //
2988  // Also note that keeping the totals is safe even though it yields
2989  // incorrect results for non-scalar resources.
2990  // (1) For overlapping set items / ranges across slaves, these
2991  // will get added N times but only represented once.
2992  // (2) When an initial subtraction occurs (N-1), the resource is
2993  // no longer represented. (This is the source of the bug).
2994  // (3) When any further subtractions occur (N-(1+M)), the
2995  // Resources simply ignores the subtraction since there's
2996  // nothing to remove, so this is safe for now.
2997 
2998  // TODO(mpark): Strip the non-scalar resources out of the totals
2999  // in order to avoid reporting incorrect statistics (MESOS-2623).
3000 
3001  // Active task / executor / operation resources.
3003 
3004  // Note that we maintain multiple copies of each shared resource in
3005  // `usedResources` as they are used by multiple tasks.
3007 
3008  // Offered resources.
3011 
3012  // This is only set for HTTP frameworks.
3015 
3016 private:
3017  Framework(Master* const _master,
3018  const Flags& masterFlags,
3019  const FrameworkInfo& _info,
3020  State state,
3021  const process::Time& time)
3022  : master(_master),
3023  info(_info),
3024  roles(protobuf::framework::getRoles(_info)),
3025  capabilities(_info.capabilities()),
3026  state(state),
3027  registeredTime(time),
3028  reregisteredTime(time),
3029  completedTasks(masterFlags.max_completed_tasks_per_framework),
3030  unreachableTasks(masterFlags.max_unreachable_tasks_per_framework)
3031  {
3032  foreach (const std::string& role, roles) {
3033  // NOTE: It's possible that we're already being tracked under the role
3034  // because a framework can unsubscribe from a role while it still has
3035  // resources allocated to the role.
3036  if (!isTrackedUnderRole(role)) {
3037  trackUnderRole(role);
3038  }
3039  }
3040  }
3041 
3042  Framework(const Framework&); // No copying.
3043  Framework& operator=(const Framework&); // No assigning.
3044 };
3045 
3046 
3047 inline std::ostream& operator<<(
3048  std::ostream& stream,
3049  const Framework& framework)
3050 {
3051  // TODO(vinod): Also log the hostname once FrameworkInfo is properly
3052  // updated on framework failover (MESOS-1784).
3053  stream << framework.id() << " (" << framework.info.name() << ")";
3054 
3055  if (framework.pid.isSome()) {
3056  stream << " at " << framework.pid.get();
3057  }
3058 
3059  return stream;
3060 }
3061 
3062 
3063 // Information about an active role.
3064 struct Role
3065 {
3066  Role() = delete;
3067 
3068  Role(const std::string& _role) : role(_role) {}
3069 
3070  void addFramework(Framework* framework)
3071  {
3072  frameworks[framework->id()] = framework;
3073  }
3074 
3075  void removeFramework(Framework* framework)
3076  {
3077  frameworks.erase(framework->id());
3078  }
3079 
3081  {
3082  Resources resources;
3083 
3084  auto allocatedTo = [](const std::string& role) {
3085  return [role](const Resource& resource) {
3086  CHECK(resource.has_allocation_info());
3087  return resource.allocation_info().role() == role;
3088  };
3089  };
3090 
3091  foreachvalue (Framework* framework, frameworks) {
3092  resources += framework->totalUsedResources.filter(allocatedTo(role));
3093  resources += framework->totalOfferedResources.filter(allocatedTo(role));
3094  }
3095 
3096  return resources;
3097  }
3098 
3099  const std::string role;
3100 
3101  // NOTE: The dynamic role/quota relation is stored in and administrated
3102  // by the master. There is no direct representation of quota information
3103  // here to avoid duplication and to support that an operator can associate
3104  // quota with a role before the role is created. Such ordering of operator
3105  // requests prevents a race of premature unbounded allocation that setting
3106  // quota first is intended to contain.
3107 
3109 };
3110 
3111 } // namespace master {
3112 } // namespace internal {
3113 } // namespace mesos {
3114 
3115 #endif // __MASTER_HPP__
Protocol< RecoverRequest, RecoverResponse > recover
Definition: path.hpp:26
void initialize() override
Invoked when a process gets spawned.
Definition: master.hpp:392
std::string generate(const std::string &prefix="")
Returns 'prefix(N)' where N represents the number of instances where the same prefix (wrt...
Try< Resources > getConsumedResources(const Offer::Operation &operation)
bool send(const Message &message)
Definition: master.hpp:345
Framework(Master *const master, const Flags &masterFlags, const FrameworkInfo &info, const process::UPID &_pid, const process::Time &time=process::Clock::now())
Definition: master.hpp:2279
void recoverResources(Operation *operation)
Definition: master.hpp:2653
hashmap< ResourceProviderID, ResourceProvider > resourceProviders
Definition: master.hpp:316
Definition: nothing.hpp:16
hashmap< TaskID, Task * > tasks
Definition: master.hpp:2947
Definition: master.hpp:3064
Master *const master
Definition: master.hpp:2920
ContentType
Definition: http.hpp:43
Option< Error > validate(const std::string &imageDir)
hashmap< UUID, Operation * > operations
Definition: master.hpp:248
std::ostream & operator<<(std::ostream &stream, const Future< T > &future)
Definition: future.hpp:1829
Try< bool > update(const std::string &link, const Handle &parent, uint16_t protocol, const action::Mirror &mirror)
void finalize(bool finalize_wsa=false)
Clean up the library.
Try< Bytes > size(const std::string &path, const FollowSymlink follow=FollowSymlink::FOLLOW_SYMLINK)
Definition: stat.hpp:121
Future< Response > request(const Request &request, bool streamedResponse=false)
Asynchronously sends an HTTP request to the process and returns the HTTP response once the entire res...
Subscriber(const HttpConnection &_http, const Option< process::http::authentication::Principal > _principal)
Definition: master.hpp:2066
T & get()&
Definition: try.hpp:73
SlaveInfo info
Definition: master.hpp:188
bool connected() const
Definition: master.hpp:2913
Definition: master.hpp:27
~Framework()
Definition: master.hpp:2304
Role(const std::string &_role)
Definition: master.hpp:3068
Definition: check.hpp:33
const SlaveID id
Definition: master.hpp:187
hashset< Offer * > offers
Definition: master.hpp:251
Option< process::Timer > reregistrationTimer
Definition: master.hpp:217
bool connected
Definition: master.hpp:204
#define CHECK_NONE(expression)
Definition: check.hpp:54
bool initialize(const Option< std::string > &delegate=None(), const Option< std::string > &readwriteAuthenticationRealm=None(), const Option< std::string > &readonlyAuthenticationRealm=None())
Initialize the library.
Task * getTask(const TaskID &taskId)
Definition: master.hpp:2311
Resources totalResources
Definition: master.hpp:277
constexpr Duration DEFAULT_HEARTBEAT_INTERVAL
Definition: constants.hpp:52
Definition: protobuf_utils.hpp:261
hashmap< SlaveID, Resources > offeredResources
Definition: master.hpp:3010
protobuf::framework::Capabilities capabilities
Definition: master.hpp:2926
bool hasExecutor(const SlaveID &slaveId, const ExecutorID &executorId)
Definition: master.hpp:2516
std::set< std::string > getRoles(const FrameworkInfo &frameworkInfo)
Option< Operation * > getOperation(const OperationID &id)
Definition: master.hpp:2639
Resources filter(const lambda::function< bool(const Resource &)> &predicate) const
v1::AgentID evolve(const SlaveID &slaveId)
Result< ProcessStatus > status(pid_t pid)
Definition: proc.hpp:166
InverseOffer * getInverseOffer(Master *master, const OfferID &offerId)
Try< Nothing > machines(const google::protobuf::RepeatedPtrField< MachineID > &ids)
Performs the following checks on a list of machines:
void addUnreachableTask(const Task &task)
Definition: master.hpp:2443
mesos::v1::scheduler::Call Call
Definition: mesos.hpp:2611
Definition: resources.hpp:79
Resources totalUsedResources
Definition: master.hpp:3002
Slave * getSlave(Master *master, const SlaveID &slaveId)
void removeExecutor(const SlaveID &slaveId, const ExecutorID &executorId)
Definition: master.hpp:2554
Option< HttpConnection > http
Definition: master.hpp:2932
void update(const FrameworkInfo &newInfo)
Definition: master.hpp:2724
Definition: flags.hpp:42
Definition: registrar.hpp:91
void addFramework(Framework *framework)
Definition: master.hpp:3070
Option< Error > reregisterSlave(const ReregisterSlaveMessage &message)
Definition: files.hpp:73
void addCompletedTask(Task &&task)
Definition: master.hpp:2433
Operation
Definition: cgroups.hpp:441
Future< Nothing > redirect(int_fd from, Option< int_fd > to, size_t chunk=4096, const std::vector< lambda::function< void(const std::string &)>> &hooks={})
Redirect output from the 'from' file descriptor to the 'to' file descriptor (or /dev/null if 'to' is ...
void updateConnection(const HttpConnection &newHttp)
Definition: master.hpp:2851
UPID spawn(ProcessBase *process, bool manage=false)
Spawn a new process.
Framework(Master *const master, const Flags &masterFlags, const FrameworkInfo &info)
Definition: master.hpp:2299
Definition: duration.hpp:32
void removeOperation(Operation *operation)
Definition: master.hpp:2698
void terminate(const UPID &pid, bool inject=true)
Sends a TerminateEvent to the given process.
hashmap< UUID, Operation * > operations
Definition: master.hpp:313
void heartbeat()
Definition: master.hpp:2892
void send(const process::UPID &to, const google::protobuf::Message &message)
Definition: protobuf.hpp:118
bool isSome() const
Definition: option.hpp:115
Definition: event.hpp:214
process::Future< Nothing > closed() const
Definition: master.hpp:358
Definition: http.hpp:517
Definition: json.hpp:158
hashmap< FrameworkID, hashmap< TaskID, Task * > > tasks
Definition: master.hpp:240
void updateConnection(const process::UPID &newPid)
Definition: master.hpp:2839
void removeOffer(Offer *offer)
Definition: master.hpp:2487
multihashmap< FrameworkID, TaskID > killedTasks
Definition: master.hpp:244
mesos::v1::scheduler::Event Event
Definition: mesos.hpp:2612
Definition: hashmap.hpp:38
Option< UUID > resourceVersion
Definition: master.hpp:290
mesos::master::Event createTaskAdded(const Task &task)
bool contains(const std::string &s, const std::string &substr)
Definition: strings.hpp:423
#define CHECK_SOME(expression)
Definition: check.hpp:50
Definition: master.hpp:333
bool active() const
Definition: master.hpp:2912
Resources checkpointedResources
Definition: master.hpp:270
SlaveObserver * observer
Definition: master.hpp:292
bool isTerminalState(const TaskState &state)
Definition: owned.hpp:26
Try< Nothing > unavailability(const Unavailability &unavailability)
An "untyped" PID, used to encapsulate the process ID for lower-layer abstractions (eg...
Definition: pid.hpp:39
BoundedHashMap< TaskID, process::Owned< Task > > unreachableTasks
Definition: master.hpp:2960
bool recovered() const
Definition: master.hpp:2914
process::Time registeredTime
Definition: master.hpp:200
bool active
Definition: master.hpp:209
Definition: http.hpp:339
An abstraction for contending to be a leading master.
Definition: contender.hpp:40
std::set< std::string > roles
Definition: master.hpp:2924
process::UPID pid
Definition: master.hpp:192
Definition: uuid.hpp:35
Definition: protobuf_utils.hpp:455
process::http::Pipe::Writer writer
Definition: master.hpp:363
Option< process::Time > reregisteredTime
Definition: master.hpp:201
Definition: spec.hpp:30
process::Time reregisteredTime
Definition: master.hpp:2938
Master *const master
Definition: master.hpp:186
void addOffer(Offer *offer)
Definition: master.hpp:2479
Timer delay(const Duration &duration, const PID< T > &pid, void(T::*method)())
Definition: delay.hpp:31
std::string encode(const T &record) const
Returns the "Record-IO" encoded record.
Definition: recordio.hpp:66
Option< Error > quotaInfo(const mesos::quota::QuotaInfo &quotaInfo)
const T & get() const &
Definition: option.hpp:118
const std::string role
Definition: master.hpp:3099
Framework(Master *const master, const Flags &masterFlags, const FrameworkInfo &info, const HttpConnection &_http, const process::Time &time=process::Clock::now())
Definition: master.hpp:2289
Definition: protobuf.hpp:100
process::Time registeredTime
Definition: master.hpp:2937
process::Future< Nothing > destroy(const std::string &hierarchy, const std::string &cgroup="/")
#define foreachvalue(VALUE, ELEMS)
Definition: foreach.hpp:77
Try< int_fd > accept(int_fd s)
Definition: network.hpp:31
void removeInverseOffer(InverseOffer *inverseOffer)
Definition: master.hpp:2508
Definition: whitelist_watcher.hpp:37
Definition: protobuf.hpp:59
void removeFramework(Framework *framework)
Definition: master.hpp:3075
MasterInfo info() const
Definition: master.hpp:578
bool wait(const UPID &pid, const Duration &duration=Seconds(-1))
Wait for the process to exit for no more than the specified seconds.
UUID resourceVersion
Definition: master.hpp:309
Definition: time.hpp:23
FrameworkInfo info
Definition: master.hpp:2922
ContentType contentType
Definition: master.hpp:364
bool exited(const UPID &from, const UPID &to)
Simulates disconnection of the link between &#39;from&#39; and &#39;to&#39; by sending an ExitedEvent to &#39;to&#39;...
id::UUID streamId
Definition: master.hpp:365
HttpConnection http
Definition: master.hpp:2111
void send(const Message &message)
Definition: master.hpp:2415
Definition: boundedhashmap.hpp:27
Basic model of an allocator: resources are allocated to a framework in the form of offers...
Definition: allocator.hpp:55
void recoverResources(Task *task)
Definition: master.hpp:2383
#define flags
Definition: decoder.hpp:18
bool empty() const
Definition: resources.hpp:393
Definition: none.hpp:27
Definition: attributes.hpp:24
hashset< InverseOffer * > inverseOffers
Definition: master.hpp:2964
void removeTask(Task *task, bool unreachable)
Definition: master.hpp:2453
Definition: executor.hpp:48
const MachineID machineId
Definition: master.hpp:190
boost::circular_buffer< process::Owned< Task > > completedTasks
Definition: master.hpp:2954
Definition: master.hpp:117
Resources allocatedResources() const
Definition: master.hpp:3080
#define foreachkey(KEY, ELEMS)
Definition: foreach.hpp:74
HttpConnection(const process::http::Pipe::Writer &_writer, ContentType _contentType, id::UUID _streamId)
Definition: master.hpp:335
constexpr size_t MAX_REMOVED_SLAVES
Definition: constants.hpp:84
hashset< InverseOffer * > inverseOffers
Definition: master.hpp:254
bool close()
Definition: master.hpp:353
An abstraction of a Master detector which can be used to detect the leading master from a group...
Definition: detector.hpp:38
static Time now()
The current clock time for either the current process that makes this call or the global clock time i...
Resources totalResources
Definition: master.hpp:296
void addExecutor(const SlaveID &slaveId, const ExecutorInfo &executorInfo)
Definition: master.hpp:2523
bool isNone() const
Definition: option.hpp:116
Definition: event.hpp:103
hashmap< TaskID, TaskInfo > pendingTasks
Definition: master.hpp:2943
Heartbeater(const std::string &_logMessage, const Message &_heartbeatMessage, const HttpConnection &_http, const Duration &_interval, const Option< Duration > &_delay=None())
Definition: master.hpp:379
Definition: master.hpp:376
Given an encoding function for individual records, this provides encoding from typed records into "Re...
Definition: recordio.hpp:57
Option< process::Owned< Heartbeater< scheduler::Event, v1::scheduler::Event > > > heartbeater
Definition: master.hpp:3014
Option< process::UPID > pid
Definition: master.hpp:2933
Definition: metrics.hpp:38
bool isSpeculativeOperation(const Offer::Operation &operation)
State
Definition: master.hpp:2259
Offer * getOffer(Master *master, const OfferID &offerId)
Try< Nothing > create(const std::string &hierarchy, const std::string &cgroup, bool recursive=false)
Try< Nothing > kill(const std::string &hierarchy, const std::string &cgroup, int signal)
hashmap< FrameworkID, Framework * > frameworks
Definition: master.hpp:3108
Try< Nothing > bind(int_fd s, const Address &address)
Definition: network.hpp:46
process::Time unregisteredTime
Definition: master.hpp:2939
std::string serialize(ContentType contentType, const google::protobuf::Message &message)
void addOperation(Operation *operation)
Definition: master.hpp:2594
std::string version
Definition: master.hpp:195
URI http(const std::string &host, const std::string &path="/", const Option< int > &port=None(), const Option< std::string > &query=None(), const Option< std::string > &fragment=None(), const Option< std::string > &user=None(), const Option< std::string > &password=None())
Creates an http URI with the given parameters.
Definition: http.hpp:35
ResourceProviderInfo info
Definition: master.hpp:295
hashmap< FrameworkID, hashmap< ExecutorID, ExecutorInfo > > executors
Definition: master.hpp:224
hashset< Offer * > offers
Definition: master.hpp:2962
std::string stringify(int flags)
Definition: owned.hpp:36
void addTask(Task *task)
Definition: master.hpp:2320
void closeHttpConnection()
Definition: master.hpp:2874
Definition: master.hpp:2257
protobuf::slave::Capabilities capabilities
Definition: master.hpp:198
hashmap< SlaveID, hashmap< ExecutorID, ExecutorInfo > > executors
Definition: master.hpp:2969
Definition: process.hpp:501
process::Owned< Heartbeater< mesos::master::Event, v1::master::Event > > heartbeater
Definition: master.hpp:2113
bool contains(const Key &key) const
Definition: hashmap.hpp:86
hashmap< OperationID, UUID > operationUUIDs
Definition: master.hpp:2977
hashmap< UUID, Operation * > operations
Definition: master.hpp:2973
Definition: parse.hpp:33
Try< std::set< pid_t > > pids()
Definition: freebsd.hpp:62
const Option< process::http::authentication::Principal > principal
Definition: master.hpp:2114
hashmap< SlaveID, Resources > usedResources
Definition: master.hpp:3006
Resources totalOfferedResources
Definition: master.hpp:3009
Definition: master.hpp:426
PID< MetricsProcess > metrics
Resources offeredResources
Definition: master.hpp:261
hashmap< FrameworkID, hashmap< TaskID, TaskInfo > > pendingTasks
Definition: master.hpp:230
const FrameworkID id() const
Definition: master.hpp:2719
constexpr const char * name
Definition: shell.hpp:43
hashmap< FrameworkID, Resources > usedResources
Definition: master.hpp:259
Try< std::vector< Value > > time(const std::string &hierarchy, const std::string &cgroup)
Try< Nothing > schedule(const mesos::maintenance::Schedule &schedule, const hashmap< MachineID, Machine > &machines)
Performs the following checks on the new maintenance schedule:
Option< Error > registerSlave(const RegisterSlaveMessage &message)
void addInverseOffer(InverseOffer *inverseOffer)
Definition: master.hpp:2501
State state
Definition: master.hpp:2935
Future< size_t > send(const int_fd &fd, const void *buf, size_t size)