diff --git a/client/allocrunner/taskrunner/task_runner.go b/client/allocrunner/taskrunner/task_runner.go
index aff53994d..5ee294ffe 100644
--- a/client/allocrunner/taskrunner/task_runner.go
+++ b/client/allocrunner/taskrunner/task_runner.go
@@ -35,6 +35,10 @@ import (
 )
 
 const (
+	// defaultMaxEvents is the default max capacity for task events on the
+	// task state. Overrideable for testing.
+	defaultMaxEvents = 10
+
 	// killBackoffBaseline is the baseline time for exponential backoff while
 	// killing a task.
 	killBackoffBaseline = 5 * time.Second
@@ -191,6 +195,10 @@ type TaskRunner struct {
 	// be accessed via helpers
 	runLaunched     bool
 	runLaunchedLock sync.Mutex
+
+	// maxEvents is the capacity of the TaskEvents on the TaskState.
+	// Defaults to defaultMaxEvents but overrideable for testing.
+	maxEvents int
 }
 
 type Config struct {
@@ -267,6 +275,7 @@ func NewTaskRunner(config *Config) (*TaskRunner, error) {
 		waitCh:        make(chan struct{}),
 		devicemanager: config.DeviceManager,
 		driverManager: config.DriverManager,
+		maxEvents:     defaultMaxEvents,
 	}
 
 	// Create the logger based on the allocation ID
@@ -1023,7 +1032,7 @@ func (tr *TaskRunner) appendEvent(event *structs.TaskEvent) error {
 	}
 
 	// Append event to slice
-	appendTaskEvent(tr.state, event)
+	appendTaskEvent(tr.state, event, tr.maxEvents)
 
 	return nil
 }
@@ -1189,8 +1198,7 @@ func (tr *TaskRunner) emitStats(ru *cstructs.TaskResourceUsage) {
 }
 
 // appendTaskEvent updates the task status by appending the new event.
-func appendTaskEvent(state *structs.TaskState, event *structs.TaskEvent) {
-	const capacity = 10
+func appendTaskEvent(state *structs.TaskState, event *structs.TaskEvent, capacity int) {
 	if state.Events == nil {
 		state.Events = make([]*structs.TaskEvent, 1, capacity)
 		state.Events[0] = event
diff --git a/client/allocrunner/taskrunner/task_runner_test.go b/client/allocrunner/taskrunner/task_runner_test.go
index 875afe5fe..bd37845c0 100644
--- a/client/allocrunner/taskrunner/task_runner_test.go
+++ b/client/allocrunner/taskrunner/task_runner_test.go
@@ -18,6 +18,7 @@ import (
 	"github.com/hashicorp/nomad/client/pluginmanager/drivermanager"
 	cstate "github.com/hashicorp/nomad/client/state"
 	"github.com/hashicorp/nomad/client/vaultclient"
+	agentconsul "github.com/hashicorp/nomad/command/agent/consul"
 	mockdriver "github.com/hashicorp/nomad/drivers/mock"
 	"github.com/hashicorp/nomad/helper/testlog"
 	"github.com/hashicorp/nomad/nomad/mock"
@@ -126,12 +127,7 @@ func TestTaskRunner_Restore_Running(t *testing.T) {
 	defer origTR.Kill(context.Background(), structs.NewTaskEvent("cleanup"))
 
 	// Wait for it to be running
-	testutil.WaitForResult(func() (bool, error) {
-		ts := origTR.TaskState()
-		return ts.State == structs.TaskStateRunning, fmt.Errorf("%v", ts.State)
-	}, func(err error) {
-		t.Fatalf("expected running; got: %v", err)
-	})
+	testWaitForTaskToStart(t, origTR)
 
 	// Cause TR to exit without shutting down task
 	origTR.Shutdown()
@@ -597,16 +593,186 @@ func TestTaskRunner_Dispatch_Payload(t *testing.T) {
 	require.Equal(t, expected, data)
 }
 
-// testWaitForTaskToStart waits for the task to or fails the test
-func testWaitForTaskToStart(t *testing.T, tr *TaskRunner) {
-	// Wait for the task to start
-	testutil.WaitForResult(func() (bool, error) {
-		tr.stateLock.RLock()
-		started := !tr.state.StartedAt.IsZero()
-		tr.stateLock.RUnlock()
+// TestTaskRunner_SignalFailure asserts that signal errors are properly
+// propagated from the driver to TaskRunner.
+func TestTaskRunner_SignalFailure(t *testing.T) {
+	t.Parallel()
 
-		return started, nil
+	alloc := mock.Alloc()
+	task := alloc.Job.TaskGroups[0].Tasks[0]
+	task.Driver = "mock_driver"
+	errMsg := "test forcing failure"
+	task.Config = map[string]interface{}{
+		"run_for":      "10m",
+		"signal_error": errMsg,
+	}
+
+	conf, cleanup := testTaskRunnerConfig(t, alloc, task.Name)
+	defer cleanup()
+
+	tr, err := NewTaskRunner(conf)
+	require.NoError(t, err)
+	go tr.Run()
+	defer tr.Kill(context.Background(), structs.NewTaskEvent("cleanup"))
+
+	testWaitForTaskToStart(t, tr)
+
+	require.EqualError(t, tr.Signal(&structs.TaskEvent{}, "SIGINT"), errMsg)
+}
+
+// TestTaskRunner_RestartTask asserts that restarting a task works and emits a
+// Restarting event.
+func TestTaskRunner_RestartTask(t *testing.T) {
+	t.Parallel()
+
+	alloc := mock.Alloc()
+	task := alloc.Job.TaskGroups[0].Tasks[0]
+	task.Driver = "mock_driver"
+	task.Config = map[string]interface{}{
+		"run_for": "10m",
+	}
+
+	conf, cleanup := testTaskRunnerConfig(t, alloc, task.Name)
+	defer cleanup()
+
+	tr, err := NewTaskRunner(conf)
+	require.NoError(t, err)
+	go tr.Run()
+	defer tr.Kill(context.Background(), structs.NewTaskEvent("cleanup"))
+
+	testWaitForTaskToStart(t, tr)
+
+	// Restart the task. Send a RestartSignal event as the check watcher does.
+	// The restart handler emits the Restarting event.
+	event := structs.NewTaskEvent(structs.TaskRestartSignal).SetRestartReason("test")
+	const fail = false
+	tr.Restart(context.Background(), event.Copy(), fail)
+
+	// Wait for it to restart and be running again
+	testutil.WaitForResult(func() (bool, error) {
+		ts := tr.TaskState()
+		if ts.Restarts != 1 {
+			return false, fmt.Errorf("expected 1 restart but found %d\nevents: %s",
+				ts.Restarts, pretty.Sprint(ts.Events))
+		}
+		if ts.State != structs.TaskStateRunning {
+			return false, fmt.Errorf("expected running but received %s", ts.State)
+		}
+		return true, nil
 	}, func(err error) {
-		t.Fatalf("not started")
+		require.NoError(t, err)
+	})
+
+	// Assert the expected Restarting event was emitted
+	found := false
+	events := tr.TaskState().Events
+	for _, e := range events {
+		if e.Type == structs.TaskRestartSignal {
+			found = true
+			require.Equal(t, event.Time, e.Time)
+			require.Equal(t, event.RestartReason, e.RestartReason)
+			require.Contains(t, e.DisplayMessage, event.RestartReason)
+		}
+	}
+	require.True(t, found, "restarting task event not found", pretty.Sprint(events))
+}
+
+// TestTaskRunner_CheckWatcher_Restart asserts that when enabled an unhealthy
+// Consul check will cause a task to restart following restart policy rules.
+func TestTaskRunner_CheckWatcher_Restart(t *testing.T) {
+	t.Parallel()
+
+	alloc := mock.Alloc()
+
+	// Make the restart policy fail within this test
+	tg := alloc.Job.TaskGroups[0]
+	tg.RestartPolicy.Attempts = 2
+	tg.RestartPolicy.Interval = 1 * time.Minute
+	tg.RestartPolicy.Delay = 10 * time.Millisecond
+	tg.RestartPolicy.Mode = structs.RestartPolicyModeFail
+
+	task := tg.Tasks[0]
+	task.Driver = "mock_driver"
+	task.Config = map[string]interface{}{
+		"run_for": "10m",
+	}
+
+	// Make the task register a check that fails
+	task.Services[0].Checks[0] = &structs.ServiceCheck{
+		Name:     "test-restarts",
+		Type:     structs.ServiceCheckTCP,
+		Interval: 50 * time.Millisecond,
+		CheckRestart: &structs.CheckRestart{
+			Limit: 2,
+			Grace: 100 * time.Millisecond,
+		},
+	}
+
+	conf, cleanup := testTaskRunnerConfig(t, alloc, task.Name)
+	defer cleanup()
+
+	// Replace the mock Consul ServiceClient with the real ServiceClient,
+	// backed by a mock Consul agent whose checks are always unhealthy.
+	consulAgent := agentconsul.NewMockAgent()
+	consulAgent.SetStatus("critical")
+	consulClient := agentconsul.NewServiceClient(consulAgent, conf.Logger, true)
+	go consulClient.Run()
+	defer consulClient.Shutdown()
+
+	conf.Consul = consulClient
+
+	tr, err := NewTaskRunner(conf)
+	require.NoError(t, err)
+
+	expectedEvents := []string{
+		"Received",
+		"Task Setup",
+		"Started",
+		"Restart Signaled",
+		"Terminated",
+		"Restarting",
+		"Started",
+		"Restart Signaled",
+		"Terminated",
+		"Restarting",
+		"Started",
+		"Restart Signaled",
+		"Terminated",
+		"Not Restarting",
+	}
+
+	// Bump maxEvents so task events aren't dropped
+	tr.maxEvents = 100
+
+	go tr.Run()
+	defer tr.Kill(context.Background(), structs.NewTaskEvent("cleanup"))
+
+	// Wait until the task exits. Don't simply wait for it to run as it may
+	// get restarted and terminated before the test is able to observe it
+	// running.
+	select {
+	case <-tr.WaitCh():
+	case <-time.After(time.Duration(testutil.TestMultiplier()*15) * time.Second):
+		require.Fail(t, "timeout")
+	}
+
+	state := tr.TaskState()
+	actualEvents := make([]string, len(state.Events))
+	for i, e := range state.Events {
+		actualEvents[i] = string(e.Type)
+	}
+	require.Equal(t, expectedEvents, actualEvents)
+
+	require.Equal(t, structs.TaskStateDead, state.State)
+	require.True(t, state.Failed, pretty.Sprint(state))
+}
+
+// testWaitForTaskToStart waits for the task to be running or fails the test
+func testWaitForTaskToStart(t *testing.T, tr *TaskRunner) {
+	testutil.WaitForResult(func() (bool, error) {
+		ts := tr.TaskState()
+		return ts.State == structs.TaskStateRunning, fmt.Errorf("%v", ts.State)
+	}, func(err error) {
+		require.NoError(t, err)
 	})
 }
diff --git a/drivers/mock/driver.go b/drivers/mock/driver.go
index 0039da41e..f80b81514 100644
--- a/drivers/mock/driver.go
+++ b/drivers/mock/driver.go
@@ -76,7 +76,7 @@ var (
 		"exit_code":        hclspec.NewAttr("exit_code", "number", false),
 		"exit_signal":      hclspec.NewAttr("exit_signal", "number", false),
 		"exit_err_msg":     hclspec.NewAttr("exit_err_msg", "string", false),
-		"signal_err":       hclspec.NewAttr("signal_err", "string", false),
+		"signal_error":     hclspec.NewAttr("signal_error", "string", false),
 		"driver_ip":        hclspec.NewAttr("driver_ip", "string", false),
 		"driver_advertise": hclspec.NewAttr("driver_advertise", "bool", false),
 		"driver_port_map":  hclspec.NewAttr("driver_port_map", "string", false),
@@ -88,7 +88,7 @@ var (
 	// capabilities is returned by the Capabilities RPC and indicates what
 	// optional features this driver supports
 	capabilities = &drivers.Capabilities{
-		SendSignals: false,
+		SendSignals: true,
 		Exec:        true,
 		FSIsolation: drivers.FSIsolationNone,
 	}
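Note on the capacity change: the task_runner.go hunk above only shows the state.Events == nil branch of appendTaskEvent, so the behavior once the event list is full is not visible in this diff. The sketch below is a minimal, self-contained illustration of the drop-oldest bounded append that a configurable capacity implies; taskEvent and appendBounded are hypothetical names used for illustration only, not the actual Nomad implementation.

```go
package main

import "fmt"

// taskEvent is a stand-in for structs.TaskEvent, used only in this sketch.
type taskEvent struct {
	Type string
}

// appendBounded appends event, dropping the oldest entry once the slice has
// reached capacity. This illustrates what a capacity parameter is for; the
// overflow branch of appendTaskEvent is not shown in the diff above.
func appendBounded(events []*taskEvent, event *taskEvent, capacity int) []*taskEvent {
	if len(events) >= capacity {
		// Drop the oldest event by shifting the slice left by one.
		events = append(events[:0], events[1:]...)
	}
	return append(events, event)
}

func main() {
	var events []*taskEvent
	for _, typ := range []string{"Received", "Task Setup", "Started", "Restart Signaled", "Terminated"} {
		events = appendBounded(events, &taskEvent{Type: typ}, 3)
	}
	for _, e := range events {
		fmt.Println(e.Type) // prints only the 3 most recent event types
	}
}
```

Under that assumption, bumping tr.maxEvents to 100 in TestTaskRunner_CheckWatcher_Restart keeps the fourteen expected events from being truncated to the default capacity of ten.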