Apache Mesos
fetcher_process.hpp
Go to the documentation of this file.
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
16 
17 #ifndef __SLAVE_CONTAINERIZER_FETCHER_PROCESS_HPP__
18 #define __SLAVE_CONTAINERIZER_FETCHER_PROCESS_HPP__
19 
#include <list>
#include <memory>
#include <string>

#include <mesos/mesos.hpp>
#include <mesos/type_utils.hpp>

#include <process/future.hpp>
#include <process/process.hpp>

#include <process/metrics/counter.hpp>
#include <process/metrics/pull_gauge.hpp>

#include <stout/hashmap.hpp>

#include "slave/flags.hpp"
36 
37 namespace mesos {
38 namespace internal {
39 namespace slave {
40 
41 class FetcherProcess : public process::Process<FetcherProcess>
42 {
43 public:
44  explicit FetcherProcess(const Flags& _flags);
45  ~FetcherProcess() override;
46 
48  const ContainerID& containerId,
49  const CommandInfo& commandInfo,
50  const std::string& sandboxDirectory,
51  const Option<std::string>& user);
52 
53  // Runs the mesos-fetcher, creating a "stdout" and "stderr" file
54  // in the given directory, using these for trace output.
56  const ContainerID& containerId,
57  const std::string& sandboxDirectory,
58  const Option<std::string>& user,
59  const mesos::fetcher::FetcherInfo& info);
60 
61  // Best effort attempt to kill the external mesos-fetcher process
62  // running on behalf of the given container ID, if any.
63  void kill(const ContainerID& containerId);
64 
65  // Representation of the fetcher cache and its contents. There is
66  // exactly one instance per instance of FetcherProcess. All methods
67  // of Cache are to be executed on the latter to ensure atomicity of
68  // cache operations.
69  class Cache
70  {
71  public:
72  class Entry
73  {
74  public:
76  const std::string& key,
77  const std::string& directory,
78  const std::string& filename)
79  : key(key),
80  directory(directory),
81  filename(filename),
82  size(0),
83  referenceCount(0) {}
84 
85  ~Entry() {}
86 
87  // Marks this file's download as successful by setting its promise
88  // to the path of the file in the cache.
89  void complete();
90 
91  // Indicates whether this file's download into the cache is
92  // successfully completed.
94 
95  // Marks this download as failed, notifying concurrent fetch attempts
96  // waiting for this result, by setting the promise to failed.
97  void fail();
98 
99  // While an entry is "referenced" it cannot be evicted from the
100  // cache.
101  void reference();
102  void unreference();
103  bool isReferenced() const;
104 
105  // Returns the path in the filesystem where cache entry resides.
106  // TODO(bernd-mesos): Remove this construct after refactoring so
107  // that the slave flags get injected into the fetcher.
108  Path path() const { return Path(path::join(directory, filename)); }
109 
110  // Uniquely identifies a user/URI combination.
111  const std::string key;
112 
113  // Cache directory where this entry is stored.
114  // TODO(bernd-mesos): Remove this construct after refactoring so
115  // that the slave flags get injected into the fetcher.
116  const std::string directory;
117 
118  // The unique name of the file held in the cache on behalf of a
119  // URI.
120  const std::string filename;
121 
122  // The expected size of the cache file. This field is set before
123  // downloading. If the actual size of the downloaded file is
124  // different a warning is logged and the field's value adjusted.
126 
127  private:
128  // Concurrent fetch attempts can reference the same entry multiple
129  // times.
130  unsigned long referenceCount;
131 
132  // Indicates successful downloading to the cache.
134  };
135 
136  explicit Cache(Bytes _space) : space(_space), tally(0), filenameSerial(0) {}
137  virtual ~Cache() {}
138 
139  void claimSpace(const Bytes& bytes);
140  void releaseSpace(const Bytes& bytes);
141 
142  Bytes totalSpace() const;
143  Bytes usedSpace() const;
144  Bytes availableSpace() const;
145 
146  // Invents a new, distinct base name for a cache file, using the same
147  // filename extension as the URI.
148  std::string nextFilename(const CommandInfo::URI& uri);
149 
150  // Creates a new entry and inserts it into the cache table. Also
151  // sets its reference count to 1. Returns the entry.
152  std::shared_ptr<Entry> create(
153  const std::string& cacheDirectory,
154  const Option<std::string>& user,
155  const CommandInfo::URI& uri);
156 
157  // Retrieves the cache entry indexed by the parameters, without
158  // changing its reference count.
160  const Option<std::string>& user,
161  const std::string& uri);
162 
163  // Returns whether an entry for this user and URI is in the cache.
164  bool contains(
165  const Option<std::string>& user, const std::string& uri) const;
166 
167  // Returns whether this identical entry is in the cache.
168  bool contains(const std::shared_ptr<Cache::Entry>& entry) const;
169 
170  // Completely deletes a cache entry and its file. Warns on failure.
171  // Virtual for mock testing.
172  virtual Try<Nothing> remove(const std::shared_ptr<Entry>& entry);
173 
174  // Determines a list of cache entries to remove, respectively cache files
175  // to delete, so that at least the required amount of space would become
176  // available.
178  selectVictims(const Bytes& requiredSpace);
179 
180  // Ensures that there is the requested amount of space is available
181  // Evicts other files as necessary to make it so.
182  Try<Nothing> reserve(const Bytes& requestedSpace);
183 
184  // Finds out if any predictions about cache file sizes have been
185  // inaccurate, logs this if so, and records the cache files' actual
186  // sizes and adjusts the cache's total amount of space in use.
187  Try<Nothing> adjust(const std::shared_ptr<Cache::Entry>& entry);
188 
189  // Number of entries.
190  size_t size() const;
191 
192  private:
193  // Returns whether the cache file exists and not corrupted.
194  Try<Nothing> validate(const std::shared_ptr<Cache::Entry>& entry);
195 
196  // Maximum storable number of bytes in the cache directory.
197  const Bytes space;
198 
199  // How much space has been reserved to be occupied by cache files.
200  Bytes tally;
201 
202  // Used to generate distinct cache file names simply by counting.
203  unsigned long filenameSerial;
204 
205  // Maps keys (cache directory / URI combinations) to cache file
206  // entries.
208 
209  // Stores cache file entries sorted from LRU to MRU.
210  std::list<std::shared_ptr<Entry>> lruSortedEntries;
211  };
212 
213  // Public and virtual for mock testing.
215  const hashmap<
216  CommandInfo::URI,
217  Option<process::Future<std::shared_ptr<Cache::Entry>>>>&
218  entries,
219  const ContainerID& containerId,
220  const std::string& sandboxDirectory,
221  const std::string& cacheDirectory,
222  const Option<std::string>& user);
223 
224  // Returns a list of cache files on disk for the given slave
225  // (for all users combined). For testing.
227 
228  // Returns the number of cache entries for the given slave (for all
229  // users combined). For testing.
230  size_t cacheSize() const;
231 
232  // Returns the amount of remaining cache space that is not occupied
233  // by cache entries. For testing.
234  Bytes availableCacheSpace() const;
235 
236 private:
237  process::Future<Nothing> __fetch(
238  const hashmap<CommandInfo::URI,
239  Option<std::shared_ptr<Cache::Entry>>>& entries,
240  const ContainerID& containerId,
241  const std::string& sandboxDirectory,
242  const std::string& cacheDirectory,
243  const Option<std::string>& user);
244 
245  // Calls Cache::reserve() and returns a ready entry future if successful,
246  // else Failure. Claims the space and assigns the entry's size to this
247  // amount if and only if successful.
249  const Try<Bytes>& requestedSpace,
250  const std::shared_ptr<Cache::Entry>& entry);
251 
252  struct Metrics
253  {
254  explicit Metrics(FetcherProcess *fetcher);
255  ~Metrics();
256 
257  // NOTE: These metrics will increment at most once per task. Even if
258  // a single task asks for multiple artifacts, the total number of
259  // fetches will only go up by one. And if any of those artifacts
260  // fail to fetch, the failure count will only increase by one.
261  process::metrics::Counter task_fetches_succeeded;
262  process::metrics::Counter task_fetches_failed;
263 
264  process::metrics::PullGauge cache_size_total_bytes;
265  process::metrics::PullGauge cache_size_used_bytes;
266  } metrics;
267 
268  const Flags flags;
269 
270  Cache cache;
271 
272  hashmap<ContainerID, pid_t> subprocessPids;
273 };
274 
275 
276 } // namespace slave {
277 } // namespace internal {
278 } // namespace mesos {
279 
280 #endif // __SLAVE_CONTAINERIZER_FETCHER_PROCESS_HPP__
bool contains(const Option< std::string > &user, const std::string &uri) const
Try< Nothing > adjust(const std::shared_ptr< Cache::Entry > &entry)
Definition: check.hpp:33
Try< Nothing > reserve(const Bytes &requestedSpace)
Cache(Bytes _space)
Definition: fetcher_process.hpp:136
Result< std::string > user(Option< uid_t > uid=None())
Definition: su.hpp:284
Definition: fetcher_process.hpp:41
Try< std::list< Path > > cacheFiles() const
process::Future< Nothing > fetch(const ContainerID &containerId, const CommandInfo &commandInfo, const std::string &sandboxDirectory, const Option< std::string > &user)
const std::string key
Definition: fetcher_process.hpp:111
virtual process::Future< Nothing > _fetch(const hashmap< CommandInfo::URI, Option< process::Future< std::shared_ptr< Cache::Entry >>>> &entries, const ContainerID &containerId, const std::string &sandboxDirectory, const std::string &cacheDirectory, const Option< std::string > &user)
std::string join(const std::string &path1, const std::string &path2, const char _separator=os::PATH_SEPARATOR)
Definition: path.hpp:116
Definition: flags.hpp:39
std::string nextFilename(const CommandInfo::URI &uri)
Definition: counter.hpp:26
Definition: fetcher_process.hpp:69
const std::string filename
Definition: fetcher_process.hpp:120
Definition: hashmap.hpp:38
Definition: pull_gauge.hpp:46
~Entry()
Definition: fetcher_process.hpp:85
Represents a POSIX or Windows file system path and offers common path manipulations.
Definition: path.hpp:212
virtual process::Future< Nothing > run(const ContainerID &containerId, const std::string &sandboxDirectory, const Option< std::string > &user, const mesos::fetcher::FetcherInfo &info)
Definition: agent.hpp:25
const std::string directory
Definition: fetcher_process.hpp:116
virtual ~Cache()
Definition: fetcher_process.hpp:137
Path path() const
Definition: fetcher_process.hpp:108
Definition: attributes.hpp:24
Bytes size
Definition: fetcher_process.hpp:125
Entry(const std::string &key, const std::string &directory, const std::string &filename)
Definition: fetcher_process.hpp:75
Definition: uri.hpp:21
JSON::Object Metrics()
Definition: bytes.hpp:30
Definition: process.hpp:505
Try< std::list< std::shared_ptr< Cache::Entry > > > selectVictims(const Bytes &requiredSpace)
Definition: parse.hpp:33
void kill(const ContainerID &containerId)
std::shared_ptr< Entry > create(const std::string &cacheDirectory, const Option< std::string > &user, const CommandInfo::URI &uri)