Supported `ContainerLogger` with nested containers.
[mesos.git] src/slave/containerizer/mesos/containerizer.cpp
1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements. See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership. The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License. You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16
17 #ifndef __WINDOWS__
18 #include <sys/wait.h>
19 #endif // __WINDOWS__
20
21 #include <set>
22
23 #include <mesos/module/isolator.hpp>
24
25 #include <mesos/slave/container_logger.hpp>
26 #include <mesos/slave/isolator.hpp>
27
28 #include <process/collect.hpp>
29 #include <process/defer.hpp>
30 #include <process/io.hpp>
31 #include <process/owned.hpp>
32 #include <process/reap.hpp>
33 #include <process/subprocess.hpp>
34
35 #include <process/metrics/metrics.hpp>
36
37 #include <stout/adaptor.hpp>
38 #include <stout/foreach.hpp>
39 #include <stout/fs.hpp>
40 #include <stout/hashmap.hpp>
41 #include <stout/lambda.hpp>
42 #include <stout/os.hpp>
43 #include <stout/path.hpp>
44 #include <stout/strings.hpp>
45 #include <stout/unreachable.hpp>
46
47 #include "common/protobuf_utils.hpp"
48
49 #include "hook/manager.hpp"
50
51 #include "module/manager.hpp"
52
53 #include "slave/paths.hpp"
54 #include "slave/slave.hpp"
55
56 #include "slave/containerizer/containerizer.hpp"
57 #include "slave/containerizer/fetcher.hpp"
58
59 #include "slave/containerizer/mesos/constants.hpp"
60 #include "slave/containerizer/mesos/launch.hpp"
61 #include "slave/containerizer/mesos/launcher.hpp"
62 #include "slave/containerizer/mesos/containerizer.hpp"
63 #include "slave/containerizer/mesos/paths.hpp"
64 #include "slave/containerizer/mesos/utils.hpp"
65
66 #include "slave/containerizer/mesos/isolators/filesystem/posix.hpp"
67 #include "slave/containerizer/mesos/isolators/posix.hpp"
68 #include "slave/containerizer/mesos/isolators/posix/disk.hpp"
69 #include "slave/containerizer/mesos/isolators/volume/sandbox_path.hpp"
70
71 #include "slave/containerizer/mesos/provisioner/provisioner.hpp"
72
73 #ifdef __WINDOWS__
74 #include "slave/containerizer/mesos/isolators/windows.hpp"
75 #include "slave/containerizer/mesos/isolators/filesystem/windows.hpp"
76 #endif // __WINDOWS__
77
78 #ifdef __linux__
79 #include "slave/containerizer/mesos/linux_launcher.hpp"
80
81 #include "slave/containerizer/mesos/isolators/appc/runtime.hpp"
82 #include "slave/containerizer/mesos/isolators/cgroups/cgroups.hpp"
83 #include "slave/containerizer/mesos/isolators/docker/runtime.hpp"
84 #include "slave/containerizer/mesos/isolators/docker/volume/isolator.hpp"
85 #include "slave/containerizer/mesos/isolators/filesystem/linux.hpp"
86 #include "slave/containerizer/mesos/isolators/filesystem/shared.hpp"
87 #include "slave/containerizer/mesos/isolators/gpu/nvidia.hpp"
88 #include "slave/containerizer/mesos/isolators/linux/capabilities.hpp"
89 #include "slave/containerizer/mesos/isolators/namespaces/pid.hpp"
90 #include "slave/containerizer/mesos/isolators/network/cni/cni.hpp"
91 #include "slave/containerizer/mesos/isolators/volume/image.hpp"
92 #endif // __linux__
93
94 #ifdef WITH_NETWORK_ISOLATOR
95 #include "slave/containerizer/mesos/isolators/network/port_mapping.hpp"
96 #endif
97
98 #if ENABLE_XFS_DISK_ISOLATOR
99 #include "slave/containerizer/mesos/isolators/xfs/disk.hpp"
100 #endif
101
102 using process::collect;
103 using process::dispatch;
104 using process::defer;
105
106 using process::Failure;
107 using process::Future;
108 using process::Owned;
109
110 using std::list;
111 using std::map;
112 using std::set;
113 using std::string;
114 using std::vector;
115
116 using mesos::internal::slave::state::SlaveState;
117 using mesos::internal::slave::state::FrameworkState;
118 using mesos::internal::slave::state::ExecutorState;
119 using mesos::internal::slave::state::RunState;
120
121 using mesos::modules::ModuleManager;
122
123 using mesos::slave::ContainerConfig;
124 using mesos::slave::ContainerLaunchInfo;
125 using mesos::slave::ContainerLimitation;
126 using mesos::slave::ContainerLogger;
127 using mesos::slave::ContainerState;
128 using mesos::slave::ContainerTermination;
129 using mesos::slave::Isolator;
130
131 namespace mesos {
132 namespace internal {
133 namespace slave {
134
135 Try<MesosContainerizer*> MesosContainerizer::create(
136 const Flags& flags,
137 bool local,
138 Fetcher* fetcher,
139 const Option<NvidiaComponents>& nvidia)
140 {
141 // Modify `flags` based on the deprecated `isolation` flag (and then
142 // use `flags_` in the rest of this function).
143 Flags flags_ = flags;
144
145 if (flags.isolation == "process") {
146 LOG(WARNING) << "The 'process' isolation flag is deprecated, "
147 << "please update your flags to"
148 << " '--isolation=posix/cpu,posix/mem'.";
149
150 flags_.isolation = "posix/cpu,posix/mem";
151 } else if (flags.isolation == "cgroups") {
152 LOG(WARNING) << "The 'cgroups' isolation flag is deprecated, "
153 << "please update your flags to"
154 << " '--isolation=cgroups/cpu,cgroups/mem'.";
155
156 flags_.isolation = "cgroups/cpu,cgroups/mem";
157 }
158
159 // One and only one filesystem isolator is required. The filesystem
160 // isolator is responsible for preparing the filesystems for
161 // containers (e.g., prepare filesystem roots, volumes, etc.). If
162 // the user does not specify one, 'filesystem/posix' will be used.
163 //
164 // TODO(jieyu): Check that only one filesystem isolator is used.
165 if (!strings::contains(flags_.isolation, "filesystem/")) {
166 flags_.isolation += ",filesystem/posix";
167 }
168
169 if (strings::contains(flags_.isolation, "posix/disk")) {
170 LOG(WARNING) << "'posix/disk' has been renamed to 'disk/du', "
171 << "please update your --isolation flag to use 'disk/du'";
172
173 if (strings::contains(flags_.isolation, "disk/du")) {
174 return Error(
175 "Using 'posix/disk' and 'disk/du' simultaneously is disallowed");
176 }
177 }
178
179 #ifdef __linux__
180 // One and only one `network` isolator is required. The network
181 // isolator is responsible for preparing the network namespace for
182 // containers. If the user does not specify one, 'network/cni'
183 // isolator will be used.
184
185 // TODO(jieyu): Check that only one network isolator is used.
186 if (!strings::contains(flags_.isolation, "network/")) {
187 flags_.isolation += ",network/cni";
188 }
189
190 // Always enable 'volume/image' on Linux if 'filesystem/linux' is
191 // enabled, to ensure backwards compatibility.
192 //
193 // TODO(gilbert): Make sure the 'gpu/nvidia' isolator is created
194 // after all volume isolators, so that the nvidia GPU libraries
195 // under '/usr/local/nvidia' will be overwritten.
196 if (strings::contains(flags_.isolation, "filesystem/linux") &&
197 !strings::contains(flags_.isolation, "volume/image")) {
198 flags_.isolation += ",volume/image";
199 }
200 #endif // __linux__
201
202 LOG(INFO) << "Using isolation: " << flags_.isolation;
203
204 // Create the container logger for the MesosContainerizer.
205 Try<ContainerLogger*> logger =
206 ContainerLogger::create(flags_.container_logger);
207
208 if (logger.isError()) {
209 return Error("Failed to create container logger: " + logger.error());
210 }
211
212 // Create the launcher for the MesosContainerizer.
213 Try<Launcher*> launcher = [&flags_]() -> Try<Launcher*> {
214 #ifdef __linux__
215 if (flags_.launcher == "linux") {
216 return LinuxLauncher::create(flags_);
217 } else if (flags_.launcher == "posix") {
218 return PosixLauncher::create(flags_);
219 } else {
220 return Error("Unknown or unsupported launcher: " + flags_.launcher);
221 }
222 #elif __WINDOWS__
223 if (flags_.launcher != "windows") {
224 return Error("Unsupported launcher: " + flags_.launcher);
225 }
226
227 return WindowsLauncher::create(flags_);
228 #else
229 if (flags_.launcher != "posix") {
230 return Error("Unsupported launcher: " + flags_.launcher);
231 }
232
233 return PosixLauncher::create(flags_);
234 #endif // __linux__
235 }();
236
237 if (launcher.isError()) {
238 return Error("Failed to create launcher: " + launcher.error());
239 }
240
241 Try<Owned<Provisioner>> _provisioner = Provisioner::create(flags_);
242 if (_provisioner.isError()) {
243 return Error("Failed to create provisioner: " + _provisioner.error());
244 }
245
246 Shared<Provisioner> provisioner = _provisioner.get().share();
247
248 // Create the isolators.
249 //
250 // Currently, the order of the entries in the --isolation flag
251 // specifies the ordering of the isolators. Specifically, the
252 // `create` and `prepare` calls for each isolator are run serially
253 // in the order in which they appear in the --isolation flag, while
254 // the `cleanup` call is serialized in reverse order.
255 //
256 // It is the responsibility of each isolator to check its
257 // dependency requirements (if any) during its `create`
258 // execution. This means that if the operator specifies the
259 // flags in the wrong order, it will produce an error during
260 // isolator creation.
261 //
262 // NOTE: We ignore the placement of the filesystem isolator in
263 // the --isolation flag and place it at the front of the isolator
264 // list. This is a temporary hack until isolators are able to
265 // express and validate their ordering requirements.
266
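// The creators map associates each isolator name (as it appears in
// the --isolation flag) with a factory function; entries are
// conditionally compiled per platform.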
267 const hashmap<string, lambda::function<Try<Isolator*>(const Flags&)>>
268 creators = {
269 // Filesystem isolators.
270 #ifndef __WINDOWS__
271 {"filesystem/posix", &PosixFilesystemIsolatorProcess::create},
272 #else
273 {"filesystem/windows", &WindowsFilesystemIsolatorProcess::create},
274 #endif // __WINDOWS__
275 #ifdef __linux__
276 {"filesystem/linux", &LinuxFilesystemIsolatorProcess::create},
277
278 // TODO(jieyu): Deprecate this in favor of using filesystem/linux.
279 {"filesystem/shared", &SharedFilesystemIsolatorProcess::create},
280 #endif // __linux__
281
282 // Runtime isolators.
283 #ifndef __WINDOWS__
284 {"posix/cpu", &PosixCpuIsolatorProcess::create},
285 {"posix/mem", &PosixMemIsolatorProcess::create},
286
287 // "posix/disk" is deprecated in favor of the name "disk/du".
288 {"posix/disk", &PosixDiskIsolatorProcess::create},
289 {"disk/du", &PosixDiskIsolatorProcess::create},
290 {"volume/sandbox_path", &VolumeSandboxPathIsolatorProcess::create},
291
292 #if ENABLE_XFS_DISK_ISOLATOR
293 {"disk/xfs", &XfsDiskIsolatorProcess::create},
294 #endif
295 #else
296 {"windows/cpu", &WindowsCpuIsolatorProcess::create},
297 #endif // __WINDOWS__
298
299 #ifdef __linux__
300 {"cgroups/cpu", &CgroupsIsolatorProcess::create},
301 {"cgroups/devices", &CgroupsIsolatorProcess::create},
302 {"cgroups/mem", &CgroupsIsolatorProcess::create},
303 {"cgroups/net_cls", &CgroupsIsolatorProcess::create},
304 {"cgroups/perf_event", &CgroupsIsolatorProcess::create},
305 {"appc/runtime", &AppcRuntimeIsolatorProcess::create},
306 {"docker/runtime", &DockerRuntimeIsolatorProcess::create},
307 {"docker/volume", &DockerVolumeIsolatorProcess::create},
308 {"linux/capabilities", &LinuxCapabilitiesIsolatorProcess::create},
309
310 {"volume/image",
311 [&provisioner] (const Flags& flags) -> Try<Isolator*> {
312 return VolumeImageIsolatorProcess::create(flags, provisioner);
313 }},
314
315 {"gpu/nvidia",
316 [&nvidia] (const Flags& flags) -> Try<Isolator*> {
317 if (!nvml::isAvailable()) {
318 return Error("Cannot create the Nvidia GPU isolator:"
319 " NVML is not available");
320 }
321
322 CHECK_SOME(nvidia)
323 << "Nvidia components should be set when NVML is available";
324
325 return NvidiaGpuIsolatorProcess::create(flags, nvidia.get());
326 }},
327
328 {"namespaces/pid", &NamespacesPidIsolatorProcess::create},
329 {"network/cni", &NetworkCniIsolatorProcess::create},
330 #endif // __linux__
331 // NOTE: Network isolation is currently not supported on Windows builds.
332 #if !defined(__WINDOWS__) && defined(WITH_NETWORK_ISOLATOR)
333 {"network/port_mapping", &PortMappingIsolatorProcess::create},
334 #endif
335 };
336
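// Parse the --isolation flag into individual isolator names;
// duplicate entries are rejected below.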
337 vector<string> tokens = strings::tokenize(flags_.isolation, ",");
338 set<string> isolations = set<string>(tokens.begin(), tokens.end());
339
340 if (tokens.size() != isolations.size()) {
341 return Error("Duplicate entries found in --isolation flag '" +
342 stringify(tokens) + "'");
343 }
344
345 vector<Owned<Isolator>> isolators;
346
347 // NOTE: For cgroups, we only create `CgroupsIsolatorProcess` once.
348 // We use this flag to identify whether `CgroupsIsolatorProcess` has
349 // been created or not.
350 bool cgroupsIsolatorCreated = false;
351
352 foreach (const string& isolation, isolations) {
353 if (strings::startsWith(isolation, "cgroups/")) {
354 if (cgroupsIsolatorCreated) {
355 // Skip when `CgroupsIsolatorProcess` has already been created.
356 continue;
357 } else {
358 cgroupsIsolatorCreated = true;
359 }
360 }
361
362 Try<Isolator*> isolator = [&]() -> Try<Isolator*> {
363 if (creators.contains(isolation)) {
364 return creators.at(isolation)(flags_);
365 } else if (ModuleManager::contains<Isolator>(isolation)) {
366 return ModuleManager::create<Isolator>(isolation);
367 }
368 return Error("Unknown or unsupported isolator");
369 }();
370
371 if (isolator.isError()) {
372 return Error("Failed to create isolator '" + isolation + "': " +
373 isolator.error());
374 }
375
376 // NOTE: The filesystem isolator must be the first isolator used
377 // so that the runtime isolators can have a consistent view of the
378 // prepared filesystem (e.g., any volume mounts are performed).
379 if (strings::contains(isolation, "filesystem/")) {
380 isolators.insert(isolators.begin(), Owned<Isolator>(isolator.get()));
381 } else {
382 isolators.push_back(Owned<Isolator>(isolator.get()));
383 }
384 }
385
386 return new MesosContainerizer(
387 flags_,
388 local,
389 fetcher,
390 Owned<ContainerLogger>(logger.get()),
391 Owned<Launcher>(launcher.get()),
392 provisioner,
393 isolators);
394 }
395
396
397 MesosContainerizer::MesosContainerizer(
398 const Flags& flags,
399 bool local,
400 Fetcher* fetcher,
401 const Owned<ContainerLogger>& logger,
402 const Owned<Launcher>& launcher,
403 const Shared<Provisioner>& provisioner,
404 const vector<Owned<Isolator>>& isolators)
405 : process(new MesosContainerizerProcess(
406 flags,
407 local,
408 fetcher,
409 logger,
410 launcher,
411 provisioner,
412 isolators))
413 {
414 spawn(process.get());
415 }
416
417
418 MesosContainerizer::MesosContainerizer(
419 const Owned<MesosContainerizerProcess>& _process)
420 : process(_process)
421 {
422 spawn(process.get());
423 }
424
425
426 MesosContainerizer::~MesosContainerizer()
427 {
428 terminate(process.get());
429 process::wait(process.get());
430 }
431
432
433 Future<Nothing> MesosContainerizer::recover(
434 const Option<state::SlaveState>& state)
435 {
436 return dispatch(process.get(),
437 &MesosContainerizerProcess::recover,
438 state);
439 }
440
441
442 Future<bool> MesosContainerizer::launch(
443 const ContainerID& containerId,
444 const Option<TaskInfo>& taskInfo,
445 const ExecutorInfo& executorInfo,
446 const string& directory,
447 const Option<string>& user,
448 const SlaveID& slaveId,
449 const map<string, string>& environment,
450 bool checkpoint)
451 {
452 // Need to disambiguate for the compiler.
453 Future<bool> (MesosContainerizerProcess::*launch)(
454 const ContainerID&,
455 const Option<TaskInfo>&,
456 const ExecutorInfo&,
457 const string&,
458 const Option<string>&,
459 const SlaveID&,
460 const map<string, string>&,
461 bool) = &MesosContainerizerProcess::launch;
462
463 return dispatch(process.get(),
464 launch,
465 containerId,
466 taskInfo,
467 executorInfo,
468 directory,
469 user,
470 slaveId,
471 environment,
472 checkpoint);
473 }
474
475
476 Future<bool> MesosContainerizer::launch(
477 const ContainerID& containerId,
478 const CommandInfo& commandInfo,
479 const Option<ContainerInfo>& containerInfo,
480 const Option<string>& user,
481 const SlaveID& slaveId)
482 {
483 // Need to disambiguate for the compiler.
484 Future<bool> (MesosContainerizerProcess::*launch)(
485 const ContainerID&,
486 const CommandInfo&,
487 const Option<ContainerInfo>&,
488 const Option<string>&,
489 const SlaveID&) = &MesosContainerizerProcess::launch;
490
491 return dispatch(process.get(),
492 launch,
493 containerId,
494 commandInfo,
495 containerInfo,
496 user,
497 slaveId);
498 }
499
500
501 Future<Nothing> MesosContainerizer::update(
502 const ContainerID& containerId,
503 const Resources& resources)
504 {
505 return dispatch(process.get(),
506 &MesosContainerizerProcess::update,
507 containerId,
508 resources);
509 }
510
511
512 Future<ResourceStatistics> MesosContainerizer::usage(
513 const ContainerID& containerId)
514 {
515 return dispatch(process.get(),
516 &MesosContainerizerProcess::usage,
517 containerId);
518 }
519
520
521 Future<ContainerStatus> MesosContainerizer::status(
522 const ContainerID& containerId)
523 {
524 return dispatch(process.get(),
525 &MesosContainerizerProcess::status,
526 containerId);
527 }
528
529
530 Future<Option<ContainerTermination>> MesosContainerizer::wait(
531 const ContainerID& containerId)
532 {
533 return dispatch(process.get(),
534 &MesosContainerizerProcess::wait,
535 containerId);
536 }
537
538
539 Future<bool> MesosContainerizer::destroy(const ContainerID& containerId)
540 {
541 return dispatch(process.get(),
542 &MesosContainerizerProcess::destroy,
543 containerId);
544 }
545
546
547 Future<hashset<ContainerID>> MesosContainerizer::containers()
548 {
549 return dispatch(process.get(),
550 &MesosContainerizerProcess::containers);
551 }
552
553
554 Future<Nothing> MesosContainerizerProcess::recover(
555 const Option<state::SlaveState>& state)
556 {
557 LOG(INFO) << "Recovering containerizer";
558
559 // Gather the executor run states that we will attempt to recover.
560 list<ContainerState> recoverable;
561 if (state.isSome()) {
562 foreachvalue (const FrameworkState& framework, state.get().frameworks) {
563 foreachvalue (const ExecutorState& executor, framework.executors) {
564 if (executor.info.isNone()) {
565 LOG(WARNING) << "Skipping recovery of executor '" << executor.id
566 << "' of framework " << framework.id
567 << " because its info could not be recovered";
568 continue;
569 }
570
571 if (executor.latest.isNone()) {
572 LOG(WARNING) << "Skipping recovery of executor '" << executor.id
573 << "' of framework " << framework.id
574 << " because its latest run could not be recovered";
575 continue;
576 }
577
578 // We are only interested in the latest run of the executor!
579 const ContainerID& containerId = executor.latest.get();
580 Option<RunState> run = executor.runs.get(containerId);
581 CHECK_SOME(run);
582 CHECK_SOME(run.get().id);
583
584 // We need the pid so the reaper can monitor the executor, so
585 // skip this executor if the pid is not present. This is not an
586 // error because the slave will try to wait on the container,
587 // which will return a failed ContainerTermination, and
588 // everything will get cleaned up.
589 if (!run.get().forkedPid.isSome()) {
590 continue;
591 }
592
593 if (run.get().completed) {
594 VLOG(1) << "Skipping recovery of executor '" << executor.id
595 << "' of framework " << framework.id
596 << " because its latest run "
597 << containerId << " is completed";
598 continue;
599 }
600
601 // Note that MesosContainerizer will also recover executors
602 // launched by the DockerContainerizer, since before 0.23 the
603 // slave did not checkpoint container information.
604 const ExecutorInfo& executorInfo = executor.info.get();
605 if (executorInfo.has_container() &&
606 executorInfo.container().type() != ContainerInfo::MESOS) {
607 LOG(INFO) << "Skipping recovery of executor '" << executor.id
608 << "' of framework " << framework.id
609 << " because it was not launched from mesos containerizer";
610 continue;
611 }
612
613 LOG(INFO) << "Recovering container " << containerId
614 << " for executor '" << executor.id
615 << "' of framework " << framework.id;
616
617 // NOTE: We create the executor directory before checkpointing
618 // the executor. Therefore, it's not possible for this
619 // directory to be non-existent.
620 const string& directory = paths::getExecutorRunPath(
621 flags.work_dir,
622 state.get().id,
623 framework.id,
624 executor.id,
625 containerId);
626
627 CHECK(os::exists(directory));
628
629 ContainerState executorRunState =
630 protobuf::slave::createContainerState(
631 executorInfo,
632 run.get().id.get(),
633 run.get().forkedPid.get(),
634 directory);
635
636 recoverable.push_back(executorRunState);
637 }
638 }
639 }
640
641 // Recover the executor containers from 'SlaveState'.
642 hashset<ContainerID> alive;
643 foreach (const ContainerState& state, recoverable) {
644 ContainerID containerId = state.container_id();
645 alive.insert(containerId);
646
647 // Construct the structure for containers from the 'SlaveState'
648 // first, to maintain the children list in the container.
649 Owned<Container> container(new Container());
650 container->status = reap(containerId, state.pid());
651
652 // We only checkpoint the containerizer pid after the container
653 // has successfully launched; therefore we can assume checkpointed
654 // containers should be running after recovery.
655 container->state = RUNNING;
656 container->pid = state.pid();
657 container->directory = state.directory();
658 containers_[containerId] = container;
659 }
660
661 // TODO(gilbert): Draw the logic Venn diagram here in a comment.
662 hashset<ContainerID> orphans;
663
664 // Recover the containers from the runtime directory.
665 Try<vector<ContainerID>> containerIds =
666 containerizer::paths::getContainerIds(flags.runtime_dir);
667
668 if (containerIds.isError()) {
669 return Failure(
670 "Failed to get container ids from the runtime directory: " +
671 containerIds.error());
672 }
673
674 // Reconcile the runtime containers with the containers from
675 // `recoverable`. Treat discovered orphans as "known orphans"
676 // that we aggregate with any orphans that get returned from
677 // calling `launcher->recover`.
678 foreach (const ContainerID& containerId, containerIds.get()) {
679 if (alive.contains(containerId)) {
680 continue;
681 }
682
683 // Nested containers may have already been destroyed, but we leave
684 // their runtime directories around for the lifetime of their
685 // top-level container. If they have already been destroyed, we
686 // checkpoint their termination state, so the existence of this
687 // checkpointed information means we can safely ignore them here.
688 const string terminationPath = path::join(
689 containerizer::paths::getRuntimePath(flags.runtime_dir, containerId),
690 containerizer::paths::TERMINATION_FILE);
691
692 if (os::exists(terminationPath)) {
693 continue;
694 }
695
696 // Attempt to read the pid from the container runtime directory.
697 Result<pid_t> pid =
698 containerizer::paths::getContainerPid(flags.runtime_dir, containerId);
699
700 if (pid.isError()) {
701 return Failure("Failed to get container pid: " + pid.error());
702 }
703
704 // Determine the sandbox if this is a nested container.
705 Option<string> directory;
706 if (containerId.has_parent()) {
707 const ContainerID& rootContainerId = getRootContainerId(containerId);
708 CHECK(containers_.contains(rootContainerId));
709
710 if (containers_[rootContainerId]->directory.isSome()) {
711 directory = containerizer::paths::getSandboxPath(
712 containers_[rootContainerId]->directory.get(),
713 containerId);
714 }
715 }
716
717 Owned<Container> container(new Container());
718 container->state = RUNNING;
719 container->pid = pid.isSome() ? pid.get() : Option<pid_t>();
720 container->directory = directory;
721
722 // Invoke 'reap' on each 'Container'. However, it's possible
723 // that the 'pid' for a container is unknown (e.g., the agent
724 // crashed after the fork but before checkpointing the pid). In
725 // that case, simply assume the child process will exit because
726 // of the pipe, and do not call 'reap' on it.
727 if (pid.isSome()) {
728 container->status = reap(containerId, pid.get());
729 } else {
730 container->status = Future<Option<int>>(None());
731 }
732
733 containers_[containerId] = container;
734
735 // Add recoverable nested containers to the list of 'ContainerState'.
736 if (containerId.has_parent() &&
737 alive.contains(getRootContainerId(containerId)) &&
738 pid.isSome()) {
739 CHECK_SOME(directory);
740 ContainerState state =
741 protobuf::slave::createContainerState(
742 None(),
743 containerId,
744 container->pid.get(),
745 container->directory.get());
746
747 recoverable.push_back(state);
748 continue;
749 }
750
751 orphans.insert(containerId);
752 }
753
754 // Try to recover the launcher first.
755 return launcher->recover(recoverable)
756 .then(defer(self(), [=](
757 const hashset<ContainerID>& launchedOrphans) -> Future<Nothing> {
758 // Handle the extra launcher orphans that are not included in
759 // the constructed orphan list. The parent-child relationship
760 // will be maintained at the end of 'recover' before orphans are
761 // cleaned up.
762 hashset<ContainerID> _orphans = orphans;
763 foreach (const ContainerID& containerId, launchedOrphans) {
764 if (orphans.contains(containerId)) {
765 continue;
766 }
767
768 Owned<Container> container(new Container());
769 container->state = RUNNING;
770 container->status = Future<Option<int>>(None());
771 containers_[containerId] = container;
772
773 _orphans.insert(containerId);
774 }
775
776 return _recover(recoverable, _orphans);
777 }));
778 }
779
780
781 Future<Nothing> MesosContainerizerProcess::_recover(
782 const list<ContainerState>& recoverable,
783 const hashset<ContainerID>& orphans)
784 {
785 // Recover the isolators first, then recover the provisioner, because
786 // of possible cleanups on unknown containers.
787 return recoverIsolators(recoverable, orphans)
788 .then(defer(self(), &Self::recoverProvisioner, recoverable, orphans))
789 .then(defer(self(), &Self::__recover, recoverable, orphans));
790 }
791
792
793 Future<list<Nothing>> MesosContainerizerProcess::recoverIsolators(
794 const list<ContainerState>& recoverable,
795 const hashset<ContainerID>& orphans)
796 {
797 list<Future<Nothing>> futures;
798
799 // Then recover the isolators.
800 foreach (const Owned<Isolator>& isolator, isolators) {
801 // NOTE: We should not send nested containers to the isolator if
802 // the isolator does not support nesting.
803 if (isolator->supportsNesting()) {
804 futures.push_back(isolator->recover(recoverable, orphans));
805 } else {
806 // Strip nested containers from 'recoverable' and 'orphans'.
807 list<ContainerState> _recoverable;
808 hashset<ContainerID> _orphans;
809
810 foreach (const ContainerState& state, recoverable) {
811 if (!state.container_id().has_parent()) {
812 _recoverable.push_back(state);
813 }
814 }
815
816 foreach (const ContainerID& orphan, orphans) {
817 if (!orphan.has_parent()) {
818 _orphans.insert(orphan);
819 }
820 }
821
822 futures.push_back(isolator->recover(_recoverable, _orphans));
823 }
824 }
825
826 // If all isolators recover then continue.
827 return collect(futures);
828 }
829
830
831 Future<Nothing> MesosContainerizerProcess::recoverProvisioner(
832 const list<ContainerState>& recoverable,
833 const hashset<ContainerID>& orphans)
834 {
835 // TODO(gilbert): Consolidate 'recoverProvisioner()' interface
836 // once the launcher returns a full set of known containers.
837 hashset<ContainerID> knownContainerIds = orphans;
838
839 foreach (const ContainerState& state, recoverable) {
840 knownContainerIds.insert(state.container_id());
841 }
842
843 return provisioner->recover(knownContainerIds);
844 }
845
846
847 Future<Nothing> MesosContainerizerProcess::__recover(
848 const list<ContainerState>& recovered,
849 const hashset<ContainerID>& orphans)
850 {
851 foreach (const ContainerState& run, recovered) {
852 const ContainerID& containerId = run.container_id();
853
854 foreach (const Owned<Isolator>& isolator, isolators) {
855 // If this is a nested container, we need to skip isolators that
856 // do not support nesting.
857 if (containerId.has_parent() && !isolator->supportsNesting()) {
858 continue;
859 }
860
861 isolator->watch(containerId)
862 .onAny(defer(self(), &Self::limited, containerId, lambda::_1));
863 }
864 }
865
866 // Maintain the children list in the `Container` struct.
867 foreachpair (const ContainerID& containerId,
868 const Owned<Container>& container,
869 containers_) {
870 if (containerId.has_parent()) {
871 CHECK(containers_.contains(containerId.parent()));
872 containers_[containerId.parent()]->children.insert(containerId);
873 }
874
875 // NOTE: We do not register the callback until we correctly setup
876 // the parent/child relationship. 'destroy' uses that information
877 // to make sure all child containers are cleaned up before it
878 // starts to cleanup the parent container.
879 container->status->onAny(defer(self(), &Self::reaped, containerId));
880 }
881
882 // Destroy all the orphan containers.
883 foreach (const ContainerID& containerId, orphans) {
884 LOG(INFO) << "Cleaning up orphan container " << containerId;
885 destroy(containerId);
886 }
887
888 return Nothing();
889 }
890
891
892 // Launching an executor involves the following steps:
893 // 1. Call prepare on each isolator.
894 // 2. Fork the executor. The forked child is blocked from exec'ing until it has
895 // been isolated.
896 // 3. Isolate the executor. Call isolate with the pid for each isolator.
897 // 4. Fetch the executor.
898 // 5. Exec the executor. The forked child is signalled to continue. It will
899 // first execute any preparation commands from isolators and then exec the
900 // executor.
901 Future<bool> MesosContainerizerProcess::launch(
902 const ContainerID& containerId,
903 const Option<TaskInfo>& taskInfo,
904 const ExecutorInfo& _executorInfo,
905 const string& directory,
906 const Option<string>& user,
907 const SlaveID& slaveId,
908 const map<string, string>& environment,
909 bool checkpoint)
910 {
911 CHECK(!containerId.has_parent());
912
913 if (containers_.contains(containerId)) {
914 return Failure("Container already started");
915 }
916
917 if (taskInfo.isSome() &&
918 taskInfo.get().has_container() &&
919 taskInfo.get().container().type() != ContainerInfo::MESOS) {
920 return false;
921 }
922
923 // NOTE: We make a copy of the executor info because we may mutate
924 // it with default container info.
925 ExecutorInfo executorInfo = _executorInfo;
926
927 if (executorInfo.has_container() &&
928 executorInfo.container().type() != ContainerInfo::MESOS) {
929 return false;
930 }
931
932 // Add the default container info to the executor info.
933 // TODO(jieyu): Rename the flag to be default_mesos_container_info.
934 if (!executorInfo.has_container() &&
935 flags.default_container_info.isSome()) {
936 executorInfo.mutable_container()->CopyFrom(
937 flags.default_container_info.get());
938 }
939
940 LOG(INFO) << "Starting container " << containerId
941 << " for executor '" << executorInfo.executor_id()
942 << "' of framework " << executorInfo.framework_id();
943
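// Assemble the ContainerConfig for this launch: executor and command
// info, resources, sandbox directory, user, and (if present) task and
// container info.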
944 ContainerConfig containerConfig;
945 containerConfig.mutable_executor_info()->CopyFrom(executorInfo);
946 containerConfig.mutable_command_info()->CopyFrom(executorInfo.command());
947 containerConfig.mutable_resources()->CopyFrom(executorInfo.resources());
948 containerConfig.set_directory(directory);
949
950 if (user.isSome()) {
951 containerConfig.set_user(user.get());
952 }
953
954 if (taskInfo.isSome()) {
955 // Command task case.
956 containerConfig.mutable_task_info()->CopyFrom(taskInfo.get());
957
958 if (taskInfo->has_container()) {
959 ContainerInfo* containerInfo = containerConfig.mutable_container_info();
960 containerInfo->CopyFrom(taskInfo->container());
961
962 if (taskInfo->container().mesos().has_image()) {
963 // For command tasks, We need to set the command executor user
964 // as root as it needs to perform chroot (even when
965 // switch_user is set to false).
966 containerConfig.mutable_command_info()->set_user("root");
967 }
968 }
969 } else {
970 // Other cases.
971 if (executorInfo.has_container()) {
972 ContainerInfo* containerInfo = containerConfig.mutable_container_info();
973 containerInfo->CopyFrom(executorInfo.container());
974 }
975 }
976
977 return launch(containerId,
978 containerConfig,
979 environment,
980 slaveId,
981 checkpoint);
982 }
983
984
985 Future<bool> MesosContainerizerProcess::launch(
986 const ContainerID& containerId,
987 const ContainerConfig& containerConfig,
988 const map<string, string>& environment,
989 const SlaveID& slaveId,
990 bool checkpoint)
991 {
992 // Before we launch the container, we first create the container
993 // runtime directory to hold internal checkpoint information about
994 // the container.
995 //
996 // NOTE: This is different from the checkpoint information requested
997 // by the agent via the `checkpoint` parameter. The containerizer
998 // itself uses the runtime directory created here to checkpoint
999 // state for internal use.
1000 const string runtimePath =
1001 containerizer::paths::getRuntimePath(flags.runtime_dir, containerId);
1002
1003 Try<Nothing> mkdir = os::mkdir(runtimePath);
1004 if (mkdir.isError()) {
1005 return Failure(
1006 "Failed to make the containerizer runtime directory"
1007 " '" + runtimePath + "': " + mkdir.error());
1008 }
1009
1010 Owned<Container> container(new Container());
1011 container->state = PROVISIONING;
1012 container->config = containerConfig;
1013 container->resources = containerConfig.resources();
1014 container->directory = containerConfig.directory();
1015
1016 // Maintain the 'children' list in the parent's 'Container' struct,
1017 // which will be used for recursive destroy.
1018 if (containerId.has_parent()) {
1019 CHECK(containers_.contains(containerId.parent()));
1020 containers_[containerId.parent()]->children.insert(containerId);
1021 }
1022
1023 containers_.put(containerId, container);
1024
1025 // We'll first provision the image for the container, and
1026 // then provision the images specified in `volumes` using
1027 // the 'volume/image' isolator.
1028 if (!containerConfig.has_container_info() ||
1029 !containerConfig.container_info().mesos().has_image()) {
1030 return prepare(containerId, None())
1031 .then(defer(self(),
1032 &Self::_launch,
1033 containerId,
1034 environment,
1035 slaveId,
1036 checkpoint));
1037 }
1038
1039 container->provisioning = provisioner->provision(
1040 containerId,
1041 containerConfig.container_info().mesos().image());
1042
1043 return container->provisioning
1044 .then(defer(self(),
1045 [=](const ProvisionInfo& provisionInfo) -> Future<bool> {
1046 return prepare(containerId, provisionInfo)
1047 .then(defer(self(),
1048 &Self::_launch,
1049 containerId,
1050 environment,
1051 slaveId,
1052 checkpoint));
1053 }));
1054 }
1055
1056
1057 Future<Nothing> MesosContainerizerProcess::prepare(
1058 const ContainerID& containerId,
1059 const Option<ProvisionInfo>& provisionInfo)
1060 {
1061 // This is because if a 'destroy' happens during the provisoiner is
1062 // provisioning in '_launch', even if the '____destroy' will wait
1063 // for the 'provision' in '_launch' to finish, there is still a
1064 // chance that '____destroy' and its dependencies finish before
1065 // 'prepare' starts since onAny is not guaranteed to be executed
1066 // in order.
1067 if (!containers_.contains(containerId)) {
1068 return Failure("Container destroyed during provisioning");
1069 }
1070
1071 const Owned<Container>& container = containers_.at(containerId);
1072
1073 // Make sure the container is not in the DESTROYING state, to avoid
1074 // a possible race where the containerizer is destroying the container
1075 // while it is preparing isolators for the container.
1076 if (container->state == DESTROYING) {
1077 return Failure("Container is being destroyed during provisioning");
1078 }
1079
1080 CHECK_EQ(container->state, PROVISIONING);
1081
1082 container->state = PREPARING;
1083
1084 if (provisionInfo.isSome()) {
1085 container->config.set_rootfs(provisionInfo->rootfs);
1086
1087 if (provisionInfo->dockerManifest.isSome() &&
1088 provisionInfo->appcManifest.isSome()) {
1089 return Failure("Container cannot have both Docker and Appc manifests");
1090 }
1091
1092 if (provisionInfo->dockerManifest.isSome()) {
1093 ContainerConfig::Docker* docker = container->config.mutable_docker();
1094 docker->mutable_manifest()->CopyFrom(provisionInfo->dockerManifest.get());
1095 }
1096
1097 if (provisionInfo->appcManifest.isSome()) {
1098 ContainerConfig::Appc* appc = container->config.mutable_appc();
1099 appc->mutable_manifest()->CopyFrom(provisionInfo->appcManifest.get());
1100 }
1101 }
1102
1103 // Captured for lambdas below.
1104 ContainerConfig containerConfig = container->config;
1105
1106 // We prepare the isolators sequentially according to their ordering
1107 // to permit basic dependency specification, e.g., preparing a
1108 // filesystem isolator before other isolators.
1109 Future<list<Option<ContainerLaunchInfo>>> f =
1110 list<Option<ContainerLaunchInfo>>();
1111
1112 foreach (const Owned<Isolator>& isolator, isolators) {
1113 // If this is a nested container, we need to skip isolators that
1114 // do not support nesting.
1115 if (containerId.has_parent() && !isolator->supportsNesting()) {
1116 continue;
1117 }
1118
1119 // Chain together preparing each isolator.
1120 f = f.then([=](list<Option<ContainerLaunchInfo>> launchInfos) {
1121 return isolator->prepare(containerId, containerConfig)
1122 .then([=](const Option<ContainerLaunchInfo>& launchInfo) mutable {
1123 launchInfos.push_back(launchInfo);
1124 return launchInfos;
1125 });
1126 });
1127 }
1128
1129 container->launchInfos = f;
1130
1131 return f.then([]() { return Nothing(); });
1132 }
1133
1134
1135 Future<Nothing> MesosContainerizerProcess::fetch(
1136 const ContainerID& containerId,
1137 const SlaveID& slaveId)
1138 {
1139 if (!containers_.contains(containerId)) {
1140 return Failure("Container destroyed during isolating");
1141 }
1142
1143 const Owned<Container>& container = containers_.at(containerId);
1144
1145 if (container->state == DESTROYING) {
1146 return Failure("Container is being destroyed during isolating");
1147 }
1148
1149 CHECK_EQ(container->state, ISOLATING);
1150
1151 container->state = FETCHING;
1152
1153 const string directory = container->config.directory();
1154
1155 Option<string> user;
1156 if (container->config.has_user()) {
1157 user = container->config.user();
1158 }
1159
1160 return fetcher->fetch(
1161 containerId,
1162 container->config.command_info(),
1163 directory,
1164 user,
1165 slaveId,
1166 flags)
1167 .then([=]() -> Future<Nothing> {
1168 if (HookManager::hooksAvailable()) {
1169 HookManager::slavePostFetchHook(containerId, directory);
1170 }
1171 return Nothing();
1172 });
1173 }
1174
1175
1176 Future<bool> MesosContainerizerProcess::_launch(
1177 const ContainerID& containerId,
1178 const map<string, string>& _environment,
1179 const SlaveID& slaveId,
1180 bool checkpoint)
1181 {
1182 if (!containers_.contains(containerId)) {
1183 return Failure("Container destroyed during preparing");
1184 }
1185
1186 const Owned<Container>& container = containers_.at(containerId);
1187
1188 if (container->state == DESTROYING) {
1189 return Failure("Container is being destroyed during preparing");
1190 }
1191
1192 CHECK_EQ(container->state, PREPARING);
1193
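// Build the environment for the launch helper as a JSON object;
// isolator-provided variables and CommandInfo variables are merged
// in below.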
1194 JSON::Object environment;
1195 foreachpair (const string& key, const string& value, _environment) {
1196 environment.values[key] = value;
1197 }
1198
1199 // TODO(jieyu): Consider moving this to 'executorEnvironment' and
1200 // consolidating it with the Docker containerizer.
1201 //
1202 // NOTE: For the command executor case, although it uses the host
1203 // filesystem for itself, we still set 'MESOS_SANDBOX' according to
1204 // the root filesystem of the task (if specified). Command executor
1205 // itself does not use this environment variable.
1206 environment.values["MESOS_SANDBOX"] = container->config.has_rootfs()
1207 ? flags.sandbox_directory
1208 : container->config.directory();
1209
1210 // NOTE: Command task is a special case. Even if the container
1211 // config has a root filesystem, the executor container still uses
1212 // the host filesystem.
1213 Option<string> rootfs;
1214 if (!container->config.has_task_info() &&
1215 container->config.has_rootfs()) {
1216 rootfs = container->config.rootfs();
1217 }
1218
1219 Option<CommandInfo> launchCommand;
1220 Option<string> workingDirectory;
1221 JSON::Array preExecCommands;
1222 Option<CapabilityInfo> capabilities;
1223
1224 // TODO(jieyu): We should use Option here. If no namespace is
1225 // required, we should pass None() to 'launcher->fork'.
1226 int namespaces = 0;
1227
1228 CHECK_READY(container->launchInfos);
1229
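// Merge the launch information returned by each isolator: environment
// variables, launch command, working directory, pre-exec commands,
// namespaces, and capabilities.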
1230 foreach (const Option<ContainerLaunchInfo>& launchInfo,
1231 container->launchInfos.get()) {
1232 if (launchInfo.isNone()) {
1233 continue;
1234 }
1235
1236 if (launchInfo->has_environment()) {
1237 foreach (const Environment::Variable& variable,
1238 launchInfo->environment().variables()) {
1239 const string& name = variable.name();
1240 const string& value = variable.value();
1241
1242 if (environment.values.count(name) > 0) {
1243 VLOG(1) << "Overwriting environment variable '"
1244 << name << "', original: '"
1245 << environment.values[name] << "', new: '"
1246 << value << "', for container "
1247 << containerId;
1248 }
1249
1250 environment.values[name] = value;
1251 }
1252 }
1253
1254 if (launchInfo->has_command()) {
1255 // NOTE: 'command' from 'launchInfo' will be merged. It is the
1256 // isolators' responsibility to make sure that the merged
1257 // command is a valid command.
1258 if (launchCommand.isSome()) {
1259 VLOG(1) << "Merging launch commands '" << launchCommand.get()
1260 << "' and '" << launchInfo->command()
1261 << "' from two different isolators";
1262
1263 launchCommand->MergeFrom(launchInfo->command());
1264 } else {
1265 launchCommand = launchInfo->command();
1266 }
1267 }
1268
1269 if (launchInfo->has_working_directory()) {
1270 if (workingDirectory.isSome()) {
1271 return Failure(
1272 "At most one working directory can be returned from isolators");
1273 } else {
1274 workingDirectory = launchInfo->working_directory();
1275 }
1276 }
1277
1278 foreach (const CommandInfo& command, launchInfo->pre_exec_commands()) {
1279 preExecCommands.values.emplace_back(JSON::protobuf(command));
1280 }
1281
1282 if (launchInfo->has_namespaces()) {
1283 namespaces |= launchInfo->namespaces();
1284 }
1285
1286 if (launchInfo->has_capabilities()) {
1287 if (capabilities.isSome()) {
1288 return Failure(
1289 "At most one capabilities set can be returned from isolators");
1290 } else {
1291 capabilities = launchInfo->capabilities();
1292 }
1293 }
1294 }
1295
1296 // Determine the launch command for the container.
1297 if (launchCommand.isNone()) {
1298 launchCommand = container->config.command_info();
1299 }
1300
1301 // For the command executor case, we should add the rootfs flag to
1302 // the launch command of the command executor.
1303 // TODO(jieyu): Remove this once we no longer support the old style
1304 // command task (i.e., that uses mesos-execute).
1305 if (container->config.has_task_info() &&
1306 container->config.has_rootfs()) {
1307 CHECK_SOME(launchCommand);
1308 launchCommand->add_arguments(
1309 "--rootfs=" + container->config.rootfs());
1310 }
1311
1312 // TODO(jieyu): 'uris', 'environment' and 'user' in 'launchCommand'
1313 // will be ignored. In fact, the above fields should be moved to
1314 // TaskInfo or ExecutorInfo, instead of putting them in CommandInfo.
1315 launchCommand->clear_uris();
1316 launchCommand->clear_environment();
1317 launchCommand->clear_user();
1318
1319 // Include any environment variables from CommandInfo.
1320 foreach (const Environment::Variable& variable,
1321 container->config.command_info().environment().variables()) {
1322 const string& name = variable.name();
1323 const string& value = variable.value();
1324
1325 if (environment.values.count(name) > 0) {
1326 VLOG(1) << "Overwriting environment variable '"
1327 << name << "', original: '"
1328 << environment.values[name] << "', new: '"
1329 << value << "', for container "
1330 << containerId;
1331 }
1332
1333 environment.values[name] = value;
1334 }
1335
1336 // Determine the 'ExecutorInfo' for the logger. If launching a
1337 // top level executor container, use the 'ExecutorInfo' from
1338 // 'ContainerConfig'. If launching a nested container, use the
1339 // 'ExecutorInfo' from its top level parent container.
1340 ExecutorInfo executorInfo;
1341 if (container->config.has_executor_info()) {
1342 // The top level executor container case. The 'ExecutorInfo'
1343 // will always be set in 'ContainerConfig'.
1344 executorInfo = container->config.executor_info();
1345 } else {
1346 // The nested container case. Use the 'ExecutorInfo' from its root
1347 // parent container.
1348 CHECK(containerId.has_parent());
1349 const ContainerID& rootContainerId = getRootContainerId(containerId);
1350 CHECK(containers_.contains(rootContainerId));
1351 CHECK(containers_[rootContainerId]->config.has_executor_info());
1352 executorInfo = containers_[rootContainerId]->config.executor_info();
1353 }
1354
1355 return logger->prepare(
1356 executorInfo,
1357 container->config.directory())
1358 .then(defer(
1359 self(),
1360 [=](const ContainerLogger::SubprocessInfo& subprocessInfo)
1361 -> Future<bool> {
1362 if (!containers_.contains(containerId)) {
1363 return Failure("Container destroyed during preparing");
1364 }
1365
1366 if (containers_.at(containerId)->state == DESTROYING) {
1367 return Failure("Container is being destroyed during preparing");
1368 }
1369
1370 const Owned<Container>& container = containers_.at(containerId);
1371
1372 // Use a pipe to block the child until it's been isolated.
1373 // The `pipes` array is captured later in a lambda.
1374 std::array<int, 2> pipes;
1375
1376 // TODO(jmlvanre): consider returning failure if `pipe` gives an
1377 // error. Currently we preserve the previous logic.
1378 CHECK_SOME(os::pipe(pipes.data()));
1379
1380 // Prepare the flags to pass to the launch process.
1381 MesosContainerizerLaunch::Flags launchFlags;
1382
1383 launchFlags.command = JSON::protobuf(launchCommand.get());
1384 launchFlags.environment = environment;
1385
1386 if (rootfs.isNone()) {
1387 // NOTE: If the executor shares the host filesystem, we should
1388 // not allow it to 'cd' into an arbitrary directory because
1389 // that would create security issues.
1390 if (workingDirectory.isSome()) {
1391 LOG(WARNING) << "Ignoring working directory '" << workingDirectory.get()
1392 << "' specified in container launch info for container "
1393 << containerId << " since the executor is using the "
1394 << "host filesystem";
1395 }
1396
1397 launchFlags.working_directory = container->config.directory();
1398 } else {
1399 launchFlags.working_directory = workingDirectory.isSome()
1400 ? workingDirectory
1401 : flags.sandbox_directory;
1402 }
1403
1404 #ifdef __linux__
1405 // TODO(bbannier): For the case where the user requested
1406 // capabilities, but no capabilities isolation was configured for
1407 // the agent, the master should reject the task.
1408 launchFlags.capabilities = capabilities;
1409 #endif // __linux__
1410
1411 #ifdef __WINDOWS__
1412 if (rootfs.isSome()) {
1413 return Failure(
1414 "`chroot` is not supported on Windows, but the executor "
1415 "specifies a root filesystem.");
1416 }
1417
1418 if (container->config.has_user()) {
1419 return Failure(
1420 "`su` is not supported on Windows, but the executor "
1421 "specifies a user.");
1422 }
1423 #else
1424 launchFlags.rootfs = rootfs;
1425
1426 if (container->config.has_user()) {
1427 launchFlags.user = container->config.user();
1428 }
1429 #endif // __WINDOWS__
1430
1431 #ifndef __WINDOWS__
1432 launchFlags.pipe_read = pipes[0];
1433 launchFlags.pipe_write = pipes[1];
1434 #else
1435 // NOTE: On Windows we need to pass `Handle`s between processes, as fds
1436 // are not unique across processes.
1437 launchFlags.pipe_read = os::fd_to_handle(pipes[0]);
1438 launchFlags.pipe_write = os::fd_to_handle(pipes[1]);
1439 #endif // __WINDOWS__
1440 launchFlags.pre_exec_commands = preExecCommands;
1441
1442 #ifndef __WINDOWS__
1443 // Set the `runtime_directory` launcher flag so that the launch
1444 // helper knows where to checkpoint the status of the container
1445 // once it exits.
1446 const string runtimePath =
1447 containerizer::paths::getRuntimePath(flags.runtime_dir, containerId);
1448
1449 CHECK(os::exists(runtimePath));
1450
1451 launchFlags.runtime_directory = runtimePath;
1452 #endif // __WINDOWS__
1453
1454 VLOG(1) << "Launching '" << MESOS_CONTAINERIZER << "' with flags '"
1455 << launchFlags << "'";
1456
1457 // Fork the child using launcher.
1458 vector<string> argv(2);
1459 argv[0] = MESOS_CONTAINERIZER;
1460 argv[1] = MesosContainerizerLaunch::NAME;
1461
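// The forked child runs the 'mesos-containerizer launch' helper with
// the flags prepared above; unless the agent is running in local mode,
// its stdout and stderr are redirected according to the container
// logger's SubprocessInfo.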
1462 Try<pid_t> forked = launcher->fork(
1463 containerId,
1464 path::join(flags.launcher_dir, MESOS_CONTAINERIZER),
1465 argv,
1466 Subprocess::FD(STDIN_FILENO),
1467 (local ? Subprocess::FD(STDOUT_FILENO)
1468 : Subprocess::IO(subprocessInfo.out)),
1469 (local ? Subprocess::FD(STDERR_FILENO)
1470 : Subprocess::IO(subprocessInfo.err)),
1471 &launchFlags,
1472 None(),
1473 namespaces); // 'namespaces' will be ignored by PosixLauncher.
1474
1475 if (forked.isError()) {
1476 return Failure("Failed to fork: " + forked.error());
1477 }
1478
1479 pid_t pid = forked.get();
1480 container->pid = pid;
1481
1482 // Checkpoint the forked pid if requested by the agent.
1483 if (checkpoint) {
1484 const string& path = slave::paths::getForkedPidPath(
1485 slave::paths::getMetaRootDir(flags.work_dir),
1486 slaveId,
1487 container->config.executor_info().framework_id(),
1488 container->config.executor_info().executor_id(),
1489 containerId);
1490
1491 LOG(INFO) << "Checkpointing container's forked pid " << pid
1492 << " to '" << path << "'";
1493
1494 Try<Nothing> checkpointed =
1495 slave::state::checkpoint(path, stringify(pid));
1496
1497 if (checkpointed.isError()) {
1498 LOG(ERROR) << "Failed to checkpoint container's forked pid to '"
1499 << path << "': " << checkpointed.error();
1500
1501 return Failure("Could not checkpoint container's pid");
1502 }
1503 }
1504
1505 // Checkpoint the forked pid to the container runtime directory.
1506 //
1507 // NOTE: This checkpoint MUST happen after checkpointing the `pid`
1508 // to the meta directory above. This ensures that there will never
1509 // be a pid checkpointed to the container runtime directory until
1510 // after it has been checkpointed in the agent's meta directory.
1511 // By maintaining this invariant we know that the only way a `pid`
1512 // could ever exist in the runtime directory and NOT in the agent
1513 // meta directory is if the meta directory was wiped clean for
1514 // some reason. As such, we know if we run into this situation
1515 // that it is safe to treat the relevant containers as orphans and
1516 // destroy them.
1517 const string pidPath = path::join(
1518 containerizer::paths::getRuntimePath(flags.runtime_dir, containerId),
1519 containerizer::paths::PID_FILE);
1520
1521 Try<Nothing> checkpointed =
1522 slave::state::checkpoint(pidPath, stringify(pid));
1523
1524 if (checkpointed.isError()) {
1525 return Failure("Failed to checkpoint the container pid to"
1526 " '" + pidPath + "': " + checkpointed.error());
1527 }
1528
1529 // Monitor the forked process's pid. We keep the future because
1530 // we'll refer to it again during container destroy.
1531 container->status = reap(containerId, pid);
1532 container->status->onAny(defer(self(), &Self::reaped, containerId));
1533
1534 return isolate(containerId, pid)
1535 .then(defer(self(),
1536 &Self::fetch,
1537 containerId,
1538 slaveId))
1539 .then(defer(self(), &Self::exec, containerId, pipes[1]))
1540 .onAny([pipes]() { os::close(pipes[0]); })
1541 .onAny([pipes]() { os::close(pipes[1]); });
1542 }));
1543 }
1544
1545
1546 Future<bool> MesosContainerizerProcess::isolate(
1547 const ContainerID& containerId,
1548 pid_t _pid)
1549 {
1550 if (!containers_.contains(containerId)) {
1551 return Failure("Container destroyed during preparing");
1552 }
1553
1554 if (containers_.at(containerId)->state == DESTROYING) {
1555 return Failure("Container is being destroyed during preparing");
1556 }
1557
1558 CHECK_EQ(containers_.at(containerId)->state, PREPARING);
1559
1560 containers_.at(containerId)->state = ISOLATING;
1561
1562 // Set up callbacks for isolator limitations.
1563 foreach (const Owned<Isolator>& isolator, isolators) {
1564 // If this is a nested container, we need to skip isolators that
1565 // do not support nesting.
1566 if (containerId.has_parent() && !isolator->supportsNesting()) {
1567 continue;
1568 }
1569
1570 isolator->watch(containerId)
1571 .onAny(defer(self(), &Self::limited, containerId, lambda::_1));
1572 }
1573
1574 // Isolate the executor with each isolator.
1575 // NOTE: This is done in parallel and is not sequenced like prepare
1576 // or destroy because we assume there are no dependencies in
1577 // isolation.
1578 list<Future<Nothing>> futures;
1579 foreach (const Owned<Isolator>& isolator, isolators) {
1580 // If this is a nested container, we need to skip isolators that
1581 // do not support nesting.
1582 if (containerId.has_parent() && !isolator->supportsNesting()) {
1583 continue;
1584 }
1585
1586 futures.push_back(isolator->isolate(containerId, _pid));
1587 }
1588
1589 // Wait for all isolators to complete.
1590 Future<list<Nothing>> future = collect(futures);
1591
1592 containers_.at(containerId)->isolation = future;
1593
1594 return future.then([]() { return true; });
1595 }
1596
1597
1598 Future<bool> MesosContainerizerProcess::exec(
1599 const ContainerID& containerId,
1600 int pipeWrite)
1601 {
1602 // The container may be destroyed before we exec the executor, so
1603 // return a failure here.
1604 if (!containers_.contains(containerId)) {
1605 return Failure("Container destroyed during fetching");
1606 }
1607
1608 if (containers_.at(containerId)->state == DESTROYING) {
1609 return Failure("Container is being destroyed during fetching");
1610 }
1611
1612 CHECK_EQ(containers_.at(containerId)->state, FETCHING);
1613
1614 // Now that we've contained the child we can signal it to continue
1615 // by writing to the pipe.
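// Retry the write if it is interrupted by a signal (EINTR); anything
// other than a full write of the dummy byte is treated as an error
// below.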
1616 char dummy;
1617 ssize_t length;
1618 while ((length = write(pipeWrite, &dummy, sizeof(dummy))) == -1 &&
1619 errno == EINTR);
1620
1621 if (length != sizeof(dummy)) {
1622 return Failure("Failed to synchronize child process: " +
1623 os::strerror(errno));
1624 }
1625
1626 containers_.at(containerId)->state = RUNNING;
1627
1628 return true;
1629 }
1630
1631
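// This overload launches nested containers. The nested sandbox is
// created under the root container's sandbox, and agent checkpointing
// is not requested (the containerizer still checkpoints its own
// runtime state).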
1632 Future<bool> MesosContainerizerProcess::launch(
1633 const ContainerID& containerId,
1634 const CommandInfo& commandInfo,
1635 const Option<ContainerInfo>& containerInfo,
1636 const Option<string>& user,
1637 const SlaveID& slaveId)
1638 {
1639 CHECK(containerId.has_parent());
1640
1641 if (containers_.contains(containerId)) {
1642 return Failure(
1643 "Nested container " + stringify(containerId) + " already started");
1644 }
1645
1646 const ContainerID& parentContainerId = containerId.parent();
1647 if (!containers_.contains(parentContainerId)) {
1648 return Failure(
1649 "Parent container " + stringify(parentContainerId) +
1650 " does not exist");
1651 }
1652
1653 if (containers_[parentContainerId]->state == DESTROYING) {
1654 return Failure(
1655 "Parent container " + stringify(parentContainerId) +
1656 " is in 'DESTROYING' state");
1657 }
1658
1659 LOG(INFO) << "Starting nested container " << containerId;
1660
1661 const ContainerID rootContainerId = getRootContainerId(containerId);
1662
1663 CHECK(containers_.contains(rootContainerId));
1664 if (containers_[rootContainerId]->directory.isNone()) {
1665 return Failure(
1666 "Unexpected empty sandbox directory for root container " +
1667 stringify(rootContainerId));
1668 }
1669
1670 const string directory = containerizer::paths::getSandboxPath(
1671 containers_[rootContainerId]->directory.get(),
1672 containerId);
1673
1674 Try<Nothing> mkdir = os::mkdir(directory);
1675 if (mkdir.isError()) {
1676 return Failure(
1677 "Failed to create nested sandbox directory '" +
1678 directory + "': " + mkdir.error());
1679 }
1680
1681 #ifndef __WINDOWS__
1682 if (user.isSome()) {
1683 LOG(INFO) << "Trying to chown '" << directory << "' to user '"
1684 << user.get() << "'";
1685
1686 Try<Nothing> chown = os::chown(user.get(), directory);
1687 if (chown.isError()) {
1688 LOG(WARNING) << "Failed to chown sandbox directory '" << directory
1689 << "'. This may be due to attempting to run the container "
1690 << "as a nonexistent user on the agent; see the description"
1691 << " for the `--switch_user` flag for more information: "
1692 << chown.error();
1693 }
1694 }
1695 #endif // __WINDOWS__
1696
1697 ContainerConfig containerConfig;
1698 containerConfig.mutable_command_info()->CopyFrom(commandInfo);
1699 containerConfig.set_directory(directory);
1700
1701 if (user.isSome()) {
1702 containerConfig.set_user(user.get());
1703 }
1704
1705 if (containerInfo.isSome()) {
1706 containerConfig.mutable_container_info()->CopyFrom(containerInfo.get());
1707 }
1708
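// Delegate to the common launch path used for top-level containers,
// passing an empty extra environment for the nested container.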
1709 return launch(containerId,
1710 containerConfig,
1711 map<string, string>(),
1712 slaveId,
1713 false);
1714 }
1715
1716
1717 Future<Option<ContainerTermination>> MesosContainerizerProcess::wait(
1718 const ContainerID& containerId)
1719 {
1720 if (!containers_.contains(containerId)) {
1721 // If a container does not exist in our `containers_` hashmap, it
1722 // may be a nested container with checkpointed termination
1723 // state. Attempt to return as such.
1724 if (containerId.has_parent()) {
1725 Result<ContainerTermination> termination =
1726 containerizer::paths::getContainerTermination(
1727 flags.runtime_dir,
1728 containerId);
1729
1730 if (termination.isError()) {
1731 return Failure("Failed to get container termination state:"
1732 " " + termination.error());
1733 }
1734
1735 if (termination.isSome()) {
1736 return termination.get();
1737 }
1738 }
1739
1740 // For all other cases, return `None()`. See the comments in
1741 // `destroy()` for race conditions which lead to "unknown
1742 // containers".
1743 return None();
1744 }
1745
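// For known containers, chain on the termination promise, which is
// only satisfied once the container has been completely destroyed.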
1746 return containers_.at(containerId)->termination.future()
1747 .then(Option<ContainerTermination>::some);
1748 }
1749
1750
1751 Future<Nothing> MesosContainerizerProcess::update(
1752 const ContainerID& containerId,
1753 const Resources& resources)
1754 {
1755 CHECK(!containerId.has_parent());
1756
1757 if (!containers_.contains(containerId)) {
1758 // It is not considered a failure if the container is not known
1759 // because the slave will attempt to update the container's
1760 // resources on a task's terminal state change, but the executor
1761 // may have already exited and the container been cleaned up.
1762 LOG(WARNING) << "Ignoring update for unknown container " << containerId;
1763 return Nothing();
1764 }
1765
1766 const Owned<Container>& container = containers_.at(containerId);
1767
1768 if (container->state == DESTROYING) {
1769 LOG(WARNING) << "Ignoring update for currently being destroyed "
1770 << "container " << containerId;
1771 return Nothing();
1772 }
1773
1774 // NOTE: We update container's resources before isolators are updated
1775 // so that subsequent containerizer->update can be handled properly.
1776 container->resources = resources;
1777
1778 // Update each isolator.
1779 list<Future<Nothing>> futures;
1780 foreach (const Owned<Isolator>& isolator, isolators) {
1781 // NOTE: No need to skip non-nesting-aware isolators here because
1782 // 'update' currently will not be called for nested containers.
1783 futures.push_back(isolator->update(containerId, resources));
1784 }
1785
1786 // Wait for all isolators to complete.
1787 return collect(futures)
1788 .then([]() { return Nothing(); });
1789 }
1790
1791
1792 // Resources are used to set the limit fields in the statistics but
1793 // are optional because they aren't known after recovery until/unless
1794 // update() is called.
1795 Future<ResourceStatistics> _usage(
1796 const ContainerID& containerId,
1797 const Option<Resources>& resources,
1798 const list<Future<ResourceStatistics>>& statistics)
1799 {
1800 CHECK(!containerId.has_parent());
1801
1802 ResourceStatistics result;
1803
1804 // Set the timestamp now that we have all statistics.
1805 result.set_timestamp(Clock::now().secs());
1806
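// Merge the per-isolator statistics into a single message, logging
// and skipping any statistic whose future failed or was discarded.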
1807 foreach (const Future<ResourceStatistics>& statistic, statistics) {
1808 if (statistic.isReady()) {
1809 result.MergeFrom(statistic.get());
1810 } else {
1811 LOG(WARNING) << "Skipping resource statistic for container "
1812 << containerId << " because: "
1813 << (statistic.isFailed() ? statistic.failure()
1814 : "discarded");
1815 }
1816 }
1817
1818 if (resources.isSome()) {
1819 // Set the resource allocations.
1820 Option<Bytes> mem = resources.get().mem();
1821 if (mem.isSome()) {
1822 result.set_mem_limit_bytes(mem.get().bytes());
1823 }
1824
1825 Option<double> cpus = resources.get().cpus();
1826 if (cpus.isSome()) {
1827 result.set_cpus_limit(cpus.get());
1828 }
1829 }
1830
1831 return result;
1832 }
1833
1834
1835 Future<ResourceStatistics> MesosContainerizerProcess::usage(
1836 const ContainerID& containerId)
1837 {
1838 CHECK(!containerId.has_parent());
1839
1840 if (!containers_.contains(containerId)) {
1841 return Failure("Unknown container " + stringify(containerId));
1842 }
1843
1844 list<Future<ResourceStatistics>> futures;
1845 foreach (const Owned<Isolator>& isolator, isolators) {
1846 // NOTE: No need to skip non-nesting-aware isolators here because
1847 // 'usage' currently will not be called for nested containers.
1848 futures.push_back(isolator->usage(containerId));
1849 }
1850
1851 // Use await() here so we can return partial usage statistics.
1852 // TODO(idownes): After recovery resources won't be known until
1853 // after an update() because they aren't part of the SlaveState.
1854 return await(futures)
1855 .then(lambda::bind(
1856 _usage,
1857 containerId,
1858 containers_.at(containerId)->resources,
1859 lambda::_1));
1860 }
1861
1862
1863 Future<ContainerStatus> _status(
1864 const ContainerID& containerId,
1865 const list<Future<ContainerStatus>>& statuses)
1866 {
1867 ContainerStatus result;
1868
1869 foreach (const Future<ContainerStatus>& status, statuses) {
1870 if (status.isReady()) {
1871 result.MergeFrom(status.get());
1872 } else {
1873 LOG(WARNING) << "Skipping status for container "
1874 << containerId << " because: "
1875 << (status.isFailed() ? status.failure()
1876 : "discarded");
1877 }
1878 }
1879
1880 VLOG(2) << "Aggregating status for container " << containerId;
1881
1882 return result;
1883 }
1884
1885
1886 Future<ContainerStatus> MesosContainerizerProcess::status(
1887 const ContainerID& containerId)
1888 {
1889 if (!containers_.contains(containerId)) {
1890 return Failure("Unknown container: " + stringify(containerId));
1891 }
1892
1893 list<Future<ContainerStatus>> futures;
1894 foreach (const Owned<Isolator>& isolator, isolators) {
1895 // If this is a nested container, we need to skip isolators that
1896 // do not support nesting.
1897 if (containerId.has_parent() && !isolator->supportsNesting()) {
1898 continue;
1899 }
1900
1901 futures.push_back(isolator->status(containerId));
1902 }
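// Also include the status reported by the launcher (such as the
// container's executor pid).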
1903 futures.push_back(launcher->status(containerId));
1904
1905 // We are using `await` here since we are interested in partial
1906 // results from calls to `isolator->status`. We also need to
1907 // serialize the invocation to `await` in order to maintain the
1908 // order of requests for `ContainerStatus` by the agent. See
1909 // MESOS-4671 for more details.
1910 VLOG(2) << "Serializing status request for container " << containerId;
1911
1912 return containers_.at(containerId)->sequence.add<ContainerStatus>(
1913 [=]() -> Future<ContainerStatus> {
1914 return await(futures)
1915 .then(lambda::bind(_status, containerId, lambda::_1));
1916 });
1917 }
1918
1919
1920 Future<bool> MesosContainerizerProcess::destroy(
1921 const ContainerID& containerId)
1922 {
1923 if (!containers_.contains(containerId)) {
1924 // This can happen due to the race between destroys initiated by
1925 // the launch failure, the terminated executor, and the agent, so
1926 // the same container is destroyed multiple times in reaction to
1927 // one failure. E.g., a stuck fetcher results in:
1928 // - The agent invoking destroy(), which kills the fetcher and
1929 // the executor.
1930 // - The agent invoking destroy() again for the failed launch
1931 // (due to the fetcher getting killed).
1932 // - The containerizer invoking destroy() for the reaped executor.
1933 //
1934 // The guard here and `if (container->state == DESTROYING)` below
1935 // make sure redundant destroys short-circuit.
1936
1937 // TODO(bmahler): Currently the agent does not log destroy
1938 // failures or unknown containers, so we log it here for now.
1939 // Move this logging into the callers.
1940 LOG(WARNING) << "Attempted to destroy unknown container " << containerId;
1941
1942 return false;
1943 }
1944
1945 const Owned<Container>& container = containers_.at(containerId);
1946
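// If the container is already being destroyed, simply wait for the
// in-progress destroy to complete.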
1947 if (container->state == DESTROYING) {
1948 return container->termination.future()
1949 .then([]() { return true; });
1950 }
1951
1952 LOG(INFO) << "Destroying container " << containerId << " in "
1953 << container->state << " state";
1954
1955 // NOTE: We save the previous state so that '_destroy' can properly
1956 // clean up based on the previous state of the container.
1957 State previousState = container->state;
1958
1959 container->state = DESTROYING;
1960
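// Destroy any nested containers first; once all child destroys have
// completed (successfully or not), '_destroy' continues with this
// container.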
1961 list<Future<bool>> destroys;
1962 foreach (const ContainerID& child, container->children) {
1963 destroys.push_back(destroy(child));
1964 }
1965
1966 await(destroys)
1967 .then(defer(self(), [=](const list<Future<bool>>& futures) {
1968 _destroy(containerId, previousState, futures);
1969 return Nothing();
1970 }));
1971
1972 return container->termination.future()
1973 .then([]() { return true; });
1974 }
1975
1976
1977 void MesosContainerizerProcess::_destroy(
1978 const ContainerID& containerId,
1979 const State& previousState,
1980 const list<Future<bool>>& destroys)
1981 {
1982 CHECK(containers_.contains(containerId));
1983
1984 const Owned<Container>& container = containers_[containerId];
1985
1986 CHECK_EQ(container->state, DESTROYING);
1987
1988 vector<string> errors;
1989 foreach (const Future<bool>& future, destroys) {
1990 if (!future.isReady()) {
1991 errors.push_back(future.isFailed()
1992 ? future.failure()
1993 : "discarded");
1994 }
1995 }
1996
1997 if (!errors.empty()) {
1998 container->termination.fail(
1999 "Failed to destroy nested containers: " +
2000 strings::join("; ", errors));
2001
2002 ++metrics.container_destroy_errors;
2003 return;
2004 }
2005
2006 if (previousState == PROVISIONING) {
2007 VLOG(1) << "Waiting for the provisioner to complete provisioning "
2008 << "before destroying container " << containerId;
2009
2010 // Wait for the provisioner to finish provisioning before we
2011 // start destroying the container.
2012 container->provisioning
2013 .onAny(defer(
2014 self(),
2015 &Self::_____destroy,
2016 containerId,
2017 list<Future<Nothing>>()));
2018
2019 return;
2020 }
2021
2022 if (previousState == PREPARING) {
2023 VLOG(1) << "Waiting for the isolators to complete preparing "
2024 << "before destroying container " << containerId;
2025
2026 // We need to wait for the isolators to finish preparing to
2027 // prevent a race where the destroy path calls an isolator's
2028 // 'cleanup' method before its 'prepare' method has been called.
2029 //
2030 // NOTE: It's likely that the launcher already forked the
2031 // container. However, since we change the state to 'DESTROYING',
2032 // the 'isolate()' will fail, causing the control pipes to be
2033 // closed. The container will terminate itself. Therefore, we
2034 // should wait for the container to terminate before we start to
2035 // clean up the isolators.
2036 await(container->launchInfos,
2037 container->status.isSome()
2038 ? container->status.get()
2039 : None())
2040 .onAny(defer(self(), &Self::____destroy, containerId));
2041
2042 return;
2043 }
2044
2045 if (previousState == ISOLATING) {
2046 VLOG(1) << "Waiting for the isolators to complete isolation "
2047 << "before destroying container " << containerId;
2048
2049 // Wait for the isolators to finish isolating before we start
2050 // to destroy the container.
2051 container->isolation
2052 .onAny(defer(self(), &Self::__destroy, containerId));
2053
2054 return;
2055 }
2056
2057 // Either RUNNING or FETCHING at this point.
2058 if (previousState == FETCHING) {
2059 fetcher->kill(containerId);
2060 }
2061
2062 __destroy(containerId);
2063 }
2064
2065
2066 void MesosContainerizerProcess::__destroy(
2067 const ContainerID& containerId)
2068 {
2069 CHECK(containers_.contains(containerId));
2070
2071 // Kill all processes then continue destruction.
2072 launcher->destroy(containerId)
2073 .onAny(defer(self(), &Self::___destroy, containerId, lambda::_1));
2074 }
2075
2076
2077 void MesosContainerizerProcess::___destroy(
2078 const ContainerID& containerId,
2079 const Future<Nothing>& future)
2080 {
2081 CHECK(containers_.contains(containerId));
2082
2083 const Owned<Container>& container = containers_.at(containerId);
2084
2085 // Something has gone wrong and the launcher wasn't able to kill all
2086 // the processes in the container. We cannot clean up the isolators
2087 // because they may require that all processes have exited, so just
2088 // return the failure to the slave.
2089 // TODO(idownes): This is a pretty bad state to be in but we should
2090 // consider cleaning up here.
2091 if (!future.isReady()) {
2092 container->termination.fail(
2093 "Failed to kill all processes in the container: " +
2094 (future.isFailed() ? future.failure() : "discarded future"));
2095
2096 ++metrics.container_destroy_errors;
2097 return;
2098 }
2099
2100 // We've successfully killed all processes in the container so get
2101 // the exit status of the executor when it's ready (it may already
2102 // be) and continue the destroy.
2103 CHECK_SOME(container->status);
2104
2105 container->status.get()
2106 .onAny(defer(self(), &Self::____destroy, containerId));
2107 }
2108
2109
2110 void MesosContainerizerProcess::____destroy(
2111 const ContainerID& containerId)
2112 {
2113 CHECK(containers_.contains(containerId));
2114
2115 cleanupIsolators(containerId)
2116 .onAny(defer(self(), &Self::_____destroy, containerId, lambda::_1));
2117 }
2118
2119
2120 void MesosContainerizerProcess::_____destroy(
2121 const ContainerID& containerId,
2122 const Future<list<Future<Nothing>>>& cleanups)
2123 {
2124 // This should not occur because we only use the Future<list> to
2125 // facilitate chaining.
2126 CHECK_READY(cleanups);
2127 CHECK(containers_.contains(containerId));
2128
2129 const Owned<Container>& container = containers_.at(containerId);
2130
2131 // Check cleanup succeeded for all isolators. If not, we'll fail the
2132 // container termination.
2133 vector<string> errors;
2134 foreach (const Future<Nothing>& cleanup, cleanups.get()) {
2135 if (!cleanup.isReady()) {
2136 errors.push_back(cleanup.isFailed()
2137 ? cleanup.failure()
2138 : "discarded");
2139 }
2140 }
2141
2142 if (!errors.empty()) {
2143 container->termination.fail(
2144 "Failed to clean up an isolator when destroying container: " +
2145 strings::join("; ", errors));
2146
2147 ++metrics.container_destroy_errors;
2148 return;
2149 }
2150
2151 provisioner->destroy(containerId)
2152 .onAny(defer(self(), &Self::______destroy, containerId, lambda::_1));
2153 }
2154
2155
2156 void MesosContainerizerProcess::______destroy(
2157 const ContainerID& containerId,
2158 const Future<bool>& destroy)
2159 {
2160 CHECK(containers_.contains(containerId));
2161
2162 const Owned<Container>& container = containers_.at(containerId);
2163
2164 if (!destroy.isReady()) {
2165 container->termination.fail(
2166 "Failed to destroy the provisioned rootfs when destroying container: " +
2167 (destroy.isFailed() ? destroy.failure() : "discarded future"));
2168
2169 ++metrics.container_destroy_errors;
2170 return;
2171 }
2172
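// Construct the termination: include the executor's exit status if we
// have one, and surface any resource limitations below.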
2173 ContainerTermination termination;
2174
2175 if (container->status.isSome() &&
2176 container->status->isReady() &&
2177 container->status->get().isSome()) {
2178 termination.set_status(container->status->get().get());
2179 }
2180
2181 // NOTE: We may not see a limitation in time for it to be
2182 // registered. This could occur if the limitation (e.g., an OOM)
2183 // killed the executor and we triggered destroy() off the executor
2184 // exit.
2185 if (!container->limitations.empty()) {
2186 termination.set_state(TaskState::TASK_FAILED);
2187
2188 // We concatenate the messages if there are multiple limitations.
2189 vector<string> messages;
2190
2191 foreach (const ContainerLimitation& limitation, container->limitations) {
2192 messages.push_back(limitation.message());
2193
2194 if (limitation.has_reason()) {
2195 termination.add_reasons(limitation.reason());
2196 }
2197 }
2198
2199 termination.set_message(strings::join("; ", messages));
2200 }
2201
2202 // Now that we are done destroying the container, we need to clean
2203 // up its runtime directory. There are two cases to consider:
2204 //
2205 // (1) We are a nested container:
2206 // In this case we should defer deletion of the runtime directory
2207 // until the top-level container is destroyed. Instead, we
2208 // checkpoint a file with the termination state indicating that
2209 // the container has already been destroyed. This allows
2210 // subsequent calls to `wait()` to succeed with the proper
2211 // termination state until the top-level container is destroyed.
2212 // It also prevents subsequent `destroy()` calls from attempting
2213 // to clean up the container a second time.
2214 //
2215 // (2) We are a top-level container:
2216 // We should simply remove the runtime directory. Since we build
2217 // the runtime directories of nested containers hierarchically,
2218 // removing the top-level runtime directory will automatically
2219 // clean up all nested container runtime directories as well.
2220 //
2221 // NOTE: The runtime directory will not exist for legacy containers,
2222 // so we need to make sure it actually exists before attempting to
2223 // remove it.
2224 const string runtimePath =
2225 containerizer::paths::getRuntimePath(flags.runtime_dir, containerId);
2226
2227 if (containerId.has_parent()) {
2228 const string terminationPath =
2229 path::join(runtimePath, containerizer::paths::TERMINATION_FILE);
2230
2231 LOG(INFO) << "Checkpointing termination state to nested container's"
2232 << " runtime directory '" << terminationPath << "'";
2233
2234 Try<Nothing> checkpointed =
2235 slave::state::checkpoint(terminationPath, termination);
2236
2237 if (checkpointed.isError()) {
2238 LOG(ERROR) << "Failed to checkpoint nested container's termination state"
2239 << " to '" << terminationPath << "': " << checkpointed.error();
2240 }
2241 } else if (os::exists(runtimePath)) {
2242 Try<Nothing> rmdir = os::rmdir(runtimePath);
2243 if (rmdir.isError()) {
2244 LOG(WARNING) << "Failed to remove the runtime directory"
2245 << " for container " << containerId
2246 << ": " << rmdir.error();
2247 }
2248 }
2249
2250 container->termination.set(termination);
2251
2252 if (containerId.has_parent()) {
2253 CHECK(containers_.contains(containerId.parent()));
2254 CHECK(containers_[containerId.parent()]->children.contains(containerId));
2255 containers_[containerId.parent()]->children.erase(containerId);
2256 }
2257
2258 containers_.erase(containerId);
2259 }
2260
2261
2262 Future<Option<int>> MesosContainerizerProcess::reap(
2263 const ContainerID& containerId,
2264 pid_t pid)
2265 {
2266 #ifdef __WINDOWS__
2267 // We currently don't checkpoint the wait status on Windows, so
2268 // just return the reaped status directly.
2269 return process::reap(pid);
2270 #else
2271 return process::reap(pid)
2272 .then(defer(self(), [=](const Option<int>& status) -> Future<Option<int>> {
2273 // Determine if we just reaped a legacy container or a
2274 // non-legacy container. We do this by checking for the
2275 // existence of the container runtime directory (which only
2276 // exists for new (i.e., non-legacy) containers). If it is a
2277 // legacy container, we simply forward the reaped exit status
2278 // back to the caller.
2279 const string runtimePath =
2280 containerizer::paths::getRuntimePath(flags.runtime_dir, containerId);
2281
2282 if (!os::exists(runtimePath)) {
2283 return status;
2284 }
2285
2286 // If we are a non-legacy container, attempt to reap the
2287 // container status from the checkpointed status file.
2288 Result<int> containerStatus =
2289 containerizer::paths::getContainerStatus(
2290 flags.runtime_dir,
2291 containerId);
2292
2293 if (containerStatus.isError()) {
2294 return Failure("Failed to get container status: " +
2295 containerStatus.error());
2296 } else if (containerStatus.isSome()) {
2297 return containerStatus.get();
2298 }
2299
2300 // If there isn't a container status file or it is empty, then the
2301 // init process must have been interrupted by a SIGKILL before
2302 // it had a chance to write the file. Return as such.
2303 return W_EXITCODE(0, SIGKILL);
2304 }));
2305 #endif // __WINDOWS__
2306 }
2307
2308
2309 void MesosContainerizerProcess::reaped(const ContainerID& containerId)
2310 {
2311 if (!containers_.contains(containerId)) {
2312 return;
2313 }
2314
2315 LOG(INFO) << "Container " << containerId << " has exited";
2316
2317 // The executor has exited so destroy the container.
2318 destroy(containerId);
2319 }
2320
2321
2322 void MesosContainerizerProcess::limited(
2323 const ContainerID& containerId,
2324 const Future<ContainerLimitation>& future)
2325 {
2326 if (!containers_.contains(containerId) ||
2327 containers_.at(containerId)->state == DESTROYING) {
2328 return;
2329 }
2330
2331 if (future.isReady()) {
2332 LOG(INFO) << "Container " << containerId << " has reached its limit for"
2333 << " resource " << future.get().resources()
2334 << " and will be terminated";
2335
2336 containers_.at(containerId)->limitations.push_back(future.get());
2337 } else {
2338 // TODO(idownes): A discarded future will not be an error when
2339 // isolators discard their promises after cleanup.
2340 LOG(ERROR) << "Error in a resource limitation for container "
2341 << containerId << ": " << (future.isFailed() ? future.failure()
2342 : "discarded");
2343 }
2344
2345 // The container has been affected by the limitation so destroy it.
2346 destroy(containerId);
2347 }
2348
2349
2350 Future<hashset<ContainerID>> MesosContainerizerProcess::containers()
2351 {
2352 return containers_.keys();
2353 }
2354
2355
2356 MesosContainerizerProcess::Metrics::Metrics()
2357 : container_destroy_errors(
2358 "containerizer/mesos/container_destroy_errors")
2359 {
2360 process::metrics::add(container_destroy_errors);
2361 }
2362
2363
2364 MesosContainerizerProcess::Metrics::~Metrics()
2365 {
2366 process::metrics::remove(container_destroy_errors);
2367 }
2368
2369
2370 Future<list<Future<Nothing>>> MesosContainerizerProcess::cleanupIsolators(
2371 const ContainerID& containerId)
2372 {
2373 Future<list<Future<Nothing>>> f = list<Future<Nothing>>();
2374
2375 // NOTE: We clean up each isolator in the reverse order they were
2376 // prepared (see comment in prepare()).
2377 foreach (const Owned<Isolator>& isolator, adaptor::reverse(isolators)) {
2378 // If this is a nested container, we need to skip isolators that
2379 // do not support nesting.
2380 if (containerId.has_parent() && !isolator->supportsNesting()) {
2381 continue;
2382 }
2383
2384 // We'll try to clean up all isolators, waiting for each to
2385 // complete and continuing if one fails.
2386 // TODO(jieyu): Technically, we cannot bind 'isolator' here
2387 // because the ownership will be transferred after the bind.
2388 f = f.then([=](list<Future<Nothing>> cleanups) {
2389 // Accumulate but do not propagate any failure.
2390 Future<Nothing> cleanup = isolator->cleanup(containerId);
2391 cleanups.push_back(cleanup);
2392
2393 // Wait for the cleanup to complete/fail before returning the
2394 // list. We use await here to asynchronously wait for the
2395 // isolator to complete and then return the cleanups.
2396 return await(list<Future<Nothing>>({cleanup}))
2397 .then([cleanups]() -> Future<list<Future<Nothing>>> {
2398 return cleanups;
2399 });
2400 });
2401 }
2402
2403 return f;
2404 }
2405
2406
2407 std::ostream& operator<<(
2408 std::ostream& stream,
2409 const MesosContainerizerProcess::State& state)
2410 {
2411 switch (state) {
2412 case MesosContainerizerProcess::PROVISIONING:
2413 return stream << "PROVISIONING";
2414 case MesosContainerizerProcess::PREPARING:
2415 return stream << "PREPARING";
2416 case MesosContainerizerProcess::ISOLATING:
2417 return stream << "ISOLATING";
2418 case MesosContainerizerProcess::FETCHING:
2419 return stream << "FETCHING";
2420 case MesosContainerizerProcess::RUNNING:
2421 return stream << "RUNNING";
2422 case MesosContainerizerProcess::DESTROYING:
2423 return stream << "DESTROYING";
2424 default:
2425 UNREACHABLE();
2426 }
2427 }
2428
2429 } // namespace slave {
2430 } // namespace internal {
2431 } // namespace mesos {