Apache Mesos
timeseries.hpp
Go to the documentation of this file.
1 // Licensed under the Apache License, Version 2.0 (the "License");
2 // you may not use this file except in compliance with the License.
3 // You may obtain a copy of the License at
4 //
5 // http://www.apache.org/licenses/LICENSE-2.0
6 //
7 // Unless required by applicable law or agreed to in writing, software
8 // distributed under the License is distributed on an "AS IS" BASIS,
9 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10 // See the License for the specific language governing permissions and
11 // limitations under the License
12 
13 #ifndef __PROCESS_TIMESERIES_HPP__
14 #define __PROCESS_TIMESERIES_HPP__
15 
16 #include <algorithm> // For max.
17 #include <map>
18 #include <vector>
19 
20 #include <process/clock.hpp>
21 #include <process/time.hpp>
22 
23 #include <stout/duration.hpp>
24 #include <stout/none.hpp>
25 #include <stout/option.hpp>
26 
27 namespace process {
28 
29 // Default statistic configuration variables.
30 // TODO(bmahler): It appears there may be a bug with gcc-4.1.2 in
31 // which these duration constants were not being initialized when
32 // having static linkage. This issue did not manifest in newer gcc's.
33 // Specifically, 4.2.1 was ok. So we've moved these to have external
34 // linkage but perhaps in the future we can revert this.
35 extern const Duration TIME_SERIES_WINDOW;
36 extern const size_t TIME_SERIES_CAPACITY;
37 
38 
39 // Provides an in-memory time series of statistics over some window.
40 // When the time series capacity is exceeded within the window, the
41 // granularity of older values is coarsened. This means, for
42 // high-frequency statistics that exceed the capacity, we keep a lot
43 // of recent data points (fine granularity), and keep fewer older
44 // data points (coarse granularity). The tunable bit here is the
45 // total number of data points to keep around, which informs how
46 // often to delete older data points, while still keeping a window
47 // worth of data.
48 // TODO(bmahler): Investigate using Google's btree implementation.
49 // This provides better insertion and lookup performance for large
50 // containers. This _should_ also provide significant memory
51 // savings. These are true because we have the following properties:
52 // 1. Our insertion order will mostly be in sorted order.
53 // 2. Our keys (Time) have efficient comparison operators.
54 // See: http://code.google.com/p/cpp-btree/
55 // http://code.google.com/p/cpp-btree/wiki/UsageInstructions
56 template <typename T>
57 struct TimeSeries
58 {
60  size_t _capacity = TIME_SERIES_CAPACITY)
61  : window(_window),
62  // The truncation technique requires at least 3 elements.
63  capacity(std::max((size_t) 3, _capacity)) {}
64 
65  struct Value
66  {
67  Value(const Time& _time, const T& _data) : time(_time), data(_data) {}
68 
69  // Non-const for assignability.
71  T data;
72  };
73 
74  void set(const T& value, const Time& time = Clock::now())
75  {
76  // If we're not inserting at the end of the time series, then
77  // we have to reset the sparsification index. Given that
78  // out-of-order insertion is a rare use-case. This is a simple way
79  // to keep insertions O(log(n)). No need to figure out how to
80  // adjust the truncation index.
81  if (!values.empty() && time < values.rbegin()->first) {
82  index = None();
83  }
84 
85  values[time] = value;
86  truncate();
87  sparsify();
88  }
89 
90  // Returns the time series within the (optional) time range.
91  std::vector<Value> get(
92  const Option<Time>& start = None(),
93  const Option<Time>& stop = None()) const
94  {
95  // Ignore invalid ranges.
96  if (start.isSome() && stop.isSome() && start.get() > stop.get()) {
97  return std::vector<Value>();
98  }
99 
100  typename std::map<Time, T>::const_iterator lower = values.lower_bound(
101  start.isSome() ? start.get() : Time::epoch());
102 
103  typename std::map<Time, T>::const_iterator upper = values.upper_bound(
104  stop.isSome() ? stop.get() : Time::max());
105 
106  std::vector<Value> values;
107  while (lower != upper) {
108  values.push_back(Value(lower->first, lower->second));
109  ++lower;
110  }
111  return values;
112  }
113 
115  {
116  if (empty()) {
117  return None();
118  }
119 
120  return Value(values.rbegin()->first, values.rbegin()->second);
121  }
122 
123  bool empty() const { return values.empty(); }
124 
125  // Removes values outside the time window. This will ensure at
126  // least one value remains. Note that this is called automatically
127  // when writing to the time series, so this is only needed when
128  // one wants to explicitly trigger a truncation.
129  void truncate()
130  {
131  Time expired = Clock::now() - window;
132  typename std::map<Time, T>::iterator upper_bound =
133  values.upper_bound(expired);
134 
135  // Ensure at least 1 value remains.
136  if (values.size() <= 1 || upper_bound == values.end()) {
137  return;
138  }
139 
140  // When truncating and there exists a next value considered
141  // for sparsification, there are two cases to consider for
142  // updating the index:
143  //
144  // Case 1: upper_bound < next
145  // ----------------------------------------------------------
146  // upper_bound index, next
147  // v v
148  // Before: 0 1 2 3 4 5 6 7 ...
149  // ----------------------------------------------------------
150  // next index After truncating, index is
151  // v v must be adjusted:
152  // Truncate: 3 4 5 6 7 ... index -= # elements removed
153  // ----------------------------------------------------------
154  // index, next
155  // v
156  // After: 3 4 5 6 7 ...
157  // ----------------------------------------------------------
158  //
159  // Case 2: upper_bound >= next
160  // ----------------------------------------------------------
161  // upper_bound, index, next
162  // v
163  // Before: 0 1 2 3 4 5 6 7 ...
164  // ----------------------------------------------------------
165  // After truncating, we must
166  // After: 4 5 6 7 ... reset index to None().
167  // ----------------------------------------------------------
168  if (index.isSome() && upper_bound->first < next->first) {
169  size_t size = values.size();
170  values.erase(values.begin(), upper_bound);
171  index = index.get() - (size - values.size());
172  } else {
173  index = None();
174  values.erase(values.begin(), upper_bound);
175  }
176  }
177 
178 private:
179  // Performs "sparsification" to limit the size of the time series
180  // to be within the capacity.
181  //
182  // The sparsifying technique is to iteratively halve the granularity
183  // of the older half of the time series. Once sparsification reaches
184  // the midpoint of the time series, it begins again from the
185  // beginning.
186  //
187  // Sparsification results in the following granularity over time:
188  // Initial: | ------------------------ A -------------------- |
189  // Stage 1: | ------- 1/2 A ---------- | -------- B --------- |
190  // Stage 2: | -- 1/4A --- | -- 1/2B -- | -------- C --------- |
191  // Stage 3: | 1/8A | 1/4B | -- 1/2C -- | -------- D --------- |
192  // ...
193  //
194  // Each stage halves the size and granularity of time series prior
195  // to sparsifying.
196  void sparsify()
197  {
198  // We remove every other element up to the halfway point of the
199  // time series, until we're within the capacity. If we reach the
200  // half-way point of the time series, we'll start another
201  // sparsification cycle from the beginning, for example:
202  //
203  // next Time series with a capacity of 7.
204  // v Initial state with 7 entries
205  // 0 1 2 3 4 5 6
206  //
207  // next Insert '7'.
208  // v Capacity is exceeded, we remove '1' and
209  // 0 2 3 4 5 6 7 advance to remove '3' next.
210  //
211  // next Insert '8'.
212  // v Capacity is exceeded, we remove '3' and
213  // 0 2 4 5 6 7 8 advance to remove '5' next.
214  //
215  // next Insert '9'.
216  // v Capacity is exceeded, we remove '5' and now
217  // 0 2 4 6 7 8 9 '7' is past the halfway mark, so we will reset
218  // reset to the beginning and consider '2'.
219 
220  while (values.size() > capacity) {
221  // If the index is uninitialized, or past the half-way point,
222  // we set it back to the beginning.
223  if (index.isNone() || index.get() > values.size() / 2) {
224  // The second element is the initial deletion candidate.
225  next = values.begin();
226  ++next;
227  index = 1;
228  }
229 
230  next = values.erase(next);
231  next++; // Skip one element.
232  index = index.get() + 1;
233  }
234  }
235 
236  // Non-const for assignability.
237  Duration window;
238  size_t capacity;
239 
240  // We use a map instead of a hashmap to store the values because
241  // that way we can retrieve a series in sorted order efficiently.
242  std::map<Time, T> values;
243 
244  // Next deletion candidate. We store both the iterator and index.
245  // The index is None initially, and whenever a value is appended
246  // out-of-order. This means 'next' is only valid when 'index' is
247  // Some.
248  typename std::map<Time, T>::iterator next;
249  Option<size_t> index;
250 };
251 
252 } // namespace process {
253 
254 #endif // __PROCESS_TIMESERIES_HPP__
Definition: option.hpp:28
Try< Bytes > size(const std::string &path, const FollowSymlink follow=FollowSymlink::FOLLOW_SYMLINK)
Definition: stat.hpp:100
TimeSeries(const Duration &_window=TIME_SERIES_WINDOW, size_t _capacity=TIME_SERIES_CAPACITY)
Definition: timeseries.hpp:59
void expired(const std::shared_ptr< lambda::CallableOnce< Future< T >(const Future< T > &)>> &f, const std::shared_ptr< Latch > &latch, const std::shared_ptr< Promise< T >> &promise, const std::shared_ptr< Option< Timer >> &timer, const Future< T > &future)
Definition: future.hpp:1537
void set(const T &value, const Time &time=Clock::now())
Definition: timeseries.hpp:74
Value(const Time &_time, const T &_data)
Definition: timeseries.hpp:67
Definition: duration.hpp:32
const Duration TIME_SERIES_WINDOW
Try< Nothing > start(const std::string &name)
Starts the slice with the given name (via &#39;systemctl start &lt;name&gt;&#39;).
bool isSome() const
Definition: option.hpp:115
void truncate()
Definition: timeseries.hpp:129
Option< T > max(const Option< T > &left, const Option< T > &right)
Definition: option.hpp:199
bool empty() const
Definition: timeseries.hpp:123
const T & get() const &
Definition: option.hpp:118
Option< Value > latest() const
Definition: timeseries.hpp:114
Time time
Definition: timeseries.hpp:70
Definition: time.hpp:23
std::string upper(const std::string &s)
Definition: strings.hpp:420
Result< Process > process(pid_t pid)
Definition: freebsd.hpp:30
Definition: none.hpp:27
static Time max()
Definition: time.hpp:88
Definition: timeseries.hpp:65
T data
Definition: timeseries.hpp:71
static Time now()
The current clock time for either the current process that makes this call or the global clock time i...
bool isNone() const
Definition: option.hpp:116
std::string lower(const std::string &s)
Definition: strings.hpp:412
Try< std::vector< Value > > time(const std::string &hierarchy, const std::string &cgroup)
static Time epoch()
Definition: time.hpp:87
Definition: timeseries.hpp:57
const size_t TIME_SERIES_CAPACITY