drivers/rawexec: PR comments and feedback

This commit is contained in:
Nick Ethier 2018-09-29 23:05:14 -04:00 committed by Michael Schurter
parent 5742a6b932
commit 8cf32eb9d2
6 changed files with 172 additions and 135 deletions

View file

@ -1,11 +1,10 @@
package raw_exec
package rawexec
import (
"fmt"
"os"
"path/filepath"
"strconv"
"sync"
"time"
"github.com/hashicorp/consul-template/signals"
@ -29,6 +28,7 @@ const (
)
var (
// pluginInfo is the response returned for the PluginInfo RPC
pluginInfo = &base.PluginInfoResponse{
Type: base.PluginTypeDriver,
PluginApiVersion: "0.0.1",
@ -36,6 +36,7 @@ var (
Name: pluginName,
}
// configSpec is the hcl specification returned by the ConfigSchema RPC
configSpec = hclspec.NewObject(map[string]*hclspec.Spec{
"enabled": hclspec.NewDefault(
hclspec.NewAttr("enabled", "bool", false),
@ -47,11 +48,15 @@ var (
),
})
// taskConfigSpec is the hcl specification for the driver config section of
// a task within a job. It is returned in the TaskConfigSchema RPC
taskConfigSpec = hclspec.NewObject(map[string]*hclspec.Spec{
"command": hclspec.NewAttr("command", "string", true),
"args": hclspec.NewAttr("command", "list(string)", false),
})
// capabilities is returned by the Capabilities RPC and indicates what
// optional features this driver supports
capabilities = &drivers.Capabilities{
SendSignals: true,
Exec: true,
@ -59,23 +64,34 @@ var (
}
)
// The RawExecDriver is a privileged version of the exec driver. It provides no
// RawExecDriver is a privileged version of the exec driver. It provides no
// resource isolation and just fork/execs. The Exec driver should be preferred
// and this should only be used when explicitly needed.
type RawExecDriver struct {
*utils.Eventer
// eventer is used to handle multiplexing of TaskEvents calls such that an
// event can be broadcast to all callers
eventer *utils.Eventer
// config is the driver configuration set by the SetConfig RPC
config *Config
tasks *taskStore
// fingerprintCh is a channel which other funcs can send fingerprints to
// that will immediately be sent
fingerprintCh chan *drivers.Fingerprint
// tasks is the in memory datastore mapping taskIDs to rawExecDriverHandles
tasks *taskStore
stopCh chan struct{}
// ctx is the context for the driver. It is passed to other subsystems to
// coordinate shutdown
ctx context.Context
// signalShutdown is called when the driver is shutting down and cancels the
// ctx passed to any subsystems
signalShutdown context.CancelFunc
// logger will log to the plugin output which is usually an 'executor.out'
// file located in the root of the TaskDir
logger hclog.Logger
}
// Config is the driver configuration set by the SetConfig RPC call
type Config struct {
// NoCgroups tracks whether we should use a cgroup to manage the process
// tree
@ -85,20 +101,33 @@ type Config struct {
Enabled bool `codec:"enabled"`
}
// TaskConfig is the driver configuration of a task within a job
type TaskConfig struct {
Command string `codec:"command"`
Args []string `codec:"args"`
}
// RawExecTaskState is the state which is encoded in the handle returned in
// StartTask. This information is needed to rebuild the task state and handler
// during recovery.
type RawExecTaskState struct {
ReattachConfig *utils.ReattachConfig
TaskConfig *drivers.TaskConfig
Pid int
StartedAt time.Time
}
// NewRawExecDriver returns a new DriverPlugin implementation
func NewRawExecDriver(logger hclog.Logger) drivers.DriverPlugin {
stopCh := make(chan struct{})
ctx, cancel := context.WithCancel(context.Background())
logger = logger.Named(pluginName)
return &RawExecDriver{
Eventer: utils.NewEventer(stopCh),
config: &Config{},
tasks: newTaskStore(),
fingerprintCh: make(chan *drivers.Fingerprint),
stopCh: stopCh,
logger: logger.Named(pluginName),
eventer: utils.NewEventer(ctx, logger),
config: &Config{},
tasks: newTaskStore(),
ctx: ctx,
signalShutdown: cancel,
logger: logger,
}
}
@ -117,12 +146,11 @@ func (r *RawExecDriver) SetConfig(data []byte) error {
}
r.config = &config
go r.fingerprintNow()
return nil
}
func (r *RawExecDriver) Shutdown(ctx context.Context) error {
close(r.stopCh)
r.signalShutdown()
return nil
}
@ -136,32 +164,26 @@ func (r *RawExecDriver) Capabilities() (*drivers.Capabilities, error) {
func (r *RawExecDriver) Fingerprint(ctx context.Context) (<-chan *drivers.Fingerprint, error) {
ch := make(chan *drivers.Fingerprint)
go r.fingerprint(ctx, ch)
go r.handleFingerprint(ctx, ch)
return ch, nil
}
func (r *RawExecDriver) fingerprint(ctx context.Context, ch chan *drivers.Fingerprint) {
defer close(r.fingerprintCh)
func (r *RawExecDriver) handleFingerprint(ctx context.Context, ch chan *drivers.Fingerprint) {
ticker := time.NewTimer(0)
for {
select {
case <-ctx.Done():
return
case <-r.ctx.Done():
return
case <-ticker.C:
ticker.Reset(fingerprintPeriod)
go r.fingerprintNow()
case f := <-r.fingerprintCh:
ch <- f
ch <- r.buildFingerprint()
}
}
}
func (r *RawExecDriver) fingerprintNow() {
if r.fingerprintCh == nil {
r.logger.Debug("fingerprint channel was nil, skipping fingerprint")
return
}
func (r *RawExecDriver) buildFingerprint() *drivers.Fingerprint {
var health drivers.HealthState
var desc string
attrs := map[string]string{}
@ -173,7 +195,8 @@ func (r *RawExecDriver) fingerprintNow() {
health = drivers.HealthStateUndetected
desc = "raw_exec disabled"
}
r.fingerprintCh <- &drivers.Fingerprint{
return &drivers.Fingerprint{
Attributes: map[string]string{},
Health: health,
HealthDescription: desc,
@ -181,18 +204,20 @@ func (r *RawExecDriver) fingerprintNow() {
}
func (r *RawExecDriver) RecoverTask(handle *drivers.TaskHandle) error {
var taskState RawExecTaskState
if handle == nil {
return fmt.Errorf("error: handle cannot be nil")
}
err := handle.GetDriverState(&taskState)
if err != nil {
r.logger.Error("failed to recover task", "error", err, "task_id", handle.Config.ID)
return err
var taskState RawExecTaskState
if err := handle.GetDriverState(&taskState); err != nil {
r.logger.Error("failed to decode task state from handle", "error", err, "task_id", handle.Config.ID)
return fmt.Errorf("failed to decode task state from handle: %v", err)
}
plugRC, err := utils.ReattachConfigToGoPlugin(taskState.ReattachConfig)
if err != nil {
r.logger.Error("failed to recover task", "error", err, "task_id", handle.Config.ID)
return err
r.logger.Error("failed to build ReattachConfig from task state", "error", err, "task_id", handle.Config.ID)
return fmt.Errorf("failed to build ReattachConfig from task state: %v", err)
}
pluginConfig := &plugin.ClientConfig{
@ -201,8 +226,8 @@ func (r *RawExecDriver) RecoverTask(handle *drivers.TaskHandle) error {
exec, pluginClient, err := utils.CreateExecutorWithConfig(pluginConfig, os.Stderr)
if err != nil {
r.logger.Error("failed to recover task", "error", err, "task_id", handle.Config.ID)
return err
r.logger.Error("failed to reattach to executor", "error", err, "task_id", handle.Config.ID)
return fmt.Errorf("failed to reattach to executor: %v", err)
}
h := &rawExecTaskHandle{
@ -223,12 +248,12 @@ func (r *RawExecDriver) RecoverTask(handle *drivers.TaskHandle) error {
func (r *RawExecDriver) StartTask(cfg *drivers.TaskConfig) (*drivers.TaskHandle, error) {
if _, ok := r.tasks.Get(cfg.ID); ok {
return nil, fmt.Errorf("task with ID '%s' already started", cfg.ID)
return nil, fmt.Errorf("task with ID %q already started", cfg.ID)
}
var driverConfig TaskConfig
if err := cfg.DecodeDriverConfig(&driverConfig); err != nil {
return nil, err
return nil, fmt.Errorf("failed to decode driver config: %v", err)
}
handle := drivers.NewTaskHandle(pluginName)
@ -240,9 +265,10 @@ func (r *RawExecDriver) StartTask(cfg *drivers.TaskConfig) (*drivers.TaskHandle,
LogLevel: "debug",
}
// TODO: best way to pass port ranges in from client config
exec, pluginClient, err := utils.CreateExecutor(os.Stderr, hclog.Debug, 14000, 14512, executorConfig)
if err != nil {
return nil, err
return nil, fmt.Errorf("failed to create executor: %v", err)
}
execCmd := &executor.ExecCommand{
@ -259,7 +285,7 @@ func (r *RawExecDriver) StartTask(cfg *drivers.TaskConfig) (*drivers.TaskHandle,
ps, err := exec.Launch(execCmd)
if err != nil {
pluginClient.Kill()
return nil, err
return nil, fmt.Errorf("failed to launch command with executor: %v", err)
}
h := &rawExecTaskHandle{
@ -272,8 +298,6 @@ func (r *RawExecDriver) StartTask(cfg *drivers.TaskConfig) (*drivers.TaskHandle,
logger: r.logger,
}
r.tasks.Set(cfg.ID, h)
driverState := RawExecTaskState{
ReattachConfig: utils.ReattachConfigFromGoPlugin(pluginClient.ReattachConfig()),
Pid: ps.Pid,
@ -285,19 +309,21 @@ func (r *RawExecDriver) StartTask(cfg *drivers.TaskConfig) (*drivers.TaskHandle,
r.logger.Error("failed to start task, error setting driver state", "error", err)
exec.Shutdown("", 0)
pluginClient.Kill()
return nil, err
return nil, fmt.Errorf("failed to set driver state: %v", err)
}
r.tasks.Set(cfg.ID, h)
go h.run()
return handle, nil
}
func (r *RawExecDriver) WaitTask(ctx context.Context, taskID string) (<-chan *drivers.ExitResult, error) {
ch := make(chan *drivers.ExitResult)
handle, ok := r.tasks.Get(taskID)
if !ok {
return nil, drivers.ErrTaskNotFound
}
ch := make(chan *drivers.ExitResult)
go r.handleWait(ctx, handle, ch)
return ch, nil
@ -321,6 +347,8 @@ func (r *RawExecDriver) handleWait(ctx context.Context, handle *rawExecTaskHandl
select {
case <-ctx.Done():
return
case <-r.ctx.Done():
return
case ch <- result:
}
}
@ -384,7 +412,6 @@ func (r *RawExecDriver) InspectTask(taskID string) (*drivers.TaskStatus, error)
DriverAttributes: map[string]string{
"pid": strconv.Itoa(handle.pid),
},
NetworkOverride: &drivers.NetworkOverride{},
}
return status, nil
@ -409,6 +436,10 @@ func (r *RawExecDriver) TaskStats(taskID string) (*drivers.TaskStats, error) {
}, nil
}
func (r *RawExecDriver) TaskEvents(ctx context.Context) (<-chan *drivers.TaskEvent, error) {
return r.eventer.TaskEvents(ctx)
}
func (r *RawExecDriver) SignalTask(taskID string, signal string) error {
handle, ok := r.tasks.Get(taskID)
if !ok {
@ -417,6 +448,7 @@ func (r *RawExecDriver) SignalTask(taskID string, signal string) error {
sig := os.Interrupt
if s, ok := signals.SignalLookup[signal]; ok {
r.logger.Warn("signal to send to task unknown, using SIGINT", "signal", signal)
sig = s
}
return handle.exec.Signal(sig)
@ -448,83 +480,3 @@ func (r *RawExecDriver) ExecTask(taskID string, cmd []string, timeout time.Durat
},
}, nil
}
type taskStore struct {
store map[string]*rawExecTaskHandle
lock sync.RWMutex
}
func newTaskStore() *taskStore {
return &taskStore{store: map[string]*rawExecTaskHandle{}}
}
func (ts *taskStore) Set(id string, handle *rawExecTaskHandle) {
ts.lock.Lock()
defer ts.lock.Unlock()
ts.store[id] = handle
}
func (ts *taskStore) Get(id string) (*rawExecTaskHandle, bool) {
ts.lock.RLock()
defer ts.lock.RUnlock()
t, ok := ts.store[id]
return t, ok
}
func (ts *taskStore) Delete(id string) {
ts.lock.Lock()
defer ts.lock.Unlock()
delete(ts.store, id)
}
type RawExecTaskState struct {
ReattachConfig *utils.ReattachConfig
TaskConfig *drivers.TaskConfig
Pid int
StartedAt time.Time
}
type rawExecTaskHandle struct {
exec executor.Executor
pid int
pluginClient *plugin.Client
logger hclog.Logger
// stateLock syncs access to all fields below
stateLock sync.RWMutex
task *drivers.TaskConfig
procState drivers.TaskState
startedAt time.Time
completedAt time.Time
exitResult *drivers.ExitResult
}
func (h *rawExecTaskHandle) IsRunning() bool {
return h.procState == drivers.TaskStateRunning
}
func (h *rawExecTaskHandle) run() {
// since run is called immediatly after the handle is created this
// ensures the exitResult is initialized so we avoid a nil pointer
// thus it does not need to be included in the lock
if h.exitResult == nil {
h.exitResult = &drivers.ExitResult{}
}
ps, err := h.exec.Wait()
h.stateLock.Lock()
defer h.stateLock.Unlock()
if err != nil {
h.exitResult.Err = err
h.procState = drivers.TaskStateUnknown
h.completedAt = time.Now()
return
}
h.procState = drivers.TaskStateExited
h.exitResult.ExitCode = ps.ExitCode
h.exitResult.Signal = ps.Signal
h.completedAt = ps.Time
}

View file

@ -1,4 +1,4 @@
package raw_exec
package rawexec
import (
"context"

View file

@ -1,6 +1,6 @@
// +build !windows
package raw_exec
package rawexec
import (
"context"

86
drivers/rawexec/state.go Normal file
View file

@ -0,0 +1,86 @@
package rawexec
import (
"sync"
"time"
hclog "github.com/hashicorp/go-hclog"
plugin "github.com/hashicorp/go-plugin"
"github.com/hashicorp/nomad/client/driver/executor"
"github.com/hashicorp/nomad/plugins/drivers"
)
type taskStore struct {
store map[string]*rawExecTaskHandle
lock sync.RWMutex
}
func newTaskStore() *taskStore {
return &taskStore{store: map[string]*rawExecTaskHandle{}}
}
func (ts *taskStore) Set(id string, handle *rawExecTaskHandle) {
ts.lock.Lock()
defer ts.lock.Unlock()
ts.store[id] = handle
}
func (ts *taskStore) Get(id string) (*rawExecTaskHandle, bool) {
ts.lock.RLock()
defer ts.lock.RUnlock()
t, ok := ts.store[id]
return t, ok
}
func (ts *taskStore) Delete(id string) {
ts.lock.Lock()
defer ts.lock.Unlock()
delete(ts.store, id)
}
type rawExecTaskHandle struct {
exec executor.Executor
pid int
pluginClient *plugin.Client
logger hclog.Logger
// stateLock syncs access to all fields below
stateLock sync.RWMutex
task *drivers.TaskConfig
procState drivers.TaskState
startedAt time.Time
completedAt time.Time
exitResult *drivers.ExitResult
}
func (h *rawExecTaskHandle) IsRunning() bool {
return h.procState == drivers.TaskStateRunning
}
func (h *rawExecTaskHandle) run() {
// since run is called immediatly after the handle is created this
// ensures the exitResult is initialized so we avoid a nil pointer
// thus it does not need to be included in the lock
if h.exitResult == nil {
h.exitResult = &drivers.ExitResult{}
}
ps, err := h.exec.Wait()
h.stateLock.Lock()
defer h.stateLock.Unlock()
if err != nil {
h.exitResult.Err = err
h.procState = drivers.TaskStateUnknown
h.completedAt = time.Now()
return
}
h.procState = drivers.TaskStateExited
h.exitResult.ExitCode = ps.ExitCode
h.exitResult.Signal = ps.Signal
h.completedAt = ps.Time
// TODO: detect if the task OOMed
}

View file

@ -2,7 +2,6 @@ package drivers
import (
"github.com/hashicorp/nomad/plugins/base"
"github.com/ugorji/go/codec"
)
// TaskHandle is the state shared between a driver and the client.
@ -21,10 +20,10 @@ func NewTaskHandle(driver string) *TaskHandle {
func (h *TaskHandle) SetDriverState(v interface{}) error {
h.driverState = []byte{}
return codec.NewEncoderBytes(&h.driverState, base.MsgpackHandle).Encode(v)
return base.MsgPackEncode(&h.driverState, v)
}
func (h *TaskHandle) GetDriverState(v interface{}) error {
return codec.NewDecoderBytes(h.driverState, base.MsgpackHandle).Decode(v)
return base.MsgPackDecode(h.driverState, v)
}

View file

@ -17,7 +17,7 @@ type ReattachConfig struct {
}
// ReattachConfigToGoPlugin converts a ReattachConfig wrapper struct into a go
// plugin ReattacConfig struct
// plugin ReattachConfig struct
func ReattachConfigToGoPlugin(rc *ReattachConfig) (*plugin.ReattachConfig, error) {
plug := &plugin.ReattachConfig{
Protocol: plugin.Protocol(rc.Protocol),