Apache Mesos
uri_disk_profile_adaptor.hpp
Go to the documentation of this file.
1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements. See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership. The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License. You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 
17 #ifndef __RESOURCE_PROVIDER_URI_DISK_PROFILE_ADAPTOR_HPP__
18 #define __RESOURCE_PROVIDER_URI_DISK_PROFILE_ADAPTOR_HPP__
19 
20 #include <list>
21 #include <string>
22 #include <tuple>
23 
25 
26 #include <process/future.hpp>
27 #include <process/owned.hpp>
28 #include <process/process.hpp>
29 
30 #include <process/ssl/flags.hpp>
31 
32 #include <stout/duration.hpp>
33 #include <stout/error.hpp>
34 #include <stout/flags.hpp>
35 #include <stout/hashmap.hpp>
36 #include <stout/option.hpp>
37 #include <stout/path.hpp>
38 #include <stout/strings.hpp>
39 
41 
42 namespace mesos {
43 namespace internal {
44 namespace storage {
45 
46 // Forward declaration.
47 class UriDiskProfileAdaptorProcess;
48 
49 // The `UriDiskProfileAdaptor` is an example DiskProfileAdaptor module
50 // that takes a URI as a module parameter and fetches that URI
51 // periodically. The fetched data is parsed into the required CSI
52 // protobufs (which also acts as validation).
53 //
54 // If there is an error during fetching, any previously fetched results
55 // will be used until fetching is successful.
56 //
57 // This module does not filter return results based on
58 // `CSIPluginInfo::type` and assumes that all fetched profiles are meant
59 // for all resource providers.
60 //
61 // See `UriDiskProfileAdaptor::Flags` below for more information.
63 {
64 public:
65  struct Flags : public virtual flags::FlagsBase
66  {
68  {
69  add(&Flags::uri,
70  "uri",
71  None(),
72 "URI to a JSON object containing the disk profile mapping.\n"
73 "This module supports both HTTP(s) and file URIs.\n"
74 "\n"
75 "The JSON object should consist of some top-level string keys\n"
76 "corresponding to the disk profile name. Each value should contain\n"
77 "a `ResourceProviderSelector` under 'resource_provider_selector' or\n"
78 "a `CSIPluginTypeSelector` under 'csi_plugin_type_selector' to\n"
79 "specify the set of resource providers this profile applies to,\n"
80 "followed by a `VolumeCapability` under 'volume_capabilities'\n"
81 "and a free-form string-string mapping under 'create_parameters'.\n"
82 "\n"
83 "The JSON is modeled after a protobuf found in\n"
84 "`src/resource_provider/storage/disk_profile.proto`.\n"
85 "\n"
86 "For example:\n"
87 "{\n"
88 " \"profile_matrix\" : {\n"
89 " \"my-profile\" : {\n"
90 " \"csi_plugin_type_selector\": {\n"
91 " \"plugin_type\" : \"org.apache.mesos.csi.test\"\n"
92 " },\n"
93 " \"volume_capabilities\" : {\n"
94 " \"block\" : {},\n"
95 " \"access_mode\" : { \"mode\" : \"SINGLE_NODE_WRITER\" }\n"
96 " },\n"
97 " \"create_parameters\" : {\n"
98 " \"mesos-does-not\" : \"interpret-these\",\n"
99 " \"type\" : \"raid5\",\n"
100 " \"stripes\" : \"3\",\n"
101 " \"stripesize\" : \"64\"\n"
102 " }\n"
103 " }\n"
104 " }\n"
105 "}",
106  static_cast<const Path*>(nullptr),
107  [](const Path& value) -> Option<Error> {
108  // For now, just check if the URI has a supported scheme.
109  //
110  // TODO(josephw): Once we have a proper URI class and parser,
111  // consider validating this URI more thoroughly.
112  if (strings::startsWith(value.string(), "http://")
113 #ifdef USE_SSL_SOCKET
114  || (process::network::openssl::flags().enabled &&
115  strings::startsWith(value.string(), "https://"))
116 #endif // USE_SSL_SOCKET
117  ) {
120 
121  if (url.isError()) {
122  return Error("Failed to parse URI: " + url.error());
123  }
124 
125  return None();
126  }
127 
128  // NOTE: The `Path` class will strip off the 'file://' prefix.
129  if (strings::contains(value.string(), "://")) {
130  return Error(
131  "--uri must use a supported scheme (file or http(s))");
132  }
133 
134  // We only allow absolute paths for file paths.
135  if (!value.is_absolute()) {
136  return Error("--uri to a file must be an absolute path");
137  }
138 
139  return None();
140  });
141 
143  "poll_interval",
144  "How long to wait between polling the specified `--uri`.\n"
145  "The time is checked each time the `translate` method is called.\n"
146  "If the given time has elapsed, then the URI is re-fetched.\n"
147  "If not specified, the URI is only fetched once.",
148 [](const Option<Duration>& value) -> Option<Error> {
149 if (value.isSome() && value.get() <= Seconds(0)) { // Zero is rejected as well as negatives.
150 return Error("--poll_interval must be positive");
151 }
152 
153 return None();
154 });
155 
157  "max_random_wait",
158  "How long at most to wait between discovering a new set of profiles\n"
159  "and notifying the callers of `watch`. The actual wait time is a\n"
160  "uniform random value between 0 and this value. If `--uri` points\n"
161  "to a centralized location, it may be good to scale this number\n"
162  "according to the number of resource providers in the cluster.",
163  Seconds(0),
164  [](const Duration& value) -> Option<Error> {
165  if (value < Seconds(0)) {
166  return Error("--max_random_wait must be zero or greater");
167  }
168 
169  return None();
170  });
171  }
172 
173  // NOTE: We use the `Path` type here so that the stout flags parser
174  // does not attempt to read a file if given a `file://` prefixed value.
175  //
176  // TODO(josephw): Replace with a URI type when stout gets one.
178 
181  };
182 
183 
184  UriDiskProfileAdaptor(const Flags& _flags);
185 
186  ~UriDiskProfileAdaptor() override;
187 
189  const std::string& profile,
190  const ResourceProviderInfo& resourceProviderInfo) override;
191 
193  const hashset<std::string>& knownProfiles,
194  const ResourceProviderInfo& resourceProviderInfo) override;
195 
196 protected:
199 };
200 
201 
203  public process::Process<UriDiskProfileAdaptorProcess>
204 {
205 public:
207 
208  void initialize() override;
209 
211  const std::string& profile,
212  const ResourceProviderInfo& resourceProviderInfo);
213 
215  const hashset<std::string>& knownProfiles,
216  const ResourceProviderInfo& resourceProviderInfo);
217 
218  // Helpers for fetching the `--uri`.
219  // If `--poll_interval` is set, this method will dispatch to itself with
220  // a delay once the fetch is complete.
221 // Made public for testing purposes.
222  void poll();
223  void _poll(const process::Future<process::http::Response>& future);
224  void __poll(const Try<std::string>& fetched);
225 
226 private:
227 // Helper that is called upon successfully polling and parsing the `--uri`.
228 // This method validates that the capability and parameters of every known
229 // profile remain unchanged. Then, each watcher is notified if its set of
230 // profiles has changed.
231  void notify(const resource_provider::DiskProfileMapping& parsed);
232 
234 
235 struct ProfileRecord // Per-profile record: a manifest plus an activity flag.
236 {
237 resource_provider::DiskProfileMapping::CSIManifest manifest; // CSI manifest held for this profile.
238 
239 // True if the profile is seen in the last fetched profile mapping.
240 bool active;
241 };
242 
243  // The mapping of all profiles seen so far.
244  // Profiles can only be added and never removed from this mapping. Once added,
245  // a profile's volume capability and parameters cannot be changed.
246  //
247  // TODO(josephw): Consider persisting this mapping across agent restarts.
249 
250  struct WatcherData
251  {
252  WatcherData(
253  const hashset<std::string>& _known, const ResourceProviderInfo& _info)
254  : known(_known), info(_info) {}
255 
256  hashset<std::string> known;
257  ResourceProviderInfo info;
259  };
260 
261  std::vector<WatcherData> watchers;
262 };
263 
264 } // namespace storage {
265 } // namespace internal {
266 } // namespace mesos {
267 
268 #endif // __RESOURCE_PROVIDER_URI_DISK_PROFILE_ADAPTOR_HPP__
Definition: uri_disk_profile_adaptor.hpp:202
Definition: errorbase.hpp:36
Option< Duration > poll_interval
Definition: uri_disk_profile_adaptor.hpp:179
static Try< URL > parse(const std::string &urlString)
Definition: check.hpp:33
Definition: uri_disk_profile_adaptor.hpp:65
Future< short > poll(int_fd fd, short events)
Returns the events (a subset of the events specified) that can be performed on the specified file des...
URI manifest(const std::string &repository, const std::string &reference, const std::string &registry, const Option< std::string > &scheme=None(), const Option< int > &port=None())
Definition: docker.hpp:47
std::queue< ev_io * > * watchers
Definition: duration.hpp:32
bool isSome() const
Definition: option.hpp:116
bool contains(const std::string &s, const std::string &substr)
Definition: strings.hpp:423
Try< Nothing > initialize(const Flags &flags)
Initialized state for support of systemd functions in this file.
Represents a POSIX or Windows file system path and offers common path manipulations.
Definition: path.hpp:212
Definition: duration.hpp:207
Definition: flags.hpp:44
Definition: agent.hpp:25
Definition: future.hpp:74
const T & get() const &
Definition: option.hpp:119
Protocol< PromiseRequest, PromiseResponse > promise
This module is used by Storage Resource Providers to translate the "profile" field of a Resource::Dis...
Definition: disk_profile_adaptor.hpp:52
static Try error(const E &e)
Definition: try.hpp:43
#define flags
Definition: decoder.hpp:18
Flags()
Definition: uri_disk_profile_adaptor.hpp:67
Definition: none.hpp:27
Definition: attributes.hpp:24
bool isError() const
Definition: try.hpp:78
Path uri
Definition: uri_disk_profile_adaptor.hpp:177
process::Future< DiskProfileAdaptor::ProfileInfo > translate(const std::string &profile, const ResourceProviderInfo &resourceProviderInfo) override
Returns the CSI volume capability and the parameters to create CSI volumes associated with the profil...
Definition: uri_disk_profile_adaptor.hpp:62
void add(T1 Flags::*t1, const Name &name, const Option< Name > &alias, const std::string &help, const T2 *t2, F validate)
Definition: flags.hpp:333
process::Future< hashset< std::string > > watch(const hashset< std::string > &knownProfiles, const ResourceProviderInfo &resourceProviderInfo) override
Returns a future that will be satisfied iff the set of profiles known by the module differs from the ...
process::Owned< UriDiskProfileAdaptorProcess > process
Definition: uri_disk_profile_adaptor.hpp:198
Flags flags
Definition: uri_disk_profile_adaptor.hpp:197
Duration max_random_wait
Definition: uri_disk_profile_adaptor.hpp:180
Definition: owned.hpp:36
bool startsWith(const std::string &s, const std::string &prefix)
Definition: strings.hpp:381
Definition: process.hpp:505
const std::string & string() const
Definition: path.hpp:387
bool is_absolute() const
Definition: path.hpp:376
Definition: future.hpp:58