alloc_runner: stop sidecar tasks last (#13055)

alloc_runner: stop sidecar tasks last
This commit is contained in:
Derek Strickland 2022-06-07 11:35:19 -04:00 committed by GitHub
parent c3c10d8c10
commit 12f3ee46ea
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 175 additions and 16 deletions

3
.changelog/13055.txt Normal file
View File

@ -0,0 +1,3 @@
```release-note:bug
lifecycle: fixed a bug where sidecar tasks were not being stopped last
```

View File

@ -525,21 +525,21 @@ func (ar *allocRunner) handleTaskStateUpdates() {
states := make(map[string]*structs.TaskState, trNum)
for name, tr := range ar.tasks {
state := tr.TaskState()
states[name] = state
taskState := tr.TaskState()
states[name] = taskState
if tr.IsPoststopTask() {
continue
}
// Capture live task runners in case we need to kill them
if state.State != structs.TaskStateDead {
if taskState.State != structs.TaskStateDead {
liveRunners = append(liveRunners, tr)
continue
}
// Task is dead, determine if other tasks should be killed
if state.Failed {
if taskState.Failed {
// Only set failed event if no event has been
// set yet to give dead leaders priority.
if killEvent == nil {
@ -626,16 +626,16 @@ func (ar *allocRunner) killTasks() map[string]*structs.TaskState {
ar.logger.Warn("error stopping leader task", "error", err, "task_name", name)
}
state := tr.TaskState()
states[name] = state
taskState := tr.TaskState()
states[name] = taskState
break
}
// Kill the rest concurrently
// Kill the rest non-sidecar or poststop tasks concurrently
wg := sync.WaitGroup{}
for name, tr := range ar.tasks {
// Filter out poststop tasks so they run after all the other tasks are killed
if tr.IsLeader() || tr.IsPoststopTask() {
// Filter out poststop and sidecar tasks so that they stop after all the other tasks are killed
if tr.IsLeader() || tr.IsPoststopTask() || tr.IsSidecarTask() {
continue
}
@ -649,9 +649,33 @@ func (ar *allocRunner) killTasks() map[string]*structs.TaskState {
ar.logger.Warn("error stopping task", "error", err, "task_name", name)
}
state := tr.TaskState()
taskState := tr.TaskState()
mu.Lock()
states[name] = state
states[name] = taskState
mu.Unlock()
}(name, tr)
}
wg.Wait()
// Kill the sidecar tasks last.
for name, tr := range ar.tasks {
if !tr.IsSidecarTask() || tr.IsLeader() || tr.IsPoststopTask() {
continue
}
wg.Add(1)
go func(name string, tr *taskrunner.TaskRunner) {
defer wg.Done()
taskEvent := structs.NewTaskEvent(structs.TaskKilling)
taskEvent.SetKillTimeout(tr.Task().KillTimeout)
err := tr.Kill(context.TODO(), taskEvent)
if err != nil && err != taskrunner.ErrTaskNotRunning {
ar.logger.Warn("error stopping sidecar task", "error", err, "task_name", name)
}
taskState := tr.TaskState()
mu.Lock()
states[name] = taskState
mu.Unlock()
}(name, tr)
}

View File

@ -1682,3 +1682,132 @@ func TestAllocRunner_Reconnect(t *testing.T) {
})
}
}
// TestAllocRunner_Lifecycle_Shutdown_Order asserts that a service job with 3
// lifecycle hooks (1 sidecar, 1 ephemeral, 1 poststop) starts all 4 tasks, and shuts down
// the sidecar after main, but before poststop.
func TestAllocRunner_Lifecycle_Shutdown_Order(t *testing.T) {
alloc := mock.LifecycleAllocWithPoststopDeploy()
alloc.Job.Type = structs.JobTypeService
mainTask := alloc.Job.TaskGroups[0].Tasks[0]
mainTask.Config["run_for"] = "100s"
sidecarTask := alloc.Job.TaskGroups[0].Tasks[1]
sidecarTask.Lifecycle.Hook = structs.TaskLifecycleHookPoststart
sidecarTask.Config["run_for"] = "100s"
poststopTask := alloc.Job.TaskGroups[0].Tasks[2]
ephemeralTask := alloc.Job.TaskGroups[0].Tasks[3]
alloc.Job.TaskGroups[0].Tasks = []*structs.Task{mainTask, ephemeralTask, sidecarTask, poststopTask}
conf, cleanup := testAllocRunnerConfig(t, alloc)
defer cleanup()
ar, err := NewAllocRunner(conf)
require.NoError(t, err)
defer destroy(ar)
go ar.Run()
upd := conf.StateUpdater.(*MockStateUpdater)
// Wait for main and sidecar tasks to be running, and that the
// ephemeral task ran and exited.
testutil.WaitForResult(func() (bool, error) {
last := upd.Last()
if last == nil {
return false, fmt.Errorf("No updates")
}
if last.ClientStatus != structs.AllocClientStatusRunning {
return false, fmt.Errorf("expected alloc to be running not %s", last.ClientStatus)
}
if s := last.TaskStates[mainTask.Name].State; s != structs.TaskStateRunning {
return false, fmt.Errorf("expected main task to be running not %s", s)
}
if s := last.TaskStates[sidecarTask.Name].State; s != structs.TaskStateRunning {
return false, fmt.Errorf("expected sidecar task to be running not %s", s)
}
if s := last.TaskStates[ephemeralTask.Name].State; s != structs.TaskStateDead {
return false, fmt.Errorf("expected ephemeral task to be dead not %s", s)
}
if last.TaskStates[ephemeralTask.Name].Failed {
return false, fmt.Errorf("expected ephemeral task to be successful not failed")
}
return true, nil
}, func(err error) {
t.Fatalf("error waiting for initial state:\n%v", err)
})
// Tell the alloc to stop
stopAlloc := alloc.Copy()
stopAlloc.DesiredStatus = structs.AllocDesiredStatusStop
ar.Update(stopAlloc)
// Wait for tasks to stop.
testutil.WaitForResult(func() (bool, error) {
last := upd.Last()
if s := last.TaskStates[ephemeralTask.Name].State; s != structs.TaskStateDead {
return false, fmt.Errorf("expected ephemeral task to be dead not %s", s)
}
if last.TaskStates[ephemeralTask.Name].Failed {
return false, fmt.Errorf("expected ephemeral task to be successful not failed")
}
if s := last.TaskStates[mainTask.Name].State; s != structs.TaskStateDead {
return false, fmt.Errorf("expected main task to be dead not %s", s)
}
if last.TaskStates[mainTask.Name].Failed {
return false, fmt.Errorf("expected main task to be successful not failed")
}
if s := last.TaskStates[sidecarTask.Name].State; s != structs.TaskStateDead {
return false, fmt.Errorf("expected sidecar task to be dead not %s", s)
}
if last.TaskStates[sidecarTask.Name].Failed {
return false, fmt.Errorf("expected sidecar task to be successful not failed")
}
if s := last.TaskStates[poststopTask.Name].State; s != structs.TaskStateRunning {
return false, fmt.Errorf("expected poststop task to be running not %s", s)
}
return true, nil
}, func(err error) {
t.Fatalf("error waiting for kill state:\n%v", err)
})
last := upd.Last()
require.Less(t, last.TaskStates[ephemeralTask.Name].FinishedAt, last.TaskStates[mainTask.Name].FinishedAt)
require.Less(t, last.TaskStates[mainTask.Name].FinishedAt, last.TaskStates[sidecarTask.Name].FinishedAt)
// Wait for poststop task to stop.
testutil.WaitForResult(func() (bool, error) {
last := upd.Last()
if s := last.TaskStates[poststopTask.Name].State; s != structs.TaskStateDead {
return false, fmt.Errorf("expected poststop task to be dead not %s", s)
}
if last.TaskStates[poststopTask.Name].Failed {
return false, fmt.Errorf("expected poststop task to be successful not failed")
}
return true, nil
}, func(err error) {
t.Fatalf("error waiting for poststop state:\n%v", err)
})
last = upd.Last()
require.Less(t, last.TaskStates[sidecarTask.Name].FinishedAt, last.TaskStates[poststopTask.Name].FinishedAt)
}

View File

@ -179,8 +179,7 @@ func (c *taskHookCoordinator) StartPoststopTasks() {
// hasNonSidecarTasks returns false if all the passed tasks are sidecar tasks
func hasNonSidecarTasks(tasks []*taskrunner.TaskRunner) bool {
for _, tr := range tasks {
lc := tr.Task().Lifecycle
if lc == nil || !lc.Sidecar {
if !tr.IsSidecarTask() {
return true
}
}
@ -188,11 +187,10 @@ func hasNonSidecarTasks(tasks []*taskrunner.TaskRunner) bool {
return false
}
// hasSidecarTasks returns true if all the passed tasks are sidecar tasks
// hasSidecarTasks returns true if any of the passed tasks are sidecar tasks
func hasSidecarTasks(tasks map[string]*taskrunner.TaskRunner) bool {
for _, tr := range tasks {
lc := tr.Task().Lifecycle
if lc != nil && lc.Sidecar {
if tr.IsSidecarTask() {
return true
}
}

View File

@ -33,6 +33,11 @@ func (tr *TaskRunner) IsPoststopTask() bool {
return tr.Task().Lifecycle != nil && tr.Task().Lifecycle.Hook == structs.TaskLifecycleHookPoststop
}
// IsSidecarTask returns true if this task is a sidecar task in its task group.
func (tr *TaskRunner) IsSidecarTask() bool {
return tr.Task().Lifecycle != nil && tr.Task().Lifecycle.Sidecar
}
func (tr *TaskRunner) Task() *structs.Task {
tr.taskLock.RLock()
defer tr.taskLock.RUnlock()