From 12f3ee46ea9f66dac0cb677b516c8a736e749a35 Mon Sep 17 00:00:00 2001 From: Derek Strickland <1111455+DerekStrickland@users.noreply.github.com> Date: Tue, 7 Jun 2022 11:35:19 -0400 Subject: [PATCH] alloc_runner: stop sidecar tasks last (#13055) alloc_runner: stop sidecar tasks last --- .changelog/13055.txt | 3 + client/allocrunner/alloc_runner.go | 46 +++++-- client/allocrunner/alloc_runner_test.go | 129 ++++++++++++++++++ client/allocrunner/task_hook_coordinator.go | 8 +- .../taskrunner/task_runner_getters.go | 5 + 5 files changed, 175 insertions(+), 16 deletions(-) create mode 100644 .changelog/13055.txt diff --git a/.changelog/13055.txt b/.changelog/13055.txt new file mode 100644 index 000000000..f04454a19 --- /dev/null +++ b/.changelog/13055.txt @@ -0,0 +1,3 @@ +```release-note:bug +lifecycle: fixed a bug where sidecar tasks were not being stopped last +``` diff --git a/client/allocrunner/alloc_runner.go b/client/allocrunner/alloc_runner.go index c5294e223..5ef9e5743 100644 --- a/client/allocrunner/alloc_runner.go +++ b/client/allocrunner/alloc_runner.go @@ -525,21 +525,21 @@ func (ar *allocRunner) handleTaskStateUpdates() { states := make(map[string]*structs.TaskState, trNum) for name, tr := range ar.tasks { - state := tr.TaskState() - states[name] = state + taskState := tr.TaskState() + states[name] = taskState if tr.IsPoststopTask() { continue } // Capture live task runners in case we need to kill them - if state.State != structs.TaskStateDead { + if taskState.State != structs.TaskStateDead { liveRunners = append(liveRunners, tr) continue } // Task is dead, determine if other tasks should be killed - if state.Failed { + if taskState.Failed { // Only set failed event if no event has been // set yet to give dead leaders priority. 
if killEvent == nil { @@ -626,16 +626,16 @@ func (ar *allocRunner) killTasks() map[string]*structs.TaskState { ar.logger.Warn("error stopping leader task", "error", err, "task_name", name) } - state := tr.TaskState() - states[name] = state + taskState := tr.TaskState() + states[name] = taskState break } - // Kill the rest concurrently + // Kill the remaining tasks, except sidecar and poststop tasks, concurrently wg := sync.WaitGroup{} for name, tr := range ar.tasks { - // Filter out poststop tasks so they run after all the other tasks are killed - if tr.IsLeader() || tr.IsPoststopTask() { + // Filter out sidecar and poststop tasks: sidecars are stopped, and poststop tasks run, only after all the other tasks are killed + if tr.IsLeader() || tr.IsPoststopTask() || tr.IsSidecarTask() { continue } @@ -649,9 +649,33 @@ func (ar *allocRunner) killTasks() map[string]*structs.TaskState { ar.logger.Warn("error stopping task", "error", err, "task_name", name) } - state := tr.TaskState() + taskState := tr.TaskState() mu.Lock() - states[name] = state + states[name] = taskState mu.Unlock() + }(name, tr) + } + wg.Wait() + + // Kill the sidecar tasks last. 
+ for name, tr := range ar.tasks { + if !tr.IsSidecarTask() || tr.IsLeader() || tr.IsPoststopTask() { + continue + } + + wg.Add(1) + go func(name string, tr *taskrunner.TaskRunner) { + defer wg.Done() + taskEvent := structs.NewTaskEvent(structs.TaskKilling) + taskEvent.SetKillTimeout(tr.Task().KillTimeout) + err := tr.Kill(context.TODO(), taskEvent) + if err != nil && err != taskrunner.ErrTaskNotRunning { + ar.logger.Warn("error stopping sidecar task", "error", err, "task_name", name) + } + + taskState := tr.TaskState() + mu.Lock() + states[name] = taskState mu.Unlock() }(name, tr) } diff --git a/client/allocrunner/alloc_runner_test.go b/client/allocrunner/alloc_runner_test.go index 38e19da84..b28c87dd4 100644 --- a/client/allocrunner/alloc_runner_test.go +++ b/client/allocrunner/alloc_runner_test.go @@ -1682,3 +1682,132 @@ func TestAllocRunner_Reconnect(t *testing.T) { }) } } + +// TestAllocRunner_Lifecycle_Shutdown_Order asserts that a service job with 3 +// lifecycle hooks (1 sidecar, 1 ephemeral, 1 poststop) starts all 4 tasks, and shuts down +// the sidecar after main, but before poststop. 
+func TestAllocRunner_Lifecycle_Shutdown_Order(t *testing.T) { + alloc := mock.LifecycleAllocWithPoststopDeploy() + + alloc.Job.Type = structs.JobTypeService + + mainTask := alloc.Job.TaskGroups[0].Tasks[0] + mainTask.Config["run_for"] = "100s" + + sidecarTask := alloc.Job.TaskGroups[0].Tasks[1] + sidecarTask.Lifecycle.Hook = structs.TaskLifecycleHookPoststart + sidecarTask.Config["run_for"] = "100s" + + poststopTask := alloc.Job.TaskGroups[0].Tasks[2] + ephemeralTask := alloc.Job.TaskGroups[0].Tasks[3] + + alloc.Job.TaskGroups[0].Tasks = []*structs.Task{mainTask, ephemeralTask, sidecarTask, poststopTask} + + conf, cleanup := testAllocRunnerConfig(t, alloc) + defer cleanup() + ar, err := NewAllocRunner(conf) + require.NoError(t, err) + defer destroy(ar) + go ar.Run() + + upd := conf.StateUpdater.(*MockStateUpdater) + + // Wait for main and sidecar tasks to be running, and that the + // ephemeral task ran and exited. + testutil.WaitForResult(func() (bool, error) { + last := upd.Last() + if last == nil { + return false, fmt.Errorf("No updates") + } + + if last.ClientStatus != structs.AllocClientStatusRunning { + return false, fmt.Errorf("expected alloc to be running not %s", last.ClientStatus) + } + + if s := last.TaskStates[mainTask.Name].State; s != structs.TaskStateRunning { + return false, fmt.Errorf("expected main task to be running not %s", s) + } + + if s := last.TaskStates[sidecarTask.Name].State; s != structs.TaskStateRunning { + return false, fmt.Errorf("expected sidecar task to be running not %s", s) + } + + if s := last.TaskStates[ephemeralTask.Name].State; s != structs.TaskStateDead { + return false, fmt.Errorf("expected ephemeral task to be dead not %s", s) + } + + if last.TaskStates[ephemeralTask.Name].Failed { + return false, fmt.Errorf("expected ephemeral task to be successful not failed") + } + + return true, nil + }, func(err error) { + t.Fatalf("error waiting for initial state:\n%v", err) + }) + + // Tell the alloc to stop + stopAlloc := 
alloc.Copy() + stopAlloc.DesiredStatus = structs.AllocDesiredStatusStop + ar.Update(stopAlloc) + + // Wait for tasks to stop. + testutil.WaitForResult(func() (bool, error) { + last := upd.Last() + + if s := last.TaskStates[ephemeralTask.Name].State; s != structs.TaskStateDead { + return false, fmt.Errorf("expected ephemeral task to be dead not %s", s) + } + + if last.TaskStates[ephemeralTask.Name].Failed { + return false, fmt.Errorf("expected ephemeral task to be successful not failed") + } + + if s := last.TaskStates[mainTask.Name].State; s != structs.TaskStateDead { + return false, fmt.Errorf("expected main task to be dead not %s", s) + } + + if last.TaskStates[mainTask.Name].Failed { + return false, fmt.Errorf("expected main task to be successful not failed") + } + + if s := last.TaskStates[sidecarTask.Name].State; s != structs.TaskStateDead { + return false, fmt.Errorf("expected sidecar task to be dead not %s", s) + } + + if last.TaskStates[sidecarTask.Name].Failed { + return false, fmt.Errorf("expected sidecar task to be successful not failed") + } + + if s := last.TaskStates[poststopTask.Name].State; s != structs.TaskStateRunning { + return false, fmt.Errorf("expected poststop task to be running not %s", s) + } + + return true, nil + }, func(err error) { + t.Fatalf("error waiting for kill state:\n%v", err) + }) + + last := upd.Last() + require.Less(t, last.TaskStates[ephemeralTask.Name].FinishedAt, last.TaskStates[mainTask.Name].FinishedAt) + require.Less(t, last.TaskStates[mainTask.Name].FinishedAt, last.TaskStates[sidecarTask.Name].FinishedAt) + + // Wait for poststop task to stop. 
+ testutil.WaitForResult(func() (bool, error) { + last := upd.Last() + + if s := last.TaskStates[poststopTask.Name].State; s != structs.TaskStateDead { + return false, fmt.Errorf("expected poststop task to be dead not %s", s) + } + + if last.TaskStates[poststopTask.Name].Failed { + return false, fmt.Errorf("expected poststop task to be successful not failed") + } + + return true, nil + }, func(err error) { + t.Fatalf("error waiting for poststop state:\n%v", err) + }) + + last = upd.Last() + require.Less(t, last.TaskStates[sidecarTask.Name].FinishedAt, last.TaskStates[poststopTask.Name].FinishedAt) +} diff --git a/client/allocrunner/task_hook_coordinator.go b/client/allocrunner/task_hook_coordinator.go index 09f90d22d..a056fa3eb 100644 --- a/client/allocrunner/task_hook_coordinator.go +++ b/client/allocrunner/task_hook_coordinator.go @@ -179,8 +179,7 @@ func (c *taskHookCoordinator) StartPoststopTasks() { // hasNonSidecarTasks returns false if all the passed tasks are sidecar tasks func hasNonSidecarTasks(tasks []*taskrunner.TaskRunner) bool { for _, tr := range tasks { - lc := tr.Task().Lifecycle - if lc == nil || !lc.Sidecar { + if !tr.IsSidecarTask() { return true } } @@ -188,11 +187,10 @@ func hasNonSidecarTasks(tasks []*taskrunner.TaskRunner) bool { return false } -// hasSidecarTasks returns true if all the passed tasks are sidecar tasks +// hasSidecarTasks returns true if any of the passed tasks are sidecar tasks func hasSidecarTasks(tasks map[string]*taskrunner.TaskRunner) bool { for _, tr := range tasks { - lc := tr.Task().Lifecycle - if lc != nil && lc.Sidecar { + if tr.IsSidecarTask() { return true } } diff --git a/client/allocrunner/taskrunner/task_runner_getters.go b/client/allocrunner/taskrunner/task_runner_getters.go index fcf3189b7..4d9c35e6e 100644 --- a/client/allocrunner/taskrunner/task_runner_getters.go +++ b/client/allocrunner/taskrunner/task_runner_getters.go @@ -33,6 +33,11 @@ func (tr *TaskRunner) IsPoststopTask() bool { return 
tr.Task().Lifecycle != nil && tr.Task().Lifecycle.Hook == structs.TaskLifecycleHookPoststop } +// IsSidecarTask returns true if this task is a sidecar task in its task group. +func (tr *TaskRunner) IsSidecarTask() bool { + return tr.Task().Lifecycle != nil && tr.Task().Lifecycle.Sidecar +} + func (tr *TaskRunner) Task() *structs.Task { tr.taskLock.RLock() defer tr.taskLock.RUnlock()