client: first pass at implementing task restoring

Task restoring works, but dead tasks may be restarted
Michael Schurter 2018-10-18 13:39:02 -07:00
parent c474b1f652
commit 2bbd88888c
14 changed files with 356 additions and 53 deletions
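At a high level, the restore path added here is exercised by building a new TaskRunner against the same StateDB a previous runner persisted into, calling Restore, then Run. A sketch of that call order (the restoreAndRun helper is hypothetical and not part of this commit; see the new TestTaskRunner_Restore_Running below for the real usage):

```go
package taskrunner

// restoreAndRun is a hypothetical helper sketching the call order this commit
// enables: construct the runner, Restore() the persisted task handle, then Run().
// conf.StateDB must be the same store the previous TaskRunner persisted into.
func restoreAndRun(conf *Config) (*TaskRunner, error) {
	tr, err := NewTaskRunner(conf)
	if err != nil {
		return nil, err
	}
	// Restore loads the TaskHandle and DriverNetwork saved by the previous
	// runner so Run recovers the live task instead of starting a new one.
	if err := tr.Restore(); err != nil {
		return nil, err
	}
	go tr.Run()
	return tr, nil
}
```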


@@ -157,7 +157,7 @@ func (ar *allocRunner) initTaskRunners(tasks []*structs.Task) error {
StateDB: ar.stateDB,
StateUpdater: ar,
Consul: ar.consulClient,
VaultClient: ar.vaultClient,
Vault: ar.vaultClient,
PluginSingletonLoader: ar.pluginSingletonLoader,
}


@@ -11,7 +11,6 @@ import (
consulapi "github.com/hashicorp/nomad/client/consul"
"github.com/hashicorp/nomad/client/state"
"github.com/hashicorp/nomad/client/vaultclient"
"github.com/hashicorp/nomad/helper/testlog"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/plugins/shared/catalog"
@@ -57,13 +56,12 @@ func (m *MockStateUpdater) Reset() {
// testAllocRunnerConfig returns a new allocrunner.Config with mocks and noop
// versions of dependencies along with a cleanup func.
func testAllocRunnerConfig(t *testing.T, alloc *structs.Allocation) (*Config, func()) {
logger := testlog.HCLogger(t)
pluginLoader := catalog.TestPluginLoader(t)
clientConf, cleanup := config.TestClientConfig(t)
conf := &Config{
// Copy the alloc in case the caller edits and reuses it
Alloc: alloc.Copy(),
Logger: logger,
Logger: clientConf.Logger,
ClientConfig: clientConf,
StateDB: state.NoopDB{},
Consul: consulapi.NewMockConsulServiceClient(t, logger),


@@ -61,7 +61,7 @@ func (tr *TaskRunner) Signal(event *structs.TaskEvent, s string) error {
func (tr *TaskRunner) Kill(ctx context.Context, event *structs.TaskEvent) error {
// Cancel the task runner to break out of restart delay or the main run
// loop.
tr.ctxCancel()
tr.killCtxCancel()
// Grab the handle
handle := tr.getDriverHandle()


@@ -78,12 +78,18 @@ type TaskRunner struct {
// stateDB is for persisting localState and taskState
stateDB cstate.StateDB
// ctx is the task runner's context representing the tasks's lifecycle.
// Canceling the context will cause the task to be destroyed.
// killCtx is the task runner's context representing the task's lifecycle.
// The context is canceled when the task is killed.
killCtx context.Context
// killCtxCancel is called when killing a task.
killCtxCancel context.CancelFunc
// ctx is used to exit the TaskRunner *without* affecting task state.
ctx context.Context
// ctxCancel is used to exit the task runner's Run loop without
// stopping the task. Shutdown hooks are run.
// ctxCancel causes the TaskRunner to exit immediately without
// affecting task state. Useful for testing or graceful agent shutdown.
ctxCancel context.CancelFunc
// Logger is the logger for the task runner.
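For reference, the two-context split these fields introduce follows a simple pattern: canceling the kill context ends the task (and lets its stop hooks run), while canceling the runner context only stops supervision and leaves the task alone. A standalone sketch of that pattern, not code from this commit:

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// runner is a toy stand-in for TaskRunner with two independent cancelation
// paths: killCtx marks the work dead, ctx just stops supervising it.
type runner struct {
	killCtx    context.Context
	killCancel context.CancelFunc
	ctx        context.Context
	cancel     context.CancelFunc
}

func newRunner() *runner {
	killCtx, killCancel := context.WithCancel(context.Background())
	ctx, cancel := context.WithCancel(context.Background())
	return &runner{killCtx: killCtx, killCancel: killCancel, ctx: ctx, cancel: cancel}
}

func (r *runner) run() {
	for {
		select {
		case <-r.killCtx.Done():
			fmt.Println("killed: mark task dead, run stop hooks")
			return
		case <-r.ctx.Done():
			fmt.Println("shutdown: exit now, leave task running")
			return
		case <-time.After(100 * time.Millisecond):
			// ... supervise the work ...
		}
	}
}

func main() {
	r := newRunner()
	go r.run()
	r.cancel() // agent shutdown path: runner exits, the "task" keeps running
	time.Sleep(10 * time.Millisecond)
}
```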
@@ -168,8 +174,8 @@ type Config struct {
TaskDir *allocdir.TaskDir
Logger log.Logger
// VaultClient is the client to use to derive and renew Vault tokens
VaultClient vaultclient.VaultClient
// Vault is the client to use to derive and renew Vault tokens
Vault vaultclient.VaultClient
// StateDB is used to store and restore state.
StateDB cstate.StateDB
@@ -183,9 +189,12 @@ type Config struct {
}
func NewTaskRunner(config *Config) (*TaskRunner, error) {
// Create a context for the runner
// Create a context for causing the runner to exit
trCtx, trCancel := context.WithCancel(context.Background())
// Create a context for killing the runner
killCtx, killCancel := context.WithCancel(context.Background())
// Initialize the environment builder
envBuilder := env.NewBuilder(
config.ClientConfig.Node,
@@ -210,11 +219,13 @@ func NewTaskRunner(config *Config) (*TaskRunner, error) {
taskLeader: config.Task.Leader,
envBuilder: envBuilder,
consulClient: config.Consul,
vaultClient: config.VaultClient,
vaultClient: config.Vault,
state: tstate,
localState: state.NewLocalState(),
stateDB: config.StateDB,
stateUpdater: config.StateUpdater,
killCtx: killCtx,
killCtxCancel: killCancel,
ctx: trCtx,
ctxCancel: trCancel,
triggerUpdateCh: make(chan struct{}, triggerUpdateChCap),
@@ -299,7 +310,16 @@ func (tr *TaskRunner) Run() {
go tr.handleUpdates()
MAIN:
for tr.ctx.Err() == nil {
for {
select {
case <-tr.killCtx.Done():
break MAIN
case <-tr.ctx.Done():
// TaskRunner was told to exit immediately
return
default:
}
// Run the prestart hooks
if err := tr.prestart(); err != nil {
tr.logger.Error("prestart failed", "error", err)
@@ -307,8 +327,13 @@ MAIN:
goto RESTART
}
if tr.ctx.Err() != nil {
select {
case <-tr.killCtx.Done():
break MAIN
case <-tr.ctx.Done():
// TaskRunner was told to exit immediately
return
default:
}
// Run the task
@@ -327,12 +352,19 @@ MAIN:
{
handle := tr.getDriverHandle()
// Do *not* use tr.ctx here as it would cause Wait() to
// unblock before the task exits when Kill() is called.
// Do *not* use tr.killCtx here as it would cause
// Wait() to unblock before the task exits when Kill()
// is called.
if resultCh, err := handle.WaitCh(context.Background()); err != nil {
tr.logger.Error("wait task failed", "error", err)
} else {
result = <-resultCh
select {
case result = <-resultCh:
// WaitCh returned a result
case <-tr.ctx.Done():
// TaskRunner was told to exit immediately
return
}
}
}
@@ -355,9 +387,12 @@ MAIN:
// Actually restart by sleeping and also watching for destroy events
select {
case <-time.After(restartDelay):
case <-tr.ctx.Done():
case <-tr.killCtx.Done():
tr.logger.Trace("task killed between restarts", "delay", restartDelay)
break MAIN
case <-tr.ctx.Done():
// TaskRunner was told to exit immediately
return
}
}
@@ -444,17 +479,76 @@ func (tr *TaskRunner) runDriver() error {
//TODO mounts and devices
//XXX Evaluate and encode driver config
// Start the job
handle, net, err := tr.driver.StartTask(taskConfig)
var handle *drivers.TaskHandle
var net *cstructs.DriverNetwork
var err error
// Check to see if a task handle was restored
tr.localStateLock.RLock()
handle = tr.localState.TaskHandle
net = tr.localState.DriverNetwork
tr.localStateLock.RUnlock()
if handle != nil {
tr.logger.Trace("restored handle; recovering task", "task_id", handle.Config.ID)
if err := tr.driver.RecoverTask(handle); err != nil {
tr.logger.Error("error recovering task; destroying and restarting",
"error", err, "task_id", handle.Config.ID)
// Clear invalid task state
tr.localStateLock.Lock()
tr.localState.TaskHandle = nil
tr.localState.DriverNetwork = nil
tr.localStateLock.Unlock()
// Try to cleanup any existing task state in the plugin before restarting
if err := tr.driver.DestroyTask(handle.Config.ID, true); err != nil {
// Ignore ErrTaskNotFound errors as ideally
// this task has already been stopped and
// therefore doesn't exist.
if err != drivers.ErrTaskNotFound {
tr.logger.Warn("error destroying unrecoverable task",
"error", err, "task_id", handle.Config.ID)
}
}
goto START
}
// Update driver handle on task runner
tr.setDriverHandle(NewDriverHandle(tr.driver, handle.Config.ID, tr.Task(), net))
// Ensure running state is persisted but do *not* append a new
// task event as restoring is a client event and not relevant
// to a task's lifecycle.
if err := tr.updateStateImpl(structs.TaskStateRunning); err != nil {
tr.logger.Warn("error persisting task state", "error", err)
}
return nil
}
START:
// Start the job if there's no existing handle (or if RecoverTask failed)
handle, net, err = tr.driver.StartTask(taskConfig)
if err != nil {
return fmt.Errorf("driver start failed: %v", err)
}
tr.localStateLock.Lock()
tr.localState.TaskHandle = handle
tr.localState.DriverNetwork = net
if err := tr.stateDB.PutTaskRunnerLocalState(tr.allocID, tr.taskName, tr.localState); err != nil {
//TODO Nomad will be unable to restore this task; try to kill
// it now and fail? In general we prefer to leave running
// tasks running even if the agent encounters an error.
tr.logger.Warn("error persisting local task state; may be unable to restore after a Nomad restart",
"error", err, "task_id", handle.Config.ID)
}
tr.localStateLock.Unlock()
tr.setDriverHandle(NewDriverHandle(tr.driver, taskConfig.ID, tr.Task(), net))
// Emit an event that we started
tr.UpdateState(structs.TaskStateRunning, structs.NewTaskEvent(structs.TaskStarted))
return nil
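Compressed, the new runDriver flow above is a recover-or-start decision: prefer the restored handle, and if RecoverTask fails, destroy whatever the plugin still tracks (ignoring "not found") and start fresh. A standalone sketch of that shape against a minimal, illustrative driver interface (none of these types are Nomad's):

```go
package main

import (
	"errors"
	"fmt"
)

// miniDriver is an illustrative subset of a task driver's surface used by the
// restore path: recover an existing task, destroy a broken one, or start fresh.
type miniDriver interface {
	RecoverTask(handleID string) error
	DestroyTask(handleID string, force bool) error
	StartTask(taskID string) (handleID string, err error)
}

var errTaskNotFound = errors.New("task not found")

// recoverOrStart mirrors the shape of the new runDriver: reuse the restored
// handle when recovery succeeds, otherwise clean up and start the task again.
func recoverOrStart(d miniDriver, restoredHandle, taskID string) (string, error) {
	if restoredHandle != "" {
		if err := d.RecoverTask(restoredHandle); err == nil {
			// Recovered: keep the existing task and skip the "started" event.
			return restoredHandle, nil
		}
		// Recovery failed: clean up whatever the plugin still knows about,
		// ignoring "not found" since the task may already be gone.
		if err := d.DestroyTask(restoredHandle, true); err != nil && !errors.Is(err, errTaskNotFound) {
			fmt.Println("warn: destroying unrecoverable task:", err)
		}
	}
	return d.StartTask(taskID)
}

// stubDriver fakes just enough behavior to run the sketch.
type stubDriver struct{ running map[string]bool }

func (s *stubDriver) RecoverTask(id string) error {
	if s.running[id] {
		return nil
	}
	return errors.New("cannot recover")
}

func (s *stubDriver) DestroyTask(id string, force bool) error { return errTaskNotFound }

func (s *stubDriver) StartTask(id string) (string, error) {
	s.running[id] = true
	return id, nil
}

func main() {
	d := &stubDriver{running: map[string]bool{}}
	h, _ := recoverOrStart(d, "stale-handle", "task-1") // recovery fails, starts fresh
	fmt.Println("running handle:", h)
}
```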
@@ -601,29 +695,33 @@ func (tr *TaskRunner) Restore() error {
// UpdateState sets the task runner's allocation state and triggers a server
// update.
func (tr *TaskRunner) UpdateState(state string, event *structs.TaskEvent) {
tr.stateLock.Lock()
defer tr.stateLock.Unlock()
tr.logger.Trace("setting task state", "state", state, "event", event.Type)
// Update the local state
tr.setStateLocal(state, event)
// Append the event
tr.appendEvent(event)
// Update the state
if err := tr.updateStateImpl(state); err != nil {
// Only log the error as persistence errors should not
// affect task state.
tr.logger.Error("error persisting task state", "error", err, "event", event, "state", state)
}
// Notify the alloc runner of the transition
tr.stateUpdater.TaskStateUpdated()
}
// setStateLocal updates the local in-memory state, persists a copy to disk and returns a
// copy of the task's state.
func (tr *TaskRunner) setStateLocal(state string, event *structs.TaskEvent) {
tr.stateLock.Lock()
defer tr.stateLock.Unlock()
// updateStateImpl updates the in-memory task state and persists to disk.
func (tr *TaskRunner) updateStateImpl(state string) error {
// Update the task state
oldState := tr.state.State
taskState := tr.state
taskState.State = state
// Append the event
tr.appendEvent(event)
// Handle the state transition.
switch state {
case structs.TaskStateRunning:
@@ -662,11 +760,7 @@ func (tr *TaskRunner) setStateLocal(state string, event *structs.TaskEvent) {
}
// Persist the state and event
if err := tr.stateDB.PutTaskState(tr.allocID, tr.taskName, taskState); err != nil {
// Only a warning because the next event/state-transition will
// try to persist it again.
tr.logger.Error("error persisting task state", "error", err, "event", event, "state", state)
}
return tr.stateDB.PutTaskState(tr.allocID, tr.taskName, taskState)
}
// EmitEvent appends a new TaskEvent to this task's TaskState. The actual
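The refactor above splits state handling along a common Go locking pattern: the exported UpdateState takes stateLock, appends the task event, and delegates to updateStateImpl, which only mutates and persists the state (presumably with the lock held by its caller). That split is what lets the restore path persist TaskStateRunning without appending a new event. A minimal, illustrative sketch of the pattern:

```go
package main

import (
	"fmt"
	"sync"
)

type stateStore struct {
	mu     sync.Mutex
	state  string
	events []string
}

// SetState is the exported entry point: lock, record the event, delegate.
func (s *stateStore) SetState(state, event string) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.events = append(s.events, event)
	return s.setStateLocked(state)
}

// setStateLocked requires s.mu to be held. Callers that already hold the lock,
// or that want a state change without an event (e.g. a restore), use it directly.
func (s *stateStore) setStateLocked(state string) error {
	s.state = state
	// Stand-in for persisting to the state DB; errors bubble to the caller.
	return nil
}

func main() {
	s := &stateStore{}
	_ = s.SetState("running", "Started")
	fmt.Println(s.state, s.events)
}
```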


@@ -123,7 +123,7 @@ func (tr *TaskRunner) prestart() error {
// Run the prestart hook
var resp interfaces.TaskPrestartResponse
if err := pre.Prestart(tr.ctx, &req, &resp); err != nil {
if err := pre.Prestart(tr.killCtx, &req, &resp); err != nil {
return structs.WrapRecoverable(fmt.Sprintf("prestart hook %q failed: %v", name, err), err)
}
@@ -195,7 +195,7 @@ func (tr *TaskRunner) poststart() error {
TaskEnv: tr.envBuilder.Build(),
}
var resp interfaces.TaskPoststartResponse
if err := post.Poststart(tr.ctx, &req, &resp); err != nil {
if err := post.Poststart(tr.killCtx, &req, &resp); err != nil {
merr.Errors = append(merr.Errors, fmt.Errorf("poststart hook %q failed: %v", name, err))
}
@@ -237,7 +237,7 @@ func (tr *TaskRunner) exited() error {
req := interfaces.TaskExitedRequest{}
var resp interfaces.TaskExitedResponse
if err := post.Exited(tr.ctx, &req, &resp); err != nil {
if err := post.Exited(tr.killCtx, &req, &resp); err != nil {
merr.Errors = append(merr.Errors, fmt.Errorf("exited hook %q failed: %v", name, err))
}
@@ -280,7 +280,7 @@ func (tr *TaskRunner) stop() error {
req := interfaces.TaskStopRequest{}
var resp interfaces.TaskStopResponse
if err := post.Stop(tr.ctx, &req, &resp); err != nil {
if err := post.Stop(tr.killCtx, &req, &resp); err != nil {
merr.Errors = append(merr.Errors, fmt.Errorf("stop hook %q failed: %v", name, err))
}
@@ -336,7 +336,7 @@ func (tr *TaskRunner) updateHooks() {
// Run the update hook
var resp interfaces.TaskUpdateResponse
if err := upd.Update(tr.ctx, &req, &resp); err != nil {
if err := upd.Update(tr.killCtx, &req, &resp); err != nil {
tr.logger.Error("update hook failed", "name", name, "error", err)
}
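These hook call sites switch from tr.ctx to tr.killCtx, so a blocked hook is interrupted when the task is killed but not when the TaskRunner itself is told to exit. A standalone sketch of a hook that cooperates with the passed context (the real hook interfaces take request/response structs; this is abbreviated):

```go
package main

import (
	"context"
	"time"
)

// waitHook is an illustrative prestart-style hook that blocks until a
// condition is met or the passed (kill) context is canceled.
type waitHook struct {
	ready <-chan struct{}
}

// Prestart returns when ready fires or ctx is done; returning ctx.Err() lets
// the caller distinguish a kill from a successful hook run.
func (h *waitHook) Prestart(ctx context.Context) error {
	select {
	case <-h.ready:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

func main() {
	h := &waitHook{ready: make(chan struct{})}

	killCtx, kill := context.WithCancel(context.Background())
	go func() {
		time.Sleep(10 * time.Millisecond)
		kill() // simulate Kill() canceling killCtx
	}()

	// Blocks briefly, then returns context.Canceled instead of hanging forever.
	_ = h.Prestart(killCtx)
}
```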


@@ -0,0 +1,163 @@
package taskrunner
import (
"context"
"fmt"
"path/filepath"
"testing"
"time"
"github.com/hashicorp/nomad/client/allocdir"
"github.com/hashicorp/nomad/client/config"
consulapi "github.com/hashicorp/nomad/client/consul"
cstate "github.com/hashicorp/nomad/client/state"
"github.com/hashicorp/nomad/client/vaultclient"
"github.com/hashicorp/nomad/helper/testlog"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/plugins/shared/catalog"
"github.com/hashicorp/nomad/plugins/shared/singleton"
"github.com/hashicorp/nomad/testutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
type MockTaskStateUpdater struct {
ch chan struct{}
}
func NewMockTaskStateUpdater() *MockTaskStateUpdater {
return &MockTaskStateUpdater{
ch: make(chan struct{}, 1),
}
}
func (m *MockTaskStateUpdater) TaskStateUpdated() {
select {
case m.ch <- struct{}{}:
default:
}
}
// testTaskRunnerConfig returns a taskrunner.Config for the given alloc+task
// plus a cleanup func.
func testTaskRunnerConfig(t *testing.T, alloc *structs.Allocation, taskName string) (*Config, func()) {
logger := testlog.HCLogger(t)
pluginLoader := catalog.TestPluginLoader(t)
clientConf, cleanup := config.TestClientConfig(t)
// Find the task
var thisTask *structs.Task
for _, tg := range alloc.Job.TaskGroups {
for _, task := range tg.Tasks {
if task.Name == taskName {
if thisTask != nil {
cleanup()
t.Fatalf("multiple tasks named %q; cannot use this helper", taskName)
}
thisTask = task
}
}
}
if thisTask == nil {
cleanup()
t.Fatalf("could not find task %q", taskName)
}
// Create the alloc dir + task dir
allocPath := filepath.Join(clientConf.AllocDir, alloc.ID)
allocDir := allocdir.NewAllocDir(logger, allocPath)
if err := allocDir.Build(); err != nil {
cleanup()
t.Fatalf("error building alloc dir: %v", err)
}
taskDir := allocDir.NewTaskDir(taskName)
trCleanup := func() {
if err := allocDir.Destroy(); err != nil {
t.Logf("error destroying alloc dir: %v", err)
}
cleanup()
}
conf := &Config{
Alloc: alloc,
ClientConfig: clientConf,
Consul: consulapi.NewMockConsulServiceClient(t, logger),
Task: thisTask,
TaskDir: taskDir,
Logger: clientConf.Logger,
Vault: vaultclient.NewMockVaultClient(),
StateDB: cstate.NoopDB{},
StateUpdater: NewMockTaskStateUpdater(),
PluginSingletonLoader: singleton.NewSingletonLoader(logger, pluginLoader),
}
return conf, trCleanup
}
// TestTaskRunner_Restore_Running asserts restoring a running task does not rerun the
// task.
func TestTaskRunner_Restore_Running(t *testing.T) {
t.Parallel()
require := require.New(t)
alloc := mock.BatchAlloc()
alloc.Job.TaskGroups[0].Count = 1
task := alloc.Job.TaskGroups[0].Tasks[0]
task.Name = "testtask"
task.Driver = "mock_driver"
task.Config = map[string]interface{}{
"run_for": 2 * time.Second,
}
conf, cleanup := testTaskRunnerConfig(t, alloc, "testtask")
conf.StateDB = cstate.NewMemDB() // "persist" state between task runners
defer cleanup()
// Run the first TaskRunner
origTR, err := NewTaskRunner(conf)
require.NoError(err)
go origTR.Run()
defer origTR.Kill(context.Background(), structs.NewTaskEvent("cleanup"))
// Wait for it to be running
testutil.WaitForResult(func() (bool, error) {
ts := origTR.TaskState()
return ts.State == structs.TaskStateRunning, fmt.Errorf("%v", ts.State)
}, func(err error) {
t.Fatalf("expected running; got: %v", err)
})
// Cause TR to exit without shutting down task
origTR.ctxCancel()
<-origTR.WaitCh()
// Start a new TaskRunner and make sure it does not rerun the task
newTR, err := NewTaskRunner(conf)
require.NoError(err)
// Do the Restore
require.NoError(newTR.Restore())
go newTR.Run()
defer newTR.Kill(context.Background(), structs.NewTaskEvent("cleanup"))
// Wait for new task runner to exit when the process does
<-newTR.WaitCh()
// Assert that the process was only started once, and only restored once
started := 0
restored := 0
state := newTR.TaskState()
require.Equal(structs.TaskStateDead, state.State)
for _, ev := range state.Events {
t.Logf("task event: %s %s", ev.Type, ev.Message)
switch ev.Type {
case structs.TaskStarted:
started++
case structs.TaskRestored:
restored++
}
}
assert.Equal(t, 1, started)
assert.Equal(t, 1, restored)
}


@@ -6,6 +6,7 @@ import (
"path/filepath"
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/helper/testlog"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/mitchellh/go-testing-interface"
)
@@ -14,6 +15,7 @@ import (
// a cleanup func to remove the state and alloc dirs when finished.
func TestClientConfig(t testing.T) (*Config, func()) {
conf := DefaultConfig()
conf.Logger = testlog.HCLogger(t)
// Create a tempdir to hold state and alloc subdirs
parent, err := ioutil.TempDir("", "nomadtest")


@@ -273,9 +273,23 @@ func (d *Driver) buildFingerprint() *drivers.Fingerprint {
}
}
func (d *Driver) RecoverTask(*drivers.TaskHandle) error {
//TODO is there anything to do here?
return nil
func (d *Driver) RecoverTask(h *drivers.TaskHandle) error {
if h == nil {
return fmt.Errorf("handle cannot be nil")
}
if _, ok := d.tasks.Get(h.Config.ID); ok {
d.logger.Debug("nothing to recover; task already exists",
"task_id", h.Config.ID,
"task_name", h.Config.Name,
)
return nil
}
// Recovering a task requires the task to be running external to the
// plugin. Since the mock_driver runs all tasks in-process, it cannot
// recover tasks.
return fmt.Errorf("%s cannot recover tasks", pluginName)
}
func (d *Driver) StartTask(cfg *drivers.TaskConfig) (*drivers.TaskHandle, *cstructs.DriverNetwork, error) {


@@ -244,6 +244,15 @@ func (d *Driver) RecoverTask(handle *drivers.TaskHandle) error {
return fmt.Errorf("error: handle cannot be nil")
}
// If already attached to handle there's nothing to recover.
if _, ok := d.tasks.Get(handle.Config.ID); ok {
d.logger.Trace("nothing to recover; task already exists",
"task_id", handle.Config.ID,
"task_name", handle.Config.Name,
)
return nil
}
var taskState TaskState
if err := handle.GetDriverState(&taskState); err != nil {
d.logger.Error("failed to decode taskConfig state from handle", "error", err, "task_id", handle.Config.ID)


@@ -242,9 +242,19 @@ func (d *Driver) buildFingerprint() *drivers.Fingerprint {
func (d *Driver) RecoverTask(handle *drivers.TaskHandle) error {
if handle == nil {
return fmt.Errorf("error: handle cannot be nil")
return fmt.Errorf("handle cannot be nil")
}
// If already attached to handle there's nothing to recover.
if _, ok := d.tasks.Get(handle.Config.ID); ok {
d.logger.Trace("nothing to recover; task already exists",
"task_id", handle.Config.ID,
"task_name", handle.Config.Name,
)
return nil
}
// Handle doesn't already exist, try to reattach
var taskState TaskState
if err := handle.GetDriverState(&taskState); err != nil {
d.logger.Error("failed to decode task state from handle", "error", err, "task_id", handle.Config.ID)
@@ -261,6 +271,7 @@ func (d *Driver) RecoverTask(handle *drivers.TaskHandle) error {
Reattach: plugRC,
}
// Create client for reattached executor
exec, pluginClient, err := utils.CreateExecutorWithConfig(pluginConfig, os.Stderr)
if err != nil {
d.logger.Error("failed to reattach to executor", "error", err, "task_id", handle.Config.ID)


@@ -317,6 +317,15 @@ func (d *Driver) RecoverTask(handle *drivers.TaskHandle) error {
return fmt.Errorf("error: handle cannot be nil")
}
// If already attached to handle there's nothing to recover.
if _, ok := d.tasks.Get(handle.Config.ID); ok {
d.logger.Trace("nothing to recover; task already exists",
"task_id", handle.Config.ID,
"task_name", handle.Config.Name,
)
return nil
}
var taskState TaskState
if err := handle.GetDriverState(&taskState); err != nil {
d.logger.Error("failed to decode taskConfig state from handle", "error", err, "task_id", handle.Config.ID)


@@ -46,7 +46,7 @@ type DriverPlugin interface {
// DriverPlugin interface.
type DriverSignalTaskNotSupported struct{}
func (_ DriverSignalTaskNotSupported) SignalTask(taskID, signal string) error {
func (DriverSignalTaskNotSupported) SignalTask(taskID, signal string) error {
return fmt.Errorf("SignalTask is not supported by this driver")
}


@@ -11,7 +11,7 @@ type TaskHandle struct {
Driver string
Config *TaskConfig
State TaskState
driverState []byte
DriverState []byte
}
func NewTaskHandle(driver string) *TaskHandle {
@@ -19,12 +19,12 @@ func NewTaskHandle(driver string) *TaskHandle {
}
func (h *TaskHandle) SetDriverState(v interface{}) error {
h.driverState = []byte{}
return base.MsgPackEncode(&h.driverState, v)
h.DriverState = []byte{}
return base.MsgPackEncode(&h.DriverState, v)
}
func (h *TaskHandle) GetDriverState(v interface{}) error {
return base.MsgPackDecode(h.driverState, v)
return base.MsgPackDecode(h.DriverState, v)
}
@@ -34,7 +34,10 @@ func (h *TaskHandle) Copy() *TaskHandle {
}
handle := new(TaskHandle)
*handle = *h
handle.Driver = h.Driver
handle.Config = h.Config.Copy()
handle.State = h.State
handle.DriverState = make([]byte, len(h.DriverState))
copy(handle.DriverState, h.DriverState)
return handle
}
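Exporting driverState as DriverState is what makes handle persistence (and the proto conversion below) possible: Go encoders skip unexported struct fields, so a handle written to the client state DB would otherwise come back with its driver state silently emptied. A small self-contained illustration using encoding/json (the base.MsgPack helpers used here treat unexported fields the same way):

```go
package main

import (
	"encoding/json"
	"fmt"
)

type unexportedHandle struct {
	Driver      string
	driverState []byte // unexported: ignored by encoders
}

type exportedHandle struct {
	Driver      string
	DriverState []byte // exported: survives an encode/decode round trip
}

func main() {
	in := exportedHandle{Driver: "raw_exec", DriverState: []byte("reattach-config")}
	buf, _ := json.Marshal(in)
	var out exportedHandle
	_ = json.Unmarshal(buf, &out)
	fmt.Printf("exported:   %q\n", out.DriverState) // "reattach-config"

	in2 := unexportedHandle{Driver: "raw_exec", driverState: []byte("reattach-config")}
	buf2, _ := json.Marshal(in2)
	var out2 unexportedHandle
	_ = json.Unmarshal(buf2, &out2)
	fmt.Printf("unexported: %q\n", out2.driverState) // "" (the state was dropped)
}
```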


@@ -194,7 +194,7 @@ func taskHandleFromProto(pb *proto.TaskHandle) *TaskHandle {
return &TaskHandle{
Config: taskConfigFromProto(pb.Config),
State: taskStateFromProtoMap[pb.State],
driverState: pb.DriverState,
DriverState: pb.DriverState,
}
}
@@ -202,7 +202,7 @@ func taskHandleToProto(handle *TaskHandle) *proto.TaskHandle {
return &proto.TaskHandle{
Config: taskConfigToProto(handle.Config),
State: taskStateToProtoMap[handle.State],
DriverState: handle.driverState,
DriverState: handle.DriverState,
}
}