Apache Mesos
v1_volume_manager_process.hpp
Go to the documentation of this file.
1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements. See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership. The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License. You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 
17 #ifndef __CSI_V1_VOLUME_MANAGER_PROCESS_HPP__
18 #define __CSI_V1_VOLUME_MANAGER_PROCESS_HPP__
19 
20 #include <string>
21 #include <vector>
22 
23 #include <google/protobuf/map.h>
24 
25 #include <mesos/mesos.hpp>
26 
27 #include <mesos/csi/types.hpp>
28 
29 #include <process/future.hpp>
30 #include <process/grpc.hpp>
31 #include <process/http.hpp>
32 #include <process/loop.hpp>
33 #include <process/owned.hpp>
34 #include <process/process.hpp>
35 #include <process/sequence.hpp>
36 
37 #include <stout/bytes.hpp>
38 #include <stout/duration.hpp>
39 #include <stout/error.hpp>
40 #include <stout/hashmap.hpp>
41 #include <stout/hashset.hpp>
42 #include <stout/nothing.hpp>
43 #include <stout/option.hpp>
44 #include <stout/try.hpp>
45 
46 #include "csi/metrics.hpp"
47 #include "csi/service_manager.hpp"
48 #include "csi/state.hpp"
49 #include "csi/v1_client.hpp"
50 #include "csi/v1_utils.hpp"
52 #include "csi/volume_manager.hpp"
53 
54 namespace mesos {
55 namespace csi {
56 namespace v1 {
57 
58 class VolumeManagerProcess : public process::Process<VolumeManagerProcess>
59 {
60 public:
61  explicit VolumeManagerProcess(
62  const std::string& _rootDir,
63  const CSIPluginInfo& _info,
64  const hashset<Service> _services,
65  const process::grpc::client::Runtime& _runtime,
66  ServiceManager* _serviceManager,
67  Metrics* _metrics);
68 
70 
72 
74  const types::VolumeCapability& capability,
75  const google::protobuf::Map<std::string, std::string>& parameters);
76 
78  const std::string& name,
79  const Bytes& capacity,
80  const types::VolumeCapability& capability,
81  const google::protobuf::Map<std::string, std::string>& parameters);
82 
84  const VolumeInfo& volumeInfo,
85  const types::VolumeCapability& capability,
86  const google::protobuf::Map<std::string, std::string>& parameters);
87 
88  process::Future<bool> deleteVolume(const std::string& volumeId);
89 
90  process::Future<Nothing> attachVolume(const std::string& volumeId);
91 
92  process::Future<Nothing> detachVolume(const std::string& volumeId);
93 
94  process::Future<Nothing> publishVolume(const std::string& volumeId);
95 
96  process::Future<Nothing> unpublishVolume(const std::string& volumeId);
97 
98  // Wrapper functions to make CSI calls and update RPC metrics. Made public for
99  // testing purpose.
100  //
101  // The call is made asynchronously and thus no guarantee is provided on the
102  // order in which calls are sent. Callers need to either ensure to not have
103  // multiple conflicting calls in flight, or treat results idempotently.
104  //
105  // NOTE: We currently ensure this by 1) resource locking to forbid concurrent
106  // calls on the same volume, and 2) no profile update while there are ongoing
107  // `CREATE_DISK` or `DESTROY_DISK` operations.
108  template <typename Request, typename Response>
110  const Service& service,
111  process::Future<RPCResult<Response>> (Client::*rpc)(Request),
112  const Request& request,
113  bool retry = false);
114 
115  template <typename Request, typename Response>
117  const std::string& endpoint,
118  process::Future<RPCResult<Response>> (Client::*rpc)(Request),
119  const Request& request);
120 
121  template <typename Response>
123  const RPCResult<Response>& result, const Option<Duration>& backoff);
124 
125 private:
126  process::Future<Nothing> prepareServices();
127 
128  process::Future<bool> _deleteVolume(const std::string& volumeId);
129  process::Future<bool> __deleteVolume(const std::string& volumeId);
130 
131  // The following methods are used to manage volume lifecycles. Transient
132  // states are omitted.
133  //
134  // +------------+
135  // + + + | CREATED | ^
136  // | | | +---+----^---+ |
137  // _attachVolume | | | | | | _detachVolume
138  // | | | +---v----+---+ |
139  // v + + | NODE_READY | + ^
140  // | | +---+----^---+ | |
141  // __publishVolume | | | | | | _unpublishVolume
142  // | | +---v----+---+ | |
143  // v + | VOL_READY | + + ^
144  // | +---+----^---+ | | |
145  // _publishVolume | | | | | | __unpublishVolume
146  // | +---v----+---+ | | |
147  // V | PUBLISHED | + + +
148  // +------------+
149 
150  // Transition a volume to `NODE_READY` state from any state above.
151  process::Future<Nothing> _attachVolume(const std::string& volumeId);
152 
153  // Transition a volume to `CREATED` state from any state below.
154  process::Future<Nothing> _detachVolume(const std::string& volumeId);
155 
156  // Transition a volume to `PUBLISHED` state from any state above.
157  process::Future<Nothing> _publishVolume(const std::string& volumeId);
158 
159  // Transition a volume to `VOL_READY` state from any state above.
160  process::Future<Nothing> __publishVolume(const std::string& volumeId);
161 
162  // Transition a volume to `NODE_READY` state from any state below.
163  process::Future<Nothing> _unpublishVolume(const std::string& volumeId);
164 
165  // Transition a volume to `VOL_READY` state from any state below.
166  process::Future<Nothing> __unpublishVolume(const std::string& volumeId);
167 
168  void checkpointVolumeState(const std::string& volumeId);
169 
170  void garbageCollectMountPath(const std::string& volumeId);
171 
172  const std::string rootDir;
173  const CSIPluginInfo info;
174  const hashset<Service> services;
175 
177  ServiceManager* serviceManager;
178  Metrics* metrics;
179 
180  Option<std::string> bootId;
181  Option<PluginCapabilities> pluginCapabilities;
182  Option<ControllerCapabilities> controllerCapabilities;
183  Option<NodeCapabilities> nodeCapabilities;
184  Option<std::string> nodeId;
185 
186  struct VolumeData
187  {
188  VolumeData(state::VolumeState&& _state)
189  : state(_state), sequence(new process::Sequence("csi-volume-sequence")) {}
190 
191  state::VolumeState state;
192 
193  // We call all CSI operations on the same volume in a sequence to ensure
194  // that they are processed in a sequential order.
196  };
197 
199 };
200 
201 } // namespace v1 {
202 } // namespace csi {
203 } // namespace mesos {
204 
205 #endif // __CSI_V1_VOLUME_MANAGER_PROCESS_HPP__
process::Future< Nothing > detachVolume(const std::string &volumeId)
Future< Response > request(const Request &request, bool streamedResponse=false)
Asynchronously sends an HTTP request to the process and returns the HTTP response once the entire res...
process::Future< Option< Error > > validateVolume(const VolumeInfo &volumeInfo, const types::VolumeCapability &capability, const google::protobuf::Map< std::string, std::string > &parameters)
VolumeManagerProcess(const std::string &_rootDir, const CSIPluginInfo &_info, const hashset< Service > _services, const process::grpc::client::Runtime &_runtime, ServiceManager *_serviceManager, Metrics *_metrics)
Definition: check.hpp:33
process::Future< Nothing > unpublishVolume(const std::string &volumeId)
process::Future< RPCResult< Response > > _call(const std::string &endpoint, process::Future< RPCResult< Response >>(Client::*rpc)(Request), const Request &request)
Definition: volume_manager.hpp:47
process::Future< process::ControlFlow< Response > > __call(const RPCResult< Response > &result, const Option< Duration > &backoff)
Definition: metrics.hpp:28
Definition: v0.hpp:49
Definition: sequence.hpp:33
Definition: v1_client.hpp:35
process::Future< Nothing > recover()
A copyable interface to manage an internal runtime process for asynchronous gRPC calls.
Definition: grpc.hpp:157
Definition: agent.hpp:25
process::Future< bool > deleteVolume(const std::string &volumeId)
process::Future< Response > call(const Service &service, process::Future< RPCResult< Response >>(Client::*rpc)(Request), const Request &request, bool retry=false)
process::Future< Bytes > getCapacity(const types::VolumeCapability &capability, const google::protobuf::Map< std::string, std::string > &parameters)
process::Future< Nothing > attachVolume(const std::string &volumeId)
CSIPluginContainerInfo::Service Service
Definition: service_manager.hpp:38
Definition: v1_volume_manager_process.hpp:58
process::Future< VolumeInfo > createVolume(const std::string &name, const Bytes &capacity, const types::VolumeCapability &capability, const google::protobuf::Map< std::string, std::string > &parameters)
Definition: bytes.hpp:30
Definition: process.hpp:505
Definition: service_manager.hpp:51
process::Future< Nothing > publishVolume(const std::string &volumeId)
constexpr const char * name
Definition: shell.hpp:43
process::Future< std::vector< VolumeInfo > > listVolumes()