Apache Mesos
state.hpp
Go to the documentation of this file.
1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements. See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership. The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License. You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 
17 #ifndef __MESOS_STATE_STATE_HPP__
18 #define __MESOS_STATE_STATE_HPP__
19 
20 #include <set>
21 #include <string>
22 
23 #include <mesos/state/storage.hpp>
24 
25 #include <process/deferred.hpp> // TODO(benh): This is required by Clang.
26 #include <process/future.hpp>
27 
28 #include <stout/lambda.hpp>
29 #include <stout/none.hpp>
30 #include <stout/option.hpp>
31 #include <stout/some.hpp>
32 #include <stout/try.hpp>
33 #include <stout/uuid.hpp>
34 
35 namespace mesos {
36 namespace state {
37 
38 // An abstraction of "state" (possibly between multiple distributed
39 // components) represented by "variables" (effectively key/value
40 // pairs). Variables are versioned such that setting a variable in the
41 // state will only succeed if the variable has not changed since last
42 // fetched. Varying implementations of state provide varying
43 // replicated guarantees.
44 //
45 // Note that the semantics of 'fetch' and 'store' provide
46 // atomicity. That is, you cannot store a variable that has changed
47 // since you did the last fetch. That is, if a store succeeds then no
48 // other writes have been performed on the variable since your fetch.
49 //
50 // Example:
51 //
52 // Storage* storage = new ZooKeeperStorage();
53 // State* state = new State(storage);
54 // Future<Variable> variable = state->fetch("slaves");
55 // std::string value = update(variable.value());
56 // variable = variable.mutate(value);
57 // state->store(variable);
58 
59 // Forward declarations.
60 class State;
61 
62 
63 // Wrapper around a state "entry" to force immutability.
64 class Variable
65 {
66 public:
67  std::string value() const
68  {
69  return entry.value();
70  }
71 
72  Variable mutate(const std::string& value) const
73  {
74  Variable variable(*this);
75  variable.entry.set_value(value);
76  return variable;
77  }
78 
79 private:
80  friend class State; // Creates and manages variables.
81 
82  explicit Variable(const internal::state::Entry& _entry)
83  : entry(_entry)
84  {}
85 
86  internal::state::Entry entry; // Not const to keep Variable assignable.
87 };
88 
89 
90 class State
91 {
92 public:
93  explicit State(Storage* _storage) : storage(_storage) {}
94  virtual ~State() {}
95 
96  // Returns a variable from the state, creating a new one if one
97  // previously did not exist (or an error if one occurs).
98  process::Future<Variable> fetch(const std::string& name);
99 
100  // Returns the variable specified if it was successfully stored in
101  // the state, otherwise returns none if the version of the variable
102  // was no longer valid, or an error if one occurs.
104 
105  // Returns true if successfully expunged the variable from the state.
106  process::Future<bool> expunge(const Variable& variable);
107 
108  // Returns the collection of variable names in the state.
110 
111 private:
112  // Helpers to handle future results from fetch and swap. We make
113  // these static members of State for friend access to Variable's
114  // constructor.
115  static process::Future<Variable> _fetch(
116  const std::string& name,
117  const Option<internal::state::Entry>& option);
118 
119  static process::Future<Option<Variable>> _store(
120  const internal::state::Entry& entry,
121  const bool& b); // TODO(benh): Remove 'const &' after fixing libprocess.
122 
123  Storage* storage;
124 };
125 
126 
127 inline process::Future<Variable> State::fetch(const std::string& name)
128 {
129  return storage->get(name)
130  .then(lambda::bind(&State::_fetch, name, lambda::_1));
131 }
132 
133 
134 inline process::Future<Variable> State::_fetch(
135  const std::string& name,
136  const Option<internal::state::Entry>& option)
137 {
138  if (option.isSome()) {
139  return Variable(option.get());
140  }
141 
142  // Otherwise, construct a Variable with a new Entry (with a random
143  // UUID and no value to start).
144  internal::state::Entry entry;
145  entry.set_name(name);
146  entry.set_uuid(id::UUID::random().toBytes());
147 
148  return Variable(entry);
149 }
150 
151 
153 {
154  // Note that we try and swap an entry even if the value didn't change!
155  id::UUID uuid = id::UUID::fromBytes(variable.entry.uuid()).get();
156 
157  // Create a new entry to replace the existing entry provided the
158  // UUID matches.
159  internal::state::Entry entry;
160  entry.set_name(variable.entry.name());
161  entry.set_uuid(id::UUID::random().toBytes());
162  entry.set_value(variable.entry.value());
163 
164  return storage->set(entry, uuid)
165  .then(lambda::bind(&State::_store, entry, lambda::_1));
166 }
167 
168 
169 inline process::Future<Option<Variable>> State::_store(
170  const internal::state::Entry& entry,
171  const bool& b) // TODO(benh): Remove 'const &' after fixing libprocess.
172 {
173  if (b) {
174  return Some(Variable(entry));
175  }
176 
177  return None();
178 }
179 
180 
182 {
183  return storage->expunge(variable.entry);
184 }
185 
186 
188 {
189  return storage->names();
190 }
191 
192 } // namespace state {
193 } // namespace mesos {
194 
195 #endif // __MESOS_STATE_STATE_HPP__
State(Storage *_storage)
Definition: state.hpp:93
process::Future< bool > expunge(const Variable &variable)
Definition: state.hpp:181
process::Future< Option< Variable > > store(const Variable &variable)
Definition: state.hpp:152
virtual process::Future< Option< internal::state::Entry > > get(const std::string &name)=0
Definition: option.hpp:28
Definition: state.hpp:90
std::string value() const
Definition: state.hpp:67
process::Future< std::set< std::string > > names()
Definition: state.hpp:187
virtual ~State()
Definition: state.hpp:94
virtual process::Future< bool > expunge(const internal::state::Entry &entry)=0
bool isSome() const
Definition: option.hpp:115
static UUID random()
Definition: uuid.hpp:38
Definition: state.hpp:64
Variable mutate(const std::string &value) const
Definition: state.hpp:72
Definition: uuid.hpp:35
const T & get() const &
Definition: option.hpp:118
Definition: storage.hpp:33
Future< X > then(lambda::CallableOnce< Future< X >(const T &)> f) const
Definition: future.hpp:1592
process::Future< Variable > fetch(const std::string &name)
Definition: state.hpp:127
_Some< typename std::decay< T >::type > Some(T &&t)
Definition: some.hpp:42
Definition: none.hpp:27
static Try< UUID > fromBytes(const std::string &s)
Definition: uuid.hpp:49
virtual process::Future< bool > set(const internal::state::Entry &entry, const id::UUID &uuid)=0
virtual process::Future< std::set< std::string > > names()=0
Try< Nothing > bind(int_fd s, const Address &address)
Definition: network.hpp:46
constexpr const char * name
Definition: shell.hpp:41
Definition: future.hpp:57