Apache Mesos
jobobject.hpp
Go to the documentation of this file.
1 // Licensed under the Apache License, Version 2.0 (the "License");
2 // you may not use this file except in compliance with the License.
3 // You may obtain a copy of the License at
4 //
5 // http://www.apache.org/licenses/LICENSE-2.0
6 //
7 // Unless required by applicable law or agreed to in writing, software
8 // distributed under the License is distributed on an "AS IS" BASIS,
9 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10 // See the License for the specific language governing permissions and
11 // limitations under the License.
12 
13 #ifndef __STOUT_WINDOWS_JOBOBJECT_HPP__
14 #define __STOUT_WINDOWS_JOBOBJECT_HPP__
15 
16 #include <algorithm>
17 #include <numeric>
18 #include <set>
19 #include <string>
20 
21 #include <stout/bytes.hpp>
22 #include <stout/none.hpp>
23 #include <stout/nothing.hpp>
24 #include <stout/stringify.hpp>
25 #include <stout/strings.hpp>
26 #include <stout/try.hpp>
27 #include <stout/windows.hpp>
28 
29 #include <stout/os/os.hpp>
30 #include <stout/os/process.hpp>
31 
32 namespace os {
33 
34 // `name_job` maps a `pid` to a `wstring` name for a job object.
35 // Only named job objects are accessible via `OpenJobObject`.
36 // Thus all our job objects must be named. This is essentially a shim
37 // to map the Linux concept of a process tree's root `pid` to a
38 // named job object so that the process group can be treated similarly.
40 {
41  Try<std::string> alpha_pid = strings::internal::format("MESOS_JOB_%X", pid);
42  if (alpha_pid.isError()) {
43  return Error(alpha_pid.error());
44  }
45  return wide_stringify(alpha_pid.get());
46 }
47 
48 
49 // `open_job` returns a safe shared handle to the named job object `name`.
50 // `desired_access` is a job object access rights flag.
51 // `inherit_handles` if true, processes created by this
52 // process will inherit the handle. Otherwise, the processes
53 // do not inherit this handle.
55  const DWORD desired_access,
56  const BOOL inherit_handles,
57  const std::wstring& name)
58 {
59  SharedHandle job_handle(
60  ::OpenJobObjectW(desired_access, inherit_handles, name.data()),
61  ::CloseHandle);
62 
63  if (job_handle.get_handle() == nullptr) {
64  return WindowsError(
65  "os::open_job: Call to `OpenJobObject` failed for job: " +
66  stringify(name));
67  }
68 
69  return job_handle;
70 }
71 
72 
74  const DWORD desired_access, const BOOL inherit_handles, const pid_t pid)
75 {
76  const Try<std::wstring> name = os::name_job(pid);
77  if (name.isError()) {
78  return Error(name.error());
79  }
80 
81  return open_job(desired_access, inherit_handles, name.get());
82 }
83 
84 // `create_job` function creates a named job object using `name`.
85 inline Try<SharedHandle> create_job(const std::wstring& name)
86 {
87  SharedHandle job_handle(
88  ::CreateJobObjectW(
89  nullptr, // Use a default security descriptor, and
90  // the created handle cannot be inherited.
91  name.data()), // The name of the job.
92  ::CloseHandle);
93 
94  if (job_handle.get_handle() == nullptr) {
95  return WindowsError(
96  "os::create_job: Call to `CreateJobObject` failed for job: " +
97  stringify(name));
98  }
99 
100  return job_handle;
101 }
102 
103 
104 // `get_job_info` gets the job object information for the process group
105 // represented by `pid`, assuming it is assigned to a job object. This function
106 // will fail otherwise.
107 //
108 // https://msdn.microsoft.com/en-us/library/windows/desktop/ms684925(v=vs.85).aspx // NOLINT(whitespace/line_length)
110 {
111  Try<SharedHandle> job_handle = os::open_job(JOB_OBJECT_QUERY, false, pid);
112  if (job_handle.isError()) {
113  return Error(job_handle.error());
114  }
115 
116  JOBOBJECT_BASIC_ACCOUNTING_INFORMATION info = {};
117 
118  const BOOL result = ::QueryInformationJobObject(
119  job_handle->get_handle(),
120  JobObjectBasicAccountingInformation,
121  &info,
122  sizeof(info),
123  nullptr);
124  if (result == FALSE) {
125  return WindowsError(
126  "os::get_job_info: call to `QueryInformationJobObject` failed");
127  }
128 
129  return info;
130 }
131 
132 
133 template <size_t max_pids>
135 {
136  // This is a statically allocated `JOBOBJECT_BASIC_PROCESS_ID_LIST`. We lie to
137  // the Windows API and construct our own struct to avoid (a) having to do
138  // hairy size calculations and (b) having to allocate dynamically, and then
139  // worry about deallocating.
140  struct
141  {
142  DWORD NumberOfAssignedProcesses;
143  DWORD NumberOfProcessIdsInList;
144  DWORD ProcessIdList[max_pids];
145  } pid_list;
146 
147  const BOOL result = ::QueryInformationJobObject(
148  job_handle.get_handle(),
149  JobObjectBasicProcessIdList,
150  reinterpret_cast<JOBOBJECT_BASIC_PROCESS_ID_LIST*>(&pid_list),
151  sizeof(pid_list),
152  nullptr);
153 
154  // `ERROR_MORE_DATA` indicates we need a larger `max_pids`.
155  if (result == FALSE && ::GetLastError() == ERROR_MORE_DATA) {
156  return None();
157  }
158 
159  if (result == FALSE) {
160  return WindowsError(
161  "os::_get_job_processes: call to `QueryInformationJobObject` failed");
162  }
163 
164  std::set<Process> processes;
165  for (DWORD i = 0; i < pid_list.NumberOfProcessIdsInList; ++i) {
166  Result<Process> process = os::process(pid_list.ProcessIdList[i]);
167  if (process.isSome()) {
168  processes.insert(process.get());
169  }
170  }
171 
172  return processes;
173 }
174 
175 
177 {
178  // TODO(andschwa): Overload open_job to use pid.
179  Try<SharedHandle> job_handle = os::open_job(JOB_OBJECT_QUERY, false, pid);
180  if (job_handle.isError()) {
181  return Error(job_handle.error());
182  }
183 
184  // Try to enumerate the processes with three sizes: 32, 1K, and 32K.
185 
186  Result<std::set<Process>> result =
187  os::_get_job_processes<32>(job_handle.get());
188  if (result.isError()) {
189  return Error(result.error());
190  } else if (result.isSome()) {
191  return result.get();
192  }
193 
194  result = os::_get_job_processes<32 * 32>(job_handle.get());
195  if (result.isError()) {
196  return Error(result.error());
197  } else if (result.isSome()) {
198  return result.get();
199  }
200 
201  result = os::_get_job_processes<32 * 32 * 32>(job_handle.get());
202  if (result.isError()) {
203  return Error(result.error());
204  } else if (result.isSome()) {
205  return result.get();
206  }
207 
208  // If it was bigger than 32K, something else has gone wrong.
209 
210  return Error("os::get_job_processes: failed to get processes");
211 }
212 
213 
215 {
217  if (processes.isError()) {
218  return Error(processes.error());
219  }
220 
221  return std::accumulate(
222  processes->cbegin(),
223  processes->cend(),
224  Bytes(0),
225  [](const Bytes& bytes, const Process& process) {
226  if (process.rss.isNone()) {
227  return bytes;
228  }
229 
230  return bytes + process.rss.get();
231  });
232 }
233 
234 
235 // `set_job_kill_on_close_limit` causes the job object to terminate all
236 // processes assigned to it when the last handle to the job object is closed.
237 // This can be used to limit the lifetime of the process group represented by
238 // the job object. Without this limit set, the processes will continue to run.
240 {
241  Try<SharedHandle> job_handle =
242  os::open_job(JOB_OBJECT_SET_ATTRIBUTES, false, pid);
243 
244  if (job_handle.isError()) {
245  return Error(job_handle.error());
246  }
247 
248  JOBOBJECT_EXTENDED_LIMIT_INFORMATION info = {};
249  info.BasicLimitInformation.LimitFlags = JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE;
250 
251  const BOOL result = ::SetInformationJobObject(
252  job_handle->get_handle(),
253  JobObjectExtendedLimitInformation,
254  &info,
255  sizeof(info));
256 
257  if (result == FALSE) {
258  return WindowsError(
259  "os::set_job_kill_on_close_limit: call to `SetInformationJobObject` "
260  "failed");
261  }
262 
263  return Nothing();
264 }
265 
266 
267 // `set_job_cpu_limit` sets a CPU limit for the process represented by
268 // `pid`, assuming it is assigned to a job object. This function will fail
269 // otherwise. This limit is a hard cap enforced by the OS.
270 //
271 // https://msdn.microsoft.com/en-us/library/windows/desktop/hh448384(v=vs.85).aspx // NOLINT(whitespace/line_length)
273 {
274  JOBOBJECT_CPU_RATE_CONTROL_INFORMATION control_info = {};
275  control_info.ControlFlags =
276  JOB_OBJECT_CPU_RATE_CONTROL_ENABLE | JOB_OBJECT_CPU_RATE_CONTROL_HARD_CAP;
277 
278  // This `CpuRate` is the number of cycles per 10,000 cycles, or a percentage
279  // times 100, e.g. 20% yields 20 * 100 = 2,000. However, the `cpus` argument
280  // represents 1 CPU core with `1.0`, so a 100% CPU limit on a quad-core
281  // machine would be `4.0 cpus`. Thus a mapping of `cpus` to `CpuRate` is
282  // `(cpus / os::cpus()) * 100 * 100`, or the requested `cpus` divided by the
283  // number of CPUs to obtain a fractional representation, multiplied by 100 to
284  // make it a percentage, multiplied again by 100 to become a `CpuRate`.
285  //
286  // Mathematically, we're normalizing the requested CPUS to a range
287  // of [1, 10000] cycles. However, because the input is not
288  // sanitized, we have to handle the edge case of the ratio being
289  // greater than 1. So we take the `min(max(ratio * 10000, 1),
290  // 10000)`. We don't consider going out of bounds an error because
291  // CPU limitations are inherently imprecise.
292  const long total_cpus = os::cpus().get(); // This doesn't fail on Windows.
293  // This must be constrained. We don't care about perfect precision.
294  const long cycles = static_cast<long>((cpus / total_cpus) * 10000L);
295  const long cpu_rate = std::min(std::max(cycles, 1L), 10000L);
296  control_info.CpuRate = static_cast<DWORD>(cpu_rate);
297  Try<SharedHandle> job_handle =
298  os::open_job(JOB_OBJECT_SET_ATTRIBUTES, false, pid);
299  if (job_handle.isError()) {
300  return Error(job_handle.error());
301  }
302 
303  const BOOL result = ::SetInformationJobObject(
304  job_handle->get_handle(),
305  JobObjectCpuRateControlInformation,
306  &control_info,
307  sizeof(control_info));
308  if (result == FALSE) {
309  return WindowsError(
310  "os::set_job_cpu_limit: call to `SetInformationJobObject` failed");
311  }
312 
313  return Nothing();
314 }
315 
316 
317 // `set_job_mem_limit` sets a memory limit for the process represented by
318 // `pid`, assuming it is assigned to a job object. This function will fail
319 // otherwise. This limit is a hard cap enforced by the OS.
320 //
321 // https://msdn.microsoft.com/en-us/library/windows/desktop/ms684156(v=vs.85).aspx // NOLINT(whitespace/line_length)
323 {
324  JOBOBJECT_EXTENDED_LIMIT_INFORMATION info = {};
325  info.BasicLimitInformation.LimitFlags = JOB_OBJECT_LIMIT_JOB_MEMORY;
326  info.JobMemoryLimit = limit.bytes();
327 
328  Try<SharedHandle> job_handle =
329  os::open_job(JOB_OBJECT_SET_ATTRIBUTES, false, pid);
330  if (job_handle.isError()) {
331  return Error(job_handle.error());
332  }
333 
334  const BOOL result = ::SetInformationJobObject(
335  job_handle->get_handle(),
336  JobObjectExtendedLimitInformation,
337  &info,
338  sizeof(info));
339  if (result == FALSE) {
340  return WindowsError(
341  "os::set_job_mem_limit: call to `SetInformationJobObject` failed");
342  }
343 
344  return Nothing();
345 }
346 
347 
348 // `assign_job` assigns a process with `pid` to the job object `job_handle`.
349 // Every process started by the `pid` process using `CreateProcess`
350 // will also be owned by the job object.
351 inline Try<Nothing> assign_job(SharedHandle job_handle, pid_t pid)
352 {
353  // Get process handle for `pid`.
354  SharedHandle process_handle(
355  ::OpenProcess(
356  // Required access rights to assign to a Job Object.
357  PROCESS_SET_QUOTA | PROCESS_TERMINATE,
358  false, // Don't inherit handle.
359  pid),
360  ::CloseHandle);
361 
362  if (process_handle.get_handle() == nullptr) {
363  return WindowsError("os::assign_job: Call to `OpenProcess` failed");
364  }
365 
366  const BOOL result = ::AssignProcessToJobObject(
367  job_handle.get_handle(), process_handle.get_handle());
368 
369  if (result == FALSE) {
370  return WindowsError(
371  "os::assign_job: Call to `AssignProcessToJobObject` failed");
372  };
373 
374  return Nothing();
375 }
376 
377 
378 // The `kill_job` function wraps the Windows sytem call `TerminateJobObject`
379 // for the job object `job_handle`. This will call `TerminateProcess`
380 // for every associated child process.
382 {
383  const BOOL result = ::TerminateJobObject(
384  job_handle.get_handle(),
385  // The exit code to be used by all processes in the job object.
386  1);
387 
388  if (result == FALSE) {
389  return WindowsError("os::kill_job: Call to `TerminateJobObject` failed");
390  }
391 
392  return Nothing();
393 }
394 
395 } // namespace os {
396 
397 #endif // __STOUT_WINDOWS_JOBOBJECT_HPP__
Definition: nothing.hpp:16
Definition: errorbase.hpp:36
T & get()&
Definition: try.hpp:80
HANDLE get_handle() const
Definition: windows.hpp:90
Definition: windows.hpp:72
Definition: check.hpp:33
Try< std::list< Process > > processes()
Definition: os.hpp:184
Definition: error.hpp:108
Definition: posix_signalhandler.hpp:23
Try< std::wstring > name_job(pid_t pid)
Definition: jobobject.hpp:39
Definition: check.hpp:30
Try< Nothing > set_job_mem_limit(pid_t pid, Bytes limit)
Definition: jobobject.hpp:322
Try< Nothing > assign_job(SharedHandle job_handle, pid_t pid)
Definition: jobobject.hpp:351
Try< SharedHandle > open_job(const DWORD desired_access, const BOOL inherit_handles, const std::wstring &name)
Definition: jobobject.hpp:54
DWORD pid_t
Definition: windows.hpp:181
Definition: process.hpp:32
Try< SharedHandle > create_job(const std::wstring &name)
Definition: jobobject.hpp:85
Option< T > max(const Option< T > &left, const Option< T > &right)
Definition: option.hpp:214
Try< long > cpus()
Definition: os.hpp:265
Try< Nothing > kill_job(SharedHandle job_handle)
Definition: jobobject.hpp:381
Option< T > min(const Option< T > &left, const Option< T > &right)
Definition: option.hpp:185
static Try error(const E &e)
Definition: try.hpp:43
Result< Process > process(pid_t pid)
Definition: freebsd.hpp:30
Definition: none.hpp:27
bool isError() const
Definition: try.hpp:78
Try< Bytes > get_job_mem(pid_t pid)
Definition: jobobject.hpp:214
T & get()&
Definition: result.hpp:116
Result< std::set< Process > > _get_job_processes(const SharedHandle &job_handle)
Definition: jobobject.hpp:134
Definition: executor.hpp:48
uint64_t bytes() const
Definition: bytes.hpp:79
Try< std::set< Process > > get_job_processes(pid_t pid)
Definition: jobobject.hpp:176
Try< JOBOBJECT_BASIC_ACCOUNTING_INFORMATION > get_job_info(pid_t pid)
Definition: jobobject.hpp:109
bool isSome() const
Definition: result.hpp:112
Try< Nothing > set_job_cpu_limit(pid_t pid, double cpus)
Definition: jobobject.hpp:272
Try< std::string > format(const std::string &fmt, va_list args)
Definition: format.hpp:68
Definition: bytes.hpp:30
std::string stringify(int flags)
Try< Nothing > set_job_kill_on_close_limit(pid_t pid)
Definition: jobobject.hpp:239
constexpr const char * name
Definition: shell.hpp:41