package scheduler

import (
	"fmt"
	"testing"
	"time"

	"github.com/hashicorp/nomad/nomad/mock"
	"github.com/hashicorp/nomad/nomad/structs"
)

// TestServiceSched_JobRegister registers a job against ten nodes and checks
// that the service scheduler submits a single plan with ten placements, that
// ten allocations are stored for the job, and that no dynamic port value is
// used twice across those allocations.
func TestServiceSched_JobRegister(t *testing.T) {
	h := NewHarness(t)

	// Create some nodes
	for i := 0; i < 10; i++ {
		node := mock.Node()
		noErr(t, h.State.UpsertNode(h.NextIndex(), node))
	}

	// Create a job
	job := mock.Job()
	noErr(t, h.State.UpsertJob(h.NextIndex(), job))

	// Create a mock evaluation to register the job
	eval := &structs.Evaluation{
		ID:          structs.GenerateUUID(),
		Priority:    job.Priority,
		TriggeredBy: structs.EvalTriggerJobRegister,
		JobID:       job.ID,
	}

	// Process the evaluation
	err := h.Process(NewServiceScheduler, eval)
	if err != nil {
		t.Fatalf("err: %v", err)
	}

	// Ensure a single plan
	if len(h.Plans) != 1 {
		t.Fatalf("bad: %#v", h.Plans)
	}
	plan := h.Plans[0]

	// Ensure the plan allocated
	var planned []*structs.Allocation
	for _, allocList := range plan.NodeAllocation {
		planned = append(planned, allocList...)
	}
	if len(planned) != 10 {
		t.Fatalf("bad: %#v", plan)
	}

	// Lookup the allocations by JobID
	out, err := h.State.AllocsByJob(job.ID)
	noErr(t, err)

	// Ensure all allocations placed
	if len(out) != 10 {
		t.Fatalf("bad: %#v", out)
	}

	// Ensure different ports were used. Only the first network of each task
	// resource is inspected.
	used := make(map[int]struct{})
	for _, alloc := range out {
		for _, resource := range alloc.TaskResources {
			for _, port := range resource.Networks[0].DynamicPorts {
				if _, ok := used[port.Value]; ok {
					t.Fatalf("Port collision %v", port.Value)
				}
				used[port.Value] = struct{}{}
			}
		}
	}

	h.AssertEvalStatus(t, structs.EvalStatusComplete)
}

// TestServiceSched_JobRegister_AllocFail registers a job when no nodes exist
// and checks that the scheduler creates one blocked follow-up evaluation,
// reports one failed allocation in the plan, and stores a single allocation
// whose metrics show nine coalesced failures and zero available nodes in
// "dc1".
func TestServiceSched_JobRegister_AllocFail(t *testing.T) {
	h := NewHarness(t)

	// Create NO nodes
	// Create a job
	job := mock.Job()
	noErr(t, h.State.UpsertJob(h.NextIndex(), job))

	// Create a mock evaluation to register the job
	eval := &structs.Evaluation{
		ID:          structs.GenerateUUID(),
		Priority:    job.Priority,
		TriggeredBy: structs.EvalTriggerJobRegister,
		JobID:       job.ID,
	}

	// Process the evaluation
	err := h.Process(NewServiceScheduler, eval)
	if err != nil {
		t.Fatalf("err: %v", err)
	}

	// Ensure a single plan
	if len(h.Plans) != 1 {
		t.Fatalf("bad: %#v", h.Plans)
	}
	plan := h.Plans[0]

	// Ensure the plan has created a follow up eval.
	if len(h.CreateEvals) != 1 || h.CreateEvals[0].Status != structs.EvalStatusBlocked {
		t.Fatalf("bad: %#v", h.CreateEvals)
	}

	// Ensure the plan failed to alloc
	if len(plan.FailedAllocs) != 1 {
		t.Fatalf("bad: %#v", plan)
	}

	// Lookup the allocations by JobID
	out, err := h.State.AllocsByJob(job.ID)
	noErr(t, err)

	// Ensure exactly one allocation is stored for the job (presumably the
	// failed one that carries the coalesced failures checked below).
	if len(out) != 1 {
		t.Fatalf("bad: %#v", out)
	}

	// Check the coalesced failures
	if out[0].Metrics.CoalescedFailures != 9 {
		t.Fatalf("bad: %#v", out[0].Metrics)
	}

	// Check the available nodes
	if count, ok := out[0].Metrics.NodesAvailable["dc1"]; !ok || count != 0 {
		t.Fatalf("bad: %#v", out[0].Metrics)
	}

	h.AssertEvalStatus(t, structs.EvalStatusComplete)
}

// TestServiceSched_JobRegister_BlockedEval registers a job when the only
// nodes are one whose resources are fully reserved and one whose kernel name
// is "windows". It checks that a blocked follow-up evaluation is created whose
// class eligibility marks the first node's computed class eligible and the
// second node's ineligible, and that the failure metrics are recorded.
func TestServiceSched_JobRegister_BlockedEval(t *testing.T) {
	h := NewHarness(t)

	// Create a full node
	node := mock.Node()
	node.Reserved = node.Resources
	// ComputeClass returns an error; fail fast rather than asserting on an
	// unset ComputedClass further down.
	noErr(t, node.ComputeClass())
	noErr(t, h.State.UpsertNode(h.NextIndex(), node))

	// Create an ineligible node
	node2 := mock.Node()
	node2.Attributes["kernel.name"] = "windows"
	noErr(t, node2.ComputeClass())
	noErr(t, h.State.UpsertNode(h.NextIndex(), node2))

	// Create a job
	job := mock.Job()
	noErr(t, h.State.UpsertJob(h.NextIndex(), job))

	// Create a mock evaluation to register the job
	eval := &structs.Evaluation{
		ID:          structs.GenerateUUID(),
		Priority:    job.Priority,
		TriggeredBy: structs.EvalTriggerJobRegister,
		JobID:       job.ID,
	}

	// Process the evaluation
	err := h.Process(NewServiceScheduler, eval)
	if err != nil {
		t.Fatalf("err: %v", err)
	}

	// Ensure a single plan
	if len(h.Plans) != 1 {
		t.Fatalf("bad: %#v", h.Plans)
	}
	plan := h.Plans[0]

	// Ensure the plan has created a follow up eval.
	if len(h.CreateEvals) != 1 {
		t.Fatalf("bad: %#v", h.CreateEvals)
	}

	created := h.CreateEvals[0]
	if created.Status != structs.EvalStatusBlocked {
		t.Fatalf("bad: %#v", created)
	}

	// Both classes must be tracked: the full node's class as eligible and
	// the "windows" node's class as ineligible.
	classes := created.ClassEligibility
	if len(classes) != 2 || !classes[node.ComputedClass] || classes[node2.ComputedClass] {
		t.Fatalf("bad: %#v", classes)
	}

	if created.EscapedComputedClass {
		t.Fatalf("bad: %#v", created)
	}

	// Ensure the plan failed to alloc
	if len(plan.FailedAllocs) != 1 {
		t.Fatalf("bad: %#v", plan)
	}

	// Lookup the allocations by JobID
	out, err := h.State.AllocsByJob(job.ID)
	noErr(t, err)

	// Ensure exactly one allocation is stored for the job (presumably the
	// failed one that carries the coalesced failures checked below).
	if len(out) != 1 {
		for _, a := range out {
			t.Logf("%#v", a)
		}
		t.Fatalf("bad: %#v", out)
	}

	// Check the coalesced failures
	if out[0].Metrics.CoalescedFailures != 9 {
		t.Fatalf("bad: %#v", out[0].Metrics)
	}

	// Check the available nodes
	if count, ok := out[0].Metrics.NodesAvailable["dc1"]; !ok || count != 2 {
		t.Fatalf("bad: %#v", out[0].Metrics)
	}

	h.AssertEvalStatus(t, structs.EvalStatusComplete)
}

// TestServiceSched_JobRegister_FeasibleAndInfeasibleTG registers a job with
// two task groups against a single node of class "class_0": one group
// constrained to "class_0" and a copy constrained to "class_1". It checks
// that the plan contains two placements and one failed allocation, and that
// three allocations are stored for the job.
func TestServiceSched_JobRegister_FeasibleAndInfeasibleTG(t *testing.T) {
	h := NewHarness(t)

	// Create one node
	node := mock.Node()
	node.NodeClass = "class_0"
	noErr(t, node.ComputeClass())
	noErr(t, h.State.UpsertNode(h.NextIndex(), node))

	// Create a job that constrains on a node class.
	// NOTE(review): the task group's constraints are built by appending to
	// the job-level constraints, and tg2.Constraints[1] below relies on the
	// node-class constraint landing at index 1 — i.e. on mock.Job carrying
	// exactly one job-level constraint. Confirm against mock.Job.
	job := mock.Job()
	job.TaskGroups[0].Count = 2
	job.TaskGroups[0].Constraints = append(job.Constraints,
		&structs.Constraint{
			LTarget: "${node.class}",
			RTarget: "class_0",
			Operand: "=",
		},
	)
	tg2 := job.TaskGroups[0].Copy()
	tg2.Name = "web2"
	tg2.Constraints[1].RTarget = "class_1"
	job.TaskGroups = append(job.TaskGroups, tg2)
	noErr(t, h.State.UpsertJob(h.NextIndex(), job))

	// Create a mock evaluation to register the job
	eval := &structs.Evaluation{
		ID:          structs.GenerateUUID(),
		Priority:    job.Priority,
		TriggeredBy: structs.EvalTriggerJobRegister,
		JobID:       job.ID,
	}

	// Process the evaluation
	err := h.Process(NewServiceScheduler, eval)
	if err != nil {
		t.Fatalf("err: %v", err)
	}

	// Ensure a single plan
	if len(h.Plans) != 1 {
		t.Fatalf("bad: %#v", h.Plans)
	}
	plan := h.Plans[0]

	// Ensure the plan allocated
	var planned []*structs.Allocation
	for _, allocList := range plan.NodeAllocation {
		planned = append(planned, allocList...)
	}
	if len(planned) != 2 {
		t.Fatalf("bad: %#v", plan)
	}

	if len(plan.FailedAllocs) != 1 {
		t.Fatalf("bad: %#v", plan)
	}

	// Lookup the allocations by JobID
	out, err := h.State.AllocsByJob(job.ID)
	noErr(t, err)

	// Ensure three allocations are stored (presumably the two placements
	// plus the one failed allocation).
	if len(out) != 3 {
		t.Fatalf("bad: %#v", out)
	}

	h.AssertEvalStatus(t, structs.EvalStatusComplete)
}

// TestServiceSched_JobModify seeds ten allocations plus five allocations with
// a failed desired status, then registers an updated job whose task command
// differs. It checks that the plan updates as many allocations as were seeded
// live, places ten allocations, and leaves ten non-terminal allocations in
// state.
func TestServiceSched_JobModify(t *testing.T) {
	h := NewHarness(t)

	// Create some nodes
	var nodes []*structs.Node
	for i := 0; i < 10; i++ {
		node := mock.Node()
		nodes = append(nodes, node)
		noErr(t, h.State.UpsertNode(h.NextIndex(), node))
	}

	// Generate a fake job with allocations
	job := mock.Job()
	noErr(t, h.State.UpsertJob(h.NextIndex(), job))

	var allocs []*structs.Allocation
	for i := 0; i < 10; i++ {
		alloc := mock.Alloc()
		alloc.Job = job
		alloc.JobID = job.ID
		alloc.NodeID = nodes[i].ID
		alloc.Name = fmt.Sprintf("my-job.web[%d]", i)
		allocs = append(allocs, alloc)
	}
	noErr(t, h.State.UpsertAllocs(h.NextIndex(), allocs))

	// Add a few terminal status allocations, these should be ignored
	var terminal []*structs.Allocation
	for i := 0; i < 5; i++ {
		alloc := mock.Alloc()
		alloc.Job = job
		alloc.JobID = job.ID
		alloc.NodeID = nodes[i].ID
		alloc.Name = fmt.Sprintf("my-job.web[%d]", i)
		alloc.DesiredStatus = structs.AllocDesiredStatusFailed
		terminal = append(terminal, alloc)
	}
	noErr(t, h.State.UpsertAllocs(h.NextIndex(), terminal))

	// Update the job
	job2 := mock.Job()
	job2.ID = job.ID

	// Update the task, such that it cannot be done in-place
	job2.TaskGroups[0].Tasks[0].Config["command"] = "/bin/other"
	noErr(t, h.State.UpsertJob(h.NextIndex(), job2))

	// Create a mock evaluation to register the updated job
	eval := &structs.Evaluation{
		ID:          structs.GenerateUUID(),
		Priority:    50,
		TriggeredBy: structs.EvalTriggerJobRegister,
		JobID:       job.ID,
	}

	// Process the evaluation
	err := h.Process(NewServiceScheduler, eval)
	if err != nil {
		t.Fatalf("err: %v", err)
	}

	// Ensure a single plan
	if len(h.Plans) != 1 {
		t.Fatalf("bad: %#v", h.Plans)
	}
	plan := h.Plans[0]

	// Ensure the plan evicted all allocs
	var update []*structs.Allocation
	for _, updateList := range plan.NodeUpdate {
		update = append(update, updateList...)
	}
	if len(update) != len(allocs) {
		t.Fatalf("bad: %#v", plan)
	}

	// Ensure the plan allocated
	var planned []*structs.Allocation
	for _, allocList := range plan.NodeAllocation {
		planned = append(planned, allocList...)
	}
	if len(planned) != 10 {
		t.Fatalf("bad: %#v", plan)
	}

	// Lookup the allocations by JobID
	out, err := h.State.AllocsByJob(job.ID)
	noErr(t, err)

	// Ensure all allocations placed
	out = structs.FilterTerminalAllocs(out)
	if len(out) != 10 {
		t.Fatalf("bad: %#v", out)
	}

	h.AssertEvalStatus(t, structs.EvalStatusComplete)
}

// TestServiceSched_JobModify_Rolling registers an updated job that carries an
// update strategy with MaxParallel of 5. It checks that the plan updates and
// places only MaxParallel allocations, and that a follow-up evaluation
// triggered by a rolling update is created and linked to the processed
// evaluation through NextEval/PreviousEval.
func TestServiceSched_JobModify_Rolling(t *testing.T) {
	h := NewHarness(t)

	// Create some nodes
	var nodes []*structs.Node
	for i := 0; i < 10; i++ {
		node := mock.Node()
		nodes = append(nodes, node)
		noErr(t, h.State.UpsertNode(h.NextIndex(), node))
	}

	// Generate a fake job with allocations
	job := mock.Job()
	noErr(t, h.State.UpsertJob(h.NextIndex(), job))

	var allocs []*structs.Allocation
	for i := 0; i < 10; i++ {
		alloc := mock.Alloc()
		alloc.Job = job
		alloc.JobID = job.ID
		alloc.NodeID = nodes[i].ID
		alloc.Name = fmt.Sprintf("my-job.web[%d]", i)
		allocs = append(allocs, alloc)
	}
	noErr(t, h.State.UpsertAllocs(h.NextIndex(), allocs))

	// Update the job
	job2 := mock.Job()
	job2.ID = job.ID
	job2.Update = structs.UpdateStrategy{
		Stagger:     30 * time.Second,
		MaxParallel: 5,
	}

	// Update the task, such that it cannot be done in-place
	job2.TaskGroups[0].Tasks[0].Config["command"] = "/bin/other"
	noErr(t, h.State.UpsertJob(h.NextIndex(), job2))

	// Create a mock evaluation to register the updated job
	eval := &structs.Evaluation{
		ID:          structs.GenerateUUID(),
		Priority:    50,
		TriggeredBy: structs.EvalTriggerJobRegister,
		JobID:       job.ID,
	}

	// Process the evaluation
	err := h.Process(NewServiceScheduler, eval)
	if err != nil {
		t.Fatalf("err: %v", err)
	}

	// Ensure a single plan
	if len(h.Plans) != 1 {
		t.Fatalf("bad: %#v", h.Plans)
	}
	plan := h.Plans[0]

	// Ensure the plan evicted only MaxParallel
	var update []*structs.Allocation
	for _, updateList := range plan.NodeUpdate {
		update = append(update, updateList...)
	}
	if len(update) != job2.Update.MaxParallel {
		t.Fatalf("bad: %#v", plan)
	}

	// Ensure the plan allocated
	var planned []*structs.Allocation
	for _, allocList := range plan.NodeAllocation {
		planned = append(planned, allocList...)
	}
	if len(planned) != job2.Update.MaxParallel {
		t.Fatalf("bad: %#v", plan)
	}

	h.AssertEvalStatus(t, structs.EvalStatusComplete)

	// Ensure a follow up eval was created
	eval = h.Evals[0]
	if eval.NextEval == "" {
		t.Fatalf("missing next eval")
	}

	// Check for create
	if len(h.CreateEvals) == 0 {
		t.Fatalf("missing created eval")
	}
	create := h.CreateEvals[0]
	if eval.NextEval != create.ID {
		t.Fatalf("ID mismatch")
	}
	if create.PreviousEval != eval.ID {
		t.Fatalf("missing previous eval")
	}

	if create.TriggeredBy != structs.EvalTriggerRollingUpdate {
		t.Fatalf("bad: %#v", create)
	}
}

// TestServiceSched_JobModify_InPlace registers a second mock job under the
// same ID without changing the task. It checks that the plan updates no
// allocations through NodeUpdate, lists ten allocations in NodeAllocation that
// all point at the new job, and that the reserved port of every stored
// allocation is still {"main", 5000}.
func TestServiceSched_JobModify_InPlace(t *testing.T) {
	h := NewHarness(t)

	// Create some nodes
	var nodes []*structs.Node
	for i := 0; i < 10; i++ {
		node := mock.Node()
		nodes = append(nodes, node)
		noErr(t, h.State.UpsertNode(h.NextIndex(), node))
	}

	// Generate a fake job with allocations
	job := mock.Job()
	noErr(t, h.State.UpsertJob(h.NextIndex(), job))

	var allocs []*structs.Allocation
	for i := 0; i < 10; i++ {
		alloc := mock.Alloc()
		alloc.Job = job
		alloc.JobID = job.ID
		alloc.NodeID = nodes[i].ID
		alloc.Name = fmt.Sprintf("my-job.web[%d]", i)
		allocs = append(allocs, alloc)
	}
	noErr(t, h.State.UpsertAllocs(h.NextIndex(), allocs))

	// Update the job
	job2 := mock.Job()
	job2.ID = job.ID
	noErr(t, h.State.UpsertJob(h.NextIndex(), job2))

	// Create a mock evaluation to register the updated job
	eval := &structs.Evaluation{
		ID:          structs.GenerateUUID(),
		Priority:    50,
		TriggeredBy: structs.EvalTriggerJobRegister,
		JobID:       job.ID,
	}

	// Process the evaluation
	err := h.Process(NewServiceScheduler, eval)
	if err != nil {
		t.Fatalf("err: %v", err)
	}

	// Ensure a single plan
	if len(h.Plans) != 1 {
		t.Fatalf("bad: %#v", h.Plans)
	}
	plan := h.Plans[0]

	// Ensure the plan did not evict any allocs
	var update []*structs.Allocation
	for _, updateList := range plan.NodeUpdate {
		update = append(update, updateList...)
	}
	if len(update) != 0 {
		t.Fatalf("bad: %#v", plan)
	}

	// Ensure the plan updated the existing allocs
	var planned []*structs.Allocation
	for _, allocList := range plan.NodeAllocation {
		planned = append(planned, allocList...)
	}
	if len(planned) != 10 {
		t.Fatalf("bad: %#v", plan)
	}
	for _, p := range planned {
		// Pointer comparison: each planned alloc must reference job2 itself.
		if p.Job != job2 {
			t.Fatalf("should update job")
		}
	}

	// Lookup the allocations by JobID
	out, err := h.State.AllocsByJob(job.ID)
	noErr(t, err)

	// Ensure all allocations placed
	if len(out) != 10 {
		for _, alloc := range out {
			t.Logf("%#v", alloc)
		}
		t.Fatalf("bad: %#v", out)
	}
	h.AssertEvalStatus(t, structs.EvalStatusComplete)

	// Verify the network did not change
	rp := structs.Port{"main", 5000}
	for _, alloc := range out {
		for _, resources := range alloc.TaskResources {
			if resources.Networks[0].ReservedPorts[0] != rp {
				t.Fatalf("bad: %#v", alloc)
			}
		}
	}
}

// TestServiceSched_JobDeregister seeds ten allocations for a job that is never
// upserted into state, processes a deregister evaluation, and checks that the
// plan updates all of them and that no non-terminal allocations remain.
func TestServiceSched_JobDeregister(t *testing.T) {
	h := NewHarness(t)

	// Generate a fake job with allocations
	job := mock.Job()

	var allocs []*structs.Allocation
	for i := 0; i < 10; i++ {
		alloc := mock.Alloc()
		alloc.Job = job
		alloc.JobID = job.ID
		allocs = append(allocs, alloc)
	}
	noErr(t, h.State.UpsertAllocs(h.NextIndex(), allocs))

	// Create a mock evaluation to deregister the job
	eval := &structs.Evaluation{
		ID:          structs.GenerateUUID(),
		Priority:    50,
		TriggeredBy: structs.EvalTriggerJobDeregister,
		JobID:       job.ID,
	}

	// Process the evaluation
	err := h.Process(NewServiceScheduler, eval)
	if err != nil {
		t.Fatalf("err: %v", err)
	}

	// Ensure a single plan
	if len(h.Plans) != 1 {
		t.Fatalf("bad: %#v", h.Plans)
	}
	plan := h.Plans[0]

	// Ensure the plan evicted all allocs.
	// NOTE(review): the hard-coded ID is presumably the NodeID that
	// mock.Alloc assigns by default — confirm against mock.Alloc.
	if len(plan.NodeUpdate["12345678-abcd-efab-cdef-123456789abc"]) != len(allocs) {
		t.Fatalf("bad: %#v", plan)
	}

	// Lookup the allocations by JobID
	out, err := h.State.AllocsByJob(job.ID)
	noErr(t, err)

	// Ensure no remaining allocations
	out = structs.FilterTerminalAllocs(out)
	if len(out) != 0 {
		t.Fatalf("bad: %#v", out)
	}

	h.AssertEvalStatus(t, structs.EvalStatusComplete)
}

// TestServiceSched_NodeDrain places ten allocations on a draining node, adds
// ten more nodes, and processes a node-update evaluation. It checks that the
// plan updates every allocation on the draining node, places ten allocations,
// and leaves ten non-terminal allocations in state.
func TestServiceSched_NodeDrain(t *testing.T) {
	h := NewHarness(t)

	// Register a draining node
	node := mock.Node()
	node.Drain = true
	noErr(t, h.State.UpsertNode(h.NextIndex(), node))

	// Create some nodes
	for i := 0; i < 10; i++ {
		node := mock.Node()
		noErr(t, h.State.UpsertNode(h.NextIndex(), node))
	}

	// Generate a fake job with allocations on the draining node.
	job := mock.Job()
	noErr(t, h.State.UpsertJob(h.NextIndex(), job))

	var allocs []*structs.Allocation
	for i := 0; i < 10; i++ {
		alloc := mock.Alloc()
		alloc.Job = job
		alloc.JobID = job.ID
		alloc.NodeID = node.ID
		alloc.Name = fmt.Sprintf("my-job.web[%d]", i)
		allocs = append(allocs, alloc)
	}
	noErr(t, h.State.UpsertAllocs(h.NextIndex(), allocs))

	// Create a mock evaluation to deal with drain
	eval := &structs.Evaluation{
		ID:          structs.GenerateUUID(),
		Priority:    50,
		TriggeredBy: structs.EvalTriggerNodeUpdate,
		JobID:       job.ID,
		NodeID:      node.ID,
	}

	// Process the evaluation
	err := h.Process(NewServiceScheduler, eval)
	if err != nil {
		t.Fatalf("err: %v", err)
	}

	// Ensure a single plan
	if len(h.Plans) != 1 {
		t.Fatalf("bad: %#v", h.Plans)
	}
	plan := h.Plans[0]

	// Ensure the plan evicted all allocs
	if len(plan.NodeUpdate[node.ID]) != len(allocs) {
		t.Fatalf("bad: %#v", plan)
	}

	// Ensure the plan allocated
	var planned []*structs.Allocation
	for _, allocList := range plan.NodeAllocation {
		planned = append(planned, allocList...)
	}
	if len(planned) != 10 {
		t.Fatalf("bad: %#v", plan)
	}

	// Lookup the allocations by JobID
	out, err := h.State.AllocsByJob(job.ID)
	noErr(t, err)

	// Ensure all allocations placed
	out = structs.FilterTerminalAllocs(out)
	if len(out) != 10 {
		t.Fatalf("bad: %#v", out)
	}

	h.AssertEvalStatus(t, structs.EvalStatusComplete)
}

// TestServiceSched_NodeDrain_UpdateStrategy is the node-drain scenario with a
// job whose update strategy has MaxParallel of 5. It checks that the plan
// updates and places only MaxParallel allocations and that a single follow-up
// evaluation triggered by a rolling update is created.
func TestServiceSched_NodeDrain_UpdateStrategy(t *testing.T) {
	h := NewHarness(t)

	// Register a draining node
	node := mock.Node()
	node.Drain = true
	noErr(t, h.State.UpsertNode(h.NextIndex(), node))

	// Create some nodes
	for i := 0; i < 10; i++ {
		node := mock.Node()
		noErr(t, h.State.UpsertNode(h.NextIndex(), node))
	}

	// Generate a fake job with allocations and an update policy.
	job := mock.Job()
	mp := 5
	job.Update = structs.UpdateStrategy{
		Stagger:     time.Second,
		MaxParallel: mp,
	}
	noErr(t, h.State.UpsertJob(h.NextIndex(), job))

	var allocs []*structs.Allocation
	for i := 0; i < 10; i++ {
		alloc := mock.Alloc()
		alloc.Job = job
		alloc.JobID = job.ID
		alloc.NodeID = node.ID
		alloc.Name = fmt.Sprintf("my-job.web[%d]", i)
		allocs = append(allocs, alloc)
	}
	noErr(t, h.State.UpsertAllocs(h.NextIndex(), allocs))

	// Create a mock evaluation to deal with drain
	eval := &structs.Evaluation{
		ID:          structs.GenerateUUID(),
		Priority:    50,
		TriggeredBy: structs.EvalTriggerNodeUpdate,
		JobID:       job.ID,
		NodeID:      node.ID,
	}

	// Process the evaluation
	err := h.Process(NewServiceScheduler, eval)
	if err != nil {
		t.Fatalf("err: %v", err)
	}

	// Ensure a single plan
	if len(h.Plans) != 1 {
		t.Fatalf("bad: %#v", h.Plans)
	}
	plan := h.Plans[0]

	// Ensure the plan evicted only MaxParallel allocs
	if len(plan.NodeUpdate[node.ID]) != mp {
		t.Fatalf("bad: %#v", plan)
	}

	// Ensure the plan allocated only MaxParallel replacements
	var planned []*structs.Allocation
	for _, allocList := range plan.NodeAllocation {
		planned = append(planned, allocList...)
	}
	if len(planned) != mp {
		t.Fatalf("bad: %#v", plan)
	}

	// Ensure there is a followup eval.
	if len(h.CreateEvals) != 1 ||
		h.CreateEvals[0].TriggeredBy != structs.EvalTriggerRollingUpdate {
		t.Fatalf("bad: %#v", h.CreateEvals)
	}

	h.AssertEvalStatus(t, structs.EvalStatusComplete)
}

// TestServiceSched_RetryLimit swaps the harness planner for RejectPlan and
// checks that at least one plan is attempted, no allocations are stored, and
// the evaluation ends in the failed status.
func TestServiceSched_RetryLimit(t *testing.T) {
	h := NewHarness(t)
	// NOTE(review): RejectPlan is defined elsewhere; presumably it rejects
	// every submitted plan so the scheduler exhausts its retries.
	h.Planner = &RejectPlan{h}

	// Create some nodes
	for i := 0; i < 10; i++ {
		node := mock.Node()
		noErr(t, h.State.UpsertNode(h.NextIndex(), node))
	}

	// Create a job
	job := mock.Job()
	noErr(t, h.State.UpsertJob(h.NextIndex(), job))

	// Create a mock evaluation to register the job
	eval := &structs.Evaluation{
		ID:          structs.GenerateUUID(),
		Priority:    job.Priority,
		TriggeredBy: structs.EvalTriggerJobRegister,
		JobID:       job.ID,
	}

	// Process the evaluation
	err := h.Process(NewServiceScheduler, eval)
	if err != nil {
		t.Fatalf("err: %v", err)
	}

	// Ensure at least one plan was attempted
	if len(h.Plans) == 0 {
		t.Fatalf("bad: %#v", h.Plans)
	}

	// Lookup the allocations by JobID
	out, err := h.State.AllocsByJob(job.ID)
	noErr(t, err)

	// Ensure no allocations placed
	if len(out) != 0 {
		t.Fatalf("bad: %#v", out)
	}

	// Should hit the retry limit
	h.AssertEvalStatus(t, structs.EvalStatusFailed)
}

// TestBatchSched_Run_DeadAlloc seeds a single allocation whose client status
// is dead for a job with a count of one, and checks that the batch scheduler
// submits no plan and that the existing allocation is the only one stored.
func TestBatchSched_Run_DeadAlloc(t *testing.T) {
	h := NewHarness(t)

	// Create a node
	node := mock.Node()
	noErr(t, h.State.UpsertNode(h.NextIndex(), node))

	// Create a job
	job := mock.Job()
	job.TaskGroups[0].Count = 1
	noErr(t, h.State.UpsertJob(h.NextIndex(), job))

	// Create a dead alloc
	alloc := mock.Alloc()
	alloc.Job = job
	alloc.JobID = job.ID
	alloc.NodeID = node.ID
	alloc.Name = "my-job.web[0]"
	alloc.ClientStatus = structs.AllocClientStatusDead
	noErr(t, h.State.UpsertAllocs(h.NextIndex(), []*structs.Allocation{alloc}))

	// Create a mock evaluation to register the job
	eval := &structs.Evaluation{
		ID:          structs.GenerateUUID(),
		Priority:    job.Priority,
		TriggeredBy: structs.EvalTriggerJobRegister,
		JobID:       job.ID,
	}

	// Process the evaluation
	err := h.Process(NewBatchScheduler, eval)
	if err != nil {
		t.Fatalf("err: %v", err)
	}

	// Ensure no plan as it should be a no-op
	if len(h.Plans) != 0 {
		t.Fatalf("bad: %#v", h.Plans)
	}

	// Lookup the allocations by JobID
	out, err := h.State.AllocsByJob(job.ID)
	noErr(t, err)

	// Ensure no new allocations were placed: only the seeded one remains
	if len(out) != 1 {
		t.Fatalf("bad: %#v", out)
	}

	h.AssertEvalStatus(t, structs.EvalStatusComplete)
}

// TestBatchSched_Run_FailedAlloc seeds a single allocation whose client status
// is failed for a job with a count of one, and checks that the batch scheduler
// submits one plan and that two allocations are stored afterwards.
func TestBatchSched_Run_FailedAlloc(t *testing.T) {
	h := NewHarness(t)

	// Create a node
	node := mock.Node()
	noErr(t, h.State.UpsertNode(h.NextIndex(), node))

	// Create a job
	job := mock.Job()
	job.TaskGroups[0].Count = 1
	noErr(t, h.State.UpsertJob(h.NextIndex(), job))

	// Create a failed alloc
	alloc := mock.Alloc()
	alloc.Job = job
	alloc.JobID = job.ID
	alloc.NodeID = node.ID
	alloc.Name = "my-job.web[0]"
	alloc.ClientStatus = structs.AllocClientStatusFailed
	noErr(t, h.State.UpsertAllocs(h.NextIndex(), []*structs.Allocation{alloc}))

	// Create a mock evaluation to register the job
	eval := &structs.Evaluation{
		ID:          structs.GenerateUUID(),
		Priority:    job.Priority,
		TriggeredBy: structs.EvalTriggerJobRegister,
		JobID:       job.ID,
	}

	// Process the evaluation
	err := h.Process(NewBatchScheduler, eval)
	if err != nil {
		t.Fatalf("err: %v", err)
	}

	// Ensure a single plan was submitted for the replacement
	if len(h.Plans) != 1 {
		t.Fatalf("bad: %#v", h.Plans)
	}

	// Lookup the allocations by JobID
	out, err := h.State.AllocsByJob(job.ID)
	noErr(t, err)

	// Ensure a replacement alloc was placed.
	if len(out) != 2 {
		t.Fatalf("bad: %#v", out)
	}

	h.AssertEvalStatus(t, structs.EvalStatusComplete)
}