diff --git a/client/driver/plugins/executor_linux.go b/client/driver/plugins/executor_linux.go index 776d0af12..152397667 100644 --- a/client/driver/plugins/executor_linux.go +++ b/client/driver/plugins/executor_linux.go @@ -14,12 +14,13 @@ import ( "time" "github.com/hashicorp/go-multierror" - //"github.com/opencontainers/runc/libcontainer/cgroups" - //cgroupFs "github.com/opencontainers/runc/libcontainer/cgroups/fs" - //"github.com/opencontainers/runc/libcontainer/cgroups/systemd" - //cgroupConfig "github.com/opencontainers/runc/libcontainer/configs" + "github.com/opencontainers/runc/libcontainer/cgroups" + cgroupFs "github.com/opencontainers/runc/libcontainer/cgroups/fs" + "github.com/opencontainers/runc/libcontainer/cgroups/systemd" + cgroupConfig "github.com/opencontainers/runc/libcontainer/configs" "github.com/hashicorp/nomad/client/allocdir" + "github.com/hashicorp/nomad/nomad/structs" ) var ( @@ -41,7 +42,7 @@ type LinuxExecutor struct { cmd exec.Cmd ctx *ExecutorContext - //groups *cgroupConfig.Cgroup + groups *cgroupConfig.Cgroup taskDir string logger *log.Logger @@ -86,10 +87,23 @@ func (e *LinuxExecutor) LaunchCmd(command *ExecCommand, ctx *ExecutorContext) (* e.configureChroot() + if err := e.configureCgroups(e.ctx.Task.Resources); err != nil { + return nil, fmt.Errorf("error creating cgroups: %v", err) + } + if err := e.cmd.Start(); err != nil { return nil, fmt.Errorf("error starting command: %v", err) } + manager := e.getCgroupManager(e.groups) + if err := manager.Apply(e.cmd.Process.Pid); err != nil { + e.logger.Printf("[ERROR] unable to join cgroup: %v", err) + if err := e.Exit(); err != nil { + e.logger.Printf("[ERROR] unable to kill process: %v", err) + } + return nil, err + } + return &ProcessState{Pid: e.cmd.Process.Pid, ExitCode: -1, Time: time.Now()}, nil } @@ -244,15 +258,18 @@ func (e *LinuxExecutor) Wait() (*ProcessState, error) { } } e.cleanTaskDir() + e.destroyCgroup() return &ProcessState{Pid: 0, ExitCode: exitCode, Time: time.Now()}, nil } func (e *LinuxExecutor) Exit() error { + e.logger.Printf("[INFO] Exiting plugin for task %q", e.ctx.Task.Name) proc, err := os.FindProcess(e.cmd.Process.Pid) if err != nil { return fmt.Errorf("failied to find user process %v: %v", e.cmd.Process.Pid, err) } e.cleanTaskDir() + e.destroyCgroup() return proc.Kill() } @@ -266,3 +283,92 @@ func (e *LinuxExecutor) ShutDown() error { } return proc.Signal(os.Interrupt) } + +// configureCgroups converts a Nomad Resources specification into the equivalent +// cgroup configuration. It returns an error if the resources are invalid. +func (e *LinuxExecutor) configureCgroups(resources *structs.Resources) error { + e.groups = &cgroupConfig.Cgroup{} + e.groups.Resources = &cgroupConfig.Resources{} + e.groups.Name = structs.GenerateUUID() + + // TODO: verify this is needed for things like network access + e.groups.Resources.AllowAllDevices = true + + if resources.MemoryMB > 0 { + // Total amount of memory allowed to consume + e.groups.Resources.Memory = int64(resources.MemoryMB * 1024 * 1024) + // Disable swap to avoid issues on the machine + e.groups.Resources.MemorySwap = int64(-1) + } + + if resources.CPU < 2 { + return fmt.Errorf("resources.CPU must be equal to or greater than 2: %v", resources.CPU) + } + + // Set the relative CPU shares for this cgroup. + e.groups.Resources.CpuShares = int64(resources.CPU) + + if resources.IOPS != 0 { + // Validate it is in an acceptable range. + if resources.IOPS < 10 || resources.IOPS > 1000 { + return fmt.Errorf("resources.IOPS must be between 10 and 1000: %d", resources.IOPS) + } + + e.groups.Resources.BlkioWeight = uint16(resources.IOPS) + } + + return nil +} + +// destroyCgroup kills all processes in the cgroup and removes the cgroup +// configuration from the host. +func (e *LinuxExecutor) destroyCgroup() error { + if e.groups == nil { + return fmt.Errorf("Can't destroy: cgroup configuration empty") + } + + // Prevent a race between Wait/ForceStop + e.lock.Lock() + defer e.lock.Unlock() + + manager := e.getCgroupManager(e.groups) + pids, err := manager.GetPids() + if err != nil { + return fmt.Errorf("Failed to get pids in the cgroup %v: %v", e.groups.Name, err) + } + + errs := new(multierror.Error) + for _, pid := range pids { + process, err := os.FindProcess(pid) + if err != nil { + multierror.Append(errs, fmt.Errorf("Failed to find Pid %v: %v", pid, err)) + continue + } + + if err := process.Kill(); err != nil && err.Error() != "os: process already finished" { + multierror.Append(errs, fmt.Errorf("Failed to kill Pid %v: %v", pid, err)) + continue + } + } + + // Remove the cgroup. + if err := manager.Destroy(); err != nil { + multierror.Append(errs, fmt.Errorf("Failed to delete the cgroup directories: %v", err)) + } + + if len(errs.Errors) != 0 { + return fmt.Errorf("Failed to destroy cgroup: %v", errs) + } + + return nil +} + +// getCgroupManager returns the correct libcontainer cgroup manager. +func (e *LinuxExecutor) getCgroupManager(groups *cgroupConfig.Cgroup) cgroups.Manager { + var manager cgroups.Manager + manager = &cgroupFs.Manager{Cgroups: groups} + if systemd.UseSystemd() { + manager = &systemd.Manager{Cgroups: groups} + } + return manager +}