From 9edff19625918daa80bb932f8b7b15b6bb89e738 Mon Sep 17 00:00:00 2001 From: Michael Schurter Date: Thu, 17 Jan 2019 15:01:49 -0800 Subject: [PATCH 1/3] test: port SignalFailure test from 0.8 Also fix signal error handling in mock_driver. --- .../taskrunner/task_runner_test.go | 48 +++++++++++++------ drivers/mock/driver.go | 4 +- 2 files changed, 35 insertions(+), 17 deletions(-) diff --git a/client/allocrunner/taskrunner/task_runner_test.go b/client/allocrunner/taskrunner/task_runner_test.go index 875afe5fe..5df453d26 100644 --- a/client/allocrunner/taskrunner/task_runner_test.go +++ b/client/allocrunner/taskrunner/task_runner_test.go @@ -126,12 +126,7 @@ func TestTaskRunner_Restore_Running(t *testing.T) { defer origTR.Kill(context.Background(), structs.NewTaskEvent("cleanup")) // Wait for it to be running - testutil.WaitForResult(func() (bool, error) { - ts := origTR.TaskState() - return ts.State == structs.TaskStateRunning, fmt.Errorf("%v", ts.State) - }, func(err error) { - t.Fatalf("expected running; got: %v", err) - }) + testWaitForTaskToStart(t, origTR) // Cause TR to exit without shutting down task origTR.Shutdown() @@ -597,16 +592,39 @@ func TestTaskRunner_Dispatch_Payload(t *testing.T) { require.Equal(t, expected, data) } -// testWaitForTaskToStart waits for the task to or fails the test -func testWaitForTaskToStart(t *testing.T, tr *TaskRunner) { - // Wait for the task to start - testutil.WaitForResult(func() (bool, error) { - tr.stateLock.RLock() - started := !tr.state.StartedAt.IsZero() - tr.stateLock.RUnlock() +func TestTaskRunner_SignalFailure(t *testing.T) { + t.Parallel() - return started, nil + alloc := mock.Alloc() + task := alloc.Job.TaskGroups[0].Tasks[0] + task.Driver = "mock_driver" + errMsg := "test forcing failure" + task.Config = map[string]interface{}{ + "run_for": "10m", + "signal_error": errMsg, + } + + conf, cleanup := testTaskRunnerConfig(t, alloc, task.Name) + defer cleanup() + + tr, err := NewTaskRunner(conf) + require.NoError(t, 
err) + go tr.Run() + defer tr.Kill(context.Background(), structs.NewTaskEvent("cleanup")) + + testWaitForTaskToStart(t, tr) + + err = tr.Signal(&structs.TaskEvent{}, "SIGINT") + require.NotNil(t, err) + require.Equal(t, errMsg, err.Error()) +} + +// testWaitForTaskToStart waits for the task to be running or fails the test +func testWaitForTaskToStart(t *testing.T, tr *TaskRunner) { + testutil.WaitForResult(func() (bool, error) { + ts := tr.TaskState() + return ts.State == structs.TaskStateRunning, fmt.Errorf("%v", ts.State) }, func(err error) { - t.Fatalf("not started") + require.NoError(t, err) }) } diff --git a/drivers/mock/driver.go b/drivers/mock/driver.go index 0039da41e..f80b81514 100644 --- a/drivers/mock/driver.go +++ b/drivers/mock/driver.go @@ -76,7 +76,7 @@ var ( "exit_code": hclspec.NewAttr("exit_code", "number", false), "exit_signal": hclspec.NewAttr("exit_signal", "number", false), "exit_err_msg": hclspec.NewAttr("exit_err_msg", "string", false), - "signal_err": hclspec.NewAttr("signal_err", "string", false), + "signal_error": hclspec.NewAttr("signal_error", "string", false), "driver_ip": hclspec.NewAttr("driver_ip", "string", false), "driver_advertise": hclspec.NewAttr("driver_advertise", "bool", false), "driver_port_map": hclspec.NewAttr("driver_port_map", "string", false), @@ -88,7 +88,7 @@ var ( // capabilities is returned by the Capabilities RPC and indicates what // optional features this driver supports capabilities = &drivers.Capabilities{ - SendSignals: false, + SendSignals: true, Exec: true, FSIsolation: drivers.FSIsolationNone, } From 1719752a9de540672c97cab1bb90450bc3f42f64 Mon Sep 17 00:00:00 2001 From: Michael Schurter Date: Fri, 18 Jan 2019 07:18:00 -0800 Subject: [PATCH 2/3] test: port RestartTask from 0.8 --- .../taskrunner/task_runner_test.go | 59 +++++++++++++++++++ 1 file changed, 59 insertions(+) diff --git a/client/allocrunner/taskrunner/task_runner_test.go b/client/allocrunner/taskrunner/task_runner_test.go index 
5df453d26..4c83d667c 100644 --- a/client/allocrunner/taskrunner/task_runner_test.go +++ b/client/allocrunner/taskrunner/task_runner_test.go @@ -592,6 +592,8 @@ func TestTaskRunner_Dispatch_Payload(t *testing.T) { require.Equal(t, expected, data) } +// TestTaskRunner_SignalFailure asserts that signal errors are properly +// propagated from the driver to TaskRunner. func TestTaskRunner_SignalFailure(t *testing.T) { t.Parallel() @@ -619,6 +621,63 @@ func TestTaskRunner_SignalFailure(t *testing.T) { require.Equal(t, errMsg, err.Error()) } +// TestTaskRunner_RestartTask asserts that restarting a task works and emits a +// Restarting event. +func TestTaskRunner_RestartTask(t *testing.T) { + t.Parallel() + + alloc := mock.Alloc() + task := alloc.Job.TaskGroups[0].Tasks[0] + task.Driver = "mock_driver" + task.Config = map[string]interface{}{ + "run_for": "10m", + } + + conf, cleanup := testTaskRunnerConfig(t, alloc, task.Name) + defer cleanup() + + tr, err := NewTaskRunner(conf) + require.NoError(t, err) + go tr.Run() + defer tr.Kill(context.Background(), structs.NewTaskEvent("cleanup")) + + testWaitForTaskToStart(t, tr) + + // Restart task. Send a RestartSignal event like check watcher. Restart + // handler emits the Restarting event. 
+ event := structs.NewTaskEvent(structs.TaskRestartSignal).SetRestartReason("test") + const fail = false + tr.Restart(context.Background(), event.Copy(), fail) + + // Wait for it to restart and be running again + testutil.WaitForResult(func() (bool, error) { + ts := tr.TaskState() + if ts.Restarts != 1 { + return false, fmt.Errorf("expected 1 restart but found %d\nevents: %s", + ts.Restarts, pretty.Sprint(ts.Events)) + } + if ts.State != structs.TaskStateRunning { + return false, fmt.Errorf("expected running but received %s", ts.State) + } + return true, nil + }, func(err error) { + require.NoError(t, err) + }) + + // Assert the expected Restarting event was emitted + found := false + events := tr.TaskState().Events + for _, e := range events { + if e.Type == structs.TaskRestartSignal { + found = true + require.Equal(t, event.Time, e.Time) + require.Equal(t, event.RestartReason, e.RestartReason) + require.Contains(t, e.DisplayMessage, event.RestartReason) + } + } + require.True(t, found, "restarting task event not found", pretty.Sprint(events)) +} + // testWaitForTaskToStart waits for the task to be running or fails the test func testWaitForTaskToStart(t *testing.T, tr *TaskRunner) { testutil.WaitForResult(func() (bool, error) { From 8ced0adb67a0ef40566692a673cba9c7728fb96d Mon Sep 17 00:00:00 2001 From: Michael Schurter Date: Fri, 18 Jan 2019 08:30:44 -0800 Subject: [PATCH 3/3] test: port TestTaskRunner_CheckWatcher_Restart Added ability to adjust the number of events the TaskRunner keeps as there's no way to observe all events otherwise. Task events differ slightly from 0.8 because 0.9 emits Terminated every time a task exits instead of only when it exits on its own (not due to restart or kill). Unlike 0.8, 0.9 does not emit Killing/Killed for restarts, which seems fine as `Restart Signaled/Terminated/Restarting` is more descriptive. 
Original v0.8 events emitted: ``` expected := []string{ "Received", "Task Setup", "Started", "Restart Signaled", "Killing", "Killed", "Restarting", "Started", "Restart Signaled", "Killing", "Killed", "Restarting", "Started", "Restart Signaled", "Killing", "Killed", "Not Restarting", } ``` --- client/allocrunner/taskrunner/task_runner.go | 14 ++- .../taskrunner/task_runner_test.go | 95 ++++++++++++++++++- 2 files changed, 103 insertions(+), 6 deletions(-) diff --git a/client/allocrunner/taskrunner/task_runner.go b/client/allocrunner/taskrunner/task_runner.go index 5d22a6965..719563ff2 100644 --- a/client/allocrunner/taskrunner/task_runner.go +++ b/client/allocrunner/taskrunner/task_runner.go @@ -35,6 +35,10 @@ import ( ) const ( + // defaultMaxEvents is the default max capacity for task events on the + // task state. Overrideable for testing. + defaultMaxEvents = 10 + // killBackoffBaseline is the baseline time for exponential backoff while // killing a task. killBackoffBaseline = 5 * time.Second @@ -191,6 +195,10 @@ type TaskRunner struct { // be accessed via helpers runLaunched bool runLaunchedLock sync.Mutex + + // maxEvents is the capacity of the TaskEvents on the TaskState. + // Defaults to defaultMaxEvents but overrideable for testing. + maxEvents int } type Config struct { @@ -267,6 +275,7 @@ func NewTaskRunner(config *Config) (*TaskRunner, error) { waitCh: make(chan struct{}), devicemanager: config.DeviceManager, driverManager: config.DriverManager, + maxEvents: defaultMaxEvents, } // Create the logger based on the allocation ID @@ -1023,7 +1032,7 @@ func (tr *TaskRunner) appendEvent(event *structs.TaskEvent) error { } // Append event to slice - appendTaskEvent(tr.state, event) + appendTaskEvent(tr.state, event, tr.maxEvents) return nil } @@ -1189,8 +1198,7 @@ func (tr *TaskRunner) emitStats(ru *cstructs.TaskResourceUsage) { } // appendTaskEvent updates the task status by appending the new event. 
-func appendTaskEvent(state *structs.TaskState, event *structs.TaskEvent) { - const capacity = 10 +func appendTaskEvent(state *structs.TaskState, event *structs.TaskEvent, capacity int) { if state.Events == nil { state.Events = make([]*structs.TaskEvent, 1, capacity) state.Events[0] = event diff --git a/client/allocrunner/taskrunner/task_runner_test.go b/client/allocrunner/taskrunner/task_runner_test.go index 4c83d667c..bd37845c0 100644 --- a/client/allocrunner/taskrunner/task_runner_test.go +++ b/client/allocrunner/taskrunner/task_runner_test.go @@ -18,6 +18,7 @@ import ( "github.com/hashicorp/nomad/client/pluginmanager/drivermanager" cstate "github.com/hashicorp/nomad/client/state" "github.com/hashicorp/nomad/client/vaultclient" + agentconsul "github.com/hashicorp/nomad/command/agent/consul" mockdriver "github.com/hashicorp/nomad/drivers/mock" "github.com/hashicorp/nomad/helper/testlog" "github.com/hashicorp/nomad/nomad/mock" @@ -616,9 +617,7 @@ func TestTaskRunner_SignalFailure(t *testing.T) { testWaitForTaskToStart(t, tr) - err = tr.Signal(&structs.TaskEvent{}, "SIGINT") - require.NotNil(t, err) - require.Equal(t, errMsg, err.Error()) + require.EqualError(t, tr.Signal(&structs.TaskEvent{}, "SIGINT"), errMsg) } // TestTaskRunner_RestartTask asserts that restarting a task works and emits a @@ -678,6 +677,96 @@ func TestTaskRunner_RestartTask(t *testing.T) { require.True(t, found, "restarting task event not found", pretty.Sprint(events)) } +// TestTaskRunner_CheckWatcher_Restart asserts that when enabled an unhealthy +// Consul check will cause a task to restart following restart policy rules. 
+func TestTaskRunner_CheckWatcher_Restart(t *testing.T) { + t.Parallel() + + alloc := mock.Alloc() + + // Make the restart policy fail within this test + tg := alloc.Job.TaskGroups[0] + tg.RestartPolicy.Attempts = 2 + tg.RestartPolicy.Interval = 1 * time.Minute + tg.RestartPolicy.Delay = 10 * time.Millisecond + tg.RestartPolicy.Mode = structs.RestartPolicyModeFail + + task := tg.Tasks[0] + task.Driver = "mock_driver" + task.Config = map[string]interface{}{ + "run_for": "10m", + } + + // Make the task register a check that fails + task.Services[0].Checks[0] = &structs.ServiceCheck{ + Name: "test-restarts", + Type: structs.ServiceCheckTCP, + Interval: 50 * time.Millisecond, + CheckRestart: &structs.CheckRestart{ + Limit: 2, + Grace: 100 * time.Millisecond, + }, + } + + conf, cleanup := testTaskRunnerConfig(t, alloc, task.Name) + defer cleanup() + + // Replace mock Consul ServiceClient, with the real ServiceClient + // backed by a mock consul whose checks are always unhealthy. + consulAgent := agentconsul.NewMockAgent() + consulAgent.SetStatus("critical") + consulClient := agentconsul.NewServiceClient(consulAgent, conf.Logger, true) + go consulClient.Run() + defer consulClient.Shutdown() + + conf.Consul = consulClient + + tr, err := NewTaskRunner(conf) + require.NoError(t, err) + + expectedEvents := []string{ + "Received", + "Task Setup", + "Started", + "Restart Signaled", + "Terminated", + "Restarting", + "Started", + "Restart Signaled", + "Terminated", + "Restarting", + "Started", + "Restart Signaled", + "Terminated", + "Not Restarting", + } + + // Bump maxEvents so task events aren't dropped + tr.maxEvents = 100 + + go tr.Run() + defer tr.Kill(context.Background(), structs.NewTaskEvent("cleanup")) + + // Wait until the task exits. Don't simply wait for it to run as it may + // get restarted and terminated before the test is able to observe it + // running. 
+ select { + case <-tr.WaitCh(): + case <-time.After(time.Duration(testutil.TestMultiplier()*15) * time.Second): + require.Fail(t, "timeout") + } + + state := tr.TaskState() + actualEvents := make([]string, len(state.Events)) + for i, e := range state.Events { + actualEvents[i] = string(e.Type) + } + require.Equal(t, actualEvents, expectedEvents) + + require.Equal(t, structs.TaskStateDead, state.State) + require.True(t, state.Failed, pretty.Sprint(state)) +} + // testWaitForTaskToStart waits for the task to be running or fails the test func testWaitForTaskToStart(t *testing.T, tr *TaskRunner) { testutil.WaitForResult(func() (bool, error) {