diff --git a/.changelog/14127.txt b/.changelog/14127.txt new file mode 100644 index 000000000..61c036877 --- /dev/null +++ b/.changelog/14127.txt @@ -0,0 +1,7 @@ +```release-note:improvement +client: add option to restart all tasks of an allocation, regardless of lifecycle type or state. +``` + +```release-note:improvement +client: only start poststop tasks after poststart tasks are done. +``` diff --git a/api/allocations.go b/api/allocations.go index f1fce2c6d..dc2ebb279 100644 --- a/api/allocations.go +++ b/api/allocations.go @@ -141,7 +141,9 @@ func (a *Allocations) GC(alloc *Allocation, q *QueryOptions) error { return err } -// Restart restarts an allocation. +// Restart restarts the tasks that are currently running or a specific task if +// taskName is provided. An error is returned if the task to be restarted is +// not running. // // Note: for cluster topologies where API consumers don't have network access to // Nomad clients, set api.ClientConnTimeout to a small value (ex 1ms) to avoid @@ -156,6 +158,22 @@ func (a *Allocations) Restart(alloc *Allocation, taskName string, q *QueryOption return err } +// RestartAllTasks restarts all tasks in the allocation, regardless of +// lifecycle type or state. Tasks will restart following their lifecycle order. +// +// Note: for cluster topologies where API consumers don't have network access to +// Nomad clients, set api.ClientConnTimeout to a small value (ex 1ms) to avoid +// long pauses on this API call. +func (a *Allocations) RestartAllTasks(alloc *Allocation, q *QueryOptions) error { + req := AllocationRestartRequest{ + AllTasks: true, + } + + var resp struct{} + _, err := a.client.putQuery("/v1/client/allocation/"+alloc.ID+"/restart", &req, &resp, q) + return err +} + // Stop stops an allocation. // // Note: for cluster topologies where API consumers don't have network access to @@ -447,6 +465,7 @@ func (a Allocation) RescheduleInfo(t time.Time) (int, int) { type AllocationRestartRequest struct { TaskName string + AllTasks bool } type AllocSignalRequest struct { diff --git a/client/alloc_endpoint.go b/client/alloc_endpoint.go index 2be7dfb41..52ab8f414 100644 --- a/client/alloc_endpoint.go +++ b/client/alloc_endpoint.go @@ -102,7 +102,7 @@ func (a *Allocations) Restart(args *nstructs.AllocRestartRequest, reply *nstruct return nstructs.ErrPermissionDenied } - return a.c.RestartAllocation(args.AllocID, args.TaskName) + return a.c.RestartAllocation(args.AllocID, args.TaskName, args.AllTasks) } // Stats is used to collect allocation statistics diff --git a/client/alloc_endpoint_test.go b/client/alloc_endpoint_test.go index c8f2560d0..f3c6e3e2b 100644 --- a/client/alloc_endpoint_test.go +++ b/client/alloc_endpoint_test.go @@ -68,6 +68,45 @@ func TestAllocations_Restart(t *testing.T) { }) } +func TestAllocations_RestartAllTasks(t *testing.T) { + ci.Parallel(t) + + require := require.New(t) + client, cleanup := TestClient(t, nil) + defer cleanup() + + alloc := mock.LifecycleAlloc() + require.Nil(client.addAlloc(alloc, "")) + + // Can't restart all tasks while specifying a task name. + req := &nstructs.AllocRestartRequest{ + AllocID: alloc.ID, + AllTasks: true, + TaskName: "web", + } + var resp nstructs.GenericResponse + err := client.ClientRPC("Allocations.Restart", &req, &resp) + require.Error(err) + + // Good request. 
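+	// Omitting TaskName and setting AllTasks targets every task in the
+	// allocation; retry below until the allocation's tasks have started.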
+ req = &nstructs.AllocRestartRequest{ + AllocID: alloc.ID, + AllTasks: true, + } + + testutil.WaitForResult(func() (bool, error) { + var resp2 nstructs.GenericResponse + err := client.ClientRPC("Allocations.Restart", &req, &resp2) + if err != nil && strings.Contains(err.Error(), "not running") { + return false, err + } + + return true, nil + }, func(err error) { + t.Fatalf("err: %v", err) + }) +} + func TestAllocations_Restart_ACL(t *testing.T) { ci.Parallel(t) require := require.New(t) diff --git a/client/allocrunner/alloc_runner.go b/client/allocrunner/alloc_runner.go index 33bc57a8f..1d50c2595 100644 --- a/client/allocrunner/alloc_runner.go +++ b/client/allocrunner/alloc_runner.go @@ -28,7 +28,6 @@ import ( cstate "github.com/hashicorp/nomad/client/state" cstructs "github.com/hashicorp/nomad/client/structs" "github.com/hashicorp/nomad/client/vaultclient" - agentconsul "github.com/hashicorp/nomad/command/agent/consul" "github.com/hashicorp/nomad/helper/pointer" "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/plugins/device" @@ -547,40 +546,64 @@ func (ar *allocRunner) handleTaskStateUpdates() { } } - // if all live runners are sidecars - kill alloc - if killEvent == nil && hasSidecars && !hasNonSidecarTasks(liveRunners) { - killEvent = structs.NewTaskEvent(structs.TaskMainDead) - } - - // If there's a kill event set and live runners, kill them - if killEvent != nil && len(liveRunners) > 0 { - - // Log kill reason - switch killEvent.Type { - case structs.TaskLeaderDead: - ar.logger.Debug("leader task dead, destroying all tasks", "leader_task", killTask) - case structs.TaskMainDead: - ar.logger.Debug("main tasks dead, destroying all sidecar tasks") - default: - ar.logger.Debug("task failure, destroying all tasks", "failed_task", killTask) + if len(liveRunners) > 0 { + // if all live runners are sidecars - kill alloc + onlySidecarsRemaining := hasSidecars && !hasNonSidecarTasks(liveRunners) + if killEvent == nil && onlySidecarsRemaining { + killEvent = structs.NewTaskEvent(structs.TaskMainDead) } - // Emit kill event for live runners - for _, tr := range liveRunners { - tr.EmitEvent(killEvent) + // If there's a kill event set and live runners, kill them + if killEvent != nil { + + // Log kill reason + switch killEvent.Type { + case structs.TaskLeaderDead: + ar.logger.Debug("leader task dead, destroying all tasks", "leader_task", killTask) + case structs.TaskMainDead: + ar.logger.Debug("main tasks dead, destroying all sidecar tasks") + default: + ar.logger.Debug("task failure, destroying all tasks", "failed_task", killTask) + } + + // Emit kill event for live runners + for _, tr := range liveRunners { + tr.EmitEvent(killEvent) + } + + // Kill 'em all + states = ar.killTasks() + + // Wait for TaskRunners to exit before continuing. This will + // prevent looping before TaskRunners have transitioned to + // Dead. + for _, tr := range liveRunners { + ar.logger.Info("waiting for task to exit", "task", tr.Task().Name) + select { + case <-tr.WaitCh(): + case <-ar.waitCh: + } + } } + } else { + // If there are no live runners left kill all non-poststop task + // runners to unblock them from the alloc restart loop. + for _, tr := range ar.tasks { + if tr.IsPoststopTask() { + continue + } - // Kill 'em all - states = ar.killTasks() - - // Wait for TaskRunners to exit before continuing to - // prevent looping before TaskRunners have transitioned - // to Dead. 
- for _, tr := range liveRunners { - ar.logger.Info("killing task", "task", tr.Task().Name) select { case <-tr.WaitCh(): case <-ar.waitCh: + default: + // Kill task runner without setting an event because the + // task is already dead, it's just waiting in the alloc + // restart loop. + err := tr.Kill(context.TODO(), nil) + if err != nil { + ar.logger.Warn("failed to kill task", "task", tr.Task().Name, "error", err) + } } } } @@ -648,7 +671,7 @@ func (ar *allocRunner) killTasks() map[string]*structs.TaskState { break } - // Kill the rest non-sidecar or poststop tasks concurrently + // Kill the rest non-sidecar and non-poststop tasks concurrently wg := sync.WaitGroup{} for name, tr := range ar.tasks { // Filter out poststop and sidecar tasks so that they stop after all the other tasks are killed @@ -1205,19 +1228,37 @@ func (ar *allocRunner) GetTaskEventHandler(taskName string) drivermanager.EventH return nil } -// RestartTask signalls the task runner for the provided task to restart. -func (ar *allocRunner) RestartTask(taskName string, taskEvent *structs.TaskEvent) error { +// Restart satisfies the WorkloadRestarter interface and restarts all tasks +// that are currently running. +func (ar *allocRunner) Restart(ctx context.Context, event *structs.TaskEvent, failure bool) error { + return ar.restartTasks(ctx, event, failure, false) +} + +// RestartTask restarts the provided task. +func (ar *allocRunner) RestartTask(taskName string, event *structs.TaskEvent) error { tr, ok := ar.tasks[taskName] if !ok { return fmt.Errorf("Could not find task runner for task: %s", taskName) } - return tr.Restart(context.TODO(), taskEvent, false) + return tr.Restart(context.TODO(), event, false) } -// Restart satisfies the WorkloadRestarter interface restarts all task runners -// concurrently -func (ar *allocRunner) Restart(ctx context.Context, event *structs.TaskEvent, failure bool) error { +// RestartRunning restarts all tasks that are currently running. +func (ar *allocRunner) RestartRunning(event *structs.TaskEvent) error { + return ar.restartTasks(context.TODO(), event, false, false) +} + +// RestartAll restarts all tasks in the allocation, including dead ones. They +// will restart following their lifecycle order. +func (ar *allocRunner) RestartAll(event *structs.TaskEvent) error { + // Restart the taskCoordinator to allow dead tasks to run again. + ar.taskCoordinator.Restart() + return ar.restartTasks(context.TODO(), event, false, true) +} + +// restartTasks restarts all task runners concurrently. +func (ar *allocRunner) restartTasks(ctx context.Context, event *structs.TaskEvent, failure bool, force bool) error { waitCh := make(chan struct{}) var err *multierror.Error var errMutex sync.Mutex @@ -1230,10 +1271,19 @@ func (ar *allocRunner) Restart(ctx context.Context, event *structs.TaskEvent, fa defer close(waitCh) for tn, tr := range ar.tasks { wg.Add(1) - go func(taskName string, r agentconsul.WorkloadRestarter) { + go func(taskName string, taskRunner *taskrunner.TaskRunner) { defer wg.Done() - e := r.Restart(ctx, event, failure) - if e != nil { + + var e error + if force { + e = taskRunner.ForceRestart(ctx, event.Copy(), failure) + } else { + e = taskRunner.Restart(ctx, event.Copy(), failure) + } + + // Ignore ErrTaskNotRunning errors since tasks that are not + // running are expected to not be restarted. 
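+			// RestartRunning, for example, is expected to skip dead
+			// prestart tasks rather than fail the whole restart request.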
+ if e != nil && e != taskrunner.ErrTaskNotRunning { errMutex.Lock() defer errMutex.Unlock() err = multierror.Append(err, fmt.Errorf("failed to restart task %s: %v", taskName, e)) @@ -1251,25 +1301,6 @@ func (ar *allocRunner) Restart(ctx context.Context, event *structs.TaskEvent, fa return err.ErrorOrNil() } -// RestartAll signalls all task runners in the allocation to restart and passes -// a copy of the task event to each restart event. -// Returns any errors in a concatenated form. -func (ar *allocRunner) RestartAll(taskEvent *structs.TaskEvent) error { - var err *multierror.Error - - // run alloc task restart hooks - ar.taskRestartHooks() - - for tn := range ar.tasks { - rerr := ar.RestartTask(tn, taskEvent.Copy()) - if rerr != nil { - err = multierror.Append(err, rerr) - } - } - - return err.ErrorOrNil() -} - // Signal sends a signal request to task runners inside an allocation. If the // taskName is empty, then it is sent to all tasks. func (ar *allocRunner) Signal(taskName, signal string) error { diff --git a/client/allocrunner/alloc_runner_test.go b/client/allocrunner/alloc_runner_test.go index 0494c938a..90b94657b 100644 --- a/client/allocrunner/alloc_runner_test.go +++ b/client/allocrunner/alloc_runner_test.go @@ -10,6 +10,7 @@ import ( "time" "github.com/hashicorp/consul/api" + multierror "github.com/hashicorp/go-multierror" "github.com/hashicorp/nomad/ci" "github.com/hashicorp/nomad/client/allochealth" "github.com/hashicorp/nomad/client/allocrunner/tasklifecycle" @@ -483,6 +484,464 @@ func TestAllocRunner_Lifecycle_Poststop(t *testing.T) { } +func TestAllocRunner_Lifecycle_Restart(t *testing.T) { + ci.Parallel(t) + + // test cases can use this default or override w/ taskDefs param + alloc := mock.LifecycleAllocFromTasks([]mock.LifecycleTaskDef{ + {Name: "main", RunFor: "100s", ExitCode: 0, Hook: "", IsSidecar: false}, + {Name: "prestart-oneshot", RunFor: "1s", ExitCode: 0, Hook: "prestart", IsSidecar: false}, + {Name: "prestart-sidecar", RunFor: "100s", ExitCode: 0, Hook: "prestart", IsSidecar: true}, + {Name: "poststart-oneshot", RunFor: "1s", ExitCode: 0, Hook: "poststart", IsSidecar: false}, + {Name: "poststart-sidecar", RunFor: "100s", ExitCode: 0, Hook: "poststart", IsSidecar: true}, + {Name: "poststop", RunFor: "1s", ExitCode: 0, Hook: "poststop", IsSidecar: false}, + }) + alloc.Job.Type = structs.JobTypeService + rp := &structs.RestartPolicy{ + Attempts: 1, + Interval: 10 * time.Minute, + Delay: 1 * time.Nanosecond, + Mode: structs.RestartPolicyModeFail, + } + + ev := &structs.TaskEvent{Type: structs.TaskRestartSignal} + + testCases := []struct { + name string + taskDefs []mock.LifecycleTaskDef + isBatch bool + hasLeader bool + action func(*allocRunner, *structs.Allocation) error + expectedErr string + expectedAfter map[string]structs.TaskState + }{ + { + name: "restart entire allocation", + action: func(ar *allocRunner, alloc *structs.Allocation) error { + return ar.RestartAll(ev) + }, + expectedAfter: map[string]structs.TaskState{ + "main": structs.TaskState{State: "running", Restarts: 1}, + "prestart-oneshot": structs.TaskState{State: "dead", Restarts: 1}, + "prestart-sidecar": structs.TaskState{State: "running", Restarts: 1}, + "poststart-oneshot": structs.TaskState{State: "dead", Restarts: 1}, + "poststart-sidecar": structs.TaskState{State: "running", Restarts: 1}, + "poststop": structs.TaskState{State: "pending", Restarts: 0}, + }, + }, + { + name: "restart only running tasks", + action: func(ar *allocRunner, alloc *structs.Allocation) error { + return 
ar.RestartRunning(ev) + }, + expectedAfter: map[string]structs.TaskState{ + "main": structs.TaskState{State: "running", Restarts: 1}, + "prestart-oneshot": structs.TaskState{State: "dead", Restarts: 0}, + "prestart-sidecar": structs.TaskState{State: "running", Restarts: 1}, + "poststart-oneshot": structs.TaskState{State: "dead", Restarts: 0}, + "poststart-sidecar": structs.TaskState{State: "running", Restarts: 1}, + "poststop": structs.TaskState{State: "pending", Restarts: 0}, + }, + }, + { + name: "batch job restart entire allocation", + taskDefs: []mock.LifecycleTaskDef{ + {Name: "main", RunFor: "100s", ExitCode: 1, Hook: "", IsSidecar: false}, + {Name: "prestart-oneshot", RunFor: "1s", ExitCode: 0, Hook: "prestart", IsSidecar: false}, + {Name: "prestart-sidecar", RunFor: "100s", ExitCode: 0, Hook: "prestart", IsSidecar: true}, + {Name: "poststart-oneshot", RunFor: "1s", ExitCode: 0, Hook: "poststart", IsSidecar: false}, + {Name: "poststart-sidecar", RunFor: "100s", ExitCode: 0, Hook: "poststart", IsSidecar: true}, + {Name: "poststop", RunFor: "1s", ExitCode: 0, Hook: "poststop", IsSidecar: false}, + }, + isBatch: true, + action: func(ar *allocRunner, alloc *structs.Allocation) error { + return ar.RestartAll(ev) + }, + expectedAfter: map[string]structs.TaskState{ + "main": structs.TaskState{State: "running", Restarts: 1}, + "prestart-oneshot": structs.TaskState{State: "dead", Restarts: 1}, + "prestart-sidecar": structs.TaskState{State: "running", Restarts: 1}, + "poststart-oneshot": structs.TaskState{State: "dead", Restarts: 1}, + "poststart-sidecar": structs.TaskState{State: "running", Restarts: 1}, + "poststop": structs.TaskState{State: "pending", Restarts: 0}, + }, + }, + { + name: "batch job restart only running tasks ", + taskDefs: []mock.LifecycleTaskDef{ + {Name: "main", RunFor: "100s", ExitCode: 1, Hook: "", IsSidecar: false}, + {Name: "prestart-oneshot", RunFor: "1s", ExitCode: 0, Hook: "prestart", IsSidecar: false}, + {Name: "prestart-sidecar", RunFor: "100s", ExitCode: 0, Hook: "prestart", IsSidecar: true}, + {Name: "poststart-oneshot", RunFor: "1s", ExitCode: 0, Hook: "poststart", IsSidecar: false}, + {Name: "poststart-sidecar", RunFor: "100s", ExitCode: 0, Hook: "poststart", IsSidecar: true}, + {Name: "poststop", RunFor: "1s", ExitCode: 0, Hook: "poststop", IsSidecar: false}, + }, + isBatch: true, + action: func(ar *allocRunner, alloc *structs.Allocation) error { + return ar.RestartRunning(ev) + }, + expectedAfter: map[string]structs.TaskState{ + "main": structs.TaskState{State: "running", Restarts: 1}, + "prestart-oneshot": structs.TaskState{State: "dead", Restarts: 0}, + "prestart-sidecar": structs.TaskState{State: "running", Restarts: 1}, + "poststart-oneshot": structs.TaskState{State: "dead", Restarts: 0}, + "poststart-sidecar": structs.TaskState{State: "running", Restarts: 1}, + "poststop": structs.TaskState{State: "pending", Restarts: 0}, + }, + }, + { + name: "restart entire allocation with leader", + hasLeader: true, + action: func(ar *allocRunner, alloc *structs.Allocation) error { + return ar.RestartAll(ev) + }, + expectedAfter: map[string]structs.TaskState{ + "main": structs.TaskState{State: "running", Restarts: 1}, + "prestart-oneshot": structs.TaskState{State: "dead", Restarts: 1}, + "prestart-sidecar": structs.TaskState{State: "running", Restarts: 1}, + "poststart-oneshot": structs.TaskState{State: "dead", Restarts: 1}, + "poststart-sidecar": structs.TaskState{State: "running", Restarts: 1}, + "poststop": structs.TaskState{State: "pending", Restarts: 0}, + }, + 
}, + { + name: "stop from server", + action: func(ar *allocRunner, alloc *structs.Allocation) error { + stopAlloc := alloc.Copy() + stopAlloc.DesiredStatus = structs.AllocDesiredStatusStop + ar.Update(stopAlloc) + return nil + }, + expectedAfter: map[string]structs.TaskState{ + "main": structs.TaskState{State: "dead", Restarts: 0}, + "prestart-oneshot": structs.TaskState{State: "dead", Restarts: 0}, + "prestart-sidecar": structs.TaskState{State: "dead", Restarts: 0}, + "poststart-oneshot": structs.TaskState{State: "dead", Restarts: 0}, + "poststart-sidecar": structs.TaskState{State: "dead", Restarts: 0}, + "poststop": structs.TaskState{State: "dead", Restarts: 0}, + }, + }, + { + name: "restart main task", + action: func(ar *allocRunner, alloc *structs.Allocation) error { + return ar.RestartTask("main", ev) + }, + expectedAfter: map[string]structs.TaskState{ + "main": structs.TaskState{State: "running", Restarts: 1}, + "prestart-oneshot": structs.TaskState{State: "dead", Restarts: 0}, + "prestart-sidecar": structs.TaskState{State: "running", Restarts: 0}, + "poststart-oneshot": structs.TaskState{State: "dead", Restarts: 0}, + "poststart-sidecar": structs.TaskState{State: "running", Restarts: 0}, + "poststop": structs.TaskState{State: "pending", Restarts: 0}, + }, + }, + { + name: "restart leader main task", + hasLeader: true, + action: func(ar *allocRunner, alloc *structs.Allocation) error { + return ar.RestartTask("main", ev) + }, + expectedAfter: map[string]structs.TaskState{ + "main": structs.TaskState{State: "running", Restarts: 1}, + "prestart-oneshot": structs.TaskState{State: "dead", Restarts: 0}, + "prestart-sidecar": structs.TaskState{State: "running", Restarts: 0}, + "poststart-oneshot": structs.TaskState{State: "dead", Restarts: 0}, + "poststart-sidecar": structs.TaskState{State: "running", Restarts: 0}, + "poststop": structs.TaskState{State: "pending", Restarts: 0}, + }, + }, + { + name: "main task fails and restarts once", + taskDefs: []mock.LifecycleTaskDef{ + {Name: "main", RunFor: "2s", ExitCode: 1, Hook: "", IsSidecar: false}, + {Name: "prestart-oneshot", RunFor: "1s", ExitCode: 0, Hook: "prestart", IsSidecar: false}, + {Name: "prestart-sidecar", RunFor: "100s", ExitCode: 0, Hook: "prestart", IsSidecar: true}, + {Name: "poststart-oneshot", RunFor: "1s", ExitCode: 0, Hook: "poststart", IsSidecar: false}, + {Name: "poststart-sidecar", RunFor: "100s", ExitCode: 0, Hook: "poststart", IsSidecar: true}, + {Name: "poststop", RunFor: "1s", ExitCode: 0, Hook: "poststop", IsSidecar: false}, + }, + action: func(ar *allocRunner, alloc *structs.Allocation) error { + time.Sleep(3 * time.Second) // make sure main task has exited + return nil + }, + expectedAfter: map[string]structs.TaskState{ + "main": structs.TaskState{State: "dead", Restarts: 1}, + "prestart-oneshot": structs.TaskState{State: "dead", Restarts: 0}, + "prestart-sidecar": structs.TaskState{State: "dead", Restarts: 0}, + "poststart-oneshot": structs.TaskState{State: "dead", Restarts: 0}, + "poststart-sidecar": structs.TaskState{State: "dead", Restarts: 0}, + "poststop": structs.TaskState{State: "dead", Restarts: 0}, + }, + }, + { + name: "leader main task fails and restarts once", + taskDefs: []mock.LifecycleTaskDef{ + {Name: "main", RunFor: "2s", ExitCode: 1, Hook: "", IsSidecar: false}, + {Name: "prestart-oneshot", RunFor: "1s", ExitCode: 0, Hook: "prestart", IsSidecar: false}, + {Name: "prestart-sidecar", RunFor: "100s", ExitCode: 0, Hook: "prestart", IsSidecar: true}, + {Name: "poststart-oneshot", RunFor: "1s", 
ExitCode: 0, Hook: "poststart", IsSidecar: false}, + {Name: "poststart-sidecar", RunFor: "100s", ExitCode: 0, Hook: "poststart", IsSidecar: true}, + {Name: "poststop", RunFor: "1s", ExitCode: 0, Hook: "poststop", IsSidecar: false}, + }, + hasLeader: true, + action: func(ar *allocRunner, alloc *structs.Allocation) error { + time.Sleep(3 * time.Second) // make sure main task has exited + return nil + }, + expectedAfter: map[string]structs.TaskState{ + "main": structs.TaskState{State: "dead", Restarts: 1}, + "prestart-oneshot": structs.TaskState{State: "dead", Restarts: 0}, + "prestart-sidecar": structs.TaskState{State: "dead", Restarts: 0}, + "poststart-oneshot": structs.TaskState{State: "dead", Restarts: 0}, + "poststart-sidecar": structs.TaskState{State: "dead", Restarts: 0}, + "poststop": structs.TaskState{State: "dead", Restarts: 0}, + }, + }, + { + name: "main stopped unexpectedly and restarts once", + taskDefs: []mock.LifecycleTaskDef{ + {Name: "main", RunFor: "2s", ExitCode: 0, Hook: "", IsSidecar: false}, + {Name: "prestart-oneshot", RunFor: "1s", ExitCode: 0, Hook: "prestart", IsSidecar: false}, + {Name: "prestart-sidecar", RunFor: "100s", ExitCode: 0, Hook: "prestart", IsSidecar: true}, + {Name: "poststart-oneshot", RunFor: "1s", ExitCode: 0, Hook: "poststart", IsSidecar: false}, + {Name: "poststart-sidecar", RunFor: "100s", ExitCode: 0, Hook: "poststart", IsSidecar: true}, + {Name: "poststop", RunFor: "1s", ExitCode: 0, Hook: "poststop", IsSidecar: false}, + }, + action: func(ar *allocRunner, alloc *structs.Allocation) error { + time.Sleep(3 * time.Second) // make sure main task has exited + return nil + }, + expectedAfter: map[string]structs.TaskState{ + "main": structs.TaskState{State: "dead", Restarts: 1}, + "prestart-oneshot": structs.TaskState{State: "dead", Restarts: 0}, + "prestart-sidecar": structs.TaskState{State: "dead", Restarts: 0}, + "poststart-oneshot": structs.TaskState{State: "dead", Restarts: 0}, + "poststart-sidecar": structs.TaskState{State: "dead", Restarts: 0}, + "poststop": structs.TaskState{State: "dead", Restarts: 0}, + }, + }, + { + name: "leader main stopped unexpectedly and restarts once", + taskDefs: []mock.LifecycleTaskDef{ + {Name: "main", RunFor: "2s", ExitCode: 0, Hook: "", IsSidecar: false}, + {Name: "prestart-oneshot", RunFor: "1s", ExitCode: 0, Hook: "prestart", IsSidecar: false}, + {Name: "prestart-sidecar", RunFor: "100s", ExitCode: 0, Hook: "prestart", IsSidecar: true}, + {Name: "poststart-oneshot", RunFor: "1s", ExitCode: 0, Hook: "poststart", IsSidecar: false}, + {Name: "poststart-sidecar", RunFor: "100s", ExitCode: 0, Hook: "poststart", IsSidecar: true}, + {Name: "poststop", RunFor: "1s", ExitCode: 0, Hook: "poststop", IsSidecar: false}, + }, + action: func(ar *allocRunner, alloc *structs.Allocation) error { + time.Sleep(3 * time.Second) // make sure main task has exited + return nil + }, + expectedAfter: map[string]structs.TaskState{ + "main": structs.TaskState{State: "dead", Restarts: 1}, + "prestart-oneshot": structs.TaskState{State: "dead", Restarts: 0}, + "prestart-sidecar": structs.TaskState{State: "dead", Restarts: 0}, + "poststart-oneshot": structs.TaskState{State: "dead", Restarts: 0}, + "poststart-sidecar": structs.TaskState{State: "dead", Restarts: 0}, + "poststop": structs.TaskState{State: "dead", Restarts: 0}, + }, + }, + { + name: "failed main task cannot be restarted", + taskDefs: []mock.LifecycleTaskDef{ + {Name: "main", RunFor: "2s", ExitCode: 1, Hook: "", IsSidecar: false}, + {Name: "prestart-oneshot", RunFor: "1s", 
ExitCode: 0, Hook: "prestart", IsSidecar: false}, + {Name: "prestart-sidecar", RunFor: "100s", ExitCode: 0, Hook: "prestart", IsSidecar: true}, + {Name: "poststart-oneshot", RunFor: "1s", ExitCode: 0, Hook: "poststart", IsSidecar: false}, + {Name: "poststart-sidecar", RunFor: "100s", ExitCode: 0, Hook: "poststart", IsSidecar: true}, + {Name: "poststop", RunFor: "1s", ExitCode: 0, Hook: "poststop", IsSidecar: false}, + }, + action: func(ar *allocRunner, alloc *structs.Allocation) error { + // make sure main task has had a chance to restart once on its + // own and fail again before we try to manually restart it + time.Sleep(5 * time.Second) + return ar.RestartTask("main", ev) + }, + expectedErr: "Task not running", + expectedAfter: map[string]structs.TaskState{ + "main": structs.TaskState{State: "dead", Restarts: 1}, + "prestart-oneshot": structs.TaskState{State: "dead", Restarts: 0}, + "prestart-sidecar": structs.TaskState{State: "dead", Restarts: 0}, + "poststart-oneshot": structs.TaskState{State: "dead", Restarts: 0}, + "poststart-sidecar": structs.TaskState{State: "dead", Restarts: 0}, + "poststop": structs.TaskState{State: "dead", Restarts: 0}, + }, + }, + { + name: "restart prestart-sidecar task", + action: func(ar *allocRunner, alloc *structs.Allocation) error { + return ar.RestartTask("prestart-sidecar", ev) + }, + expectedAfter: map[string]structs.TaskState{ + "main": structs.TaskState{State: "running", Restarts: 0}, + "prestart-oneshot": structs.TaskState{State: "dead", Restarts: 0}, + "prestart-sidecar": structs.TaskState{State: "running", Restarts: 1}, + "poststart-oneshot": structs.TaskState{State: "dead", Restarts: 0}, + "poststart-sidecar": structs.TaskState{State: "running", Restarts: 0}, + "poststop": structs.TaskState{State: "pending", Restarts: 0}, + }, + }, + { + name: "restart poststart-sidecar task", + action: func(ar *allocRunner, alloc *structs.Allocation) error { + return ar.RestartTask("poststart-sidecar", ev) + }, + expectedAfter: map[string]structs.TaskState{ + "main": structs.TaskState{State: "running", Restarts: 0}, + "prestart-oneshot": structs.TaskState{State: "dead", Restarts: 0}, + "prestart-sidecar": structs.TaskState{State: "running", Restarts: 0}, + "poststart-oneshot": structs.TaskState{State: "dead", Restarts: 0}, + "poststart-sidecar": structs.TaskState{State: "running", Restarts: 1}, + "poststop": structs.TaskState{State: "pending", Restarts: 0}, + }, + }, + } + + for _, tc := range testCases { + tc := tc + t.Run(tc.name, func(t *testing.T) { + ci.Parallel(t) + + alloc := alloc.Copy() + alloc.Job.TaskGroups[0].RestartPolicy = rp + if tc.taskDefs != nil { + alloc = mock.LifecycleAllocFromTasks(tc.taskDefs) + alloc.Job.Type = structs.JobTypeService + } + for _, task := range alloc.Job.TaskGroups[0].Tasks { + task.RestartPolicy = rp // tasks inherit the group policy + } + if tc.hasLeader { + for _, task := range alloc.Job.TaskGroups[0].Tasks { + if task.Name == "main" { + task.Leader = true + } + } + } + if tc.isBatch { + alloc.Job.Type = structs.JobTypeBatch + } + + conf, cleanup := testAllocRunnerConfig(t, alloc) + defer cleanup() + ar, err := NewAllocRunner(conf) + require.NoError(t, err) + defer destroy(ar) + go ar.Run() + + upd := conf.StateUpdater.(*MockStateUpdater) + + // assert our "before" states: + // - all one-shot tasks should be dead but not failed + // - all main tasks and sidecars should be running + // - no tasks should have restarted + testutil.WaitForResult(func() (bool, error) { + last := upd.Last() + if last == nil { + return false, 
fmt.Errorf("no update") + } + if last.ClientStatus != structs.AllocClientStatusRunning { + return false, fmt.Errorf( + "expected alloc to be running not %s", last.ClientStatus) + } + var errs *multierror.Error + + expectedBefore := map[string]string{ + "main": "running", + "prestart-oneshot": "dead", + "prestart-sidecar": "running", + "poststart-oneshot": "dead", + "poststart-sidecar": "running", + "poststop": "pending", + } + + for task, expected := range expectedBefore { + got, ok := last.TaskStates[task] + if !ok { + continue + } + if got.State != expected { + errs = multierror.Append(errs, fmt.Errorf( + "expected initial state of task %q to be %q not %q", + task, expected, got.State)) + } + if got.Restarts != 0 { + errs = multierror.Append(errs, fmt.Errorf( + "expected no initial restarts of task %q, not %q", + task, got.Restarts)) + } + if expected == "dead" && got.Failed { + errs = multierror.Append(errs, fmt.Errorf( + "expected ephemeral task %q to be dead but not failed", + task)) + } + + } + if errs.ErrorOrNil() != nil { + return false, errs.ErrorOrNil() + } + return true, nil + }, func(err error) { + require.NoError(t, err, "error waiting for initial state") + }) + + // perform the action + err = tc.action(ar, alloc.Copy()) + if tc.expectedErr != "" { + require.EqualError(t, err, tc.expectedErr) + } else { + require.NoError(t, err) + } + + // assert our "after" states + testutil.WaitForResult(func() (bool, error) { + last := upd.Last() + if last == nil { + return false, fmt.Errorf("no update") + } + var errs *multierror.Error + for task, expected := range tc.expectedAfter { + got, ok := last.TaskStates[task] + if !ok { + errs = multierror.Append(errs, fmt.Errorf( + "no final state found for task %q", task, + )) + } + if got.State != expected.State { + errs = multierror.Append(errs, fmt.Errorf( + "expected final state of task %q to be %q not %q", + task, expected.State, got.State)) + } + if expected.State == "dead" { + if got.FinishedAt.IsZero() || got.StartedAt.IsZero() { + errs = multierror.Append(errs, fmt.Errorf( + "expected final state of task %q to have start and finish time", task)) + } + if len(got.Events) < 2 { + errs = multierror.Append(errs, fmt.Errorf( + "expected final state of task %q to include at least 2 tasks", task)) + } + } + + if got.Restarts != expected.Restarts { + errs = multierror.Append(errs, fmt.Errorf( + "expected final restarts of task %q to be %v not %v", + task, expected.Restarts, got.Restarts)) + } + } + if errs.ErrorOrNil() != nil { + return false, errs.ErrorOrNil() + } + return true, nil + }, func(err error) { + require.NoError(t, err, "error waiting for final state") + }) + }) + } +} + func TestAllocRunner_TaskGroup_ShutdownDelay(t *testing.T) { ci.Parallel(t) @@ -1213,7 +1672,7 @@ func TestAllocRunner_MoveAllocDir(t *testing.T) { ar.Run() defer destroy(ar) - require.Equal(t, structs.AllocClientStatusComplete, ar.AllocState().ClientStatus) + WaitForClientState(t, ar, structs.AllocClientStatusComplete) // Step 2. 
Modify its directory task := alloc.Job.TaskGroups[0].Tasks[0] @@ -1241,7 +1700,7 @@ func TestAllocRunner_MoveAllocDir(t *testing.T) { ar2.Run() defer destroy(ar2) - require.Equal(t, structs.AllocClientStatusComplete, ar2.AllocState().ClientStatus) + WaitForClientState(t, ar, structs.AllocClientStatusComplete) // Ensure that data from ar was moved to ar2 dataFile = filepath.Join(ar2.allocDir.SharedDir, "data", "data_file") diff --git a/client/allocrunner/alloc_runner_unix_test.go b/client/allocrunner/alloc_runner_unix_test.go index ab3c777dd..085956910 100644 --- a/client/allocrunner/alloc_runner_unix_test.go +++ b/client/allocrunner/alloc_runner_unix_test.go @@ -207,18 +207,18 @@ func TestAllocRunner_Restore_CompletedBatch(t *testing.T) { go ar2.Run() defer destroy(ar2) - // AR waitCh must be closed even when task doesn't run again + // AR waitCh must be open as the task waits for a possible alloc restart. select { case <-ar2.WaitCh(): - case <-time.After(10 * time.Second): - require.Fail(t, "alloc.waitCh wasn't closed") + require.Fail(t, "alloc.waitCh was closed") + default: } - // TR waitCh must be closed too! + // TR waitCh must be open too! select { case <-ar2.tasks[task.Name].WaitCh(): - case <-time.After(10 * time.Second): - require.Fail(t, "tr.waitCh wasn't closed") + require.Fail(t, "tr.waitCh was closed") + default: } // Assert that events are unmodified, which they would if task re-run diff --git a/client/allocrunner/tasklifecycle/coordinator.go b/client/allocrunner/tasklifecycle/coordinator.go index 60b798ed5..90a8e7fc0 100644 --- a/client/allocrunner/tasklifecycle/coordinator.go +++ b/client/allocrunner/tasklifecycle/coordinator.go @@ -16,7 +16,7 @@ const ( coordinatorStatePrestart coordinatorStateMain coordinatorStatePoststart - coordinatorStateWaitMain + coordinatorStateWaitAlloc coordinatorStatePoststop ) @@ -30,8 +30,8 @@ func (s coordinatorState) String() string { return "main" case coordinatorStatePoststart: return "poststart" - case coordinatorStateWaitMain: - return "wait_main" + case coordinatorStateWaitAlloc: + return "wait_alloc" case coordinatorStatePoststop: return "poststart" } @@ -109,6 +109,15 @@ func NewCoordinator(logger hclog.Logger, tasks []*structs.Task, shutdownCh <-cha return c } +// Restart sets the Coordinator state back to "init" and is used to coordinate +// a full alloc restart. Since all tasks will run again they need to be pending +// before they are allowed to proceed. +func (c *Coordinator) Restart() { + c.currentStateLock.Lock() + defer c.currentStateLock.Unlock() + c.enterStateLocked(coordinatorStateInit) +} + // Restore is used to set the Coordinator FSM to the correct state when an // alloc is restored. Must be called before the allocrunner is running. func (c *Coordinator) Restore(states map[string]*structs.TaskState) { @@ -151,6 +160,13 @@ func (c *Coordinator) TaskStateUpdated(states map[string]*structs.TaskState) { // current internal state and the received states of the tasks. // The currentStateLock must be held before calling this method. func (c *Coordinator) nextStateLocked(states map[string]*structs.TaskState) coordinatorState { + + // coordinatorStatePoststop is the terminal state of the FSM, and can be + // reached at any time. 
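+	// Checking it before the state switch lets the FSM jump straight to
+	// poststop from any state once all non-poststop tasks are dead.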
+ if c.isAllocDone(states) { + return coordinatorStatePoststop + } + switch c.currentState { case coordinatorStateInit: if !c.isInitDone(states) { @@ -174,11 +190,11 @@ func (c *Coordinator) nextStateLocked(states map[string]*structs.TaskState) coor if !c.isPoststartDone(states) { return coordinatorStatePoststart } - return coordinatorStateWaitMain + return coordinatorStateWaitAlloc - case coordinatorStateWaitMain: - if !c.isWaitMainDone(states) { - return coordinatorStateWaitMain + case coordinatorStateWaitAlloc: + if !c.isAllocDone(states) { + return coordinatorStateWaitAlloc } return coordinatorStatePoststop @@ -233,7 +249,7 @@ func (c *Coordinator) enterStateLocked(state coordinatorState) { c.allow(lifecycleStagePoststartEphemeral) c.allow(lifecycleStagePoststartSidecar) - case coordinatorStateWaitMain: + case coordinatorStateWaitAlloc: c.block(lifecycleStagePrestartEphemeral) c.block(lifecycleStagePoststartEphemeral) c.block(lifecycleStagePoststop) @@ -268,7 +284,7 @@ func (c *Coordinator) isInitDone(states map[string]*structs.TaskState) bool { // isPrestartDone returns true when the following conditions are met: // - there is at least one prestart task -// - all ephemeral prestart tasks are in the "dead" state. +// - all ephemeral prestart tasks are successful. // - no ephemeral prestart task has failed. // - all prestart sidecar tasks are running. func (c *Coordinator) isPrestartDone(states map[string]*structs.TaskState) bool { @@ -277,7 +293,7 @@ func (c *Coordinator) isPrestartDone(states map[string]*structs.TaskState) bool } for _, task := range c.tasksByLifecycle[lifecycleStagePrestartEphemeral] { - if states[task].State != structs.TaskStateDead || states[task].Failed { + if !states[task].Successful() { return false } } @@ -321,9 +337,9 @@ func (c *Coordinator) isPoststartDone(states map[string]*structs.TaskState) bool return true } -// isWaitMainDone returns true when the following conditions are met: -// - all tasks that are not poststop are in the "dead" state. -func (c *Coordinator) isWaitMainDone(states map[string]*structs.TaskState) bool { +// isAllocDone returns true when the following conditions are met: +// - all non-poststop tasks are in the "dead" state. +func (c *Coordinator) isAllocDone(states map[string]*structs.TaskState) bool { for lifecycle, tasks := range c.tasksByLifecycle { if lifecycle == lifecycleStagePoststop { continue diff --git a/client/allocrunner/tasklifecycle/coordinator_test.go b/client/allocrunner/tasklifecycle/coordinator_test.go index 17e42e96e..3f86dcc99 100644 --- a/client/allocrunner/tasklifecycle/coordinator_test.go +++ b/client/allocrunner/tasklifecycle/coordinator_test.go @@ -58,6 +58,9 @@ func TestCoordinator_PrestartRunsBeforeMain(t *testing.T) { sideTask := tasks[1] initTask := tasks[2] + // Only use the tasks that we care about. + tasks = []*structs.Task{mainTask, sideTask, initTask} + shutdownCh := make(chan struct{}) defer close(shutdownCh) coord := NewCoordinator(logger, tasks, shutdownCh) @@ -161,6 +164,9 @@ func TestCoordinator_MainRunsAfterManyInitTasks(t *testing.T) { init1Task := tasks[1] init2Task := tasks[2] + // Only use the tasks that we care about. + tasks = []*structs.Task{mainTask, init1Task, init2Task} + shutdownCh := make(chan struct{}) defer close(shutdownCh) coord := NewCoordinator(logger, tasks, shutdownCh) @@ -227,6 +233,9 @@ func TestCoordinator_FailedInitTask(t *testing.T) { init1Task := tasks[1] init2Task := tasks[2] + // Only use the tasks that we care about. 
+ tasks = []*structs.Task{mainTask, init1Task, init2Task} + shutdownCh := make(chan struct{}) defer close(shutdownCh) coord := NewCoordinator(logger, tasks, shutdownCh) @@ -292,6 +301,9 @@ func TestCoordinator_SidecarNeverStarts(t *testing.T) { sideTask := tasks[1] initTask := tasks[2] + // Only use the tasks that we care about. + tasks = []*structs.Task{mainTask, sideTask, initTask} + shutdownCh := make(chan struct{}) defer close(shutdownCh) coord := NewCoordinator(logger, tasks, shutdownCh) @@ -356,6 +368,9 @@ func TestCoordinator_PoststartStartsAfterMain(t *testing.T) { sideTask := tasks[1] postTask := tasks[2] + // Only use the tasks that we care about. + tasks = []*structs.Task{mainTask, sideTask, postTask} + // Make the the third task is a poststart hook postTask.Lifecycle.Hook = structs.TaskLifecycleHookPoststart diff --git a/client/allocrunner/taskrunner/lifecycle.go b/client/allocrunner/taskrunner/lifecycle.go index b812156a8..90c3d3718 100644 --- a/client/allocrunner/taskrunner/lifecycle.go +++ b/client/allocrunner/taskrunner/lifecycle.go @@ -6,28 +6,103 @@ import ( "github.com/hashicorp/nomad/nomad/structs" ) -// Restart a task. Returns immediately if no task is running. Blocks until -// existing task exits or passed-in context is canceled. +// Restart restarts a task that is already running. Returns an error if the +// task is not running. Blocks until existing task exits or passed-in context +// is canceled. func (tr *TaskRunner) Restart(ctx context.Context, event *structs.TaskEvent, failure bool) error { - tr.logger.Trace("Restart requested", "failure", failure) + tr.logger.Trace("Restart requested", "failure", failure, "event", event.GoString()) - // Grab the handle - handle := tr.getDriverHandle() + taskState := tr.TaskState() + if taskState == nil { + return ErrTaskNotRunning + } - // Check it is running - if handle == nil { + switch taskState.State { + case structs.TaskStatePending, structs.TaskStateDead: + return ErrTaskNotRunning + } + + return tr.restartImpl(ctx, event, failure) +} + +// ForceRestart restarts a task that is already running or reruns it if dead. +// Returns an error if the task is not able to rerun. Blocks until existing +// task exits or passed-in context is canceled. +// +// Callers must restart the AllocRuner taskCoordinator beforehand to make sure +// the task will be able to run again. +func (tr *TaskRunner) ForceRestart(ctx context.Context, event *structs.TaskEvent, failure bool) error { + tr.logger.Trace("Force restart requested", "failure", failure, "event", event.GoString()) + + taskState := tr.TaskState() + if taskState == nil { + return ErrTaskNotRunning + } + + tr.stateLock.Lock() + localState := tr.localState.Copy() + tr.stateLock.Unlock() + + if localState == nil { + return ErrTaskNotRunning + } + + switch taskState.State { + case structs.TaskStatePending: + return ErrTaskNotRunning + + case structs.TaskStateDead: + // Tasks that are in the "dead" state are only allowed to restart if + // their Run() method is still active. + if localState.RunComplete { + return ErrTaskNotRunning + } + } + + return tr.restartImpl(ctx, event, failure) +} + +// restartImpl implements to task restart process. +// +// It should never be called directly as it doesn't verify if the task state +// allows for a restart. +func (tr *TaskRunner) restartImpl(ctx context.Context, event *structs.TaskEvent, failure bool) error { + + // Check if the task is able to restart based on its state and the type of + // restart event that was triggered. 
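+	// Only a missing task state is rejected here; lifecycle eligibility
+	// checks are the callers' responsibility (see Restart and ForceRestart
+	// above).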
+ taskState := tr.TaskState() + if taskState == nil { return ErrTaskNotRunning } // Emit the event since it may take a long time to kill tr.EmitEvent(event) - // Run the pre-kill hooks prior to restarting the task - tr.preKill() - // Tell the restart tracker that a restart triggered the exit tr.restartTracker.SetRestartTriggered(failure) + // Signal a restart to unblock tasks that are in the "dead" state, but + // don't block since the channel is buffered. Only one signal is enough to + // notify the tr.Run() loop. + // The channel must be signaled after SetRestartTriggered is called so the + // tr.Run() loop runs again. + if taskState.State == structs.TaskStateDead { + select { + case tr.restartCh <- struct{}{}: + default: + } + } + + // Grab the handle to see if the task is still running and needs to be + // killed. + handle := tr.getDriverHandle() + if handle == nil { + return nil + } + + // Run the pre-kill hooks prior to restarting the task + tr.preKill() + // Grab a handle to the wait channel that will timeout with context cancelation // _before_ killing the task. waitCh, err := handle.WaitCh(ctx) @@ -69,14 +144,17 @@ func (tr *TaskRunner) Signal(event *structs.TaskEvent, s string) error { // Kill a task. Blocks until task exits or context is canceled. State is set to // dead. func (tr *TaskRunner) Kill(ctx context.Context, event *structs.TaskEvent) error { - tr.logger.Trace("Kill requested", "event_type", event.Type, "event_reason", event.KillReason) + tr.logger.Trace("Kill requested") // Cancel the task runner to break out of restart delay or the main run // loop. tr.killCtxCancel() // Emit kill event - tr.EmitEvent(event) + if event != nil { + tr.logger.Trace("Kill event", "event_type", event.Type, "event_reason", event.KillReason) + tr.EmitEvent(event) + } select { case <-tr.WaitCh(): diff --git a/client/allocrunner/taskrunner/sids_hook_test.go b/client/allocrunner/taskrunner/sids_hook_test.go index d5951ed01..c8f765741 100644 --- a/client/allocrunner/taskrunner/sids_hook_test.go +++ b/client/allocrunner/taskrunner/sids_hook_test.go @@ -22,7 +22,6 @@ import ( "github.com/hashicorp/nomad/helper/uuid" "github.com/hashicorp/nomad/nomad/mock" "github.com/hashicorp/nomad/nomad/structs" - "github.com/hashicorp/nomad/testutil" "github.com/stretchr/testify/require" "golang.org/x/sys/unix" ) @@ -297,11 +296,7 @@ func TestTaskRunner_DeriveSIToken_UnWritableTokenFile(t *testing.T) { go tr.Run() // wait for task runner to finish running - select { - case <-tr.WaitCh(): - case <-time.After(time.Duration(testutil.TestMultiplier()*15) * time.Second): - r.Fail("timed out waiting for task runner") - } + testWaitForTaskToDie(t, tr) // assert task exited un-successfully finalState := tr.TaskState() diff --git a/client/allocrunner/taskrunner/state/state.go b/client/allocrunner/taskrunner/state/state.go index 5f83c476c..a4bc26b0f 100644 --- a/client/allocrunner/taskrunner/state/state.go +++ b/client/allocrunner/taskrunner/state/state.go @@ -16,6 +16,11 @@ type LocalState struct { // TaskHandle is the handle used to reattach to the task during recovery TaskHandle *drivers.TaskHandle + + // RunComplete is set to true when the TaskRunner.Run() method finishes. + // It is used to distinguish between a dead task that could be restarted + // and one that will never run again. 
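+	// The value is persisted with the rest of LocalState (see
+	// TaskRunner.Run) so the distinction survives client restarts.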
+ RunComplete bool } func NewLocalState() *LocalState { @@ -52,6 +57,7 @@ func (s *LocalState) Copy() *LocalState { Hooks: make(map[string]*HookState, len(s.Hooks)), DriverNetwork: s.DriverNetwork.Copy(), TaskHandle: s.TaskHandle.Copy(), + RunComplete: s.RunComplete, } // Copy the hook state diff --git a/client/allocrunner/taskrunner/task_runner.go b/client/allocrunner/taskrunner/task_runner.go index ecfb2e905..445cf0440 100644 --- a/client/allocrunner/taskrunner/task_runner.go +++ b/client/allocrunner/taskrunner/task_runner.go @@ -62,6 +62,11 @@ const ( // updates have come in since the last one was handled, we only need to // handle the last one. triggerUpdateChCap = 1 + + // restartChCap is the capacity for the restartCh used for triggering task + // restarts. It should be exactly 1 as even if multiple restarts have come + // we only need to handle the last one. + restartChCap = 1 ) type TaskRunner struct { @@ -95,6 +100,9 @@ type TaskRunner struct { // stateDB is for persisting localState and taskState stateDB cstate.StateDB + // restartCh is used to signal that the task should restart. + restartCh chan struct{} + // shutdownCtx is used to exit the TaskRunner *without* affecting task state. shutdownCtx context.Context @@ -367,6 +375,7 @@ func NewTaskRunner(config *Config) (*TaskRunner, error) { shutdownCtx: trCtx, shutdownCtxCancel: trCancel, triggerUpdateCh: make(chan struct{}, triggerUpdateChCap), + restartCh: make(chan struct{}, restartChCap), waitCh: make(chan struct{}), csiManager: config.CSIManager, cpusetCgroupPathGetter: config.CpusetCgroupPathGetter, @@ -506,20 +515,25 @@ func (tr *TaskRunner) Run() { tr.stateLock.RLock() dead := tr.state.State == structs.TaskStateDead + runComplete := tr.localState.RunComplete tr.stateLock.RUnlock() - // if restoring a dead task, ensure that task is cleared and all post hooks - // are called without additional state updates + // If restoring a dead task, ensure the task is cleared and, if the local + // state indicates that the previous Run() call is complete, execute all + // post stop hooks and exit early, otherwise proceed until the + // ALLOC_RESTART loop skipping MAIN since the task is dead. if dead { // do cleanup functions without emitting any additional events/work // to handle cases where we restored a dead task where client terminated // after task finished before completing post-run actions. tr.clearDriverHandle() tr.stateUpdater.TaskStateUpdated() - if err := tr.stop(); err != nil { - tr.logger.Error("stop failed on terminal task", "error", err) + if runComplete { + if err := tr.stop(); err != nil { + tr.logger.Error("stop failed on terminal task", "error", err) + } + return } - return } // Updates are handled asynchronously with the other hooks but each @@ -544,27 +558,24 @@ func (tr *TaskRunner) Run() { // Set the initial task state. 
tr.stateUpdater.TaskStateUpdated() - select { - case <-tr.startConditionMetCh: - tr.logger.Debug("lifecycle start condition has been met, proceeding") - // yay proceed - case <-tr.killCtx.Done(): - case <-tr.shutdownCtx.Done(): - return - } - timer, stop := helper.NewSafeTimer(0) // timer duration calculated JIT defer stop() MAIN: for !tr.shouldShutdown() { + if dead { + break + } + select { case <-tr.killCtx.Done(): break MAIN case <-tr.shutdownCtx.Done(): // TaskRunner was told to exit immediately return - default: + case <-tr.startConditionMetCh: + tr.logger.Debug("lifecycle start condition has been met, proceeding") + // yay proceed } // Run the prestart hooks @@ -674,6 +685,38 @@ MAIN: // Mark the task as dead tr.UpdateState(structs.TaskStateDead, nil) + // Wait here in case the allocation is restarted. Poststop tasks will never + // run again so skip them to avoid blocking forever. + if !tr.Task().IsPoststop() { + ALLOC_RESTART: + // Run in a loop to handle cases where restartCh is triggered but the + // task runner doesn't need to restart. + for { + select { + case <-tr.killCtx.Done(): + break ALLOC_RESTART + case <-tr.shutdownCtx.Done(): + return + case <-tr.restartCh: + // Restart without delay since the task is not running anymore. + restart, _ := tr.shouldRestart() + if restart { + // Set runner as not dead to allow the MAIN loop to run. + dead = false + goto MAIN + } + } + } + } + + tr.stateLock.Lock() + tr.localState.RunComplete = true + err := tr.stateDB.PutTaskRunnerLocalState(tr.allocID, tr.taskName, tr.localState) + if err != nil { + tr.logger.Warn("error persisting task state on run loop exit", "error", err) + } + tr.stateLock.Unlock() + // Run the stop hooks if err := tr.stop(); err != nil { tr.logger.Error("stop failed", "error", err) @@ -1200,8 +1243,10 @@ func (tr *TaskRunner) UpdateState(state string, event *structs.TaskEvent) { tr.stateLock.Lock() defer tr.stateLock.Unlock() + tr.logger.Trace("setting task state", "state", state) + if event != nil { - tr.logger.Trace("setting task state", "state", state, "event", event.Type) + tr.logger.Trace("appending task event", "state", state, "event", event.Type) // Append the event tr.appendEvent(event) diff --git a/client/allocrunner/taskrunner/task_runner_test.go b/client/allocrunner/taskrunner/task_runner_test.go index 85de7b2eb..6c9eda48f 100644 --- a/client/allocrunner/taskrunner/task_runner_test.go +++ b/client/allocrunner/taskrunner/task_runner_test.go @@ -335,7 +335,7 @@ func TestTaskRunner_Restore_Running(t *testing.T) { defer newTR.Kill(context.Background(), structs.NewTaskEvent("cleanup")) // Wait for new task runner to exit when the process does - <-newTR.WaitCh() + testWaitForTaskToDie(t, newTR) // Assert that the process was only started once started := 0 @@ -349,6 +349,87 @@ func TestTaskRunner_Restore_Running(t *testing.T) { assert.Equal(t, 1, started) } +// TestTaskRunner_Restore_Dead asserts that restoring a dead task will place it +// back in the correct state. If the task was waiting for an alloc restart it +// must be able to be restarted after restore, otherwise a restart must fail. 
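+// The two cases are distinguished by the RunComplete flag persisted in the
+// task runner's local state.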
+func TestTaskRunner_Restore_Dead(t *testing.T) { + ci.Parallel(t) + + alloc := mock.BatchAlloc() + alloc.Job.TaskGroups[0].Count = 1 + task := alloc.Job.TaskGroups[0].Tasks[0] + task.Driver = "mock_driver" + task.Config = map[string]interface{}{ + "run_for": "2s", + } + conf, cleanup := testTaskRunnerConfig(t, alloc, task.Name) + conf.StateDB = cstate.NewMemDB(conf.Logger) // "persist" state between task runners + defer cleanup() + + // Run the first TaskRunner + origTR, err := NewTaskRunner(conf) + require.NoError(t, err) + go origTR.Run() + defer origTR.Kill(context.Background(), structs.NewTaskEvent("cleanup")) + + // Wait for it to be dead + testWaitForTaskToDie(t, origTR) + + // Cause TR to exit without shutting down task + origTR.Shutdown() + + // Start a new TaskRunner and do the Restore + newTR, err := NewTaskRunner(conf) + require.NoError(t, err) + require.NoError(t, newTR.Restore()) + + go newTR.Run() + defer newTR.Kill(context.Background(), structs.NewTaskEvent("cleanup")) + + // Verify that the TaskRunner is still active since it was recovered after + // a forced shutdown. + select { + case <-newTR.WaitCh(): + require.Fail(t, "WaitCh is not blocking") + default: + } + + // Verify that we can restart task. + // Retry a few times as the newTR.Run() may not have started yet. + testutil.WaitForResult(func() (bool, error) { + ev := &structs.TaskEvent{Type: structs.TaskRestartSignal} + err = newTR.ForceRestart(context.Background(), ev, false) + return err == nil, err + }, func(err error) { + require.NoError(t, err) + }) + testWaitForTaskToStart(t, newTR) + + // Kill task to verify that it's restored as dead and not able to restart. + newTR.Kill(context.Background(), nil) + testutil.WaitForResult(func() (bool, error) { + select { + case <-newTR.WaitCh(): + return true, nil + default: + return false, fmt.Errorf("task still running") + } + }, func(err error) { + require.NoError(t, err) + }) + + newTR2, err := NewTaskRunner(conf) + require.NoError(t, err) + require.NoError(t, newTR2.Restore()) + + go newTR2.Run() + defer newTR2.Kill(context.Background(), structs.NewTaskEvent("cleanup")) + + ev := &structs.TaskEvent{Type: structs.TaskRestartSignal} + err = newTR2.ForceRestart(context.Background(), ev, false) + require.Equal(t, err, ErrTaskNotRunning) +} + // setupRestoreFailureTest starts a service, shuts down the task runner, and // kills the task before restarting a new TaskRunner. The new TaskRunner is // returned once it is running and waiting in pending along with a cleanup @@ -603,11 +684,7 @@ func TestTaskRunner_TaskEnv_Interpolated(t *testing.T) { defer cleanup() // Wait for task to complete - select { - case <-tr.WaitCh(): - case <-time.After(3 * time.Second): - require.Fail("timeout waiting for task to exit") - } + testWaitForTaskToDie(t, tr) // Get the mock driver plugin driverPlugin, err := conf.DriverManager.Dispense(mockdriver.PluginID.Name) @@ -654,7 +731,9 @@ func TestTaskRunner_TaskEnv_Chroot(t *testing.T) { go tr.Run() defer tr.Kill(context.Background(), structs.NewTaskEvent("cleanup")) - // Wait for task to exit + // Wait for task to exit and kill the task runner to run the stop hooks. 
+ testWaitForTaskToDie(t, tr) + tr.Kill(context.Background(), structs.NewTaskEvent("kill")) timeout := 15 * time.Second if testutil.IsCI() { timeout = 120 * time.Second @@ -703,7 +782,9 @@ func TestTaskRunner_TaskEnv_Image(t *testing.T) { tr, conf, cleanup := runTestTaskRunner(t, alloc, task.Name) defer cleanup() - // Wait for task to exit + // Wait for task to exit and kill task runner to run the stop hooks. + testWaitForTaskToDie(t, tr) + tr.Kill(context.Background(), structs.NewTaskEvent("kill")) select { case <-tr.WaitCh(): case <-time.After(15 * time.Second): @@ -750,7 +831,9 @@ func TestTaskRunner_TaskEnv_None(t *testing.T) { %s `, root, taskDir, taskDir, os.Getenv("PATH")) - // Wait for task to exit + // Wait for task to exit and kill the task runner to run the stop hooks. + testWaitForTaskToDie(t, tr) + tr.Kill(context.Background(), structs.NewTaskEvent("kill")) select { case <-tr.WaitCh(): case <-time.After(15 * time.Second): @@ -818,10 +901,7 @@ func TestTaskRunner_DevicePropogation(t *testing.T) { defer tr.Kill(context.Background(), structs.NewTaskEvent("cleanup")) // Wait for task to complete - select { - case <-tr.WaitCh(): - case <-time.After(3 * time.Second): - } + testWaitForTaskToDie(t, tr) // Get the mock driver plugin driverPlugin, err := conf.DriverManager.Dispense(mockdriver.PluginID.Name) @@ -1328,11 +1408,7 @@ func TestTaskRunner_CheckWatcher_Restart(t *testing.T) { // Wait until the task exits. Don't simply wait for it to run as it may // get restarted and terminated before the test is able to observe it // running. - select { - case <-tr.WaitCh(): - case <-time.After(time.Duration(testutil.TestMultiplier()*15) * time.Second): - require.Fail(t, "timeout") - } + testWaitForTaskToDie(t, tr) state := tr.TaskState() actualEvents := make([]string, len(state.Events)) @@ -1421,11 +1497,7 @@ func TestTaskRunner_BlockForSIDSToken(t *testing.T) { // task runner should exit now that it has been unblocked and it is a batch // job with a zero sleep time - select { - case <-tr.WaitCh(): - case <-time.After(15 * time.Second * time.Duration(testutil.TestMultiplier())): - r.Fail("timed out waiting for batch task to exist") - } + testWaitForTaskToDie(t, tr) // assert task exited successfully finalState := tr.TaskState() @@ -1478,11 +1550,7 @@ func TestTaskRunner_DeriveSIToken_Retry(t *testing.T) { go tr.Run() // assert task runner blocks on SI token - select { - case <-tr.WaitCh(): - case <-time.After(time.Duration(testutil.TestMultiplier()*15) * time.Second): - r.Fail("timed out waiting for task runner") - } + testWaitForTaskToDie(t, tr) // assert task exited successfully finalState := tr.TaskState() @@ -1598,11 +1666,7 @@ func TestTaskRunner_BlockForVaultToken(t *testing.T) { // TR should exit now that it's unblocked by vault as its a batch job // with 0 sleeping. 
- select { - case <-tr.WaitCh(): - case <-time.After(15 * time.Second * time.Duration(testutil.TestMultiplier())): - require.Fail(t, "timed out waiting for batch task to exit") - } + testWaitForTaskToDie(t, tr) // Assert task exited successfully finalState := tr.TaskState() @@ -1615,6 +1679,14 @@ func TestTaskRunner_BlockForVaultToken(t *testing.T) { require.NoError(t, err) require.Equal(t, token, string(data)) + // Kill task runner to trigger stop hooks + tr.Kill(context.Background(), structs.NewTaskEvent("kill")) + select { + case <-tr.WaitCh(): + case <-time.After(time.Duration(testutil.TestMultiplier()*15) * time.Second): + require.Fail(t, "timed out waiting for task runner to exit") + } + // Check the token was revoked testutil.WaitForResult(func() (bool, error) { if len(vaultClient.StoppedTokens()) != 1 { @@ -1661,17 +1733,21 @@ func TestTaskRunner_DeriveToken_Retry(t *testing.T) { defer tr.Kill(context.Background(), structs.NewTaskEvent("cleanup")) go tr.Run() - // Wait for TR to exit and check its state + // Wait for TR to die and check its state + testWaitForTaskToDie(t, tr) + + state := tr.TaskState() + require.Equal(t, structs.TaskStateDead, state.State) + require.False(t, state.Failed) + + // Kill task runner to trigger stop hooks + tr.Kill(context.Background(), structs.NewTaskEvent("kill")) select { case <-tr.WaitCh(): case <-time.After(time.Duration(testutil.TestMultiplier()*15) * time.Second): require.Fail(t, "timed out waiting for task runner to exit") } - state := tr.TaskState() - require.Equal(t, structs.TaskStateDead, state.State) - require.False(t, state.Failed) - require.Equal(t, 1, count) // Check that the token is on disk @@ -1771,11 +1847,7 @@ func TestTaskRunner_Download_ChrootExec(t *testing.T) { defer cleanup() // Wait for task to run and exit - select { - case <-tr.WaitCh(): - case <-time.After(time.Duration(testutil.TestMultiplier()*15) * time.Second): - require.Fail(t, "timed out waiting for task runner to exit") - } + testWaitForTaskToDie(t, tr) state := tr.TaskState() require.Equal(t, structs.TaskStateDead, state.State) @@ -1816,11 +1888,7 @@ func TestTaskRunner_Download_RawExec(t *testing.T) { defer cleanup() // Wait for task to run and exit - select { - case <-tr.WaitCh(): - case <-time.After(time.Duration(testutil.TestMultiplier()*15) * time.Second): - require.Fail(t, "timed out waiting for task runner to exit") - } + testWaitForTaskToDie(t, tr) state := tr.TaskState() require.Equal(t, structs.TaskStateDead, state.State) @@ -1851,11 +1919,7 @@ func TestTaskRunner_Download_List(t *testing.T) { defer cleanup() // Wait for task to run and exit - select { - case <-tr.WaitCh(): - case <-time.After(time.Duration(testutil.TestMultiplier()*15) * time.Second): - require.Fail(t, "timed out waiting for task runner to exit") - } + testWaitForTaskToDie(t, tr) state := tr.TaskState() require.Equal(t, structs.TaskStateDead, state.State) @@ -1902,11 +1966,7 @@ func TestTaskRunner_Download_Retries(t *testing.T) { tr, _, cleanup := runTestTaskRunner(t, alloc, task.Name) defer cleanup() - select { - case <-tr.WaitCh(): - case <-time.After(time.Duration(testutil.TestMultiplier()*15) * time.Second): - require.Fail(t, "timed out waiting for task to exit") - } + testWaitForTaskToDie(t, tr) state := tr.TaskState() require.Equal(t, structs.TaskStateDead, state.State) @@ -2100,6 +2160,8 @@ func TestTaskRunner_RestartSignalTask_NotRunning(t *testing.T) { case <-time.After(1 * time.Second): } + require.Equal(t, structs.TaskStatePending, tr.TaskState().State) + // Send a signal and 
restart err = tr.Signal(structs.NewTaskEvent("don't panic"), "QUIT") require.EqualError(t, err, ErrTaskNotRunning.Error()) @@ -2110,12 +2172,7 @@ func TestTaskRunner_RestartSignalTask_NotRunning(t *testing.T) { // Unblock and let it finish waitCh <- struct{}{} - - select { - case <-tr.WaitCh(): - case <-time.After(10 * time.Second): - require.Fail(t, "timed out waiting for task to complete") - } + testWaitForTaskToDie(t, tr) // Assert the task ran and never restarted state := tr.TaskState() @@ -2153,11 +2210,7 @@ func TestTaskRunner_Run_RecoverableStartError(t *testing.T) { tr, _, cleanup := runTestTaskRunner(t, alloc, task.Name) defer cleanup() - select { - case <-tr.WaitCh(): - case <-time.After(time.Duration(testutil.TestMultiplier()*15) * time.Second): - require.Fail(t, "timed out waiting for task to exit") - } + testWaitForTaskToDie(t, tr) state := tr.TaskState() require.Equal(t, structs.TaskStateDead, state.State) @@ -2202,11 +2255,7 @@ func TestTaskRunner_Template_Artifact(t *testing.T) { go tr.Run() // Wait for task to run and exit - select { - case <-tr.WaitCh(): - case <-time.After(15 * time.Second * time.Duration(testutil.TestMultiplier())): - require.Fail(t, "timed out waiting for task runner to exit") - } + testWaitForTaskToDie(t, tr) state := tr.TaskState() require.Equal(t, structs.TaskStateDead, state.State) @@ -2536,7 +2585,9 @@ func TestTaskRunner_UnregisterConsul_Retries(t *testing.T) { tr, err := NewTaskRunner(conf) require.NoError(t, err) defer tr.Kill(context.Background(), structs.NewTaskEvent("cleanup")) - tr.Run() + go tr.Run() + + testWaitForTaskToDie(t, tr) state := tr.TaskState() require.Equal(t, structs.TaskStateDead, state.State) @@ -2562,7 +2613,17 @@ func TestTaskRunner_UnregisterConsul_Retries(t *testing.T) { func testWaitForTaskToStart(t *testing.T, tr *TaskRunner) { testutil.WaitForResult(func() (bool, error) { ts := tr.TaskState() - return ts.State == structs.TaskStateRunning, fmt.Errorf("%v", ts.State) + return ts.State == structs.TaskStateRunning, fmt.Errorf("expected task to be running, got %v", ts.State) + }, func(err error) { + require.NoError(t, err) + }) +} + +// testWaitForTaskToDie waits for the task to die or fails the test +func testWaitForTaskToDie(t *testing.T, tr *TaskRunner) { + testutil.WaitForResult(func() (bool, error) { + ts := tr.TaskState() + return ts.State == structs.TaskStateDead, fmt.Errorf("expected task to be dead, got %v", ts.State) }, func(err error) { require.NoError(t, err) }) diff --git a/client/allocrunner/testing.go b/client/allocrunner/testing.go index 6f2fd7b03..44e3eb524 100644 --- a/client/allocrunner/testing.go +++ b/client/allocrunner/testing.go @@ -4,6 +4,7 @@ package allocrunner import ( + "fmt" "sync" "testing" @@ -20,6 +21,7 @@ import ( "github.com/hashicorp/nomad/client/state" "github.com/hashicorp/nomad/client/vaultclient" "github.com/hashicorp/nomad/nomad/structs" + "github.com/hashicorp/nomad/testutil" "github.com/stretchr/testify/require" ) @@ -104,3 +106,13 @@ func TestAllocRunnerFromAlloc(t *testing.T, alloc *structs.Allocation) (*allocRu return ar, cleanup } + +func WaitForClientState(t *testing.T, ar *allocRunner, state string) { + testutil.WaitForResult(func() (bool, error) { + got := ar.AllocState().ClientStatus + return got == state, + fmt.Errorf("expected alloc runner to be in state %s, got %s", state, got) + }, func(err error) { + require.NoError(t, err) + }) +} diff --git a/client/client.go b/client/client.go index cd4925b40..9e8253132 100644 --- a/client/client.go +++ b/client/client.go @@ -160,6 
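The polling helpers above (`testWaitForTaskToDie`, `WaitForClientState`) replace fixed `select`/`time.After` waits throughout these tests. As a minimal sketch of how a test in the `allocrunner` package might use `WaitForClientState` — the test name is hypothetical, and the cleanup signature of `TestAllocRunnerFromAlloc` is assumed from the testing.go hunk above:

```go
package allocrunner

import (
	"testing"

	"github.com/hashicorp/nomad/nomad/mock"
	"github.com/hashicorp/nomad/nomad/structs"
)

// TestAllocRunner_WaitForComplete_Sketch is illustrative only: it runs a batch
// allocation and polls until the alloc runner reports a terminal client
// status instead of sleeping for a fixed duration.
func TestAllocRunner_WaitForComplete_Sketch(t *testing.T) {
	alloc := mock.BatchAlloc()

	// Assumption: TestAllocRunnerFromAlloc returns the runner plus a cleanup
	// function, as suggested by the "return ar, cleanup" context above.
	ar, cleanup := TestAllocRunnerFromAlloc(t, alloc)
	defer cleanup()

	go ar.Run()

	WaitForClientState(t, ar, structs.AllocClientStatusComplete)
}
```

Compared with a bare sleep, the helper also surfaces the last observed status in the failure message, which makes timing-related flakes easier to diagnose.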
+160,7 @@ type AllocRunner interface { PersistState() error RestartTask(taskName string, taskEvent *structs.TaskEvent) error + RestartRunning(taskEvent *structs.TaskEvent) error RestartAll(taskEvent *structs.TaskEvent) error Reconnect(update *structs.Allocation) error @@ -925,20 +926,31 @@ func (c *Client) CollectAllAllocs() { c.garbageCollector.CollectAll() } -func (c *Client) RestartAllocation(allocID, taskName string) error { +func (c *Client) RestartAllocation(allocID, taskName string, allTasks bool) error { + if allTasks && taskName != "" { + return fmt.Errorf("task name cannot be set when restarting all tasks") + } + ar, err := c.getAllocRunner(allocID) if err != nil { return err } - event := structs.NewTaskEvent(structs.TaskRestartSignal). - SetRestartReason("User requested restart") - if taskName != "" { + event := structs.NewTaskEvent(structs.TaskRestartSignal). + SetRestartReason("User requested task to restart") return ar.RestartTask(taskName, event) } - return ar.RestartAll(event) + if allTasks { + event := structs.NewTaskEvent(structs.TaskRestartSignal). + SetRestartReason("User requested all tasks to restart") + return ar.RestartAll(event) + } + + event := structs.NewTaskEvent(structs.TaskRestartSignal). + SetRestartReason("User requested running tasks to restart") + return ar.RestartRunning(event) } // Node returns the locally registered node diff --git a/command/agent/alloc_endpoint.go b/command/agent/alloc_endpoint.go index e7e1e1d09..c043fe024 100644 --- a/command/agent/alloc_endpoint.go +++ b/command/agent/alloc_endpoint.go @@ -283,6 +283,7 @@ func (s *HTTPServer) allocRestart(allocID string, resp http.ResponseWriter, req // Explicitly parse the body separately to disallow overriding AllocID in req Body. var reqBody struct { TaskName string + AllTasks bool } err := json.NewDecoder(req.Body).Decode(&reqBody) if err != nil && err != io.EOF { @@ -291,6 +292,9 @@ func (s *HTTPServer) allocRestart(allocID string, resp http.ResponseWriter, req if reqBody.TaskName != "" { args.TaskName = reqBody.TaskName } + if reqBody.AllTasks { + args.AllTasks = reqBody.AllTasks + } // Determine the handler to use useLocalClient, useClientRPC, useServerRPC := s.rpcHandlerForAlloc(allocID) diff --git a/command/alloc_restart.go b/command/alloc_restart.go index 9c5dcb3be..cc6d91654 100644 --- a/command/alloc_restart.go +++ b/command/alloc_restart.go @@ -18,8 +18,11 @@ func (c *AllocRestartCommand) Help() string { Usage: nomad alloc restart [options] Restart an existing allocation. This command is used to restart a specific alloc - and its tasks. If no task is provided then all of the allocation's tasks will - be restarted. + and its tasks. If no task is provided then all of the allocation's tasks that + are currently running will be restarted. + + Use the option '-all-tasks' to restart tasks that have already run, such as + non-sidecar prestart and poststart tasks. When ACLs are enabled, this command requires a token with the 'alloc-lifecycle', 'read-job', and 'list-jobs' capabilities for the @@ -31,9 +34,15 @@ General Options: Restart Specific Options: + -all-tasks + If set, all tasks in the allocation will be restarted, even the ones that + already ran. This option cannot be used with '-task' or the '<task>' + argument. + -task - Specify the individual task to restart. If task name is given with both an + Specify the individual task to restart. If task name is given with both an argument and the '-task' option, preference is given to the '-task' option.
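Since the agent handler above only reads `TaskName` and `AllTasks` from the request body, a raw HTTP consumer can drive the new behavior directly. A hedged sketch — the agent address is a placeholder, and the endpoint path and PUT method follow Nomad's client allocation restart API:

```go
package example

import (
	"bytes"
	"encoding/json"
	"fmt"
	"net/http"
)

// restartAllTasks asks a Nomad client agent to restart every task in an
// allocation. The client rejects requests that set both AllTasks and
// TaskName, so only AllTasks is sent here.
func restartAllTasks(agentAddr, allocID string) error {
	body, err := json.Marshal(map[string]interface{}{"AllTasks": true})
	if err != nil {
		return err
	}

	url := fmt.Sprintf("%s/v1/client/allocation/%s/restart", agentAddr, allocID)
	req, err := http.NewRequest(http.MethodPut, url, bytes.NewReader(body))
	if err != nil {
		return err
	}

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		return fmt.Errorf("unexpected status: %s", resp.Status)
	}
	return nil
}
```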
+ This option cannot be used with '-all-tasks'. -verbose Show full information. @@ -44,11 +53,12 @@ Restart Specific Options: func (c *AllocRestartCommand) Name() string { return "alloc restart" } func (c *AllocRestartCommand) Run(args []string) int { - var verbose bool + var allTasks, verbose bool var task string flags := c.Meta.FlagSet(c.Name(), FlagSetClient) flags.Usage = func() { c.Ui.Output(c.Help()) } + flags.BoolVar(&allTasks, "all-tasks", false, "") flags.BoolVar(&verbose, "verbose", false, "") flags.StringVar(&task, "task", "", "") @@ -66,6 +76,17 @@ func (c *AllocRestartCommand) Run(args []string) int { allocID := args[0] + // If -task isn't provided fallback to reading the task name + // from args. + if task == "" && len(args) >= 2 { + task = args[1] + } + + if allTasks && task != "" { + c.Ui.Error("The -all-tasks option is not allowed when restarting a specific task.") + return 1 + } + // Truncate the id unless full length is requested length := shortId if verbose { @@ -113,12 +134,6 @@ func (c *AllocRestartCommand) Run(args []string) int { return 1 } - // If -task isn't provided fallback to reading the task name - // from args. - if task == "" && len(args) >= 2 { - task = args[1] - } - if task != "" { err := validateTaskExistsInAllocation(task, alloc) if err != nil { @@ -127,9 +142,17 @@ func (c *AllocRestartCommand) Run(args []string) int { } } - err = client.Allocations().Restart(alloc, task, nil) + if allTasks { + err = client.Allocations().RestartAllTasks(alloc, nil) + } else { + err = client.Allocations().Restart(alloc, task, nil) + } if err != nil { - c.Ui.Error(fmt.Sprintf("Failed to restart allocation:\n\n%s", err.Error())) + target := "allocation" + if task != "" { + target = "task" + } + c.Ui.Error(fmt.Sprintf("Failed to restart %s:\n\n%s", target, err.Error())) return 1 } diff --git a/nomad/mock/mock.go b/nomad/mock/mock.go index 63c7132c5..d5df8d285 100644 --- a/nomad/mock/mock.go +++ b/nomad/mock/mock.go @@ -700,6 +700,50 @@ func LifecycleAlloc() *structs.Allocation { return alloc } +type LifecycleTaskDef struct { + Name string + RunFor string + ExitCode int + Hook string + IsSidecar bool +} + +// LifecycleAllocFromTasks generates an Allocation with mock tasks that have +// the provided lifecycles. 
+func LifecycleAllocFromTasks(tasks []LifecycleTaskDef) *structs.Allocation { + alloc := LifecycleAlloc() + alloc.Job.TaskGroups[0].Tasks = []*structs.Task{} + for _, task := range tasks { + var lc *structs.TaskLifecycleConfig + if task.Hook != "" { + // TODO: task coordinator doesn't treat nil and empty structs the same + lc = &structs.TaskLifecycleConfig{ + Hook: task.Hook, + Sidecar: task.IsSidecar, + } + } + + alloc.Job.TaskGroups[0].Tasks = append(alloc.Job.TaskGroups[0].Tasks, + &structs.Task{ + Name: task.Name, + Driver: "mock_driver", + Config: map[string]interface{}{ + "run_for": task.RunFor, + "exit_code": task.ExitCode}, + Lifecycle: lc, + LogConfig: structs.DefaultLogConfig(), + Resources: &structs.Resources{CPU: 100, MemoryMB: 256}, + }, + ) + alloc.TaskResources[task.Name] = &structs.Resources{CPU: 100, MemoryMB: 256} + alloc.AllocatedResources.Tasks[task.Name] = &structs.AllocatedTaskResources{ + Cpu: structs.AllocatedCpuResources{CpuShares: 100}, + Memory: structs.AllocatedMemoryResources{MemoryMB: 256}, + } + } + return alloc +} + func LifecycleJobWithPoststopDeploy() *structs.Job { job := &structs.Job{ Region: "global", diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 839feece5..aa089d1ab 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -1028,6 +1028,7 @@ type AllocsGetRequest struct { type AllocRestartRequest struct { AllocID string TaskName string + AllTasks bool QueryOptions } @@ -8043,7 +8044,7 @@ const ( // restarted because it has exceeded its restart policy. TaskNotRestarting = "Not Restarting" - // TaskRestartSignal indicates that the task has been signalled to be + // TaskRestartSignal indicates that the task has been signaled to be // restarted TaskRestartSignal = "Restart Signaled" @@ -8324,6 +8325,9 @@ func (e *TaskEvent) PopulateEventDisplayMessage() { } func (e *TaskEvent) GoString() string { + if e == nil { + return "" + } return fmt.Sprintf("%v - %v", e.Time, e.Type) } diff --git a/website/content/api-docs/allocations.mdx b/website/content/api-docs/allocations.mdx index 2afe3322f..128dbf660 100644 --- a/website/content/api-docs/allocations.mdx +++ b/website/content/api-docs/allocations.mdx @@ -731,6 +731,13 @@ The table below shows this endpoint's support for must be the full UUID, not the short 8-character one. This is specified as part of the path. +- `TaskName` `(string: "")` - Specifies the individual task to restart. Cannot + be used with `AllTasks` set to `true`. + +- `AllTasks` `(bool: false)` - If set to `true` all tasks in the allocation + will be restarted, even the ones that already ran. Cannot be set to `true` if + `TaskName` is defined. + ### Sample Payload ```json diff --git a/website/content/docs/commands/alloc/restart.mdx b/website/content/docs/commands/alloc/restart.mdx index a07820567..32f14b6e8 100644 --- a/website/content/docs/commands/alloc/restart.mdx +++ b/website/content/docs/commands/alloc/restart.mdx @@ -18,12 +18,16 @@ nomad alloc restart [options] This command accepts a single allocation ID and a task name. The task name must be part of the allocation and the task must be currently running. The task name -is optional and if omitted every task in the allocation will be restarted. +is optional and if omitted all tasks that are currently running will be +restarted. -Task name may also be specified using the `-task` option rather than a command -argument. 
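As a usage note for the `LifecycleAllocFromTasks` helper introduced in `nomad/mock` above, a test can now describe an allocation's lifecycle shape declaratively. A minimal sketch — the test name is hypothetical, and the hook strings match Nomad's lifecycle hook names (`prestart`, `poststart`, `poststop`):

```go
package example_test

import (
	"testing"

	"github.com/hashicorp/nomad/nomad/mock"
)

// TestLifecycle_Sketch builds a mock allocation with a prestart task, a
// prestart sidecar, a main task, and a poststop task.
func TestLifecycle_Sketch(t *testing.T) {
	alloc := mock.LifecycleAllocFromTasks([]mock.LifecycleTaskDef{
		{Name: "init", RunFor: "1s", Hook: "prestart"},
		{Name: "sidecar", RunFor: "100s", Hook: "prestart", IsSidecar: true},
		{Name: "main", RunFor: "10s"},
		{Name: "cleanup", RunFor: "1s", Hook: "poststop"},
	})

	if n := len(alloc.Job.TaskGroups[0].Tasks); n != 4 {
		t.Fatalf("expected 4 tasks, got %d", n)
	}
}
```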
If task name is given with both an argument and the `-task` option, +Task name may also be specified using the `-task` option rather than a command +argument. If task name is given with both an argument and the `-task` option, preference is given to the `-task` option. +Use the option `-all-tasks` to restart tasks that have already run, such as +non-sidecar prestart and poststart tasks. + When ACLs are enabled, this command requires a token with the `alloc-lifecycle`, `read-job`, and `list-jobs` capabilities for the allocation's namespace. @@ -34,7 +38,12 @@ allocation's namespace. ## Restart Options -- `-task`: Specify the individual task to restart. +- `-all-tasks`: If set, all tasks in the allocation will be restarted, even the + ones that already ran. This option cannot be used with `-task` or the + `<task>` argument. + +- `-task`: Specify the individual task to restart. This option cannot be used + with `-all-tasks`. - `-verbose`: Display verbose output.
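For programmatic consumers, the distinction documented here maps onto the Go `api` package the command itself uses: `Restart` targets a named or currently running task set, while `RestartAllTasks` restarts everything regardless of lifecycle. A minimal sketch, assuming a reachable agent at the default address and a placeholder allocation ID:

```go
package main

import (
	"log"

	"github.com/hashicorp/nomad/api"
)

func main() {
	client, err := api.NewClient(api.DefaultConfig())
	if err != nil {
		log.Fatal(err)
	}

	// Placeholder: replace with a real allocation ID.
	allocID := "replace-with-alloc-id"

	alloc, _, err := client.Allocations().Info(allocID, nil)
	if err != nil {
		log.Fatal(err)
	}

	// Equivalent of `nomad alloc restart -all-tasks <alloc>`.
	if err := client.Allocations().RestartAllTasks(alloc, nil); err != nil {
		log.Fatal(err)
	}
}
```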