Apache Mesos
scheduler.hpp
Go to the documentation of this file.
1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements. See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership. The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License. You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 
17 #ifndef __MESOS_SCHEDULER_HPP__
18 #define __MESOS_SCHEDULER_HPP__
19 
20 #include <memory>
21 #include <mutex>
22 #include <string>
23 #include <vector>
24 
25 #include <mesos/mesos.hpp>
26 #include <mesos/scheduler/scheduler.pb.h>
27 
28 // Mesos scheduler interface and scheduler driver. A scheduler is used
29 // to interact with Mesos in order to run distributed computations.
30 //
31 // IF YOU FIND YOURSELF MODIFYING COMMENTS HERE PLEASE CONSIDER MAKING
32 // THE SAME MODIFICATIONS FOR OTHER LANGUAGE BINDINGS (e.g., Java:
33 // src/java/src/org/apache/mesos, Python: src/python/src, etc.).
34 
35 // Forward declaration.
36 namespace process {
37 class Latch;
38 } // namespace process {
39 
40 namespace mesos {
41 
42 // A few forward declarations.
43 class SchedulerDriver;
44 
45 namespace scheduler {
46 class MesosProcess;
47 } // namespace scheduler {
48 
49 namespace internal {
50 class SchedulerProcess;
51 } // namespace internal {
52 
53 namespace master {
54 namespace detector {
55 class MasterDetector;
56 } // namespace detector {
57 } // namespace master {
58 
59 // Callback interface to be implemented by frameworks' schedulers.
60 // Note that only one callback will be invoked at a time, so it is not
61 // recommended that you block within a callback because it may cause a
62 // deadlock.
63 //
64 // Each callback includes a pointer to the scheduler driver that was
65 // used to run this scheduler. The pointer will not change for the
66 // duration of a scheduler (i.e., from the point you do
67 // SchedulerDriver::start() to the point that SchedulerDriver::join()
68 // returns). This is intended for convenience so that a scheduler
69 // doesn't need to store a pointer to the driver itself.
70 class Scheduler
71 {
72 public:
73  // Empty virtual destructor (necessary to instantiate subclasses).
74  virtual ~Scheduler() {}
75 
76  // Invoked when the scheduler successfully registers with a Mesos
77  // master. A unique ID (generated by the master) used for
78  // distinguishing this framework from others and MasterInfo with the
79  // ip and port of the current master are provided as arguments.
80  virtual void registered(
81  SchedulerDriver* driver,
82  const FrameworkID& frameworkId,
83  const MasterInfo& masterInfo) = 0;
84 
85  // Invoked when the scheduler reregisters with a newly elected
86  // Mesos master. This is only called when the scheduler has
87  // previously been registered. MasterInfo containing the updated
88  // information about the elected master is provided as an argument.
89  virtual void reregistered(
90  SchedulerDriver* driver,
91  const MasterInfo& masterInfo) = 0;
92 
93  // Invoked when the scheduler becomes "disconnected" from the master
94  // (e.g., the master fails and another is taking over).
95  virtual void disconnected(SchedulerDriver* driver) = 0;
96 
97  // Invoked when resources have been offered to this framework. A
98  // single offer will only contain resources from a single slave.
99  // Resources associated with an offer will not be re-offered to
100  // _this_ framework until either (a) this framework has rejected
101  // those resources (see SchedulerDriver::launchTasks) or (b) those
102  // resources have been rescinded (see Scheduler::offerRescinded).
103  // Note that resources may be concurrently offered to more than one
104  // framework at a time (depending on the allocator being used). In
105  // that case, the first framework to launch tasks using those
106  // resources will be able to use them while the other frameworks
107  // will have those resources rescinded (or if a framework has
108  // already launched tasks with those resources then those tasks will
109  // fail with a TASK_LOST status and a message saying as much).
110  virtual void resourceOffers(
111  SchedulerDriver* driver,
112  const std::vector<Offer>& offers) = 0;
113 
114  // Invoked when an offer is no longer valid (e.g., the slave was
115  // lost or another framework used resources in the offer). If for
116  // whatever reason an offer is never rescinded (e.g., dropped
117  // message, failing over framework, etc.), a framework that attempts
118  // to launch tasks using an invalid offer will receive TASK_LOST
119  // status updates for those tasks (see Scheduler::resourceOffers).
120  virtual void offerRescinded(
121  SchedulerDriver* driver,
122  const OfferID& offerId) = 0;
123 
124  // Invoked when the status of a task has changed (e.g., a slave is
125  // lost and so the task is lost, a task finishes and an executor
126  // sends a status update saying so, etc). If implicit
127  // acknowledgements are being used, then returning from this
128  // callback _acknowledges_ receipt of this status update! If for
129  // whatever reason the scheduler aborts during this callback (or
130  // the process exits) another status update will be delivered (note,
131  // however, that this is currently not true if the slave sending the
132  // status update is lost/fails during that time). If explicit
133  // acknowledgements are in use, the scheduler must acknowledge this
134  // status on the driver.
135  virtual void statusUpdate(
136  SchedulerDriver* driver,
137  const TaskStatus& status) = 0;
138 
139  // Invoked when an executor sends a message. These messages are best
140  // effort; do not expect a framework message to be retransmitted in
141  // any reliable fashion.
142  virtual void frameworkMessage(
143  SchedulerDriver* driver,
144  const ExecutorID& executorId,
145  const SlaveID& slaveId,
146  const std::string& data) = 0;
147 
148  // Invoked when a slave has been determined unreachable (e.g.,
149  // machine failure, network partition). Most frameworks will need to
150  // reschedule any tasks launched on this slave on a new slave.
151  //
152  // NOTE: This callback is not reliably delivered. If a host or
153  // network failure causes messages between the master and the
154  // scheduler to be dropped, this callback may not be invoked.
155  virtual void slaveLost(
156  SchedulerDriver* driver,
157  const SlaveID& slaveId) = 0;
158 
159  // Invoked when an executor has exited/terminated. Note that any
160  // tasks running will have TASK_LOST status updates automagically
161  // generated.
162  //
163  // NOTE: This callback is not reliably delivered. If a host or
164  // network failure causes messages between the master and the
165  // scheduler to be dropped, this callback may not be invoked.
166  virtual void executorLost(
167  SchedulerDriver* driver,
168  const ExecutorID& executorId,
169  const SlaveID& slaveId,
170  int status) = 0;
171 
172  // Invoked when there is an unrecoverable error in the scheduler or
173  // scheduler driver. The driver will be aborted BEFORE invoking this
174  // callback.
175  virtual void error(
176  SchedulerDriver* driver,
177  const std::string& message) = 0;
178 };
179 
180 
181 // Abstract interface for connecting a scheduler to Mesos. This
182 // interface is used both to manage the scheduler's lifecycle (start
183 // it, stop it, or wait for it to finish) and to interact with Mesos
184 // (e.g., launch tasks, kill tasks, etc.). See MesosSchedulerDriver
185 // below for a concrete example of a SchedulerDriver.
187 {
188 public:
189  // Empty virtual destructor (necessary to instantiate subclasses).
190  // It is expected that 'stop()' is called before this is called.
191  virtual ~SchedulerDriver() {}
192 
193  // Starts the scheduler driver. This needs to be called before any
194  // other driver calls are made.
195  virtual Status start() = 0;
196 
197  // Stops the scheduler driver. If the 'failover' flag is set to
198  // false then it is expected that this framework will never
199  // reconnect to Mesos. So Mesos will unregister the framework and
200  // shutdown all its tasks and executors. If 'failover' is true, all
201  // executors and tasks will remain running (for some framework
202  // specific failover timeout) allowing the scheduler to reconnect
203  // (possibly in the same process, or from a different process, for
204  // example, on a different machine).
205  virtual Status stop(bool failover = false) = 0;
206 
207  // Aborts the driver so that no more callbacks can be made to the
208  // scheduler. The semantics of abort and stop have deliberately been
209  // separated so that code can detect an aborted driver (i.e., via
210  // the return status of SchedulerDriver::join, see below), and
211  // instantiate and start another driver if desired (from within the
212  // same process). Note that 'stop()' is not automatically called
213  // inside 'abort()'.
214  virtual Status abort() = 0;
215 
216  // Waits for the driver to be stopped or aborted, possibly
217  // _blocking_ the current thread indefinitely. The return status of
218  // this function can be used to determine if the driver was aborted
219  // (see mesos.proto for a description of Status).
220  virtual Status join() = 0;
221 
222  // Starts and immediately joins (i.e., blocks on) the driver.
223  virtual Status run() = 0;
224 
225  // Requests resources from Mesos (see mesos.proto for a description
226  // of Request and how, for example, to request resources from
227  // specific slaves). Any resources available are offered to the
228  // framework via Scheduler::resourceOffers callback, asynchronously.
229  virtual Status requestResources(const std::vector<Request>& requests) = 0;
230 
231  // Launches the given set of tasks. Any remaining resources (i.e.,
232  // those that are not used by the launched tasks or their executors)
233  // will be considered declined. Note that this includes resources
234  // used by tasks that the framework attempted to launch but failed
235  // (with TASK_ERROR) due to a malformed task description.
236  //
237  // The specified filters are applied on all unused resources (see
238  // mesos.proto for a description of Filters). Note that the default
239  // argument includes a 5-second `refuse_offers` filter.
240  //
241  // Available resources are aggregated when multiple offers are provided.
242  // Note that all offers must belong to the same slave. Invoking this
243  // function with an empty collection of tasks declines offers in their
244  // entirety (see Scheduler::declineOffer).
245  virtual Status launchTasks(
246  const std::vector<OfferID>& offerIds,
247  const std::vector<TaskInfo>& tasks,
248  const Filters& filters = Filters()) = 0;
249 
250  // DEPRECATED: Use launchTasks(offerIds, tasks, filters) instead.
251  virtual Status launchTasks(
252  const OfferID& offerId,
253  const std::vector<TaskInfo>& tasks,
254  const Filters& filters = Filters()) = 0;
255 
256  // Kills the specified task. Note that attempting to kill a task is
257  // currently not reliable. If, for example, a scheduler fails over
258  // while it was attempting to kill a task it will need to retry in
259  // the future. Likewise, if unregistered / disconnected, the request
260  // will be dropped (these semantics may be changed in the future).
261  virtual Status killTask(const TaskID& taskId) = 0;
262 
263  // Accepts the given offers and performs a sequence of operations on
264  // those accepted offers. See Offer.Operation in mesos.proto for the
265  // set of available operations. Any remaining resources (i.e., those
266  // that are not used by the launched tasks or their executors) will
267  // be considered declined. Note that this includes resources used by
268  // tasks that the framework attempted to launch but failed (with
269  // TASK_ERROR) due to a malformed task description. The specified
270  // filters are applied on all unused resources (see mesos.proto for
271  // a description of Filters). Available resources are aggregated
272  // when multiple offers are provided. Note that all offers must
273  // belong to the same slave.
274  virtual Status acceptOffers(
275  const std::vector<OfferID>& offerIds,
276  const std::vector<Offer::Operation>& operations,
277  const Filters& filters = Filters()) = 0;
278 
279  // Declines an offer in its entirety and applies the specified
280  // filters on the resources (see mesos.proto for a description of
281  // Filters). Note that this can be done at any time, it is not
282  // necessary to do this within the Scheduler::resourceOffers
283  // callback.
284  virtual Status declineOffer(
285  const OfferID& offerId,
286  const Filters& filters = Filters()) = 0;
287 
288  // Removes all filters previously set by the framework (via launchTasks()
289  // or declineOffer()) and clears the set of suppressed roles.
290  //
291  // NOTE: If the framework is not connected to the master, the set
292  // of suppressed roles stored by the driver will be cleared, and an
293  // up-to-date set of suppressed roles will be sent to the master
294  // during re-registration.
295  virtual Status reviveOffers() = 0;
296 
297  // Removes filters for the specified roles and removes these roles from
298  // the suppressed set. If the framework is not connected to the master,
299  // an up-to-date set of suppressed roles will be sent to the master
300  // during re-registration.
301  //
302  // NOTE: If 'roles' is empty, this method does nothing.
303  virtual Status reviveOffers(const std::vector<std::string>& roles) = 0;
304 
305  // Informs Mesos master to stop sending offers to the framework (i.e.
306  // to suppress all roles of the framework). To resume getting offers,
307  // the scheduler can call reviveOffers() or set the suppressed roles
308  // explicitly via updateFramework().
309  //
310  // NOTE: If the framework is not connected to the master, all the roles
311  // will be added to the set of suppressed roles in the driver, and an
312  // up-to-date suppressed roles set will be sent to the master during
313  // re-registration.
314  virtual Status suppressOffers() = 0;
315 
316  // Adds the roles to the suppressed set. If the framework is not connected
317  // to the master, an up-to-date set of suppressed roles will be sent to
318  // the master during re-registration.
319  //
320  // NOTE: If 'roles' is empty, this method does nothing.
321  virtual Status suppressOffers(const std::vector<std::string>& roles) = 0;
322 
323  // Acknowledges the status update. This should only be called
324  // once the status update is processed durably by the scheduler.
325  // Not that explicit acknowledgements must be requested via the
326  // constructor argument, otherwise a call to this method will
327  // cause the driver to crash.
328  virtual Status acknowledgeStatusUpdate(
329  const TaskStatus& status) = 0;
330 
331  // Sends a message from the framework to one of its executors. These
332  // messages are best effort; do not expect a framework message to be
333  // retransmitted in any reliable fashion.
334  virtual Status sendFrameworkMessage(
335  const ExecutorID& executorId,
336  const SlaveID& slaveId,
337  const std::string& data) = 0;
338 
339  // Allows the framework to query the status for non-terminal tasks.
340  // This causes the master to send back the latest task status for
341  // each task in 'statuses', if possible. Tasks that are no longer
342  // known will result in a TASK_LOST update. If statuses is empty,
343  // then the master will send the latest status for each task
344  // currently known.
345  virtual Status reconcileTasks(
346  const std::vector<TaskStatus>& statuses) = 0;
347 
348  // Requests Mesos master to change the `FrameworkInfo`, the set of suppressed
349  // roles and the offer constraints. The driver will store the new
350  // `FrameworkInfo`, the new set of suppressed roles and the new offer
351  // constraints, and all subsequent re-registrations will use them.
352  //
353  // NOTE: If the supplied info is invalid or fails authorization,
354  // or the supplied offer constraints are not valid, the `error()` callback
355  // will be invoked asynchronously (after the master replies with a
356  // `FrameworkErrorMessage`). Note that validity of non-empty (i.e.
357  // not default-constructed) offer constraints may depend on master flags.
358  //
359  // NOTE: This must be called after initial registration with the
360  // master completes and the `FrameworkID` is assigned. The assigned
361  // `FrameworkID` must be set in `frameworkInfo`.
362  //
363  // NOTE: The `FrameworkInfo.user` and `FrameworkInfo.hostname`
364  // fields will be auto-populated using the same approach used
365  // during driver initialization.
366  virtual Status updateFramework(
367  const FrameworkInfo& frameworkInfo,
368  const std::vector<std::string>& suppressedRoles,
369  ::mesos::scheduler::OfferConstraints&& offerConstraints) = 0;
370 };
371 
372 
373 // Concrete implementation of a SchedulerDriver that connects a
374 // Scheduler with a Mesos master. The MesosSchedulerDriver is
375 // thread-safe.
376 //
377 // Note that scheduler failover is supported in Mesos. After a
378 // scheduler is registered with Mesos it may failover (to a new
379 // process on the same machine or across multiple machines) by
380 // creating a new driver with the ID given to it in
381 // Scheduler::registered.
382 //
383 // The driver is responsible for invoking the Scheduler callbacks as
384 // it communicates with the Mesos master.
385 //
386 // Note that blocking on the MesosSchedulerDriver (e.g., via
387 // MesosSchedulerDriver::join) doesn't affect the scheduler callbacks
388 // in any way because they are handled by a different thread.
389 //
390 // Note that the driver uses GLOG to do its own logging. GLOG flags
391 // can be set via environment variables, prefixing the flag name with
392 // "GLOG_", e.g., "GLOG_v=1". For Mesos specific logging flags see
393 // src/logging/flags.hpp. Mesos flags can also be set via environment
394 // variables, prefixing the flag name with "MESOS_", e.g.,
395 // "MESOS_QUIET=1".
396 //
397 // See src/examples/test_framework.cpp for an example of using the
398 // MesosSchedulerDriver.
400 {
401 public:
402  // Creates a new driver for the specified scheduler. The master
403  // should be one of:
404  //
405  // host:port
406  // zk://host1:port1,host2:port2,.../path
407  // zk://username:password@host1:port1,host2:port2,.../path
408  // file:///path/to/file (where file contains one of the above)
409  //
410  // The driver will attempt to "failover" if the specified
411  // FrameworkInfo includes a valid FrameworkID.
412  //
413  // Any Mesos configuration options are read from environment
414  // variables, as well as any configuration files found through the
415  // environment variables.
416  //
417  // TODO(vinod): Deprecate this once 'MesosSchedulerDriver' can take
418  // 'Option<Credential>' as parameter. Currently it cannot because
419  // 'stout' is not visible from here.
421  Scheduler* scheduler,
422  const FrameworkInfo& framework,
423  const std::string& master);
424 
425  // Same as the above constructor but takes 'credential' as argument.
426  // The credential will be used for authenticating with the master.
428  Scheduler* scheduler,
429  const FrameworkInfo& framework,
430  const std::string& master,
431  const Credential& credential);
432 
433  // These constructors are the same as the above two, but allow
434  // the framework to specify whether implicit or explicit
435  // acknowledgements are desired. See statusUpdate() for the
436  // details about explicit acknowledgements.
437  //
438  // TODO(bmahler): Deprecate the above two constructors. In 0.22.0
439  // these new constructors are exposed.
441  Scheduler* scheduler,
442  const FrameworkInfo& framework,
443  const std::string& master,
444  bool implicitAcknowledgements);
445 
447  Scheduler* scheduler,
448  const FrameworkInfo& framework,
449  const std::string& master,
450  bool implicitAcknowlegements,
451  const Credential& credential);
452 
453  // These constructors are the same as the above two, but allow
454  // the framework to also specify the initial set of suppressed roles.
456  Scheduler* scheduler,
457  const FrameworkInfo& framework,
458  const std::vector<std::string>& suppressedRoles,
459  const std::string& master,
460  bool implicitAcknowledgements);
461 
463  Scheduler* scheduler,
464  const FrameworkInfo& framework,
465  const std::vector<std::string>& suppressedRoles,
466  const std::string& master,
467  bool implicitAcknowlegements,
468  const Credential& credential);
469 
470 
471  // This destructor will block indefinitely if
472  // MesosSchedulerDriver::start was invoked successfully (possibly
473  // via MesosSchedulerDriver::run) and MesosSchedulerDriver::stop has
474  // not been invoked.
475  ~MesosSchedulerDriver() override;
476 
477  // See SchedulerDriver for descriptions of these.
478  Status start() override;
479  Status stop(bool failover = false) override;
480  Status abort() override;
481  Status join() override;
482  Status run() override;
483 
484  Status requestResources(
485  const std::vector<Request>& requests) override;
486 
487  // TODO(nnielsen): launchTasks using single offer is deprecated.
488  // Use launchTasks with offer list instead.
489  Status launchTasks(
490  const OfferID& offerId,
491  const std::vector<TaskInfo>& tasks,
492  const Filters& filters = Filters()) override;
493 
494  Status launchTasks(
495  const std::vector<OfferID>& offerIds,
496  const std::vector<TaskInfo>& tasks,
497  const Filters& filters = Filters()) override;
498 
499  Status killTask(const TaskID& taskId) override;
500 
501  Status acceptOffers(
502  const std::vector<OfferID>& offerIds,
503  const std::vector<Offer::Operation>& operations,
504  const Filters& filters = Filters()) override;
505 
506  Status declineOffer(
507  const OfferID& offerId,
508  const Filters& filters = Filters()) override;
509 
510  Status reviveOffers() override;
511 
512  Status reviveOffers(const std::vector<std::string>& roles) override;
513 
514  Status suppressOffers() override;
515 
516  Status suppressOffers(const std::vector<std::string>& roles) override;
517 
518  Status acknowledgeStatusUpdate(
519  const TaskStatus& status) override;
520 
521  Status sendFrameworkMessage(
522  const ExecutorID& executorId,
523  const SlaveID& slaveId,
524  const std::string& data) override;
525 
526  Status reconcileTasks(
527  const std::vector<TaskStatus>& statuses) override;
528 
529  Status updateFramework(
530  const FrameworkInfo& frameworkInfo,
531  const std::vector<std::string>& suppressedRoles,
532  ::mesos::scheduler::OfferConstraints&& offerConstraints)
533  override;
534 
535 protected:
536  // Used to detect (i.e., choose) the master.
537  std::shared_ptr<master::detector::MasterDetector> detector;
538 
539 private:
540  void initialize();
541 
542  Scheduler* scheduler;
543  FrameworkInfo framework;
544  const std::vector<std::string> initialSuppressedRoles;
545  std::string master;
546 
547  // Used for communicating with the master.
548  internal::SchedulerProcess* process;
549 
550  // URL for the master (e.g., zk://, file://, etc).
551  std::string url;
552 
553  // Mutex for enforcing serial execution of all non-callbacks.
554  std::recursive_mutex mutex;
555 
556  // Latch for waiting until driver terminates.
557  process::Latch* latch;
558 
559  // Current status of the driver.
560  Status status;
561 
562  const bool implicitAcknowlegements;
563 
564  const Credential* credential;
565 
566  // Scheduler process ID.
567  std::string schedulerId;
568 };
569 
570 } // namespace mesos {
571 
572 #endif // __MESOS_SCHEDULER_HPP__
Definition: master.hpp:27
virtual ~SchedulerDriver()
Definition: scheduler.hpp:191
bool initialize(const Option< std::string > &delegate=None(), const Option< std::string > &readwriteAuthenticationRealm=None(), const Option< std::string > &readonlyAuthenticationRealm=None())
Initialize the library.
Result< ProcessStatus > status(pid_t pid)
Definition: proc.hpp:166
Definition: scheduler.hpp:399
std::string join(const std::string &path1, const std::string &path2, const char _separator=os::PATH_SEPARATOR)
Definition: path.hpp:116
virtual ~Scheduler()
Definition: scheduler.hpp:74
Try< Nothing > start(const std::string &name)
Starts the slice with the given name (via &#39;systemctl start <name>&#39;).
Definition: scheduler.hpp:186
Result< std::vector< Filter< Classifier > > > filters(const std::string &_link, const Handle &parent)
Definition: internal.hpp:769
Definition: agent.hpp:25
Definition: scheduler.hpp:70
Future< R > run(R(*method)())
Definition: run.hpp:55
std::shared_ptr< master::detector::MasterDetector > detector
Definition: scheduler.hpp:537
Result< Process > process(pid_t pid)
Definition: freebsd.hpp:30
Definition: attributes.hpp:24
std::string error(const std::string &msg, uint32_t code)
Definition: executor.hpp:48
Definition: latch.hpp:24