From e012d9411ef1fce0f04383851123ed854dc80376 Mon Sep 17 00:00:00 2001 From: Luiz Aoqui Date: Wed, 24 Aug 2022 17:43:07 -0400 Subject: [PATCH] Task lifecycle restart (#14127) * allocrunner: handle lifecycle when all tasks die When all tasks die the Coordinator must transition to its terminal state, coordinatorStatePoststop, to unblock poststop tasks. Since this could happen at any time (for example, a prestart task dies), all states must be able to transition to this terminal state. * allocrunner: implement different alloc restarts Add a new alloc restart mode where all tasks are restarted, even if they have already exited. Also unifies the alloc restart logic to use the implementation that restarts tasks concurrently and ignores ErrTaskNotRunning errors since those are expected when restarting the allocation. * allocrunner: allow tasks to run again Prevent the task runner Run() method from exiting to allow a dead task to run again. When the task runner is signaled to restart, the function will jump back to the MAIN loop and run it again. The task runner determines if a task needs to run again based on two new task events that were added to differentiate between a request to restart a specific task, the tasks that are currently running, or all tasks that have already run. * api/cli: add support for all tasks alloc restart Implement the new -all-tasks alloc restart CLI flag and its API counterpar, AllTasks. The client endpoint calls the appropriate restart method from the allocrunner depending on the restart parameters used. * test: fix tasklifecycle Coordinator test * allocrunner: kill taskrunners if all tasks are dead When all non-poststop tasks are dead we need to kill the taskrunners so we don't leak their goroutines, which are blocked in the alloc restart loop. This also ensures the allocrunner exits on its own. * taskrunner: fix tests that waited on WaitCh Now that "dead" tasks may run again, the taskrunner Run() method will not return when the task finishes running, so tests must wait for the task state to be "dead" instead of using the WaitCh, since it won't be closed until the taskrunner is killed. * tests: add tests for all tasks alloc restart * changelog: add entry for #14127 * taskrunner: fix restore logic. The first implementation of the task runner restore process relied on server data (`tr.Alloc().TerminalStatus()`) which may not be available to the client at the time of restore. It also had the incorrect code path. When restoring a dead task the driver handle always needs to be clear cleanly using `clearDriverHandle` otherwise, after exiting the MAIN loop, the task may be killed by `tr.handleKill`. The fix is to store the state of the Run() loop in the task runner local client state: if the task runner ever exits this loop cleanly (not with a shutdown) it will never be able to run again. So if the Run() loops starts with this local state flag set, it must exit early. This local state flag is also being checked on task restart requests. If the task is "dead" and its Run() loop is not active it will never be able to run again. * address code review requests * apply more code review changes * taskrunner: add different Restart modes Using the task event to differentiate between the allocrunner restart methods proved to be confusing for developers to understand how it all worked. So instead of relying on the event type, this commit separated the logic of restarting an taskRunner into two methods: - `Restart` will retain the current behaviour and only will only restart the task if it's currently running. - `ForceRestart` is the new method where a `dead` task is allowed to restart if its `Run()` method is still active. Callers will need to restart the allocRunner taskCoordinator to make sure it will allow the task to run again. * minor fixes --- .changelog/14127.txt | 7 + api/allocations.go | 21 +- client/alloc_endpoint.go | 2 +- client/alloc_endpoint_test.go | 39 ++ client/allocrunner/alloc_runner.go | 145 +++--- client/allocrunner/alloc_runner_test.go | 463 +++++++++++++++++- client/allocrunner/alloc_runner_unix_test.go | 12 +- .../allocrunner/tasklifecycle/coordinator.go | 42 +- .../tasklifecycle/coordinator_test.go | 15 + client/allocrunner/taskrunner/lifecycle.go | 102 +++- .../allocrunner/taskrunner/sids_hook_test.go | 7 +- client/allocrunner/taskrunner/state/state.go | 6 + client/allocrunner/taskrunner/task_runner.go | 77 ++- .../taskrunner/task_runner_test.go | 213 +++++--- client/allocrunner/testing.go | 12 + client/client.go | 22 +- command/agent/alloc_endpoint.go | 4 + command/alloc_restart.go | 47 +- nomad/mock/mock.go | 44 ++ nomad/structs/structs.go | 6 +- website/content/api-docs/allocations.mdx | 7 + .../content/docs/commands/alloc/restart.mdx | 17 +- 22 files changed, 1098 insertions(+), 212 deletions(-) create mode 100644 .changelog/14127.txt diff --git a/.changelog/14127.txt b/.changelog/14127.txt new file mode 100644 index 000000000..61c036877 --- /dev/null +++ b/.changelog/14127.txt @@ -0,0 +1,7 @@ +```release-note:improvement +client: add option to restart all tasks of an allocation, regardless of lifecycle type or state. +``` + +```release-note:improvement +client: only start poststop tasks after poststart tasks are done. +``` diff --git a/api/allocations.go b/api/allocations.go index f1fce2c6d..dc2ebb279 100644 --- a/api/allocations.go +++ b/api/allocations.go @@ -141,7 +141,9 @@ func (a *Allocations) GC(alloc *Allocation, q *QueryOptions) error { return err } -// Restart restarts an allocation. +// Restart restarts the tasks that are currently running or a specific task if +// taskName is provided. An error is returned if the task to be restarted is +// not running. // // Note: for cluster topologies where API consumers don't have network access to // Nomad clients, set api.ClientConnTimeout to a small value (ex 1ms) to avoid @@ -156,6 +158,22 @@ func (a *Allocations) Restart(alloc *Allocation, taskName string, q *QueryOption return err } +// RestartAllTasks restarts all tasks in the allocation, regardless of +// lifecycle type or state. Tasks will restart following their lifecycle order. +// +// Note: for cluster topologies where API consumers don't have network access to +// Nomad clients, set api.ClientConnTimeout to a small value (ex 1ms) to avoid +// long pauses on this API call. +func (a *Allocations) RestartAllTasks(alloc *Allocation, q *QueryOptions) error { + req := AllocationRestartRequest{ + AllTasks: true, + } + + var resp struct{} + _, err := a.client.putQuery("/v1/client/allocation/"+alloc.ID+"/restart", &req, &resp, q) + return err +} + // Stop stops an allocation. // // Note: for cluster topologies where API consumers don't have network access to @@ -447,6 +465,7 @@ func (a Allocation) RescheduleInfo(t time.Time) (int, int) { type AllocationRestartRequest struct { TaskName string + AllTasks bool } type AllocSignalRequest struct { diff --git a/client/alloc_endpoint.go b/client/alloc_endpoint.go index 2be7dfb41..52ab8f414 100644 --- a/client/alloc_endpoint.go +++ b/client/alloc_endpoint.go @@ -102,7 +102,7 @@ func (a *Allocations) Restart(args *nstructs.AllocRestartRequest, reply *nstruct return nstructs.ErrPermissionDenied } - return a.c.RestartAllocation(args.AllocID, args.TaskName) + return a.c.RestartAllocation(args.AllocID, args.TaskName, args.AllTasks) } // Stats is used to collect allocation statistics diff --git a/client/alloc_endpoint_test.go b/client/alloc_endpoint_test.go index c8f2560d0..f3c6e3e2b 100644 --- a/client/alloc_endpoint_test.go +++ b/client/alloc_endpoint_test.go @@ -68,6 +68,45 @@ func TestAllocations_Restart(t *testing.T) { }) } +func TestAllocations_RestartAllTasks(t *testing.T) { + ci.Parallel(t) + + require := require.New(t) + client, cleanup := TestClient(t, nil) + defer cleanup() + + alloc := mock.LifecycleAlloc() + require.Nil(client.addAlloc(alloc, "")) + + // Can't restart all tasks while specifying a task name. + req := &nstructs.AllocRestartRequest{ + AllocID: alloc.ID, + AllTasks: true, + TaskName: "web", + } + var resp nstructs.GenericResponse + err := client.ClientRPC("Allocations.Restart", &req, &resp) + require.Error(err) + + // Good request. + req = &nstructs.AllocRestartRequest{ + AllocID: alloc.ID, + AllTasks: true, + } + + testutil.WaitForResult(func() (bool, error) { + var resp2 nstructs.GenericResponse + err := client.ClientRPC("Allocations.Restart", &req, &resp2) + if err != nil && strings.Contains(err.Error(), "not running") { + return false, err + } + + return true, nil + }, func(err error) { + t.Fatalf("err: %v", err) + }) +} + func TestAllocations_Restart_ACL(t *testing.T) { ci.Parallel(t) require := require.New(t) diff --git a/client/allocrunner/alloc_runner.go b/client/allocrunner/alloc_runner.go index 33bc57a8f..1d50c2595 100644 --- a/client/allocrunner/alloc_runner.go +++ b/client/allocrunner/alloc_runner.go @@ -28,7 +28,6 @@ import ( cstate "github.com/hashicorp/nomad/client/state" cstructs "github.com/hashicorp/nomad/client/structs" "github.com/hashicorp/nomad/client/vaultclient" - agentconsul "github.com/hashicorp/nomad/command/agent/consul" "github.com/hashicorp/nomad/helper/pointer" "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/plugins/device" @@ -547,40 +546,64 @@ func (ar *allocRunner) handleTaskStateUpdates() { } } - // if all live runners are sidecars - kill alloc - if killEvent == nil && hasSidecars && !hasNonSidecarTasks(liveRunners) { - killEvent = structs.NewTaskEvent(structs.TaskMainDead) - } - - // If there's a kill event set and live runners, kill them - if killEvent != nil && len(liveRunners) > 0 { - - // Log kill reason - switch killEvent.Type { - case structs.TaskLeaderDead: - ar.logger.Debug("leader task dead, destroying all tasks", "leader_task", killTask) - case structs.TaskMainDead: - ar.logger.Debug("main tasks dead, destroying all sidecar tasks") - default: - ar.logger.Debug("task failure, destroying all tasks", "failed_task", killTask) + if len(liveRunners) > 0 { + // if all live runners are sidecars - kill alloc + onlySidecarsRemaining := hasSidecars && !hasNonSidecarTasks(liveRunners) + if killEvent == nil && onlySidecarsRemaining { + killEvent = structs.NewTaskEvent(structs.TaskMainDead) } - // Emit kill event for live runners - for _, tr := range liveRunners { - tr.EmitEvent(killEvent) + // If there's a kill event set and live runners, kill them + if killEvent != nil { + + // Log kill reason + switch killEvent.Type { + case structs.TaskLeaderDead: + ar.logger.Debug("leader task dead, destroying all tasks", "leader_task", killTask) + case structs.TaskMainDead: + ar.logger.Debug("main tasks dead, destroying all sidecar tasks") + default: + ar.logger.Debug("task failure, destroying all tasks", "failed_task", killTask) + } + + // Emit kill event for live runners + for _, tr := range liveRunners { + tr.EmitEvent(killEvent) + } + + // Kill 'em all + states = ar.killTasks() + + // Wait for TaskRunners to exit before continuing. This will + // prevent looping before TaskRunners have transitioned to + // Dead. + for _, tr := range liveRunners { + ar.logger.Info("waiting for task to exit", "task", tr.Task().Name) + select { + case <-tr.WaitCh(): + case <-ar.waitCh: + } + } } + } else { + // If there are no live runners left kill all non-poststop task + // runners to unblock them from the alloc restart loop. + for _, tr := range ar.tasks { + if tr.IsPoststopTask() { + continue + } - // Kill 'em all - states = ar.killTasks() - - // Wait for TaskRunners to exit before continuing to - // prevent looping before TaskRunners have transitioned - // to Dead. - for _, tr := range liveRunners { - ar.logger.Info("killing task", "task", tr.Task().Name) select { case <-tr.WaitCh(): case <-ar.waitCh: + default: + // Kill task runner without setting an event because the + // task is already dead, it's just waiting in the alloc + // restart loop. + err := tr.Kill(context.TODO(), nil) + if err != nil { + ar.logger.Warn("failed to kill task", "task", tr.Task().Name, "error", err) + } } } } @@ -648,7 +671,7 @@ func (ar *allocRunner) killTasks() map[string]*structs.TaskState { break } - // Kill the rest non-sidecar or poststop tasks concurrently + // Kill the rest non-sidecar and non-poststop tasks concurrently wg := sync.WaitGroup{} for name, tr := range ar.tasks { // Filter out poststop and sidecar tasks so that they stop after all the other tasks are killed @@ -1205,19 +1228,37 @@ func (ar *allocRunner) GetTaskEventHandler(taskName string) drivermanager.EventH return nil } -// RestartTask signalls the task runner for the provided task to restart. -func (ar *allocRunner) RestartTask(taskName string, taskEvent *structs.TaskEvent) error { +// Restart satisfies the WorkloadRestarter interface and restarts all tasks +// that are currently running. +func (ar *allocRunner) Restart(ctx context.Context, event *structs.TaskEvent, failure bool) error { + return ar.restartTasks(ctx, event, failure, false) +} + +// RestartTask restarts the provided task. +func (ar *allocRunner) RestartTask(taskName string, event *structs.TaskEvent) error { tr, ok := ar.tasks[taskName] if !ok { return fmt.Errorf("Could not find task runner for task: %s", taskName) } - return tr.Restart(context.TODO(), taskEvent, false) + return tr.Restart(context.TODO(), event, false) } -// Restart satisfies the WorkloadRestarter interface restarts all task runners -// concurrently -func (ar *allocRunner) Restart(ctx context.Context, event *structs.TaskEvent, failure bool) error { +// RestartRunning restarts all tasks that are currently running. +func (ar *allocRunner) RestartRunning(event *structs.TaskEvent) error { + return ar.restartTasks(context.TODO(), event, false, false) +} + +// RestartAll restarts all tasks in the allocation, including dead ones. They +// will restart following their lifecycle order. +func (ar *allocRunner) RestartAll(event *structs.TaskEvent) error { + // Restart the taskCoordinator to allow dead tasks to run again. + ar.taskCoordinator.Restart() + return ar.restartTasks(context.TODO(), event, false, true) +} + +// restartTasks restarts all task runners concurrently. +func (ar *allocRunner) restartTasks(ctx context.Context, event *structs.TaskEvent, failure bool, force bool) error { waitCh := make(chan struct{}) var err *multierror.Error var errMutex sync.Mutex @@ -1230,10 +1271,19 @@ func (ar *allocRunner) Restart(ctx context.Context, event *structs.TaskEvent, fa defer close(waitCh) for tn, tr := range ar.tasks { wg.Add(1) - go func(taskName string, r agentconsul.WorkloadRestarter) { + go func(taskName string, taskRunner *taskrunner.TaskRunner) { defer wg.Done() - e := r.Restart(ctx, event, failure) - if e != nil { + + var e error + if force { + e = taskRunner.ForceRestart(ctx, event.Copy(), failure) + } else { + e = taskRunner.Restart(ctx, event.Copy(), failure) + } + + // Ignore ErrTaskNotRunning errors since tasks that are not + // running are expected to not be restarted. + if e != nil && e != taskrunner.ErrTaskNotRunning { errMutex.Lock() defer errMutex.Unlock() err = multierror.Append(err, fmt.Errorf("failed to restart task %s: %v", taskName, e)) @@ -1251,25 +1301,6 @@ func (ar *allocRunner) Restart(ctx context.Context, event *structs.TaskEvent, fa return err.ErrorOrNil() } -// RestartAll signalls all task runners in the allocation to restart and passes -// a copy of the task event to each restart event. -// Returns any errors in a concatenated form. -func (ar *allocRunner) RestartAll(taskEvent *structs.TaskEvent) error { - var err *multierror.Error - - // run alloc task restart hooks - ar.taskRestartHooks() - - for tn := range ar.tasks { - rerr := ar.RestartTask(tn, taskEvent.Copy()) - if rerr != nil { - err = multierror.Append(err, rerr) - } - } - - return err.ErrorOrNil() -} - // Signal sends a signal request to task runners inside an allocation. If the // taskName is empty, then it is sent to all tasks. func (ar *allocRunner) Signal(taskName, signal string) error { diff --git a/client/allocrunner/alloc_runner_test.go b/client/allocrunner/alloc_runner_test.go index 0494c938a..90b94657b 100644 --- a/client/allocrunner/alloc_runner_test.go +++ b/client/allocrunner/alloc_runner_test.go @@ -10,6 +10,7 @@ import ( "time" "github.com/hashicorp/consul/api" + multierror "github.com/hashicorp/go-multierror" "github.com/hashicorp/nomad/ci" "github.com/hashicorp/nomad/client/allochealth" "github.com/hashicorp/nomad/client/allocrunner/tasklifecycle" @@ -483,6 +484,464 @@ func TestAllocRunner_Lifecycle_Poststop(t *testing.T) { } +func TestAllocRunner_Lifecycle_Restart(t *testing.T) { + ci.Parallel(t) + + // test cases can use this default or override w/ taskDefs param + alloc := mock.LifecycleAllocFromTasks([]mock.LifecycleTaskDef{ + {Name: "main", RunFor: "100s", ExitCode: 0, Hook: "", IsSidecar: false}, + {Name: "prestart-oneshot", RunFor: "1s", ExitCode: 0, Hook: "prestart", IsSidecar: false}, + {Name: "prestart-sidecar", RunFor: "100s", ExitCode: 0, Hook: "prestart", IsSidecar: true}, + {Name: "poststart-oneshot", RunFor: "1s", ExitCode: 0, Hook: "poststart", IsSidecar: false}, + {Name: "poststart-sidecar", RunFor: "100s", ExitCode: 0, Hook: "poststart", IsSidecar: true}, + {Name: "poststop", RunFor: "1s", ExitCode: 0, Hook: "poststop", IsSidecar: false}, + }) + alloc.Job.Type = structs.JobTypeService + rp := &structs.RestartPolicy{ + Attempts: 1, + Interval: 10 * time.Minute, + Delay: 1 * time.Nanosecond, + Mode: structs.RestartPolicyModeFail, + } + + ev := &structs.TaskEvent{Type: structs.TaskRestartSignal} + + testCases := []struct { + name string + taskDefs []mock.LifecycleTaskDef + isBatch bool + hasLeader bool + action func(*allocRunner, *structs.Allocation) error + expectedErr string + expectedAfter map[string]structs.TaskState + }{ + { + name: "restart entire allocation", + action: func(ar *allocRunner, alloc *structs.Allocation) error { + return ar.RestartAll(ev) + }, + expectedAfter: map[string]structs.TaskState{ + "main": structs.TaskState{State: "running", Restarts: 1}, + "prestart-oneshot": structs.TaskState{State: "dead", Restarts: 1}, + "prestart-sidecar": structs.TaskState{State: "running", Restarts: 1}, + "poststart-oneshot": structs.TaskState{State: "dead", Restarts: 1}, + "poststart-sidecar": structs.TaskState{State: "running", Restarts: 1}, + "poststop": structs.TaskState{State: "pending", Restarts: 0}, + }, + }, + { + name: "restart only running tasks", + action: func(ar *allocRunner, alloc *structs.Allocation) error { + return ar.RestartRunning(ev) + }, + expectedAfter: map[string]structs.TaskState{ + "main": structs.TaskState{State: "running", Restarts: 1}, + "prestart-oneshot": structs.TaskState{State: "dead", Restarts: 0}, + "prestart-sidecar": structs.TaskState{State: "running", Restarts: 1}, + "poststart-oneshot": structs.TaskState{State: "dead", Restarts: 0}, + "poststart-sidecar": structs.TaskState{State: "running", Restarts: 1}, + "poststop": structs.TaskState{State: "pending", Restarts: 0}, + }, + }, + { + name: "batch job restart entire allocation", + taskDefs: []mock.LifecycleTaskDef{ + {Name: "main", RunFor: "100s", ExitCode: 1, Hook: "", IsSidecar: false}, + {Name: "prestart-oneshot", RunFor: "1s", ExitCode: 0, Hook: "prestart", IsSidecar: false}, + {Name: "prestart-sidecar", RunFor: "100s", ExitCode: 0, Hook: "prestart", IsSidecar: true}, + {Name: "poststart-oneshot", RunFor: "1s", ExitCode: 0, Hook: "poststart", IsSidecar: false}, + {Name: "poststart-sidecar", RunFor: "100s", ExitCode: 0, Hook: "poststart", IsSidecar: true}, + {Name: "poststop", RunFor: "1s", ExitCode: 0, Hook: "poststop", IsSidecar: false}, + }, + isBatch: true, + action: func(ar *allocRunner, alloc *structs.Allocation) error { + return ar.RestartAll(ev) + }, + expectedAfter: map[string]structs.TaskState{ + "main": structs.TaskState{State: "running", Restarts: 1}, + "prestart-oneshot": structs.TaskState{State: "dead", Restarts: 1}, + "prestart-sidecar": structs.TaskState{State: "running", Restarts: 1}, + "poststart-oneshot": structs.TaskState{State: "dead", Restarts: 1}, + "poststart-sidecar": structs.TaskState{State: "running", Restarts: 1}, + "poststop": structs.TaskState{State: "pending", Restarts: 0}, + }, + }, + { + name: "batch job restart only running tasks ", + taskDefs: []mock.LifecycleTaskDef{ + {Name: "main", RunFor: "100s", ExitCode: 1, Hook: "", IsSidecar: false}, + {Name: "prestart-oneshot", RunFor: "1s", ExitCode: 0, Hook: "prestart", IsSidecar: false}, + {Name: "prestart-sidecar", RunFor: "100s", ExitCode: 0, Hook: "prestart", IsSidecar: true}, + {Name: "poststart-oneshot", RunFor: "1s", ExitCode: 0, Hook: "poststart", IsSidecar: false}, + {Name: "poststart-sidecar", RunFor: "100s", ExitCode: 0, Hook: "poststart", IsSidecar: true}, + {Name: "poststop", RunFor: "1s", ExitCode: 0, Hook: "poststop", IsSidecar: false}, + }, + isBatch: true, + action: func(ar *allocRunner, alloc *structs.Allocation) error { + return ar.RestartRunning(ev) + }, + expectedAfter: map[string]structs.TaskState{ + "main": structs.TaskState{State: "running", Restarts: 1}, + "prestart-oneshot": structs.TaskState{State: "dead", Restarts: 0}, + "prestart-sidecar": structs.TaskState{State: "running", Restarts: 1}, + "poststart-oneshot": structs.TaskState{State: "dead", Restarts: 0}, + "poststart-sidecar": structs.TaskState{State: "running", Restarts: 1}, + "poststop": structs.TaskState{State: "pending", Restarts: 0}, + }, + }, + { + name: "restart entire allocation with leader", + hasLeader: true, + action: func(ar *allocRunner, alloc *structs.Allocation) error { + return ar.RestartAll(ev) + }, + expectedAfter: map[string]structs.TaskState{ + "main": structs.TaskState{State: "running", Restarts: 1}, + "prestart-oneshot": structs.TaskState{State: "dead", Restarts: 1}, + "prestart-sidecar": structs.TaskState{State: "running", Restarts: 1}, + "poststart-oneshot": structs.TaskState{State: "dead", Restarts: 1}, + "poststart-sidecar": structs.TaskState{State: "running", Restarts: 1}, + "poststop": structs.TaskState{State: "pending", Restarts: 0}, + }, + }, + { + name: "stop from server", + action: func(ar *allocRunner, alloc *structs.Allocation) error { + stopAlloc := alloc.Copy() + stopAlloc.DesiredStatus = structs.AllocDesiredStatusStop + ar.Update(stopAlloc) + return nil + }, + expectedAfter: map[string]structs.TaskState{ + "main": structs.TaskState{State: "dead", Restarts: 0}, + "prestart-oneshot": structs.TaskState{State: "dead", Restarts: 0}, + "prestart-sidecar": structs.TaskState{State: "dead", Restarts: 0}, + "poststart-oneshot": structs.TaskState{State: "dead", Restarts: 0}, + "poststart-sidecar": structs.TaskState{State: "dead", Restarts: 0}, + "poststop": structs.TaskState{State: "dead", Restarts: 0}, + }, + }, + { + name: "restart main task", + action: func(ar *allocRunner, alloc *structs.Allocation) error { + return ar.RestartTask("main", ev) + }, + expectedAfter: map[string]structs.TaskState{ + "main": structs.TaskState{State: "running", Restarts: 1}, + "prestart-oneshot": structs.TaskState{State: "dead", Restarts: 0}, + "prestart-sidecar": structs.TaskState{State: "running", Restarts: 0}, + "poststart-oneshot": structs.TaskState{State: "dead", Restarts: 0}, + "poststart-sidecar": structs.TaskState{State: "running", Restarts: 0}, + "poststop": structs.TaskState{State: "pending", Restarts: 0}, + }, + }, + { + name: "restart leader main task", + hasLeader: true, + action: func(ar *allocRunner, alloc *structs.Allocation) error { + return ar.RestartTask("main", ev) + }, + expectedAfter: map[string]structs.TaskState{ + "main": structs.TaskState{State: "running", Restarts: 1}, + "prestart-oneshot": structs.TaskState{State: "dead", Restarts: 0}, + "prestart-sidecar": structs.TaskState{State: "running", Restarts: 0}, + "poststart-oneshot": structs.TaskState{State: "dead", Restarts: 0}, + "poststart-sidecar": structs.TaskState{State: "running", Restarts: 0}, + "poststop": structs.TaskState{State: "pending", Restarts: 0}, + }, + }, + { + name: "main task fails and restarts once", + taskDefs: []mock.LifecycleTaskDef{ + {Name: "main", RunFor: "2s", ExitCode: 1, Hook: "", IsSidecar: false}, + {Name: "prestart-oneshot", RunFor: "1s", ExitCode: 0, Hook: "prestart", IsSidecar: false}, + {Name: "prestart-sidecar", RunFor: "100s", ExitCode: 0, Hook: "prestart", IsSidecar: true}, + {Name: "poststart-oneshot", RunFor: "1s", ExitCode: 0, Hook: "poststart", IsSidecar: false}, + {Name: "poststart-sidecar", RunFor: "100s", ExitCode: 0, Hook: "poststart", IsSidecar: true}, + {Name: "poststop", RunFor: "1s", ExitCode: 0, Hook: "poststop", IsSidecar: false}, + }, + action: func(ar *allocRunner, alloc *structs.Allocation) error { + time.Sleep(3 * time.Second) // make sure main task has exited + return nil + }, + expectedAfter: map[string]structs.TaskState{ + "main": structs.TaskState{State: "dead", Restarts: 1}, + "prestart-oneshot": structs.TaskState{State: "dead", Restarts: 0}, + "prestart-sidecar": structs.TaskState{State: "dead", Restarts: 0}, + "poststart-oneshot": structs.TaskState{State: "dead", Restarts: 0}, + "poststart-sidecar": structs.TaskState{State: "dead", Restarts: 0}, + "poststop": structs.TaskState{State: "dead", Restarts: 0}, + }, + }, + { + name: "leader main task fails and restarts once", + taskDefs: []mock.LifecycleTaskDef{ + {Name: "main", RunFor: "2s", ExitCode: 1, Hook: "", IsSidecar: false}, + {Name: "prestart-oneshot", RunFor: "1s", ExitCode: 0, Hook: "prestart", IsSidecar: false}, + {Name: "prestart-sidecar", RunFor: "100s", ExitCode: 0, Hook: "prestart", IsSidecar: true}, + {Name: "poststart-oneshot", RunFor: "1s", ExitCode: 0, Hook: "poststart", IsSidecar: false}, + {Name: "poststart-sidecar", RunFor: "100s", ExitCode: 0, Hook: "poststart", IsSidecar: true}, + {Name: "poststop", RunFor: "1s", ExitCode: 0, Hook: "poststop", IsSidecar: false}, + }, + hasLeader: true, + action: func(ar *allocRunner, alloc *structs.Allocation) error { + time.Sleep(3 * time.Second) // make sure main task has exited + return nil + }, + expectedAfter: map[string]structs.TaskState{ + "main": structs.TaskState{State: "dead", Restarts: 1}, + "prestart-oneshot": structs.TaskState{State: "dead", Restarts: 0}, + "prestart-sidecar": structs.TaskState{State: "dead", Restarts: 0}, + "poststart-oneshot": structs.TaskState{State: "dead", Restarts: 0}, + "poststart-sidecar": structs.TaskState{State: "dead", Restarts: 0}, + "poststop": structs.TaskState{State: "dead", Restarts: 0}, + }, + }, + { + name: "main stopped unexpectedly and restarts once", + taskDefs: []mock.LifecycleTaskDef{ + {Name: "main", RunFor: "2s", ExitCode: 0, Hook: "", IsSidecar: false}, + {Name: "prestart-oneshot", RunFor: "1s", ExitCode: 0, Hook: "prestart", IsSidecar: false}, + {Name: "prestart-sidecar", RunFor: "100s", ExitCode: 0, Hook: "prestart", IsSidecar: true}, + {Name: "poststart-oneshot", RunFor: "1s", ExitCode: 0, Hook: "poststart", IsSidecar: false}, + {Name: "poststart-sidecar", RunFor: "100s", ExitCode: 0, Hook: "poststart", IsSidecar: true}, + {Name: "poststop", RunFor: "1s", ExitCode: 0, Hook: "poststop", IsSidecar: false}, + }, + action: func(ar *allocRunner, alloc *structs.Allocation) error { + time.Sleep(3 * time.Second) // make sure main task has exited + return nil + }, + expectedAfter: map[string]structs.TaskState{ + "main": structs.TaskState{State: "dead", Restarts: 1}, + "prestart-oneshot": structs.TaskState{State: "dead", Restarts: 0}, + "prestart-sidecar": structs.TaskState{State: "dead", Restarts: 0}, + "poststart-oneshot": structs.TaskState{State: "dead", Restarts: 0}, + "poststart-sidecar": structs.TaskState{State: "dead", Restarts: 0}, + "poststop": structs.TaskState{State: "dead", Restarts: 0}, + }, + }, + { + name: "leader main stopped unexpectedly and restarts once", + taskDefs: []mock.LifecycleTaskDef{ + {Name: "main", RunFor: "2s", ExitCode: 0, Hook: "", IsSidecar: false}, + {Name: "prestart-oneshot", RunFor: "1s", ExitCode: 0, Hook: "prestart", IsSidecar: false}, + {Name: "prestart-sidecar", RunFor: "100s", ExitCode: 0, Hook: "prestart", IsSidecar: true}, + {Name: "poststart-oneshot", RunFor: "1s", ExitCode: 0, Hook: "poststart", IsSidecar: false}, + {Name: "poststart-sidecar", RunFor: "100s", ExitCode: 0, Hook: "poststart", IsSidecar: true}, + {Name: "poststop", RunFor: "1s", ExitCode: 0, Hook: "poststop", IsSidecar: false}, + }, + action: func(ar *allocRunner, alloc *structs.Allocation) error { + time.Sleep(3 * time.Second) // make sure main task has exited + return nil + }, + expectedAfter: map[string]structs.TaskState{ + "main": structs.TaskState{State: "dead", Restarts: 1}, + "prestart-oneshot": structs.TaskState{State: "dead", Restarts: 0}, + "prestart-sidecar": structs.TaskState{State: "dead", Restarts: 0}, + "poststart-oneshot": structs.TaskState{State: "dead", Restarts: 0}, + "poststart-sidecar": structs.TaskState{State: "dead", Restarts: 0}, + "poststop": structs.TaskState{State: "dead", Restarts: 0}, + }, + }, + { + name: "failed main task cannot be restarted", + taskDefs: []mock.LifecycleTaskDef{ + {Name: "main", RunFor: "2s", ExitCode: 1, Hook: "", IsSidecar: false}, + {Name: "prestart-oneshot", RunFor: "1s", ExitCode: 0, Hook: "prestart", IsSidecar: false}, + {Name: "prestart-sidecar", RunFor: "100s", ExitCode: 0, Hook: "prestart", IsSidecar: true}, + {Name: "poststart-oneshot", RunFor: "1s", ExitCode: 0, Hook: "poststart", IsSidecar: false}, + {Name: "poststart-sidecar", RunFor: "100s", ExitCode: 0, Hook: "poststart", IsSidecar: true}, + {Name: "poststop", RunFor: "1s", ExitCode: 0, Hook: "poststop", IsSidecar: false}, + }, + action: func(ar *allocRunner, alloc *structs.Allocation) error { + // make sure main task has had a chance to restart once on its + // own and fail again before we try to manually restart it + time.Sleep(5 * time.Second) + return ar.RestartTask("main", ev) + }, + expectedErr: "Task not running", + expectedAfter: map[string]structs.TaskState{ + "main": structs.TaskState{State: "dead", Restarts: 1}, + "prestart-oneshot": structs.TaskState{State: "dead", Restarts: 0}, + "prestart-sidecar": structs.TaskState{State: "dead", Restarts: 0}, + "poststart-oneshot": structs.TaskState{State: "dead", Restarts: 0}, + "poststart-sidecar": structs.TaskState{State: "dead", Restarts: 0}, + "poststop": structs.TaskState{State: "dead", Restarts: 0}, + }, + }, + { + name: "restart prestart-sidecar task", + action: func(ar *allocRunner, alloc *structs.Allocation) error { + return ar.RestartTask("prestart-sidecar", ev) + }, + expectedAfter: map[string]structs.TaskState{ + "main": structs.TaskState{State: "running", Restarts: 0}, + "prestart-oneshot": structs.TaskState{State: "dead", Restarts: 0}, + "prestart-sidecar": structs.TaskState{State: "running", Restarts: 1}, + "poststart-oneshot": structs.TaskState{State: "dead", Restarts: 0}, + "poststart-sidecar": structs.TaskState{State: "running", Restarts: 0}, + "poststop": structs.TaskState{State: "pending", Restarts: 0}, + }, + }, + { + name: "restart poststart-sidecar task", + action: func(ar *allocRunner, alloc *structs.Allocation) error { + return ar.RestartTask("poststart-sidecar", ev) + }, + expectedAfter: map[string]structs.TaskState{ + "main": structs.TaskState{State: "running", Restarts: 0}, + "prestart-oneshot": structs.TaskState{State: "dead", Restarts: 0}, + "prestart-sidecar": structs.TaskState{State: "running", Restarts: 0}, + "poststart-oneshot": structs.TaskState{State: "dead", Restarts: 0}, + "poststart-sidecar": structs.TaskState{State: "running", Restarts: 1}, + "poststop": structs.TaskState{State: "pending", Restarts: 0}, + }, + }, + } + + for _, tc := range testCases { + tc := tc + t.Run(tc.name, func(t *testing.T) { + ci.Parallel(t) + + alloc := alloc.Copy() + alloc.Job.TaskGroups[0].RestartPolicy = rp + if tc.taskDefs != nil { + alloc = mock.LifecycleAllocFromTasks(tc.taskDefs) + alloc.Job.Type = structs.JobTypeService + } + for _, task := range alloc.Job.TaskGroups[0].Tasks { + task.RestartPolicy = rp // tasks inherit the group policy + } + if tc.hasLeader { + for _, task := range alloc.Job.TaskGroups[0].Tasks { + if task.Name == "main" { + task.Leader = true + } + } + } + if tc.isBatch { + alloc.Job.Type = structs.JobTypeBatch + } + + conf, cleanup := testAllocRunnerConfig(t, alloc) + defer cleanup() + ar, err := NewAllocRunner(conf) + require.NoError(t, err) + defer destroy(ar) + go ar.Run() + + upd := conf.StateUpdater.(*MockStateUpdater) + + // assert our "before" states: + // - all one-shot tasks should be dead but not failed + // - all main tasks and sidecars should be running + // - no tasks should have restarted + testutil.WaitForResult(func() (bool, error) { + last := upd.Last() + if last == nil { + return false, fmt.Errorf("no update") + } + if last.ClientStatus != structs.AllocClientStatusRunning { + return false, fmt.Errorf( + "expected alloc to be running not %s", last.ClientStatus) + } + var errs *multierror.Error + + expectedBefore := map[string]string{ + "main": "running", + "prestart-oneshot": "dead", + "prestart-sidecar": "running", + "poststart-oneshot": "dead", + "poststart-sidecar": "running", + "poststop": "pending", + } + + for task, expected := range expectedBefore { + got, ok := last.TaskStates[task] + if !ok { + continue + } + if got.State != expected { + errs = multierror.Append(errs, fmt.Errorf( + "expected initial state of task %q to be %q not %q", + task, expected, got.State)) + } + if got.Restarts != 0 { + errs = multierror.Append(errs, fmt.Errorf( + "expected no initial restarts of task %q, not %q", + task, got.Restarts)) + } + if expected == "dead" && got.Failed { + errs = multierror.Append(errs, fmt.Errorf( + "expected ephemeral task %q to be dead but not failed", + task)) + } + + } + if errs.ErrorOrNil() != nil { + return false, errs.ErrorOrNil() + } + return true, nil + }, func(err error) { + require.NoError(t, err, "error waiting for initial state") + }) + + // perform the action + err = tc.action(ar, alloc.Copy()) + if tc.expectedErr != "" { + require.EqualError(t, err, tc.expectedErr) + } else { + require.NoError(t, err) + } + + // assert our "after" states + testutil.WaitForResult(func() (bool, error) { + last := upd.Last() + if last == nil { + return false, fmt.Errorf("no update") + } + var errs *multierror.Error + for task, expected := range tc.expectedAfter { + got, ok := last.TaskStates[task] + if !ok { + errs = multierror.Append(errs, fmt.Errorf( + "no final state found for task %q", task, + )) + } + if got.State != expected.State { + errs = multierror.Append(errs, fmt.Errorf( + "expected final state of task %q to be %q not %q", + task, expected.State, got.State)) + } + if expected.State == "dead" { + if got.FinishedAt.IsZero() || got.StartedAt.IsZero() { + errs = multierror.Append(errs, fmt.Errorf( + "expected final state of task %q to have start and finish time", task)) + } + if len(got.Events) < 2 { + errs = multierror.Append(errs, fmt.Errorf( + "expected final state of task %q to include at least 2 tasks", task)) + } + } + + if got.Restarts != expected.Restarts { + errs = multierror.Append(errs, fmt.Errorf( + "expected final restarts of task %q to be %v not %v", + task, expected.Restarts, got.Restarts)) + } + } + if errs.ErrorOrNil() != nil { + return false, errs.ErrorOrNil() + } + return true, nil + }, func(err error) { + require.NoError(t, err, "error waiting for final state") + }) + }) + } +} + func TestAllocRunner_TaskGroup_ShutdownDelay(t *testing.T) { ci.Parallel(t) @@ -1213,7 +1672,7 @@ func TestAllocRunner_MoveAllocDir(t *testing.T) { ar.Run() defer destroy(ar) - require.Equal(t, structs.AllocClientStatusComplete, ar.AllocState().ClientStatus) + WaitForClientState(t, ar, structs.AllocClientStatusComplete) // Step 2. Modify its directory task := alloc.Job.TaskGroups[0].Tasks[0] @@ -1241,7 +1700,7 @@ func TestAllocRunner_MoveAllocDir(t *testing.T) { ar2.Run() defer destroy(ar2) - require.Equal(t, structs.AllocClientStatusComplete, ar2.AllocState().ClientStatus) + WaitForClientState(t, ar, structs.AllocClientStatusComplete) // Ensure that data from ar was moved to ar2 dataFile = filepath.Join(ar2.allocDir.SharedDir, "data", "data_file") diff --git a/client/allocrunner/alloc_runner_unix_test.go b/client/allocrunner/alloc_runner_unix_test.go index ab3c777dd..085956910 100644 --- a/client/allocrunner/alloc_runner_unix_test.go +++ b/client/allocrunner/alloc_runner_unix_test.go @@ -207,18 +207,18 @@ func TestAllocRunner_Restore_CompletedBatch(t *testing.T) { go ar2.Run() defer destroy(ar2) - // AR waitCh must be closed even when task doesn't run again + // AR waitCh must be open as the task waits for a possible alloc restart. select { case <-ar2.WaitCh(): - case <-time.After(10 * time.Second): - require.Fail(t, "alloc.waitCh wasn't closed") + require.Fail(t, "alloc.waitCh was closed") + default: } - // TR waitCh must be closed too! + // TR waitCh must be open too! select { case <-ar2.tasks[task.Name].WaitCh(): - case <-time.After(10 * time.Second): - require.Fail(t, "tr.waitCh wasn't closed") + require.Fail(t, "tr.waitCh was closed") + default: } // Assert that events are unmodified, which they would if task re-run diff --git a/client/allocrunner/tasklifecycle/coordinator.go b/client/allocrunner/tasklifecycle/coordinator.go index 60b798ed5..90a8e7fc0 100644 --- a/client/allocrunner/tasklifecycle/coordinator.go +++ b/client/allocrunner/tasklifecycle/coordinator.go @@ -16,7 +16,7 @@ const ( coordinatorStatePrestart coordinatorStateMain coordinatorStatePoststart - coordinatorStateWaitMain + coordinatorStateWaitAlloc coordinatorStatePoststop ) @@ -30,8 +30,8 @@ func (s coordinatorState) String() string { return "main" case coordinatorStatePoststart: return "poststart" - case coordinatorStateWaitMain: - return "wait_main" + case coordinatorStateWaitAlloc: + return "wait_alloc" case coordinatorStatePoststop: return "poststart" } @@ -109,6 +109,15 @@ func NewCoordinator(logger hclog.Logger, tasks []*structs.Task, shutdownCh <-cha return c } +// Restart sets the Coordinator state back to "init" and is used to coordinate +// a full alloc restart. Since all tasks will run again they need to be pending +// before they are allowed to proceed. +func (c *Coordinator) Restart() { + c.currentStateLock.Lock() + defer c.currentStateLock.Unlock() + c.enterStateLocked(coordinatorStateInit) +} + // Restore is used to set the Coordinator FSM to the correct state when an // alloc is restored. Must be called before the allocrunner is running. func (c *Coordinator) Restore(states map[string]*structs.TaskState) { @@ -151,6 +160,13 @@ func (c *Coordinator) TaskStateUpdated(states map[string]*structs.TaskState) { // current internal state and the received states of the tasks. // The currentStateLock must be held before calling this method. func (c *Coordinator) nextStateLocked(states map[string]*structs.TaskState) coordinatorState { + + // coordinatorStatePoststop is the terminal state of the FSM, and can be + // reached at any time. + if c.isAllocDone(states) { + return coordinatorStatePoststop + } + switch c.currentState { case coordinatorStateInit: if !c.isInitDone(states) { @@ -174,11 +190,11 @@ func (c *Coordinator) nextStateLocked(states map[string]*structs.TaskState) coor if !c.isPoststartDone(states) { return coordinatorStatePoststart } - return coordinatorStateWaitMain + return coordinatorStateWaitAlloc - case coordinatorStateWaitMain: - if !c.isWaitMainDone(states) { - return coordinatorStateWaitMain + case coordinatorStateWaitAlloc: + if !c.isAllocDone(states) { + return coordinatorStateWaitAlloc } return coordinatorStatePoststop @@ -233,7 +249,7 @@ func (c *Coordinator) enterStateLocked(state coordinatorState) { c.allow(lifecycleStagePoststartEphemeral) c.allow(lifecycleStagePoststartSidecar) - case coordinatorStateWaitMain: + case coordinatorStateWaitAlloc: c.block(lifecycleStagePrestartEphemeral) c.block(lifecycleStagePoststartEphemeral) c.block(lifecycleStagePoststop) @@ -268,7 +284,7 @@ func (c *Coordinator) isInitDone(states map[string]*structs.TaskState) bool { // isPrestartDone returns true when the following conditions are met: // - there is at least one prestart task -// - all ephemeral prestart tasks are in the "dead" state. +// - all ephemeral prestart tasks are successful. // - no ephemeral prestart task has failed. // - all prestart sidecar tasks are running. func (c *Coordinator) isPrestartDone(states map[string]*structs.TaskState) bool { @@ -277,7 +293,7 @@ func (c *Coordinator) isPrestartDone(states map[string]*structs.TaskState) bool } for _, task := range c.tasksByLifecycle[lifecycleStagePrestartEphemeral] { - if states[task].State != structs.TaskStateDead || states[task].Failed { + if !states[task].Successful() { return false } } @@ -321,9 +337,9 @@ func (c *Coordinator) isPoststartDone(states map[string]*structs.TaskState) bool return true } -// isWaitMainDone returns true when the following conditions are met: -// - all tasks that are not poststop are in the "dead" state. -func (c *Coordinator) isWaitMainDone(states map[string]*structs.TaskState) bool { +// isAllocDone returns true when the following conditions are met: +// - all non-poststop tasks are in the "dead" state. +func (c *Coordinator) isAllocDone(states map[string]*structs.TaskState) bool { for lifecycle, tasks := range c.tasksByLifecycle { if lifecycle == lifecycleStagePoststop { continue diff --git a/client/allocrunner/tasklifecycle/coordinator_test.go b/client/allocrunner/tasklifecycle/coordinator_test.go index 17e42e96e..3f86dcc99 100644 --- a/client/allocrunner/tasklifecycle/coordinator_test.go +++ b/client/allocrunner/tasklifecycle/coordinator_test.go @@ -58,6 +58,9 @@ func TestCoordinator_PrestartRunsBeforeMain(t *testing.T) { sideTask := tasks[1] initTask := tasks[2] + // Only use the tasks that we care about. + tasks = []*structs.Task{mainTask, sideTask, initTask} + shutdownCh := make(chan struct{}) defer close(shutdownCh) coord := NewCoordinator(logger, tasks, shutdownCh) @@ -161,6 +164,9 @@ func TestCoordinator_MainRunsAfterManyInitTasks(t *testing.T) { init1Task := tasks[1] init2Task := tasks[2] + // Only use the tasks that we care about. + tasks = []*structs.Task{mainTask, init1Task, init2Task} + shutdownCh := make(chan struct{}) defer close(shutdownCh) coord := NewCoordinator(logger, tasks, shutdownCh) @@ -227,6 +233,9 @@ func TestCoordinator_FailedInitTask(t *testing.T) { init1Task := tasks[1] init2Task := tasks[2] + // Only use the tasks that we care about. + tasks = []*structs.Task{mainTask, init1Task, init2Task} + shutdownCh := make(chan struct{}) defer close(shutdownCh) coord := NewCoordinator(logger, tasks, shutdownCh) @@ -292,6 +301,9 @@ func TestCoordinator_SidecarNeverStarts(t *testing.T) { sideTask := tasks[1] initTask := tasks[2] + // Only use the tasks that we care about. + tasks = []*structs.Task{mainTask, sideTask, initTask} + shutdownCh := make(chan struct{}) defer close(shutdownCh) coord := NewCoordinator(logger, tasks, shutdownCh) @@ -356,6 +368,9 @@ func TestCoordinator_PoststartStartsAfterMain(t *testing.T) { sideTask := tasks[1] postTask := tasks[2] + // Only use the tasks that we care about. + tasks = []*structs.Task{mainTask, sideTask, postTask} + // Make the the third task is a poststart hook postTask.Lifecycle.Hook = structs.TaskLifecycleHookPoststart diff --git a/client/allocrunner/taskrunner/lifecycle.go b/client/allocrunner/taskrunner/lifecycle.go index b812156a8..90c3d3718 100644 --- a/client/allocrunner/taskrunner/lifecycle.go +++ b/client/allocrunner/taskrunner/lifecycle.go @@ -6,28 +6,103 @@ import ( "github.com/hashicorp/nomad/nomad/structs" ) -// Restart a task. Returns immediately if no task is running. Blocks until -// existing task exits or passed-in context is canceled. +// Restart restarts a task that is already running. Returns an error if the +// task is not running. Blocks until existing task exits or passed-in context +// is canceled. func (tr *TaskRunner) Restart(ctx context.Context, event *structs.TaskEvent, failure bool) error { - tr.logger.Trace("Restart requested", "failure", failure) + tr.logger.Trace("Restart requested", "failure", failure, "event", event.GoString()) - // Grab the handle - handle := tr.getDriverHandle() + taskState := tr.TaskState() + if taskState == nil { + return ErrTaskNotRunning + } - // Check it is running - if handle == nil { + switch taskState.State { + case structs.TaskStatePending, structs.TaskStateDead: + return ErrTaskNotRunning + } + + return tr.restartImpl(ctx, event, failure) +} + +// ForceRestart restarts a task that is already running or reruns it if dead. +// Returns an error if the task is not able to rerun. Blocks until existing +// task exits or passed-in context is canceled. +// +// Callers must restart the AllocRuner taskCoordinator beforehand to make sure +// the task will be able to run again. +func (tr *TaskRunner) ForceRestart(ctx context.Context, event *structs.TaskEvent, failure bool) error { + tr.logger.Trace("Force restart requested", "failure", failure, "event", event.GoString()) + + taskState := tr.TaskState() + if taskState == nil { + return ErrTaskNotRunning + } + + tr.stateLock.Lock() + localState := tr.localState.Copy() + tr.stateLock.Unlock() + + if localState == nil { + return ErrTaskNotRunning + } + + switch taskState.State { + case structs.TaskStatePending: + return ErrTaskNotRunning + + case structs.TaskStateDead: + // Tasks that are in the "dead" state are only allowed to restart if + // their Run() method is still active. + if localState.RunComplete { + return ErrTaskNotRunning + } + } + + return tr.restartImpl(ctx, event, failure) +} + +// restartImpl implements to task restart process. +// +// It should never be called directly as it doesn't verify if the task state +// allows for a restart. +func (tr *TaskRunner) restartImpl(ctx context.Context, event *structs.TaskEvent, failure bool) error { + + // Check if the task is able to restart based on its state and the type of + // restart event that was triggered. + taskState := tr.TaskState() + if taskState == nil { return ErrTaskNotRunning } // Emit the event since it may take a long time to kill tr.EmitEvent(event) - // Run the pre-kill hooks prior to restarting the task - tr.preKill() - // Tell the restart tracker that a restart triggered the exit tr.restartTracker.SetRestartTriggered(failure) + // Signal a restart to unblock tasks that are in the "dead" state, but + // don't block since the channel is buffered. Only one signal is enough to + // notify the tr.Run() loop. + // The channel must be signaled after SetRestartTriggered is called so the + // tr.Run() loop runs again. + if taskState.State == structs.TaskStateDead { + select { + case tr.restartCh <- struct{}{}: + default: + } + } + + // Grab the handle to see if the task is still running and needs to be + // killed. + handle := tr.getDriverHandle() + if handle == nil { + return nil + } + + // Run the pre-kill hooks prior to restarting the task + tr.preKill() + // Grab a handle to the wait channel that will timeout with context cancelation // _before_ killing the task. waitCh, err := handle.WaitCh(ctx) @@ -69,14 +144,17 @@ func (tr *TaskRunner) Signal(event *structs.TaskEvent, s string) error { // Kill a task. Blocks until task exits or context is canceled. State is set to // dead. func (tr *TaskRunner) Kill(ctx context.Context, event *structs.TaskEvent) error { - tr.logger.Trace("Kill requested", "event_type", event.Type, "event_reason", event.KillReason) + tr.logger.Trace("Kill requested") // Cancel the task runner to break out of restart delay or the main run // loop. tr.killCtxCancel() // Emit kill event - tr.EmitEvent(event) + if event != nil { + tr.logger.Trace("Kill event", "event_type", event.Type, "event_reason", event.KillReason) + tr.EmitEvent(event) + } select { case <-tr.WaitCh(): diff --git a/client/allocrunner/taskrunner/sids_hook_test.go b/client/allocrunner/taskrunner/sids_hook_test.go index d5951ed01..c8f765741 100644 --- a/client/allocrunner/taskrunner/sids_hook_test.go +++ b/client/allocrunner/taskrunner/sids_hook_test.go @@ -22,7 +22,6 @@ import ( "github.com/hashicorp/nomad/helper/uuid" "github.com/hashicorp/nomad/nomad/mock" "github.com/hashicorp/nomad/nomad/structs" - "github.com/hashicorp/nomad/testutil" "github.com/stretchr/testify/require" "golang.org/x/sys/unix" ) @@ -297,11 +296,7 @@ func TestTaskRunner_DeriveSIToken_UnWritableTokenFile(t *testing.T) { go tr.Run() // wait for task runner to finish running - select { - case <-tr.WaitCh(): - case <-time.After(time.Duration(testutil.TestMultiplier()*15) * time.Second): - r.Fail("timed out waiting for task runner") - } + testWaitForTaskToDie(t, tr) // assert task exited un-successfully finalState := tr.TaskState() diff --git a/client/allocrunner/taskrunner/state/state.go b/client/allocrunner/taskrunner/state/state.go index 5f83c476c..a4bc26b0f 100644 --- a/client/allocrunner/taskrunner/state/state.go +++ b/client/allocrunner/taskrunner/state/state.go @@ -16,6 +16,11 @@ type LocalState struct { // TaskHandle is the handle used to reattach to the task during recovery TaskHandle *drivers.TaskHandle + + // RunComplete is set to true when the TaskRunner.Run() method finishes. + // It is used to distinguish between a dead task that could be restarted + // and one that will never run again. + RunComplete bool } func NewLocalState() *LocalState { @@ -52,6 +57,7 @@ func (s *LocalState) Copy() *LocalState { Hooks: make(map[string]*HookState, len(s.Hooks)), DriverNetwork: s.DriverNetwork.Copy(), TaskHandle: s.TaskHandle.Copy(), + RunComplete: s.RunComplete, } // Copy the hook state diff --git a/client/allocrunner/taskrunner/task_runner.go b/client/allocrunner/taskrunner/task_runner.go index ecfb2e905..445cf0440 100644 --- a/client/allocrunner/taskrunner/task_runner.go +++ b/client/allocrunner/taskrunner/task_runner.go @@ -62,6 +62,11 @@ const ( // updates have come in since the last one was handled, we only need to // handle the last one. triggerUpdateChCap = 1 + + // restartChCap is the capacity for the restartCh used for triggering task + // restarts. It should be exactly 1 as even if multiple restarts have come + // we only need to handle the last one. + restartChCap = 1 ) type TaskRunner struct { @@ -95,6 +100,9 @@ type TaskRunner struct { // stateDB is for persisting localState and taskState stateDB cstate.StateDB + // restartCh is used to signal that the task should restart. + restartCh chan struct{} + // shutdownCtx is used to exit the TaskRunner *without* affecting task state. shutdownCtx context.Context @@ -367,6 +375,7 @@ func NewTaskRunner(config *Config) (*TaskRunner, error) { shutdownCtx: trCtx, shutdownCtxCancel: trCancel, triggerUpdateCh: make(chan struct{}, triggerUpdateChCap), + restartCh: make(chan struct{}, restartChCap), waitCh: make(chan struct{}), csiManager: config.CSIManager, cpusetCgroupPathGetter: config.CpusetCgroupPathGetter, @@ -506,20 +515,25 @@ func (tr *TaskRunner) Run() { tr.stateLock.RLock() dead := tr.state.State == structs.TaskStateDead + runComplete := tr.localState.RunComplete tr.stateLock.RUnlock() - // if restoring a dead task, ensure that task is cleared and all post hooks - // are called without additional state updates + // If restoring a dead task, ensure the task is cleared and, if the local + // state indicates that the previous Run() call is complete, execute all + // post stop hooks and exit early, otherwise proceed until the + // ALLOC_RESTART loop skipping MAIN since the task is dead. if dead { // do cleanup functions without emitting any additional events/work // to handle cases where we restored a dead task where client terminated // after task finished before completing post-run actions. tr.clearDriverHandle() tr.stateUpdater.TaskStateUpdated() - if err := tr.stop(); err != nil { - tr.logger.Error("stop failed on terminal task", "error", err) + if runComplete { + if err := tr.stop(); err != nil { + tr.logger.Error("stop failed on terminal task", "error", err) + } + return } - return } // Updates are handled asynchronously with the other hooks but each @@ -544,27 +558,24 @@ func (tr *TaskRunner) Run() { // Set the initial task state. tr.stateUpdater.TaskStateUpdated() - select { - case <-tr.startConditionMetCh: - tr.logger.Debug("lifecycle start condition has been met, proceeding") - // yay proceed - case <-tr.killCtx.Done(): - case <-tr.shutdownCtx.Done(): - return - } - timer, stop := helper.NewSafeTimer(0) // timer duration calculated JIT defer stop() MAIN: for !tr.shouldShutdown() { + if dead { + break + } + select { case <-tr.killCtx.Done(): break MAIN case <-tr.shutdownCtx.Done(): // TaskRunner was told to exit immediately return - default: + case <-tr.startConditionMetCh: + tr.logger.Debug("lifecycle start condition has been met, proceeding") + // yay proceed } // Run the prestart hooks @@ -674,6 +685,38 @@ MAIN: // Mark the task as dead tr.UpdateState(structs.TaskStateDead, nil) + // Wait here in case the allocation is restarted. Poststop tasks will never + // run again so skip them to avoid blocking forever. + if !tr.Task().IsPoststop() { + ALLOC_RESTART: + // Run in a loop to handle cases where restartCh is triggered but the + // task runner doesn't need to restart. + for { + select { + case <-tr.killCtx.Done(): + break ALLOC_RESTART + case <-tr.shutdownCtx.Done(): + return + case <-tr.restartCh: + // Restart without delay since the task is not running anymore. + restart, _ := tr.shouldRestart() + if restart { + // Set runner as not dead to allow the MAIN loop to run. + dead = false + goto MAIN + } + } + } + } + + tr.stateLock.Lock() + tr.localState.RunComplete = true + err := tr.stateDB.PutTaskRunnerLocalState(tr.allocID, tr.taskName, tr.localState) + if err != nil { + tr.logger.Warn("error persisting task state on run loop exit", "error", err) + } + tr.stateLock.Unlock() + // Run the stop hooks if err := tr.stop(); err != nil { tr.logger.Error("stop failed", "error", err) @@ -1200,8 +1243,10 @@ func (tr *TaskRunner) UpdateState(state string, event *structs.TaskEvent) { tr.stateLock.Lock() defer tr.stateLock.Unlock() + tr.logger.Trace("setting task state", "state", state) + if event != nil { - tr.logger.Trace("setting task state", "state", state, "event", event.Type) + tr.logger.Trace("appending task event", "state", state, "event", event.Type) // Append the event tr.appendEvent(event) diff --git a/client/allocrunner/taskrunner/task_runner_test.go b/client/allocrunner/taskrunner/task_runner_test.go index 85de7b2eb..6c9eda48f 100644 --- a/client/allocrunner/taskrunner/task_runner_test.go +++ b/client/allocrunner/taskrunner/task_runner_test.go @@ -335,7 +335,7 @@ func TestTaskRunner_Restore_Running(t *testing.T) { defer newTR.Kill(context.Background(), structs.NewTaskEvent("cleanup")) // Wait for new task runner to exit when the process does - <-newTR.WaitCh() + testWaitForTaskToDie(t, newTR) // Assert that the process was only started once started := 0 @@ -349,6 +349,87 @@ func TestTaskRunner_Restore_Running(t *testing.T) { assert.Equal(t, 1, started) } +// TestTaskRunner_Restore_Dead asserts that restoring a dead task will place it +// back in the correct state. If the task was waiting for an alloc restart it +// must be able to be restarted after restore, otherwise a restart must fail. +func TestTaskRunner_Restore_Dead(t *testing.T) { + ci.Parallel(t) + + alloc := mock.BatchAlloc() + alloc.Job.TaskGroups[0].Count = 1 + task := alloc.Job.TaskGroups[0].Tasks[0] + task.Driver = "mock_driver" + task.Config = map[string]interface{}{ + "run_for": "2s", + } + conf, cleanup := testTaskRunnerConfig(t, alloc, task.Name) + conf.StateDB = cstate.NewMemDB(conf.Logger) // "persist" state between task runners + defer cleanup() + + // Run the first TaskRunner + origTR, err := NewTaskRunner(conf) + require.NoError(t, err) + go origTR.Run() + defer origTR.Kill(context.Background(), structs.NewTaskEvent("cleanup")) + + // Wait for it to be dead + testWaitForTaskToDie(t, origTR) + + // Cause TR to exit without shutting down task + origTR.Shutdown() + + // Start a new TaskRunner and do the Restore + newTR, err := NewTaskRunner(conf) + require.NoError(t, err) + require.NoError(t, newTR.Restore()) + + go newTR.Run() + defer newTR.Kill(context.Background(), structs.NewTaskEvent("cleanup")) + + // Verify that the TaskRunner is still active since it was recovered after + // a forced shutdown. + select { + case <-newTR.WaitCh(): + require.Fail(t, "WaitCh is not blocking") + default: + } + + // Verify that we can restart task. + // Retry a few times as the newTR.Run() may not have started yet. + testutil.WaitForResult(func() (bool, error) { + ev := &structs.TaskEvent{Type: structs.TaskRestartSignal} + err = newTR.ForceRestart(context.Background(), ev, false) + return err == nil, err + }, func(err error) { + require.NoError(t, err) + }) + testWaitForTaskToStart(t, newTR) + + // Kill task to verify that it's restored as dead and not able to restart. + newTR.Kill(context.Background(), nil) + testutil.WaitForResult(func() (bool, error) { + select { + case <-newTR.WaitCh(): + return true, nil + default: + return false, fmt.Errorf("task still running") + } + }, func(err error) { + require.NoError(t, err) + }) + + newTR2, err := NewTaskRunner(conf) + require.NoError(t, err) + require.NoError(t, newTR2.Restore()) + + go newTR2.Run() + defer newTR2.Kill(context.Background(), structs.NewTaskEvent("cleanup")) + + ev := &structs.TaskEvent{Type: structs.TaskRestartSignal} + err = newTR2.ForceRestart(context.Background(), ev, false) + require.Equal(t, err, ErrTaskNotRunning) +} + // setupRestoreFailureTest starts a service, shuts down the task runner, and // kills the task before restarting a new TaskRunner. The new TaskRunner is // returned once it is running and waiting in pending along with a cleanup @@ -603,11 +684,7 @@ func TestTaskRunner_TaskEnv_Interpolated(t *testing.T) { defer cleanup() // Wait for task to complete - select { - case <-tr.WaitCh(): - case <-time.After(3 * time.Second): - require.Fail("timeout waiting for task to exit") - } + testWaitForTaskToDie(t, tr) // Get the mock driver plugin driverPlugin, err := conf.DriverManager.Dispense(mockdriver.PluginID.Name) @@ -654,7 +731,9 @@ func TestTaskRunner_TaskEnv_Chroot(t *testing.T) { go tr.Run() defer tr.Kill(context.Background(), structs.NewTaskEvent("cleanup")) - // Wait for task to exit + // Wait for task to exit and kill the task runner to run the stop hooks. + testWaitForTaskToDie(t, tr) + tr.Kill(context.Background(), structs.NewTaskEvent("kill")) timeout := 15 * time.Second if testutil.IsCI() { timeout = 120 * time.Second @@ -703,7 +782,9 @@ func TestTaskRunner_TaskEnv_Image(t *testing.T) { tr, conf, cleanup := runTestTaskRunner(t, alloc, task.Name) defer cleanup() - // Wait for task to exit + // Wait for task to exit and kill task runner to run the stop hooks. + testWaitForTaskToDie(t, tr) + tr.Kill(context.Background(), structs.NewTaskEvent("kill")) select { case <-tr.WaitCh(): case <-time.After(15 * time.Second): @@ -750,7 +831,9 @@ func TestTaskRunner_TaskEnv_None(t *testing.T) { %s `, root, taskDir, taskDir, os.Getenv("PATH")) - // Wait for task to exit + // Wait for task to exit and kill the task runner to run the stop hooks. + testWaitForTaskToDie(t, tr) + tr.Kill(context.Background(), structs.NewTaskEvent("kill")) select { case <-tr.WaitCh(): case <-time.After(15 * time.Second): @@ -818,10 +901,7 @@ func TestTaskRunner_DevicePropogation(t *testing.T) { defer tr.Kill(context.Background(), structs.NewTaskEvent("cleanup")) // Wait for task to complete - select { - case <-tr.WaitCh(): - case <-time.After(3 * time.Second): - } + testWaitForTaskToDie(t, tr) // Get the mock driver plugin driverPlugin, err := conf.DriverManager.Dispense(mockdriver.PluginID.Name) @@ -1328,11 +1408,7 @@ func TestTaskRunner_CheckWatcher_Restart(t *testing.T) { // Wait until the task exits. Don't simply wait for it to run as it may // get restarted and terminated before the test is able to observe it // running. - select { - case <-tr.WaitCh(): - case <-time.After(time.Duration(testutil.TestMultiplier()*15) * time.Second): - require.Fail(t, "timeout") - } + testWaitForTaskToDie(t, tr) state := tr.TaskState() actualEvents := make([]string, len(state.Events)) @@ -1421,11 +1497,7 @@ func TestTaskRunner_BlockForSIDSToken(t *testing.T) { // task runner should exit now that it has been unblocked and it is a batch // job with a zero sleep time - select { - case <-tr.WaitCh(): - case <-time.After(15 * time.Second * time.Duration(testutil.TestMultiplier())): - r.Fail("timed out waiting for batch task to exist") - } + testWaitForTaskToDie(t, tr) // assert task exited successfully finalState := tr.TaskState() @@ -1478,11 +1550,7 @@ func TestTaskRunner_DeriveSIToken_Retry(t *testing.T) { go tr.Run() // assert task runner blocks on SI token - select { - case <-tr.WaitCh(): - case <-time.After(time.Duration(testutil.TestMultiplier()*15) * time.Second): - r.Fail("timed out waiting for task runner") - } + testWaitForTaskToDie(t, tr) // assert task exited successfully finalState := tr.TaskState() @@ -1598,11 +1666,7 @@ func TestTaskRunner_BlockForVaultToken(t *testing.T) { // TR should exit now that it's unblocked by vault as its a batch job // with 0 sleeping. - select { - case <-tr.WaitCh(): - case <-time.After(15 * time.Second * time.Duration(testutil.TestMultiplier())): - require.Fail(t, "timed out waiting for batch task to exit") - } + testWaitForTaskToDie(t, tr) // Assert task exited successfully finalState := tr.TaskState() @@ -1615,6 +1679,14 @@ func TestTaskRunner_BlockForVaultToken(t *testing.T) { require.NoError(t, err) require.Equal(t, token, string(data)) + // Kill task runner to trigger stop hooks + tr.Kill(context.Background(), structs.NewTaskEvent("kill")) + select { + case <-tr.WaitCh(): + case <-time.After(time.Duration(testutil.TestMultiplier()*15) * time.Second): + require.Fail(t, "timed out waiting for task runner to exit") + } + // Check the token was revoked testutil.WaitForResult(func() (bool, error) { if len(vaultClient.StoppedTokens()) != 1 { @@ -1661,17 +1733,21 @@ func TestTaskRunner_DeriveToken_Retry(t *testing.T) { defer tr.Kill(context.Background(), structs.NewTaskEvent("cleanup")) go tr.Run() - // Wait for TR to exit and check its state + // Wait for TR to die and check its state + testWaitForTaskToDie(t, tr) + + state := tr.TaskState() + require.Equal(t, structs.TaskStateDead, state.State) + require.False(t, state.Failed) + + // Kill task runner to trigger stop hooks + tr.Kill(context.Background(), structs.NewTaskEvent("kill")) select { case <-tr.WaitCh(): case <-time.After(time.Duration(testutil.TestMultiplier()*15) * time.Second): require.Fail(t, "timed out waiting for task runner to exit") } - state := tr.TaskState() - require.Equal(t, structs.TaskStateDead, state.State) - require.False(t, state.Failed) - require.Equal(t, 1, count) // Check that the token is on disk @@ -1771,11 +1847,7 @@ func TestTaskRunner_Download_ChrootExec(t *testing.T) { defer cleanup() // Wait for task to run and exit - select { - case <-tr.WaitCh(): - case <-time.After(time.Duration(testutil.TestMultiplier()*15) * time.Second): - require.Fail(t, "timed out waiting for task runner to exit") - } + testWaitForTaskToDie(t, tr) state := tr.TaskState() require.Equal(t, structs.TaskStateDead, state.State) @@ -1816,11 +1888,7 @@ func TestTaskRunner_Download_RawExec(t *testing.T) { defer cleanup() // Wait for task to run and exit - select { - case <-tr.WaitCh(): - case <-time.After(time.Duration(testutil.TestMultiplier()*15) * time.Second): - require.Fail(t, "timed out waiting for task runner to exit") - } + testWaitForTaskToDie(t, tr) state := tr.TaskState() require.Equal(t, structs.TaskStateDead, state.State) @@ -1851,11 +1919,7 @@ func TestTaskRunner_Download_List(t *testing.T) { defer cleanup() // Wait for task to run and exit - select { - case <-tr.WaitCh(): - case <-time.After(time.Duration(testutil.TestMultiplier()*15) * time.Second): - require.Fail(t, "timed out waiting for task runner to exit") - } + testWaitForTaskToDie(t, tr) state := tr.TaskState() require.Equal(t, structs.TaskStateDead, state.State) @@ -1902,11 +1966,7 @@ func TestTaskRunner_Download_Retries(t *testing.T) { tr, _, cleanup := runTestTaskRunner(t, alloc, task.Name) defer cleanup() - select { - case <-tr.WaitCh(): - case <-time.After(time.Duration(testutil.TestMultiplier()*15) * time.Second): - require.Fail(t, "timed out waiting for task to exit") - } + testWaitForTaskToDie(t, tr) state := tr.TaskState() require.Equal(t, structs.TaskStateDead, state.State) @@ -2100,6 +2160,8 @@ func TestTaskRunner_RestartSignalTask_NotRunning(t *testing.T) { case <-time.After(1 * time.Second): } + require.Equal(t, structs.TaskStatePending, tr.TaskState().State) + // Send a signal and restart err = tr.Signal(structs.NewTaskEvent("don't panic"), "QUIT") require.EqualError(t, err, ErrTaskNotRunning.Error()) @@ -2110,12 +2172,7 @@ func TestTaskRunner_RestartSignalTask_NotRunning(t *testing.T) { // Unblock and let it finish waitCh <- struct{}{} - - select { - case <-tr.WaitCh(): - case <-time.After(10 * time.Second): - require.Fail(t, "timed out waiting for task to complete") - } + testWaitForTaskToDie(t, tr) // Assert the task ran and never restarted state := tr.TaskState() @@ -2153,11 +2210,7 @@ func TestTaskRunner_Run_RecoverableStartError(t *testing.T) { tr, _, cleanup := runTestTaskRunner(t, alloc, task.Name) defer cleanup() - select { - case <-tr.WaitCh(): - case <-time.After(time.Duration(testutil.TestMultiplier()*15) * time.Second): - require.Fail(t, "timed out waiting for task to exit") - } + testWaitForTaskToDie(t, tr) state := tr.TaskState() require.Equal(t, structs.TaskStateDead, state.State) @@ -2202,11 +2255,7 @@ func TestTaskRunner_Template_Artifact(t *testing.T) { go tr.Run() // Wait for task to run and exit - select { - case <-tr.WaitCh(): - case <-time.After(15 * time.Second * time.Duration(testutil.TestMultiplier())): - require.Fail(t, "timed out waiting for task runner to exit") - } + testWaitForTaskToDie(t, tr) state := tr.TaskState() require.Equal(t, structs.TaskStateDead, state.State) @@ -2536,7 +2585,9 @@ func TestTaskRunner_UnregisterConsul_Retries(t *testing.T) { tr, err := NewTaskRunner(conf) require.NoError(t, err) defer tr.Kill(context.Background(), structs.NewTaskEvent("cleanup")) - tr.Run() + go tr.Run() + + testWaitForTaskToDie(t, tr) state := tr.TaskState() require.Equal(t, structs.TaskStateDead, state.State) @@ -2562,7 +2613,17 @@ func TestTaskRunner_UnregisterConsul_Retries(t *testing.T) { func testWaitForTaskToStart(t *testing.T, tr *TaskRunner) { testutil.WaitForResult(func() (bool, error) { ts := tr.TaskState() - return ts.State == structs.TaskStateRunning, fmt.Errorf("%v", ts.State) + return ts.State == structs.TaskStateRunning, fmt.Errorf("expected task to be running, got %v", ts.State) + }, func(err error) { + require.NoError(t, err) + }) +} + +// testWaitForTaskToDie waits for the task to die or fails the test +func testWaitForTaskToDie(t *testing.T, tr *TaskRunner) { + testutil.WaitForResult(func() (bool, error) { + ts := tr.TaskState() + return ts.State == structs.TaskStateDead, fmt.Errorf("expected task to be dead, got %v", ts.State) }, func(err error) { require.NoError(t, err) }) diff --git a/client/allocrunner/testing.go b/client/allocrunner/testing.go index 6f2fd7b03..44e3eb524 100644 --- a/client/allocrunner/testing.go +++ b/client/allocrunner/testing.go @@ -4,6 +4,7 @@ package allocrunner import ( + "fmt" "sync" "testing" @@ -20,6 +21,7 @@ import ( "github.com/hashicorp/nomad/client/state" "github.com/hashicorp/nomad/client/vaultclient" "github.com/hashicorp/nomad/nomad/structs" + "github.com/hashicorp/nomad/testutil" "github.com/stretchr/testify/require" ) @@ -104,3 +106,13 @@ func TestAllocRunnerFromAlloc(t *testing.T, alloc *structs.Allocation) (*allocRu return ar, cleanup } + +func WaitForClientState(t *testing.T, ar *allocRunner, state string) { + testutil.WaitForResult(func() (bool, error) { + got := ar.AllocState().ClientStatus + return got == state, + fmt.Errorf("expected alloc runner to be in state %s, got %s", state, got) + }, func(err error) { + require.NoError(t, err) + }) +} diff --git a/client/client.go b/client/client.go index cd4925b40..9e8253132 100644 --- a/client/client.go +++ b/client/client.go @@ -160,6 +160,7 @@ type AllocRunner interface { PersistState() error RestartTask(taskName string, taskEvent *structs.TaskEvent) error + RestartRunning(taskEvent *structs.TaskEvent) error RestartAll(taskEvent *structs.TaskEvent) error Reconnect(update *structs.Allocation) error @@ -925,20 +926,31 @@ func (c *Client) CollectAllAllocs() { c.garbageCollector.CollectAll() } -func (c *Client) RestartAllocation(allocID, taskName string) error { +func (c *Client) RestartAllocation(allocID, taskName string, allTasks bool) error { + if allTasks && taskName != "" { + return fmt.Errorf("task name cannot be set when restarting all tasks") + } + ar, err := c.getAllocRunner(allocID) if err != nil { return err } - event := structs.NewTaskEvent(structs.TaskRestartSignal). - SetRestartReason("User requested restart") - if taskName != "" { + event := structs.NewTaskEvent(structs.TaskRestartSignal). + SetRestartReason("User requested task to restart") return ar.RestartTask(taskName, event) } - return ar.RestartAll(event) + if allTasks { + event := structs.NewTaskEvent(structs.TaskRestartSignal). + SetRestartReason("User requested all tasks to restart") + return ar.RestartAll(event) + } + + event := structs.NewTaskEvent(structs.TaskRestartSignal). + SetRestartReason("User requested running tasks to restart") + return ar.RestartRunning(event) } // Node returns the locally registered node diff --git a/command/agent/alloc_endpoint.go b/command/agent/alloc_endpoint.go index e7e1e1d09..c043fe024 100644 --- a/command/agent/alloc_endpoint.go +++ b/command/agent/alloc_endpoint.go @@ -283,6 +283,7 @@ func (s *HTTPServer) allocRestart(allocID string, resp http.ResponseWriter, req // Explicitly parse the body separately to disallow overriding AllocID in req Body. var reqBody struct { TaskName string + AllTasks bool } err := json.NewDecoder(req.Body).Decode(&reqBody) if err != nil && err != io.EOF { @@ -291,6 +292,9 @@ func (s *HTTPServer) allocRestart(allocID string, resp http.ResponseWriter, req if reqBody.TaskName != "" { args.TaskName = reqBody.TaskName } + if reqBody.AllTasks { + args.AllTasks = reqBody.AllTasks + } // Determine the handler to use useLocalClient, useClientRPC, useServerRPC := s.rpcHandlerForAlloc(allocID) diff --git a/command/alloc_restart.go b/command/alloc_restart.go index 9c5dcb3be..cc6d91654 100644 --- a/command/alloc_restart.go +++ b/command/alloc_restart.go @@ -18,8 +18,11 @@ func (c *AllocRestartCommand) Help() string { Usage: nomad alloc restart [options] Restart an existing allocation. This command is used to restart a specific alloc - and its tasks. If no task is provided then all of the allocation's tasks will - be restarted. + and its tasks. If no task is provided then all of the allocation's tasks that + are currently running will be restarted. + + Use the option '-all-tasks' to restart tasks that have already run, such as + non-sidecar prestart and poststart tasks. When ACLs are enabled, this command requires a token with the 'alloc-lifecycle', 'read-job', and 'list-jobs' capabilities for the @@ -31,9 +34,15 @@ General Options: Restart Specific Options: + -all-tasks + If set, all tasks in the allocation will be restarted, even the ones that + already ran. This option cannot be used with '-task' or the '' + argument. + -task - Specify the individual task to restart. If task name is given with both an + Specify the individual task to restart. If task name is given with both an argument and the '-task' option, preference is given to the '-task' option. + This option cannot be used with '-all-tasks'. -verbose Show full information. @@ -44,11 +53,12 @@ Restart Specific Options: func (c *AllocRestartCommand) Name() string { return "alloc restart" } func (c *AllocRestartCommand) Run(args []string) int { - var verbose bool + var allTasks, verbose bool var task string flags := c.Meta.FlagSet(c.Name(), FlagSetClient) flags.Usage = func() { c.Ui.Output(c.Help()) } + flags.BoolVar(&allTasks, "all-tasks", false, "") flags.BoolVar(&verbose, "verbose", false, "") flags.StringVar(&task, "task", "", "") @@ -66,6 +76,17 @@ func (c *AllocRestartCommand) Run(args []string) int { allocID := args[0] + // If -task isn't provided fallback to reading the task name + // from args. + if task == "" && len(args) >= 2 { + task = args[1] + } + + if allTasks && task != "" { + c.Ui.Error("The -all-tasks option is not allowed when restarting a specific task.") + return 1 + } + // Truncate the id unless full length is requested length := shortId if verbose { @@ -113,12 +134,6 @@ func (c *AllocRestartCommand) Run(args []string) int { return 1 } - // If -task isn't provided fallback to reading the task name - // from args. - if task == "" && len(args) >= 2 { - task = args[1] - } - if task != "" { err := validateTaskExistsInAllocation(task, alloc) if err != nil { @@ -127,9 +142,17 @@ func (c *AllocRestartCommand) Run(args []string) int { } } - err = client.Allocations().Restart(alloc, task, nil) + if allTasks { + err = client.Allocations().RestartAllTasks(alloc, nil) + } else { + err = client.Allocations().Restart(alloc, task, nil) + } if err != nil { - c.Ui.Error(fmt.Sprintf("Failed to restart allocation:\n\n%s", err.Error())) + target := "allocation" + if task != "" { + target = "task" + } + c.Ui.Error(fmt.Sprintf("Failed to restart %s:\n\n%s", target, err.Error())) return 1 } diff --git a/nomad/mock/mock.go b/nomad/mock/mock.go index 63c7132c5..d5df8d285 100644 --- a/nomad/mock/mock.go +++ b/nomad/mock/mock.go @@ -700,6 +700,50 @@ func LifecycleAlloc() *structs.Allocation { return alloc } +type LifecycleTaskDef struct { + Name string + RunFor string + ExitCode int + Hook string + IsSidecar bool +} + +// LifecycleAllocFromTasks generates an Allocation with mock tasks that have +// the provided lifecycles. +func LifecycleAllocFromTasks(tasks []LifecycleTaskDef) *structs.Allocation { + alloc := LifecycleAlloc() + alloc.Job.TaskGroups[0].Tasks = []*structs.Task{} + for _, task := range tasks { + var lc *structs.TaskLifecycleConfig + if task.Hook != "" { + // TODO: task coordinator doesn't treat nil and empty structs the same + lc = &structs.TaskLifecycleConfig{ + Hook: task.Hook, + Sidecar: task.IsSidecar, + } + } + + alloc.Job.TaskGroups[0].Tasks = append(alloc.Job.TaskGroups[0].Tasks, + &structs.Task{ + Name: task.Name, + Driver: "mock_driver", + Config: map[string]interface{}{ + "run_for": task.RunFor, + "exit_code": task.ExitCode}, + Lifecycle: lc, + LogConfig: structs.DefaultLogConfig(), + Resources: &structs.Resources{CPU: 100, MemoryMB: 256}, + }, + ) + alloc.TaskResources[task.Name] = &structs.Resources{CPU: 100, MemoryMB: 256} + alloc.AllocatedResources.Tasks[task.Name] = &structs.AllocatedTaskResources{ + Cpu: structs.AllocatedCpuResources{CpuShares: 100}, + Memory: structs.AllocatedMemoryResources{MemoryMB: 256}, + } + } + return alloc +} + func LifecycleJobWithPoststopDeploy() *structs.Job { job := &structs.Job{ Region: "global", diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 839feece5..aa089d1ab 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -1028,6 +1028,7 @@ type AllocsGetRequest struct { type AllocRestartRequest struct { AllocID string TaskName string + AllTasks bool QueryOptions } @@ -8043,7 +8044,7 @@ const ( // restarted because it has exceeded its restart policy. TaskNotRestarting = "Not Restarting" - // TaskRestartSignal indicates that the task has been signalled to be + // TaskRestartSignal indicates that the task has been signaled to be // restarted TaskRestartSignal = "Restart Signaled" @@ -8324,6 +8325,9 @@ func (e *TaskEvent) PopulateEventDisplayMessage() { } func (e *TaskEvent) GoString() string { + if e == nil { + return "" + } return fmt.Sprintf("%v - %v", e.Time, e.Type) } diff --git a/website/content/api-docs/allocations.mdx b/website/content/api-docs/allocations.mdx index 2afe3322f..128dbf660 100644 --- a/website/content/api-docs/allocations.mdx +++ b/website/content/api-docs/allocations.mdx @@ -731,6 +731,13 @@ The table below shows this endpoint's support for must be the full UUID, not the short 8-character one. This is specified as part of the path. +- `TaskName` `(string: "")` - Specifies the individual task to restart. Cannot + be used with `AllTasks` set to `true`. + +- `AllTasks` `(bool: false)` - If set to `true` all tasks in the allocation + will be restarted, even the ones that already ran. Cannot be set to `true` if + `TaskName` is defined. + ### Sample Payload ```json diff --git a/website/content/docs/commands/alloc/restart.mdx b/website/content/docs/commands/alloc/restart.mdx index a07820567..32f14b6e8 100644 --- a/website/content/docs/commands/alloc/restart.mdx +++ b/website/content/docs/commands/alloc/restart.mdx @@ -18,12 +18,16 @@ nomad alloc restart [options] This command accepts a single allocation ID and a task name. The task name must be part of the allocation and the task must be currently running. The task name -is optional and if omitted every task in the allocation will be restarted. +is optional and if omitted all tasks that are currently running will be +restarted. -Task name may also be specified using the `-task` option rather than a command -argument. If task name is given with both an argument and the `-task` option, +Task name may also be specified using the `-task` option rather than a command +argument. If task name is given with both an argument and the `-task` option, preference is given to the `-task` option. +Use the option `-all-tasks` to restart tasks that have already run, such as +non-sidecar prestart and poststart tasks. + When ACLs are enabled, this command requires a token with the `alloc-lifecycle`, `read-job`, and `list-jobs` capabilities for the allocation's namespace. @@ -34,7 +38,12 @@ allocation's namespace. ## Restart Options -- `-task`: Specify the individual task to restart. +- `-all-tasks`: If set, all tasks in the allocation will be restarted, even the + ones that already ran. This option cannot be used with `-task` or the + `` argument. + +- `-task`: Specify the individual task to restart. This option cannot be used + with `-all-tasks`. - `-verbose`: Display verbose output.