From 80c90ef745284585c2e1b53941dca5b6e82826c3 Mon Sep 17 00:00:00 2001 From: Nick Ethier Date: Wed, 16 Jan 2019 21:52:31 -0500 Subject: [PATCH] drivers: use consts for task handle version --- client/state/08types.go | 18 ++++++------------ client/state/upgrade.go | 17 ++++++++++++++++- drivers/docker/driver.go | 6 +++++- drivers/exec/driver.go | 2 +- drivers/exec/driver_pre09.go | 2 +- drivers/java/driver.go | 8 ++++++-- drivers/java/driver_pre09.go | 2 +- drivers/mock/driver.go | 6 +++++- drivers/qemu/driver.go | 8 ++++++-- drivers/qemu/driver_pre09.go | 2 +- drivers/rawexec/driver.go | 6 +++++- drivers/rawexec/driver_pre09.go | 2 +- drivers/rkt/driver.go | 8 ++++++-- drivers/rkt/driver_pre09.go | 2 +- .../shared/executor/legacy_executor_wrapper.go | 7 ++++--- plugins/base/plugin.go | 3 +++ plugins/drivers/driver.go | 11 ++++++++++- plugins/drivers/proto/driver.proto | 3 ++- 18 files changed, 80 insertions(+), 33 deletions(-) diff --git a/client/state/08types.go b/client/state/08types.go index 760539e47..78d4ebd62 100644 --- a/client/state/08types.go +++ b/client/state/08types.go @@ -60,7 +60,7 @@ func (t *taskRunnerHandle08) reattachConfig() *shared.ReattachConfig { } } -func (t *taskRunnerState08) Upgrade(allocID, taskName string) *state.LocalState { +func (t *taskRunnerState08) Upgrade(allocID, taskName string) (*state.LocalState, error) { ls := state.NewLocalState() // Reuse DriverNetwork @@ -76,38 +76,32 @@ func (t *taskRunnerState08) Upgrade(allocID, taskName string) *state.LocalState PrestartDone: t.TaskDirBuilt, } - // Don't need logmon in pre09 tasks - ls.Hooks["logmon"] = &state.HookState{ - PrestartDone: true, - } - // Upgrade dispatch payload state ls.Hooks["dispatch_payload"] = &state.HookState{ PrestartDone: t.PayloadRendered, } - // Add nessicary fields to TaskConfig - ls.TaskHandle = drivers.NewTaskHandle(0) + // Add necessary fields to TaskConfig + ls.TaskHandle = drivers.NewTaskHandle(drivers.Pre09TaskHandleVersion) ls.TaskHandle.Config = &drivers.TaskConfig{ Name: taskName, AllocID: allocID, } - //TODO do we need to se this accurately? Or will RecoverTask handle it? ls.TaskHandle.State = drivers.TaskStateUnknown // A ReattachConfig to the pre09 executor is sent var raw []byte var handle taskRunnerHandle08 if err := json.Unmarshal([]byte(t.HandleID), &handle); err != nil { - fmt.Println("ERR: ", err) + return nil, fmt.Errorf("failed to decode 0.8 driver state: %v", err) } raw, err := json.Marshal(handle.reattachConfig()) if err != nil { - fmt.Println("ERR: ", err) + return nil, fmt.Errorf("failed to encode updated driver state: %v", err) } ls.TaskHandle.DriverState = raw - return ls + return ls, nil } diff --git a/client/state/upgrade.go b/client/state/upgrade.go index a16178f50..da2bfd0fb 100644 --- a/client/state/upgrade.go +++ b/client/state/upgrade.go @@ -208,10 +208,25 @@ func upgradeAllocBucket(logger hclog.Logger, tx *boltdd.Tx, bkt *bolt.Bucket, al taskName, err, ) } + continue } // Convert 0.8 task state to 0.9 task state - localTaskState := oldState.Upgrade(allocID, taskName) + localTaskState, err := oldState.Upgrade(allocID, taskName) + if err != nil { + taskLogger.Warn("dropping invalid task due to error while upgrading state", + "error", err, + ) + + // Delete the invalid task bucket and treat failures + // here as unrecoverable errors. + if err := bkt.DeleteBucket(taskBucket); err != nil { + return fmt.Errorf("error deleting invalid task state for task %q: %v", + taskName, err, + ) + } + continue + } // Insert the new task state if err := putTaskRunnerLocalStateImpl(tx, allocID, taskName, localTaskState); err != nil { diff --git a/drivers/docker/driver.go b/drivers/docker/driver.go index 85bcb69e6..5a03dfde0 100644 --- a/drivers/docker/driver.go +++ b/drivers/docker/driver.go @@ -53,6 +53,10 @@ var ( } return nstructs.NewRecoverableError(err, r) } + + // taskHandleVersion is the version of task handle which this driver sets + // and understands how to decode driver state + taskHandleVersion = 1 ) type Driver struct { @@ -162,7 +166,7 @@ func (d *Driver) StartTask(cfg *drivers.TaskConfig) (*drivers.TaskHandle, *drive return nil, nil, fmt.Errorf("failed to decode driver config: %v", err) } - handle := drivers.NewTaskHandle(1) + handle := drivers.NewTaskHandle(taskHandleVersion) handle.Config = cfg // Initialize docker API clients diff --git a/drivers/exec/driver.go b/drivers/exec/driver.go index 08e8d508f..01134f7b7 100644 --- a/drivers/exec/driver.go +++ b/drivers/exec/driver.go @@ -235,7 +235,7 @@ func (d *Driver) RecoverTask(handle *drivers.TaskHandle) error { if err != nil { return err } - return d.recoverPre0_9Task(handle.Config, reattachConfig) + return d.recoverPre09task(handle.Config, reattachConfig) } // If already attached to handle there's nothing to recover. diff --git a/drivers/exec/driver_pre09.go b/drivers/exec/driver_pre09.go index f29c20331..e475f73d0 100644 --- a/drivers/exec/driver_pre09.go +++ b/drivers/exec/driver_pre09.go @@ -10,7 +10,7 @@ import ( "github.com/hashicorp/nomad/plugins/drivers" ) -func (d *Driver) recoverPre0_9Task(config *drivers.TaskConfig, reattach *plugin.ReattachConfig) error { +func (d *Driver) recoverPre09task(config *drivers.TaskConfig, reattach *plugin.ReattachConfig) error { config.ID = fmt.Sprintf("pre09-%s", uuid.Generate()) exec, pluginClient, err := executor.ReattachToPre09Executor(reattach, d.logger.With("task_name", config.Name, "alloc_id", config.AllocID)) diff --git a/drivers/java/driver.go b/drivers/java/driver.go index ecc8dd4de..e64069ed3 100644 --- a/drivers/java/driver.go +++ b/drivers/java/driver.go @@ -34,6 +34,10 @@ const ( // The key populated in Node Attributes to indicate presence of the Java driver driverAttr = "driver.java" driverVersionAttr = "driver.java.version" + + // taskHandleVersion is the version of task handle which this driver sets + // and understands how to decode driver state + taskHandleVersion = 1 ) var ( @@ -255,7 +259,7 @@ func (d *Driver) RecoverTask(handle *drivers.TaskHandle) error { if err != nil { return err } - return d.recoverPre0_9Task(handle.Config, reattachConfig) + return d.recoverPre09task(handle.Config, reattachConfig) } // If already attached to handle there's nothing to recover. @@ -325,7 +329,7 @@ func (d *Driver) StartTask(cfg *drivers.TaskConfig) (*drivers.TaskHandle, *drive d.logger.Info("starting java task", "driver_cfg", hclog.Fmt("%+v", driverConfig), "args", args) - handle := drivers.NewTaskHandle(1) + handle := drivers.NewTaskHandle(taskHandleVersion) handle.Config = cfg pluginLogFile := filepath.Join(cfg.TaskDir().Dir, "executor.out") diff --git a/drivers/java/driver_pre09.go b/drivers/java/driver_pre09.go index f83be3873..6186be233 100644 --- a/drivers/java/driver_pre09.go +++ b/drivers/java/driver_pre09.go @@ -10,7 +10,7 @@ import ( "github.com/hashicorp/nomad/plugins/drivers" ) -func (d *Driver) recoverPre0_9Task(config *drivers.TaskConfig, reattach *plugin.ReattachConfig) error { +func (d *Driver) recoverPre09task(config *drivers.TaskConfig, reattach *plugin.ReattachConfig) error { config.ID = fmt.Sprintf("pre09-%s", uuid.Generate()) exec, pluginClient, err := executor.ReattachToPre09Executor(reattach, d.logger.With("task_name", config.Name, "alloc_id", config.AllocID)) diff --git a/drivers/mock/driver.go b/drivers/mock/driver.go index be9ca436b..0039da41e 100644 --- a/drivers/mock/driver.go +++ b/drivers/mock/driver.go @@ -26,6 +26,10 @@ const ( // fingerprintPeriod is the interval at which the driver will send fingerprint responses fingerprintPeriod = 500 * time.Millisecond + + // taskHandleVersion is the version of task handle which this driver sets + // and understands how to decode driver state + taskHandleVersion = 1 ) var ( @@ -419,7 +423,7 @@ func (d *Driver) StartTask(cfg *drivers.TaskConfig) (*drivers.TaskHandle, *drive driverState := MockTaskState{ StartedAt: h.startedAt, } - handle := drivers.NewTaskHandle(1) + handle := drivers.NewTaskHandle(taskHandleVersion) handle.Config = cfg if err := handle.SetDriverState(&driverState); err != nil { d.logger.Error("failed to start task, error setting driver state", "error", err, "task_name", cfg.Name) diff --git a/drivers/qemu/driver.go b/drivers/qemu/driver.go index c8bf1f90a..37b8727aa 100644 --- a/drivers/qemu/driver.go +++ b/drivers/qemu/driver.go @@ -43,6 +43,10 @@ const ( // Maximum socket path length prior to qemu 2.10.1 qemuLegacyMaxMonitorPathLen = 108 + + // taskHandleVersion is the version of task handle which this driver sets + // and understands how to decode driver state + taskHandleVersion = 1 ) var ( @@ -253,7 +257,7 @@ func (d *Driver) RecoverTask(handle *drivers.TaskHandle) error { if err != nil { return err } - return d.recoverPre0_9Task(handle.Config, reattachConfig) + return d.recoverPre09task(handle.Config, reattachConfig) } // If already attached to handle there's nothing to recover. @@ -311,7 +315,7 @@ func (d *Driver) StartTask(cfg *drivers.TaskConfig) (*drivers.TaskHandle, *drive return nil, nil, fmt.Errorf("failed to decode driver config: %v", err) } - handle := drivers.NewTaskHandle(1) + handle := drivers.NewTaskHandle(taskHandleVersion) handle.Config = cfg // Get the image source diff --git a/drivers/qemu/driver_pre09.go b/drivers/qemu/driver_pre09.go index 0848f3aaf..605c8c23c 100644 --- a/drivers/qemu/driver_pre09.go +++ b/drivers/qemu/driver_pre09.go @@ -10,7 +10,7 @@ import ( "github.com/hashicorp/nomad/plugins/drivers" ) -func (d *Driver) recoverPre0_9Task(config *drivers.TaskConfig, reattach *plugin.ReattachConfig) error { +func (d *Driver) recoverPre09task(config *drivers.TaskConfig, reattach *plugin.ReattachConfig) error { config.ID = fmt.Sprintf("pre09-%s", uuid.Generate()) exec, pluginClient, err := executor.ReattachToPre09Executor(reattach, d.logger.With("task_name", config.Name, "alloc_id", config.AllocID)) diff --git a/drivers/rawexec/driver.go b/drivers/rawexec/driver.go index 376ecaf2e..aa5523a63 100644 --- a/drivers/rawexec/driver.go +++ b/drivers/rawexec/driver.go @@ -29,6 +29,10 @@ const ( // fingerprintPeriod is the interval at which the driver will send fingerprint responses fingerprintPeriod = 30 * time.Second + + // taskHandleVersion is the version of task handle which this driver sets + // and understands how to decode driver state + taskHandleVersion = 1 ) var ( @@ -319,7 +323,7 @@ func (d *Driver) StartTask(cfg *drivers.TaskConfig) (*drivers.TaskHandle, *drive } d.logger.Info("starting task", "driver_cfg", hclog.Fmt("%+v", driverConfig)) - handle := drivers.NewTaskHandle(1) + handle := drivers.NewTaskHandle(taskHandleVersion) handle.Config = cfg pluginLogFile := filepath.Join(cfg.TaskDir().Dir, "executor.out") diff --git a/drivers/rawexec/driver_pre09.go b/drivers/rawexec/driver_pre09.go index b0d21c4dc..c2201ca0e 100644 --- a/drivers/rawexec/driver_pre09.go +++ b/drivers/rawexec/driver_pre09.go @@ -10,7 +10,7 @@ import ( "github.com/hashicorp/nomad/plugins/drivers" ) -func (d *Driver) recoverPre0_9Task(config *drivers.TaskConfig, reattach *plugin.ReattachConfig) error { +func (d *Driver) recoverPre09task(config *drivers.TaskConfig, reattach *plugin.ReattachConfig) error { config.ID = fmt.Sprintf("pre09-%s", uuid.Generate()) exec, pluginClient, err := executor.ReattachToPre09Executor(reattach, d.logger.With("task_name", config.Name, "alloc_id", config.AllocID)) diff --git a/drivers/rkt/driver.go b/drivers/rkt/driver.go index 3c134827c..3c71046d0 100644 --- a/drivers/rkt/driver.go +++ b/drivers/rkt/driver.go @@ -55,6 +55,10 @@ const ( // networkDeadline is how long to wait for container network // information to become available. networkDeadline = 1 * time.Minute + + // taskHandleVersion is the version of task handle which this driver sets + // and understands how to decode driver state + taskHandleVersion = 1 ) var ( @@ -356,7 +360,7 @@ func (d *Driver) RecoverTask(handle *drivers.TaskHandle) error { if err != nil { return err } - return d.recoverPre0_9Task(handle.Config, reattachConfig) + return d.recoverPre09task(handle.Config, reattachConfig) } // If already attached to handle there's nothing to recover. @@ -423,7 +427,7 @@ func (d *Driver) StartTask(cfg *drivers.TaskConfig) (*drivers.TaskHandle, *drive return nil, nil, fmt.Errorf("failed to decode driver config: %v", err) } - handle := drivers.NewTaskHandle(1) + handle := drivers.NewTaskHandle(taskHandleVersion) handle.Config = cfg // todo(preetha) - port map in client v1 is a slice of maps that get merged. figure out if the caller will do this diff --git a/drivers/rkt/driver_pre09.go b/drivers/rkt/driver_pre09.go index 44e0e5029..c23b4c737 100644 --- a/drivers/rkt/driver_pre09.go +++ b/drivers/rkt/driver_pre09.go @@ -10,7 +10,7 @@ import ( "github.com/hashicorp/nomad/plugins/drivers" ) -func (d *Driver) recoverPre0_9Task(config *drivers.TaskConfig, reattach *plugin.ReattachConfig) error { +func (d *Driver) recoverPre09task(config *drivers.TaskConfig, reattach *plugin.ReattachConfig) error { config.ID = fmt.Sprintf("pre09-%s", uuid.Generate()) exec, pluginClient, err := executor.ReattachToPre09Executor(reattach, d.logger.With("task_name", config.Name, "alloc_id", config.AllocID)) diff --git a/drivers/shared/executor/legacy_executor_wrapper.go b/drivers/shared/executor/legacy_executor_wrapper.go index c32899d3d..e9a01a4cd 100644 --- a/drivers/shared/executor/legacy_executor_wrapper.go +++ b/drivers/shared/executor/legacy_executor_wrapper.go @@ -2,6 +2,7 @@ package executor import ( "encoding/gob" + "fmt" "net/rpc" "os" "syscall" @@ -30,7 +31,7 @@ type legacyExecutorWrapper struct { } func (l *legacyExecutorWrapper) Launch(launchCmd *ExecCommand) (*ProcessState, error) { - panic("not implemented") + return nil, fmt.Errorf("operation not supported for legacy exec wrapper") } func (l *legacyExecutorWrapper) Wait(ctx context.Context) (*ProcessState, error) { @@ -59,7 +60,7 @@ func (l *legacyExecutorWrapper) Shutdown(signal string, gracePeriod time.Duratio } func (l *legacyExecutorWrapper) UpdateResources(*drivers.Resources) error { - panic("not implemented") + return fmt.Errorf("operation not supported for legacy exec wrapper") } func (l *legacyExecutorWrapper) Version() (*ExecutorVersion, error) { @@ -194,5 +195,5 @@ func (p *pre09ExecutorPlugin) Client(b *plugin.MuxBroker, c *rpc.Client) (interf } func (p *pre09ExecutorPlugin) Server(*plugin.MuxBroker) (interface{}, error) { - panic("client only supported") + return nil, fmt.Errorf("client only supported") } diff --git a/plugins/base/plugin.go b/plugins/base/plugin.go index 30d2fd49c..be4145161 100644 --- a/plugins/base/plugin.go +++ b/plugins/base/plugin.go @@ -25,6 +25,9 @@ const ( var ( // Handshake is a common handshake that is shared by all plugins and Nomad. Handshake = plugin.HandshakeConfig{ + // ProtocolVersion for the executor protocol. + // Version 1: pre 0.9 netrpc based executor + // Version 2: 0.9+ grpc based executor ProtocolVersion: 2, MagicCookieKey: "NOMAD_PLUGIN_MAGIC_COOKIE", MagicCookieValue: "e4327c2e01eabfd75a8a67adb114fb34a757d57eee7728d857a8cec6e91a7255", diff --git a/plugins/drivers/driver.go b/plugins/drivers/driver.go index cedae6fac..0af72641a 100644 --- a/plugins/drivers/driver.go +++ b/plugins/drivers/driver.go @@ -21,7 +21,16 @@ import ( "github.com/zclconf/go-cty/cty/msgpack" ) -const DriverHealthy = "Healthy" +const ( + // DriverHealthy is the default health description that should be used + // if the driver is nominal + DriverHealthy = "Healthy" + + // Pre09TaskHandleVersion is the version used to identify that the task + // handle is from a driver that existed before driver plugins (v0.9). The + // driver should take appropriate action to handle the old driver state. + Pre09TaskHandleVersion = 0 +) // DriverPlugin is the interface with drivers will implement. It is also // implemented by a plugin client which proxies the calls to go-plugin. See diff --git a/plugins/drivers/proto/driver.proto b/plugins/drivers/proto/driver.proto index 197d06002..7265880fe 100644 --- a/plugins/drivers/proto/driver.proto +++ b/plugins/drivers/proto/driver.proto @@ -448,7 +448,8 @@ enum TaskState { // TaskHandle is created when starting a task and is used to recover task message TaskHandle { - // Version is used by the driver to version the DriverState schema + // Version is used by the driver to version the DriverState schema. + // Version 0 is reserved by Nomad and should not be used. int32 version = 1; // Config is the TaskConfig for the task