Apache Mesos
sorter.hpp
Go to the documentation of this file.
1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements. See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership. The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License. You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 
17 #ifndef __MASTER_ALLOCATOR_MESOS_SORTER_DRF_SORTER_HPP__
18 #define __MASTER_ALLOCATOR_MESOS_SORTER_DRF_SORTER_HPP__
19 
20 #include <algorithm>
21 #include <set>
22 #include <string>
23 #include <vector>
24 
25 #include <mesos/mesos.hpp>
27 #include <mesos/resources.hpp>
28 #include <mesos/values.hpp>
29 
30 #include <stout/check.hpp>
31 #include <stout/hashmap.hpp>
32 #include <stout/option.hpp>
33 
35 
37 
38 
39 namespace mesos {
40 namespace internal {
41 namespace master {
42 namespace allocator {
43 
44 class DRFSorter : public Sorter
45 {
46 public:
47  DRFSorter();
48 
49  explicit DRFSorter(
50  const process::UPID& allocator,
51  const std::string& metricsPrefix);
52 
53  ~DRFSorter() override;
54 
55  void initialize(
56  const Option<std::set<std::string>>& fairnessExcludeResourceNames)
57  override;
58 
59  void add(const std::string& clientPath) override;
60 
61  void remove(const std::string& clientPath) override;
62 
63  void activate(const std::string& clientPath) override;
64 
65  void deactivate(const std::string& clientPath) override;
66 
67  void updateWeight(const std::string& path, double weight) override;
68 
69  void allocated(
70  const std::string& clientPath,
71  const SlaveID& slaveId,
72  const Resources& resources) override;
73 
74  void update(
75  const std::string& clientPath,
76  const SlaveID& slaveId,
77  const Resources& oldAllocation,
78  const Resources& newAllocation) override;
79 
80  void unallocated(
81  const std::string& clientPath,
82  const SlaveID& slaveId,
83  const Resources& resources) override;
84 
86  const std::string& clientPath) const override;
87 
89  const std::string& clientPath) const override;
91  const override;
92 
94  const std::string& clientPath,
95  const SlaveID& slaveId) const override;
96 
97  void addSlave(
98  const SlaveID& slaveId,
99  const ResourceQuantities& scalarQuantities) override;
100 
101  void removeSlave(const SlaveID& slaveId) override;
102 
103  std::vector<std::string> sort() override;
104 
105  bool contains(const std::string& clientPath) const override;
106 
107  size_t count() const override;
108 
109 private:
110  // A node in the sorter's tree.
111  struct Node;
112 
113  // Returns the dominant resource share for the node.
114  double calculateShare(const Node* node) const;
115 
116  // Returns the weight associated with the node. If no weight has
117  // been configured for the node's path, the default weight (1.0) is
118  // returned.
119  double getWeight(const Node* node) const;
120 
121  // Returns the client associated with the given path. Returns
122  // nullptr if the path is not found or if the path identifies an
123  // internal node in the tree (not a client).
124  Node* find(const std::string& clientPath) const;
125 
126  // Resources (by name) that will be excluded from fair sharing.
127  Option<std::set<std::string>> fairnessExcludeResourceNames;
128 
129  // If true, sort() will recalculate all shares and resort the tree.
130  bool dirty = false;
131 
132  // The root node in the sorter tree.
133  Node* root;
134 
135  // To speed lookups, we keep a map from client paths to the leaf
136  // node associated with that client. There is an entry in this map
137  // for every leaf node in the client tree (except for the root when
138  // the tree is empty). Paths in this map do NOT contain the trailing
139  // "." label we use for leaf nodes.
141 
142  // Weights associated with role paths. Setting the weight for a path
143  // influences the share of all nodes in the subtree rooted at that
144  // path. This hashmap might include weights for paths that are not
145  // currently in the sorter tree.
147 
148  // Total resources.
149  struct Total
150  {
151  // We keep the aggregated scalar resource quantities to speed
152  // up share calculation. Note that the shared count of resources is
153  // ignored, because sharedness inherently refers to the identities of
154  // resources and not quantities.
155  ResourceQuantities totals;
156 
157  // We keep track of per-agent resource quantities to handle agent removal.
158  //
159  // Note that the only way to change the stored resource quantities
160  // is to remove the agent from the sorter and add it with new resources.
161  // Thus, when a resource shared count on an agent changes, multiple copies
162  // of the same shared resource are still accounted for exactly once.
163  hashmap<SlaveID, const ResourceQuantities> agentResourceQuantities;
164  } total_;
165 
166  // Metrics are optionally exposed by the sorter.
167  friend Metrics;
168  Option<Metrics> metrics;
169 };
170 
171 
172 // Represents a node in the sorter's tree. The structure of the tree
173 // reflects the hierarchical relationships between the clients of the
174 // sorter. Some (but not all) nodes correspond to sorter clients; some
175 // nodes only exist to represent the structure of the sorter
176 // tree. Clients are always associated with leaf nodes.
177 //
178 // For example, if there are two sorter clients "a/b" and "c/d", the
179 // tree will contain five nodes: the root node, internal nodes for "a"
180 // and "c", and leaf nodes for the clients "a/b" and "c/d".
182 {
183  // Indicates whether a node is an active leaf node, an inactive leaf
184  // node, or an internal node. Sorter clients always correspond to
185  // leaf nodes, and only leaf nodes can be activated or deactivated.
186  // The root node is always an "internal" node.
187  enum Kind
188  {
191  INTERNAL
192  };
193 
// Constructs a node labelled `_name` of the given `_kind` under
// `_parent`, with `share` initialised to 0. A null `_parent` denotes
// the root node. The constructor only stores `_parent`; it does not
// add the new node to the parent's `children`.
194  Node(const std::string& _name, Kind _kind, Node* _parent)
195  : name(_name), share(0), kind(_kind), parent(_parent)
196  {
197  // Compute the node's path. Three cases:
198  //
199  // (1) If the root node, use the empty string
200  // (2) If a child of the root node, use the child's name
201  // (3) Otherwise, use the parent's path, "/", and the child's name.
202  if (parent == nullptr) {
203  path = "";
204  } else if (parent->parent == nullptr) {
205  path = name;
206  } else {
207  path = strings::join("/", parent->path, name);
208  }
209  }
210 
212  {
213  foreach (Node* child, children) {
214  delete child;
215  }
216  }
217 
218  // The label of the edge from this node's parent to the
219  // node. "Implicit" leaf nodes are always named ".".
220  //
221  // TODO(neilc): Consider naming implicit leaf nodes in a clearer
222  // way, e.g., by making `name` an Option?
223  std::string name;
224 
225  // Complete path from root to node. This includes the trailing "."
226  // label for virtual leaf nodes.
227  std::string path;
228 
229  // NOTE: Not computed for root node.
230  double share;
231 
232  // Cached weight of the node; access this through `getWeight()`.
233  // The value is cached by `getWeight()` and updated by
234  // `updateWeight()`. Marked mutable since the caching writes
235  // to this are logically const.
237 
239 
241 
242  // Pointers to the child nodes. `children` is only non-empty if
243  // `kind` is INTERNAL. Two ordering invariants are maintained
244  // on the `children` vector:
245  //
246  // (1) All inactive leaves are stored at the end of the vector; that
247  // is, each `children` vector consists of zero or more active leaves
248  // and internal nodes, followed by zero or more inactive leaves. This
249  // means that code that only wants to iterate over active children
250  // can stop when the first inactive leaf is observed.
251  //
252  // (2) If the tree is not dirty, the active leaves and internal
253  // nodes are kept sorted by DRF share.
254  std::vector<Node*> children;
255 
256  // If this node represents a sorter client, this returns the path of
257  // that client. Unlike the `path` field, this does NOT include the
258  // trailing "." label for virtual leaf nodes.
259  //
260  // For example, if the sorter contains two clients "a" and "a/b",
261  // the tree will contain four nodes: the root node, "a", "a/."
262  // (virtual leaf), and "a/b". The `clientPath()` of "a/." is "a",
263  // because that is the name of the client associated with that
264  // virtual leaf node.
//
// The returned reference aliases either this node's `path` or the
// parent's `path`, so it is only valid while that node is alive.
265  const std::string& clientPath() const
266  {
267  if (name == ".") {
// A "." label is only expected on leaf nodes, and such a node must
// have a parent whose path is the client's path.
268  CHECK(kind == ACTIVE_LEAF || kind == INACTIVE_LEAF);
269  return CHECK_NOTNULL(parent)->path;
270  }
271 
272  return path;
273  }
274 
// Returns true if this node is an active or inactive leaf, and false
// otherwise. As a sanity check, a leaf must have no children.
275  bool isLeaf() const
276  {
277  if (kind == ACTIVE_LEAF || kind == INACTIVE_LEAF) {
278  CHECK(children.empty());
279  return true;
280  }
281 
282  return false;
283  }
284 
// Removes `child` from this node's `children` vector. The pointer is
// only unlinked here, not deleted. The CHECK fails if `child` is not
// currently in `children`.
285  void removeChild(const Node* child)
286  {
287  // Sanity check: ensure we are removing an extant node.
288  auto it = std::find(children.begin(), children.end(), child);
289  CHECK(it != children.end());
290 
291  children.erase(it);
292  }
293 
// Links `child` into this node's `children` vector. The CHECK fails
// if `child` is already present. Only the vector is modified here;
// `child->parent` is not touched.
294  void addChild(Node* child)
295  {
296  // Sanity check: don't allow duplicates to be inserted.
297  auto it = std::find(children.begin(), children.end(), child);
298  CHECK(it == children.end());
299 
300  // If we're inserting an inactive leaf, place it at the end of the
301  // `children` vector; otherwise, place it at the beginning. This
302  // maintains ordering invariant (1) above. It is up to the caller
303  // to maintain invariant (2) -- e.g., by marking the tree dirty.
304  if (child->kind == INACTIVE_LEAF) {
305  children.push_back(child);
306  } else {
307  children.insert(children.begin(), child);
308  }
309  }
310 
311  // Allocation for a node.
312  struct Allocation
313  {
314  Allocation() : count(0) {}
315 
316  void add(const SlaveID& slaveId, const Resources& toAdd)
317  {
318  // Add shared resources to the allocated quantities when the same
319  // resources don't already exist in the allocation.
320  const Resources sharedToAdd = toAdd.shared()
321  .filter([this, slaveId](const Resource& resource) {
322  return !resources[slaveId].contains(resource);
323  });
324 
325  const ResourceQuantities quantitiesToAdd =
327  (toAdd.nonShared() + sharedToAdd).scalars());
328 
329  resources[slaveId] += toAdd;
330  totals += quantitiesToAdd;
331 
332  count++;
333  }
334 
335  void subtract(const SlaveID& slaveId, const Resources& toRemove)
336  {
337  CHECK(resources.contains(slaveId))
338  << "Resources " << resources << " does not contain " << slaveId;
339  CHECK(resources.at(slaveId).contains(toRemove))
340  << "Resources " << resources.at(slaveId) << " at agent " << slaveId
341  << " does not contain " << toRemove;
342 
343  resources[slaveId] -= toRemove;
344 
345  // Remove shared resources from the allocated quantities when there
346  // are no instances of same resources left in the allocation.
347  const Resources sharedToRemove = toRemove.shared()
348  .filter([this, slaveId](const Resource& resource) {
349  return !resources[slaveId].contains(resource);
350  });
351 
352  const ResourceQuantities quantitiesToRemove =
354  (toRemove.nonShared() + sharedToRemove).scalars());
355 
356  CHECK(totals.contains(quantitiesToRemove))
357  << totals << " does not contain " << quantitiesToRemove;
358 
359  totals -= quantitiesToRemove;
360 
361  if (resources.at(slaveId).empty()) {
362  resources.erase(slaveId);
363  }
364  }
365 
366  void update(
367  const SlaveID& slaveId,
368  const Resources& oldAllocation,
369  const Resources& newAllocation)
370  {
371  const ResourceQuantities oldAllocationQuantities =
373  const ResourceQuantities newAllocationQuantities =
375 
376  CHECK(resources.contains(slaveId))
377  << "Resources " << resources << " does not contain " << slaveId;
378  CHECK(resources[slaveId].contains(oldAllocation))
379  << "Resources " << resources[slaveId] << " at agent " << slaveId
380  << " does not contain " << oldAllocation;
381 
382  CHECK(totals.contains(oldAllocationQuantities))
383  << totals << " does not contain " << oldAllocationQuantities;
384 
385  resources[slaveId] -= oldAllocation;
386  resources[slaveId] += newAllocation;
387 
388  // It is possible that allocations can be updated to empty.
389  // See MESOS-9015 and MESOS-9975.
390  if (resources.at(slaveId).empty()) {
391  resources.erase(slaveId);
392  }
393 
394  totals -= oldAllocationQuantities;
395  totals += newAllocationQuantities;
396  }
397 
398  // We store the number of times this client has been chosen for
399  // allocation so that we can fairly share the resources across
400  // clients that have the same share. Note that this information is
401  // not persisted across master failovers, but since the point is
402  // to equalize the `count` across clients of the same `share`,
403  // having allocations restart at 0 after a master failover should
404  // be sufficient (famous last words.)
405  uint64_t count;
406 
407  // We maintain multiple copies of each shared resource allocated
408  // to a client, where the number of copies represents the number
409  // of times this shared resource has been allocated to (and has
410  // not been recovered from) a specific client.
412 
413  // We keep the aggregated scalar resource quantities to speed
414  // up share calculation. Note that the shared count of resources is
415  // ignored, because sharedness inherently refers to the identities of
416  // resources and not quantities.
418  } allocation;
419 
420  // Compares two nodes according to DRF share.
//
// Returns true if `left` orders before `right`. Nodes with a smaller
// `share` come first; ties on `share` are broken by the smaller
// `allocation.count`, and any remaining ties by comparing `path`.
421  static bool compareDRF(const Node* left, const Node* right)
422  {
423  if (left->share != right->share) {
424  return left->share < right->share;
425  }
426 
427  if (left->allocation.count != right->allocation.count) {
428  return left->allocation.count < right->allocation.count;
429  }
430 
// Final tie-break on `path`.
431  return left->path < right->path;
432  }
433 };
434 
435 } // namespace allocator {
436 } // namespace master {
437 } // namespace internal {
438 } // namespace mesos {
439 
440 #endif // __MASTER_ALLOCATOR_MESOS_SORTER_DRF_SORTER_HPP__
Definition: path.hpp:29
bool contains(const std::string &clientPath) const override
Node(const std::string &_name, Kind _kind, Node *_parent)
Definition: sorter.hpp:194
Definition: option.hpp:29
void deactivate(const std::string &clientPath) override
bool isLeaf() const
Definition: sorter.hpp:275
Definition: master.hpp:27
std::stringstream & join(std::stringstream &stream, const std::string &separator, T &&...args)
Definition: strings.hpp:307
std::vector< Node * > children
Definition: sorter.hpp:254
Definition: resource_quantities.hpp:63
void add(const SlaveID &slaveId, const Resources &toAdd)
Definition: sorter.hpp:316
std::string path
Definition: sorter.hpp:227
Resources filter(const lambda::function< bool(const Resource &)> &predicate) const
Definition: resources.hpp:83
void unallocated(const std::string &clientPath, const SlaveID &slaveId, const Resources &resources) override
void update(const std::string &clientPath, const SlaveID &slaveId, const Resources &oldAllocation, const Resources &newAllocation) override
Option< double > weight
Definition: sorter.hpp:236
std::string name
Definition: sorter.hpp:223
Definition: hashmap.hpp:38
const char * kind()
hashmap< SlaveID, Resources > resources
Definition: sorter.hpp:411
struct mesos::internal::master::allocator::DRFSorter::Node::Allocation allocation
An "untyped" PID, used to encapsulate the process ID for lower-layer abstractions (eg...
Definition: pid.hpp:39
void subtract(const SlaveID &slaveId, const Resources &toRemove)
Definition: sorter.hpp:335
static ResourceQuantities fromScalarResources(const Resources &resources)
void removeChild(const Node *child)
Definition: sorter.hpp:285
void updateWeight(const std::string &path, double weight) override
void initialize(const Option< std::set< std::string >> &fairnessExcludeResourceNames) override
Definition: agent.hpp:25
const ResourceQuantities & allocationScalarQuantities() const override
std::vector< std::string > sort() override
static bool compareDRF(const Node *left, const Node *right)
Definition: sorter.hpp:421
ResourceQuantities totals
Definition: sorter.hpp:417
void allocated(const std::string &clientPath, const SlaveID &slaveId, const Resources &resources) override
Resources scalars() const
void removeSlave(const SlaveID &slaveId) override
const hashmap< SlaveID, Resources > & allocation(const std::string &clientPath) const override
Resources shared() const
Definition: attributes.hpp:24
Resources nonShared() const
std::set< pid_t > children(pid_t, const std::list< Process > &, bool)
Definition: os.hpp:217
void activate(const std::string &clientPath) override
const std::string & clientPath() const
Definition: sorter.hpp:265
void addChild(Node *child)
Definition: sorter.hpp:294
void update(const SlaveID &slaveId, const Resources &oldAllocation, const Resources &newAllocation)
Definition: sorter.hpp:366
void addSlave(const SlaveID &slaveId, const ResourceQuantities &scalarQuantities) override
constexpr const char * name
Definition: shell.hpp:41
void add(const std::string &clientPath) override
Try< std::list< std::string > > find(const std::string &directory, const std::string &pattern)
Definition: find.hpp:37