open-nomad/drivers/exec/driver.go

Ignoring revisions in .git-blame-ignore-revs. Click here to bypass and see the normal blame view.

714 lines
21 KiB
Go
Raw Normal View History

// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: MPL-2.0
2018-10-16 02:37:58 +00:00
package exec
import (
"context"
2018-10-16 02:37:58 +00:00
"fmt"
"os"
"path/filepath"
"runtime"
2019-01-17 00:53:48 +00:00
"sync"
2018-10-16 02:37:58 +00:00
"time"
"github.com/hashicorp/consul-template/signals"
hclog "github.com/hashicorp/go-hclog"
client: enable support for cgroups v2 This PR introduces support for using Nomad on systems with cgroups v2 [1] enabled as the cgroups controller mounted on /sys/fs/cgroups. Newer Linux distros like Ubuntu 21.10 are shipping with cgroups v2 only, causing problems for Nomad users. Nomad mostly "just works" with cgroups v2 due to the indirection via libcontainer, but not so for managing cpuset cgroups. Before, Nomad has been making use of a feature in v1 where a PID could be a member of more than one cgroup. In v2 this is no longer possible, and so the logic around computing cpuset values must be modified. When Nomad detects v2, it manages cpuset values in-process, rather than making use of cgroup heirarchy inheritence via shared/reserved parents. Nomad will only activate the v2 logic when it detects cgroups2 is mounted at /sys/fs/cgroups. This means on systems running in hybrid mode with cgroups2 mounted at /sys/fs/cgroups/unified (as is typical) Nomad will continue to use the v1 logic, and should operate as before. Systems that do not support cgroups v2 are also not affected. When v2 is activated, Nomad will create a parent called nomad.slice (unless otherwise configured in Client conifg), and create cgroups for tasks using naming convention <allocID>-<task>.scope. These follow the naming convention set by systemd and also used by Docker when cgroups v2 is detected. Client nodes now export a new fingerprint attribute, unique.cgroups.version which will be set to 'v1' or 'v2' to indicate the cgroups regime in use by Nomad. The new cpuset management strategy fixes #11705, where docker tasks that spawned processes on startup would "leak". In cgroups v2, the PIDs are started in the cgroup they will always live in, and thus the cause of the leak is eliminated. [1] https://www.kernel.org/doc/html/latest/admin-guide/cgroup-v2.html Closes #11289 Fixes #11705 #11773 #11933
2022-02-28 22:24:01 +00:00
"github.com/hashicorp/nomad/client/lib/cgutil"
"github.com/hashicorp/nomad/drivers/shared/capabilities"
2018-10-16 02:37:58 +00:00
"github.com/hashicorp/nomad/drivers/shared/eventer"
2018-12-07 01:54:14 +00:00
"github.com/hashicorp/nomad/drivers/shared/executor"
"github.com/hashicorp/nomad/drivers/shared/resolvconf"
2019-01-23 14:27:14 +00:00
"github.com/hashicorp/nomad/helper/pluginutils/loader"
"github.com/hashicorp/nomad/helper/pointer"
2018-10-16 02:37:58 +00:00
"github.com/hashicorp/nomad/plugins/base"
"github.com/hashicorp/nomad/plugins/drivers"
"github.com/hashicorp/nomad/plugins/drivers/utils"
"github.com/hashicorp/nomad/plugins/shared/hclspec"
pstructs "github.com/hashicorp/nomad/plugins/shared/structs"
2018-10-16 02:37:58 +00:00
)
const (
	// pluginName is the name under which this driver plugin is registered.
	pluginName = "exec"

	// fingerprintPeriod is how long the driver waits between two consecutive
	// fingerprint responses.
	fingerprintPeriod = 30 * time.Second

	// taskHandleVersion is the version stamped onto task handles created by
	// this driver; it identifies the driver-state encoding the driver knows
	// how to decode.
	taskHandleVersion = 1
)
var (
2018-11-02 12:39:18 +00:00
// PluginID is the exec plugin metadata registered in the plugin
// catalog.
PluginID = loader.PluginID{
Name: pluginName,
PluginType: base.PluginTypeDriver,
}
2018-11-02 12:39:18 +00:00
// PluginConfig is the exec driver factory function registered in the
// plugin catalog.
PluginConfig = &loader.InternalPluginConfig{
Config: map[string]interface{}{},
Factory: func(ctx context.Context, l hclog.Logger) interface{} { return NewExecDriver(ctx, l) },
2018-11-02 12:39:18 +00:00
}
2018-10-16 02:37:58 +00:00
// pluginInfo is the response returned for the PluginInfo RPC
pluginInfo = &base.PluginInfoResponse{
Type: base.PluginTypeDriver,
PluginApiVersions: []string{drivers.ApiVersion010},
PluginVersion: "0.1.0",
Name: pluginName,
2018-10-16 02:37:58 +00:00
}
// configSpec is the hcl specification returned by the ConfigSchema RPC
configSpec = hclspec.NewObject(map[string]*hclspec.Spec{
"no_pivot_root": hclspec.NewDefault(
hclspec.NewAttr("no_pivot_root", "bool", false),
hclspec.NewLiteral("false"),
),
"default_pid_mode": hclspec.NewDefault(
hclspec.NewAttr("default_pid_mode", "string", false),
hclspec.NewLiteral(`"private"`),
),
"default_ipc_mode": hclspec.NewDefault(
hclspec.NewAttr("default_ipc_mode", "string", false),
hclspec.NewLiteral(`"private"`),
),
"allow_caps": hclspec.NewDefault(
hclspec.NewAttr("allow_caps", "list(string)", false),
hclspec.NewLiteral(capabilities.HCLSpecLiteral),
),
})
2018-10-16 02:37:58 +00:00
// taskConfigSpec is the hcl specification for the driver config section of
// a task within a job. It is returned in the TaskConfigSchema RPC
taskConfigSpec = hclspec.NewObject(map[string]*hclspec.Spec{
"command": hclspec.NewAttr("command", "string", true),
"args": hclspec.NewAttr("args", "list(string)", false),
"pid_mode": hclspec.NewAttr("pid_mode", "string", false),
"ipc_mode": hclspec.NewAttr("ipc_mode", "string", false),
"cap_add": hclspec.NewAttr("cap_add", "list(string)", false),
"cap_drop": hclspec.NewAttr("cap_drop", "list(string)", false),
2018-10-16 02:37:58 +00:00
})
// driverCapabilities represents the RPC response for what features are
// implemented by the exec task driver
driverCapabilities = &drivers.Capabilities{
2018-10-16 02:37:58 +00:00
SendSignals: true,
Exec: true,
2019-01-04 21:11:25 +00:00
FSIsolation: drivers.FSIsolationChroot,
NetIsolationModes: []drivers.NetIsolationMode{
drivers.NetIsolationModeHost,
drivers.NetIsolationModeGroup,
},
MountConfigs: drivers.MountConfigSupportAll,
2018-10-16 02:37:58 +00:00
}
)
// Driver fork/execs tasks using many of the underlying OS's isolation
2018-10-16 02:37:58 +00:00
// features where configured.
type Driver struct {
2018-10-16 02:37:58 +00:00
// eventer is used to handle multiplexing of TaskEvents calls such that an
// event can be broadcast to all callers
eventer *eventer.Eventer
// config is the driver configuration set by the SetConfig RPC
config Config
2018-10-17 03:00:26 +00:00
// nomadConfig is the client config from nomad
nomadConfig *base.ClientDriverConfig
2018-10-17 03:00:26 +00:00
// tasks is the in memory datastore mapping taskIDs to driverHandles
2018-10-16 02:37:58 +00:00
tasks *taskStore
// ctx is the context for the driver. It is passed to other subsystems to
// coordinate shutdown
ctx context.Context
// logger will log to the Nomad agent
2018-10-16 02:37:58 +00:00
logger hclog.Logger
// A tri-state boolean to know if the fingerprinting has happened and
// whether it has been successful
fingerprintSuccess *bool
fingerprintLock sync.Mutex
2018-10-16 02:37:58 +00:00
}
// Config is the plugin-level driver configuration delivered through the
// SetConfig RPC call.
type Config struct {
	// NoPivotRoot disables the use of pivot_root, which is useful when the
	// root partition lives on a ramdisk.
	NoPivotRoot bool `codec:"no_pivot_root"`

	// DefaultModePID is the PID isolation mode applied to every task of
	// exec-based task drivers unless the task sets its own.
	DefaultModePID string `codec:"default_pid_mode"`

	// DefaultModeIPC is the IPC isolation mode applied to every task of
	// exec-based task drivers unless the task sets its own.
	DefaultModeIPC string `codec:"default_ipc_mode"`

	// AllowCaps lists the Linux capabilities that tasks running on this
	// node are allowed to use.
	AllowCaps []string `codec:"allow_caps"`
}
func (c *Config) validate() error {
switch c.DefaultModePID {
2021-02-08 16:58:44 +00:00
case executor.IsolationModePrivate, executor.IsolationModeHost:
default:
2021-02-08 16:58:44 +00:00
return fmt.Errorf("default_pid_mode must be %q or %q, got %q", executor.IsolationModePrivate, executor.IsolationModeHost, c.DefaultModePID)
}
switch c.DefaultModeIPC {
2021-02-08 16:58:44 +00:00
case executor.IsolationModePrivate, executor.IsolationModeHost:
default:
2021-02-08 16:58:44 +00:00
return fmt.Errorf("default_ipc_mode must be %q or %q, got %q", executor.IsolationModePrivate, executor.IsolationModeHost, c.DefaultModeIPC)
}
badCaps := capabilities.Supported().Difference(capabilities.New(c.AllowCaps))
if !badCaps.Empty() {
return fmt.Errorf("allow_caps configured with capabilities not supported by system: %s", badCaps)
}
return nil
}
2018-10-16 02:37:58 +00:00
// TaskConfig is the driver configuration of a single task within a job.
type TaskConfig struct {
	// Command is the program to exec.
	Command string `codec:"command"`

	// Args are the arguments handed to Command.
	Args []string `codec:"args"`

	// ModePID selects PID namespace isolation for the task. When set it must
	// be "private" or "host".
	ModePID string `codec:"pid_mode"`

	// ModeIPC selects IPC namespace isolation for the task. When set it must
	// be "private" or "host".
	ModeIPC string `codec:"ipc_mode"`

	// CapAdd is the set of Linux capabilities to enable for the task.
	CapAdd []string `codec:"cap_add"`

	// CapDrop is the set of Linux capabilities to disable for the task.
	CapDrop []string `codec:"cap_drop"`
}
func (tc *TaskConfig) validate() error {
switch tc.ModePID {
case "", executor.IsolationModePrivate, executor.IsolationModeHost:
default:
return fmt.Errorf("pid_mode must be %q or %q, got %q", executor.IsolationModePrivate, executor.IsolationModeHost, tc.ModePID)
}
switch tc.ModeIPC {
case "", executor.IsolationModePrivate, executor.IsolationModeHost:
default:
return fmt.Errorf("ipc_mode must be %q or %q, got %q", executor.IsolationModePrivate, executor.IsolationModeHost, tc.ModeIPC)
}
supported := capabilities.Supported()
badAdds := supported.Difference(capabilities.New(tc.CapAdd))
if !badAdds.Empty() {
return fmt.Errorf("cap_add configured with capabilities not supported by system: %s", badAdds)
}
badDrops := supported.Difference(capabilities.New(tc.CapDrop))
if !badDrops.Empty() {
return fmt.Errorf("cap_drop configured with capabilities not supported by system: %s", badDrops)
}
return nil
2018-10-16 02:37:58 +00:00
}
// TaskState is the state which is encoded in the handle returned in
// StartTask. This information is needed to rebuild the task state and handler
// during recovery.
type TaskState struct {
2019-01-15 01:02:44 +00:00
ReattachConfig *pstructs.ReattachConfig
2018-10-16 02:37:58 +00:00
TaskConfig *drivers.TaskConfig
Pid int
StartedAt time.Time
}
// NewExecDriver returns a new DrivePlugin implementation
func NewExecDriver(ctx context.Context, logger hclog.Logger) drivers.DriverPlugin {
2018-10-16 02:37:58 +00:00
logger = logger.Named(pluginName)
return &Driver{
eventer: eventer.NewEventer(ctx, logger),
tasks: newTaskStore(),
ctx: ctx,
logger: logger,
2018-10-16 02:37:58 +00:00
}
}
// setFingerprintSuccess records, under fingerprintLock, that the most recent
// fingerprint succeeded.
func (d *Driver) setFingerprintSuccess() {
	d.fingerprintLock.Lock()
	defer d.fingerprintLock.Unlock()

	d.fingerprintSuccess = pointer.Of(true)
}
// setFingerprintFailure records, under fingerprintLock, that the most recent
// fingerprint failed.
func (d *Driver) setFingerprintFailure() {
	d.fingerprintLock.Lock()
	defer d.fingerprintLock.Unlock()

	d.fingerprintSuccess = pointer.Of(false)
}
2019-01-16 17:04:11 +00:00
// fingerprintSuccessful reports whether the driver has either never recorded
// a fingerprint result or last recorded a successful one.
func (d *Driver) fingerprintSuccessful() bool {
	d.fingerprintLock.Lock()
	defer d.fingerprintLock.Unlock()

	if d.fingerprintSuccess == nil {
		// No result recorded yet counts as successful.
		return true
	}
	return *d.fingerprintSuccess
}
func (d *Driver) PluginInfo() (*base.PluginInfoResponse, error) {
2018-10-16 02:37:58 +00:00
return pluginInfo, nil
}
func (d *Driver) ConfigSchema() (*hclspec.Spec, error) {
2018-10-16 02:37:58 +00:00
return configSpec, nil
}
func (d *Driver) SetConfig(cfg *base.Config) error {
// unpack, validate, and set agent plugin config
var config Config
if len(cfg.PluginConfig) != 0 {
if err := base.MsgPackDecode(cfg.PluginConfig, &config); err != nil {
return err
}
}
if err := config.validate(); err != nil {
return err
}
d.config = config
if cfg != nil && cfg.AgentConfig != nil {
d.nomadConfig = cfg.AgentConfig.Driver
}
2018-10-16 02:37:58 +00:00
return nil
}
func (d *Driver) TaskConfigSchema() (*hclspec.Spec, error) {
2018-10-16 02:37:58 +00:00
return taskConfigSpec, nil
}
// Capabilities is returned by the Capabilities RPC and indicates what
// optional features this driver supports
func (d *Driver) Capabilities() (*drivers.Capabilities, error) {
return driverCapabilities, nil
2018-10-16 02:37:58 +00:00
}
func (d *Driver) Fingerprint(ctx context.Context) (<-chan *drivers.Fingerprint, error) {
2018-10-16 02:37:58 +00:00
ch := make(chan *drivers.Fingerprint)
go d.handleFingerprint(ctx, ch)
return ch, nil
}
func (d *Driver) handleFingerprint(ctx context.Context, ch chan<- *drivers.Fingerprint) {
2018-10-16 02:37:58 +00:00
defer close(ch)
ticker := time.NewTimer(0)
for {
select {
case <-ctx.Done():
return
case <-d.ctx.Done():
return
case <-ticker.C:
ticker.Reset(fingerprintPeriod)
ch <- d.buildFingerprint()
}
}
}
func (d *Driver) buildFingerprint() *drivers.Fingerprint {
if runtime.GOOS != "linux" {
d.setFingerprintFailure()
return &drivers.Fingerprint{
Health: drivers.HealthStateUndetected,
HealthDescription: "exec driver unsupported on client OS",
}
}
fp := &drivers.Fingerprint{
Attributes: map[string]*pstructs.Attribute{},
Health: drivers.HealthStateHealthy,
HealthDescription: drivers.DriverHealthy,
}
2018-10-16 02:37:58 +00:00
if !utils.IsUnixRoot() {
fp.Health = drivers.HealthStateUndetected
fp.HealthDescription = drivers.DriverRequiresRootMessage
d.setFingerprintFailure()
return fp
}
mount, err := cgutil.FindCgroupMountpointDir()
2018-10-16 02:37:58 +00:00
if err != nil {
fp.Health = drivers.HealthStateUnhealthy
fp.HealthDescription = drivers.NoCgroupMountMessage
2019-01-16 17:04:11 +00:00
if d.fingerprintSuccessful() {
d.logger.Warn(fp.HealthDescription, "error", err)
}
d.setFingerprintFailure()
return fp
}
if mount == "" {
fp.Health = drivers.HealthStateUnhealthy
fp.HealthDescription = drivers.CgroupMountEmpty
d.setFingerprintFailure()
return fp
}
fp.Attributes["driver.exec"] = pstructs.NewBoolAttribute(true)
d.setFingerprintSuccess()
return fp
}
func (d *Driver) RecoverTask(handle *drivers.TaskHandle) error {
if handle == nil {
return fmt.Errorf("handle cannot be nil")
}
// If already attached to handle there's nothing to recover.
if _, ok := d.tasks.Get(handle.Config.ID); ok {
d.logger.Trace("nothing to recover; task already exists",
"task_id", handle.Config.ID,
"task_name", handle.Config.Name,
)
return nil
}
// Handle doesn't already exist, try to reattach
var taskState TaskState
if err := handle.GetDriverState(&taskState); err != nil {
d.logger.Error("failed to decode task state from handle", "error", err, "task_id", handle.Config.ID)
return fmt.Errorf("failed to decode task state from handle: %v", err)
2018-10-16 02:37:58 +00:00
}
// Create client for reattached executor
2019-01-15 01:02:44 +00:00
plugRC, err := pstructs.ReattachConfigToGoPlugin(taskState.ReattachConfig)
2018-10-16 02:37:58 +00:00
if err != nil {
d.logger.Error("failed to build ReattachConfig from task state", "error", err, "task_id", handle.Config.ID)
return fmt.Errorf("failed to build ReattachConfig from task state: %v", err)
2018-10-16 02:37:58 +00:00
}
2019-01-14 17:25:59 +00:00
exec, pluginClient, err := executor.ReattachToExecutor(plugRC,
d.logger.With("task_name", handle.Config.Name, "alloc_id", handle.Config.AllocID))
2018-10-16 02:37:58 +00:00
if err != nil {
d.logger.Error("failed to reattach to executor", "error", err, "task_id", handle.Config.ID)
return fmt.Errorf("failed to reattach to executor: %v", err)
2018-10-16 02:37:58 +00:00
}
h := &taskHandle{
2018-10-16 02:37:58 +00:00
exec: exec,
pid: taskState.Pid,
pluginClient: pluginClient,
taskConfig: taskState.TaskConfig,
2018-10-16 02:37:58 +00:00
procState: drivers.TaskStateRunning,
startedAt: taskState.StartedAt,
exitResult: &drivers.ExitResult{},
logger: d.logger,
2018-10-16 02:37:58 +00:00
}
d.tasks.Set(taskState.TaskConfig.ID, h)
go h.run()
return nil
}
func (d *Driver) StartTask(cfg *drivers.TaskConfig) (*drivers.TaskHandle, *drivers.DriverNetwork, error) {
2018-10-16 02:37:58 +00:00
if _, ok := d.tasks.Get(cfg.ID); ok {
return nil, nil, fmt.Errorf("task with ID %q already started", cfg.ID)
2018-10-16 02:37:58 +00:00
}
var driverConfig TaskConfig
if err := cfg.DecodeDriverConfig(&driverConfig); err != nil {
return nil, nil, fmt.Errorf("failed to decode driver config: %v", err)
}
if err := driverConfig.validate(); err != nil {
return nil, nil, fmt.Errorf("failed driver config validation: %v", err)
}
d.logger.Info("starting task", "driver_cfg", hclog.Fmt("%+v", driverConfig))
2019-01-14 17:25:59 +00:00
handle := drivers.NewTaskHandle(taskHandleVersion)
2018-10-16 02:37:58 +00:00
handle.Config = cfg
pluginLogFile := filepath.Join(cfg.TaskDir().Dir, "executor.out")
executorConfig := &executor.ExecutorConfig{
2018-10-16 02:37:58 +00:00
LogFile: pluginLogFile,
LogLevel: "debug",
FSIsolation: true,
}
exec, pluginClient, err := executor.CreateExecutor(
d.logger.With("task_name", handle.Config.Name, "alloc_id", handle.Config.AllocID),
d.nomadConfig, executorConfig)
2018-10-16 02:37:58 +00:00
if err != nil {
return nil, nil, fmt.Errorf("failed to create executor: %v", err)
2018-10-16 02:37:58 +00:00
}
user := cfg.User
if user == "" {
user = "nobody"
}
if cfg.DNS != nil {
dnsMount, err := resolvconf.GenerateDNSMount(cfg.TaskDir().Dir, cfg.DNS)
if err != nil {
return nil, nil, fmt.Errorf("failed to build mount for resolv.conf: %v", err)
}
cfg.Mounts = append(cfg.Mounts, dnsMount)
}
caps, err := capabilities.Calculate(
capabilities.NomadDefaults(), d.config.AllowCaps, driverConfig.CapAdd, driverConfig.CapDrop,
)
if err != nil {
return nil, nil, err
}
d.logger.Debug("task capabilities", "capabilities", caps)
2018-12-07 01:54:14 +00:00
execCmd := &executor.ExecCommand{
Cmd: driverConfig.Command,
Args: driverConfig.Args,
Env: cfg.EnvList(),
User: user,
ResourceLimits: true,
NoPivotRoot: d.config.NoPivotRoot,
Resources: cfg.Resources,
TaskDir: cfg.TaskDir().Dir,
StdoutPath: cfg.StdoutPath,
StderrPath: cfg.StderrPath,
Mounts: cfg.Mounts,
Devices: cfg.Devices,
NetworkIsolation: cfg.NetworkIsolation,
ModePID: executor.IsolationMode(d.config.DefaultModePID, driverConfig.ModePID),
ModeIPC: executor.IsolationMode(d.config.DefaultModeIPC, driverConfig.ModeIPC),
Capabilities: caps,
2018-10-16 02:37:58 +00:00
}
ps, err := exec.Launch(execCmd)
if err != nil {
pluginClient.Kill()
return nil, nil, fmt.Errorf("failed to launch command with executor: %v", err)
2018-10-16 02:37:58 +00:00
}
h := &taskHandle{
2018-10-16 02:37:58 +00:00
exec: exec,
pid: ps.Pid,
pluginClient: pluginClient,
taskConfig: cfg,
2018-10-16 02:37:58 +00:00
procState: drivers.TaskStateRunning,
startedAt: time.Now().Round(time.Millisecond),
logger: d.logger,
}
driverState := TaskState{
2019-01-15 01:02:44 +00:00
ReattachConfig: pstructs.ReattachConfigFromGoPlugin(pluginClient.ReattachConfig()),
2018-10-16 02:37:58 +00:00
Pid: ps.Pid,
TaskConfig: cfg,
StartedAt: h.startedAt,
}
if err := handle.SetDriverState(&driverState); err != nil {
d.logger.Error("failed to start task, error setting driver state", "error", err)
_ = exec.Shutdown("", 0)
2018-10-16 02:37:58 +00:00
pluginClient.Kill()
return nil, nil, fmt.Errorf("failed to set driver state: %v", err)
2018-10-16 02:37:58 +00:00
}
d.tasks.Set(cfg.ID, h)
go h.run()
return handle, nil, nil
}
func (d *Driver) WaitTask(ctx context.Context, taskID string) (<-chan *drivers.ExitResult, error) {
2018-10-16 02:37:58 +00:00
handle, ok := d.tasks.Get(taskID)
if !ok {
return nil, drivers.ErrTaskNotFound
}
ch := make(chan *drivers.ExitResult)
2018-10-16 02:37:58 +00:00
go d.handleWait(ctx, handle, ch)
return ch, nil
}
func (d *Driver) handleWait(ctx context.Context, handle *taskHandle, ch chan *drivers.ExitResult) {
2018-10-16 02:37:58 +00:00
defer close(ch)
var result *drivers.ExitResult
2018-12-05 16:04:18 +00:00
ps, err := handle.exec.Wait(ctx)
if err != nil {
result = &drivers.ExitResult{
Err: fmt.Errorf("executor: error waiting on process: %v", err),
}
} else {
result = &drivers.ExitResult{
ExitCode: ps.ExitCode,
Signal: ps.Signal,
}
}
2018-10-16 02:37:58 +00:00
select {
case <-ctx.Done():
return
case <-d.ctx.Done():
return
case ch <- result:
2018-10-16 02:37:58 +00:00
}
}
func (d *Driver) StopTask(taskID string, timeout time.Duration, signal string) error {
2018-10-16 02:37:58 +00:00
handle, ok := d.tasks.Get(taskID)
if !ok {
return drivers.ErrTaskNotFound
}
if err := handle.exec.Shutdown(signal, timeout); err != nil {
if handle.pluginClient.Exited() {
return nil
}
return fmt.Errorf("executor Shutdown failed: %v", err)
}
return nil
}
// resetCgroup re-creates the v2 cgroup directory for the task after the task
// has been destroyed by libcontainer. In the case of a task restart we call
// DestroyTask, which removes the cgroup - but we still need it!
//
// It does nothing unless cgroups v2 is in use and the task's Linux resources
// carry a non-empty cpuset cgroup path. A failure to create the directory is
// logged at trace level, including the underlying error, and is otherwise
// ignored.
//
// Ideally the cgroup management would be more unified - and we could do the
// creation in a task runner pre-start hook, eliminating the need for this
// hack.
func (d *Driver) resetCgroup(handle *taskHandle) {
	if !cgutil.UseV2 {
		return
	}

	res := handle.taskConfig.Resources
	if res == nil || res.LinuxResources == nil || res.LinuxResources.CpusetCgroupPath == "" {
		return
	}

	path := res.LinuxResources.CpusetCgroupPath
	if err := os.Mkdir(path, 0755); err != nil {
		// Best effort: record why the directory could not be created
		// rather than dropping the error.
		d.logger.Trace("failed to reset cgroup", "path", path, "error", err)
	}
}
func (d *Driver) DestroyTask(taskID string, force bool) error {
2018-10-16 02:37:58 +00:00
handle, ok := d.tasks.Get(taskID)
if !ok {
return drivers.ErrTaskNotFound
}
if handle.IsRunning() && !force {
return fmt.Errorf("cannot destroy running task")
}
if !handle.pluginClient.Exited() {
2019-11-15 14:33:46 +00:00
if err := handle.exec.Shutdown("", 0); err != nil {
handle.logger.Error("destroying executor failed", "error", err)
2018-10-16 02:37:58 +00:00
}
handle.pluginClient.Kill()
}
// workaround for the case where DestroyTask was issued on task restart
d.resetCgroup(handle)
2018-10-16 02:37:58 +00:00
d.tasks.Delete(taskID)
return nil
}
func (d *Driver) InspectTask(taskID string) (*drivers.TaskStatus, error) {
2018-10-16 02:37:58 +00:00
handle, ok := d.tasks.Get(taskID)
if !ok {
return nil, drivers.ErrTaskNotFound
}
return handle.TaskStatus(), nil
2018-10-16 02:37:58 +00:00
}
func (d *Driver) TaskStats(ctx context.Context, taskID string, interval time.Duration) (<-chan *drivers.TaskResourceUsage, error) {
2018-10-16 02:37:58 +00:00
handle, ok := d.tasks.Get(taskID)
if !ok {
return nil, drivers.ErrTaskNotFound
}
return handle.exec.Stats(ctx, interval)
2018-10-16 02:37:58 +00:00
}
func (d *Driver) TaskEvents(ctx context.Context) (<-chan *drivers.TaskEvent, error) {
2018-10-16 02:37:58 +00:00
return d.eventer.TaskEvents(ctx)
}
func (d *Driver) SignalTask(taskID string, signal string) error {
2018-10-16 02:37:58 +00:00
handle, ok := d.tasks.Get(taskID)
if !ok {
return drivers.ErrTaskNotFound
}
sig := os.Interrupt
if s, ok := signals.SignalLookup[signal]; ok {
sig = s
} else {
d.logger.Warn("unknown signal to send to task, using SIGINT instead", "signal", signal, "task_id", handle.taskConfig.ID)
2018-10-16 02:37:58 +00:00
}
return handle.exec.Signal(sig)
}
func (d *Driver) ExecTask(taskID string, cmd []string, timeout time.Duration) (*drivers.ExecTaskResult, error) {
2018-10-16 02:37:58 +00:00
if len(cmd) == 0 {
2019-05-13 14:01:19 +00:00
return nil, fmt.Errorf("error cmd must have at least one value")
2018-10-16 02:37:58 +00:00
}
handle, ok := d.tasks.Get(taskID)
if !ok {
return nil, drivers.ErrTaskNotFound
}
args := []string{}
if len(cmd) > 1 {
args = cmd[1:]
}
out, exitCode, err := handle.exec.Exec(time.Now().Add(timeout), cmd[0], args)
if err != nil {
return nil, err
}
return &drivers.ExecTaskResult{
Stdout: out,
ExitResult: &drivers.ExitResult{
ExitCode: exitCode,
},
}, nil
}
var _ drivers.ExecTaskStreamingRawDriver = (*Driver)(nil)
func (d *Driver) ExecTaskStreamingRaw(ctx context.Context,
taskID string,
command []string,
tty bool,
stream drivers.ExecTaskStream) error {
if len(command) == 0 {
2019-05-13 14:01:19 +00:00
return fmt.Errorf("error cmd must have at least one value")
}
handle, ok := d.tasks.Get(taskID)
if !ok {
return drivers.ErrTaskNotFound
}
return handle.exec.ExecStreaming(ctx, command, tty, stream)
}