drivers: add docker upgrade path and e2e test
This commit is contained in:
parent
a3510413ff
commit
5b9013528e
|
@ -3,6 +3,7 @@ package state
|
||||||
import (
|
import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"strings"
|
||||||
|
|
||||||
"github.com/hashicorp/nomad/client/allocrunner/taskrunner/state"
|
"github.com/hashicorp/nomad/client/allocrunner/taskrunner/state"
|
||||||
"github.com/hashicorp/nomad/nomad/structs"
|
"github.com/hashicorp/nomad/nomad/structs"
|
||||||
|
@ -44,7 +45,16 @@ type taskRunnerState08 struct {
|
||||||
//CreatedResources *driver.CreatedResources
|
//CreatedResources *driver.CreatedResources
|
||||||
}
|
}
|
||||||
|
|
||||||
type taskRunnerHandle08 struct {
|
type TaskRunnerHandle08 struct {
|
||||||
|
// Docker specific handle info
|
||||||
|
ContainerID string `json:"ContainerID"`
|
||||||
|
Image string `json:"Image"`
|
||||||
|
|
||||||
|
// LXC specific handle info
|
||||||
|
ContainerName string `json:"ContainerName"`
|
||||||
|
LxcPath string `json:"LxcPath"`
|
||||||
|
|
||||||
|
// Executor reattach config
|
||||||
PluginConfig struct {
|
PluginConfig struct {
|
||||||
Pid int `json:"Pid"`
|
Pid int `json:"Pid"`
|
||||||
AddrNet string `json:"AddrNet"`
|
AddrNet string `json:"AddrNet"`
|
||||||
|
@ -52,7 +62,7 @@ type taskRunnerHandle08 struct {
|
||||||
} `json:"PluginConfig"`
|
} `json:"PluginConfig"`
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *taskRunnerHandle08) reattachConfig() *shared.ReattachConfig {
|
func (t *TaskRunnerHandle08) ReattachConfig() *shared.ReattachConfig {
|
||||||
return &shared.ReattachConfig{
|
return &shared.ReattachConfig{
|
||||||
Network: t.PluginConfig.AddrNet,
|
Network: t.PluginConfig.AddrNet,
|
||||||
Addr: t.PluginConfig.AddrName,
|
Addr: t.PluginConfig.AddrName,
|
||||||
|
@ -90,18 +100,30 @@ func (t *taskRunnerState08) Upgrade(allocID, taskName string) (*state.LocalState
|
||||||
|
|
||||||
ls.TaskHandle.State = drivers.TaskStateUnknown
|
ls.TaskHandle.State = drivers.TaskStateUnknown
|
||||||
|
|
||||||
// A ReattachConfig to the pre09 executor is sent
|
// The docker driver prefixed the handle with 'DOCKER:'
|
||||||
var raw []byte
|
// Strip so that it can be unmarshalled
|
||||||
var handle taskRunnerHandle08
|
data := t.HandleID
|
||||||
if err := json.Unmarshal([]byte(t.HandleID), &handle); err != nil {
|
if strings.HasPrefix(data, "DOCKER:") {
|
||||||
return nil, fmt.Errorf("failed to decode 0.8 driver state: %v", err)
|
data = data[7:]
|
||||||
}
|
|
||||||
raw, err := json.Marshal(handle.reattachConfig())
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("failed to encode updated driver state: %v", err)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
ls.TaskHandle.DriverState = raw
|
// The pre09 driver handle ID is given to the driver. It is unmarshalled
|
||||||
|
// here to check for errors
|
||||||
|
if _, err := UnmarshalPre09HandleID([]byte(data)); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
ls.TaskHandle.DriverState = []byte(data)
|
||||||
|
|
||||||
return ls, nil
|
return ls, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// UnmarshalPre09HandleID decodes the pre09 json encoded handle ID
|
||||||
|
func UnmarshalPre09HandleID(raw []byte) (*TaskRunnerHandle08, error) {
|
||||||
|
var handle TaskRunnerHandle08
|
||||||
|
if err := json.Unmarshal(raw, &handle); err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to decode 0.8 driver state: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return &handle, nil
|
||||||
|
}
|
||||||
|
|
|
@ -118,6 +118,11 @@ func (d *Driver) RecoverTask(handle *drivers.TaskHandle) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// COMPAT(0.10): pre 0.9 upgrade path check
|
||||||
|
if handle.Version == 0 {
|
||||||
|
return d.recoverPre09Task(handle)
|
||||||
|
}
|
||||||
|
|
||||||
var handleState taskHandleState
|
var handleState taskHandleState
|
||||||
if err := handle.GetDriverState(&handleState); err != nil {
|
if err := handle.GetDriverState(&handleState); err != nil {
|
||||||
return fmt.Errorf("failed to decode driver task state: %v", err)
|
return fmt.Errorf("failed to decode driver task state: %v", err)
|
||||||
|
|
|
@ -0,0 +1,77 @@
|
||||||
|
package docker
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
|
||||||
|
"github.com/hashicorp/nomad/client/state"
|
||||||
|
"github.com/hashicorp/nomad/drivers/docker/docklog"
|
||||||
|
"github.com/hashicorp/nomad/drivers/shared/executor"
|
||||||
|
"github.com/hashicorp/nomad/helper/uuid"
|
||||||
|
"github.com/hashicorp/nomad/plugins/drivers"
|
||||||
|
"github.com/hashicorp/nomad/plugins/shared"
|
||||||
|
)
|
||||||
|
|
||||||
|
func (d *Driver) recoverPre09Task(h *drivers.TaskHandle) error {
|
||||||
|
handle, err := state.UnmarshalPre09HandleID(h.DriverState)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to decode pre09 driver handle: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
reattach, err := shared.ReattachConfigToGoPlugin(handle.ReattachConfig())
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to decode reattach config from pre09 handle: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
h.Config.ID = fmt.Sprintf("pre09-%s", uuid.Generate())
|
||||||
|
exec, pluginClient, err := executor.ReattachToPre09Executor(reattach,
|
||||||
|
d.logger.With("task_name", h.Config.Name, "alloc_id", h.Config.AllocID))
|
||||||
|
if err != nil {
|
||||||
|
d.logger.Error("failed to reattach to executor", "error", err, "task_name", h.Config.Name)
|
||||||
|
return fmt.Errorf("failed to reattach to executor: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
client, _, err := d.dockerClients()
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to get docker client: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
container, err := client.InspectContainer(handle.ContainerID)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to inspect container for id %q: %v", handle.ContainerID, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
th := &taskHandle{
|
||||||
|
client: client,
|
||||||
|
waitClient: waitClient,
|
||||||
|
dlogger: &executorDockerLoggerShim{exec: exec},
|
||||||
|
dloggerPluginClient: pluginClient,
|
||||||
|
logger: d.logger.With("container_id", container.ID),
|
||||||
|
task: h.Config,
|
||||||
|
containerID: container.ID,
|
||||||
|
containerImage: container.Image,
|
||||||
|
doneCh: make(chan bool),
|
||||||
|
waitCh: make(chan struct{}),
|
||||||
|
removeContainerOnExit: d.config.GC.Container,
|
||||||
|
}
|
||||||
|
|
||||||
|
d.tasks.Set(h.Config.ID, th)
|
||||||
|
go th.run()
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// executorDockerLoggerShim is used by upgraded tasks as the docker logger. When
|
||||||
|
// the task exits, the Stop() func of the docker logger is called, this shim
|
||||||
|
// will proxy that call to the executor Shutdown() func which will stop the
|
||||||
|
// syslog server started by the pre09 executor
|
||||||
|
type executorDockerLoggerShim struct {
|
||||||
|
exec executor.Executor
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *executorDockerLoggerShim) Start(*docklog.StartOpts) error { return nil }
|
||||||
|
func (e *executorDockerLoggerShim) Stop() error {
|
||||||
|
if err := e.exec.Shutdown("", 0); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
|
@ -2,7 +2,6 @@ package exec
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"encoding/json"
|
|
||||||
"fmt"
|
"fmt"
|
||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
|
@ -259,19 +258,9 @@ func (d *Driver) RecoverTask(handle *drivers.TaskHandle) error {
|
||||||
return fmt.Errorf("handle cannot be nil")
|
return fmt.Errorf("handle cannot be nil")
|
||||||
}
|
}
|
||||||
|
|
||||||
// pre 0.9 upgrade path check
|
// COMPAT(0.10): pre 0.9 upgrade path check
|
||||||
if handle.Version == 0 {
|
if handle.Version == 0 {
|
||||||
var reattach shared.ReattachConfig
|
return d.recoverPre09Task(handle)
|
||||||
d.logger.Debug("parsing pre09 driver state", "state", string(handle.DriverState))
|
|
||||||
if err := json.Unmarshal(handle.DriverState, &reattach); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
reattachConfig, err := shared.ReattachConfigToGoPlugin(&reattach)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
return d.recoverPre09Task(handle.Config, reattachConfig)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// If already attached to handle there's nothing to recover.
|
// If already attached to handle there's nothing to recover.
|
||||||
|
|
|
@ -4,33 +4,44 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
plugin "github.com/hashicorp/go-plugin"
|
"github.com/hashicorp/nomad/client/state"
|
||||||
"github.com/hashicorp/nomad/drivers/shared/executor"
|
"github.com/hashicorp/nomad/drivers/shared/executor"
|
||||||
"github.com/hashicorp/nomad/helper/uuid"
|
"github.com/hashicorp/nomad/helper/uuid"
|
||||||
"github.com/hashicorp/nomad/plugins/drivers"
|
"github.com/hashicorp/nomad/plugins/drivers"
|
||||||
|
"github.com/hashicorp/nomad/plugins/shared"
|
||||||
)
|
)
|
||||||
|
|
||||||
func (d *Driver) recoverPre09Task(config *drivers.TaskConfig, reattach *plugin.ReattachConfig) error {
|
func (d *Driver) recoverPre09Task(h *drivers.TaskHandle) error {
|
||||||
config.ID = fmt.Sprintf("pre09-%s", uuid.Generate())
|
handle, err := state.UnmarshalPre09HandleID(h.DriverState)
|
||||||
exec, pluginClient, err := executor.ReattachToPre09Executor(reattach,
|
|
||||||
d.logger.With("task_name", config.Name, "alloc_id", config.AllocID))
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
d.logger.Error("failed to reattach to executor", "error", err, "task_name", config.Name)
|
return fmt.Errorf("failed to decode pre09 driver handle: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
reattach, err := shared.ReattachConfigToGoPlugin(handle.ReattachConfig())
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to decode reattach config from pre09 handle: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
h.Config.ID = fmt.Sprintf("pre09-%s", uuid.Generate())
|
||||||
|
exec, pluginClient, err := executor.ReattachToPre09Executor(reattach,
|
||||||
|
d.logger.With("task_name", h.Config.Name, "alloc_id", h.Config.AllocID))
|
||||||
|
if err != nil {
|
||||||
|
d.logger.Error("failed to reattach to executor", "error", err, "task_name", h.Config.Name)
|
||||||
return fmt.Errorf("failed to reattach to executor: %v", err)
|
return fmt.Errorf("failed to reattach to executor: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
h := &taskHandle{
|
th := &taskHandle{
|
||||||
exec: exec,
|
exec: exec,
|
||||||
pid: reattach.Pid,
|
pid: reattach.Pid,
|
||||||
pluginClient: pluginClient,
|
pluginClient: pluginClient,
|
||||||
taskConfig: config,
|
taskConfig: h.Config,
|
||||||
procState: drivers.TaskStateRunning,
|
procState: drivers.TaskStateRunning,
|
||||||
startedAt: time.Now(),
|
startedAt: time.Now(),
|
||||||
exitResult: &drivers.ExitResult{},
|
exitResult: &drivers.ExitResult{},
|
||||||
}
|
}
|
||||||
|
|
||||||
d.tasks.Set(config.ID, h)
|
d.tasks.Set(h.Config.ID, th)
|
||||||
|
|
||||||
go h.run()
|
go th.run()
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -2,7 +2,6 @@ package java
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"encoding/json"
|
|
||||||
"fmt"
|
"fmt"
|
||||||
"os"
|
"os"
|
||||||
"os/exec"
|
"os/exec"
|
||||||
|
@ -247,19 +246,9 @@ func (d *Driver) RecoverTask(handle *drivers.TaskHandle) error {
|
||||||
return fmt.Errorf("handle cannot be nil")
|
return fmt.Errorf("handle cannot be nil")
|
||||||
}
|
}
|
||||||
|
|
||||||
// pre 0.9 upgrade path check
|
// COMPAT(0.10): pre 0.9 upgrade path check
|
||||||
if handle.Version == 0 {
|
if handle.Version == 0 {
|
||||||
var reattach shared.ReattachConfig
|
return d.recoverPre09Task(handle)
|
||||||
d.logger.Debug("parsing pre09 driver state", "state", string(handle.DriverState))
|
|
||||||
if err := json.Unmarshal(handle.DriverState, &reattach); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
reattachConfig, err := shared.ReattachConfigToGoPlugin(&reattach)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
return d.recoverPre09Task(handle.Config, reattachConfig)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// If already attached to handle there's nothing to recover.
|
// If already attached to handle there's nothing to recover.
|
||||||
|
|
|
@ -4,33 +4,44 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
plugin "github.com/hashicorp/go-plugin"
|
"github.com/hashicorp/nomad/client/state"
|
||||||
"github.com/hashicorp/nomad/drivers/shared/executor"
|
"github.com/hashicorp/nomad/drivers/shared/executor"
|
||||||
"github.com/hashicorp/nomad/helper/uuid"
|
"github.com/hashicorp/nomad/helper/uuid"
|
||||||
"github.com/hashicorp/nomad/plugins/drivers"
|
"github.com/hashicorp/nomad/plugins/drivers"
|
||||||
|
"github.com/hashicorp/nomad/plugins/shared"
|
||||||
)
|
)
|
||||||
|
|
||||||
func (d *Driver) recoverPre09Task(config *drivers.TaskConfig, reattach *plugin.ReattachConfig) error {
|
func (d *Driver) recoverPre09Task(h *drivers.TaskHandle) error {
|
||||||
config.ID = fmt.Sprintf("pre09-%s", uuid.Generate())
|
handle, err := state.UnmarshalPre09HandleID(h.DriverState)
|
||||||
exec, pluginClient, err := executor.ReattachToPre09Executor(reattach,
|
|
||||||
d.logger.With("task_name", config.Name, "alloc_id", config.AllocID))
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
d.logger.Error("failed to reattach to executor", "error", err, "task_name", config.Name)
|
return fmt.Errorf("failed to decode pre09 driver handle: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
reattach, err := shared.ReattachConfigToGoPlugin(handle.ReattachConfig())
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to decode reattach config from pre09 handle: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
h.Config.ID = fmt.Sprintf("pre09-%s", uuid.Generate())
|
||||||
|
exec, pluginClient, err := executor.ReattachToPre09Executor(reattach,
|
||||||
|
d.logger.With("task_name", h.Config.Name, "alloc_id", h.Config.AllocID))
|
||||||
|
if err != nil {
|
||||||
|
d.logger.Error("failed to reattach to executor", "error", err, "task_name", h.Config.Name)
|
||||||
return fmt.Errorf("failed to reattach to executor: %v", err)
|
return fmt.Errorf("failed to reattach to executor: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
h := &taskHandle{
|
th := &taskHandle{
|
||||||
exec: exec,
|
exec: exec,
|
||||||
pid: reattach.Pid,
|
pid: reattach.Pid,
|
||||||
pluginClient: pluginClient,
|
pluginClient: pluginClient,
|
||||||
taskConfig: config,
|
taskConfig: h.Config,
|
||||||
procState: drivers.TaskStateRunning,
|
procState: drivers.TaskStateRunning,
|
||||||
startedAt: time.Now(),
|
startedAt: time.Now(),
|
||||||
exitResult: &drivers.ExitResult{},
|
exitResult: &drivers.ExitResult{},
|
||||||
}
|
}
|
||||||
|
|
||||||
d.tasks.Set(config.ID, h)
|
d.tasks.Set(h.Config.ID, th)
|
||||||
|
|
||||||
go h.run()
|
go th.run()
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -2,7 +2,6 @@ package qemu
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"encoding/json"
|
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"net"
|
"net"
|
||||||
|
@ -245,19 +244,9 @@ func (d *Driver) RecoverTask(handle *drivers.TaskHandle) error {
|
||||||
return fmt.Errorf("error: handle cannot be nil")
|
return fmt.Errorf("error: handle cannot be nil")
|
||||||
}
|
}
|
||||||
|
|
||||||
// pre 0.9 upgrade path check
|
// COMPAT(0.10): pre 0.9 upgrade path check
|
||||||
if handle.Version == 0 {
|
if handle.Version == 0 {
|
||||||
var reattach shared.ReattachConfig
|
return d.recoverPre09Task(handle)
|
||||||
d.logger.Debug("parsing pre09 driver state", "state", string(handle.DriverState))
|
|
||||||
if err := json.Unmarshal(handle.DriverState, &reattach); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
reattachConfig, err := shared.ReattachConfigToGoPlugin(&reattach)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
return d.recoverPre09Task(handle.Config, reattachConfig)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// If already attached to handle there's nothing to recover.
|
// If already attached to handle there's nothing to recover.
|
||||||
|
|
|
@ -4,33 +4,44 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
plugin "github.com/hashicorp/go-plugin"
|
"github.com/hashicorp/nomad/client/state"
|
||||||
"github.com/hashicorp/nomad/drivers/shared/executor"
|
"github.com/hashicorp/nomad/drivers/shared/executor"
|
||||||
"github.com/hashicorp/nomad/helper/uuid"
|
"github.com/hashicorp/nomad/helper/uuid"
|
||||||
"github.com/hashicorp/nomad/plugins/drivers"
|
"github.com/hashicorp/nomad/plugins/drivers"
|
||||||
|
"github.com/hashicorp/nomad/plugins/shared"
|
||||||
)
|
)
|
||||||
|
|
||||||
func (d *Driver) recoverPre09Task(config *drivers.TaskConfig, reattach *plugin.ReattachConfig) error {
|
func (d *Driver) recoverPre09Task(h *drivers.TaskHandle) error {
|
||||||
config.ID = fmt.Sprintf("pre09-%s", uuid.Generate())
|
handle, err := state.UnmarshalPre09HandleID(h.DriverState)
|
||||||
exec, pluginClient, err := executor.ReattachToPre09Executor(reattach,
|
|
||||||
d.logger.With("task_name", config.Name, "alloc_id", config.AllocID))
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
d.logger.Error("failed to reattach to executor", "error", err, "task_name", config.Name)
|
return fmt.Errorf("failed to decode pre09 driver handle: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
reattach, err := shared.ReattachConfigToGoPlugin(handle.ReattachConfig())
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to decode reattach config from pre09 handle: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
h.Config.ID = fmt.Sprintf("pre09-%s", uuid.Generate())
|
||||||
|
exec, pluginClient, err := executor.ReattachToPre09Executor(reattach,
|
||||||
|
d.logger.With("task_name", h.Config.Name, "alloc_id", h.Config.AllocID))
|
||||||
|
if err != nil {
|
||||||
|
d.logger.Error("failed to reattach to executor", "error", err, "task_name", h.Config.Name)
|
||||||
return fmt.Errorf("failed to reattach to executor: %v", err)
|
return fmt.Errorf("failed to reattach to executor: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
h := &taskHandle{
|
th := &taskHandle{
|
||||||
exec: exec,
|
exec: exec,
|
||||||
pid: reattach.Pid,
|
pid: reattach.Pid,
|
||||||
pluginClient: pluginClient,
|
pluginClient: pluginClient,
|
||||||
taskConfig: config,
|
taskConfig: h.Config,
|
||||||
procState: drivers.TaskStateRunning,
|
procState: drivers.TaskStateRunning,
|
||||||
startedAt: time.Now(),
|
startedAt: time.Now(),
|
||||||
exitResult: &drivers.ExitResult{},
|
exitResult: &drivers.ExitResult{},
|
||||||
}
|
}
|
||||||
|
|
||||||
d.tasks.Set(config.ID, h)
|
d.tasks.Set(h.Config.ID, th)
|
||||||
|
|
||||||
go h.run()
|
go th.run()
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -2,7 +2,6 @@ package rawexec
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"encoding/json"
|
|
||||||
"fmt"
|
"fmt"
|
||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
|
@ -251,19 +250,9 @@ func (d *Driver) RecoverTask(handle *drivers.TaskHandle) error {
|
||||||
return fmt.Errorf("handle cannot be nil")
|
return fmt.Errorf("handle cannot be nil")
|
||||||
}
|
}
|
||||||
|
|
||||||
// pre 0.9 upgrade path check
|
// COMPAT(0.10): pre 0.9 upgrade path check
|
||||||
if handle.Version == 0 {
|
if handle.Version == 0 {
|
||||||
var reattach shared.ReattachConfig
|
return d.recoverPre09Task(handle)
|
||||||
d.logger.Debug("parsing pre09 driver state", "state", string(handle.DriverState))
|
|
||||||
if err := json.Unmarshal(handle.DriverState, &reattach); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
reattachConfig, err := shared.ReattachConfigToGoPlugin(&reattach)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
return d.recoverPre09Task(handle.Config, reattachConfig)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// If already attached to handle there's nothing to recover.
|
// If already attached to handle there's nothing to recover.
|
||||||
|
|
|
@ -4,33 +4,44 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
plugin "github.com/hashicorp/go-plugin"
|
"github.com/hashicorp/nomad/client/state"
|
||||||
"github.com/hashicorp/nomad/drivers/shared/executor"
|
"github.com/hashicorp/nomad/drivers/shared/executor"
|
||||||
"github.com/hashicorp/nomad/helper/uuid"
|
"github.com/hashicorp/nomad/helper/uuid"
|
||||||
"github.com/hashicorp/nomad/plugins/drivers"
|
"github.com/hashicorp/nomad/plugins/drivers"
|
||||||
|
"github.com/hashicorp/nomad/plugins/shared"
|
||||||
)
|
)
|
||||||
|
|
||||||
func (d *Driver) recoverPre09Task(config *drivers.TaskConfig, reattach *plugin.ReattachConfig) error {
|
func (d *Driver) recoverPre09Task(h *drivers.TaskHandle) error {
|
||||||
config.ID = fmt.Sprintf("pre09-%s", uuid.Generate())
|
handle, err := state.UnmarshalPre09HandleID(h.DriverState)
|
||||||
exec, pluginClient, err := executor.ReattachToPre09Executor(reattach,
|
|
||||||
d.logger.With("task_name", config.Name, "alloc_id", config.AllocID))
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
d.logger.Error("failed to reattach to executor", "error", err, "task_name", config.Name)
|
return fmt.Errorf("failed to decode pre09 driver handle: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
reattach, err := shared.ReattachConfigToGoPlugin(handle.ReattachConfig())
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to decode reattach config from pre09 handle: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
h.Config.ID = fmt.Sprintf("pre09-%s", uuid.Generate())
|
||||||
|
exec, pluginClient, err := executor.ReattachToPre09Executor(reattach,
|
||||||
|
d.logger.With("task_name", h.Config.Name, "alloc_id", h.Config.AllocID))
|
||||||
|
if err != nil {
|
||||||
|
d.logger.Error("failed to reattach to executor", "error", err, "task_name", h.Config.Name)
|
||||||
return fmt.Errorf("failed to reattach to executor: %v", err)
|
return fmt.Errorf("failed to reattach to executor: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
h := &taskHandle{
|
th := &taskHandle{
|
||||||
exec: exec,
|
exec: exec,
|
||||||
pid: reattach.Pid,
|
pid: reattach.Pid,
|
||||||
pluginClient: pluginClient,
|
pluginClient: pluginClient,
|
||||||
taskConfig: config,
|
taskConfig: h.Config,
|
||||||
procState: drivers.TaskStateRunning,
|
procState: drivers.TaskStateRunning,
|
||||||
startedAt: time.Now(),
|
startedAt: time.Now(),
|
||||||
exitResult: &drivers.ExitResult{},
|
exitResult: &drivers.ExitResult{},
|
||||||
}
|
}
|
||||||
|
|
||||||
d.tasks.Set(config.ID, h)
|
d.tasks.Set(h.Config.ID, th)
|
||||||
|
|
||||||
go h.run()
|
go th.run()
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -358,19 +358,9 @@ func (d *Driver) RecoverTask(handle *drivers.TaskHandle) error {
|
||||||
return fmt.Errorf("error: handle cannot be nil")
|
return fmt.Errorf("error: handle cannot be nil")
|
||||||
}
|
}
|
||||||
|
|
||||||
// pre 0.9 upgrade path check
|
// COMPAT(0.10): pre 0.9 upgrade path check
|
||||||
if handle.Version == 0 {
|
if handle.Version == 0 {
|
||||||
var reattach shared.ReattachConfig
|
return d.recoverPre09Task(handle)
|
||||||
d.logger.Debug("parsing pre09 driver state", "state", string(handle.DriverState))
|
|
||||||
if err := json.Unmarshal(handle.DriverState, &reattach); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
reattachConfig, err := shared.ReattachConfigToGoPlugin(&reattach)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
return d.recoverPre09Task(handle.Config, reattachConfig)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// If already attached to handle there's nothing to recover.
|
// If already attached to handle there's nothing to recover.
|
||||||
|
|
|
@ -4,33 +4,44 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
plugin "github.com/hashicorp/go-plugin"
|
"github.com/hashicorp/nomad/client/state"
|
||||||
"github.com/hashicorp/nomad/drivers/shared/executor"
|
"github.com/hashicorp/nomad/drivers/shared/executor"
|
||||||
"github.com/hashicorp/nomad/helper/uuid"
|
"github.com/hashicorp/nomad/helper/uuid"
|
||||||
"github.com/hashicorp/nomad/plugins/drivers"
|
"github.com/hashicorp/nomad/plugins/drivers"
|
||||||
|
"github.com/hashicorp/nomad/plugins/shared"
|
||||||
)
|
)
|
||||||
|
|
||||||
func (d *Driver) recoverPre09Task(config *drivers.TaskConfig, reattach *plugin.ReattachConfig) error {
|
func (d *Driver) recoverPre09Task(h *drivers.TaskHandle) error {
|
||||||
config.ID = fmt.Sprintf("pre09-%s", uuid.Generate())
|
handle, err := state.UnmarshalPre09HandleID(h.DriverState)
|
||||||
exec, pluginClient, err := executor.ReattachToPre09Executor(reattach,
|
|
||||||
d.logger.With("task_name", config.Name, "alloc_id", config.AllocID))
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
d.logger.Error("failed to reattach to executor", "error", err, "task_name", config.Name)
|
return fmt.Errorf("failed to decode pre09 driver handle: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
reattach, err := shared.ReattachConfigToGoPlugin(handle.ReattachConfig())
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to decode reattach config from pre09 handle: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
h.Config.ID = fmt.Sprintf("pre09-%s", uuid.Generate())
|
||||||
|
exec, pluginClient, err := executor.ReattachToPre09Executor(reattach,
|
||||||
|
d.logger.With("task_name", h.Config.Name, "alloc_id", h.Config.AllocID))
|
||||||
|
if err != nil {
|
||||||
|
d.logger.Error("failed to reattach to executor", "error", err, "task_name", h.Config.Name)
|
||||||
return fmt.Errorf("failed to reattach to executor: %v", err)
|
return fmt.Errorf("failed to reattach to executor: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
h := &taskHandle{
|
th := &taskHandle{
|
||||||
exec: exec,
|
exec: exec,
|
||||||
pid: reattach.Pid,
|
pid: reattach.Pid,
|
||||||
pluginClient: pluginClient,
|
pluginClient: pluginClient,
|
||||||
taskConfig: config,
|
taskConfig: h.Config,
|
||||||
procState: drivers.TaskStateRunning,
|
procState: drivers.TaskStateRunning,
|
||||||
startedAt: time.Now(),
|
startedAt: time.Now(),
|
||||||
exitResult: &drivers.ExitResult{},
|
exitResult: &drivers.ExitResult{},
|
||||||
}
|
}
|
||||||
|
|
||||||
d.tasks.Set(config.ID, h)
|
d.tasks.Set(h.Config.ID, th)
|
||||||
|
|
||||||
go h.run()
|
go th.run()
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -6,6 +6,7 @@ import (
|
||||||
_ "github.com/hashicorp/nomad/e2e/affinities"
|
_ "github.com/hashicorp/nomad/e2e/affinities"
|
||||||
_ "github.com/hashicorp/nomad/e2e/consultemplate"
|
_ "github.com/hashicorp/nomad/e2e/consultemplate"
|
||||||
_ "github.com/hashicorp/nomad/e2e/example"
|
_ "github.com/hashicorp/nomad/e2e/example"
|
||||||
|
_ "github.com/hashicorp/nomad/e2e/nomad09upgrade"
|
||||||
_ "github.com/hashicorp/nomad/e2e/spread"
|
_ "github.com/hashicorp/nomad/e2e/spread"
|
||||||
_ "github.com/hashicorp/nomad/e2e/taskevents"
|
_ "github.com/hashicorp/nomad/e2e/taskevents"
|
||||||
)
|
)
|
||||||
|
|
|
@ -6,8 +6,7 @@ job "sleep" {
|
||||||
driver = "docker"
|
driver = "docker"
|
||||||
|
|
||||||
config {
|
config {
|
||||||
image = "busybox"
|
image = "redis"
|
||||||
args = ["sleep", "10000"]
|
|
||||||
}
|
}
|
||||||
|
|
||||||
resources {
|
resources {
|
||||||
|
|
|
@ -220,7 +220,7 @@ func (tc *UpgradePathTC) TestRawExecTaskUpgrade(f *framework.F) {
|
||||||
ver := ver
|
ver := ver
|
||||||
f.T().Run(ver, func(t *testing.T) {
|
f.T().Run(ver, func(t *testing.T) {
|
||||||
t.Parallel()
|
t.Parallel()
|
||||||
tc.testUpgradeForJob(t, ver, "rawexec.nomad")
|
tc.testUpgradeForJob(t, ver, "nomad09upgrade/rawexec.nomad")
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -230,7 +230,17 @@ func (tc *UpgradePathTC) TestExecTaskUpgrade(f *framework.F) {
|
||||||
ver := ver
|
ver := ver
|
||||||
f.T().Run(ver, func(t *testing.T) {
|
f.T().Run(ver, func(t *testing.T) {
|
||||||
t.Parallel()
|
t.Parallel()
|
||||||
tc.testUpgradeForJob(t, ver, "exec.nomad")
|
tc.testUpgradeForJob(t, ver, "nomad09upgrade/exec.nomad")
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (tc *UpgradePathTC) TestDockerTaskUpgrade(f *framework.F) {
|
||||||
|
for _, ver := range nomadVersions {
|
||||||
|
ver := ver
|
||||||
|
f.T().Run(ver, func(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
tc.testUpgradeForJob(t, ver, "nomad09upgrade/docker.nomad")
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue