diff --git a/client/allocrunner/alloc_runner_test.go b/client/allocrunner/alloc_runner_test.go
index 139786b44..58c1d2bd1 100644
--- a/client/allocrunner/alloc_runner_test.go
+++ b/client/allocrunner/alloc_runner_test.go
@@ -777,3 +777,178 @@ func TestAllocRunner_HandlesArtifactFailure(t *testing.T) {
 	require.Equal(t, structs.TaskStateDead, state.TaskStates["bad"].State)
 	require.True(t, state.TaskStates["bad"].Failed)
 }
+
+// TestAllocRunner_TaskFailed_KillTG checks that when one task in a task group
+// fails to start, the sibling task is killed with a TaskSiblingFailed event
+// and the alloc's client status becomes failed.
+func TestAllocRunner_TaskFailed_KillTG(t *testing.T) {
+	alloc := mock.BatchAlloc()
+	tr := alloc.AllocatedResources.Tasks[alloc.Job.TaskGroups[0].Tasks[0].Name]
+	alloc.Job.TaskGroups[0].RestartPolicy.Attempts = 0
+
+	// Create two tasks in the task group
+	task := alloc.Job.TaskGroups[0].Tasks[0]
+	task.Name = "task1"
+	task.Driver = "mock_driver"
+	task.KillTimeout = 10 * time.Millisecond
+	task.Config = map[string]interface{}{
+		"run_for": "10s",
+	}
+
+	// The second task is given a start error so that it fails.
+	task2 := alloc.Job.TaskGroups[0].Tasks[0].Copy()
+	task2.Name = "task 2"
+	task2.Driver = "mock_driver"
+	task2.Config = map[string]interface{}{
+		"start_error": "fail task please",
+	}
+	alloc.Job.TaskGroups[0].Tasks = append(alloc.Job.TaskGroups[0].Tasks, task2)
+	// Register the allocated resources under the task names set above.
+	alloc.AllocatedResources.Tasks[task.Name] = tr
+	alloc.AllocatedResources.Tasks[task2.Name] = tr
+
+	conf, cleanup := testAllocRunnerConfig(t, alloc)
+	defer cleanup()
+	ar, err := NewAllocRunner(conf)
+	require.NoError(t, err)
+	defer destroy(ar)
+	go ar.Run()
+	upd := conf.StateUpdater.(*MockStateUpdater)
+
+	testutil.WaitForResult(func() (bool, error) {
+		last := upd.Last()
+		if last == nil {
+			return false, fmt.Errorf("No updates")
+		}
+		if last.ClientStatus != structs.AllocClientStatusFailed {
+			return false, fmt.Errorf("got status %v; want %v", last.ClientStatus, structs.AllocClientStatusFailed)
+		}
+
+		// Task One should be killed
+		state1 := last.TaskStates[task.Name]
+		if state1.State != structs.TaskStateDead {
+			return false, fmt.Errorf("got state %v; want %v", state1.State, structs.TaskStateDead)
+		}
+		if len(state1.Events) < 2 {
+			// At least have a received and destroyed
+			return false, fmt.Errorf("Unexpected number of events")
+		}
+
+		// Task One must carry a sibling-failed event. Match on equality:
+		// matching on inequality would accept any unrelated event.
+		found := false
+		for _, e := range state1.Events {
+			if e.Type == structs.TaskSiblingFailed {
+				found = true
+				break
+			}
+		}
+
+		if !found {
+			return false, fmt.Errorf("Did not find event %v", structs.TaskSiblingFailed)
+		}
+
+		// Task Two should be failed
+		state2 := last.TaskStates[task2.Name]
+		if state2.State != structs.TaskStateDead {
+			return false, fmt.Errorf("got state %v; want %v", state2.State, structs.TaskStateDead)
+		}
+		if !state2.Failed {
+			return false, fmt.Errorf("task2 should have failed")
+		}
+
+		return true, nil
+	}, func(err error) {
+		require.Fail(t, "err: %v", err)
+	})
+}
+
+// TestAllocRunner_TerminalUpdate_Destroy checks that an alloc update with a
+// stop desired status completes the alloc while leaving the alloc directory
+// in place, and that a subsequent Destroy removes the alloc directory.
+func TestAllocRunner_TerminalUpdate_Destroy(t *testing.T) {
+	t.Parallel()
+	alloc := mock.BatchAlloc()
+	tr := alloc.AllocatedResources.Tasks[alloc.Job.TaskGroups[0].Tasks[0].Name]
+	alloc.Job.TaskGroups[0].RestartPolicy.Attempts = 0
+	// Ensure task takes some time
+	task := alloc.Job.TaskGroups[0].Tasks[0]
+	task.Driver = "mock_driver"
+	task.Config["run_for"] = "10s"
+	alloc.AllocatedResources.Tasks[task.Name] = tr
+
+	conf, cleanup := testAllocRunnerConfig(t, alloc)
+	defer cleanup()
+	ar, err := NewAllocRunner(conf)
+	require.NoError(t, err)
+	defer destroy(ar)
+	go ar.Run()
+	upd := conf.StateUpdater.(*MockStateUpdater)
+
+	// Wait for the alloc to be reported as running.
+	testutil.WaitForResult(func() (bool, error) {
+		last := upd.Last()
+		if last == nil {
+			return false, fmt.Errorf("No updates")
+		}
+		if last.ClientStatus != structs.AllocClientStatusRunning {
+			return false, fmt.Errorf("got status %v; want %v", last.ClientStatus, structs.AllocClientStatusRunning)
+		}
+		return true, nil
+	}, func(err error) {
+		require.Fail(t, "err: %v", err)
+	})
+
+	// Update the alloc to be terminal which should cause the alloc runner to
+	// stop the tasks and wait for a destroy.
+	update := ar.alloc.Copy()
+	update.DesiredStatus = structs.AllocDesiredStatusStop
+	ar.Update(update)
+
+	testutil.WaitForResult(func() (bool, error) {
+		last := upd.Last()
+		if last == nil {
+			return false, fmt.Errorf("No updates")
+		}
+
+		// Check the status has changed.
+		if last.ClientStatus != structs.AllocClientStatusComplete {
+			return false, fmt.Errorf("got client status %v; want %v", last.ClientStatus, structs.AllocClientStatusComplete)
+		}
+
+		// Check the alloc directory still exists
+		if _, err := os.Stat(ar.allocDir.AllocDir); err != nil {
+			return false, fmt.Errorf("alloc dir destroyed: %v", ar.allocDir.AllocDir)
+		}
+
+		return true, nil
+	}, func(err error) {
+		require.Fail(t, "err: %v", err)
+	})
+
+	// Send the destroy signal and ensure the AllocRunner cleans up.
+	ar.Destroy()
+
+	testutil.WaitForResult(func() (bool, error) {
+		last := upd.Last()
+		if last == nil {
+			return false, fmt.Errorf("No updates")
+		}
+
+		// The client status is expected to still be complete after Destroy.
+		if last.ClientStatus != structs.AllocClientStatusComplete {
+			return false, fmt.Errorf("got client status %v; want %v", last.ClientStatus, structs.AllocClientStatusComplete)
+		}
+
+		// Check the alloc directory was cleaned
+		if _, err := os.Stat(ar.allocDir.AllocDir); err == nil {
+			return false, fmt.Errorf("alloc dir still exists: %v", ar.allocDir.AllocDir)
+		} else if !os.IsNotExist(err) {
+			return false, fmt.Errorf("stat err: %v", err)
+		}
+
+		return true, nil
+	}, func(err error) {
+		require.Fail(t, "err: %v", err)
+	})
+}