open-nomad/drivers/exec/driver.go
Seth Hoenig 2e5c6de820 client: enable support for cgroups v2
This PR introduces support for using Nomad on systems with cgroups v2 [1]
enabled as the cgroups controller mounted on /sys/fs/cgroups. Newer Linux
distros like Ubuntu 21.10 are shipping with cgroups v2 only, causing problems
for Nomad users.

Nomad mostly "just works" with cgroups v2 due to the indirection via libcontainer,
but not so for managing cpuset cgroups. Before, Nomad has been making use of
a feature in v1 where a PID could be a member of more than one cgroup. In v2
this is no longer possible, and so the logic around computing cpuset values
must be modified. When Nomad detects v2, it manages cpuset values in-process,
rather than making use of cgroup heirarchy inheritence via shared/reserved
parents.

Nomad will only activate the v2 logic when it detects cgroups2 is mounted at
/sys/fs/cgroups. This means on systems running in hybrid mode with cgroups2
mounted at /sys/fs/cgroups/unified (as is typical) Nomad will continue to
use the v1 logic, and should operate as before. Systems that do not support
cgroups v2 are also not affected.

When v2 is activated, Nomad will create a parent called nomad.slice (unless
otherwise configured in Client conifg), and create cgroups for tasks using
naming convention <allocID>-<task>.scope. These follow the naming convention
set by systemd and also used by Docker when cgroups v2 is detected.

Client nodes now export a new fingerprint attribute, unique.cgroups.version
which will be set to 'v1' or 'v2' to indicate the cgroups regime in use by
Nomad.

The new cpuset management strategy fixes #11705, where docker tasks that
spawned processes on startup would "leak". In cgroups v2, the PIDs are
started in the cgroup they will always live in, and thus the cause of
the leak is eliminated.

[1] https://www.kernel.org/doc/html/latest/admin-guide/cgroup-v2.html

Closes #11289
Fixes #11705 #11773 #11933
2022-03-23 11:35:27 -05:00

694 lines
20 KiB
Go

package exec
import (
"context"
"fmt"
"os"
"path/filepath"
"runtime"
"sync"
"time"
"github.com/hashicorp/consul-template/signals"
hclog "github.com/hashicorp/go-hclog"
"github.com/hashicorp/nomad/client/lib/cgutil"
"github.com/hashicorp/nomad/drivers/shared/capabilities"
"github.com/hashicorp/nomad/drivers/shared/eventer"
"github.com/hashicorp/nomad/drivers/shared/executor"
"github.com/hashicorp/nomad/drivers/shared/resolvconf"
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/helper/pluginutils/loader"
"github.com/hashicorp/nomad/plugins/base"
"github.com/hashicorp/nomad/plugins/drivers"
"github.com/hashicorp/nomad/plugins/drivers/utils"
"github.com/hashicorp/nomad/plugins/shared/hclspec"
pstructs "github.com/hashicorp/nomad/plugins/shared/structs"
)
const (
// pluginName is the name of the plugin
pluginName = "exec"
// fingerprintPeriod is the interval at which the driver will send fingerprint responses
fingerprintPeriod = 30 * time.Second
// taskHandleVersion is the version of task handle which this driver sets
// and understands how to decode driver state
taskHandleVersion = 1
)
var (
// PluginID is the exec plugin metadata registered in the plugin
// catalog.
PluginID = loader.PluginID{
Name: pluginName,
PluginType: base.PluginTypeDriver,
}
// PluginConfig is the exec driver factory function registered in the
// plugin catalog.
PluginConfig = &loader.InternalPluginConfig{
Config: map[string]interface{}{},
Factory: func(ctx context.Context, l hclog.Logger) interface{} { return NewExecDriver(ctx, l) },
}
// pluginInfo is the response returned for the PluginInfo RPC
pluginInfo = &base.PluginInfoResponse{
Type: base.PluginTypeDriver,
PluginApiVersions: []string{drivers.ApiVersion010},
PluginVersion: "0.1.0",
Name: pluginName,
}
// configSpec is the hcl specification returned by the ConfigSchema RPC
configSpec = hclspec.NewObject(map[string]*hclspec.Spec{
"no_pivot_root": hclspec.NewDefault(
hclspec.NewAttr("no_pivot_root", "bool", false),
hclspec.NewLiteral("false"),
),
"default_pid_mode": hclspec.NewDefault(
hclspec.NewAttr("default_pid_mode", "string", false),
hclspec.NewLiteral(`"private"`),
),
"default_ipc_mode": hclspec.NewDefault(
hclspec.NewAttr("default_ipc_mode", "string", false),
hclspec.NewLiteral(`"private"`),
),
"allow_caps": hclspec.NewDefault(
hclspec.NewAttr("allow_caps", "list(string)", false),
hclspec.NewLiteral(capabilities.HCLSpecLiteral),
),
})
// taskConfigSpec is the hcl specification for the driver config section of
// a task within a job. It is returned in the TaskConfigSchema RPC
taskConfigSpec = hclspec.NewObject(map[string]*hclspec.Spec{
"command": hclspec.NewAttr("command", "string", true),
"args": hclspec.NewAttr("args", "list(string)", false),
"pid_mode": hclspec.NewAttr("pid_mode", "string", false),
"ipc_mode": hclspec.NewAttr("ipc_mode", "string", false),
"cap_add": hclspec.NewAttr("cap_add", "list(string)", false),
"cap_drop": hclspec.NewAttr("cap_drop", "list(string)", false),
})
// driverCapabilities represents the RPC response for what features are
// implemented by the exec task driver
driverCapabilities = &drivers.Capabilities{
SendSignals: true,
Exec: true,
FSIsolation: drivers.FSIsolationChroot,
NetIsolationModes: []drivers.NetIsolationMode{
drivers.NetIsolationModeHost,
drivers.NetIsolationModeGroup,
},
MountConfigs: drivers.MountConfigSupportAll,
}
)
// Driver fork/execs tasks using many of the underlying OS's isolation
// features where configured.
type Driver struct {
// eventer is used to handle multiplexing of TaskEvents calls such that an
// event can be broadcast to all callers
eventer *eventer.Eventer
// config is the driver configuration set by the SetConfig RPC
config Config
// nomadConfig is the client config from nomad
nomadConfig *base.ClientDriverConfig
// tasks is the in memory datastore mapping taskIDs to driverHandles
tasks *taskStore
// ctx is the context for the driver. It is passed to other subsystems to
// coordinate shutdown
ctx context.Context
// logger will log to the Nomad agent
logger hclog.Logger
// A tri-state boolean to know if the fingerprinting has happened and
// whether it has been successful
fingerprintSuccess *bool
fingerprintLock sync.Mutex
}
// Config is the driver configuration set by the SetConfig RPC call
type Config struct {
// NoPivotRoot disables the use of pivot_root, useful when the root partition
// is on ramdisk
NoPivotRoot bool `codec:"no_pivot_root"`
// DefaultModePID is the default PID isolation set for all tasks using
// exec-based task drivers.
DefaultModePID string `codec:"default_pid_mode"`
// DefaultModeIPC is the default IPC isolation set for all tasks using
// exec-based task drivers.
DefaultModeIPC string `codec:"default_ipc_mode"`
// AllowCaps configures which Linux Capabilities are enabled for tasks
// running on this node.
AllowCaps []string `codec:"allow_caps"`
}
func (c *Config) validate() error {
switch c.DefaultModePID {
case executor.IsolationModePrivate, executor.IsolationModeHost:
default:
return fmt.Errorf("default_pid_mode must be %q or %q, got %q", executor.IsolationModePrivate, executor.IsolationModeHost, c.DefaultModePID)
}
switch c.DefaultModeIPC {
case executor.IsolationModePrivate, executor.IsolationModeHost:
default:
return fmt.Errorf("default_ipc_mode must be %q or %q, got %q", executor.IsolationModePrivate, executor.IsolationModeHost, c.DefaultModeIPC)
}
badCaps := capabilities.Supported().Difference(capabilities.New(c.AllowCaps))
if !badCaps.Empty() {
return fmt.Errorf("allow_caps configured with capabilities not supported by system: %s", badCaps)
}
return nil
}
// TaskConfig is the driver configuration of a task within a job
type TaskConfig struct {
// Command is the thing to exec.
Command string `codec:"command"`
// Args are passed along to Command.
Args []string `codec:"args"`
// ModePID indicates whether PID namespace isolation is enabled for the task.
// Must be "private" or "host" if set.
ModePID string `codec:"pid_mode"`
// ModeIPC indicates whether IPC namespace isolation is enabled for the task.
// Must be "private" or "host" if set.
ModeIPC string `codec:"ipc_mode"`
// CapAdd is a set of linux capabilities to enable.
CapAdd []string `codec:"cap_add"`
// CapDrop is a set of linux capabilities to disable.
CapDrop []string `codec:"cap_drop"`
}
func (tc *TaskConfig) validate() error {
switch tc.ModePID {
case "", executor.IsolationModePrivate, executor.IsolationModeHost:
default:
return fmt.Errorf("pid_mode must be %q or %q, got %q", executor.IsolationModePrivate, executor.IsolationModeHost, tc.ModePID)
}
switch tc.ModeIPC {
case "", executor.IsolationModePrivate, executor.IsolationModeHost:
default:
return fmt.Errorf("ipc_mode must be %q or %q, got %q", executor.IsolationModePrivate, executor.IsolationModeHost, tc.ModeIPC)
}
supported := capabilities.Supported()
badAdds := supported.Difference(capabilities.New(tc.CapAdd))
if !badAdds.Empty() {
return fmt.Errorf("cap_add configured with capabilities not supported by system: %s", badAdds)
}
badDrops := supported.Difference(capabilities.New(tc.CapDrop))
if !badDrops.Empty() {
return fmt.Errorf("cap_drop configured with capabilities not supported by system: %s", badDrops)
}
return nil
}
// TaskState is the state which is encoded in the handle returned in
// StartTask. This information is needed to rebuild the task state and handler
// during recovery.
type TaskState struct {
ReattachConfig *pstructs.ReattachConfig
TaskConfig *drivers.TaskConfig
Pid int
StartedAt time.Time
}
// NewExecDriver returns a new DrivePlugin implementation
func NewExecDriver(ctx context.Context, logger hclog.Logger) drivers.DriverPlugin {
logger = logger.Named(pluginName)
return &Driver{
eventer: eventer.NewEventer(ctx, logger),
tasks: newTaskStore(),
ctx: ctx,
logger: logger,
}
}
// setFingerprintSuccess marks the driver as having fingerprinted successfully
func (d *Driver) setFingerprintSuccess() {
d.fingerprintLock.Lock()
d.fingerprintSuccess = helper.BoolToPtr(true)
d.fingerprintLock.Unlock()
}
// setFingerprintFailure marks the driver as having failed fingerprinting
func (d *Driver) setFingerprintFailure() {
d.fingerprintLock.Lock()
d.fingerprintSuccess = helper.BoolToPtr(false)
d.fingerprintLock.Unlock()
}
// fingerprintSuccessful returns true if the driver has
// never fingerprinted or has successfully fingerprinted
func (d *Driver) fingerprintSuccessful() bool {
d.fingerprintLock.Lock()
defer d.fingerprintLock.Unlock()
return d.fingerprintSuccess == nil || *d.fingerprintSuccess
}
func (d *Driver) PluginInfo() (*base.PluginInfoResponse, error) {
return pluginInfo, nil
}
func (d *Driver) ConfigSchema() (*hclspec.Spec, error) {
return configSpec, nil
}
func (d *Driver) SetConfig(cfg *base.Config) error {
// unpack, validate, and set agent plugin config
var config Config
if len(cfg.PluginConfig) != 0 {
if err := base.MsgPackDecode(cfg.PluginConfig, &config); err != nil {
return err
}
}
if err := config.validate(); err != nil {
return err
}
d.config = config
if cfg != nil && cfg.AgentConfig != nil {
d.nomadConfig = cfg.AgentConfig.Driver
}
return nil
}
func (d *Driver) TaskConfigSchema() (*hclspec.Spec, error) {
return taskConfigSpec, nil
}
// Capabilities is returned by the Capabilities RPC and indicates what
// optional features this driver supports
func (d *Driver) Capabilities() (*drivers.Capabilities, error) {
return driverCapabilities, nil
}
func (d *Driver) Fingerprint(ctx context.Context) (<-chan *drivers.Fingerprint, error) {
ch := make(chan *drivers.Fingerprint)
go d.handleFingerprint(ctx, ch)
return ch, nil
}
func (d *Driver) handleFingerprint(ctx context.Context, ch chan<- *drivers.Fingerprint) {
defer close(ch)
ticker := time.NewTimer(0)
for {
select {
case <-ctx.Done():
return
case <-d.ctx.Done():
return
case <-ticker.C:
ticker.Reset(fingerprintPeriod)
ch <- d.buildFingerprint()
}
}
}
func (d *Driver) buildFingerprint() *drivers.Fingerprint {
if runtime.GOOS != "linux" {
d.setFingerprintFailure()
return &drivers.Fingerprint{
Health: drivers.HealthStateUndetected,
HealthDescription: "exec driver unsupported on client OS",
}
}
fp := &drivers.Fingerprint{
Attributes: map[string]*pstructs.Attribute{},
Health: drivers.HealthStateHealthy,
HealthDescription: drivers.DriverHealthy,
}
if !utils.IsUnixRoot() {
fp.Health = drivers.HealthStateUndetected
fp.HealthDescription = drivers.DriverRequiresRootMessage
d.setFingerprintFailure()
return fp
}
mount, err := cgutil.FindCgroupMountpointDir()
if err != nil {
fp.Health = drivers.HealthStateUnhealthy
fp.HealthDescription = drivers.NoCgroupMountMessage
if d.fingerprintSuccessful() {
d.logger.Warn(fp.HealthDescription, "error", err)
}
d.setFingerprintFailure()
return fp
}
if mount == "" {
fp.Health = drivers.HealthStateUnhealthy
fp.HealthDescription = drivers.CgroupMountEmpty
d.setFingerprintFailure()
return fp
}
fp.Attributes["driver.exec"] = pstructs.NewBoolAttribute(true)
d.setFingerprintSuccess()
return fp
}
func (d *Driver) RecoverTask(handle *drivers.TaskHandle) error {
if handle == nil {
return fmt.Errorf("handle cannot be nil")
}
// COMPAT(0.10): pre 0.9 upgrade path check
if handle.Version == 0 {
return d.recoverPre09Task(handle)
}
// If already attached to handle there's nothing to recover.
if _, ok := d.tasks.Get(handle.Config.ID); ok {
d.logger.Trace("nothing to recover; task already exists",
"task_id", handle.Config.ID,
"task_name", handle.Config.Name,
)
return nil
}
// Handle doesn't already exist, try to reattach
var taskState TaskState
if err := handle.GetDriverState(&taskState); err != nil {
d.logger.Error("failed to decode task state from handle", "error", err, "task_id", handle.Config.ID)
return fmt.Errorf("failed to decode task state from handle: %v", err)
}
// Create client for reattached executor
plugRC, err := pstructs.ReattachConfigToGoPlugin(taskState.ReattachConfig)
if err != nil {
d.logger.Error("failed to build ReattachConfig from task state", "error", err, "task_id", handle.Config.ID)
return fmt.Errorf("failed to build ReattachConfig from task state: %v", err)
}
exec, pluginClient, err := executor.ReattachToExecutor(plugRC,
d.logger.With("task_name", handle.Config.Name, "alloc_id", handle.Config.AllocID))
if err != nil {
d.logger.Error("failed to reattach to executor", "error", err, "task_id", handle.Config.ID)
return fmt.Errorf("failed to reattach to executor: %v", err)
}
h := &taskHandle{
exec: exec,
pid: taskState.Pid,
pluginClient: pluginClient,
taskConfig: taskState.TaskConfig,
procState: drivers.TaskStateRunning,
startedAt: taskState.StartedAt,
exitResult: &drivers.ExitResult{},
logger: d.logger,
}
d.tasks.Set(taskState.TaskConfig.ID, h)
go h.run()
return nil
}
func (d *Driver) StartTask(cfg *drivers.TaskConfig) (*drivers.TaskHandle, *drivers.DriverNetwork, error) {
if _, ok := d.tasks.Get(cfg.ID); ok {
return nil, nil, fmt.Errorf("task with ID %q already started", cfg.ID)
}
var driverConfig TaskConfig
if err := cfg.DecodeDriverConfig(&driverConfig); err != nil {
return nil, nil, fmt.Errorf("failed to decode driver config: %v", err)
}
if err := driverConfig.validate(); err != nil {
return nil, nil, fmt.Errorf("failed driver config validation: %v", err)
}
d.logger.Info("starting task", "driver_cfg", hclog.Fmt("%+v", driverConfig))
handle := drivers.NewTaskHandle(taskHandleVersion)
handle.Config = cfg
pluginLogFile := filepath.Join(cfg.TaskDir().Dir, "executor.out")
executorConfig := &executor.ExecutorConfig{
LogFile: pluginLogFile,
LogLevel: "debug",
FSIsolation: true,
}
exec, pluginClient, err := executor.CreateExecutor(
d.logger.With("task_name", handle.Config.Name, "alloc_id", handle.Config.AllocID),
d.nomadConfig, executorConfig)
if err != nil {
return nil, nil, fmt.Errorf("failed to create executor: %v", err)
}
user := cfg.User
if user == "" {
user = "nobody"
}
if cfg.DNS != nil {
dnsMount, err := resolvconf.GenerateDNSMount(cfg.TaskDir().Dir, cfg.DNS)
if err != nil {
return nil, nil, fmt.Errorf("failed to build mount for resolv.conf: %v", err)
}
cfg.Mounts = append(cfg.Mounts, dnsMount)
}
caps, err := capabilities.Calculate(
capabilities.NomadDefaults(), d.config.AllowCaps, driverConfig.CapAdd, driverConfig.CapDrop,
)
if err != nil {
return nil, nil, err
}
d.logger.Debug("task capabilities", "capabilities", caps)
execCmd := &executor.ExecCommand{
Cmd: driverConfig.Command,
Args: driverConfig.Args,
Env: cfg.EnvList(),
User: user,
ResourceLimits: true,
NoPivotRoot: d.config.NoPivotRoot,
Resources: cfg.Resources,
TaskDir: cfg.TaskDir().Dir,
StdoutPath: cfg.StdoutPath,
StderrPath: cfg.StderrPath,
Mounts: cfg.Mounts,
Devices: cfg.Devices,
NetworkIsolation: cfg.NetworkIsolation,
ModePID: executor.IsolationMode(d.config.DefaultModePID, driverConfig.ModePID),
ModeIPC: executor.IsolationMode(d.config.DefaultModeIPC, driverConfig.ModeIPC),
Capabilities: caps,
}
ps, err := exec.Launch(execCmd)
if err != nil {
pluginClient.Kill()
return nil, nil, fmt.Errorf("failed to launch command with executor: %v", err)
}
h := &taskHandle{
exec: exec,
pid: ps.Pid,
pluginClient: pluginClient,
taskConfig: cfg,
procState: drivers.TaskStateRunning,
startedAt: time.Now().Round(time.Millisecond),
logger: d.logger,
}
driverState := TaskState{
ReattachConfig: pstructs.ReattachConfigFromGoPlugin(pluginClient.ReattachConfig()),
Pid: ps.Pid,
TaskConfig: cfg,
StartedAt: h.startedAt,
}
if err := handle.SetDriverState(&driverState); err != nil {
d.logger.Error("failed to start task, error setting driver state", "error", err)
_ = exec.Shutdown("", 0)
pluginClient.Kill()
return nil, nil, fmt.Errorf("failed to set driver state: %v", err)
}
d.tasks.Set(cfg.ID, h)
go h.run()
return handle, nil, nil
}
func (d *Driver) WaitTask(ctx context.Context, taskID string) (<-chan *drivers.ExitResult, error) {
handle, ok := d.tasks.Get(taskID)
if !ok {
return nil, drivers.ErrTaskNotFound
}
ch := make(chan *drivers.ExitResult)
go d.handleWait(ctx, handle, ch)
return ch, nil
}
func (d *Driver) handleWait(ctx context.Context, handle *taskHandle, ch chan *drivers.ExitResult) {
defer close(ch)
var result *drivers.ExitResult
ps, err := handle.exec.Wait(ctx)
if err != nil {
result = &drivers.ExitResult{
Err: fmt.Errorf("executor: error waiting on process: %v", err),
}
} else {
result = &drivers.ExitResult{
ExitCode: ps.ExitCode,
Signal: ps.Signal,
}
}
select {
case <-ctx.Done():
return
case <-d.ctx.Done():
return
case ch <- result:
}
}
func (d *Driver) StopTask(taskID string, timeout time.Duration, signal string) error {
handle, ok := d.tasks.Get(taskID)
if !ok {
return drivers.ErrTaskNotFound
}
if err := handle.exec.Shutdown(signal, timeout); err != nil {
if handle.pluginClient.Exited() {
return nil
}
return fmt.Errorf("executor Shutdown failed: %v", err)
}
return nil
}
func (d *Driver) DestroyTask(taskID string, force bool) error {
handle, ok := d.tasks.Get(taskID)
if !ok {
return drivers.ErrTaskNotFound
}
if handle.IsRunning() && !force {
return fmt.Errorf("cannot destroy running task")
}
if !handle.pluginClient.Exited() {
if err := handle.exec.Shutdown("", 0); err != nil {
handle.logger.Error("destroying executor failed", "err", err)
}
handle.pluginClient.Kill()
}
d.tasks.Delete(taskID)
return nil
}
func (d *Driver) InspectTask(taskID string) (*drivers.TaskStatus, error) {
handle, ok := d.tasks.Get(taskID)
if !ok {
return nil, drivers.ErrTaskNotFound
}
return handle.TaskStatus(), nil
}
func (d *Driver) TaskStats(ctx context.Context, taskID string, interval time.Duration) (<-chan *drivers.TaskResourceUsage, error) {
handle, ok := d.tasks.Get(taskID)
if !ok {
return nil, drivers.ErrTaskNotFound
}
return handle.exec.Stats(ctx, interval)
}
func (d *Driver) TaskEvents(ctx context.Context) (<-chan *drivers.TaskEvent, error) {
return d.eventer.TaskEvents(ctx)
}
func (d *Driver) SignalTask(taskID string, signal string) error {
handle, ok := d.tasks.Get(taskID)
if !ok {
return drivers.ErrTaskNotFound
}
sig := os.Interrupt
if s, ok := signals.SignalLookup[signal]; ok {
sig = s
} else {
d.logger.Warn("unknown signal to send to task, using SIGINT instead", "signal", signal, "task_id", handle.taskConfig.ID)
}
return handle.exec.Signal(sig)
}
func (d *Driver) ExecTask(taskID string, cmd []string, timeout time.Duration) (*drivers.ExecTaskResult, error) {
if len(cmd) == 0 {
return nil, fmt.Errorf("error cmd must have at least one value")
}
handle, ok := d.tasks.Get(taskID)
if !ok {
return nil, drivers.ErrTaskNotFound
}
args := []string{}
if len(cmd) > 1 {
args = cmd[1:]
}
out, exitCode, err := handle.exec.Exec(time.Now().Add(timeout), cmd[0], args)
if err != nil {
return nil, err
}
return &drivers.ExecTaskResult{
Stdout: out,
ExitResult: &drivers.ExitResult{
ExitCode: exitCode,
},
}, nil
}
var _ drivers.ExecTaskStreamingRawDriver = (*Driver)(nil)
func (d *Driver) ExecTaskStreamingRaw(ctx context.Context,
taskID string,
command []string,
tty bool,
stream drivers.ExecTaskStream) error {
if len(command) == 0 {
return fmt.Errorf("error cmd must have at least one value")
}
handle, ok := d.tasks.Get(taskID)
if !ok {
return drivers.ErrTaskNotFound
}
return handle.exec.ExecStreaming(ctx, command, tty, stream)
}