From 7f460d27068c8d04d7ea48856ed58963a72f5de6 Mon Sep 17 00:00:00 2001 From: Mahmood Ali Date: Mon, 29 Jun 2020 15:07:48 -0400 Subject: [PATCH] allocrunner: terminate sidecars in the end This fixes a bug where a batch allocation fails to complete if it has sidecars. If the only remaining running tasks in an allocation are sidecars, we must kill them and mark the allocation as complete. --- client/allocrunner/alloc_runner.go | 20 +-- client/allocrunner/alloc_runner_test.go | 123 +++++++++++++++++- client/allocrunner/task_hook_coordinator.go | 25 ++++ .../allocrunner/task_hook_coordinator_test.go | 89 +++++++++++++ nomad/structs/structs.go | 5 + 5 files changed, 253 insertions(+), 9 deletions(-) diff --git a/client/allocrunner/alloc_runner.go b/client/allocrunner/alloc_runner.go index 364f7b884..0972d77dc 100644 --- a/client/allocrunner/alloc_runner.go +++ b/client/allocrunner/alloc_runner.go @@ -443,6 +443,8 @@ func (ar *allocRunner) TaskStateUpdated() { func (ar *allocRunner) handleTaskStateUpdates() { defer close(ar.taskStateUpdateHandlerCh) + hasSidecars := hasSidecarTasks(ar.tasks) + for done := false; !done; { select { case <-ar.taskStateUpdatedCh: @@ -462,10 +464,6 @@ func (ar *allocRunner) handleTaskStateUpdates() { // name whose fault it is. killTask := "" - // True if task runners should be killed because a leader - // failed (informational). 
- leaderFailed := false - // Task state has been updated; gather the state of the other tasks trNum := len(ar.tasks) liveRunners := make([]*taskrunner.TaskRunner, 0, trNum) @@ -492,18 +490,24 @@ func (ar *allocRunner) handleTaskStateUpdates() { } } else if tr.IsLeader() { killEvent = structs.NewTaskEvent(structs.TaskLeaderDead) - leaderFailed = true - killTask = name } } + // if all live runners are sidecars - kill alloc + if killEvent == nil && hasSidecars && !hasNonSidecarTasks(liveRunners) { + killEvent = structs.NewTaskEvent(structs.TaskMainDead) + } + // If there's a kill event set and live runners, kill them if killEvent != nil && len(liveRunners) > 0 { // Log kill reason - if leaderFailed { + switch killEvent.Type { + case structs.TaskLeaderDead: ar.logger.Debug("leader task dead, destroying all tasks", "leader_task", killTask) - } else { + case structs.TaskMainDead: + ar.logger.Debug("main tasks dead, destroying all sidecar tasks") + default: ar.logger.Debug("task failure, destroying all tasks", "failed_task", killTask) } diff --git a/client/allocrunner/alloc_runner_test.go b/client/allocrunner/alloc_runner_test.go index 3379ba3d3..a9d6dcf34 100644 --- a/client/allocrunner/alloc_runner_test.go +++ b/client/allocrunner/alloc_runner_test.go @@ -110,7 +110,7 @@ func TestAllocRunner_TaskLeader_KillTG(t *testing.T) { found := false killingMsg := "" for _, e := range state1.Events { - if e.Type != structs.TaskLeaderDead { + if e.Type == structs.TaskLeaderDead { found = true } if e.Type == structs.TaskKilling { @@ -142,6 +142,127 @@ func TestAllocRunner_TaskLeader_KillTG(t *testing.T) { }) } +// TestAllocRunner_TaskMain_KillTG asserts that when main tasks die the +// entire task group is killed. 
+func TestAllocRunner_TaskMain_KillTG(t *testing.T) { + t.Parallel() + + alloc := mock.BatchAlloc() + tr := alloc.AllocatedResources.Tasks[alloc.Job.TaskGroups[0].Tasks[0].Name] + alloc.Job.TaskGroups[0].RestartPolicy.Attempts = 0 + alloc.Job.TaskGroups[0].Tasks[0].RestartPolicy.Attempts = 0 + + // Create three tasks in the task group + sidecar := alloc.Job.TaskGroups[0].Tasks[0].Copy() + sidecar.Name = "sidecar" + sidecar.Driver = "mock_driver" + sidecar.KillTimeout = 10 * time.Millisecond + sidecar.Lifecycle = &structs.TaskLifecycleConfig{ + Hook: structs.TaskLifecycleHookPrestart, + Sidecar: true, + } + + sidecar.Config = map[string]interface{}{ + "run_for": "100s", + } + + main1 := alloc.Job.TaskGroups[0].Tasks[0].Copy() + main1.Name = "task2" + main1.Driver = "mock_driver" + main1.Config = map[string]interface{}{ + "run_for": "1s", + } + + main2 := alloc.Job.TaskGroups[0].Tasks[0].Copy() + main2.Name = "task2" + main2.Driver = "mock_driver" + main2.Config = map[string]interface{}{ + "run_for": "2s", + } + + alloc.Job.TaskGroups[0].Tasks = []*structs.Task{sidecar, main1, main2} + alloc.AllocatedResources.Tasks = map[string]*structs.AllocatedTaskResources{ + sidecar.Name: tr, + main1.Name: tr, + main2.Name: tr, + } + + conf, cleanup := testAllocRunnerConfig(t, alloc) + defer cleanup() + ar, err := NewAllocRunner(conf) + require.NoError(t, err) + defer destroy(ar) + go ar.Run() + + hasTaskMainEvent := func(state *structs.TaskState) bool { + for _, e := range state.Events { + if e.Type == structs.TaskMainDead { + return true + } + } + + return false + } + + // Wait for all tasks to be killed + upd := conf.StateUpdater.(*MockStateUpdater) + testutil.WaitForResult(func() (bool, error) { + last := upd.Last() + if last == nil { + return false, fmt.Errorf("No updates") + } + if last.ClientStatus != structs.AllocClientStatusComplete { + return false, fmt.Errorf("got status %v; want %v", last.ClientStatus, structs.AllocClientStatusComplete) + } + + var state 
*structs.TaskState + + // The sidecar should be killed because the main tasks exited + state = last.TaskStates[sidecar.Name] + if state.State != structs.TaskStateDead { + return false, fmt.Errorf("got state %v; want %v", state.State, structs.TaskStateDead) + } + if state.FinishedAt.IsZero() || state.StartedAt.IsZero() { + return false, fmt.Errorf("expected to have a start and finish time") + } + if len(state.Events) < 2 { + // At least have a received and destroyed + return false, fmt.Errorf("Unexpected number of events") + } + + if !hasTaskMainEvent(state) { + return false, fmt.Errorf("Did not find event %v: %#+v", structs.TaskMainDead, state.Events) + } + + // main tasks should die naturally + state = last.TaskStates[main1.Name] + if state.State != structs.TaskStateDead { + return false, fmt.Errorf("got state %v; want %v", state.State, structs.TaskStateDead) + } + if state.FinishedAt.IsZero() || state.StartedAt.IsZero() { + return false, fmt.Errorf("expected to have a start and finish time") + } + if hasTaskMainEvent(state) { + return false, fmt.Errorf("unexpected event %#+v in %v", structs.TaskMainDead, state.Events) + } + + state = last.TaskStates[main2.Name] + if state.State != structs.TaskStateDead { + return false, fmt.Errorf("got state %v; want %v", state.State, structs.TaskStateDead) + } + if state.FinishedAt.IsZero() || state.StartedAt.IsZero() { + return false, fmt.Errorf("expected to have a start and finish time") + } + if hasTaskMainEvent(state) { + return false, fmt.Errorf("unexpected event %v in %#+v", structs.TaskMainDead, state.Events) + } + + return true, nil + }, func(err error) { + t.Fatalf("err: %v", err) + }) +} + func TestAllocRunner_TaskGroup_ShutdownDelay(t *testing.T) { t.Parallel() diff --git a/client/allocrunner/task_hook_coordinator.go b/client/allocrunner/task_hook_coordinator.go index 139757ed0..8f6d1c10b 100644 --- a/client/allocrunner/task_hook_coordinator.go +++ b/client/allocrunner/task_hook_coordinator.go @@ -4,6 +4,7 @@ import ( "context" 
"github.com/hashicorp/go-hclog" + "github.com/hashicorp/nomad/client/allocrunner/taskrunner" "github.com/hashicorp/nomad/nomad/structs" ) @@ -108,3 +109,27 @@ func (c *taskHookCoordinator) taskStateUpdated(states map[string]*structs.TaskSt c.mainTaskCtxCancel() } } + +// hasNonSidecarTasks returns false if all the passed tasks are sidecar tasks +func hasNonSidecarTasks(tasks []*taskrunner.TaskRunner) bool { + for _, tr := range tasks { + lc := tr.Task().Lifecycle + if lc == nil || !lc.Sidecar { + return true + } + } + + return false +} + +// hasSidecarTasks returns true if all the passed tasks are sidecar tasks +func hasSidecarTasks(tasks map[string]*taskrunner.TaskRunner) bool { + for _, tr := range tasks { + lc := tr.Task().Lifecycle + if lc != nil && lc.Sidecar { + return true + } + } + + return false +} diff --git a/client/allocrunner/task_hook_coordinator_test.go b/client/allocrunner/task_hook_coordinator_test.go index 91802f102..91405fbad 100644 --- a/client/allocrunner/task_hook_coordinator_test.go +++ b/client/allocrunner/task_hook_coordinator_test.go @@ -1,11 +1,13 @@ package allocrunner import ( + "fmt" "testing" "time" "github.com/stretchr/testify/require" + "github.com/hashicorp/nomad/client/allocrunner/taskrunner" "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/helper/testlog" @@ -230,3 +232,90 @@ func isChannelClosed(ch <-chan struct{}) bool { return false } } + +func TestHasSidecarTasks(t *testing.T) { + + falseV, trueV := false, true + + cases := []struct { + name string + // nil if main task, false if non-sidecar hook, true if sidecar hook + indicators []*bool + + hasSidecars bool + hasNonsidecars bool + }{ + { + name: "all sidecar - one", + indicators: []*bool{&trueV}, + hasSidecars: true, + hasNonsidecars: false, + }, + { + name: "all sidecar - multiple", + indicators: []*bool{&trueV, &trueV, &trueV}, + hasSidecars: true, + hasNonsidecars: false, + }, + { + name: "some sidecars, some others", + indicators: []*bool{nil, 
&falseV, &trueV}, + hasSidecars: true, + hasNonsidecars: true, + }, + { + name: "no sidecars", + indicators: []*bool{nil, &falseV, nil}, + hasSidecars: false, + hasNonsidecars: true, + }, + } + + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + alloc := allocWithSidecarIndicators(c.indicators) + arConf, cleanup := testAllocRunnerConfig(t, alloc) + defer cleanup() + + ar, err := NewAllocRunner(arConf) + require.NoError(t, err) + + require.Equal(t, c.hasSidecars, hasSidecarTasks(ar.tasks), "sidecars") + + runners := []*taskrunner.TaskRunner{} + for _, r := range ar.tasks { + runners = append(runners, r) + } + require.Equal(t, c.hasNonsidecars, hasNonSidecarTasks(runners), "non-sidecars") + + }) + } +} + +func allocWithSidecarIndicators(indicators []*bool) *structs.Allocation { + alloc := mock.BatchAlloc() + + tasks := []*structs.Task{} + resources := map[string]*structs.AllocatedTaskResources{} + + tr := alloc.AllocatedResources.Tasks[alloc.Job.TaskGroups[0].Tasks[0].Name] + + for i, indicator := range indicators { + task := alloc.Job.TaskGroups[0].Tasks[0].Copy() + task.Name = fmt.Sprintf("task%d", i) + if indicator != nil { + task.Lifecycle = &structs.TaskLifecycleConfig{ + Hook: structs.TaskLifecycleHookPrestart, + Sidecar: *indicator, + } + } + tasks = append(tasks, task) + resources[task.Name] = tr + } + + alloc.Job.TaskGroups[0].Tasks = tasks + + alloc.AllocatedResources.Tasks = resources + return alloc + +} diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index b463fc97b..97d3d4d06 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -6996,6 +6996,9 @@ const ( // TaskLeaderDead indicates that the leader task within the has finished. TaskLeaderDead = "Leader Task Dead" + // TaskMainDead indicates that the main tasks have died + TaskMainDead = "Main Tasks Dead" + // TaskHookFailed indicates that one of the hooks for a task failed. 
TaskHookFailed = "Task hook failed" @@ -7217,6 +7220,8 @@ func (event *TaskEvent) PopulateEventDisplayMessage() { desc = event.DriverMessage case TaskLeaderDead: desc = "Leader Task in Group dead" + case TaskMainDead: + desc = "Main tasks in the group died" default: desc = event.Message }