Merge pull request #4788 from hashicorp/r-clientv2-exec

drivers/exec: add exec implementation
This commit is contained in:
Nick Ethier 2018-10-16 22:45:04 -04:00 committed by GitHub
commit 025dedff7e
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
11 changed files with 1120 additions and 56 deletions

View file

@ -276,12 +276,12 @@ func (e *UniversalExecutor) Launch(command *ExecCommand) (*ProcessState, error)
e.childCmd.Stderr = stderr e.childCmd.Stderr = stderr
// Look up the binary path and make it executable // Look up the binary path and make it executable
absPath, err := e.lookupBin(command.Cmd) absPath, err := lookupBin(command.TaskDir, command.Cmd)
if err != nil { if err != nil {
return nil, err return nil, err
} }
if err := e.makeExecutable(absPath); err != nil { if err := makeExecutable(absPath); err != nil {
return nil, err return nil, err
} }
@ -476,55 +476,6 @@ func (e *UniversalExecutor) Shutdown(signal string, grace time.Duration) error {
return nil return nil
} }
// lookupBin looks for path to the binary to run by looking for the binary in
// the following locations, in-order: task/local/, task/, based on host $PATH.
// The return path is absolute.
func (e *UniversalExecutor) lookupBin(bin string) (string, error) {
// Check in the local directory
local := filepath.Join(e.commandCfg.TaskDir, allocdir.TaskLocal, bin)
if _, err := os.Stat(local); err == nil {
return local, nil
}
// Check at the root of the task's directory
root := filepath.Join(e.commandCfg.TaskDir, bin)
if _, err := os.Stat(root); err == nil {
return root, nil
}
// Check the $PATH
if host, err := exec.LookPath(bin); err == nil {
return host, nil
}
return "", fmt.Errorf("binary %q could not be found", bin)
}
// makeExecutable makes the given file executable for root,group,others.
func (e *UniversalExecutor) makeExecutable(binPath string) error {
if runtime.GOOS == "windows" {
return nil
}
fi, err := os.Stat(binPath)
if err != nil {
if os.IsNotExist(err) {
return fmt.Errorf("binary %q does not exist", binPath)
}
return fmt.Errorf("specified binary is invalid: %v", err)
}
// If it is not executable, make it so.
perm := fi.Mode().Perm()
req := os.FileMode(0555)
if perm&req != req {
if err := os.Chmod(binPath, perm|req); err != nil {
return fmt.Errorf("error making %q executable: %s", binPath, err)
}
}
return nil
}
// Signal sends the passed signal to the task // Signal sends the passed signal to the task
func (e *UniversalExecutor) Signal(s os.Signal) error { func (e *UniversalExecutor) Signal(s os.Signal) error {
if e.childCmd.Process == nil { if e.childCmd.Process == nil {
@ -548,3 +499,52 @@ func (e *UniversalExecutor) Stats() (*cstructs.TaskResourceUsage, error) {
} }
return aggregatedResourceUsage(e.systemCpuStats, pidStats), nil return aggregatedResourceUsage(e.systemCpuStats, pidStats), nil
} }
// lookupBin looks for path to the binary to run by looking for the binary in
// the following locations, in-order: task/local/, task/, based on host $PATH.
// The return path is absolute.
func lookupBin(taskDir string, bin string) (string, error) {
// Check in the local directory
local := filepath.Join(taskDir, allocdir.TaskLocal, bin)
if _, err := os.Stat(local); err == nil {
return local, nil
}
// Check at the root of the task's directory
root := filepath.Join(taskDir, bin)
if _, err := os.Stat(root); err == nil {
return root, nil
}
// Check the $PATH
if host, err := exec.LookPath(bin); err == nil {
return host, nil
}
return "", fmt.Errorf("binary %q could not be found", bin)
}
// makeExecutable makes the given file executable for root,group,others.
func makeExecutable(binPath string) error {
if runtime.GOOS == "windows" {
return nil
}
fi, err := os.Stat(binPath)
if err != nil {
if os.IsNotExist(err) {
return fmt.Errorf("binary %q does not exist", binPath)
}
return fmt.Errorf("specified binary is invalid: %v", err)
}
// If it is not executable, make it so.
perm := fi.Mode().Perm()
req := os.FileMode(0555)
if perm&req != req {
if err := os.Chmod(binPath, perm|req); err != nil {
return fmt.Errorf("error making %q executable: %s", binPath, err)
}
}
return nil
}

View file

@ -101,6 +101,10 @@ func (l *LibcontainerExecutor) Launch(command *ExecCommand) (*ProcessState, erro
return nil, fmt.Errorf("unable to find the nomad binary: %v", err) return nil, fmt.Errorf("unable to find the nomad binary: %v", err)
} }
if command.Resources == nil {
command.Resources = &Resources{}
}
l.command = command l.command = command
// Move to the root cgroup until process is started // Move to the root cgroup until process is started
@ -129,7 +133,26 @@ func (l *LibcontainerExecutor) Launch(command *ExecCommand) (*ProcessState, erro
} }
l.container = container l.container = container
combined := append([]string{command.Cmd}, command.Args...) // Look up the binary path and make it executable
absPath, err := lookupBin(command.TaskDir, command.Cmd)
if err != nil {
return nil, err
}
if err := makeExecutable(absPath); err != nil {
return nil, err
}
path := absPath
// Determine the path to run as it may have to be relative to the chroot.
rel, err := filepath.Rel(command.TaskDir, path)
if err != nil {
return nil, fmt.Errorf("failed to determine relative path base=%q target=%q: %v", command.TaskDir, path, err)
}
path = rel
combined := append([]string{path}, command.Args...)
stdout, err := command.Stdout() stdout, err := command.Stdout()
if err != nil { if err != nil {
return nil, err return nil, err

440
drivers/exec/driver.go Normal file
View file

@ -0,0 +1,440 @@
package exec
import (
"fmt"
"os"
"path/filepath"
"strconv"
"time"
"github.com/hashicorp/consul-template/signals"
hclog "github.com/hashicorp/go-hclog"
plugin "github.com/hashicorp/go-plugin"
"github.com/hashicorp/nomad/client/driver/executor"
dstructs "github.com/hashicorp/nomad/client/driver/structs"
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/drivers/shared/eventer"
"github.com/hashicorp/nomad/plugins/base"
"github.com/hashicorp/nomad/plugins/drivers"
"github.com/hashicorp/nomad/plugins/drivers/utils"
"github.com/hashicorp/nomad/plugins/shared/hclspec"
"golang.org/x/net/context"
)
const (
// pluginName is the name of the plugin
pluginName = "exec"
// fingerprintPeriod is the interval at which the driver will send fingerprint responses
fingerprintPeriod = 30 * time.Second
)
var (
// pluginInfo is the response returned for the PluginInfo RPC
pluginInfo = &base.PluginInfoResponse{
Type: base.PluginTypeDriver,
PluginApiVersion: "0.0.1",
PluginVersion: "0.1.0",
Name: pluginName,
}
// configSpec is the hcl specification returned by the ConfigSchema RPC
configSpec = hclspec.NewObject(map[string]*hclspec.Spec{
"enabled": hclspec.NewDefault(
hclspec.NewAttr("enabled", "bool", false),
hclspec.NewLiteral("true"),
),
})
// taskConfigSpec is the hcl specification for the driver config section of
// a task within a job. It is returned in the TaskConfigSchema RPC
taskConfigSpec = hclspec.NewObject(map[string]*hclspec.Spec{
"command": hclspec.NewAttr("command", "string", true),
"args": hclspec.NewAttr("args", "list(string)", false),
})
// capabilities is returned by the Capabilities RPC and indicates what
// optional features this driver supports
capabilities = &drivers.Capabilities{
SendSignals: true,
Exec: true,
FSIsolation: cstructs.FSIsolationChroot,
}
)
// ExecDriver fork/execs tasks using many of the underlying OS's isolation
// features where configured.
type ExecDriver struct {
// eventer is used to handle multiplexing of TaskEvents calls such that an
// event can be broadcast to all callers
eventer *eventer.Eventer
// config is the driver configuration set by the SetConfig RPC
config *Config
// tasks is the in memory datastore mapping taskIDs to execDriverHandles
tasks *taskStore
// ctx is the context for the driver. It is passed to other subsystems to
// coordinate shutdown
ctx context.Context
// signalShutdown is called when the driver is shutting down and cancels the
// ctx passed to any subsystems
signalShutdown context.CancelFunc
// logger will log to the plugin output which is usually an 'executor.out'
// file located in the root of the TaskDir
logger hclog.Logger
}
// Config is the driver configuration set by the SetConfig RPC call
type Config struct {
// Enabled is set to true to enable the driver
Enabled bool `cty:"enabled"`
}
// TaskConfig is the driver configuration of a task within a job
type TaskConfig struct {
Command string `cty:"command"`
Args []string `cty:"args"`
}
// TaskState is the state which is encoded in the handle returned in
// StartTask. This information is needed to rebuild the task state and handler
// during recovery.
type TaskState struct {
ReattachConfig *utils.ReattachConfig
TaskConfig *drivers.TaskConfig
Pid int
StartedAt time.Time
}
// NewExecDriver returns a new DrivePlugin implementation
func NewExecDriver(logger hclog.Logger) drivers.DriverPlugin {
ctx, cancel := context.WithCancel(context.Background())
logger = logger.Named(pluginName)
return &ExecDriver{
eventer: eventer.NewEventer(ctx, logger),
config: &Config{},
tasks: newTaskStore(),
ctx: ctx,
signalShutdown: cancel,
logger: logger,
}
}
func (*ExecDriver) PluginInfo() (*base.PluginInfoResponse, error) {
return pluginInfo, nil
}
func (*ExecDriver) ConfigSchema() (*hclspec.Spec, error) {
return configSpec, nil
}
func (d *ExecDriver) SetConfig(data []byte) error {
var config Config
if err := base.MsgPackDecode(data, &config); err != nil {
return err
}
d.config = &config
return nil
}
func (d *ExecDriver) Shutdown(ctx context.Context) error {
d.signalShutdown()
return nil
}
func (d *ExecDriver) TaskConfigSchema() (*hclspec.Spec, error) {
return taskConfigSpec, nil
}
func (d *ExecDriver) Capabilities() (*drivers.Capabilities, error) {
return capabilities, nil
}
func (d *ExecDriver) Fingerprint(ctx context.Context) (<-chan *drivers.Fingerprint, error) {
ch := make(chan *drivers.Fingerprint)
go d.handleFingerprint(ctx, ch)
return ch, nil
}
func (d *ExecDriver) handleFingerprint(ctx context.Context, ch chan *drivers.Fingerprint) {
defer close(ch)
ticker := time.NewTimer(0)
for {
select {
case <-ctx.Done():
return
case <-d.ctx.Done():
return
case <-ticker.C:
ticker.Reset(fingerprintPeriod)
ch <- d.buildFingerprint()
}
}
}
func (d *ExecDriver) RecoverTask(handle *drivers.TaskHandle) error {
var taskState TaskState
logger := d.logger.With("task_id", handle.Config.ID)
err := handle.GetDriverState(&taskState)
if err != nil {
logger.Error("failed to decode driver state during task recovery", "error", err)
return fmt.Errorf("failed to decode state: %v", err)
}
plugRC, err := utils.ReattachConfigToGoPlugin(taskState.ReattachConfig)
if err != nil {
logger.Error("failed to build reattach config during task recovery", "error", err)
return fmt.Errorf("failed to build reattach config: %v", err)
}
pluginConfig := &plugin.ClientConfig{
Reattach: plugRC,
}
exec, pluginClient, err := utils.CreateExecutorWithConfig(pluginConfig, os.Stderr)
if err != nil {
logger.Error("failed to build executor during task recovery", "error", err)
return fmt.Errorf("failed to build executor: %v", err)
}
h := &execTaskHandle{
exec: exec,
pid: taskState.Pid,
pluginClient: pluginClient,
task: taskState.TaskConfig,
procState: drivers.TaskStateRunning,
startedAt: taskState.StartedAt,
exitCh: make(chan struct{}),
}
d.tasks.Set(taskState.TaskConfig.ID, h)
go h.run()
return nil
}
func (d *ExecDriver) StartTask(cfg *drivers.TaskConfig) (*drivers.TaskHandle, *cstructs.DriverNetwork, error) {
if _, ok := d.tasks.Get(cfg.ID); ok {
return nil, nil, fmt.Errorf("task with ID '%s' already started", cfg.ID)
}
var driverConfig TaskConfig
if err := cfg.DecodeDriverConfig(&driverConfig); err != nil {
return nil, nil, fmt.Errorf("failed to decode driver config: %v", err)
}
handle := drivers.NewTaskHandle(pluginName)
handle.Config = cfg
pluginLogFile := filepath.Join(cfg.TaskDir().Dir, "executor.out")
executorConfig := &dstructs.ExecutorConfig{
LogFile: pluginLogFile,
LogLevel: "debug",
FSIsolation: true,
}
// TODO: best way to pass port ranges in from client config
exec, pluginClient, err := utils.CreateExecutor(os.Stderr, hclog.Debug, 14000, 14512, executorConfig)
if err != nil {
return nil, nil, err
}
execCmd := &executor.ExecCommand{
Cmd: driverConfig.Command,
Args: driverConfig.Args,
Env: cfg.EnvList(),
User: cfg.User,
ResourceLimits: true,
TaskDir: cfg.TaskDir().Dir,
StdoutPath: cfg.StdoutPath,
StderrPath: cfg.StderrPath,
}
ps, err := exec.Launch(execCmd)
if err != nil {
pluginClient.Kill()
return nil, nil, err
}
h := &execTaskHandle{
exec: exec,
pid: ps.Pid,
pluginClient: pluginClient,
task: cfg,
procState: drivers.TaskStateRunning,
startedAt: time.Now().Round(time.Millisecond),
logger: d.logger,
exitCh: make(chan struct{}),
}
driverState := &TaskState{
ReattachConfig: utils.ReattachConfigFromGoPlugin(pluginClient.ReattachConfig()),
Pid: ps.Pid,
TaskConfig: cfg,
StartedAt: h.startedAt,
}
if err := handle.SetDriverState(&driverState); err != nil {
d.logger.Error("failed to start task, error setting driver state", "error", err)
exec.Shutdown("", 0)
pluginClient.Kill()
return nil, nil, err
}
d.tasks.Set(cfg.ID, h)
go h.run()
return handle, nil, nil
}
func (d *ExecDriver) WaitTask(ctx context.Context, taskID string) (<-chan *drivers.ExitResult, error) {
ch := make(chan *drivers.ExitResult)
handle, ok := d.tasks.Get(taskID)
if !ok {
return nil, drivers.ErrTaskNotFound
}
go d.handleWait(ctx, handle, ch)
return ch, nil
}
func (d *ExecDriver) handleWait(ctx context.Context, handle *execTaskHandle, ch chan *drivers.ExitResult) {
defer close(ch)
select {
case <-ctx.Done():
return
case <-d.ctx.Done():
return
case <-handle.exitCh:
ch <- handle.exitResult
}
}
func (d *ExecDriver) StopTask(taskID string, timeout time.Duration, signal string) error {
handle, ok := d.tasks.Get(taskID)
if !ok {
return drivers.ErrTaskNotFound
}
if err := handle.exec.Shutdown(signal, timeout); err != nil {
if handle.pluginClient.Exited() {
return nil
}
return fmt.Errorf("executor Shutdown failed: %v", err)
}
return nil
}
func (d *ExecDriver) DestroyTask(taskID string, force bool) error {
handle, ok := d.tasks.Get(taskID)
if !ok {
return drivers.ErrTaskNotFound
}
if handle.IsRunning() && !force {
return fmt.Errorf("cannot destroy running task")
}
if !handle.pluginClient.Exited() {
if handle.IsRunning() {
if err := handle.exec.Shutdown("", 0); err != nil {
handle.logger.Error("destroying executor failed", "err", err)
}
}
handle.pluginClient.Kill()
}
d.tasks.Delete(taskID)
return nil
}
func (d *ExecDriver) InspectTask(taskID string) (*drivers.TaskStatus, error) {
handle, ok := d.tasks.Get(taskID)
if !ok {
return nil, drivers.ErrTaskNotFound
}
handle.stateLock.RLock()
defer handle.stateLock.RUnlock()
status := &drivers.TaskStatus{
ID: handle.task.ID,
Name: handle.task.Name,
State: handle.procState,
StartedAt: handle.startedAt,
CompletedAt: handle.completedAt,
ExitResult: handle.exitResult,
DriverAttributes: map[string]string{
"pid": strconv.Itoa(handle.pid),
},
}
return status, nil
}
func (d *ExecDriver) TaskStats(taskID string) (*cstructs.TaskResourceUsage, error) {
handle, ok := d.tasks.Get(taskID)
if !ok {
return nil, drivers.ErrTaskNotFound
}
stats, err := handle.exec.Stats()
if err != nil {
return nil, fmt.Errorf("failed to retrieve stats from executor: %v", err)
}
return stats, nil
}
func (d *ExecDriver) TaskEvents(ctx context.Context) (<-chan *drivers.TaskEvent, error) {
return d.eventer.TaskEvents(ctx)
}
func (d *ExecDriver) SignalTask(taskID string, signal string) error {
handle, ok := d.tasks.Get(taskID)
if !ok {
return drivers.ErrTaskNotFound
}
sig := os.Interrupt
if s, ok := signals.SignalLookup[signal]; ok {
d.logger.Warn("signal to send to task unknown, using SIGINT", "signal", signal, "task_id", handle.task.ID)
sig = s
}
return handle.exec.Signal(sig)
}
func (d *ExecDriver) ExecTask(taskID string, cmd []string, timeout time.Duration) (*drivers.ExecTaskResult, error) {
if len(cmd) == 0 {
return nil, fmt.Errorf("error cmd must have atleast one value")
}
handle, ok := d.tasks.Get(taskID)
if !ok {
return nil, drivers.ErrTaskNotFound
}
args := []string{}
if len(cmd) > 1 {
args = cmd[1:]
}
out, exitCode, err := handle.exec.Exec(time.Now().Add(timeout), cmd[0], args)
if err != nil {
return nil, err
}
return &drivers.ExecTaskResult{
Stdout: out,
ExitResult: &drivers.ExitResult{
ExitCode: exitCode,
},
}, nil
}

View file

@ -0,0 +1,12 @@
//+build !linux
package exec
import "github.com/hashicorp/nomad/plugins/drivers"
func (d *ExecDriver) buildFingerprint() *drivers.Fingerprint {
return &drivers.Fingerprint{
Health: drivers.HealthStateUndetected,
HealthDescription: "exec driver unsupported on client OS",
}
}

View file

@ -0,0 +1,38 @@
package exec
import (
"github.com/hashicorp/nomad/client/fingerprint"
"github.com/hashicorp/nomad/plugins/drivers"
"golang.org/x/sys/unix"
)
func (d *ExecDriver) buildFingerprint() *drivers.Fingerprint {
fp := &drivers.Fingerprint{
Attributes: map[string]string{},
Health: drivers.HealthStateHealthy,
HealthDescription: "healthy",
}
mount, err := fingerprint.FindCgroupMountpointDir()
if err != nil {
fp.Health = drivers.HealthStateUnhealthy
fp.HealthDescription = "failed to discover cgroup mount point"
d.logger.Warn(fp.HealthDescription, "error", err)
return fp
}
if mount == "" {
fp.Health = drivers.HealthStateUnhealthy
fp.HealthDescription = "cgroups are unavailable"
return fp
}
if unix.Geteuid() != 0 {
fp.Health = drivers.HealthStateUnhealthy
fp.HealthDescription = "exec driver must run as root"
return fp
}
fp.Attributes["driver.exec"] = "1"
return fp
}

442
drivers/exec/driver_test.go Normal file
View file

@ -0,0 +1,442 @@
package exec
import (
"bytes"
"context"
"fmt"
"io/ioutil"
"os"
"path/filepath"
"runtime"
"strings"
"sync"
"testing"
"time"
"github.com/hashicorp/hcl2/hcl"
ctestutils "github.com/hashicorp/nomad/client/testutil"
"github.com/hashicorp/nomad/helper/testlog"
"github.com/hashicorp/nomad/helper/testtask"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/plugins/drivers"
"github.com/hashicorp/nomad/plugins/shared"
"github.com/hashicorp/nomad/plugins/shared/hclspec"
"github.com/hashicorp/nomad/testutil"
"github.com/stretchr/testify/require"
)
func TestMain(m *testing.M) {
if !testtask.Run() {
os.Exit(m.Run())
}
}
func TestExecDriver_Fingerprint_NonLinux(t *testing.T) {
if !testutil.IsTravis() {
t.Parallel()
}
require := require.New(t)
if runtime.GOOS == "linux" {
t.Skip("Test only available not on Linux")
}
d := NewExecDriver(testlog.HCLogger(t))
harness := drivers.NewDriverHarness(t, d)
fingerCh, err := harness.Fingerprint(context.Background())
require.NoError(err)
select {
case finger := <-fingerCh:
require.Equal(drivers.HealthStateUndetected, finger.Health)
case <-time.After(time.Duration(testutil.TestMultiplier()*5) * time.Second):
require.Fail("timeout receiving fingerprint")
}
}
func TestExecDriver_Fingerprint(t *testing.T) {
t.Parallel()
require := require.New(t)
ctestutils.ExecCompatible(t)
d := NewExecDriver(testlog.HCLogger(t))
harness := drivers.NewDriverHarness(t, d)
fingerCh, err := harness.Fingerprint(context.Background())
require.NoError(err)
select {
case finger := <-fingerCh:
require.Equal(drivers.HealthStateHealthy, finger.Health)
require.Equal("1", finger.Attributes["driver.exec"])
case <-time.After(time.Duration(testutil.TestMultiplier()*5) * time.Second):
require.Fail("timeout receiving fingerprint")
}
}
func TestExecDriver_StartWait(t *testing.T) {
t.Parallel()
require := require.New(t)
ctestutils.ExecCompatible(t)
d := NewExecDriver(testlog.HCLogger(t))
harness := drivers.NewDriverHarness(t, d)
task := &drivers.TaskConfig{
ID: uuid.Generate(),
Name: "test",
}
taskConfig := map[string]interface{}{
"command": "cat",
"args": []string{"/proc/self/cgroup"},
}
encodeDriverHelper(require, task, taskConfig)
cleanup := harness.MkAllocDir(task, false)
defer cleanup()
fmt.Println(task.AllocDir)
handle, _, err := harness.StartTask(task)
require.NoError(err)
ch, err := harness.WaitTask(context.Background(), handle.Config.ID)
require.NoError(err)
result := <-ch
require.Zero(result.ExitCode)
require.NoError(harness.DestroyTask(task.ID, true))
}
func TestExecDriver_StartWaitStop(t *testing.T) {
t.Parallel()
require := require.New(t)
ctestutils.ExecCompatible(t)
d := NewExecDriver(testlog.HCLogger(t))
harness := drivers.NewDriverHarness(t, d)
task := &drivers.TaskConfig{
ID: uuid.Generate(),
Name: "test",
}
taskConfig := map[string]interface{}{
"command": "/bin/sleep",
"args": []string{"5"},
}
encodeDriverHelper(require, task, taskConfig)
cleanup := harness.MkAllocDir(task, false)
defer cleanup()
handle, _, err := harness.StartTask(task)
require.NoError(err)
ch, err := harness.WaitTask(context.Background(), handle.Config.ID)
require.NoError(err)
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
result := <-ch
require.Equal(2, result.Signal)
}()
require.NoError(harness.WaitUntilStarted(task.ID, 1*time.Second))
wg.Add(1)
go func() {
defer wg.Done()
err := harness.StopTask(task.ID, 2*time.Second, "SIGINT")
require.NoError(err)
}()
waitCh := make(chan struct{})
go func() {
defer close(waitCh)
wg.Wait()
}()
select {
case <-waitCh:
status, err := harness.InspectTask(task.ID)
require.NoError(err)
require.Equal(drivers.TaskStateExited, status.State)
case <-time.After(1 * time.Second):
require.Fail("timeout waiting for task to shutdown")
}
require.NoError(harness.DestroyTask(task.ID, true))
}
func TestExecDriver_StartWaitRecover(t *testing.T) {
t.Parallel()
require := require.New(t)
ctestutils.ExecCompatible(t)
d := NewExecDriver(testlog.HCLogger(t))
harness := drivers.NewDriverHarness(t, d)
task := &drivers.TaskConfig{
ID: uuid.Generate(),
Name: "test",
}
taskConfig := map[string]interface{}{
"command": "/bin/sleep",
"args": []string{"5"},
}
encodeDriverHelper(require, task, taskConfig)
cleanup := harness.MkAllocDir(task, false)
defer cleanup()
handle, _, err := harness.StartTask(task)
require.NoError(err)
ctx, cancel := context.WithCancel(context.Background())
ch, err := harness.WaitTask(ctx, handle.Config.ID)
require.NoError(err)
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
result := <-ch
require.Error(result.Err)
}()
require.NoError(harness.WaitUntilStarted(task.ID, 1*time.Second))
cancel()
waitCh := make(chan struct{})
go func() {
defer close(waitCh)
wg.Wait()
}()
select {
case <-waitCh:
status, err := harness.InspectTask(task.ID)
require.NoError(err)
require.Equal(drivers.TaskStateRunning, status.State)
case <-time.After(1 * time.Second):
require.Fail("timeout waiting for task wait to cancel")
}
// Loose task
d.(*ExecDriver).tasks.Delete(task.ID)
_, err = harness.InspectTask(task.ID)
require.Error(err)
require.NoError(harness.RecoverTask(handle))
status, err := harness.InspectTask(task.ID)
require.NoError(err)
require.Equal(drivers.TaskStateRunning, status.State)
require.NoError(harness.StopTask(task.ID, 0, ""))
require.NoError(harness.DestroyTask(task.ID, true))
}
func TestExecDriver_Stats(t *testing.T) {
t.Parallel()
require := require.New(t)
ctestutils.ExecCompatible(t)
d := NewExecDriver(testlog.HCLogger(t))
harness := drivers.NewDriverHarness(t, d)
task := &drivers.TaskConfig{
ID: uuid.Generate(),
Name: "test",
}
taskConfig := map[string]interface{}{
"command": "/bin/sleep",
"args": []string{"5"},
}
encodeDriverHelper(require, task, taskConfig)
cleanup := harness.MkAllocDir(task, false)
defer cleanup()
handle, _, err := harness.StartTask(task)
require.NoError(err)
require.NotNil(handle)
require.NoError(harness.WaitUntilStarted(task.ID, 1*time.Second))
stats, err := harness.TaskStats(task.ID)
require.NoError(err)
require.NotZero(stats.ResourceUsage.MemoryStats.RSS)
require.NoError(harness.DestroyTask(task.ID, true))
}
func TestExecDriver_Start_Wait_AllocDir(t *testing.T) {
t.Parallel()
require := require.New(t)
ctestutils.ExecCompatible(t)
d := NewExecDriver(testlog.HCLogger(t))
harness := drivers.NewDriverHarness(t, d)
task := &drivers.TaskConfig{
ID: uuid.Generate(),
Name: "sleep",
}
cleanup := harness.MkAllocDir(task, false)
defer cleanup()
exp := []byte{'w', 'i', 'n'}
file := "output.txt"
taskConfig := map[string]interface{}{
"command": "/bin/bash",
"args": []string{
"-c",
fmt.Sprintf(`sleep 1; echo -n %s > /alloc/%s`, string(exp), file),
},
}
encodeDriverHelper(require, task, taskConfig)
handle, _, err := harness.StartTask(task)
require.NoError(err)
require.NotNil(handle)
// Task should terminate quickly
waitCh, err := harness.WaitTask(context.Background(), task.ID)
require.NoError(err)
select {
case res := <-waitCh:
require.True(res.Successful(), "task should have exited successfully: %v", res)
case <-time.After(time.Duration(testutil.TestMultiplier()*5) * time.Second):
require.Fail("timeout waiting for task")
}
// Check that data was written to the shared alloc directory.
outputFile := filepath.Join(task.TaskDir().SharedAllocDir, file)
act, err := ioutil.ReadFile(outputFile)
require.NoError(err)
require.Exactly(exp, act)
require.NoError(harness.DestroyTask(task.ID, true))
}
func TestExecDriver_User(t *testing.T) {
t.Parallel()
require := require.New(t)
ctestutils.ExecCompatible(t)
d := NewExecDriver(testlog.HCLogger(t))
harness := drivers.NewDriverHarness(t, d)
task := &drivers.TaskConfig{
ID: uuid.Generate(),
Name: "sleep",
User: "alice",
}
cleanup := harness.MkAllocDir(task, false)
defer cleanup()
taskConfig := map[string]interface{}{
"command": "/bin/sleep",
"args": []string{"100"},
}
encodeDriverHelper(require, task, taskConfig)
handle, _, err := harness.StartTask(task)
require.Error(err)
require.Nil(handle)
msg := "user alice"
if !strings.Contains(err.Error(), msg) {
t.Fatalf("Expecting '%v' in '%v'", msg, err)
}
}
// TestExecDriver_HandlerExec ensures the exec driver's handle properly
// executes commands inside the container.
func TestExecDriver_HandlerExec(t *testing.T) {
t.Parallel()
require := require.New(t)
ctestutils.ExecCompatible(t)
d := NewExecDriver(testlog.HCLogger(t))
harness := drivers.NewDriverHarness(t, d)
task := &drivers.TaskConfig{
ID: uuid.Generate(),
Name: "sleep",
}
cleanup := harness.MkAllocDir(task, false)
defer cleanup()
taskConfig := map[string]interface{}{
"command": "/bin/sleep",
"args": []string{"9000"},
}
encodeDriverHelper(require, task, taskConfig)
handle, _, err := harness.StartTask(task)
require.NoError(err)
require.NotNil(handle)
// Exec a command that should work and dump the environment
// TODO: enable section when exec env is fully loaded
/*res, err := harness.ExecTask(task.ID, []string{"/bin/sh", "-c", "env | grep ^NOMAD"}, time.Second)
require.NoError(err)
require.True(res.ExitResult.Successful())
// Assert exec'd commands are run in a task-like environment
scriptEnv := make(map[string]string)
for _, line := range strings.Split(string(res.Stdout), "\n") {
if line == "" {
continue
}
parts := strings.SplitN(string(line), "=", 2)
if len(parts) != 2 {
t.Fatalf("Invalid env var: %q", line)
}
scriptEnv[parts[0]] = parts[1]
}
if v, ok := scriptEnv["NOMAD_SECRETS_DIR"]; !ok || v != "/secrets" {
t.Errorf("Expected NOMAD_SECRETS_DIR=/secrets but found=%t value=%q", ok, v)
}*/
// Assert cgroup membership
res, err := harness.ExecTask(task.ID, []string{"/bin/cat", "/proc/self/cgroup"}, time.Second)
require.NoError(err)
require.True(res.ExitResult.Successful())
found := false
for _, line := range strings.Split(string(res.Stdout), "\n") {
// Every cgroup entry should be /nomad/$ALLOC_ID
if line == "" {
continue
}
// Skip systemd cgroup
if strings.HasPrefix(line, "1:name=systemd") {
continue
}
if !strings.Contains(line, ":/nomad/") {
t.Errorf("Not a member of the alloc's cgroup: expected=...:/nomad/... -- found=%q", line)
continue
}
found = true
}
require.True(found, "exec'd command isn't in the task's cgroup")
// Exec a command that should fail
res, err = harness.ExecTask(task.ID, []string{"/usr/bin/stat", "lkjhdsaflkjshowaisxmcvnlia"}, time.Second)
require.NoError(err)
require.False(res.ExitResult.Successful())
if expected := "No such file or directory"; !bytes.Contains(res.Stdout, []byte(expected)) {
t.Fatalf("expected output to contain %q but found: %q", expected, res.Stdout)
}
require.NoError(harness.DestroyTask(task.ID, true))
}
func encodeDriverHelper(require *require.Assertions, task *drivers.TaskConfig, taskConfig map[string]interface{}) {
evalCtx := &hcl.EvalContext{
Functions: shared.GetStdlibFuncs(),
}
spec, diag := hclspec.Convert(taskConfigSpec)
require.False(diag.HasErrors())
taskConfigCtyVal, diag := shared.ParseHclInterface(taskConfig, spec, evalCtx)
require.False(diag.HasErrors())
err := task.EncodeDriverConfig(taskConfigCtyVal)
require.Nil(err)
}

52
drivers/exec/handle.go Normal file
View file

@ -0,0 +1,52 @@
package exec
import (
"sync"
"time"
hclog "github.com/hashicorp/go-hclog"
plugin "github.com/hashicorp/go-plugin"
"github.com/hashicorp/nomad/client/driver/executor"
"github.com/hashicorp/nomad/plugins/drivers"
)
type execTaskHandle struct {
exec executor.Executor
pid int
pluginClient *plugin.Client
logger hclog.Logger
// stateLock syncs access to all fields below
stateLock sync.RWMutex
task *drivers.TaskConfig
procState drivers.TaskState
startedAt time.Time
completedAt time.Time
exitResult *drivers.ExitResult
exitCh chan struct{}
}
func (h *execTaskHandle) IsRunning() bool {
return h.procState == drivers.TaskStateRunning
}
func (h *execTaskHandle) run() {
defer close(h.exitCh)
if h.exitResult == nil {
h.exitResult = &drivers.ExitResult{}
}
ps, err := h.exec.Wait()
if err != nil {
h.exitResult.Err = err
h.procState = drivers.TaskStateUnknown
h.completedAt = time.Now()
return
}
h.procState = drivers.TaskStateExited
h.exitResult.ExitCode = ps.ExitCode
h.exitResult.Signal = ps.Signal
h.completedAt = ps.Time
// TODO: plumb OOM bool
}

33
drivers/exec/state.go Normal file
View file

@ -0,0 +1,33 @@
package exec
import (
"sync"
)
type taskStore struct {
store map[string]*execTaskHandle
lock sync.RWMutex
}
func newTaskStore() *taskStore {
return &taskStore{store: map[string]*execTaskHandle{}}
}
func (ts *taskStore) Set(id string, handle *execTaskHandle) {
ts.lock.Lock()
defer ts.lock.Unlock()
ts.store[id] = handle
}
func (ts *taskStore) Get(id string) (*execTaskHandle, bool) {
ts.lock.RLock()
defer ts.lock.RUnlock()
t, ok := ts.store[id]
return t, ok
}
func (ts *taskStore) Delete(id string) {
ts.lock.Lock()
defer ts.lock.Unlock()
delete(ts.store, id)
}

View file

@ -1,6 +1,7 @@
package drivers package drivers
import ( import (
"fmt"
"io" "io"
"golang.org/x/net/context" "golang.org/x/net/context"
@ -125,7 +126,19 @@ func (b *driverPluginServer) WaitTask(ctx context.Context, req *proto.WaitTaskRe
return nil, err return nil, err
} }
result := <-ch var ok bool
var result *ExitResult
select {
case <-ctx.Done():
return nil, ctx.Err()
case result, ok = <-ch:
if !ok {
return &proto.WaitTaskResponse{
Err: "channel closed",
}, nil
}
}
var errStr string var errStr string
if result.Err != nil { if result.Err != nil {
errStr = result.Err.Error() errStr = result.Err.Error()
@ -206,7 +219,7 @@ func (b *driverPluginServer) TaskStats(ctx context.Context, req *proto.TaskStats
pb, err := taskStatsToProto(stats) pb, err := taskStatsToProto(stats)
if err != nil { if err != nil {
return nil, err return nil, fmt.Errorf("failed to encode task stats: %v", err)
} }
resp := &proto.TaskStatsResponse{ resp := &proto.TaskStatsResponse{

View file

@ -14,6 +14,7 @@ import (
hclog "github.com/hashicorp/go-hclog" hclog "github.com/hashicorp/go-hclog"
plugin "github.com/hashicorp/go-plugin" plugin "github.com/hashicorp/go-plugin"
"github.com/hashicorp/nomad/client/allocdir" "github.com/hashicorp/nomad/client/allocdir"
"github.com/hashicorp/nomad/client/config"
"github.com/hashicorp/nomad/client/logmon" "github.com/hashicorp/nomad/client/logmon"
cstructs "github.com/hashicorp/nomad/client/structs" cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/helper/testlog" "github.com/hashicorp/nomad/helper/testlog"
@ -55,6 +56,7 @@ func NewDriverHarness(t testing.T, d DriverPlugin) *DriverHarness {
server: server, server: server,
DriverPlugin: dClient, DriverPlugin: dClient,
logger: logger, logger: logger,
t: t,
} }
raw, err = client.Dispense("logmon") raw, err = client.Dispense("logmon")
@ -84,7 +86,16 @@ func (h *DriverHarness) MkAllocDir(t *TaskConfig, enableLogs bool) func() {
allocDir := allocdir.NewAllocDir(h.logger, dir) allocDir := allocdir.NewAllocDir(h.logger, dir)
require.NoError(h.t, allocDir.Build()) require.NoError(h.t, allocDir.Build())
taskDir := allocDir.NewTaskDir(t.Name) taskDir := allocDir.NewTaskDir(t.Name)
require.NoError(h.t, taskDir.Build(false, nil, 0))
caps, err := h.Capabilities()
require.NoError(h.t, err)
var entries map[string]string
fsi := caps.FSIsolation
if fsi == cstructs.FSIsolationChroot {
entries = config.DefaultChrootEnv
}
require.NoError(h.t, taskDir.Build(false, entries, fsi))
//logmon //logmon
if enableLogs { if enableLogs {

View file

@ -263,7 +263,7 @@ func taskStatusFromProto(pb *proto.TaskStatus) (*TaskStatus, error) {
} }
func taskStatsToProto(stats *cstructs.TaskResourceUsage) (*proto.TaskStats, error) { func taskStatsToProto(stats *cstructs.TaskResourceUsage) (*proto.TaskStats, error) {
timestamp, err := ptypes.TimestampProto(time.Unix(stats.Timestamp, 0)) timestamp, err := ptypes.TimestampProto(time.Unix(0, stats.Timestamp))
if err != nil { if err != nil {
return nil, err return nil, err
} }