Apache Mesos
timeseries.hpp
Go to the documentation of this file.
1 // Licensed under the Apache License, Version 2.0 (the "License");
2 // you may not use this file except in compliance with the License.
3 // You may obtain a copy of the License at
4 //
5 // http://www.apache.org/licenses/LICENSE-2.0
6 //
7 // Unless required by applicable law or agreed to in writing, software
8 // distributed under the License is distributed on an "AS IS" BASIS,
9 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10 // See the License for the specific language governing permissions and
11 // limitations under the License
12 
13 #ifndef __PROCESS_TIMESERIES_HPP__
14 #define __PROCESS_TIMESERIES_HPP__
15 
16 #include <algorithm> // For max.
17 #include <map>
18 #include <vector>
19 
20 #include <process/clock.hpp>
21 #include <process/time.hpp>
22 
23 #include <stout/duration.hpp>
24 #include <stout/none.hpp>
25 #include <stout/option.hpp>
26 
27 namespace process {
28 
29 // Default statistic configuration variables.
31 constexpr size_t TIME_SERIES_CAPACITY = 1000;
32 
33 
34 // Provides an in-memory time series of statistics over some window.
35 // When the time series capacity is exceeded within the window, the
36 // granularity of older values is coarsened. This means, for
37 // high-frequency statistics that exceed the capacity, we keep a lot
38 // of recent data points (fine granularity), and keep fewer older
39 // data points (coarse granularity). The tunable bit here is the
40 // total number of data points to keep around, which informs how
41 // often to delete older data points, while still keeping a window
42 // worth of data.
43 // TODO(bmahler): Investigate using Google's btree implementation.
44 // This provides better insertion and lookup performance for large
45 // containers. This _should_ also provide significant memory
46 // savings. These are true because we have the following properties:
47 // 1. Our insertion order will mostly be in sorted order.
48 // 2. Our keys (Seconds) have efficient comparison operators.
49 // See: http://code.google.com/p/cpp-btree/
50 // http://code.google.com/p/cpp-btree/wiki/UsageInstructions
51 template <typename T>
52 struct TimeSeries
53 {
54  TimeSeries(const Duration& _window = TIME_SERIES_WINDOW,
55  size_t _capacity = TIME_SERIES_CAPACITY)
56  : window(_window),
57  // The truncation technique requires at least 3 elements.
58  capacity(std::max((size_t) 3, _capacity)) {}
59 
60  struct Value
61  {
62  Value(const Time& _time, const T& _data) : time(_time), data(_data) {}
63 
64  // Non-const for assignability.
66  T data;
67  };
68 
69  void set(const T& value, const Time& time = Clock::now())
70  {
71  // If we're not inserting at the end of the time series, then
72  // we have to reset the sparsification index. Given that
73  // out-of-order insertion is a rare use-case. This is a simple way
74  // to keep insertions O(log(n)). No need to figure out how to
75  // adjust the truncation index.
76  if (!values.empty() && time < values.rbegin()->first) {
77  index = None();
78  }
79 
80  values[time] = value;
81  truncate();
82  sparsify();
83  }
84 
85  // Returns the time series within the (optional) time range.
86  std::vector<Value> get(
87  const Option<Time>& start = None(),
88  const Option<Time>& stop = None()) const
89  {
90  // Ignore invalid ranges.
91  if (start.isSome() && stop.isSome() && start.get() > stop.get()) {
92  return std::vector<Value>();
93  }
94 
95  typename std::map<Time, T>::const_iterator lower = values.lower_bound(
96  start.isSome() ? start.get() : Time::epoch());
97 
98  typename std::map<Time, T>::const_iterator upper = values.upper_bound(
99  stop.isSome() ? stop.get() : Time::max());
100 
101  std::vector<Value> values;
102  while (lower != upper) {
103  values.push_back(Value(lower->first, lower->second));
104  ++lower;
105  }
106  return values;
107  }
108 
110  {
111  if (empty()) {
112  return None();
113  }
114 
115  return Value(values.rbegin()->first, values.rbegin()->second);
116  }
117 
118  bool empty() const { return values.empty(); }
119 
120  // Removes values outside the time window. This will ensure at
121  // least one value remains. Note that this is called automatically
122  // when writing to the time series, so this is only needed when
123  // one wants to explicitly trigger a truncation.
124  void truncate()
125  {
126  Time expired = Clock::now() - window;
127  typename std::map<Time, T>::iterator upper_bound =
128  values.upper_bound(expired);
129 
130  // Ensure at least 1 value remains.
131  if (values.size() <= 1 || upper_bound == values.end()) {
132  return;
133  }
134 
135  // When truncating and there exists a next value considered
136  // for sparsification, there are two cases to consider for
137  // updating the index:
138  //
139  // Case 1: upper_bound < next
140  // ----------------------------------------------------------
141  // upper_bound index, next
142  // v v
143  // Before: 0 1 2 3 4 5 6 7 ...
144  // ----------------------------------------------------------
145  // next index After truncating, index is
146  // v v must be adjusted:
147  // Truncate: 3 4 5 6 7 ... index -= # elements removed
148  // ----------------------------------------------------------
149  // index, next
150  // v
151  // After: 3 4 5 6 7 ...
152  // ----------------------------------------------------------
153  //
154  // Case 2: upper_bound >= next
155  // ----------------------------------------------------------
156  // upper_bound, index, next
157  // v
158  // Before: 0 1 2 3 4 5 6 7 ...
159  // ----------------------------------------------------------
160  // After truncating, we must
161  // After: 4 5 6 7 ... reset index to None().
162  // ----------------------------------------------------------
163  if (index.isSome() && upper_bound->first < next->first) {
164  size_t size = values.size();
165  values.erase(values.begin(), upper_bound);
166  index = index.get() - (size - values.size());
167  } else {
168  index = None();
169  values.erase(values.begin(), upper_bound);
170  }
171  }
172 
173 private:
174  // Performs "sparsification" to limit the size of the time series
175  // to be within the capacity.
176  //
177  // The sparsifying technique is to iteratively halve the granularity
178  // of the older half of the time series. Once sparsification reaches
179  // the midpoint of the time series, it begins again from the
180  // beginning.
181  //
182  // Sparsification results in the following granularity over time:
183  // Initial: | ------------------------ A -------------------- |
184  // Stage 1: | ------- 1/2 A ---------- | -------- B --------- |
185  // Stage 2: | -- 1/4A --- | -- 1/2B -- | -------- C --------- |
186  // Stage 3: | 1/8A | 1/4B | -- 1/2C -- | -------- D --------- |
187  // ...
188  //
189  // Each stage halves the size and granularity of time series prior
190  // to sparsifying.
191  void sparsify()
192  {
193  // We remove every other element up to the halfway point of the
194  // time series, until we're within the capacity. If we reach the
195  // half-way point of the time series, we'll start another
196  // sparsification cycle from the beginning, for example:
197  //
198  // next Time series with a capacity of 7.
199  // v Initial state with 7 entries
200  // 0 1 2 3 4 5 6
201  //
202  // next Insert '7'.
203  // v Capacity is exceeded, we remove '1' and
204  // 0 2 3 4 5 6 7 advance to remove '3' next.
205  //
206  // next Insert '8'.
207  // v Capacity is exceeded, we remove '3' and
208  // 0 2 4 5 6 7 8 advance to remove '5' next.
209  //
210  // next Insert '9'.
211  // v Capacity is exceeded, we remove '5' and now
212  // 0 2 4 6 7 8 9 '7' is past the halfway mark, so we will reset
213  // reset to the beginning and consider '2'.
214 
215  while (values.size() > capacity) {
216  // If the index is uninitialized, or past the half-way point,
217  // we set it back to the beginning.
218  if (index.isNone() || index.get() > values.size() / 2) {
219  // The second element is the initial deletion candidate.
220  next = values.begin();
221  ++next;
222  index = 1;
223  }
224 
225  next = values.erase(next);
226  next++; // Skip one element.
227  index = index.get() + 1;
228  }
229  }
230 
231  // Non-const for assignability.
232  Duration window;
233  size_t capacity;
234 
235  // We use a map instead of a hashmap to store the values because
236  // that way we can retrieve a series in sorted order efficiently.
237  std::map<Time, T> values;
238 
239  // Next deletion candidate. We store both the iterator and index.
240  // The index is None initially, and whenever a value is appended
241  // out-of-order. This means 'next' is only valid when 'index' is
242  // Some.
243  typename std::map<Time, T>::iterator next;
244  Option<size_t> index;
245 };
246 
247 } // namespace process {
248 
249 #endif // __PROCESS_TIMESERIES_HPP__
Definition: option.hpp:29
Try< Bytes > size(const std::string &path, const FollowSymlink follow=FollowSymlink::FOLLOW_SYMLINK)
Definition: stat.hpp:130
TimeSeries(const Duration &_window=TIME_SERIES_WINDOW, size_t _capacity=TIME_SERIES_CAPACITY)
Definition: timeseries.hpp:54
Definition: type_utils.hpp:619
void expired(const std::shared_ptr< lambda::CallableOnce< Future< T >(const Future< T > &)>> &f, const std::shared_ptr< Latch > &latch, const std::shared_ptr< Promise< T >> &promise, const std::shared_ptr< Option< Timer >> &timer, const Future< T > &future)
Definition: future.hpp:1521
Value(const Time &_time, const T &_data)
Definition: timeseries.hpp:62
Definition: duration.hpp:32
Try< Nothing > start(const std::string &name)
Starts the slice with the given name (via 'systemctl start <name>').
bool isSome() const
Definition: option.hpp:116
constexpr Duration TIME_SERIES_WINDOW
Definition: timeseries.hpp:30
void truncate()
Definition: timeseries.hpp:124
Option< T > max(const Option< T > &left, const Option< T > &right)
Definition: option.hpp:214
bool empty() const
Definition: timeseries.hpp:118
const T & get() const &
Definition: option.hpp:119
Option< Value > latest() const
Definition: timeseries.hpp:109
Time time
Definition: timeseries.hpp:65
Definition: time.hpp:23
std::string upper(const std::string &s)
Definition: strings.hpp:437
Definition: none.hpp:27
Definition: executor.hpp:48
static Time max()
Definition: time.hpp:88
Definition: timeseries.hpp:60
T data
Definition: timeseries.hpp:66
static Time now()
The current clock time for either the current process that makes this call or the global clock time i...
constexpr size_t TIME_SERIES_CAPACITY
Definition: timeseries.hpp:31
std::string lower(const std::string &s)
Definition: strings.hpp:429
Definition: duration.hpp:263
static Time epoch()
Definition: time.hpp:87
Definition: timeseries.hpp:52