diff --git a/e2e/e2e_test.go b/e2e/e2e_test.go
index 0b9a3e142..f6588ed17 100644
--- a/e2e/e2e_test.go
+++ b/e2e/e2e_test.go
@@ -7,6 +7,7 @@ import (
 	_ "github.com/hashicorp/nomad/e2e/consultemplate"
 	_ "github.com/hashicorp/nomad/e2e/example"
 	_ "github.com/hashicorp/nomad/e2e/spread"
+	_ "github.com/hashicorp/nomad/e2e/taskevents"
 )
 
 func TestE2E(t *testing.T) {
diff --git a/e2e/taskevents/input/completed_leader.nomad b/e2e/taskevents/input/completed_leader.nomad
new file mode 100644
index 000000000..7b3d9a552
--- /dev/null
+++ b/e2e/taskevents/input/completed_leader.nomad
@@ -0,0 +1,28 @@
+job "completed_leader" {
+  type = "batch"
+  datacenters = ["dc1"]
+
+  group "completed_leader" {
+    restart {
+      attempts = 0
+    }
+
+    # Only this task, named the same as the job, has its events inspected by the test.
+    task "completed_leader" {
+      driver = "raw_exec"
+      config {
+        command = "sleep"
+        args = ["1000"] # sleeps far longer than the leader task below
+      }
+    }
+
+    task "leader" {
+      leader = true
+      driver = "raw_exec"
+      config {
+        command = "sleep"
+        args = ["1"] # the leader exits after about a second
+      }
+    }
+  }
+}
diff --git a/e2e/taskevents/input/failed_batch.nomad b/e2e/taskevents/input/failed_batch.nomad
new file mode 100644
index 000000000..ae5d1f122
--- /dev/null
+++ b/e2e/taskevents/input/failed_batch.nomad
@@ -0,0 +1,17 @@
+job "failed_batch" {
+  type = "batch"
+  datacenters = ["dc1"]
+
+  group "failed_batch" {
+    restart {
+      attempts = 0
+    }
+
+    task "failed_batch" {
+      driver = "raw_exec"
+      config {
+        command = "SomeInvalidCommand" # presumably not a real executable; the test expects a driver failure event
+      }
+    }
+  }
+}
diff --git a/e2e/taskevents/input/simple_batch.nomad b/e2e/taskevents/input/simple_batch.nomad
new file mode 100644
index 000000000..333084f57
--- /dev/null
+++ b/e2e/taskevents/input/simple_batch.nomad
@@ -0,0 +1,12 @@
+job "simple_batch" {
+  type = "batch"
+  datacenters = ["dc1"]
+
+  task "simple_batch" { # declared directly under the job, without a group stanza
+    driver = "raw_exec"
+    config {
+      command = "sleep"
+      args = ["1"]
+    }
+  }
+}
diff --git a/e2e/taskevents/taskevents.go b/e2e/taskevents/taskevents.go
new file mode 100644
index 000000000..c59615a0e
--- /dev/null
+++ 
b/e2e/taskevents/taskevents.go
@@ -0,0 +1,189 @@
+package taskevents
+
+import (
+	"fmt"
+	"strings"
+	"time"
+
+	"github.com/hashicorp/nomad/api"
+	"github.com/hashicorp/nomad/e2e/framework"
+	"github.com/hashicorp/nomad/testutil"
+	"github.com/stretchr/testify/require"
+
+	"github.com/hashicorp/nomad/e2e/e2eutil"
+	"github.com/hashicorp/nomad/helper/uuid"
+)
+
+// TaskEventsTest is the e2e test case that checks the task events emitted
+// for the jobs under input/.
+type TaskEventsTest struct {
+	framework.TC
+	// jobIds holds the IDs of the jobs registered by the current test so that
+	// AfterEach can deregister them.
+	jobIds []string
+}
+
+// init registers the TaskEvents suite with the e2e framework.
+func init() {
+	framework.AddSuites(&framework.TestSuite{
+		Component:   "TaskEvents",
+		CanRunLocal: true,
+		Cases: []framework.TestCase{
+			new(TaskEventsTest),
+		},
+	})
+}
+
+// BeforeAll runs the e2eutil leader and node-readiness waits (one node) as
+// suite setup.
+func (tc *TaskEventsTest) BeforeAll(f *framework.F) {
+	e2eutil.WaitForLeader(f.T(), tc.Nomad())
+	e2eutil.WaitForNodesReady(f.T(), tc.Nomad(), 1)
+}
+
+// AfterEach deregisters (with purge) every job recorded in tc.jobIds, clears
+// the list and then triggers a garbage collection.
+//
+// Cleanup is best effort: failures are logged rather than failing the test.
+func (tc *TaskEventsTest) AfterEach(f *framework.F) {
+	nomadClient := tc.Nomad()
+	jobs := nomadClient.Jobs()
+
+	// Stop all jobs in test
+	for _, id := range tc.jobIds {
+		if _, _, err := jobs.Deregister(id, true, nil); err != nil {
+			f.T().Logf("deregistering job %q: %v", id, err)
+		}
+	}
+
+	// Forget the jobs that were just removed so later tests do not try to
+	// deregister them again.
+	tc.jobIds = nil
+
+	// Garbage collect
+	if err := nomadClient.System().GarbageCollect(); err != nil {
+		f.T().Logf("garbage collecting: %v", err)
+	}
+}
+
+// formatEvents renders one line per event (index, type, FailsTask flag and
+// display message) for use in assertion failure messages.
+func formatEvents(events []*api.TaskEvent) string {
+	estrs := make([]string, len(events))
+	for i, e := range events {
+		estrs[i] = fmt.Sprintf("%2d %-20s fail=%t msg=> %s", i, e.Type, e.FailsTask, e.DisplayMessage)
+	}
+	return strings.Join(estrs, "\n")
+}
+
+// waitUntilEvents submits a job and then waits until the expected number of
+// events exist.
+//
+// The job name is used to load the job file from "input/${job}.nomad", and
+// events are only inspected for tasks named the same as the job.
+func (tc *TaskEventsTest) waitUntilEvents(f *framework.F, jobName string, numEvents int) *api.TaskState {
+	t := f.T()
+	client := tc.Nomad()
+
+	// Give the job a unique ID and remember it so AfterEach can deregister it.
+	jobID := jobName + uuid.Generate()[0:8]
+	tc.jobIds = append(tc.jobIds, jobID)
+
+	jobFile := fmt.Sprintf("taskevents/input/%s.nomad", jobName)
+	allocs := e2eutil.RegisterAndWaitForAllocs(f, client, jobFile, jobID)
+	require.Len(t, allocs, 1)
+	allocID := allocs[0].ID
+
+	// state lives outside the poll closure so the final task state can be
+	// returned to the caller for further assertions.
+	var state *api.TaskState
+	opts := &api.QueryOptions{WaitTime: time.Second}
+
+	// poll succeeds once the task named after the job has exactly numEvents events.
+	poll := func() (bool, error) {
+		info, meta, err := client.Allocations().Info(allocID, opts)
+		if err != nil {
+			return false, err
+		}
+
+		// Later queries are issued relative to the last index seen.
+		opts.WaitIndex = meta.LastIndex
+
+		state = info.TaskStates[jobName]
+		if state == nil {
+			return false, fmt.Errorf("task state not found for %s", jobName)
+		}
+
+		if got := len(state.Events); got != numEvents {
+			return false, fmt.Errorf("expected %d task events but found %d\n%s",
+				numEvents, got, formatEvents(state.Events))
+		}
+		return true, nil
+	}
+
+	// giveUp fails the test with whatever error the retry helper hands it.
+	giveUp := func(err error) {
+		t.Fatalf("task events error: %v", err)
+	}
+
+	testutil.WaitForResultRetries(10, poll, giveUp)
+	return state
+}
+
+// TestTaskEvents_SimpleBatch asserts the events emitted for a batch task
+// that runs to completion.
+func (tc *TaskEventsTest) TestTaskEvents_SimpleBatch(f *framework.F) {
+	t := f.T()
+	state := tc.waitUntilEvents(f, "simple_batch", 4)
+	events := state.Events
+
+	// The task must not be marked as failed.
+	require.Falsef(t, state.Failed, "task unexpectedly failed after %d events\n%s",
+		len(events), formatEvents(events))
+
+	// Events must arrive in exactly this order (based on v0.8.6).
+	want := []string{api.TaskReceived, api.TaskSetup, api.TaskStarted, api.TaskTerminated}
+	for i, typ := range want {
+		require.Equal(t, typ, events[i].Type)
+	}
+}
+
+// TestTaskEvents_FailedBatch asserts the events emitted for the failed_batch
+// job, whose task is expected to hit a driver failure and not be restarted.
+func (tc *TaskEventsTest) TestTaskEvents_FailedBatch(f *framework.F) {
+	t := f.T()
+	state := tc.waitUntilEvents(f, "failed_batch", 4)
+	events := state.Events
+
+	// The task must be marked as failed.
+	require.Truef(t, state.Failed, "task unexpectedly succeeded after %d events\n%s",
+		len(events), formatEvents(events))
+
+	// Events must arrive in exactly this order (based on v0.8.6).
+	want := []string{api.TaskReceived, api.TaskSetup, api.TaskDriverFailure, api.TaskNotRestarting}
+	for i, typ := range want {
+		require.Equal(t, typ, events[i].Type)
+	}
+	// The final event must be flagged as failing the task.
+	require.True(t, events[3].FailsTask)
+}
+
+// TestTaskEvents_CompletedLeader asserts the proper events are emitted for a
+// non-leader task when its leader task completes.
+func (tc *TaskEventsTest) TestTaskEvents_CompletedLeader(f *framework.F) {
+	t := f.T()
+	state := tc.waitUntilEvents(f, "completed_leader", 7)
+	events := state.Events
+
+	// The task must not be marked as failed.
+	require.Falsef(t, state.Failed, "task unexpectedly failed after %d events\n%s",
+		len(events), formatEvents(events))
+
+	// Events must arrive in exactly this order.
+	want := []string{
+		api.TaskReceived, api.TaskSetup, api.TaskStarted, api.TaskLeaderDead,
+		api.TaskKilling, api.TaskTerminated, api.TaskKilled,
+	}
+	for i, typ := range want {
+		require.Equal(t, typ, events[i].Type)
+	}
+}