open-nomad/e2e/taskevents/taskevents.go
2020-01-31 19:06:11 -06:00

197 lines
6 KiB
Go

package taskevents
import (
"fmt"
"strings"
"time"
"github.com/hashicorp/nomad/api"
"github.com/hashicorp/nomad/e2e/framework"
"github.com/hashicorp/nomad/testutil"
"github.com/stretchr/testify/require"
"github.com/hashicorp/nomad/e2e/e2eutil"
"github.com/hashicorp/nomad/helper/uuid"
)
type TaskEventsTest struct {
framework.TC
jobIds []string
}
func init() {
framework.AddSuites(&framework.TestSuite{
Component: "TaskEvents",
CanRunLocal: true,
Cases: []framework.TestCase{
new(TaskEventsTest),
},
})
}
func (tc *TaskEventsTest) BeforeAll(f *framework.F) {
e2eutil.WaitForLeader(f.T(), tc.Nomad())
e2eutil.WaitForNodesReady(f.T(), tc.Nomad(), 1)
}
func (tc *TaskEventsTest) AfterEach(f *framework.F) {
nomadClient := tc.Nomad()
jobs := nomadClient.Jobs()
// Stop all jobs in test
for _, id := range tc.jobIds {
jobs.Deregister(id, true, nil)
}
// Garbage collect
nomadClient.System().GarbageCollect()
}
func formatEvents(events []*api.TaskEvent) string {
estrs := make([]string, len(events))
for i, e := range events {
estrs[i] = fmt.Sprintf("%2d %-20s fail=%t msg=> %s", i, e.Type, e.FailsTask, e.DisplayMessage)
}
return strings.Join(estrs, "\n")
}
// waitUntilEvents submits a job and then waits until the expected number of
// events exist.
//
// The job name is used to load the job file from "input/${job}.nomad", and
// events are only inspected for tasks named the same as the job. That task's
// state is returned as well as the last allocation received.
func (tc *TaskEventsTest) waitUntilEvents(f *framework.F, jobName string, numEvents int) (*api.Allocation, *api.TaskState) {
t := f.T()
nomadClient := tc.Nomad()
uuid := uuid.Generate()
uniqJobId := jobName + uuid[0:8]
tc.jobIds = append(tc.jobIds, uniqJobId)
jobFile := fmt.Sprintf("taskevents/input/%s.nomad", jobName)
allocs := e2eutil.RegisterAndWaitForAllocs(f.T(), nomadClient, jobFile, uniqJobId, "")
require.Len(t, allocs, 1)
allocID := allocs[0].ID
qo := &api.QueryOptions{
WaitTime: time.Second,
}
// Capture state outside of wait to ease assertions once expected
// number of events have been received.
var alloc *api.Allocation
var taskState *api.TaskState
testutil.WaitForResultRetries(10, func() (bool, error) {
a, meta, err := nomadClient.Allocations().Info(allocID, qo)
if err != nil {
return false, err
}
qo.WaitIndex = meta.LastIndex
// Capture alloc and task state
alloc = a
taskState = a.TaskStates[jobName]
if taskState == nil {
return false, fmt.Errorf("task state not found for %s", jobName)
}
// Assert expected number of task events
if len(taskState.Events) != numEvents {
return false, fmt.Errorf("expected %d task events but found %d\n%s",
numEvents, len(taskState.Events), formatEvents(taskState.Events),
)
}
return true, nil
}, func(err error) {
t.Fatalf("task events error: %v", err)
})
return alloc, taskState
}
func (tc *TaskEventsTest) TestTaskEvents_SimpleBatch(f *framework.F) {
t := f.T()
_, taskState := tc.waitUntilEvents(f, "simple_batch", 4)
events := taskState.Events
// Assert task did not fail
require.Falsef(t, taskState.Failed, "task unexpectedly failed after %d events\n%s",
len(events), formatEvents(events),
)
// Assert the expected type of events were emitted in a specific order
// (based on v0.8.6)
require.Equal(t, api.TaskReceived, events[0].Type)
require.Equal(t, api.TaskSetup, events[1].Type)
require.Equal(t, api.TaskStarted, events[2].Type)
require.Equal(t, api.TaskTerminated, events[3].Type)
}
func (tc *TaskEventsTest) TestTaskEvents_FailedBatch(f *framework.F) {
t := f.T()
_, taskState := tc.waitUntilEvents(f, "failed_batch", 4)
events := taskState.Events
// Assert task did fail
require.Truef(t, taskState.Failed, "task unexpectedly succeeded after %d events\n%s",
len(events), formatEvents(events),
)
// Assert the expected type of events were emitted in a specific order
// (based on v0.8.6)
require.Equal(t, api.TaskReceived, events[0].Type)
require.Equal(t, api.TaskSetup, events[1].Type)
require.Equal(t, api.TaskDriverFailure, events[2].Type)
require.Equal(t, api.TaskNotRestarting, events[3].Type)
require.True(t, events[3].FailsTask)
}
// TestTaskEvents_CompletedLeader asserts the proper events are emitted for a
// non-leader task when its leader task completes.
func (tc *TaskEventsTest) TestTaskEvents_CompletedLeader(f *framework.F) {
t := f.T()
_, taskState := tc.waitUntilEvents(f, "completed_leader", 7)
events := taskState.Events
// Assert task did not fail
require.Falsef(t, taskState.Failed, "task unexpectedly failed after %d events\n%s",
len(events), formatEvents(events),
)
// Assert the expected type of events were emitted in a specific order
require.Equal(t, api.TaskReceived, events[0].Type)
require.Equal(t, api.TaskSetup, events[1].Type)
require.Equal(t, api.TaskStarted, events[2].Type)
require.Equal(t, api.TaskLeaderDead, events[3].Type)
require.Equal(t, api.TaskKilling, events[4].Type)
require.Equal(t, api.TaskTerminated, events[5].Type)
require.Equal(t, api.TaskKilled, events[6].Type)
}
// TestTaskEvents_FailedSibling asserts the proper events are emitted for a
// task when another task in its task group fails.
func (tc *TaskEventsTest) TestTaskEvents_FailedSibling(f *framework.F) {
t := f.T()
alloc, taskState := tc.waitUntilEvents(f, "failed_sibling", 7)
events := taskState.Events
// Just because a sibling failed doesn't mean this task fails. It
// should exit cleanly. (same as in v0.8.6)
require.Falsef(t, taskState.Failed, "task unexpectedly failed after %d events\n%s",
len(events), formatEvents(events),
)
// The alloc should be faied
require.Equal(t, "failed", alloc.ClientStatus)
// Assert the expected type of events were emitted in a specific order
require.Equal(t, api.TaskReceived, events[0].Type)
require.Equal(t, api.TaskSetup, events[1].Type)
require.Equal(t, api.TaskStarted, events[2].Type)
require.Equal(t, api.TaskSiblingFailed, events[3].Type)
require.Equal(t, api.TaskKilling, events[4].Type)
require.Equal(t, api.TaskTerminated, events[5].Type)
require.Equal(t, api.TaskKilled, events[6].Type)
}