Apache Mesos
scheduler.hpp
Go to the documentation of this file.
1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements. See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership. The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License. You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 
17 #ifndef __MESOS_SCHEDULER_HPP__
18 #define __MESOS_SCHEDULER_HPP__
19 
20 #include <memory>
21 #include <mutex>
22 #include <string>
23 #include <vector>
24 
25 #include <mesos/mesos.hpp>
26 
27 // Mesos scheduler interface and scheduler driver. A scheduler is used
28 // to interact with Mesos in order to run distributed computations.
29 //
30 // IF YOU FIND YOURSELF MODIFYING COMMENTS HERE PLEASE CONSIDER MAKING
31 // THE SAME MODIFICATIONS FOR OTHER LANGUAGE BINDINGS (e.g., Java:
32 // src/java/src/org/apache/mesos, Python: src/python/src, etc.).
33 
34 // Forward declaration.
35 namespace process {
36 class Latch;
37 } // namespace process {
38 
39 namespace mesos {
40 
41 // A few forward declarations.
42 class SchedulerDriver;
43 
44 namespace scheduler {
45 class MesosProcess;
46 } // namespace scheduler {
47 
48 namespace internal {
49 class SchedulerProcess;
50 } // namespace internal {
51 
52 namespace master {
53 namespace detector {
54 class MasterDetector;
55 } // namespace detector {
56 } // namespace master {
57 
58 // Callback interface to be implemented by frameworks' schedulers.
59 // Note that only one callback will be invoked at a time, so it is not
60 // recommended that you block within a callback because it may cause a
61 // deadlock.
62 //
63 // Each callback includes a pointer to the scheduler driver that was
64 // used to run this scheduler. The pointer will not change for the
65 // duration of a scheduler (i.e., from the point you do
66 // SchedulerDriver::start() to the point that SchedulerDriver::join()
67 // returns). This is intended for convenience so that a scheduler
68 // doesn't need to store a pointer to the driver itself.
69 class Scheduler
70 {
71 public:
72  // Empty virtual destructor (necessary to instantiate subclasses).
73  virtual ~Scheduler() {}
74 
75  // Invoked when the scheduler successfully registers with a Mesos
76  // master. A unique ID (generated by the master) used for
77  // distinguishing this framework from others and MasterInfo with the
78  // ip and port of the current master are provided as arguments.
79  virtual void registered(
80  SchedulerDriver* driver,
81  const FrameworkID& frameworkId,
82  const MasterInfo& masterInfo) = 0;
83 
84  // Invoked when the scheduler reregisters with a newly elected
85  // Mesos master. This is only called when the scheduler has
86  // previously been registered. MasterInfo containing the updated
87  // information about the elected master is provided as an argument.
88  virtual void reregistered(
89  SchedulerDriver* driver,
90  const MasterInfo& masterInfo) = 0;
91 
92  // Invoked when the scheduler becomes "disconnected" from the master
93  // (e.g., the master fails and another is taking over).
94  virtual void disconnected(SchedulerDriver* driver) = 0;
95 
96  // Invoked when resources have been offered to this framework. A
97  // single offer will only contain resources from a single slave.
98  // Resources associated with an offer will not be re-offered to
99  // _this_ framework until either (a) this framework has rejected
100  // those resources (see SchedulerDriver::launchTasks) or (b) those
101  // resources have been rescinded (see Scheduler::offerRescinded).
102  // Note that resources may be concurrently offered to more than one
103  // framework at a time (depending on the allocator being used). In
104  // that case, the first framework to launch tasks using those
105  // resources will be able to use them while the other frameworks
106  // will have those resources rescinded (or if a framework has
107  // already launched tasks with those resources then those tasks will
108  // fail with a TASK_LOST status and a message saying as much).
109  virtual void resourceOffers(
110  SchedulerDriver* driver,
111  const std::vector<Offer>& offers) = 0;
112 
113  // Invoked when an offer is no longer valid (e.g., the slave was
114  // lost or another framework used resources in the offer). If for
115  // whatever reason an offer is never rescinded (e.g., dropped
116  // message, failing over framework, etc.), a framework that attempts
117  // to launch tasks using an invalid offer will receive TASK_LOST
118  // status updates for those tasks (see Scheduler::resourceOffers).
119  virtual void offerRescinded(
120  SchedulerDriver* driver,
121  const OfferID& offerId) = 0;
122 
123  // Invoked when the status of a task has changed (e.g., a slave is
124  // lost and so the task is lost, a task finishes and an executor
125  // sends a status update saying so, etc). If implicit
126  // acknowledgements are being used, then returning from this
127  // callback _acknowledges_ receipt of this status update! If for
128  // whatever reason the scheduler aborts during this callback (or
129  // the process exits) another status update will be delivered (note,
130  // however, that this is currently not true if the slave sending the
131  // status update is lost/fails during that time). If explicit
132  // acknowledgements are in use, the scheduler must acknowledge this
133  // status on the driver.
134  virtual void statusUpdate(
135  SchedulerDriver* driver,
136  const TaskStatus& status) = 0;
137 
138  // Invoked when an executor sends a message. These messages are best
139  // effort; do not expect a framework message to be retransmitted in
140  // any reliable fashion.
141  virtual void frameworkMessage(
142  SchedulerDriver* driver,
143  const ExecutorID& executorId,
144  const SlaveID& slaveId,
145  const std::string& data) = 0;
146 
147  // Invoked when a slave has been determined unreachable (e.g.,
148  // machine failure, network partition). Most frameworks will need to
149  // reschedule any tasks launched on this slave on a new slave.
150  //
151  // NOTE: This callback is not reliably delivered. If a host or
152  // network failure causes messages between the master and the
153  // scheduler to be dropped, this callback may not be invoked.
154  virtual void slaveLost(
155  SchedulerDriver* driver,
156  const SlaveID& slaveId) = 0;
157 
158  // Invoked when an executor has exited/terminated. Note that any
159  // tasks running will have TASK_LOST status updates automagically
160  // generated.
161  //
162  // NOTE: This callback is not reliably delivered. If a host or
163  // network failure causes messages between the master and the
164  // scheduler to be dropped, this callback may not be invoked.
165  virtual void executorLost(
166  SchedulerDriver* driver,
167  const ExecutorID& executorId,
168  const SlaveID& slaveId,
169  int status) = 0;
170 
171  // Invoked when there is an unrecoverable error in the scheduler or
172  // scheduler driver. The driver will be aborted BEFORE invoking this
173  // callback.
174  virtual void error(
175  SchedulerDriver* driver,
176  const std::string& message) = 0;
177 };
178 
179 
180 // Abstract interface for connecting a scheduler to Mesos. This
181 // interface is used both to manage the scheduler's lifecycle (start
182 // it, stop it, or wait for it to finish) and to interact with Mesos
183 // (e.g., launch tasks, kill tasks, etc.). See MesosSchedulerDriver
184 // below for a concrete example of a SchedulerDriver.
186 {
187 public:
188  // Empty virtual destructor (necessary to instantiate subclasses).
189  // It is expected that 'stop()' is called before this is called.
190  virtual ~SchedulerDriver() {}
191 
192  // Starts the scheduler driver. This needs to be called before any
193  // other driver calls are made.
194  virtual Status start() = 0;
195 
196  // Stops the scheduler driver. If the 'failover' flag is set to
197  // false then it is expected that this framework will never
198  // reconnect to Mesos. So Mesos will unregister the framework and
199  // shutdown all its tasks and executors. If 'failover' is true, all
200  // executors and tasks will remain running (for some framework
201  // specific failover timeout) allowing the scheduler to reconnect
202  // (possibly in the same process, or from a different process, for
203  // example, on a different machine).
204  virtual Status stop(bool failover = false) = 0;
205 
206  // Aborts the driver so that no more callbacks can be made to the
207  // scheduler. The semantics of abort and stop have deliberately been
208  // separated so that code can detect an aborted driver (i.e., via
209  // the return status of SchedulerDriver::join, see below), and
210  // instantiate and start another driver if desired (from within the
211  // same process). Note that 'stop()' is not automatically called
212  // inside 'abort()'.
213  virtual Status abort() = 0;
214 
215  // Waits for the driver to be stopped or aborted, possibly
216  // _blocking_ the current thread indefinitely. The return status of
217  // this function can be used to determine if the driver was aborted
218  // (see mesos.proto for a description of Status).
219  virtual Status join() = 0;
220 
221  // Starts and immediately joins (i.e., blocks on) the driver.
222  virtual Status run() = 0;
223 
224  // Requests resources from Mesos (see mesos.proto for a description
225  // of Request and how, for example, to request resources from
226  // specific slaves). Any resources available are offered to the
227  // framework via Scheduler::resourceOffers callback, asynchronously.
228  virtual Status requestResources(const std::vector<Request>& requests) = 0;
229 
230  // Launches the given set of tasks. Any remaining resources (i.e.,
231  // those that are not used by the launched tasks or their executors)
232  // will be considered declined. Note that this includes resources
233  // used by tasks that the framework attempted to launch but failed
234  // (with TASK_ERROR) due to a malformed task description.
235  //
236  // The specified filters are applied on all unused resources (see
237  // mesos.proto for a description of Filters). Note that the default
238  // argument includes a 5-second `refuse_offers` filter.
239  //
240  // Available resources are aggregated when multiple offers are provided.
241  // Note that all offers must belong to the same slave. Invoking this
242  // function with an empty collection of tasks declines offers in their
243  // entirety (see Scheduler::declineOffer).
244  virtual Status launchTasks(
245  const std::vector<OfferID>& offerIds,
246  const std::vector<TaskInfo>& tasks,
247  const Filters& filters = Filters()) = 0;
248 
249  // DEPRECATED: Use launchTasks(offerIds, tasks, filters) instead.
250  virtual Status launchTasks(
251  const OfferID& offerId,
252  const std::vector<TaskInfo>& tasks,
253  const Filters& filters = Filters()) = 0;
254 
255  // Kills the specified task. Note that attempting to kill a task is
256  // currently not reliable. If, for example, a scheduler fails over
257  // while it was attempting to kill a task it will need to retry in
258  // the future. Likewise, if unregistered / disconnected, the request
259  // will be dropped (these semantics may be changed in the future).
260  virtual Status killTask(const TaskID& taskId) = 0;
261 
262  // Accepts the given offers and performs a sequence of operations on
263  // those accepted offers. See Offer.Operation in mesos.proto for the
264  // set of available operations. Any remaining resources (i.e., those
265  // that are not used by the launched tasks or their executors) will
266  // be considered declined. Note that this includes resources used by
267  // tasks that the framework attempted to launch but failed (with
268  // TASK_ERROR) due to a malformed task description. The specified
269  // filters are applied on all unused resources (see mesos.proto for
270  // a description of Filters). Available resources are aggregated
271  // when multiple offers are provided. Note that all offers must
272  // belong to the same slave.
273  virtual Status acceptOffers(
274  const std::vector<OfferID>& offerIds,
275  const std::vector<Offer::Operation>& operations,
276  const Filters& filters = Filters()) = 0;
277 
278  // Declines an offer in its entirety and applies the specified
279  // filters on the resources (see mesos.proto for a description of
280  // Filters). Note that this can be done at any time, it is not
281  // necessary to do this within the Scheduler::resourceOffers
282  // callback.
283  virtual Status declineOffer(
284  const OfferID& offerId,
285  const Filters& filters = Filters()) = 0;
286 
287  // Removes all filters previously set by the framework (via launchTasks()
288  // or declineOffer()) and clears the set of suppressed roles.
289  //
290  // NOTE: If the framework is not connected to the master, the set
291  // of suppressed roles stored by the driver will be cleared, and an
292  // up-to-date set of suppressed roles will be sent to the master
293  // during re-registration.
294  virtual Status reviveOffers() = 0;
295 
296  // Removes filters for the specified roles and removes these roles from
297  // the suppressed set. If the framework is not connected to the master,
298  // an up-to-date set of suppressed roles will be sent to the master
299  // during re-registration.
300  //
301  // NOTE: If 'roles' is empty, this method does nothing.
302  virtual Status reviveOffers(const std::vector<std::string>& roles) = 0;
303 
304  // Informs Mesos master to stop sending offers to the framework (i.e.
305  // to suppress all roles of the framework). To resume getting offers,
306  // the scheduler can call reviveOffers() or set the suppressed roles
307  // explicitly via updateFramework().
308  //
309  // NOTE: If the framework is not connected to the master, all the roles
310  // will be added to the set of suppressed roles in the driver, and an
311  // up-to-date suppressed roles set will be sent to the master during
312  // re-registration.
313  virtual Status suppressOffers() = 0;
314 
315  // Adds the roles to the suppressed set. If the framework is not connected
316  // to the master, an up-to-date set of suppressed roles will be sent to
317  // the master during re-registration.
318  //
319  // NOTE: If 'roles' is empty, this method does nothing.
320  virtual Status suppressOffers(const std::vector<std::string>& roles) = 0;
321 
322  // Acknowledges the status update. This should only be called
323  // once the status update is processed durably by the scheduler.
324  // Not that explicit acknowledgements must be requested via the
325  // constructor argument, otherwise a call to this method will
326  // cause the driver to crash.
327  virtual Status acknowledgeStatusUpdate(
328  const TaskStatus& status) = 0;
329 
330  // Sends a message from the framework to one of its executors. These
331  // messages are best effort; do not expect a framework message to be
332  // retransmitted in any reliable fashion.
333  virtual Status sendFrameworkMessage(
334  const ExecutorID& executorId,
335  const SlaveID& slaveId,
336  const std::string& data) = 0;
337 
338  // Allows the framework to query the status for non-terminal tasks.
339  // This causes the master to send back the latest task status for
340  // each task in 'statuses', if possible. Tasks that are no longer
341  // known will result in a TASK_LOST update. If statuses is empty,
342  // then the master will send the latest status for each task
343  // currently known.
344  virtual Status reconcileTasks(
345  const std::vector<TaskStatus>& statuses) = 0;
346 
347  // Inform Mesos master about changes to the `FrameworkInfo` and
348  // the set of suppressed roles. The driver will store the new
349  // `FrameworkInfo` and the new set of suppressed roles, and all
350  // subsequent re-registrations will use them.
351  //
352  // NOTE: If the supplied info is invalid or fails authorization,
353  // the `error()` callback will be invoked asynchronously (after
354  // the master replies with a `FrameworkErrorMessage`).
355  //
356  // NOTE: This must be called after initial registration with the
357  // master completes and the `FrameworkID` is assigned. The assigned
358  // `FrameworkID` must be set in `frameworkInfo`.
359  //
360  // NOTE: The `FrameworkInfo.user` and `FrameworkInfo.hostname`
361  // fields will be auto-populated using the same approach used
362  // during driver initialization.
363  virtual Status updateFramework(
364  const FrameworkInfo& frameworkInfo,
365  const std::vector<std::string>& suppressedRoles) = 0;
366 };
367 
368 
369 // Concrete implementation of a SchedulerDriver that connects a
370 // Scheduler with a Mesos master. The MesosSchedulerDriver is
371 // thread-safe.
372 //
373 // Note that scheduler failover is supported in Mesos. After a
374 // scheduler is registered with Mesos it may failover (to a new
375 // process on the same machine or across multiple machines) by
376 // creating a new driver with the ID given to it in
377 // Scheduler::registered.
378 //
379 // The driver is responsible for invoking the Scheduler callbacks as
380 // it communicates with the Mesos master.
381 //
382 // Note that blocking on the MesosSchedulerDriver (e.g., via
383 // MesosSchedulerDriver::join) doesn't affect the scheduler callbacks
384 // in anyway because they are handled by a different thread.
385 //
386 // Note that the driver uses GLOG to do its own logging. GLOG flags
387 // can be set via environment variables, prefixing the flag name with
388 // "GLOG_", e.g., "GLOG_v=1". For Mesos specific logging flags see
389 // src/logging/flags.hpp. Mesos flags can also be set via environment
390 // variables, prefixing the flag name with "MESOS_", e.g.,
391 // "MESOS_QUIET=1".
392 //
393 // See src/examples/test_framework.cpp for an example of using the
394 // MesosSchedulerDriver.
396 {
397 public:
398  // Creates a new driver for the specified scheduler. The master
399  // should be one of:
400  //
401  // host:port
402  // zk://host1:port1,host2:port2,.../path
403  // zk://username:password@host1:port1,host2:port2,.../path
404  // file:///path/to/file (where file contains one of the above)
405  //
406  // The driver will attempt to "failover" if the specified
407  // FrameworkInfo includes a valid FrameworkID.
408  //
409  // Any Mesos configuration options are read from environment
410  // variables, as well as any configuration files found through the
411  // environment variables.
412  //
413  // TODO(vinod): Deprecate this once 'MesosSchedulerDriver' can take
414  // 'Option<Credential>' as parameter. Currently it cannot because
415  // 'stout' is not visible from here.
417  Scheduler* scheduler,
418  const FrameworkInfo& framework,
419  const std::string& master);
420 
421  // Same as the above constructor but takes 'credential' as argument.
422  // The credential will be used for authenticating with the master.
424  Scheduler* scheduler,
425  const FrameworkInfo& framework,
426  const std::string& master,
427  const Credential& credential);
428 
429  // These constructors are the same as the above two, but allow
430  // the framework to specify whether implicit or explicit
431  // acknowledgements are desired. See statusUpdate() for the
432  // details about explicit acknowledgements.
433  //
434  // TODO(bmahler): Deprecate the above two constructors. In 0.22.0
435  // these new constructors are exposed.
437  Scheduler* scheduler,
438  const FrameworkInfo& framework,
439  const std::string& master,
440  bool implicitAcknowledgements);
441 
443  Scheduler* scheduler,
444  const FrameworkInfo& framework,
445  const std::string& master,
446  bool implicitAcknowlegements,
447  const Credential& credential);
448 
449  // These constructors are the same as the above two, but allow
450  // the framework to also specify the initial set of suppressed roles.
452  Scheduler* scheduler,
453  const FrameworkInfo& framework,
454  const std::vector<std::string>& suppressedRoles,
455  const std::string& master,
456  bool implicitAcknowledgements);
457 
459  Scheduler* scheduler,
460  const FrameworkInfo& framework,
461  const std::vector<std::string>& suppressedRoles,
462  const std::string& master,
463  bool implicitAcknowlegements,
464  const Credential& credential);
465 
466 
467  // This destructor will block indefinitely if
468  // MesosSchedulerDriver::start was invoked successfully (possibly
469  // via MesosSchedulerDriver::run) and MesosSchedulerDriver::stop has
470  // not been invoked.
471  ~MesosSchedulerDriver() override;
472 
473  // See SchedulerDriver for descriptions of these.
474  Status start() override;
475  Status stop(bool failover = false) override;
476  Status abort() override;
477  Status join() override;
478  Status run() override;
479 
480  Status requestResources(
481  const std::vector<Request>& requests) override;
482 
483  // TODO(nnielsen): launchTasks using single offer is deprecated.
484  // Use launchTasks with offer list instead.
485  Status launchTasks(
486  const OfferID& offerId,
487  const std::vector<TaskInfo>& tasks,
488  const Filters& filters = Filters()) override;
489 
490  Status launchTasks(
491  const std::vector<OfferID>& offerIds,
492  const std::vector<TaskInfo>& tasks,
493  const Filters& filters = Filters()) override;
494 
495  Status killTask(const TaskID& taskId) override;
496 
497  Status acceptOffers(
498  const std::vector<OfferID>& offerIds,
499  const std::vector<Offer::Operation>& operations,
500  const Filters& filters = Filters()) override;
501 
502  Status declineOffer(
503  const OfferID& offerId,
504  const Filters& filters = Filters()) override;
505 
506  Status reviveOffers() override;
507 
508  Status reviveOffers(const std::vector<std::string>& roles) override;
509 
510  Status suppressOffers() override;
511 
512  Status suppressOffers(const std::vector<std::string>& roles) override;
513 
514  Status acknowledgeStatusUpdate(
515  const TaskStatus& status) override;
516 
517  Status sendFrameworkMessage(
518  const ExecutorID& executorId,
519  const SlaveID& slaveId,
520  const std::string& data) override;
521 
522  Status reconcileTasks(
523  const std::vector<TaskStatus>& statuses) override;
524 
525  Status updateFramework(
526  const FrameworkInfo& frameworkInfo,
527  const std::vector<std::string>& suppressedRoles) override;
528 
529 protected:
530  // Used to detect (i.e., choose) the master.
531  std::shared_ptr<master::detector::MasterDetector> detector;
532 
533 private:
534  void initialize();
535 
536  Scheduler* scheduler;
537  FrameworkInfo framework;
538  const std::vector<std::string> initialSuppressedRoles;
539  std::string master;
540 
541  // Used for communicating with the master.
542  internal::SchedulerProcess* process;
543 
544  // URL for the master (e.g., zk://, file://, etc).
545  std::string url;
546 
547  // Mutex for enforcing serial execution of all non-callbacks.
548  std::recursive_mutex mutex;
549 
550  // Latch for waiting until driver terminates.
551  process::Latch* latch;
552 
553  // Current status of the driver.
554  Status status;
555 
556  const bool implicitAcknowlegements;
557 
558  const Credential* credential;
559 
560  // Scheduler process ID.
561  std::string schedulerId;
562 };
563 
564 } // namespace mesos {
565 
566 #endif // __MESOS_SCHEDULER_HPP__
Definition: master.hpp:27
virtual ~SchedulerDriver()
Definition: scheduler.hpp:190
bool initialize(const Option< std::string > &delegate=None(), const Option< std::string > &readwriteAuthenticationRealm=None(), const Option< std::string > &readonlyAuthenticationRealm=None())
Initialize the library.
Result< ProcessStatus > status(pid_t pid)
Definition: proc.hpp:166
Definition: scheduler.hpp:395
std::string join(const std::string &path1, const std::string &path2, const char _separator=os::PATH_SEPARATOR)
Definition: path.hpp:116
virtual ~Scheduler()
Definition: scheduler.hpp:73
Try< Nothing > start(const std::string &name)
Starts the slice with the given name (via 'systemctl start <name>').
Definition: scheduler.hpp:185
Result< std::vector< Filter< Classifier > > > filters(const std::string &_link, const Handle &parent)
Definition: internal.hpp:769
Definition: agent.hpp:25
Definition: scheduler.hpp:69
Future< R > run(R(*method)())
Definition: run.hpp:55
std::shared_ptr< master::detector::MasterDetector > detector
Definition: scheduler.hpp:531
Result< Process > process(pid_t pid)
Definition: freebsd.hpp:30
Definition: attributes.hpp:24
std::string error(const std::string &msg, uint32_t code)
Definition: executor.hpp:48
Definition: latch.hpp:24