From 5617f3615b7f56308830c4a0a342a10411bf932c Mon Sep 17 00:00:00 2001
From: Nick Ethier
Date: Wed, 19 Sep 2018 15:56:50 -0400
Subject: [PATCH] driver/raw_exec: initial raw_exec implementation

---
 plugins/drivers/raw_exec/driver.go      | 515 ++++++++++++++++++++++++
 plugins/drivers/raw_exec/driver_test.go |  37 ++
 plugins/drivers/utils/eventer.go        | 101 +++++
 plugins/drivers/utils/eventer_test.go   |  72 ++++
 4 files changed, 725 insertions(+)
 create mode 100644 plugins/drivers/raw_exec/driver.go
 create mode 100644 plugins/drivers/raw_exec/driver_test.go
 create mode 100644 plugins/drivers/utils/eventer.go
 create mode 100644 plugins/drivers/utils/eventer_test.go

diff --git a/plugins/drivers/raw_exec/driver.go b/plugins/drivers/raw_exec/driver.go
new file mode 100644
index 000000000..dabf6f8cc
--- /dev/null
+++ b/plugins/drivers/raw_exec/driver.go
@@ -0,0 +1,515 @@
+package raw_exec
+
+import (
+	"fmt"
+	"os"
+	"path/filepath"
+	"strconv"
+	"sync"
+	"time"
+
+	"github.com/hashicorp/consul-template/signals"
+	hclog "github.com/hashicorp/go-hclog"
+	plugin "github.com/hashicorp/go-plugin"
+	"github.com/hashicorp/nomad/client/driver/executor"
+	dstructs "github.com/hashicorp/nomad/client/driver/structs"
+	bbase "github.com/hashicorp/nomad/plugins/base"
+	"github.com/hashicorp/nomad/plugins/drivers/base"
+	"github.com/hashicorp/nomad/plugins/drivers/utils"
+	"github.com/hashicorp/nomad/plugins/shared/hclspec"
+	"golang.org/x/net/context"
+)
+
+const (
+	// pluginName is the name of the plugin
+	pluginName = "raw_exec"
+
+	// fingerprintPeriod is the interval at which the driver will send fingerprint responses
+	fingerprintPeriod = 30 * time.Second
+)
+
+var (
+	pluginInfo = &bbase.PluginInfoResponse{
+		Type:             bbase.PluginTypeDriver,
+		PluginApiVersion: "0.0.1",
+		PluginVersion:    "0.1.0",
+		Name:             pluginName,
+	}
+
+	configSpec = hclspec.NewObject(map[string]*hclspec.Spec{
+		"enabled": hclspec.NewDefault(
+			hclspec.NewAttr("enabled", "bool", false),
+			hclspec.NewLiteral("false"),
+		),
+		"no_cgroups": hclspec.NewDefault(
+			hclspec.NewAttr("no_cgroups", "bool", false),
+			hclspec.NewLiteral("false"),
+		),
+	})
+
+	taskConfigSpec = hclspec.NewObject(map[string]*hclspec.Spec{
+		"command": hclspec.NewAttr("command", "string", true),
+		"args":    hclspec.NewAttr("args", "list(string)", false),
+	})
+
+	capabilities = &base.Capabilities{
+		SendSignals: true,
+		Exec:        true,
+		FSIsolation: base.FSIsolationNone,
+	}
+)
+
+// The RawExecDriver is a privileged version of the exec driver. It provides no
+// resource isolation and just fork/execs. The exec driver should be preferred,
+// and this driver should only be used when explicitly needed.
+type RawExecDriver struct {
+	*utils.Eventer
+	config *Config
+	tasks  *taskStore
+
+	// fingerprintCh is a channel to which other funcs can send fingerprints
+	// that will immediately be forwarded to consumers
+	fingerprintCh chan *base.Fingerprint
+
+	stopCh chan struct{}
+
+	logger hclog.Logger
+}
+
+type Config struct {
+	// noCgroups tracks whether we should use a cgroup to manage the process
+	// tree
+	noCgroups bool `codec:"no_cgroups"`
+
+	// enabled is set to true to enable the raw_exec driver
+	enabled bool `codec:"enabled"`
+}
+
+type TaskConfig struct {
+	Command string   `codec:"command"`
+	Args    []string `codec:"args"`
+}
+
+func NewRawExecDriver(logger hclog.Logger) base.DriverPlugin {
+	stopCh := make(chan struct{})
+	return &RawExecDriver{
+		Eventer:       utils.NewEventer(stopCh),
+		config:        &Config{},
+		tasks:         newTaskStore(),
+		fingerprintCh: make(chan *base.Fingerprint),
+		stopCh:        stopCh,
+		logger:        logger.Named(pluginName),
+	}
+}
+
+func (r *RawExecDriver) PluginInfo() (*bbase.PluginInfoResponse, error) {
+	return pluginInfo, nil
+}
+
+func (r *RawExecDriver) ConfigSchema() (*hclspec.Spec, error) {
+	return configSpec, nil
+}
+
+func (r *RawExecDriver) SetConfig(data []byte) error {
+	var config Config
+	if err := bbase.MsgPackDecode(data, &config); err != nil {
+		return err
+	}
+
+	r.config = &config
+	go r.fingerprintNow()
+	return nil
+}
+
+func (r *RawExecDriver) Shutdown(ctx context.Context) error {
+	close(r.stopCh)
+	return nil
+}
+
+func (r *RawExecDriver) TaskConfigSchema() (*hclspec.Spec, error) {
+	return taskConfigSpec, nil
+}
+
+func (r *RawExecDriver) Capabilities() (*base.Capabilities, error) {
+	return capabilities, nil
+}
+
+func (r *RawExecDriver) Fingerprint(ctx context.Context) (<-chan *base.Fingerprint, error) {
+	ch := make(chan *base.Fingerprint)
+	go r.fingerprint(ctx, ch)
+	return ch, nil
+}
+
+func (r *RawExecDriver) fingerprint(ctx context.Context, ch chan *base.Fingerprint) {
+	defer close(r.fingerprintCh)
+
+	ticker := time.NewTimer(0)
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		case <-ticker.C:
+			ticker.Reset(fingerprintPeriod)
+			go r.fingerprintNow()
+		case f := <-r.fingerprintCh:
+			ch <- f
+		}
+	}
+}
+
+func (r *RawExecDriver) fingerprintNow() {
+	if r.fingerprintCh == nil {
+		r.logger.Debug("fingerprint channel was nil, skipping fingerprint")
+		return
+	}
+	var health base.HealthState
+	var desc string
+	if r.config.enabled {
+		health = base.HealthStateHealthy
+		desc = "raw_exec enabled"
+	} else {
+		health = base.HealthStateUnhealthy
+		desc = "raw_exec not enabled"
+	}
+	r.fingerprintCh <- &base.Fingerprint{
+		Attributes:        map[string]string{},
+		Health:            health,
+		HealthDescription: desc,
+	}
+}
+
+func (r *RawExecDriver) RecoverTask(handle *base.TaskHandle) error {
+	var taskState RawExecTaskState
+
+	err := handle.GetDriverState(&taskState)
+	if err != nil {
+		return err
+	}
+
+	pluginConfig := &plugin.ClientConfig{
+		Reattach: taskState.ReattachConfig,
+	}
+
+	exec, pluginClient, err := utils.CreateExecutorWithConfig(pluginConfig, os.Stderr)
+	if err != nil {
+		return err
+	}
+
+	h := &rawExecTaskHandle{
+		exec:         exec,
+		pid:          taskState.Pid,
+		pluginClient: pluginClient,
+		task:         taskState.TaskConfig,
+		procState:    base.TaskStateRunning,
+		startedAt:    taskState.StartedAt,
+	}
+
+	r.tasks.Set(taskState.TaskConfig.ID, h)
+
+	go h.run()
+	return nil
+}
+
+func (r *RawExecDriver) StartTask(cfg *base.TaskConfig) (*base.TaskHandle, error) {
+	if _, ok := r.tasks.Get(cfg.ID); ok {
+		return nil, fmt.Errorf("task with ID '%s' already started", cfg.ID)
+	}
+
+	var driverConfig TaskConfig
+	if err := cfg.DecodeDriverConfig(&driverConfig); err != nil {
+		return nil, err
+	}
+
+	handle := base.NewTaskHandle(pluginName)
+	handle.Config = cfg
+
+	// Get the command to be run
+	command := driverConfig.Command
+	if err := utils.ValidateCommand(command, "args"); err != nil {
+		return nil, err
+	}
+
+	pluginLogFile := filepath.Join(cfg.TaskDir().Dir, "executor.out")
+	executorConfig := &dstructs.ExecutorConfig{
+		LogFile:  pluginLogFile,
+		LogLevel: "debug",
+	}
+
+	exec, pluginClient, err := utils.CreateExecutor(os.Stderr, hclog.Debug, 14000, 14512, executorConfig)
+	if err != nil {
+		return nil, err
+	}
+
+	execCmd := &executor.ExecCommand{
+		Cmd:  driverConfig.Command,
+		Args: driverConfig.Args,
+		Env:  cfg.EnvList(),
+		User: cfg.User,
+		//TaskKillSignal: os.Interrupt,
+		BasicProcessCgroup: !r.config.noCgroups,
+	}
+
+	ps, err := exec.Launch(execCmd)
+	if err != nil {
+		pluginClient.Kill()
+		return nil, err
+	}
+
+	h := &rawExecTaskHandle{
+		exec:         exec,
+		pid:          ps.Pid,
+		pluginClient: pluginClient,
+		task:         cfg,
+		procState:    base.TaskStateRunning,
+		startedAt:    time.Now(),
+	}
+
+	r.tasks.Set(cfg.ID, h)
+
+	handle.SetDriverState(RawExecTaskState{
+		ReattachConfig: pluginClient.ReattachConfig(),
+		Pid:            ps.Pid,
+		TaskConfig:     cfg,
+		StartedAt:      h.startedAt,
+	})
+
+	go h.run()
+	return handle, nil
+}
+
+func (r *RawExecDriver) WaitTask(ctx context.Context, taskID string) (<-chan *base.ExitResult, error) {
+	ch := make(chan *base.ExitResult)
+	handle, ok := r.tasks.Get(taskID)
+	if !ok {
+		return nil, base.ErrTaskNotFound
+	}
+	go r.handleWait(ctx, handle, ch)
+
+	return ch, nil
+}
+
+func (r *RawExecDriver) handleWait(ctx context.Context, handle *rawExecTaskHandle, ch chan *base.ExitResult) {
+	defer close(ch)
+	var result *base.ExitResult
+	ps, err := handle.exec.Wait()
+	if err != nil {
+		result = &base.ExitResult{
+			Err: fmt.Errorf("executor: error waiting on process: %v", err),
+		}
+	} else {
+		result = &base.ExitResult{
+			ExitCode: ps.ExitCode,
+			Signal:   ps.Signal,
+		}
+	}
+
+	select {
+	case <-ctx.Done():
+		return
+	case ch <- result:
+	}
+}
+
+func (r *RawExecDriver) StopTask(taskID string, timeout time.Duration, signal string) error {
+	/*
+		handle, ok := r.tasks.Get(taskID)
+		if !ok {
+			return base.ErrTaskNotFound
+		}
+
+		//TODO executor only supports shutting down with the initial signal provided
+		if err := handle.exec.Shutdown(); err != nil {
+			if handle.pluginClient.Exited() {
+				return nil
+			}
+			return fmt.Errorf("executor Shutdown failed: %v", err)
+		}
+
+		select {
+		case <-d.WaitTask(taskID):
+			return nil
+		case <-time.After(timeout):
+			if handle.pluginClient.Exited() {
+				return nil
+			}
+			if err := handle.exec.Exit(); err != nil {
+				return fmt.Errorf("executor Exit failed: %v", err)
+			}
+			return nil
+		}*/
+	panic("not implemented")
+}
+
+func (r *RawExecDriver) DestroyTask(taskID string, force bool) error {
+	handle, ok := r.tasks.Get(taskID)
+	if !ok {
+		return base.ErrTaskNotFound
+	}
+
+	if !handle.pluginClient.Exited() {
+		if err := handle.exec.Destroy(); err != nil {
+			handle.logger.Error("destroying executor failed", "err", err)
+		}
+	}
+
+	handle.pluginClient.Kill()
+	r.tasks.Delete(taskID)
+	return nil
+}
+
+func (r *RawExecDriver) InspectTask(taskID string) (*base.TaskStatus, error) {
+	handle, ok := r.tasks.Get(taskID)
+	if !ok {
+		return nil, base.ErrTaskNotFound
+	}
+
+	status := &base.TaskStatus{
+		ID:           handle.task.ID,
+		Name:         handle.task.Name,
+		State:        handle.procState,
+		SizeOnDiskMB: 0,
+		StartedAt:    handle.startedAt,
+		CompletedAt:  handle.completedAt,
+		ExitResult:   handle.exitResult,
+		DriverAttributes: map[string]string{
+			"pid": strconv.Itoa(handle.pid),
+		},
+		NetworkOverride: &base.NetworkOverride{},
+	}
+
+	return status, nil
+}
+
+func (r *RawExecDriver) TaskStats(taskID string) (*base.TaskStats, error) {
+	handle, ok := r.tasks.Get(taskID)
+	if !ok {
+		return nil, base.ErrTaskNotFound
+	}
+
+	stats, err := handle.exec.Stats()
+	if err != nil {
+		return nil, err
+	}
+
+	return &base.TaskStats{
+		ID:                 handle.task.ID,
+		Timestamp:          stats.Timestamp,
+		AggResourceUsage:   stats.ResourceUsage,
+		ResourceUsageByPid: stats.Pids,
+	}, nil
+}
+
+func (r *RawExecDriver) SignalTask(taskID string, signal string) error {
+	handle, ok := r.tasks.Get(taskID)
+	if !ok {
+		return base.ErrTaskNotFound
+	}
+
+	sig := os.Interrupt
+	if s, ok := signals.SignalLookup[signal]; ok {
+		sig = s
+	}
+	return handle.exec.Signal(sig)
+}
+
+func (r *RawExecDriver) ExecTask(taskID string, cmd []string, timeout time.Duration) (*base.ExecTaskResult, error) {
+	if len(cmd) == 0 {
+		return nil, fmt.Errorf("error cmd must have at least one value")
+	}
+	handle, ok := r.tasks.Get(taskID)
+	if !ok {
+		return nil, base.ErrTaskNotFound
+	}
+
+	args := []string{}
+	if len(cmd) > 1 {
+		args = cmd[1:]
+	}
+
+	out, exitCode, err := handle.exec.Exec(time.Now().Add(timeout), cmd[0], args)
+	if err != nil {
+		return nil, err
+	}
+
+	return &base.ExecTaskResult{
+		Stdout: out,
+		ExitResult: &base.ExitResult{
+			ExitCode: exitCode,
+		},
+	}, nil
+}
+
+type taskStore struct {
+	store map[string]*rawExecTaskHandle
+	lock  sync.RWMutex
+}
+
+func newTaskStore() *taskStore {
+	return &taskStore{store: map[string]*rawExecTaskHandle{}}
+}
+
+func (ts *taskStore) Set(id string, handle *rawExecTaskHandle) {
+	ts.lock.Lock()
+	defer ts.lock.Unlock()
+	ts.store[id] = handle
+}
+
+func (ts *taskStore) Get(id string) (*rawExecTaskHandle, bool) {
+	ts.lock.RLock()
+	defer ts.lock.RUnlock()
+	t, ok := ts.store[id]
+	return t, ok
+}
+
+func (ts *taskStore) Delete(id string) {
+	ts.lock.Lock()
+	defer ts.lock.Unlock()
+	delete(ts.store, id)
+}
+
+func (ts *taskStore) Range(f func(id string, handle *rawExecTaskHandle) bool) {
+	ts.lock.RLock()
+	defer ts.lock.RUnlock()
+	for k, v := range ts.store {
+		if f(k, v) {
+			break
+		}
+	}
+}
+
+type RawExecTaskState struct {
+	ReattachConfig *plugin.ReattachConfig
+	TaskConfig     *base.TaskConfig
+	Pid            int
+	StartedAt      time.Time
+}
+
+type rawExecTaskHandle struct {
+	exec         executor.Executor
+	pid          int
+	pluginClient *plugin.Client
+	task         *base.TaskConfig
+	procState    base.TaskState
+	startedAt    time.Time
+	completedAt  time.Time
+	exitResult   *base.ExitResult
+	logger       hclog.Logger
+}
+
+func (h *rawExecTaskHandle) run() {
+	if h.exitResult == nil {
+		h.exitResult = &base.ExitResult{}
+	}
+
+	ps, err := h.exec.Wait()
+	if err != nil {
+		h.exitResult.Err = err
+		h.procState = base.TaskStateUnknown
+		h.completedAt = time.Now()
+		return
+	}
+	h.procState = base.TaskStateExited
+	h.exitResult.ExitCode = ps.ExitCode
+	h.exitResult.Signal = ps.Signal
+	h.completedAt = ps.Time
+}
diff --git a/plugins/drivers/raw_exec/driver_test.go b/plugins/drivers/raw_exec/driver_test.go
new file mode 100644
index 000000000..7d1a026a4
--- /dev/null
+++ b/plugins/drivers/raw_exec/driver_test.go
@@ -0,0 +1,37 @@
+package raw_exec
+
+import (
+	"context"
+	"testing"
+
+	"github.com/hashicorp/nomad/helper/testlog"
+	"github.com/hashicorp/nomad/helper/uuid"
+	"github.com/hashicorp/nomad/plugins/drivers/base"
+	"github.com/stretchr/testify/require"
+)
+
+func TestDriverStartTask(t *testing.T) {
+	t.Parallel()
+	require := require.New(t)
+
+	d := NewRawExecDriver(testlog.HCLogger(t))
+	harness := base.NewDriverHarness(t, d)
+	task := &base.TaskConfig{
+		ID:   uuid.Generate(),
+		Name: "test",
+	}
+	task.EncodeDriverConfig(&TaskConfig{
+		Command: "go",
+		Args:    []string{"version"},
+	})
+	cleanup := harness.MkAllocDir(task)
+	defer cleanup()
+
+	handle, err := harness.StartTask(task)
+	require.NoError(err)
+
+	ch, err := harness.WaitTask(context.Background(), handle.Config.ID)
+	require.NoError(err)
+	result := <-ch
+	require.Zero(result.ExitCode)
+}
diff --git a/plugins/drivers/utils/eventer.go b/plugins/drivers/utils/eventer.go
new file mode 100644
index 000000000..6de4209e5
--- /dev/null
+++ b/plugins/drivers/utils/eventer.go
@@ -0,0 +1,101 @@
+package utils
+
+import (
+	"sync"
+	"time"
+
+	"github.com/hashicorp/nomad/plugins/drivers/base"
+	"golang.org/x/net/context"
+)
+
+var (
+	// DefaultSendEventTimeout is the timeout used when publishing events to consumers
+	DefaultSendEventTimeout = 2 * time.Second
+)
+
+// Eventer is a utility to control broadcast of TaskEvents to multiple consumers.
+// It also implements the TaskEvents func in the DriverPlugin interface so that
+// it can be embedded in an implementing driver struct.
+type Eventer struct {
+	sync.RWMutex
+
+	// events is a channel where events to be broadcast are sent
+	events chan *base.TaskEvent
+
+	// streamers is a slice of consumers to broadcast events to;
+	// access is guarded by the embedded RWMutex
+	streamers []*eventStreamer
+
+	// stop chan to allow control of event loop shutdown
+	stop chan struct{}
+}
+
+// NewEventer returns an Eventer with a running event loop that can be stopped
+// by closing the given stop channel
+func NewEventer(stop chan struct{}) *Eventer {
+	e := &Eventer{
+		events: make(chan *base.TaskEvent),
+		stop:   stop,
+	}
+	go e.eventLoop()
+	return e
+}
+
+// eventLoop is the main logic which pulls events from the channel and broadcasts
+// them to all consumers
+func (e *Eventer) eventLoop() {
+	for {
+		select {
+		case <-e.stop:
+			for _, stream := range e.streamers {
+				close(stream.ch)
+			}
+			return
+		case event := <-e.events:
+			e.RLock()
+			for _, stream := range e.streamers {
+				stream.send(event)
+			}
+			e.RUnlock()
+		}
+	}
+}
+
+type eventStreamer struct {
+	timeout time.Duration
+	ctx     context.Context
+	ch      chan *base.TaskEvent
+}
+
+func (s *eventStreamer) send(event *base.TaskEvent) {
+	select {
+	case <-time.After(s.timeout):
+	case <-s.ctx.Done():
+	case s.ch <- event:
+	}
+}
+
+func (e *Eventer) newStream(ctx context.Context) <-chan *base.TaskEvent {
+	e.Lock()
+	defer e.Unlock()
+
+	stream := &eventStreamer{
+		ch:      make(chan *base.TaskEvent),
+		ctx:     ctx,
+		timeout: DefaultSendEventTimeout,
+	}
+	e.streamers = append(e.streamers, stream)
+
+	return stream.ch
+}
+
+// TaskEvents is an implementation of the DriverPlugin.TaskEvents function
+func (e *Eventer) TaskEvents(ctx context.Context) (<-chan *base.TaskEvent, error) {
+	stream := e.newStream(ctx)
+	return stream, nil
+}
+
+// EmitEvent can be used to broadcast a new event
+func (e *Eventer) EmitEvent(event *base.TaskEvent) {
+	e.events <- event
+}
diff --git a/plugins/drivers/utils/eventer_test.go b/plugins/drivers/utils/eventer_test.go
new file mode 100644
index 000000000..93ab7f301
--- /dev/null
+++ b/plugins/drivers/utils/eventer_test.go
@@ -0,0 +1,72 @@
+package utils
+
+import (
+	"context"
+	"sync"
+	"testing"
+	"time"
+
+	"github.com/hashicorp/nomad/plugins/drivers/base"
"github.com/stretchr/testify/require" +) + +func TestEventer(t *testing.T) { + t.Parallel() + require := require.New(t) + + stop := make(chan struct{}) + e := NewEventer(stop) + + events := []*base.TaskEvent{ + { + TaskID: "a", + Timestamp: time.Now(), + }, + { + TaskID: "b", + Timestamp: time.Now(), + }, + { + TaskID: "c", + Timestamp: time.Now(), + }, + } + + consumer1, err := e.TaskEvents(context.Background()) + require.NoError(err) + consumer2, err := e.TaskEvents(context.Background()) + require.NoError(err) + + var buffer1, buffer2 []*base.TaskEvent + var wg sync.WaitGroup + wg.Add(2) + go func() { + defer wg.Done() + for { + event, ok := <-consumer1 + if !ok { + return + } + buffer1 = append(buffer1, event) + } + }() + go func() { + defer wg.Done() + for { + event, ok := <-consumer2 + if !ok { + return + } + buffer2 = append(buffer2, event) + } + }() + + for _, event := range events { + e.EmitEvent(event) + } + + close(stop) + wg.Wait() + require.Exactly(events, buffer1) + require.Exactly(events, buffer2) +}