open-nomad/nomad/periodic_test.go
Drew Bailey 6c788fdccd
Events/msgtype cleanup (#9117)
* use msgtype in upsert node

adds message type to signature for upsert node, update tests, remove placeholder method

* UpsertAllocs msg type test setup

* use upsertallocs with msg type in signature

update test usage of delete node

delete placeholder msgtype method

* add msgtype to upsert evals signature, update test call sites with test setup msg type

handle snapshot upsert eval outside of FSM and ignore eval event

remove placeholder upsertevalsmsgtype

handle job plan rpc and prevent event creation for plan

msgtype cleanup upsertnodeevents

updatenodedrain msgtype

msg type 0 is a node registration event, so set the default to the ignore type

* fix named import

* fix signature ordering on upsertnode to match
2020-10-19 09:30:15 -04:00

757 lines
19 KiB
Go

package nomad
import (
"fmt"
"math/rand"
"reflect"
"sort"
"strconv"
"strings"
"sync"
"testing"
"time"
"github.com/hashicorp/nomad/helper/testlog"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/testutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
// MockJobEvalDispatcher is a test double for the dispatcher used by
// PeriodicDispatch. Rather than creating evaluations it simply records every
// job handed to DispatchJob, keyed by namespace and ID.
type MockJobEvalDispatcher struct {
	lock sync.Mutex // guards Jobs

	// Jobs holds each dispatched job, keyed by namespace and job ID.
	Jobs map[structs.NamespacedID]*structs.Job
}
// NewMockJobEvalDispatcher returns a MockJobEvalDispatcher whose Jobs map is
// initialized and empty, so DispatchJob can write to it immediately.
func NewMockJobEvalDispatcher() *MockJobEvalDispatcher {
	jobs := make(map[structs.NamespacedID]*structs.Job)
	return &MockJobEvalDispatcher{Jobs: jobs}
}
// DispatchJob records job under its namespace/ID key, replacing any job
// previously recorded under the same key. No evaluation is created, so the
// returned evaluation and error are always nil.
func (m *MockJobEvalDispatcher) DispatchJob(job *structs.Job) (*structs.Evaluation, error) {
	key := structs.NamespacedID{Namespace: job.Namespace, ID: job.ID}

	m.lock.Lock()
	defer m.lock.Unlock()

	m.Jobs[key] = job
	return nil, nil
}
// RunningChildren reports whether any recorded job names parent as its parent,
// matching on both the parent's ID and namespace. It never returns an error.
func (m *MockJobEvalDispatcher) RunningChildren(parent *structs.Job) (bool, error) {
	m.lock.Lock()
	defer m.lock.Unlock()

	found := false
	for _, child := range m.Jobs {
		if child.Namespace != parent.Namespace || child.ParentID != parent.ID {
			continue
		}
		found = true
		break
	}
	return found, nil
}
// LaunchTimes returns the launch times of the recorded child jobs of parentID
// in namespace, sorted from earliest to latest. Each time is obtained by
// passing the child's ID to p.LaunchTime. If any such call fails, that error is
// returned with a nil slice. When no child matches, the result is nil.
func (m *MockJobEvalDispatcher) LaunchTimes(p *PeriodicDispatch, namespace, parentID string) ([]time.Time, error) {
	m.lock.Lock()
	defer m.lock.Unlock()

	var result []time.Time
	for _, child := range m.Jobs {
		if child.Namespace == namespace && child.ParentID == parentID {
			launch, err := p.LaunchTime(child.ID)
			if err != nil {
				return nil, err
			}
			result = append(result, launch)
		}
	}

	// Map iteration order is random, so sort for a deterministic result.
	sort.Sort(times(result))
	return result, nil
}
// times implements sort.Interface, ordering time.Time values from earliest to
// latest.
type times []time.Time

func (t times) Len() int           { return len(t) }
func (t times) Swap(i, j int)      { t[j], t[i] = t[i], t[j] }
func (t times) Less(i, j int) bool { return t[j].After(t[i]) }
// testPeriodicDispatcher builds a PeriodicDispatch backed by a fresh
// MockJobEvalDispatcher and enables it. A cleanup that disables the dispatcher
// is registered with t. Both the dispatcher and the mock are returned so tests
// can inspect what was dispatched.
func testPeriodicDispatcher(t *testing.T) (*PeriodicDispatch, *MockJobEvalDispatcher) {
	log := testlog.HCLogger(t)
	mockDispatcher := NewMockJobEvalDispatcher()
	dispatch := NewPeriodicDispatch(log, mockDispatcher)

	t.Cleanup(func() {
		dispatch.SetEnabled(false)
	})
	dispatch.SetEnabled(true)

	return dispatch, mockDispatcher
}
// testPeriodicJob returns a mock periodic job with the test spec type. Its spec
// is the comma-separated list of the given launch times, each rounded to the
// nearest second and written as Unix seconds.
func testPeriodicJob(times ...time.Time) *structs.Job {
	job := mock.PeriodicJob()
	job.Periodic.SpecType = structs.PeriodicSpecTest

	unixSecs := make([]string, 0, len(times))
	for _, launch := range times {
		rounded := launch.Round(time.Second)
		unixSecs = append(unixSecs, strconv.Itoa(int(rounded.Unix())))
	}
	job.Periodic.Spec = strings.Join(unixSecs, ",")

	return job
}
// TestPeriodicDispatch_SetEnabled tests that enabling an already-enabled
// dispatcher is a no-op. After a disable/enable cycle it must still track jobs.
// This covers the reported issue: https://github.com/hashicorp/nomad/issues/2829
func TestPeriodicDispatch_SetEnabled(t *testing.T) {
	t.Parallel()
	p, _ := testPeriodicDispatcher(t)

	// The helper already enabled the dispatcher; enabling again must be harmless.
	p.SetEnabled(true)

	// Disabling afterwards must not misbehave either.
	p.SetEnabled(false)

	// Re-enable and confirm a job can still be tracked.
	p.SetEnabled(true)
	job := mock.PeriodicJob()
	err := p.Add(job)
	if err != nil {
		t.Fatalf("Add failed %v", err)
	}

	if tracked := p.Tracked(); len(tracked) != 1 {
		t.Fatalf("Add didn't track the job: %v", tracked)
	}
}
// TestPeriodicDispatch_Add_NonPeriodic checks that adding a non-periodic job
// succeeds and leaves nothing tracked.
func TestPeriodicDispatch_Add_NonPeriodic(t *testing.T) {
	t.Parallel()
	p, _ := testPeriodicDispatcher(t)

	err := p.Add(mock.Job())
	if err != nil {
		t.Fatalf("Add of non-periodic job failed: %v; expect no-op", err)
	}

	if tracked := p.Tracked(); len(tracked) != 0 {
		t.Fatalf("Add of non-periodic job should be no-op: %v", tracked)
	}
}
// TestPeriodicDispatch_Add_Periodic_Parameterized checks that a periodic job
// that is also parameterized is accepted by Add but not tracked.
func TestPeriodicDispatch_Add_Periodic_Parameterized(t *testing.T) {
	t.Parallel()
	p, _ := testPeriodicDispatcher(t)

	job := mock.PeriodicJob()
	job.ParameterizedJob = &structs.ParameterizedJobConfig{}

	err := p.Add(job)
	if err != nil {
		t.Fatalf("Add of periodic parameterized job failed: %v", err)
	}

	if tracked := p.Tracked(); len(tracked) != 0 {
		t.Fatalf("Add of periodic parameterized job should be no-op: %v", tracked)
	}
}
// TestPeriodicDispatch_Add_Periodic_Stopped checks that adding a stopped
// periodic job succeeds but does not track it.
func TestPeriodicDispatch_Add_Periodic_Stopped(t *testing.T) {
	t.Parallel()
	p, _ := testPeriodicDispatcher(t)

	job := mock.PeriodicJob()
	job.Stop = true
	if err := p.Add(job); err != nil {
		t.Fatalf("Add of stopped periodic job failed: %v", err)
	}

	tracked := p.Tracked()
	if len(tracked) != 0 {
		// The message describes this test's case (a stopped job), not the
		// parameterized case it was copied from.
		t.Fatalf("Add of stopped periodic job should be no-op: %v", tracked)
	}
}
func TestPeriodicDispatch_Add_UpdateJob(t *testing.T) {
t.Parallel()
p, _ := testPeriodicDispatcher(t)
job := mock.PeriodicJob()
err := p.Add(job)
require.NoError(t, err)
tracked := p.Tracked()
require.Lenf(t, tracked, 1, "did not track the job")
// Update the job and add it again.
job.Periodic.Spec = "foo"
err = p.Add(job)
require.Error(t, err)
require.Contains(t, err.Error(), "failed parsing cron expression")
tracked = p.Tracked()
require.Lenf(t, tracked, 1, "did not update")
require.Equalf(t, job, tracked[0], "add did not properly update")
}
// TestPeriodicDispatch_Add_Remove_Namespaced checks that removing a job from
// one namespace leaves a job in another namespace tracked.
func TestPeriodicDispatch_Add_Remove_Namespaced(t *testing.T) {
	t.Parallel()
	assert := assert.New(t)
	p, _ := testPeriodicDispatcher(t)

	job := mock.PeriodicJob()
	job2 := mock.PeriodicJob()
	job2.Namespace = "test"

	assert.Nil(p.Add(job))
	assert.Nil(p.Add(job2))
	assert.Len(p.Tracked(), 2)

	assert.Nil(p.Remove(job2.Namespace, job2.ID))

	// Stop here if the count is wrong rather than panic on the index below.
	tracked := p.Tracked()
	require.Len(t, tracked, 1)
	// testify convention is (expected, actual).
	assert.Equal(job, tracked[0])
}
// TestPeriodicDispatch_Add_RemoveJob checks that re-adding a tracked job with
// its periodic config cleared removes it from tracking.
func TestPeriodicDispatch_Add_RemoveJob(t *testing.T) {
	t.Parallel()
	p, _ := testPeriodicDispatcher(t)
	job := mock.PeriodicJob()

	err := p.Add(job)
	if err != nil {
		t.Fatalf("Add failed %v", err)
	}
	if tracked := p.Tracked(); len(tracked) != 1 {
		t.Fatalf("Add didn't track the job: %v", tracked)
	}

	// Make the job non-periodic and add it again.
	job.Periodic = nil
	err = p.Add(job)
	if err != nil {
		t.Fatalf("Add failed %v", err)
	}
	if tracked := p.Tracked(); len(tracked) != 0 {
		t.Fatalf("Add didn't remove: %v", tracked)
	}
}
// TestPeriodicDispatch_Add_TriggersUpdate checks that re-adding a tracked job
// with an earlier launch time reschedules it. Nothing may launch immediately
// after the update, and exactly one child must be launched at the new time.
func TestPeriodicDispatch_Add_TriggersUpdate(t *testing.T) {
	t.Parallel()
	p, m := testPeriodicDispatcher(t)

	// Create a job that won't be evaluated for a while.
	job := testPeriodicJob(time.Now().Add(10 * time.Second))
	if err := p.Add(job); err != nil {
		t.Fatalf("Add failed %v", err)
	}

	// Update it to be sooner and re-add.
	expected := time.Now().Round(1 * time.Second).Add(1 * time.Second)
	job.Periodic.Spec = fmt.Sprintf("%d", expected.Unix())
	if err := p.Add(job); err != nil {
		t.Fatalf("Add failed %v", err)
	}

	// Check that nothing has been launched yet. Dispatched children are
	// recorded under their own IDs and matched to the parent via ParentID, so
	// look them up with LaunchTimes. LaunchTimes also holds the mock's lock;
	// indexing m.Jobs with the parent's ID would never match a child.
	launches, err := m.LaunchTimes(p, job.Namespace, job.ID)
	if err != nil {
		t.Fatalf("failed to get launch times for job %q: %v", job.ID, err)
	}
	if len(launches) != 0 {
		t.Fatalf("periodic dispatcher created eval at the wrong time")
	}

	time.Sleep(2 * time.Second)

	// Check that the job was launched exactly once, at the updated time.
	launches, err = m.LaunchTimes(p, job.Namespace, job.ID)
	if err != nil {
		t.Fatalf("failed to get launch times for job %q: %v", job.ID, err)
	}
	if len(launches) != 1 {
		t.Fatalf("incorrect number of launch times for job %q; got %v", job.ID, launches)
	}
	if !launches[0].Equal(expected) {
		t.Fatalf("periodic dispatcher created eval for time %v; want %v", launches[0], expected)
	}
}
// TestPeriodicDispatch_Remove_Untracked checks that removing a job that was
// never added returns no error.
func TestPeriodicDispatch_Remove_Untracked(t *testing.T) {
	t.Parallel()
	p, _ := testPeriodicDispatcher(t)

	err := p.Remove("ns", "foo")
	if err != nil {
		t.Fatalf("Remove failed %v; expected a no-op", err)
	}
}
// TestPeriodicDispatch_Remove_Tracked checks that removing a tracked job
// untracks it.
func TestPeriodicDispatch_Remove_Tracked(t *testing.T) {
	t.Parallel()
	p, _ := testPeriodicDispatcher(t)
	job := mock.PeriodicJob()

	err := p.Add(job)
	if err != nil {
		t.Fatalf("Add failed %v", err)
	}
	if tracked := p.Tracked(); len(tracked) != 1 {
		t.Fatalf("Add didn't track the job: %v", tracked)
	}

	err = p.Remove(job.Namespace, job.ID)
	if err != nil {
		t.Fatalf("Remove failed %v", err)
	}
	if tracked := p.Tracked(); len(tracked) != 0 {
		t.Fatalf("Remove didn't untrack the job: %v", tracked)
	}
}
// TestPeriodicDispatch_Remove_TriggersUpdate checks that removing a job before
// its launch time cancels the launch.
func TestPeriodicDispatch_Remove_TriggersUpdate(t *testing.T) {
	t.Parallel()
	p, m := testPeriodicDispatcher(t)

	// Create a job that will be evaluated soon.
	job := testPeriodicJob(time.Now().Add(1 * time.Second))
	if err := p.Add(job); err != nil {
		t.Fatalf("Add failed %v", err)
	}

	// Remove the job before it fires.
	if err := p.Remove(job.Namespace, job.ID); err != nil {
		t.Fatalf("Remove failed %v", err)
	}

	time.Sleep(2 * time.Second)

	// Check that no child was launched. Children are recorded under their own
	// IDs and matched to the parent via ParentID, so query through
	// LaunchTimes, which also takes the mock's lock, instead of indexing
	// m.Jobs with the parent's ID.
	launches, err := m.LaunchTimes(p, job.Namespace, job.ID)
	if err != nil {
		t.Fatalf("failed to get launch times for job %q: %v", job.ID, err)
	}
	if len(launches) != 0 {
		t.Fatalf("Remove didn't cancel creation of an eval")
	}
}
// TestPeriodicDispatch_ForceRun_Untracked checks that force-running a job that
// is not tracked returns an error.
func TestPeriodicDispatch_ForceRun_Untracked(t *testing.T) {
	t.Parallel()
	p, _ := testPeriodicDispatcher(t)

	_, err := p.ForceRun("ns", "foo")
	if err == nil {
		t.Fatal("ForceRun of untracked job should fail")
	}
}
// TestPeriodicDispatch_ForceRun_Tracked checks that ForceRun launches a
// tracked job immediately, well before its scheduled launch time.
func TestPeriodicDispatch_ForceRun_Tracked(t *testing.T) {
	t.Parallel()
	p, m := testPeriodicDispatcher(t)

	// Create a job that won't be evaluated for a while.
	job := testPeriodicJob(time.Now().Add(10 * time.Second))
	if err := p.Add(job); err != nil {
		t.Fatalf("Add failed %v", err)
	}

	// ForceRun the job.
	if _, err := p.ForceRun(job.Namespace, job.ID); err != nil {
		t.Fatalf("ForceRun failed %v", err)
	}

	// Check that exactly one child was launched.
	launches, err := m.LaunchTimes(p, job.Namespace, job.ID)
	if err != nil {
		t.Fatalf("failed to get launch times for job %q: %v", job.ID, err)
	}
	if l := len(launches); l != 1 {
		t.Fatalf("ForceRun() created an unexpected"+
			" number of evals; got %d; want 1", l)
	}
}
// TestPeriodicDispatch_Run_DisallowOverlaps checks a job with ProhibitOverlap
// set and two launch times. Only the first launch may happen, because its
// child is still recorded by the mock dispatcher when the second time arrives.
func TestPeriodicDispatch_Run_DisallowOverlaps(t *testing.T) {
	t.Parallel()
	p, m := testPeriodicDispatcher(t)

	// Create a job that will trigger two launches but disallows overlapping.
	// Both times come from a single clock reading, so they are exactly one
	// second apart even if the clock crosses a rounding boundary.
	base := time.Now().Round(1 * time.Second)
	launch1 := base.Add(1 * time.Second)
	launch2 := base.Add(2 * time.Second)
	job := testPeriodicJob(launch1, launch2)
	job.Periodic.ProhibitOverlap = true

	if err := p.Add(job); err != nil {
		t.Fatalf("Add failed %v", err)
	}

	time.Sleep(3 * time.Second)

	// Check that only one job was launched.
	launches, err := m.LaunchTimes(p, job.Namespace, job.ID)
	if err != nil {
		t.Fatalf("failed to get launch times for job %q: %v", job.ID, err)
	}
	if len(launches) != 1 {
		t.Fatalf("incorrect number of launch times for job %q; got %v", job.ID, launches)
	}
	if !launches[0].Equal(launch1) {
		t.Fatalf("periodic dispatcher created eval for time %v; want %v", launches[0], launch1)
	}
}
// TestPeriodicDispatch_Run_Multiple checks that a job with two launch times is
// launched at both times, in order.
func TestPeriodicDispatch_Run_Multiple(t *testing.T) {
	t.Parallel()
	p, m := testPeriodicDispatcher(t)

	// Create a job that will be launched twice. Both times come from a single
	// clock reading. With separate time.Now() calls, a rounding-boundary
	// crossing could push launch2 to +3s and past the sleep below.
	base := time.Now().Round(1 * time.Second)
	launch1 := base.Add(1 * time.Second)
	launch2 := base.Add(2 * time.Second)
	job := testPeriodicJob(launch1, launch2)

	if err := p.Add(job); err != nil {
		t.Fatalf("Add failed %v", err)
	}

	time.Sleep(3 * time.Second)

	// Check that the job was launched at both times.
	launches, err := m.LaunchTimes(p, job.Namespace, job.ID)
	if err != nil {
		t.Fatalf("failed to get launch times for job %q: %v", job.ID, err)
	}
	if len(launches) != 2 {
		t.Fatalf("incorrect number of launch times for job %q; got %v", job.ID, launches)
	}
	if !launches[0].Equal(launch1) {
		t.Fatalf("periodic dispatcher created eval for time %v; want %v", launches[0], launch1)
	}
	if !launches[1].Equal(launch2) {
		t.Fatalf("periodic dispatcher created eval for time %v; want %v", launches[1], launch2)
	}
}
// TestPeriodicDispatch_Run_SameTime checks that two distinct jobs scheduled
// for the same instant are each launched exactly once at that instant.
func TestPeriodicDispatch_Run_SameTime(t *testing.T) {
	t.Parallel()
	p, m := testPeriodicDispatcher(t)

	// Create two jobs that will be launched at the same time.
	launch := time.Now().Round(1 * time.Second).Add(1 * time.Second)
	job := testPeriodicJob(launch)
	job2 := testPeriodicJob(launch)

	// Add them.
	if err := p.Add(job); err != nil {
		t.Fatalf("Add failed %v", err)
	}
	if err := p.Add(job2); err != nil {
		t.Fatalf("Add failed %v", err)
	}
	if l := len(p.Tracked()); l != 2 {
		t.Fatalf("got %d tracked; want 2", l)
	}

	time.Sleep(2 * time.Second)

	// Check that the jobs were launched correctly.
	for _, j := range []*structs.Job{job, job2} {
		launches, err := m.LaunchTimes(p, j.Namespace, j.ID)
		if err != nil {
			t.Fatalf("failed to get launch times for job %q: %v", j.ID, err)
		}
		if len(launches) != 1 {
			t.Fatalf("incorrect number of launch times for job %q; got %d; want 1", j.ID, len(launches))
		}
		if !launches[0].Equal(launch) {
			t.Fatalf("periodic dispatcher created eval for time %v; want %v", launches[0], launch)
		}
	}
}
// TestPeriodicDispatch_Run_SameID_Different_Namespace checks two jobs that
// share an ID but live in different namespaces and are scheduled for the same
// instant. They must be tracked separately and each launched exactly once.
func TestPeriodicDispatch_Run_SameID_Different_Namespace(t *testing.T) {
	t.Parallel()
	p, m := testPeriodicDispatcher(t)

	// Create two jobs with the same ID, in different namespaces, that will be
	// launched at the same time.
	launch := time.Now().Round(1 * time.Second).Add(1 * time.Second)
	job := testPeriodicJob(launch)
	job2 := testPeriodicJob(launch)
	job2.ID = job.ID
	job2.Namespace = "test"

	// Add them.
	if err := p.Add(job); err != nil {
		t.Fatalf("Add failed %v", err)
	}
	if err := p.Add(job2); err != nil {
		t.Fatalf("Add failed %v", err)
	}
	if l := len(p.Tracked()); l != 2 {
		t.Fatalf("got %d tracked; want 2", l)
	}

	time.Sleep(2 * time.Second)

	// Check that the jobs were launched correctly. The IDs are identical, so
	// messages include the namespace to tell the two apart.
	for _, j := range []*structs.Job{job, job2} {
		launches, err := m.LaunchTimes(p, j.Namespace, j.ID)
		if err != nil {
			t.Fatalf("failed to get launch times for job %q in namespace %q: %v", j.ID, j.Namespace, err)
		}
		if len(launches) != 1 {
			t.Fatalf("incorrect number of launch times for job %q in namespace %q; got %d; want 1",
				j.ID, j.Namespace, len(launches))
		}
		if !launches[0].Equal(launch) {
			t.Fatalf("periodic dispatcher created eval for time %v; want %v", launches[0], launch)
		}
	}
}
// TestPeriodicDispatch_Complex adds a mix of periodic jobs in random order.
// The mix includes:
//   - two jobs launching at the same time;
//   - a job launching twice;
//   - a job launching once;
//   - a job whose only launch time is in the past;
//   - three jobs that are removed before they launch.
//
// It then checks that exactly the expected launches happened for every job.
func TestPeriodicDispatch_Complex(t *testing.T) {
	t.Parallel()
	p, m := testPeriodicDispatcher(t)

	// Launch times, all offsets from one whole-second reading.
	now := time.Now().Round(1 * time.Second)
	same := now.Add(1 * time.Second)
	launch1 := same.Add(1 * time.Second)
	launch2 := same.Add(2 * time.Second)
	launch3 := same.Add(3 * time.Second)
	invalid := now.Add(-200 * time.Second)

	var (
		job1 = testPeriodicJob(same)             // launches with job2
		job2 = testPeriodicJob(same)             // launches with job1
		job3 = testPeriodicJob(invalid)          // never launches
		job4 = testPeriodicJob(launch1, launch3) // launches twice
		job5 = testPeriodicJob(launch2)          // launches once
		job6 = testPeriodicJob(same)             // removed below
		job7 = testPeriodicJob(launch1, launch3) // removed below
		job8 = testPeriodicJob(launch2)          // removed below
	)

	// Expected launch times per job ID; nil means no launch.
	want := map[string][]time.Time{
		job1.ID: {same},
		job2.ID: {same},
		job3.ID: nil,
		job4.ID: {launch1, launch3},
		job5.ID: {launch2},
		job6.ID: nil,
		job7.ID: nil,
		job8.ID: nil,
	}

	// Add and remove in random order.
	all := []*structs.Job{job1, job2, job3, job4, job5, job6, job7, job8}
	removed := []*structs.Job{job6, job7, job8}
	shuffle(all)
	shuffle(removed)

	for _, j := range all {
		if err := p.Add(j); err != nil {
			t.Fatalf("Add failed %v", err)
		}
	}
	for _, j := range removed {
		if err := p.Remove(j.Namespace, j.ID); err != nil {
			t.Fatalf("Remove failed %v", err)
		}
	}

	time.Sleep(5 * time.Second)

	got := make(map[string][]time.Time, len(want))
	for _, j := range all {
		launches, err := m.LaunchTimes(p, j.Namespace, j.ID)
		if err != nil {
			t.Fatalf("LaunchTimes(%v, %v) failed %v", j.Namespace, j.ID, err)
		}
		got[j.ID] = launches
	}

	if !reflect.DeepEqual(got, want) {
		t.Fatalf("Unexpected launches; got %#v; want %#v", got, want)
	}
}
// shuffle randomly permutes jobs in place.
//
// A locally seeded source is used so that tests running in parallel do not
// reseed the shared global math/rand source (rand.Seed is also deprecated as
// of Go 1.20). Rand.Shuffle makes every permutation equally likely, unlike a
// "swap each index with any random index" loop, which is biased.
func shuffle(jobs []*structs.Job) {
	r := rand.New(rand.NewSource(time.Now().UnixNano()))
	r.Shuffle(len(jobs), func(i, j int) {
		jobs[i], jobs[j] = jobs[j], jobs[i]
	})
}
// TestPeriodicHeap_Order checks that the periodic heap pops jobs by increasing
// launch time, with the job pushed at the zero time popped last.
func TestPeriodicHeap_Order(t *testing.T) {
	t.Parallel()
	h := NewPeriodicHeap()

	j1, j2, j3 := mock.PeriodicJob(), mock.PeriodicJob(), mock.PeriodicJob()
	names := map[*structs.Job]string{j1: "j1", j2: "j2", j3: "j3"}

	h.Push(j1, time.Time{})
	h.Push(j2, time.Unix(10, 0))
	h.Push(j3, time.Unix(11, 0))

	want := []string{"j2", "j3", "j1"}
	got := make([]string, 0, len(want))
	for range want {
		popped := h.Pop()
		got = append(got, names[popped.job])
	}

	if !reflect.DeepEqual(got, want) {
		t.Fatalf("Wrong ordering; got %v; want %v", got, want)
	}
}
// deriveChildJob returns a fresh mock job that looks as if parent launched it.
// Its ParentID is parent.ID, and its ID is parent.ID followed by
// structs.PeriodicLaunchSuffix and the current Unix time in seconds.
func deriveChildJob(parent *structs.Job) *structs.Job {
	child := mock.Job()
	child.ParentID = parent.ID

	launchedAt := time.Now().Unix()
	child.ID = fmt.Sprintf("%s%s%v", parent.ID, structs.PeriodicLaunchSuffix, launchedAt)

	return child
}
// TestPeriodicDispatch_RunningChildren_NoEvals checks that a periodic job with
// no child jobs in state is not reported as having running children.
func TestPeriodicDispatch_RunningChildren_NoEvals(t *testing.T) {
	t.Parallel()
	s1, cleanupS1 := TestServer(t, nil)
	defer cleanupS1()
	testutil.WaitForLeader(t, s1.RPC)

	// Insert only the parent job.
	job := mock.PeriodicJob()
	err := s1.fsm.State().UpsertJob(structs.MsgTypeTestSetup, 1000, job)
	if err != nil {
		t.Fatalf("UpsertJob failed: %v", err)
	}

	running, err := s1.RunningChildren(job)
	switch {
	case err != nil:
		t.Fatalf("RunningChildren failed: %v", err)
	case running:
		t.Fatalf("RunningChildren should return false")
	}
}
// TestPeriodicDispatch_RunningChildren_ActiveEvals checks that a child job with
// a pending (non-terminal) evaluation makes RunningChildren report true.
func TestPeriodicDispatch_RunningChildren_ActiveEvals(t *testing.T) {
	t.Parallel()
	s1, cleanupS1 := TestServer(t, nil)
	defer cleanupS1()
	testutil.WaitForLeader(t, s1.RPC)

	store := s1.fsm.State()

	// Insert the periodic parent and one child derived from it.
	job := mock.PeriodicJob()
	if err := store.UpsertJob(structs.MsgTypeTestSetup, 1000, job); err != nil {
		t.Fatalf("UpsertJob failed: %v", err)
	}
	child := deriveChildJob(job)
	if err := store.UpsertJob(structs.MsgTypeTestSetup, 1001, child); err != nil {
		t.Fatalf("UpsertJob failed: %v", err)
	}

	// Attach a pending evaluation to the child.
	eval := mock.Eval()
	eval.JobID = child.ID
	eval.Status = structs.EvalStatusPending
	evals := []*structs.Evaluation{eval}
	if err := store.UpsertEvals(structs.MsgTypeTestSetup, 1002, evals); err != nil {
		t.Fatalf("UpsertEvals failed: %v", err)
	}

	running, err := s1.RunningChildren(job)
	switch {
	case err != nil:
		t.Fatalf("RunningChildren failed: %v", err)
	case !running:
		t.Fatalf("RunningChildren should return true")
	}
}
// TestPeriodicDispatch_RunningChildren_ActiveAllocs checks that a periodic
// job whose child has an allocation desired to run is reported as having
// running children.
func TestPeriodicDispatch_RunningChildren_ActiveAllocs(t *testing.T) {
	t.Parallel()
	s1, cleanupS1 := TestServer(t, nil)
	defer cleanupS1()
	testutil.WaitForLeader(t, s1.RPC)

	// Insert periodic job and child.
	state := s1.fsm.State()
	job := mock.PeriodicJob()
	if err := state.UpsertJob(structs.MsgTypeTestSetup, 1000, job); err != nil {
		t.Fatalf("UpsertJob failed: %v", err)
	}
	childjob := deriveChildJob(job)
	if err := state.UpsertJob(structs.MsgTypeTestSetup, 1001, childjob); err != nil {
		t.Fatalf("UpsertJob failed: %v", err)
	}

	// Insert a non-terminal (pending) eval for the child.
	// NOTE(review): the previous comment said "terminal eval", but the status
	// is EvalStatusPending. RunningChildren may therefore return true because
	// of this eval alone, without exercising the alloc path. Consider making
	// the eval terminal (and attaching childjob to the alloc) so the test
	// isolates allocations. Confirm against RunningChildren's implementation.
	eval := mock.Eval()
	eval.JobID = childjob.ID
	eval.Status = structs.EvalStatusPending
	if err := state.UpsertEvals(structs.MsgTypeTestSetup, 1002, []*structs.Evaluation{eval}); err != nil {
		t.Fatalf("UpsertEvals failed: %v", err)
	}

	// Insert an alloc for the child that is desired to run.
	alloc := mock.Alloc()
	alloc.JobID = childjob.ID
	alloc.EvalID = eval.ID
	alloc.DesiredStatus = structs.AllocDesiredStatusRun
	if err := state.UpsertAllocs(structs.MsgTypeTestSetup, 1003, []*structs.Allocation{alloc}); err != nil {
		t.Fatalf("UpsertAllocs failed: %v", err)
	}

	running, err := s1.RunningChildren(job)
	if err != nil {
		t.Fatalf("RunningChildren failed: %v", err)
	}
	if !running {
		t.Fatalf("RunningChildren should return true")
	}
}