allocrunner: terminate sidecars in the end

This fixes a bug where a batch allocation fails to complete if it has
sidecars.

If the only remaining running tasks in an allocations are sidecars - we
must kill them and mark the allocation as complete.
This commit is contained in:
Mahmood Ali 2020-06-29 15:07:48 -04:00
parent 672c9ec4fe
commit 7f460d2706
5 changed files with 253 additions and 9 deletions

View file

@ -443,6 +443,8 @@ func (ar *allocRunner) TaskStateUpdated() {
func (ar *allocRunner) handleTaskStateUpdates() { func (ar *allocRunner) handleTaskStateUpdates() {
defer close(ar.taskStateUpdateHandlerCh) defer close(ar.taskStateUpdateHandlerCh)
hasSidecars := hasSidecarTasks(ar.tasks)
for done := false; !done; { for done := false; !done; {
select { select {
case <-ar.taskStateUpdatedCh: case <-ar.taskStateUpdatedCh:
@ -462,10 +464,6 @@ func (ar *allocRunner) handleTaskStateUpdates() {
// name whose fault it is. // name whose fault it is.
killTask := "" killTask := ""
// True if task runners should be killed because a leader
// failed (informational).
leaderFailed := false
// Task state has been updated; gather the state of the other tasks // Task state has been updated; gather the state of the other tasks
trNum := len(ar.tasks) trNum := len(ar.tasks)
liveRunners := make([]*taskrunner.TaskRunner, 0, trNum) liveRunners := make([]*taskrunner.TaskRunner, 0, trNum)
@ -492,18 +490,24 @@ func (ar *allocRunner) handleTaskStateUpdates() {
} }
} else if tr.IsLeader() { } else if tr.IsLeader() {
killEvent = structs.NewTaskEvent(structs.TaskLeaderDead) killEvent = structs.NewTaskEvent(structs.TaskLeaderDead)
leaderFailed = true
killTask = name
} }
} }
// if all live runners are sidecars - kill alloc
if killEvent == nil && hasSidecars && !hasNonSidecarTasks(liveRunners) {
killEvent = structs.NewTaskEvent(structs.TaskMainDead)
}
// If there's a kill event set and live runners, kill them // If there's a kill event set and live runners, kill them
if killEvent != nil && len(liveRunners) > 0 { if killEvent != nil && len(liveRunners) > 0 {
// Log kill reason // Log kill reason
if leaderFailed { switch killEvent.Type {
case structs.TaskLeaderDead:
ar.logger.Debug("leader task dead, destroying all tasks", "leader_task", killTask) ar.logger.Debug("leader task dead, destroying all tasks", "leader_task", killTask)
} else { case structs.TaskMainDead:
ar.logger.Debug("main tasks dead, destroying all sidecar tasks")
default:
ar.logger.Debug("task failure, destroying all tasks", "failed_task", killTask) ar.logger.Debug("task failure, destroying all tasks", "failed_task", killTask)
} }

View file

@ -110,7 +110,7 @@ func TestAllocRunner_TaskLeader_KillTG(t *testing.T) {
found := false found := false
killingMsg := "" killingMsg := ""
for _, e := range state1.Events { for _, e := range state1.Events {
if e.Type != structs.TaskLeaderDead { if e.Type == structs.TaskLeaderDead {
found = true found = true
} }
if e.Type == structs.TaskKilling { if e.Type == structs.TaskKilling {
@ -142,6 +142,127 @@ func TestAllocRunner_TaskLeader_KillTG(t *testing.T) {
}) })
} }
// TestAllocRunner_TaskMain_KillTG asserts that when main tasks die the
// entire task group is killed.
func TestAllocRunner_TaskMain_KillTG(t *testing.T) {
t.Parallel()
alloc := mock.BatchAlloc()
tr := alloc.AllocatedResources.Tasks[alloc.Job.TaskGroups[0].Tasks[0].Name]
alloc.Job.TaskGroups[0].RestartPolicy.Attempts = 0
alloc.Job.TaskGroups[0].Tasks[0].RestartPolicy.Attempts = 0
// Create three tasks in the task group
sidecar := alloc.Job.TaskGroups[0].Tasks[0].Copy()
sidecar.Name = "sidecar"
sidecar.Driver = "mock_driver"
sidecar.KillTimeout = 10 * time.Millisecond
sidecar.Lifecycle = &structs.TaskLifecycleConfig{
Hook: structs.TaskLifecycleHookPrestart,
Sidecar: true,
}
sidecar.Config = map[string]interface{}{
"run_for": "100s",
}
main1 := alloc.Job.TaskGroups[0].Tasks[0].Copy()
main1.Name = "task2"
main1.Driver = "mock_driver"
main1.Config = map[string]interface{}{
"run_for": "1s",
}
main2 := alloc.Job.TaskGroups[0].Tasks[0].Copy()
main2.Name = "task2"
main2.Driver = "mock_driver"
main2.Config = map[string]interface{}{
"run_for": "2s",
}
alloc.Job.TaskGroups[0].Tasks = []*structs.Task{sidecar, main1, main2}
alloc.AllocatedResources.Tasks = map[string]*structs.AllocatedTaskResources{
sidecar.Name: tr,
main1.Name: tr,
main2.Name: tr,
}
conf, cleanup := testAllocRunnerConfig(t, alloc)
defer cleanup()
ar, err := NewAllocRunner(conf)
require.NoError(t, err)
defer destroy(ar)
go ar.Run()
hasTaskMainEvent := func(state *structs.TaskState) bool {
for _, e := range state.Events {
if e.Type == structs.TaskMainDead {
return true
}
}
return false
}
// Wait for all tasks to be killed
upd := conf.StateUpdater.(*MockStateUpdater)
testutil.WaitForResult(func() (bool, error) {
last := upd.Last()
if last == nil {
return false, fmt.Errorf("No updates")
}
if last.ClientStatus != structs.AllocClientStatusComplete {
return false, fmt.Errorf("got status %v; want %v", last.ClientStatus, structs.AllocClientStatusComplete)
}
var state *structs.TaskState
// Task1 should be killed because Task2 exited
state = last.TaskStates[sidecar.Name]
if state.State != structs.TaskStateDead {
return false, fmt.Errorf("got state %v; want %v", state.State, structs.TaskStateDead)
}
if state.FinishedAt.IsZero() || state.StartedAt.IsZero() {
return false, fmt.Errorf("expected to have a start and finish time")
}
if len(state.Events) < 2 {
// At least have a received and destroyed
return false, fmt.Errorf("Unexpected number of events")
}
if !hasTaskMainEvent(state) {
return false, fmt.Errorf("Did not find event %v: %#+v", structs.TaskMainDead, state.Events)
}
// main tasks should die naturely
state = last.TaskStates[main1.Name]
if state.State != structs.TaskStateDead {
return false, fmt.Errorf("got state %v; want %v", state.State, structs.TaskStateDead)
}
if state.FinishedAt.IsZero() || state.StartedAt.IsZero() {
return false, fmt.Errorf("expected to have a start and finish time")
}
if hasTaskMainEvent(state) {
return false, fmt.Errorf("unexpected event %#+v in %v", structs.TaskMainDead, state.Events)
}
state = last.TaskStates[main2.Name]
if state.State != structs.TaskStateDead {
return false, fmt.Errorf("got state %v; want %v", state.State, structs.TaskStateDead)
}
if state.FinishedAt.IsZero() || state.StartedAt.IsZero() {
return false, fmt.Errorf("expected to have a start and finish time")
}
if hasTaskMainEvent(state) {
return false, fmt.Errorf("unexpected event %v in %#+v", structs.TaskMainDead, state.Events)
}
return true, nil
}, func(err error) {
t.Fatalf("err: %v", err)
})
}
func TestAllocRunner_TaskGroup_ShutdownDelay(t *testing.T) { func TestAllocRunner_TaskGroup_ShutdownDelay(t *testing.T) {
t.Parallel() t.Parallel()

View file

@ -4,6 +4,7 @@ import (
"context" "context"
"github.com/hashicorp/go-hclog" "github.com/hashicorp/go-hclog"
"github.com/hashicorp/nomad/client/allocrunner/taskrunner"
"github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/nomad/structs"
) )
@ -108,3 +109,27 @@ func (c *taskHookCoordinator) taskStateUpdated(states map[string]*structs.TaskSt
c.mainTaskCtxCancel() c.mainTaskCtxCancel()
} }
} }
// hasNonSidecarTasks returns false if all the passed tasks are sidecar tasks
func hasNonSidecarTasks(tasks []*taskrunner.TaskRunner) bool {
for _, tr := range tasks {
lc := tr.Task().Lifecycle
if lc == nil || !lc.Sidecar {
return true
}
}
return false
}
// hasSidecarTasks returns true if all the passed tasks are sidecar tasks
func hasSidecarTasks(tasks map[string]*taskrunner.TaskRunner) bool {
for _, tr := range tasks {
lc := tr.Task().Lifecycle
if lc != nil && lc.Sidecar {
return true
}
}
return false
}

View file

@ -1,11 +1,13 @@
package allocrunner package allocrunner
import ( import (
"fmt"
"testing" "testing"
"time" "time"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"github.com/hashicorp/nomad/client/allocrunner/taskrunner"
"github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/helper/testlog" "github.com/hashicorp/nomad/helper/testlog"
@ -230,3 +232,90 @@ func isChannelClosed(ch <-chan struct{}) bool {
return false return false
} }
} }
func TestHasSidecarTasks(t *testing.T) {
falseV, trueV := false, true
cases := []struct {
name string
// nil if main task, false if non-sidecar hook, true if sidecar hook
indicators []*bool
hasSidecars bool
hasNonsidecars bool
}{
{
name: "all sidecar - one",
indicators: []*bool{&trueV},
hasSidecars: true,
hasNonsidecars: false,
},
{
name: "all sidecar - multiple",
indicators: []*bool{&trueV, &trueV, &trueV},
hasSidecars: true,
hasNonsidecars: false,
},
{
name: "some sidecars, some others",
indicators: []*bool{nil, &falseV, &trueV},
hasSidecars: true,
hasNonsidecars: true,
},
{
name: "no sidecars",
indicators: []*bool{nil, &falseV, nil},
hasSidecars: false,
hasNonsidecars: true,
},
}
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
alloc := allocWithSidecarIndicators(c.indicators)
arConf, cleanup := testAllocRunnerConfig(t, alloc)
defer cleanup()
ar, err := NewAllocRunner(arConf)
require.NoError(t, err)
require.Equal(t, c.hasSidecars, hasSidecarTasks(ar.tasks), "sidecars")
runners := []*taskrunner.TaskRunner{}
for _, r := range ar.tasks {
runners = append(runners, r)
}
require.Equal(t, c.hasNonsidecars, hasNonSidecarTasks(runners), "non-sidecars")
})
}
}
func allocWithSidecarIndicators(indicators []*bool) *structs.Allocation {
alloc := mock.BatchAlloc()
tasks := []*structs.Task{}
resources := map[string]*structs.AllocatedTaskResources{}
tr := alloc.AllocatedResources.Tasks[alloc.Job.TaskGroups[0].Tasks[0].Name]
for i, indicator := range indicators {
task := alloc.Job.TaskGroups[0].Tasks[0].Copy()
task.Name = fmt.Sprintf("task%d", i)
if indicator != nil {
task.Lifecycle = &structs.TaskLifecycleConfig{
Hook: structs.TaskLifecycleHookPrestart,
Sidecar: *indicator,
}
}
tasks = append(tasks, task)
resources[task.Name] = tr
}
alloc.Job.TaskGroups[0].Tasks = tasks
alloc.AllocatedResources.Tasks = resources
return alloc
}

View file

@ -6996,6 +6996,9 @@ const (
// TaskLeaderDead indicates that the leader task within the has finished. // TaskLeaderDead indicates that the leader task within the has finished.
TaskLeaderDead = "Leader Task Dead" TaskLeaderDead = "Leader Task Dead"
// TaskMainDead indicates that the main tasks have dead
TaskMainDead = "Main Tasks Dead"
// TaskHookFailed indicates that one of the hooks for a task failed. // TaskHookFailed indicates that one of the hooks for a task failed.
TaskHookFailed = "Task hook failed" TaskHookFailed = "Task hook failed"
@ -7217,6 +7220,8 @@ func (event *TaskEvent) PopulateEventDisplayMessage() {
desc = event.DriverMessage desc = event.DriverMessage
case TaskLeaderDead: case TaskLeaderDead:
desc = "Leader Task in Group dead" desc = "Leader Task in Group dead"
case TaskMainDead:
desc = "Main tasks in the group died"
default: default:
desc = event.Message desc = event.Message
} }