drivers: use consts for task handle version

This commit is contained in:
Nick Ethier 2019-01-16 21:52:31 -05:00
parent ae77fbbe28
commit 80c90ef745
No known key found for this signature in database
GPG Key ID: 07C1A3ECED90D24A
18 changed files with 80 additions and 33 deletions

View File

@ -60,7 +60,7 @@ func (t *taskRunnerHandle08) reattachConfig() *shared.ReattachConfig {
}
}
func (t *taskRunnerState08) Upgrade(allocID, taskName string) *state.LocalState {
func (t *taskRunnerState08) Upgrade(allocID, taskName string) (*state.LocalState, error) {
ls := state.NewLocalState()
// Reuse DriverNetwork
@ -76,38 +76,32 @@ func (t *taskRunnerState08) Upgrade(allocID, taskName string) *state.LocalState
PrestartDone: t.TaskDirBuilt,
}
// Don't need logmon in pre09 tasks
ls.Hooks["logmon"] = &state.HookState{
PrestartDone: true,
}
// Upgrade dispatch payload state
ls.Hooks["dispatch_payload"] = &state.HookState{
PrestartDone: t.PayloadRendered,
}
// Add nessicary fields to TaskConfig
ls.TaskHandle = drivers.NewTaskHandle(0)
// Add necessary fields to TaskConfig
ls.TaskHandle = drivers.NewTaskHandle(drivers.Pre09TaskHandleVersion)
ls.TaskHandle.Config = &drivers.TaskConfig{
Name: taskName,
AllocID: allocID,
}
//TODO do we need to se this accurately? Or will RecoverTask handle it?
ls.TaskHandle.State = drivers.TaskStateUnknown
// A ReattachConfig to the pre09 executor is sent
var raw []byte
var handle taskRunnerHandle08
if err := json.Unmarshal([]byte(t.HandleID), &handle); err != nil {
fmt.Println("ERR: ", err)
return nil, fmt.Errorf("failed to decode 0.8 driver state: %v", err)
}
raw, err := json.Marshal(handle.reattachConfig())
if err != nil {
fmt.Println("ERR: ", err)
return nil, fmt.Errorf("failed to encode updated driver state: %v", err)
}
ls.TaskHandle.DriverState = raw
return ls
return ls, nil
}

View File

@ -208,10 +208,25 @@ func upgradeAllocBucket(logger hclog.Logger, tx *boltdd.Tx, bkt *bolt.Bucket, al
taskName, err,
)
}
continue
}
// Convert 0.8 task state to 0.9 task state
localTaskState := oldState.Upgrade(allocID, taskName)
localTaskState, err := oldState.Upgrade(allocID, taskName)
if err != nil {
taskLogger.Warn("dropping invalid task due to error while upgrading state",
"error", err,
)
// Delete the invalid task bucket and treat failures
// here as unrecoverable errors.
if err := bkt.DeleteBucket(taskBucket); err != nil {
return fmt.Errorf("error deleting invalid task state for task %q: %v",
taskName, err,
)
}
continue
}
// Insert the new task state
if err := putTaskRunnerLocalStateImpl(tx, allocID, taskName, localTaskState); err != nil {

View File

@ -53,6 +53,10 @@ var (
}
return nstructs.NewRecoverableError(err, r)
}
// taskHandleVersion is the version of task handle which this driver sets
// and understands how to decode driver state
taskHandleVersion = 1
)
type Driver struct {
@ -162,7 +166,7 @@ func (d *Driver) StartTask(cfg *drivers.TaskConfig) (*drivers.TaskHandle, *drive
return nil, nil, fmt.Errorf("failed to decode driver config: %v", err)
}
handle := drivers.NewTaskHandle(1)
handle := drivers.NewTaskHandle(taskHandleVersion)
handle.Config = cfg
// Initialize docker API clients

View File

@ -235,7 +235,7 @@ func (d *Driver) RecoverTask(handle *drivers.TaskHandle) error {
if err != nil {
return err
}
return d.recoverPre0_9Task(handle.Config, reattachConfig)
return d.recoverPre09task(handle.Config, reattachConfig)
}
// If already attached to handle there's nothing to recover.

View File

@ -10,7 +10,7 @@ import (
"github.com/hashicorp/nomad/plugins/drivers"
)
func (d *Driver) recoverPre0_9Task(config *drivers.TaskConfig, reattach *plugin.ReattachConfig) error {
func (d *Driver) recoverPre09task(config *drivers.TaskConfig, reattach *plugin.ReattachConfig) error {
config.ID = fmt.Sprintf("pre09-%s", uuid.Generate())
exec, pluginClient, err := executor.ReattachToPre09Executor(reattach,
d.logger.With("task_name", config.Name, "alloc_id", config.AllocID))

View File

@ -34,6 +34,10 @@ const (
// The key populated in Node Attributes to indicate presence of the Java driver
driverAttr = "driver.java"
driverVersionAttr = "driver.java.version"
// taskHandleVersion is the version of task handle which this driver sets
// and understands how to decode driver state
taskHandleVersion = 1
)
var (
@ -255,7 +259,7 @@ func (d *Driver) RecoverTask(handle *drivers.TaskHandle) error {
if err != nil {
return err
}
return d.recoverPre0_9Task(handle.Config, reattachConfig)
return d.recoverPre09task(handle.Config, reattachConfig)
}
// If already attached to handle there's nothing to recover.
@ -325,7 +329,7 @@ func (d *Driver) StartTask(cfg *drivers.TaskConfig) (*drivers.TaskHandle, *drive
d.logger.Info("starting java task", "driver_cfg", hclog.Fmt("%+v", driverConfig), "args", args)
handle := drivers.NewTaskHandle(1)
handle := drivers.NewTaskHandle(taskHandleVersion)
handle.Config = cfg
pluginLogFile := filepath.Join(cfg.TaskDir().Dir, "executor.out")

View File

@ -10,7 +10,7 @@ import (
"github.com/hashicorp/nomad/plugins/drivers"
)
func (d *Driver) recoverPre0_9Task(config *drivers.TaskConfig, reattach *plugin.ReattachConfig) error {
func (d *Driver) recoverPre09task(config *drivers.TaskConfig, reattach *plugin.ReattachConfig) error {
config.ID = fmt.Sprintf("pre09-%s", uuid.Generate())
exec, pluginClient, err := executor.ReattachToPre09Executor(reattach,
d.logger.With("task_name", config.Name, "alloc_id", config.AllocID))

View File

@ -26,6 +26,10 @@ const (
// fingerprintPeriod is the interval at which the driver will send fingerprint responses
fingerprintPeriod = 500 * time.Millisecond
// taskHandleVersion is the version of task handle which this driver sets
// and understands how to decode driver state
taskHandleVersion = 1
)
var (
@ -419,7 +423,7 @@ func (d *Driver) StartTask(cfg *drivers.TaskConfig) (*drivers.TaskHandle, *drive
driverState := MockTaskState{
StartedAt: h.startedAt,
}
handle := drivers.NewTaskHandle(1)
handle := drivers.NewTaskHandle(taskHandleVersion)
handle.Config = cfg
if err := handle.SetDriverState(&driverState); err != nil {
d.logger.Error("failed to start task, error setting driver state", "error", err, "task_name", cfg.Name)

View File

@ -43,6 +43,10 @@ const (
// Maximum socket path length prior to qemu 2.10.1
qemuLegacyMaxMonitorPathLen = 108
// taskHandleVersion is the version of task handle which this driver sets
// and understands how to decode driver state
taskHandleVersion = 1
)
var (
@ -253,7 +257,7 @@ func (d *Driver) RecoverTask(handle *drivers.TaskHandle) error {
if err != nil {
return err
}
return d.recoverPre0_9Task(handle.Config, reattachConfig)
return d.recoverPre09task(handle.Config, reattachConfig)
}
// If already attached to handle there's nothing to recover.
@ -311,7 +315,7 @@ func (d *Driver) StartTask(cfg *drivers.TaskConfig) (*drivers.TaskHandle, *drive
return nil, nil, fmt.Errorf("failed to decode driver config: %v", err)
}
handle := drivers.NewTaskHandle(1)
handle := drivers.NewTaskHandle(taskHandleVersion)
handle.Config = cfg
// Get the image source

View File

@ -10,7 +10,7 @@ import (
"github.com/hashicorp/nomad/plugins/drivers"
)
func (d *Driver) recoverPre0_9Task(config *drivers.TaskConfig, reattach *plugin.ReattachConfig) error {
func (d *Driver) recoverPre09task(config *drivers.TaskConfig, reattach *plugin.ReattachConfig) error {
config.ID = fmt.Sprintf("pre09-%s", uuid.Generate())
exec, pluginClient, err := executor.ReattachToPre09Executor(reattach,
d.logger.With("task_name", config.Name, "alloc_id", config.AllocID))

View File

@ -29,6 +29,10 @@ const (
// fingerprintPeriod is the interval at which the driver will send fingerprint responses
fingerprintPeriod = 30 * time.Second
// taskHandleVersion is the version of task handle which this driver sets
// and understands how to decode driver state
taskHandleVersion = 1
)
var (
@ -319,7 +323,7 @@ func (d *Driver) StartTask(cfg *drivers.TaskConfig) (*drivers.TaskHandle, *drive
}
d.logger.Info("starting task", "driver_cfg", hclog.Fmt("%+v", driverConfig))
handle := drivers.NewTaskHandle(1)
handle := drivers.NewTaskHandle(taskHandleVersion)
handle.Config = cfg
pluginLogFile := filepath.Join(cfg.TaskDir().Dir, "executor.out")

View File

@ -10,7 +10,7 @@ import (
"github.com/hashicorp/nomad/plugins/drivers"
)
func (d *Driver) recoverPre0_9Task(config *drivers.TaskConfig, reattach *plugin.ReattachConfig) error {
func (d *Driver) recoverPre09task(config *drivers.TaskConfig, reattach *plugin.ReattachConfig) error {
config.ID = fmt.Sprintf("pre09-%s", uuid.Generate())
exec, pluginClient, err := executor.ReattachToPre09Executor(reattach,
d.logger.With("task_name", config.Name, "alloc_id", config.AllocID))

View File

@ -55,6 +55,10 @@ const (
// networkDeadline is how long to wait for container network
// information to become available.
networkDeadline = 1 * time.Minute
// taskHandleVersion is the version of task handle which this driver sets
// and understands how to decode driver state
taskHandleVersion = 1
)
var (
@ -356,7 +360,7 @@ func (d *Driver) RecoverTask(handle *drivers.TaskHandle) error {
if err != nil {
return err
}
return d.recoverPre0_9Task(handle.Config, reattachConfig)
return d.recoverPre09task(handle.Config, reattachConfig)
}
// If already attached to handle there's nothing to recover.
@ -423,7 +427,7 @@ func (d *Driver) StartTask(cfg *drivers.TaskConfig) (*drivers.TaskHandle, *drive
return nil, nil, fmt.Errorf("failed to decode driver config: %v", err)
}
handle := drivers.NewTaskHandle(1)
handle := drivers.NewTaskHandle(taskHandleVersion)
handle.Config = cfg
// todo(preetha) - port map in client v1 is a slice of maps that get merged. figure out if the caller will do this

View File

@ -10,7 +10,7 @@ import (
"github.com/hashicorp/nomad/plugins/drivers"
)
func (d *Driver) recoverPre0_9Task(config *drivers.TaskConfig, reattach *plugin.ReattachConfig) error {
func (d *Driver) recoverPre09task(config *drivers.TaskConfig, reattach *plugin.ReattachConfig) error {
config.ID = fmt.Sprintf("pre09-%s", uuid.Generate())
exec, pluginClient, err := executor.ReattachToPre09Executor(reattach,
d.logger.With("task_name", config.Name, "alloc_id", config.AllocID))

View File

@ -2,6 +2,7 @@ package executor
import (
"encoding/gob"
"fmt"
"net/rpc"
"os"
"syscall"
@ -30,7 +31,7 @@ type legacyExecutorWrapper struct {
}
func (l *legacyExecutorWrapper) Launch(launchCmd *ExecCommand) (*ProcessState, error) {
panic("not implemented")
return nil, fmt.Errorf("operation not supported for legacy exec wrapper")
}
func (l *legacyExecutorWrapper) Wait(ctx context.Context) (*ProcessState, error) {
@ -59,7 +60,7 @@ func (l *legacyExecutorWrapper) Shutdown(signal string, gracePeriod time.Duratio
}
func (l *legacyExecutorWrapper) UpdateResources(*drivers.Resources) error {
panic("not implemented")
return fmt.Errorf("operation not supported for legacy exec wrapper")
}
func (l *legacyExecutorWrapper) Version() (*ExecutorVersion, error) {
@ -194,5 +195,5 @@ func (p *pre09ExecutorPlugin) Client(b *plugin.MuxBroker, c *rpc.Client) (interf
}
func (p *pre09ExecutorPlugin) Server(*plugin.MuxBroker) (interface{}, error) {
panic("client only supported")
return nil, fmt.Errorf("client only supported")
}

View File

@ -25,6 +25,9 @@ const (
var (
// Handshake is a common handshake that is shared by all plugins and Nomad.
Handshake = plugin.HandshakeConfig{
// ProtocolVersion for the executor protocol.
// Version 1: pre 0.9 netrpc based executor
// Version 2: 0.9+ grpc based executor
ProtocolVersion: 2,
MagicCookieKey: "NOMAD_PLUGIN_MAGIC_COOKIE",
MagicCookieValue: "e4327c2e01eabfd75a8a67adb114fb34a757d57eee7728d857a8cec6e91a7255",

View File

@ -21,7 +21,16 @@ import (
"github.com/zclconf/go-cty/cty/msgpack"
)
const DriverHealthy = "Healthy"
const (
// DriverHealthy is the default health description that should be used
// if the driver is nominal
DriverHealthy = "Healthy"
// Pre09TaskHandleVersion is the version used to identify that the task
// handle is from a driver that existed before driver plugins (v0.9). The
// driver should take appropriate action to handle the old driver state.
Pre09TaskHandleVersion = 0
)
// DriverPlugin is the interface with drivers will implement. It is also
// implemented by a plugin client which proxies the calls to go-plugin. See

View File

@ -448,7 +448,8 @@ enum TaskState {
// TaskHandle is created when starting a task and is used to recover task
message TaskHandle {
// Version is used by the driver to version the DriverState schema
// Version is used by the driver to version the DriverState schema.
// Version 0 is reserved by Nomad and should not be used.
int32 version = 1;
// Config is the TaskConfig for the task