Apache Mesos
scheduler.hpp
Go to the documentation of this file.
1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements. See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership. The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License. You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 
17 #ifndef __MESOS_SCHEDULER_HPP__
18 #define __MESOS_SCHEDULER_HPP__
19 
20 #include <memory>
21 #include <mutex>
22 #include <string>
23 #include <vector>
24 
25 #include <mesos/mesos.hpp>
26 
27 // Mesos scheduler interface and scheduler driver. A scheduler is used
28 // to interact with Mesos in order to run distributed computations.
29 //
30 // IF YOU FIND YOURSELF MODIFYING COMMENTS HERE PLEASE CONSIDER MAKING
31 // THE SAME MODIFICATIONS FOR OTHER LANGUAGE BINDINGS (e.g., Java:
32 // src/java/src/org/apache/mesos, Python: src/python/src, etc.).
33 
// Forward declaration. Only a pointer to process::Latch is held in
// this header (see MesosSchedulerDriver::latch), so the complete
// type is not required here.
namespace process {
class Latch;
} // namespace process {
38 
39 namespace mesos {
40 
// A few forward declarations.
class SchedulerDriver;

namespace scheduler {
class MesosProcess;
} // namespace scheduler {

namespace internal {
class SchedulerProcess;
} // namespace internal {

namespace master {
namespace detector {
class MasterDetector;
} // namespace detector {
} // namespace master {
57 
58 // Callback interface to be implemented by frameworks' schedulers.
59 // Note that only one callback will be invoked at a time, so it is not
60 // recommended that you block within a callback because it may cause a
61 // deadlock.
62 //
63 // Each callback includes a pointer to the scheduler driver that was
64 // used to run this scheduler. The pointer will not change for the
65 // duration of a scheduler (i.e., from the point you do
66 // SchedulerDriver::start() to the point that SchedulerDriver::join()
67 // returns). This is intended for convenience so that a scheduler
68 // doesn't need to store a pointer to the driver itself.
69 class Scheduler
70 {
71 public:
72  // Empty virtual destructor (necessary to instantiate subclasses).
73  virtual ~Scheduler() {}
74 
75  // Invoked when the scheduler successfully registers with a Mesos
76  // master. A unique ID (generated by the master) used for
77  // distinguishing this framework from others and MasterInfo with the
78  // ip and port of the current master are provided as arguments.
79  virtual void registered(
80  SchedulerDriver* driver,
81  const FrameworkID& frameworkId,
82  const MasterInfo& masterInfo) = 0;
83 
84  // Invoked when the scheduler reregisters with a newly elected
85  // Mesos master. This is only called when the scheduler has
86  // previously been registered. MasterInfo containing the updated
87  // information about the elected master is provided as an argument.
88  virtual void reregistered(
89  SchedulerDriver* driver,
90  const MasterInfo& masterInfo) = 0;
91 
92  // Invoked when the scheduler becomes "disconnected" from the master
93  // (e.g., the master fails and another is taking over).
94  virtual void disconnected(SchedulerDriver* driver) = 0;
95 
96  // Invoked when resources have been offered to this framework. A
97  // single offer will only contain resources from a single slave.
98  // Resources associated with an offer will not be re-offered to
99  // _this_ framework until either (a) this framework has rejected
100  // those resources (see SchedulerDriver::launchTasks) or (b) those
101  // resources have been rescinded (see Scheduler::offerRescinded).
102  // Note that resources may be concurrently offered to more than one
103  // framework at a time (depending on the allocator being used). In
104  // that case, the first framework to launch tasks using those
105  // resources will be able to use them while the other frameworks
106  // will have those resources rescinded (or if a framework has
107  // already launched tasks with those resources then those tasks will
108  // fail with a TASK_LOST status and a message saying as much).
109  virtual void resourceOffers(
110  SchedulerDriver* driver,
111  const std::vector<Offer>& offers) = 0;
112 
113  // Invoked when an offer is no longer valid (e.g., the slave was
114  // lost or another framework used resources in the offer). If for
115  // whatever reason an offer is never rescinded (e.g., dropped
116  // message, failing over framework, etc.), a framework that attempts
117  // to launch tasks using an invalid offer will receive TASK_LOST
118  // status updates for those tasks (see Scheduler::resourceOffers).
119  virtual void offerRescinded(
120  SchedulerDriver* driver,
121  const OfferID& offerId) = 0;
122 
123  // Invoked when the status of a task has changed (e.g., a slave is
124  // lost and so the task is lost, a task finishes and an executor
125  // sends a status update saying so, etc). If implicit
126  // acknowledgements are being used, then returning from this
127  // callback _acknowledges_ receipt of this status update! If for
128  // whatever reason the scheduler aborts during this callback (or
129  // the process exits) another status update will be delivered (note,
130  // however, that this is currently not true if the slave sending the
131  // status update is lost/fails during that time). If explicit
132  // acknowledgements are in use, the scheduler must acknowledge this
133  // status on the driver.
134  virtual void statusUpdate(
135  SchedulerDriver* driver,
136  const TaskStatus& status) = 0;
137 
138  // Invoked when an executor sends a message. These messages are best
139  // effort; do not expect a framework message to be retransmitted in
140  // any reliable fashion.
141  virtual void frameworkMessage(
142  SchedulerDriver* driver,
143  const ExecutorID& executorId,
144  const SlaveID& slaveId,
145  const std::string& data) = 0;
146 
147  // Invoked when a slave has been determined unreachable (e.g.,
148  // machine failure, network partition). Most frameworks will need to
149  // reschedule any tasks launched on this slave on a new slave.
150  //
151  // NOTE: This callback is not reliably delivered. If a host or
152  // network failure causes messages between the master and the
153  // scheduler to be dropped, this callback may not be invoked.
154  virtual void slaveLost(
155  SchedulerDriver* driver,
156  const SlaveID& slaveId) = 0;
157 
158  // Invoked when an executor has exited/terminated. Note that any
159  // tasks running will have TASK_LOST status updates automagically
160  // generated.
161  //
162  // NOTE: This callback is not reliably delivered. If a host or
163  // network failure causes messages between the master and the
164  // scheduler to be dropped, this callback may not be invoked.
165  virtual void executorLost(
166  SchedulerDriver* driver,
167  const ExecutorID& executorId,
168  const SlaveID& slaveId,
169  int status) = 0;
170 
171  // Invoked when there is an unrecoverable error in the scheduler or
172  // scheduler driver. The driver will be aborted BEFORE invoking this
173  // callback.
174  virtual void error(
175  SchedulerDriver* driver,
176  const std::string& message) = 0;
177 };
178 
179 
180 // Abstract interface for connecting a scheduler to Mesos. This
181 // interface is used both to manage the scheduler's lifecycle (start
182 // it, stop it, or wait for it to finish) and to interact with Mesos
183 // (e.g., launch tasks, kill tasks, etc.). See MesosSchedulerDriver
184 // below for a concrete example of a SchedulerDriver.
186 {
187 public:
188  // Empty virtual destructor (necessary to instantiate subclasses).
189  // It is expected that 'stop()' is called before this is called.
190  virtual ~SchedulerDriver() {}
191 
192  // Starts the scheduler driver. This needs to be called before any
193  // other driver calls are made.
194  virtual Status start() = 0;
195 
196  // Stops the scheduler driver. If the 'failover' flag is set to
197  // false then it is expected that this framework will never
198  // reconnect to Mesos. So Mesos will unregister the framework and
199  // shutdown all its tasks and executors. If 'failover' is true, all
200  // executors and tasks will remain running (for some framework
201  // specific failover timeout) allowing the scheduler to reconnect
202  // (possibly in the same process, or from a different process, for
203  // example, on a different machine).
204  virtual Status stop(bool failover = false) = 0;
205 
206  // Aborts the driver so that no more callbacks can be made to the
207  // scheduler. The semantics of abort and stop have deliberately been
208  // separated so that code can detect an aborted driver (i.e., via
209  // the return status of SchedulerDriver::join, see below), and
210  // instantiate and start another driver if desired (from within the
211  // same process). Note that 'stop()' is not automatically called
212  // inside 'abort()'.
213  virtual Status abort() = 0;
214 
215  // Waits for the driver to be stopped or aborted, possibly
216  // _blocking_ the current thread indefinitely. The return status of
217  // this function can be used to determine if the driver was aborted
218  // (see mesos.proto for a description of Status).
219  virtual Status join() = 0;
220 
221  // Starts and immediately joins (i.e., blocks on) the driver.
222  virtual Status run() = 0;
223 
224  // Requests resources from Mesos (see mesos.proto for a description
225  // of Request and how, for example, to request resources from
226  // specific slaves). Any resources available are offered to the
227  // framework via Scheduler::resourceOffers callback, asynchronously.
228  virtual Status requestResources(const std::vector<Request>& requests) = 0;
229 
230  // Launches the given set of tasks. Any remaining resources (i.e.,
231  // those that are not used by the launched tasks or their executors)
232  // will be considered declined. Note that this includes resources
233  // used by tasks that the framework attempted to launch but failed
234  // (with TASK_ERROR) due to a malformed task description. The
235  // specified filters are applied on all unused resources (see
236  // mesos.proto for a description of Filters). Available resources
237  // are aggregated when multiple offers are provided. Note that all
238  // offers must belong to the same slave. Invoking this function with
239  // an empty collection of tasks declines offers in their entirety
240  // (see Scheduler::declineOffer).
241  virtual Status launchTasks(
242  const std::vector<OfferID>& offerIds,
243  const std::vector<TaskInfo>& tasks,
244  const Filters& filters = Filters()) = 0;
245 
246  // DEPRECATED: Use launchTasks(offerIds, tasks, filters) instead.
247  virtual Status launchTasks(
248  const OfferID& offerId,
249  const std::vector<TaskInfo>& tasks,
250  const Filters& filters = Filters()) = 0;
251 
252  // Kills the specified task. Note that attempting to kill a task is
253  // currently not reliable. If, for example, a scheduler fails over
254  // while it was attempting to kill a task it will need to retry in
255  // the future. Likewise, if unregistered / disconnected, the request
256  // will be dropped (these semantics may be changed in the future).
257  virtual Status killTask(const TaskID& taskId) = 0;
258 
259  // Accepts the given offers and performs a sequence of operations on
260  // those accepted offers. See Offer.Operation in mesos.proto for the
261  // set of available operations. Any remaining resources (i.e., those
262  // that are not used by the launched tasks or their executors) will
263  // be considered declined. Note that this includes resources used by
264  // tasks that the framework attempted to launch but failed (with
265  // TASK_ERROR) due to a malformed task description. The specified
266  // filters are applied on all unused resources (see mesos.proto for
267  // a description of Filters). Available resources are aggregated
268  // when multiple offers are provided. Note that all offers must
269  // belong to the same slave.
270  virtual Status acceptOffers(
271  const std::vector<OfferID>& offerIds,
272  const std::vector<Offer::Operation>& operations,
273  const Filters& filters = Filters()) = 0;
274 
275  // Declines an offer in its entirety and applies the specified
276  // filters on the resources (see mesos.proto for a description of
277  // Filters). Note that this can be done at any time, it is not
278  // necessary to do this within the Scheduler::resourceOffers
279  // callback.
280  virtual Status declineOffer(
281  const OfferID& offerId,
282  const Filters& filters = Filters()) = 0;
283 
284  // Removes all filters previously set by the framework (via
285  // launchTasks()). This enables the framework to receive offers from
286  // those filtered slaves.
287  virtual Status reviveOffers() = 0;
288 
289  // Inform Mesos master to stop sending offers to the framework. The
290  // scheduler should call reviveOffers() to resume getting offers.
291  virtual Status suppressOffers() = 0;
292 
293  // Acknowledges the status update. This should only be called
294  // once the status update is processed durably by the scheduler.
295  // Not that explicit acknowledgements must be requested via the
296  // constructor argument, otherwise a call to this method will
297  // cause the driver to crash.
298  virtual Status acknowledgeStatusUpdate(
299  const TaskStatus& status) = 0;
300 
301  // Sends a message from the framework to one of its executors. These
302  // messages are best effort; do not expect a framework message to be
303  // retransmitted in any reliable fashion.
304  virtual Status sendFrameworkMessage(
305  const ExecutorID& executorId,
306  const SlaveID& slaveId,
307  const std::string& data) = 0;
308 
309  // Allows the framework to query the status for non-terminal tasks.
310  // This causes the master to send back the latest task status for
311  // each task in 'statuses', if possible. Tasks that are no longer
312  // known will result in a TASK_LOST update. If statuses is empty,
313  // then the master will send the latest status for each task
314  // currently known.
315  virtual Status reconcileTasks(
316  const std::vector<TaskStatus>& statuses) = 0;
317 };
318 
319 
320 // Concrete implementation of a SchedulerDriver that connects a
321 // Scheduler with a Mesos master. The MesosSchedulerDriver is
322 // thread-safe.
323 //
324 // Note that scheduler failover is supported in Mesos. After a
325 // scheduler is registered with Mesos it may failover (to a new
326 // process on the same machine or across multiple machines) by
327 // creating a new driver with the ID given to it in
328 // Scheduler::registered.
329 //
330 // The driver is responsible for invoking the Scheduler callbacks as
331 // it communicates with the Mesos master.
332 //
333 // Note that blocking on the MesosSchedulerDriver (e.g., via
334 // MesosSchedulerDriver::join) doesn't affect the scheduler callbacks
335 // in anyway because they are handled by a different thread.
336 //
337 // Note that the driver uses GLOG to do its own logging. GLOG flags
338 // can be set via environment variables, prefixing the flag name with
339 // "GLOG_", e.g., "GLOG_v=1". For Mesos specific logging flags see
340 // src/logging/flags.hpp. Mesos flags can also be set via environment
341 // variables, prefixing the flag name with "MESOS_", e.g.,
342 // "MESOS_QUIET=1".
343 //
344 // See src/examples/test_framework.cpp for an example of using the
345 // MesosSchedulerDriver.
347 {
348 public:
349  // Creates a new driver for the specified scheduler. The master
350  // should be one of:
351  //
352  // host:port
353  // zk://host1:port1,host2:port2,.../path
354  // zk://username:password@host1:port1,host2:port2,.../path
355  // file:///path/to/file (where file contains one of the above)
356  //
357  // The driver will attempt to "failover" if the specified
358  // FrameworkInfo includes a valid FrameworkID.
359  //
360  // Any Mesos configuration options are read from environment
361  // variables, as well as any configuration files found through the
362  // environment variables.
363  //
364  // TODO(vinod): Deprecate this once 'MesosSchedulerDriver' can take
365  // 'Option<Credential>' as parameter. Currently it cannot because
366  // 'stout' is not visible from here.
368  Scheduler* scheduler,
369  const FrameworkInfo& framework,
370  const std::string& master);
371 
372  // Same as the above constructor but takes 'credential' as argument.
373  // The credential will be used for authenticating with the master.
375  Scheduler* scheduler,
376  const FrameworkInfo& framework,
377  const std::string& master,
378  const Credential& credential);
379 
380  // These constructors are the same as the above two, but allow
381  // the framework to specify whether implicit or explicit
382  // acknowledgements are desired. See statusUpdate() for the
383  // details about explicit acknowledgements.
384  //
385  // TODO(bmahler): Deprecate the above two constructors. In 0.22.0
386  // these new constructors are exposed.
388  Scheduler* scheduler,
389  const FrameworkInfo& framework,
390  const std::string& master,
391  bool implicitAcknowledgements);
392 
394  Scheduler* scheduler,
395  const FrameworkInfo& framework,
396  const std::string& master,
397  bool implicitAcknowlegements,
398  const Credential& credential);
399 
400  // This destructor will block indefinitely if
401  // MesosSchedulerDriver::start was invoked successfully (possibly
402  // via MesosSchedulerDriver::run) and MesosSchedulerDriver::stop has
403  // not been invoked.
404  virtual ~MesosSchedulerDriver();
405 
406  // See SchedulerDriver for descriptions of these.
407  virtual Status start();
408  virtual Status stop(bool failover = false);
409  virtual Status abort();
410  virtual Status join();
411  virtual Status run();
412 
413  virtual Status requestResources(
414  const std::vector<Request>& requests);
415 
416  // TODO(nnielsen): launchTasks using single offer is deprecated.
417  // Use launchTasks with offer list instead.
418  virtual Status launchTasks(
419  const OfferID& offerId,
420  const std::vector<TaskInfo>& tasks,
421  const Filters& filters = Filters());
422 
423  virtual Status launchTasks(
424  const std::vector<OfferID>& offerIds,
425  const std::vector<TaskInfo>& tasks,
426  const Filters& filters = Filters());
427 
428  virtual Status killTask(const TaskID& taskId);
429 
430  virtual Status acceptOffers(
431  const std::vector<OfferID>& offerIds,
432  const std::vector<Offer::Operation>& operations,
433  const Filters& filters = Filters());
434 
435  virtual Status declineOffer(
436  const OfferID& offerId,
437  const Filters& filters = Filters());
438 
439  virtual Status reviveOffers();
440 
441  virtual Status suppressOffers();
442 
443  virtual Status acknowledgeStatusUpdate(
444  const TaskStatus& status);
445 
446  virtual Status sendFrameworkMessage(
447  const ExecutorID& executorId,
448  const SlaveID& slaveId,
449  const std::string& data);
450 
451  virtual Status reconcileTasks(
452  const std::vector<TaskStatus>& statuses);
453 
454 protected:
455  // Used to detect (i.e., choose) the master.
456  std::shared_ptr<master::detector::MasterDetector> detector;
457 
458 private:
459  void initialize();
460 
461  Scheduler* scheduler;
462  FrameworkInfo framework;
463  std::string master;
464 
465  // Used for communicating with the master.
466  internal::SchedulerProcess* process;
467 
468  // URL for the master (e.g., zk://, file://, etc).
469  std::string url;
470 
471  // Mutex for enforcing serial execution of all non-callbacks.
472  std::recursive_mutex mutex;
473 
474  // Latch for waiting until driver terminates.
475  process::Latch* latch;
476 
477  // Current status of the driver.
478  Status status;
479 
480  const bool implicitAcknowlegements;
481 
482  const Credential* credential;
483 
484  // Scheduler process ID.
485  std::string schedulerId;
486 };
487 
488 } // namespace mesos {
489 
490 #endif // __MESOS_SCHEDULER_HPP__
Definition: master.hpp:27
virtual ~SchedulerDriver()
Definition: scheduler.hpp:190
bool initialize(const Option< std::string > &delegate=None(), const Option< std::string > &readwriteAuthenticationRealm=None(), const Option< std::string > &readonlyAuthenticationRealm=None())
Initialize the library.
Result< ProcessStatus > status(pid_t pid)
Definition: proc.hpp:166
Definition: scheduler.hpp:346
std::string join(const std::string &path1, const std::string &path2, const char _separator=os::PATH_SEPARATOR)
Definition: path.hpp:56
virtual ~Scheduler()
Definition: scheduler.hpp:73
Try< Nothing > start(const std::string &name)
Starts the slice with the given name (via &#39;systemctl start <name>&#39;).
Definition: scheduler.hpp:185
Result< std::vector< Filter< Classifier > > > filters(const std::string &_link, const Handle &parent)
Definition: internal.hpp:776
Definition: spec.hpp:30
Definition: scheduler.hpp:69
Future< R > run(R(*method)())
Definition: run.hpp:55
std::shared_ptr< master::detector::MasterDetector > detector
Definition: scheduler.hpp:456
Result< Process > process(pid_t pid)
Definition: freebsd.hpp:30
Definition: attributes.hpp:24
std::string error(const std::string &msg, uint32_t code)
Definition: executor.hpp:48
Definition: latch.hpp:24