Apache Mesos
future_tracker.hpp
Go to the documentation of this file.
1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements. See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership. The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License. You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 
17 #ifndef __FUTURE_TRACKER_HPP__
18 #define __FUTURE_TRACKER_HPP__
19 
20 #include <list>
21 #include <map>
22 #include <string>
23 #include <vector>
24 
25 #include <process/defer.hpp>
26 #include <process/dispatch.hpp>
27 #include <process/future.hpp>
28 #include <process/id.hpp>
29 #include <process/owned.hpp>
30 #include <process/process.hpp>
31 
32 namespace mesos {
33 namespace internal {
34 
36 {
37  std::string operation;
38  std::string component;
39  std::map<std::string, std::string> args;
40 
41  inline bool operator==(const FutureMetadata& that) const
42  {
43  return operation == that.operation &&
44  component == that.component &&
45  args == that.args;
46  }
47 };
48 
49 
51  : public process::Process<PendingFutureTrackerProcess>
52 {
53 public:
55  : ProcessBase(process::ID::generate("pending-future-tracker")) {}
56 
57  template <typename T>
58  void addFuture(const process::Future<T>& future, FutureMetadata&& metadata)
59  {
60  auto it = pending.emplace(pending.end(), std::move(metadata));
61 
62  future
65  .onAbandoned(process::defer(
67  }
68 
69  void eraseFuture(typename std::list<FutureMetadata>::iterator it)
70  {
71  pending.erase(it);
72  }
73 
75  {
76  return std::vector<FutureMetadata>(pending.begin(), pending.end());
77  }
78 
79 private:
80  std::list<FutureMetadata> pending;
81 };
82 
83 
85 {
86 public:
88  {
91  }
92 
94  {
95  terminate(process.get());
96  process::wait(process.get());
97  }
98 
115  template <typename T>
117  const process::Future<T>& future,
118  const std::string& operation,
119  const std::string& component,
120  const std::map<std::string, std::string>& args = {})
121  {
123  process.get(),
124  &PendingFutureTrackerProcess::addFuture<T>,
125  future,
127 
128  return future;
129  }
130 
137  {
138  return process::dispatch(
139  process.get(),
141  }
142 
143 private:
144  explicit PendingFutureTracker(
146  : process(_process)
147  {
148  spawn(process.get());
149  }
150 
152 };
153 
154 } // namespace internal {
155 } // namespace mesos {
156 
157 #endif // __FUTURE_TRACKER_HPP__
std::string generate(const std::string &prefix="")
Returns &#39;prefix(N)&#39; where N represents the number of instances where the same prefix (wrt...
std::string operation
Definition: future_tracker.hpp:37
process::Future< std::vector< FutureMetadata > > pendingFutures()
This method returns a list of pending futures represented as objects of FutureMetadata class...
Definition: future_tracker.hpp:136
bool operator==(const FutureMetadata &that) const
Definition: future_tracker.hpp:41
bool pending(int signal)
Definition: signals.hpp:50
Definition: check.hpp:33
Definition: future_tracker.hpp:35
PendingFutureTrackerProcess()
Definition: future_tracker.hpp:54
Definition: future_tracker.hpp:50
std::string component
Definition: future_tracker.hpp:38
static Try< PendingFutureTracker * > create()
Definition: future_tracker.hpp:87
~PendingFutureTracker()
Definition: future_tracker.hpp:93
process::Future< std::vector< FutureMetadata > > pendingFutures()
Definition: future_tracker.hpp:74
UPID spawn(ProcessBase *process, bool manage=false)
Spawn a new process.
void terminate(const UPID &pid, bool inject=true)
Sends a TerminateEvent to the given process.
void addFuture(const process::Future< T > &future, FutureMetadata &&metadata)
Definition: future_tracker.hpp:58
std::map< std::string, std::string > args
Definition: future_tracker.hpp:39
void dispatch(const PID< T > &pid, void(T::*method)())
Definition: dispatch.hpp:174
Definition: future_tracker.hpp:84
const Future< T > & onAny(AnyCallback &&callback) const
Definition: future.hpp:1442
void eraseFuture(typename std::list< FutureMetadata >::iterator it)
Definition: future_tracker.hpp:69
Definition: agent.hpp:25
process::Future< T > track(const process::Future< T > &future, const std::string &operation, const std::string &component, const std::map< std::string, std::string > &args={})
This method subscribes on state transitions of the future to keep track of pending operations/promise...
Definition: future_tracker.hpp:116
bool wait(const UPID &pid, const Duration &duration=Seconds(-1))
Wait for the process to exit for no more than the specified seconds.
Result< Process > process(pid_t pid)
Definition: freebsd.hpp:30
Definition: attributes.hpp:24
Definition: executor.hpp:48
Definition: owned.hpp:36
Definition: process.hpp:505
Deferred< void()> defer(const PID< T > &pid, void(T::*method)())
Definition: defer.hpp:35
Definition: future.hpp:58