Apache Mesos
log.hpp
Go to the documentation of this file.
1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements. See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership. The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License. You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 
17 #ifndef __LOG_LOG_HPP__
18 #define __LOG_LOG_HPP__
19 
20 #include <stdint.h>
21 
22 #include <mesos/log/log.hpp>
23 
24 #include <process/future.hpp>
25 #include <process/owned.hpp>
26 #include <process/process.hpp>
27 #include <process/shared.hpp>
28 
30 
31 #include <stout/nothing.hpp>
32 
33 #include "log/coordinator.hpp"
34 #include "log/metrics.hpp"
35 #include "log/network.hpp"
36 #include "log/recover.hpp"
37 #include "log/replica.hpp"
38 
39 namespace mesos {
40 namespace internal {
41 namespace log {
42 
// LogProcess derives from process::Process<LogProcess> and holds the
// quorum size, the auto-initialize flag and the recovery promises
// declared below. LogReaderProcess and LogWriterProcess are friends
// and may therefore access its private members.
//
// NOTE(review): the embedded line numbers of this listing skip (59,
// 65, 90-91, 95, 101-102, 105), so some declarations of this class
// are not shown here.
43 class LogProcess : public process::Process<LogProcess>
44 {
45 public:
    // Constructs the process from a quorum size, a path, an explicit
    // set of process::UPIDs, the auto-initialize flag and an optional
    // metrics prefix.
    // NOTE(review): presumably 'pids' identifies the other replicas and
    // 'path' the local replica's storage -- confirm in the implementation.
46  LogProcess(
47  size_t _quorum,
48  const std::string& path,
49  const std::set<process::UPID>& pids,
50  bool _autoInitialize,
51  const Option<std::string>& metricsPrefix);
52 
    // Overload that takes 'servers', 'timeout' and 'znode' instead of
    // an explicit set of pids.
    // NOTE(review): presumably these are ZooKeeper connection
    // parameters (see the membership comment below) -- confirm. One
    // parameter line (59) is missing from this listing.
53  LogProcess(
54  size_t _quorum,
55  const std::string& path,
56  const std::string& servers,
57  const Duration& timeout,
58  const std::string& znode,
60  bool _autoInitialize,
61  const Option<std::string>& metricsPrefix);
62 
63  // Recovers the log by catching up if needed. Returns a shared
64  // pointer to the local replica if the recovery succeeds.
    // (The declaration this comment documents, line 65, is missing
    // from this listing.)
66 
67 protected:
    // Process lifecycle hooks: initialize() is invoked when the process
    // gets spawned, finalize() when the process is terminated.
68  virtual void initialize();
69  virtual void finalize();
70 
71 private:
72  friend class LogReaderProcess;
73  friend class LogWriterProcess;
74 
75  // Continuations.
76  void _recover();
77 
78  // Return true if the log has finished recovery.
    // NOTE(review): the declared return type is double, not bool;
    // presumably the value is exposed as a numeric metric (Metrics is a
    // friend of this class) -- confirm.
79  double _recovered();
80 
81  // TODO(benh): Factor this out into "membership renewer".
82  void watch(
83  const process::UPID& pid,
84  const std::set<zookeeper::Group::Membership>& memberships);
85 
    // Handlers taking no result value; which future they are attached
    // to is not visible in this header.
86  void failed(const std::string& message);
87  void discarded();
88 
89  const size_t quorum;
92  const bool autoInitialize;
93 
94  // For replica recovery.
96  process::Promise<Nothing> recovered;
    // Raw pointers to promises; NOTE(review): who deletes them is not
    // visible in this header.
97  std::list<process::Promise<process::Shared<Replica>>*> promises;
98 
99  // For renewing membership. We store a Group instance in order to
100  // continually renew the replicas membership (when using ZooKeeper).
103 
104  friend Metrics;
106 
107  // The size of the network. We use "ensemble" because, as a metric
108  // name, it more intuitively means the "replica set".
    // Returns a future of the size as a double: the size_t produced by
    // network->watch(0u) is converted by the continuation below.
109  process::Future<double> _ensemble_size()
110  {
111  // Watching for any value different than 0 should give us the
112  // current value.
113  return network->watch(0u)
114  .then([](size_t size) -> double { return size; });
115  }
116 };
117 
118 
// LogReaderProcess derives from process::Process<LogReaderProcess> and
// is constructed from a pointer to a mesos::log::Log.
//
// NOTE(review): the embedded line numbers of this listing skip (124-125,
// 127, 131, 143, 148-149, 151, 155, 160, 165), so several declarations
// appear only as orphaned parameter lists or not at all.
119 class LogReaderProcess : public process::Process<LogReaderProcess>
120 {
121 public:
122  explicit LogReaderProcess(mesos::log::Log* log);
123 
126 
    // Parameter list of a declaration whose first line (127) is missing
    // from this listing; it takes a [from, to] pair of log positions.
128  const mesos::log::Log::Position& from,
129  const mesos::log::Log::Position& to);
130 
132 
133 protected:
    // Process lifecycle hooks: initialize() is invoked when the process
    // gets spawned, finalize() when the process is terminated.
134  virtual void initialize();
135  virtual void finalize();
136 
137 private:
138  // Returns a position from a raw value.
139  static mesos::log::Log::Position position(uint64_t value);
140 
141  // Returns a future which gets set when the log recovery has
142  // finished (either succeeded or failed).
    // (The declaration this comment documents, line 143, is missing
    // from this listing.)
144 
145  // Continuations.
146  void _recover();
147 
150 
    // Parameter lists of two further continuations whose first lines
    // (151 and 155) are missing from this listing.
152  const mesos::log::Log::Position& from,
153  const mesos::log::Log::Position& to);
154 
156  const mesos::log::Log::Position& from,
157  const mesos::log::Log::Position& to,
158  const std::list<Action>& actions);
159 
161 
162  const size_t quorum;
163  const process::Shared<Network> network;
164 
    // Raw pointers to promises; NOTE(review): who deletes them is not
    // visible in this header.
166  std::list<process::Promise<Nothing>*> promises;
167 };
168 
169 
// LogWriterProcess derives from process::Process<LogWriterProcess> and
// is constructed from a pointer to a mesos::log::Log.
//
// NOTE(review): the embedded line numbers of this listing skip (175-176,
// 178, 193, 198, 206, 210), so several declarations appear only as
// orphaned parameter lists or not at all.
170 class LogWriterProcess : public process::Process<LogWriterProcess>
171 {
172 public:
173  explicit LogWriterProcess(mesos::log::Log* log);
174 
    // Trailing parameter lines of two public declarations whose first
    // lines (176 and 178) are missing from this listing: one takes a
    // string of bytes, the other a log position 'to'.
177  const std::string& bytes);
179  const mesos::log::Log::Position& to);
180 
181 protected:
    // Process lifecycle hooks: initialize() is invoked when the process
    // gets spawned, finalize() when the process is terminated.
182  virtual void initialize();
183  virtual void finalize();
184 
185 private:
186  // Helper for converting an optional position returned from the
187  // coordinator into a Log::Position.
188  static Option<mesos::log::Log::Position> position(
189  const Option<uint64_t>& position);
190 
191  // Returns a future which gets set when the log recovery has
192  // finished (either succeeded or failed).
    // (The declaration this comment documents, line 193, is missing
    // from this listing.)
194 
195  // Continuations.
196  void _recover();
197 
    // Continuation mapping an optional raw position to an optional
    // Log::Position; NOTE(review): presumably follows a start step
    // declared on the missing line 198 -- confirm.
199  Option<mesos::log::Log::Position> __start(const Option<uint64_t>& position);
200 
    // Unlike LogProcess::failed, this handler takes both a message and
    // a reason string.
201  void failed(const std::string& message, const std::string& reason);
202 
203  const size_t quorum;
204  const process::Shared<Network> network;
205 
    // Raw pointers to promises; NOTE(review): who deletes them is not
    // visible in this header.
207  std::list<process::Promise<Nothing>*> promises;
208 
    // Raw pointer; NOTE(review): ownership and lifetime of the
    // coordinator are not visible in this header.
209  Coordinator* coordinator;
211 };
212 
213 } // namespace log {
214 } // namespace internal {
215 } // namespace mesos {
216 
217 #endif // __LOG_LOG_HPP__
Definition: path.hpp:26
Try< Bytes > size(const std::string &path, const FollowSymlink follow=FollowSymlink::FOLLOW_SYMLINK)
Definition: stat.hpp:100
friend class LogReaderProcess
Definition: log.hpp:72
Definition: duration.hpp:32
Try< Nothing > start(const std::string &name)
Starts the slice with the given name (via 'systemctl start <name>').
process::Future< size_t > watch(size_t size, WatchMode mode=NOT_EQUAL_TO) const
Definition: network.hpp:367
process::Future< process::Shared< Replica > > recover()
hashmap< std::string, MessageHandler > message
Definition: process.hpp:451
Definition: owned.hpp:26
Definition: log.hpp:52
An "untyped" PID, used to encapsulate the process ID for lower-layer abstractions (eg...
Definition: pid.hpp:39
virtual void finalize()
Invoked when a process is terminated.
LogProcess(size_t _quorum, const std::string &path, const std::set< process::UPID > &pids, bool _autoInitialize, const Option< std::string > &metricsPrefix)
Definition: spec.hpp:30
Future< X > then(lambda::CallableOnce< Future< X >(const T &)> f) const
Definition: future.hpp:1581
Definition: log.hpp:43
Definition: coordinator.hpp:40
Definition: grp.hpp:26
Definition: attributes.hpp:24
Result< Credentials > read(const Path &path)
Definition: credentials.hpp:35
Definition: group.hpp:49
std::string error(const std::string &msg, uint32_t code)
Definition: log.hpp:59
void discarded(Future< U > future)
process::Future< Nothing > catchup(size_t quorum, const process::Shared< Replica > &replica, const process::Shared< Network > &network, const Option< uint64_t > &proposal, const IntervalSet< uint64_t > &positions, const Duration &timeout=Seconds(10))
JSON::Object Metrics()
Try< Nothing > append(const std::string &path, const google::protobuf::Message &message)
Definition: protobuf.hpp:135
friend class LogWriterProcess
Definition: log.hpp:73
Definition: process.hpp:501
virtual void initialize()
Invoked when a process gets spawned.
Definition: metrics.hpp:31
Try< std::set< pid_t > > pids()
Definition: freebsd.hpp:62
PID< MetricsProcess > metrics
Definition: future.hpp:57