open-nomad/scheduler/scheduler_system_test.go


// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: MPL-2.0
package scheduler
import (
"fmt"
"reflect"
"sort"
"testing"
"time"
memdb "github.com/hashicorp/go-memdb"
"github.com/hashicorp/nomad/ci"
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/helper/pointer"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/shoenig/test/must"
"github.com/stretchr/testify/require"
)
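// TestSystemSched_JobRegister verifies that registering a system job places
// one allocation on every node and leaves nothing queued.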
func TestSystemSched_JobRegister(t *testing.T) {
ci.Parallel(t)
h := NewHarness(t)
// Create some nodes
_ = createNodes(t, h, 10)
// Create a job
job := mock.SystemJob()
require.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), nil, job))
// Create a mock evaluation to register the job
eval := &structs.Evaluation{
Namespace: structs.DefaultNamespace,
ID: uuid.Generate(),
Priority: job.Priority,
TriggeredBy: structs.EvalTriggerJobRegister,
JobID: job.ID,
Status: structs.EvalStatusPending,
}
require.NoError(t, h.State.UpsertEvals(structs.MsgTypeTestSetup, h.NextIndex(), []*structs.Evaluation{eval}))
// Process the evaluation
err := h.Process(NewSystemScheduler, eval)
require.NoError(t, err)
// Ensure a single plan
require.Len(t, h.Plans, 1)
plan := h.Plans[0]
// Ensure the plan does not have annotations
require.Nil(t, plan.Annotations, "expected no annotations")
// Ensure the plan allocated
var planned []*structs.Allocation
for _, allocList := range plan.NodeAllocation {
planned = append(planned, allocList...)
}
require.Len(t, planned, 10)
// Lookup the allocations by JobID
ws := memdb.NewWatchSet()
out, err := h.State.AllocsByJob(ws, job.Namespace, job.ID, false)
require.NoError(t, err)
// Ensure all allocations placed
require.Len(t, out, 10)
// Note that all system allocations have the same name derived from Job.Name
allocNames := helper.ConvertSlice(out,
func(alloc *structs.Allocation) string { return alloc.Name })
expectAllocNames := []string{}
for i := 0; i < 10; i++ {
expectAllocNames = append(expectAllocNames, fmt.Sprintf("%s.web[0]", job.Name))
}
must.SliceContainsAll(t, expectAllocNames, allocNames)
// Check the available nodes
count, ok := out[0].Metrics.NodesAvailable["dc1"]
require.True(t, ok)
require.Equal(t, 10, count, "bad metrics %#v:", out[0].Metrics)
// Ensure no allocations are queued
queued := h.Evals[0].QueuedAllocations["web"]
require.Equal(t, 0, queued, "unexpected queued allocations")
h.AssertEvalStatus(t, structs.EvalStatusComplete)
}
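// TestSystemSched_JobRegister_StickyAllocs verifies that with sticky ephemeral
// disk, the replacement for a failed allocation lands on the same node and is
// chained to the original allocation.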
func TestSystemSched_JobRegister_StickyAllocs(t *testing.T) {
ci.Parallel(t)
h := NewHarness(t)
// Create some nodes
_ = createNodes(t, h, 10)
// Create a job
job := mock.SystemJob()
job.TaskGroups[0].EphemeralDisk.Sticky = true
require.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), nil, job))
// Create a mock evaluation to register the job
eval := &structs.Evaluation{
Namespace: structs.DefaultNamespace,
ID: uuid.Generate(),
Priority: job.Priority,
TriggeredBy: structs.EvalTriggerJobRegister,
JobID: job.ID,
Status: structs.EvalStatusPending,
}
require.NoError(t, h.State.UpsertEvals(structs.MsgTypeTestSetup, h.NextIndex(), []*structs.Evaluation{eval}))
// Process the evaluation
if err := h.Process(NewSystemScheduler, eval); err != nil {
t.Fatalf("err: %v", err)
}
// Ensure the plan allocated
plan := h.Plans[0]
var planned []*structs.Allocation
for _, allocList := range plan.NodeAllocation {
planned = append(planned, allocList...)
}
if len(planned) != 10 {
t.Fatalf("bad: %#v", plan)
}
// Get an allocation and mark it as failed
alloc := planned[4].Copy()
alloc.ClientStatus = structs.AllocClientStatusFailed
require.NoError(t, h.State.UpdateAllocsFromClient(structs.MsgTypeTestSetup, h.NextIndex(), []*structs.Allocation{alloc}))
// Create a mock evaluation to handle the update
eval = &structs.Evaluation{
Namespace: structs.DefaultNamespace,
ID: uuid.Generate(),
Priority: job.Priority,
TriggeredBy: structs.EvalTriggerNodeUpdate,
JobID: job.ID,
Status: structs.EvalStatusPending,
}
require.NoError(t, h.State.UpsertEvals(structs.MsgTypeTestSetup, h.NextIndex(), []*structs.Evaluation{eval}))
h1 := NewHarnessWithState(t, h.State)
if err := h1.Process(NewSystemScheduler, eval); err != nil {
t.Fatalf("err: %v", err)
}
// Ensure we have created only one new allocation
plan = h1.Plans[0]
var newPlanned []*structs.Allocation
for _, allocList := range plan.NodeAllocation {
newPlanned = append(newPlanned, allocList...)
}
if len(newPlanned) != 1 {
t.Fatalf("bad plan: %#v", plan)
}
// Ensure that the new allocation was placed on the same node as the older
// one
if newPlanned[0].NodeID != alloc.NodeID || newPlanned[0].PreviousAllocation != alloc.ID {
t.Fatalf("expected: %#v, actual: %#v", alloc, newPlanned[0])
}
}
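// TestSystemSched_JobRegister_EphemeralDiskConstraint verifies that a second
// system job is not placed when the first job already consumes the node's
// ephemeral disk.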
func TestSystemSched_JobRegister_EphemeralDiskConstraint(t *testing.T) {
ci.Parallel(t)
h := NewHarness(t)
// Create a node
node := mock.Node()
require.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node))
// Create a job
job := mock.SystemJob()
job.TaskGroups[0].EphemeralDisk.SizeMB = 60 * 1024
require.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), nil, job))
// Create another job with a disk request large enough that it doesn't fit on
// the node once the first job is placed
job1 := mock.SystemJob()
job1.TaskGroups[0].EphemeralDisk.SizeMB = 60 * 1024
require.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), nil, job1))
// Create a mock evaluation to register the job
eval := &structs.Evaluation{
Namespace: structs.DefaultNamespace,
ID: uuid.Generate(),
Priority: job.Priority,
TriggeredBy: structs.EvalTriggerJobRegister,
JobID: job.ID,
Status: structs.EvalStatusPending,
}
require.NoError(t, h.State.UpsertEvals(structs.MsgTypeTestSetup, h.NextIndex(), []*structs.Evaluation{eval}))
// Process the evaluation
if err := h.Process(NewSystemScheduler, eval); err != nil {
t.Fatalf("err: %v", err)
}
// Lookup the allocations by JobID
ws := memdb.NewWatchSet()
out, err := h.State.AllocsByJob(ws, job.Namespace, job.ID, false)
require.NoError(t, err)
// Ensure all allocations placed
if len(out) != 1 {
t.Fatalf("bad: %#v", out)
}
// Create a new harness to test the scheduling result for the second job
h1 := NewHarnessWithState(t, h.State)
// Create a mock evaluation to register the job
eval1 := &structs.Evaluation{
Namespace: structs.DefaultNamespace,
ID: uuid.Generate(),
Priority: job1.Priority,
TriggeredBy: structs.EvalTriggerJobRegister,
JobID: job1.ID,
Status: structs.EvalStatusPending,
}
require.NoError(t, h.State.UpsertEvals(structs.MsgTypeTestSetup, h.NextIndex(), []*structs.Evaluation{eval1}))
// Process the evaluation
if err := h1.Process(NewSystemScheduler, eval1); err != nil {
t.Fatalf("err: %v", err)
}
out, err = h1.State.AllocsByJob(ws, job.Namespace, job1.ID, false)
require.NoError(t, err)
if len(out) != 0 {
t.Fatalf("bad: %#v", out)
}
}
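// TestSystemSched_ExhaustResources verifies that, with preemption enabled, a
// system job preempts an existing service allocation when node resources are
// exhausted.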
func TestSystemSched_ExhaustResources(t *testing.T) {
ci.Parallel(t)
h := NewHarness(t)
// Create a node
node := mock.Node()
require.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node))
// Enable Preemption
h.State.SchedulerSetConfig(h.NextIndex(), &structs.SchedulerConfiguration{
PreemptionConfig: structs.PreemptionConfig{
SystemSchedulerEnabled: true,
},
})
// Create a service job which consumes most of the system resources
svcJob := mock.Job()
svcJob.TaskGroups[0].Count = 1
svcJob.TaskGroups[0].Tasks[0].Resources.CPU = 3600
require.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), nil, svcJob))
// Create a mock evaluation to register the job
eval := &structs.Evaluation{
Namespace: structs.DefaultNamespace,
ID: uuid.Generate(),
Priority: svcJob.Priority,
TriggeredBy: structs.EvalTriggerJobRegister,
JobID: svcJob.ID,
Status: structs.EvalStatusPending,
}
require.NoError(t, h.State.UpsertEvals(structs.MsgTypeTestSetup, h.NextIndex(), []*structs.Evaluation{eval}))
// Process the evaluation
err := h.Process(NewServiceScheduler, eval)
if err != nil {
t.Fatalf("err: %v", err)
}
// Create a system job
job := mock.SystemJob()
require.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), nil, job))
// Create a mock evaluation to register the job
eval1 := &structs.Evaluation{
Namespace: structs.DefaultNamespace,
ID: uuid.Generate(),
Priority: job.Priority,
TriggeredBy: structs.EvalTriggerJobRegister,
JobID: job.ID,
Status: structs.EvalStatusPending,
}
require.NoError(t, h.State.UpsertEvals(structs.MsgTypeTestSetup, h.NextIndex(), []*structs.Evaluation{eval1}))
// Process the evaluation
if err := h.Process(NewSystemScheduler, eval1); err != nil {
t.Fatalf("err: %v", err)
}
// The system scheduler should preempt the service job and place the allocation for eval1
require := require.New(t)
newPlan := h.Plans[1]
require.Len(newPlan.NodeAllocation, 1)
require.Len(newPlan.NodePreemptions, 1)
for _, allocList := range newPlan.NodeAllocation {
require.Len(allocList, 1)
require.Equal(job.ID, allocList[0].JobID)
}
for _, allocList := range newPlan.NodePreemptions {
require.Len(allocList, 1)
require.Equal(svcJob.ID, allocList[0].JobID)
}
// Ensure that we have no queued allocations on the second eval
queued := h.Evals[1].QueuedAllocations["web"]
if queued != 0 {
t.Fatalf("expected: %v, actual: %v", 1, queued)
}
}
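// TestSystemSched_JobRegister_Annotate verifies that an annotated evaluation
// produces a plan whose DesiredTGUpdates describe the placements made.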
func TestSystemSched_JobRegister_Annotate(t *testing.T) {
ci.Parallel(t)
h := NewHarness(t)
// Create some nodes
for i := 0; i < 10; i++ {
node := mock.Node()
if i < 9 {
node.NodeClass = "foo"
} else {
node.NodeClass = "bar"
}
node.ComputeClass()
require.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node))
}
// Create a job constraining on node class
job := mock.SystemJob()
fooConstraint := &structs.Constraint{
LTarget: "${node.class}",
RTarget: "foo",
Operand: "==",
}
job.Constraints = append(job.Constraints, fooConstraint)
require.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), nil, job))
// Create a mock evaluation to register the job
eval := &structs.Evaluation{
Namespace: structs.DefaultNamespace,
ID: uuid.Generate(),
Priority: job.Priority,
TriggeredBy: structs.EvalTriggerJobRegister,
JobID: job.ID,
AnnotatePlan: true,
Status: structs.EvalStatusPending,
}
require.NoError(t, h.State.UpsertEvals(structs.MsgTypeTestSetup, h.NextIndex(), []*structs.Evaluation{eval}))
// Process the evaluation
err := h.Process(NewSystemScheduler, eval)
if err != nil {
t.Fatalf("err: %v", err)
}
// Ensure a single plan
if len(h.Plans) != 1 {
t.Fatalf("bad: %#v", h.Plans)
}
plan := h.Plans[0]
// Ensure the plan allocated
var planned []*structs.Allocation
for _, allocList := range plan.NodeAllocation {
planned = append(planned, allocList...)
}
if len(planned) != 9 {
t.Fatalf("bad: %#v %d", planned, len(planned))
}
// Lookup the allocations by JobID
ws := memdb.NewWatchSet()
out, err := h.State.AllocsByJob(ws, job.Namespace, job.ID, false)
require.NoError(t, err)
// Ensure all allocations placed
if len(out) != 9 {
t.Fatalf("bad: %#v", out)
}
// Check the available nodes
if count, ok := out[0].Metrics.NodesAvailable["dc1"]; !ok || count != 10 {
t.Fatalf("bad: %#v", out[0].Metrics)
}
h.AssertEvalStatus(t, structs.EvalStatusComplete)
// Ensure the plan had annotations.
if plan.Annotations == nil {
t.Fatalf("expected annotations")
}
desiredTGs := plan.Annotations.DesiredTGUpdates
if l := len(desiredTGs); l != 1 {
t.Fatalf("incorrect number of task groups; got %v; want %v", l, 1)
}
desiredChanges, ok := desiredTGs["web"]
if !ok {
t.Fatalf("expected task group web to have desired changes")
}
expected := &structs.DesiredUpdates{Place: 9}
if !reflect.DeepEqual(desiredChanges, expected) {
t.Fatalf("Unexpected desired updates; got %#v; want %#v", desiredChanges, expected)
}
}
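// TestSystemSched_JobRegister_AddNode verifies that adding a node results in a
// single new placement on that node without touching existing allocations.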
func TestSystemSched_JobRegister_AddNode(t *testing.T) {
ci.Parallel(t)
h := NewHarness(t)
// Create some nodes
nodes := createNodes(t, h, 10)
// Generate a fake job with allocations
job := mock.SystemJob()
require.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), nil, job))
var allocs []*structs.Allocation
for _, node := range nodes {
alloc := mock.Alloc()
alloc.Job = job
alloc.JobID = job.ID
alloc.NodeID = node.ID
alloc.Name = "my-job.web[0]"
allocs = append(allocs, alloc)
}
require.NoError(t, h.State.UpsertAllocs(structs.MsgTypeTestSetup, h.NextIndex(), allocs))
// Add a new node.
node := mock.Node()
require.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node))
// Create a mock evaluation to deal with the node update
eval := &structs.Evaluation{
Namespace: structs.DefaultNamespace,
ID: uuid.Generate(),
Priority: 50,
TriggeredBy: structs.EvalTriggerNodeUpdate,
JobID: job.ID,
Status: structs.EvalStatusPending,
}
require.NoError(t, h.State.UpsertEvals(structs.MsgTypeTestSetup, h.NextIndex(), []*structs.Evaluation{eval}))
// Process the evaluation
err := h.Process(NewSystemScheduler, eval)
if err != nil {
t.Fatalf("err: %v", err)
}
// Ensure a single plan
require.Len(t, h.Plans, 1)
plan := h.Plans[0]
// Ensure the plan had no node updates
var update []*structs.Allocation
for _, updateList := range plan.NodeUpdate {
update = append(update, updateList...)
}
require.Empty(t, update)
// Ensure the plan allocated on the new node
var planned []*structs.Allocation
for _, allocList := range plan.NodeAllocation {
planned = append(planned, allocList...)
}
require.Len(t, planned, 1)
// Ensure it allocated on the right node
if _, ok := plan.NodeAllocation[node.ID]; !ok {
t.Fatalf("allocated on wrong node: %#v", plan)
}
// Lookup the allocations by JobID
ws := memdb.NewWatchSet()
out, err := h.State.AllocsByJob(ws, job.Namespace, job.ID, false)
require.NoError(t, err)
// Ensure all allocations placed
out, _ = structs.FilterTerminalAllocs(out)
if len(out) != 11 {
t.Fatalf("bad: %#v", out)
}
h.AssertEvalStatus(t, structs.EvalStatusComplete)
}
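// TestSystemSched_JobRegister_AllocFail verifies that evaluating a system job
// when no nodes exist produces no plan.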
func TestSystemSched_JobRegister_AllocFail(t *testing.T) {
ci.Parallel(t)
h := NewHarness(t)
// Create NO nodes
// Create a job
job := mock.SystemJob()
require.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), nil, job))
// Create a mock evaluation to register the job
eval := &structs.Evaluation{
Namespace: structs.DefaultNamespace,
ID: uuid.Generate(),
Priority: job.Priority,
TriggeredBy: structs.EvalTriggerJobRegister,
JobID: job.ID,
Status: structs.EvalStatusPending,
}
require.NoError(t, h.State.UpsertEvals(structs.MsgTypeTestSetup, h.NextIndex(), []*structs.Evaluation{eval}))
// Process the evaluation
err := h.Process(NewSystemScheduler, eval)
if err != nil {
t.Fatalf("err: %v", err)
}
// Ensure no plan as this should be a no-op.
if len(h.Plans) != 0 {
t.Fatalf("bad: %#v", h.Plans)
}
h.AssertEvalStatus(t, structs.EvalStatusComplete)
}
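// TestSystemSched_JobModify verifies that a destructive job update evicts and
// replaces every non-terminal allocation.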
func TestSystemSched_JobModify(t *testing.T) {
ci.Parallel(t)
h := NewHarness(t)
// Create some nodes
nodes := createNodes(t, h, 10)
// Generate a fake job with allocations
job := mock.SystemJob()
require.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), nil, job))
var allocs []*structs.Allocation
for _, node := range nodes {
alloc := mock.Alloc()
alloc.Job = job
alloc.JobID = job.ID
alloc.NodeID = node.ID
alloc.Name = "my-job.web[0]"
allocs = append(allocs, alloc)
}
require.NoError(t, h.State.UpsertAllocs(structs.MsgTypeTestSetup, h.NextIndex(), allocs))
// Add a few terminal status allocations, these should be ignored
var terminal []*structs.Allocation
for i := 0; i < 5; i++ {
alloc := mock.Alloc()
alloc.Job = job
alloc.JobID = job.ID
alloc.NodeID = nodes[i].ID
alloc.Name = "my-job.web[0]"
alloc.DesiredStatus = structs.AllocDesiredStatusStop
terminal = append(terminal, alloc)
}
require.NoError(t, h.State.UpsertAllocs(structs.MsgTypeTestSetup, h.NextIndex(), terminal))
// Update the job
job2 := mock.SystemJob()
job2.ID = job.ID
// Update the task, such that it cannot be done in-place
job2.TaskGroups[0].Tasks[0].Config["command"] = "/bin/other"
require.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), nil, job2))
// Create a mock evaluation to deal with the job update
eval := &structs.Evaluation{
Namespace: structs.DefaultNamespace,
ID: uuid.Generate(),
Priority: 50,
TriggeredBy: structs.EvalTriggerJobRegister,
JobID: job.ID,
Status: structs.EvalStatusPending,
}
require.NoError(t, h.State.UpsertEvals(structs.MsgTypeTestSetup, h.NextIndex(), []*structs.Evaluation{eval}))
// Process the evaluation
err := h.Process(NewSystemScheduler, eval)
require.NoError(t, err)
// Ensure a single plan
require.Len(t, h.Plans, 1)
plan := h.Plans[0]
// Ensure the plan evicted all allocs
var update []*structs.Allocation
for _, updateList := range plan.NodeUpdate {
update = append(update, updateList...)
}
require.Equal(t, len(allocs), len(update))
// Ensure the plan allocated
var planned []*structs.Allocation
for _, allocList := range plan.NodeAllocation {
planned = append(planned, allocList...)
}
require.Len(t, planned, 10)
// Lookup the allocations by JobID
ws := memdb.NewWatchSet()
out, err := h.State.AllocsByJob(ws, job.Namespace, job.ID, false)
require.NoError(t, err)
// Ensure all allocations placed
out, _ = structs.FilterTerminalAllocs(out)
require.Len(t, out, 10)
h.AssertEvalStatus(t, structs.EvalStatusComplete)
}
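// TestSystemSched_JobModify_Rolling verifies that a destructive update with an
// update strategy only touches MaxParallel allocations at a time and creates a
// follow-up rolling-update evaluation.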
func TestSystemSched_JobModify_Rolling(t *testing.T) {
ci.Parallel(t)
h := NewHarness(t)
// Create some nodes
nodes := createNodes(t, h, 10)
2015-10-14 23:43:06 +00:00
// Generate a fake job with allocations
job := mock.SystemJob()
require.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), nil, job))
2015-10-14 23:43:06 +00:00
var allocs []*structs.Allocation
for _, node := range nodes {
alloc := mock.Alloc()
alloc.Job = job
alloc.JobID = job.ID
alloc.NodeID = node.ID
alloc.Name = "my-job.web[0]"
2015-10-14 23:43:06 +00:00
allocs = append(allocs, alloc)
}
require.NoError(t, h.State.UpsertAllocs(structs.MsgTypeTestSetup, h.NextIndex(), allocs))
// Update the job
job2 := mock.SystemJob()
job2.ID = job.ID
job2.Update = structs.UpdateStrategy{
Stagger: 30 * time.Second,
MaxParallel: 5,
}
// Update the task, such that it cannot be done in-place
job2.TaskGroups[0].Tasks[0].Config["command"] = "/bin/other"
require.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), nil, job2))
2015-10-14 23:43:06 +00:00
// Create a mock evaluation to deal with the job update
eval := &structs.Evaluation{
Namespace: structs.DefaultNamespace,
ID: uuid.Generate(),
Priority: 50,
TriggeredBy: structs.EvalTriggerJobRegister,
JobID: job.ID,
Status: structs.EvalStatusPending,
}
require.NoError(t, h.State.UpsertEvals(structs.MsgTypeTestSetup, h.NextIndex(), []*structs.Evaluation{eval}))
// Process the evaluation
err := h.Process(NewSystemScheduler, eval)
if err != nil {
t.Fatalf("err: %v", err)
}
// Ensure a single plan
if len(h.Plans) != 1 {
t.Fatalf("bad: %#v", h.Plans)
}
plan := h.Plans[0]
// Ensure the plan evicted only MaxParallel
var update []*structs.Allocation
for _, updateList := range plan.NodeUpdate {
update = append(update, updateList...)
}
if len(update) != job2.Update.MaxParallel {
t.Fatalf("bad: %#v", plan)
}
// Ensure the plan allocated
var planned []*structs.Allocation
for _, allocList := range plan.NodeAllocation {
planned = append(planned, allocList...)
}
if len(planned) != job2.Update.MaxParallel {
t.Fatalf("bad: %#v", plan)
}
h.AssertEvalStatus(t, structs.EvalStatusComplete)
// Ensure a follow up eval was created
eval = h.Evals[0]
if eval.NextEval == "" {
t.Fatalf("missing next eval")
}
// Check for create
if len(h.CreateEvals) == 0 {
t.Fatalf("missing created eval")
}
create := h.CreateEvals[0]
if eval.NextEval != create.ID {
t.Fatalf("ID mismatch")
}
if create.PreviousEval != eval.ID {
t.Fatalf("missing previous eval")
}
if create.TriggeredBy != structs.EvalTriggerRollingUpdate {
t.Fatalf("bad: %#v", create)
}
}
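// TestSystemSched_JobModify_InPlace verifies that a compatible job update is
// applied in place without evicting existing allocations.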
func TestSystemSched_JobModify_InPlace(t *testing.T) {
ci.Parallel(t)
h := NewHarness(t)
// Create some nodes
nodes := createNodes(t, h, 10)
// Generate a fake job with allocations
job := mock.SystemJob()
require.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), nil, job))
var allocs []*structs.Allocation
for _, node := range nodes {
alloc := mock.AllocForNode(node)
alloc.Job = job
alloc.JobID = job.ID
alloc.Name = "my-job.web[0]"
allocs = append(allocs, alloc)
}
require.NoError(t, h.State.UpsertAllocs(structs.MsgTypeTestSetup, h.NextIndex(), allocs))
// Update the job
job2 := mock.SystemJob()
job2.ID = job.ID
require.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), nil, job2))
// Create a mock evaluation to deal with update
eval := &structs.Evaluation{
Namespace: structs.DefaultNamespace,
ID: uuid.Generate(),
Priority: 50,
TriggeredBy: structs.EvalTriggerJobRegister,
JobID: job.ID,
Status: structs.EvalStatusPending,
}
require.NoError(t, h.State.UpsertEvals(structs.MsgTypeTestSetup, h.NextIndex(), []*structs.Evaluation{eval}))
// Process the evaluation
err := h.Process(NewSystemScheduler, eval)
require.NoError(t, err)
// Ensure a single plan
require.Len(t, h.Plans, 1)
plan := h.Plans[0]
// Ensure the plan did not evict any allocs
var update []*structs.Allocation
for _, updateList := range plan.NodeUpdate {
update = append(update, updateList...)
}
require.Empty(t, update)
// Ensure the plan updated the existing allocs
var planned []*structs.Allocation
for _, allocList := range plan.NodeAllocation {
planned = append(planned, allocList...)
}
require.Len(t, planned, 10)
for _, p := range planned {
require.Equal(t, job2, p.Job, "should update job")
}
// Lookup the allocations by JobID
ws := memdb.NewWatchSet()
out, err := h.State.AllocsByJob(ws, job.Namespace, job.ID, false)
require.NoError(t, err)
// Ensure all allocations placed
require.Len(t, out, 10)
h.AssertEvalStatus(t, structs.EvalStatusComplete)
// Verify the network did not change
rp := structs.Port{Label: "admin", Value: 5000}
for _, alloc := range out {
for _, resources := range alloc.TaskResources {
require.Equal(t, rp, resources.Networks[0].ReservedPorts[0])
}
}
}
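// TestSystemSched_JobModify_RemoveDC verifies that removing a datacenter from
// a job stops the allocation in that datacenter and updates the remaining one
// in place.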
func TestSystemSched_JobModify_RemoveDC(t *testing.T) {
ci.Parallel(t)
h := NewHarness(t)
// Create some nodes
node1 := mock.Node()
node1.Datacenter = "dc1"
require.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node1))
node2 := mock.Node()
node2.Datacenter = "dc2"
require.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node2))
fmt.Println("DC1 node: ", node1.ID)
fmt.Println("DC2 node: ", node2.ID)
nodes := []*structs.Node{node1, node2}
// Generate a fake job with allocations
job := mock.SystemJob()
job.Datacenters = []string{"dc1", "dc2"}
require.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), nil, job))
var allocs []*structs.Allocation
for _, node := range nodes {
alloc := mock.Alloc()
alloc.Job = job
alloc.JobID = job.ID
alloc.NodeID = node.ID
alloc.Name = "my-job.web[0]"
allocs = append(allocs, alloc)
}
require.NoError(t, h.State.UpsertAllocs(structs.MsgTypeTestSetup, h.NextIndex(), allocs))
// Update the job
job2 := job.Copy()
job2.Datacenters = []string{"dc1"}
require.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), nil, job2))
// Create a mock evaluation to deal with update
eval := &structs.Evaluation{
Namespace: structs.DefaultNamespace,
ID: uuid.Generate(),
Priority: 50,
TriggeredBy: structs.EvalTriggerJobRegister,
JobID: job.ID,
Status: structs.EvalStatusPending,
}
require.NoError(t, h.State.UpsertEvals(structs.MsgTypeTestSetup, h.NextIndex(), []*structs.Evaluation{eval}))
// Process the evaluation
err := h.Process(NewSystemScheduler, eval)
require.NoError(t, err)
// Ensure a single plan
require.Len(t, h.Plans, 1)
plan := h.Plans[0]
// Ensure the plan did not evict any allocs
var update []*structs.Allocation
for _, updateList := range plan.NodeUpdate {
update = append(update, updateList...)
}
require.Len(t, update, 1)
// Ensure the plan updated the existing allocs
var planned []*structs.Allocation
for _, allocList := range plan.NodeAllocation {
planned = append(planned, allocList...)
}
require.Len(t, planned, 1)
for _, p := range planned {
require.Equal(t, job2, p.Job, "should update job")
}
// Lookup the allocations by JobID
ws := memdb.NewWatchSet()
out, err := h.State.AllocsByJob(ws, job.Namespace, job.ID, false)
require.NoError(t, err)
// Ensure all allocations placed
require.Len(t, out, 2)
h.AssertEvalStatus(t, structs.EvalStatusComplete)
}
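// TestSystemSched_JobDeregister_Purged verifies that deregistering a purged
// system job evicts its allocations from every node.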
func TestSystemSched_JobDeregister_Purged(t *testing.T) {
ci.Parallel(t)
h := NewHarness(t)
// Create some nodes
nodes := createNodes(t, h, 10)
// Generate a fake job with allocations
job := mock.SystemJob()
var allocs []*structs.Allocation
for _, node := range nodes {
alloc := mock.Alloc()
alloc.Job = job
alloc.JobID = job.ID
alloc.NodeID = node.ID
alloc.Name = "my-job.web[0]"
allocs = append(allocs, alloc)
}
for _, alloc := range allocs {
require.NoError(t, h.State.UpsertJobSummary(h.NextIndex(), mock.JobSummary(alloc.JobID)))
}
require.NoError(t, h.State.UpsertAllocs(structs.MsgTypeTestSetup, h.NextIndex(), allocs))
// Create a mock evaluation to deregister the job
eval := &structs.Evaluation{
Namespace: structs.DefaultNamespace,
ID: uuid.Generate(),
Priority: 50,
TriggeredBy: structs.EvalTriggerJobDeregister,
JobID: job.ID,
Status: structs.EvalStatusPending,
}
require.NoError(t, h.State.UpsertEvals(structs.MsgTypeTestSetup, h.NextIndex(), []*structs.Evaluation{eval}))
// Process the evaluation
err := h.Process(NewSystemScheduler, eval)
require.NoError(t, err)
// Ensure a single plan
require.Len(t, h.Plans, 1)
plan := h.Plans[0]
// Ensure the plan evicted the job from all nodes.
for _, node := range nodes {
require.Len(t, plan.NodeUpdate[node.ID], 1)
}
// Lookup the allocations by JobID
ws := memdb.NewWatchSet()
out, err := h.State.AllocsByJob(ws, job.Namespace, job.ID, false)
require.NoError(t, err)
// Ensure no remaining allocations
out, _ = structs.FilterTerminalAllocs(out)
require.Empty(t, out)
h.AssertEvalStatus(t, structs.EvalStatusComplete)
}
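// TestSystemSched_JobDeregister_Stopped verifies that deregistering a stopped
// system job evicts its allocations from every node.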
func TestSystemSched_JobDeregister_Stopped(t *testing.T) {
ci.Parallel(t)
h := NewHarness(t)
// Create some nodes
nodes := createNodes(t, h, 10)
// Generate a fake job with allocations
job := mock.SystemJob()
job.Stop = true
require.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), nil, job))
var allocs []*structs.Allocation
for _, node := range nodes {
alloc := mock.Alloc()
alloc.Job = job
alloc.JobID = job.ID
alloc.NodeID = node.ID
alloc.Name = "my-job.web[0]"
allocs = append(allocs, alloc)
}
for _, alloc := range allocs {
require.NoError(t, h.State.UpsertJobSummary(h.NextIndex(), mock.JobSummary(alloc.JobID)))
}
require.NoError(t, h.State.UpsertAllocs(structs.MsgTypeTestSetup, h.NextIndex(), allocs))
// Create a mock evaluation to deregister the job
eval := &structs.Evaluation{
Namespace: structs.DefaultNamespace,
ID: uuid.Generate(),
Priority: 50,
TriggeredBy: structs.EvalTriggerJobDeregister,
JobID: job.ID,
Status: structs.EvalStatusPending,
}
require.NoError(t, h.State.UpsertEvals(structs.MsgTypeTestSetup, h.NextIndex(), []*structs.Evaluation{eval}))
// Process the evaluation
err := h.Process(NewSystemScheduler, eval)
require.NoError(t, err)
// Ensure a single plan
require.Len(t, h.Plans, 1)
plan := h.Plans[0]
// Ensure the plan evicted the job from all nodes.
for _, node := range nodes {
require.Len(t, plan.NodeUpdate[node.ID], 1)
}
// Lookup the allocations by JobID
ws := memdb.NewWatchSet()
out, err := h.State.AllocsByJob(ws, job.Namespace, job.ID, false)
require.NoError(t, err)
// Ensure no remaining allocations
out, _ = structs.FilterTerminalAllocs(out)
require.Empty(t, out)
h.AssertEvalStatus(t, structs.EvalStatusComplete)
}
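// TestSystemSched_NodeDown verifies that an allocation on a node that has gone
// down is stopped by the plan.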
func TestSystemSched_NodeDown(t *testing.T) {
ci.Parallel(t)
h := NewHarness(t)
// Register a down node
node := mock.Node()
node.Status = structs.NodeStatusDown
require.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node))
// Generate a fake job allocated on that node.
job := mock.SystemJob()
require.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), nil, job))
alloc := mock.Alloc()
alloc.Job = job
alloc.JobID = job.ID
alloc.NodeID = node.ID
alloc.Name = "my-job.web[0]"
alloc.DesiredTransition.Migrate = pointer.Of(true)
require.NoError(t, h.State.UpsertAllocs(structs.MsgTypeTestSetup, h.NextIndex(), []*structs.Allocation{alloc}))
// Create a mock evaluation to deal with the node going down
eval := &structs.Evaluation{
Namespace: structs.DefaultNamespace,
ID: uuid.Generate(),
Priority: 50,
TriggeredBy: structs.EvalTriggerNodeUpdate,
JobID: job.ID,
NodeID: node.ID,
Status: structs.EvalStatusPending,
}
require.NoError(t, h.State.UpsertEvals(structs.MsgTypeTestSetup, h.NextIndex(), []*structs.Evaluation{eval}))
// Process the evaluation
err := h.Process(NewSystemScheduler, eval)
require.NoError(t, err)
// Ensure a single plan
require.Len(t, h.Plans, 1)
plan := h.Plans[0]
// Ensure the plan evicted all allocs
require.Len(t, plan.NodeUpdate[node.ID], 1)
// Ensure the plan updated the allocation.
planned := make([]*structs.Allocation, 0)
for _, allocList := range plan.NodeUpdate {
planned = append(planned, allocList...)
}
require.Len(t, planned, 1)
// Ensure the allocation is stopped
p := planned[0]
require.Equal(t, structs.AllocDesiredStatusStop, p.DesiredStatus)
// Note: we don't assert that the client status is lost here; the allocation
// hasn't been updated by a client, so its client status is still pending.
h.AssertEvalStatus(t, structs.EvalStatusComplete)
}
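// TestSystemSched_NodeDrain_Down verifies that an allocation on a draining
// node that has gone down is marked lost.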
func TestSystemSched_NodeDrain_Down(t *testing.T) {
ci.Parallel(t)
h := NewHarness(t)
// Register a draining node
node := mock.DrainNode()
node.Status = structs.NodeStatusDown
require.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node))
// Generate a fake job allocated on that node.
job := mock.SystemJob()
require.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), nil, job))
alloc := mock.Alloc()
alloc.Job = job
alloc.JobID = job.ID
alloc.NodeID = node.ID
alloc.Name = "my-job.web[0]"
require.NoError(t, h.State.UpsertAllocs(structs.MsgTypeTestSetup, h.NextIndex(), []*structs.Allocation{alloc}))
// Create a mock evaluation to deal with the node update
eval := &structs.Evaluation{
Namespace: structs.DefaultNamespace,
ID: uuid.Generate(),
Priority: 50,
TriggeredBy: structs.EvalTriggerNodeUpdate,
JobID: job.ID,
NodeID: node.ID,
Status: structs.EvalStatusPending,
2016-08-09 21:48:25 +00:00
}
require.NoError(t, h.State.UpsertEvals(structs.MsgTypeTestSetup, h.NextIndex(), []*structs.Evaluation{eval}))
// Process the evaluation
err := h.Process(NewSystemScheduler, eval) // todo: yikes
require.NoError(t, err)
// Ensure a single plan
require.Len(t, h.Plans, 1)
plan := h.Plans[0]
// Ensure the plan evicted non terminal allocs
require.Len(t, plan.NodeUpdate[node.ID], 1)
// Ensure that the allocation is marked as lost
var lost []string
for _, alloc := range plan.NodeUpdate[node.ID] {
lost = append(lost, alloc.ID)
}
require.Equal(t, []string{alloc.ID}, lost)
h.AssertEvalStatus(t, structs.EvalStatusComplete)
}
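// TestSystemSched_NodeDrain verifies that an allocation on a draining node is
// stopped by the plan.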
func TestSystemSched_NodeDrain(t *testing.T) {
ci.Parallel(t)
h := NewHarness(t)
// Register a draining node
node := mock.DrainNode()
require.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node))
// Generate a fake job allocated on that node.
job := mock.SystemJob()
require.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), nil, job))
alloc := mock.Alloc()
alloc.Job = job
alloc.JobID = job.ID
alloc.NodeID = node.ID
alloc.Name = "my-job.web[0]"
alloc.DesiredTransition.Migrate = pointer.Of(true)
require.NoError(t, h.State.UpsertAllocs(structs.MsgTypeTestSetup, h.NextIndex(), []*structs.Allocation{alloc}))
// Create a mock evaluation to deal with drain
eval := &structs.Evaluation{
Namespace: structs.DefaultNamespace,
ID: uuid.Generate(),
Priority: 50,
TriggeredBy: structs.EvalTriggerNodeUpdate,
JobID: job.ID,
NodeID: node.ID,
Status: structs.EvalStatusPending,
}
require.NoError(t, h.State.UpsertEvals(structs.MsgTypeTestSetup, h.NextIndex(), []*structs.Evaluation{eval}))
// Process the evaluation
err := h.Process(NewSystemScheduler, eval)
require.NoError(t, err)
// Ensure a single plan
require.Len(t, h.Plans, 1)
plan := h.Plans[0]
// Ensure the plan evicted all allocs
require.Len(t, plan.NodeUpdate[node.ID], 1)
// Ensure the plan updated the allocation.
planned := make([]*structs.Allocation, 0)
for _, allocList := range plan.NodeUpdate {
planned = append(planned, allocList...)
}
require.Len(t, planned, 1)
// Ensure the allocation is stopped
require.Equal(t, structs.AllocDesiredStatusStop, planned[0].DesiredStatus)
h.AssertEvalStatus(t, structs.EvalStatusComplete)
}
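// TestSystemSched_NodeUpdate verifies that a node update for a node already
// running the job leaves no queued allocations.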
func TestSystemSched_NodeUpdate(t *testing.T) {
ci.Parallel(t)
h := NewHarness(t)
// Register a node
node := mock.Node()
require.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node))
// Generate a fake job allocated on that node.
job := mock.SystemJob()
require.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), nil, job))
alloc := mock.Alloc()
alloc.Job = job
alloc.JobID = job.ID
alloc.NodeID = node.ID
alloc.Name = "my-job.web[0]"
require.NoError(t, h.State.UpsertAllocs(structs.MsgTypeTestSetup, h.NextIndex(), []*structs.Allocation{alloc}))
// Create a mock evaluation to deal with the node update
eval := &structs.Evaluation{
Namespace: structs.DefaultNamespace,
ID: uuid.Generate(),
Priority: 50,
TriggeredBy: structs.EvalTriggerNodeUpdate,
JobID: job.ID,
NodeID: node.ID,
Status: structs.EvalStatusPending,
}
require.NoError(t, h.State.UpsertEvals(structs.MsgTypeTestSetup, h.NextIndex(), []*structs.Evaluation{eval}))
// Process the evaluation
err := h.Process(NewSystemScheduler, eval)
require.NoError(t, err)
// Ensure that queued allocations is zero
val, ok := h.Evals[0].QueuedAllocations["web"]
require.True(t, ok)
require.Zero(t, val)
h.AssertEvalStatus(t, structs.EvalStatusComplete)
}
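// TestSystemSched_RetryLimit verifies that the evaluation is failed once the
// scheduler exhausts its plan submission retries.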
func TestSystemSched_RetryLimit(t *testing.T) {
ci.Parallel(t)
h := NewHarness(t)
h.Planner = &RejectPlan{h}
// Create some nodes
_ = createNodes(t, h, 10)
// Create a job
job := mock.SystemJob()
require.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), nil, job))
// Create a mock evaluation to register the job
eval := &structs.Evaluation{
Namespace: structs.DefaultNamespace,
ID: uuid.Generate(),
Priority: job.Priority,
TriggeredBy: structs.EvalTriggerJobRegister,
JobID: job.ID,
Status: structs.EvalStatusPending,
}
require.NoError(t, h.State.UpsertEvals(structs.MsgTypeTestSetup, h.NextIndex(), []*structs.Evaluation{eval}))
// Process the evaluation
err := h.Process(NewSystemScheduler, eval)
require.NoError(t, err)
// Ensure multiple plans
require.NotEmpty(t, h.Plans)
// Lookup the allocations by JobID
ws := memdb.NewWatchSet()
out, err := h.State.AllocsByJob(ws, job.Namespace, job.ID, false)
require.NoError(t, err)
// Ensure no allocations placed
require.Empty(t, out)
// Should hit the retry limit
h.AssertEvalStatus(t, structs.EvalStatusFailed)
}
// This test ensures that the scheduler doesn't increment the queued allocation
// count for a task group when allocations can't be created on currently
// available nodes because of constraint mismatches.
func TestSystemSched_Queued_With_Constraints(t *testing.T) {
ci.Parallel(t)
h := NewHarness(t)
// Register a node
node := mock.Node()
node.Attributes["kernel.name"] = "darwin"
require.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node))
// Generate a system job which can't be placed on the node
job := mock.SystemJob()
require.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), nil, job))
// Create a mock evaluation to deal with the node update
eval := &structs.Evaluation{
Namespace: structs.DefaultNamespace,
ID: uuid.Generate(),
Priority: 50,
TriggeredBy: structs.EvalTriggerNodeUpdate,
JobID: job.ID,
NodeID: node.ID,
Status: structs.EvalStatusPending,
}
require.NoError(t, h.State.UpsertEvals(structs.MsgTypeTestSetup, h.NextIndex(), []*structs.Evaluation{eval}))
// Process the evaluation
err := h.Process(NewSystemScheduler, eval)
require.NoError(t, err)
// Ensure that queued allocations is zero
val, ok := h.Evals[0].QueuedAllocations["web"]
require.True(t, ok)
require.Zero(t, val)
}
// This test ensures that the scheduler correctly ignores ineligible
// nodes when scheduling due to a new node being added. The job has two
// task groups constrained to a particular node class. The desired behavior
// should be that the TaskGroup constrained to the newly added node class is
// added and that the TaskGroup constrained to the ineligible node is ignored.
func TestSystemSched_JobConstraint_AddNode(t *testing.T) {
ci.Parallel(t)
h := NewHarness(t)
// Create two nodes
node := mock.Node()
node.NodeClass = "Class-A"
require.NoError(t, node.ComputeClass())
require.Nil(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node))
nodeB := mock.Node()
nodeB.NodeClass = "Class-B"
require.NoError(t, nodeB.ComputeClass())
require.Nil(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), nodeB))
// Make a job with two task groups, each constrained to a node class
job := mock.SystemJob()
tgA := job.TaskGroups[0]
tgA.Name = "groupA"
tgA.Constraints = []*structs.Constraint{
{
LTarget: "${node.class}",
RTarget: node.NodeClass,
Operand: "=",
},
}
tgB := job.TaskGroups[0].Copy()
tgB.Name = "groupB"
tgB.Constraints = []*structs.Constraint{
{
LTarget: "${node.class}",
RTarget: nodeB.NodeClass,
Operand: "=",
},
}
// Upsert Job
job.TaskGroups = []*structs.TaskGroup{tgA, tgB}
require.Nil(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), nil, job))
// Evaluate the job
eval := &structs.Evaluation{
Namespace: structs.DefaultNamespace,
ID: uuid.Generate(),
Priority: job.Priority,
TriggeredBy: structs.EvalTriggerJobRegister,
JobID: job.ID,
Status: structs.EvalStatusPending,
}
require.Nil(t, h.State.UpsertEvals(structs.MsgTypeTestSetup, h.NextIndex(), []*structs.Evaluation{eval}))
require.Nil(t, h.Process(NewSystemScheduler, eval))
require.Equal(t, "complete", h.Evals[0].Status)
// QueuedAllocations is drained
val, ok := h.Evals[0].QueuedAllocations["groupA"]
require.True(t, ok)
require.Equal(t, 0, val)
val, ok = h.Evals[0].QueuedAllocations["groupB"]
require.True(t, ok)
require.Equal(t, 0, val)
// Single plan with two NodeAllocations
require.Len(t, h.Plans, 1)
require.Len(t, h.Plans[0].NodeAllocation, 2)
// Mark the node as ineligible
node.SchedulingEligibility = structs.NodeSchedulingIneligible
// Evaluate the node update
eval2 := &structs.Evaluation{
Namespace: structs.DefaultNamespace,
ID: uuid.Generate(),
Priority: job.Priority,
TriggeredBy: structs.EvalTriggerNodeUpdate,
NodeID: node.ID,
JobID: job.ID,
Status: structs.EvalStatusPending,
}
require.Nil(t, h.State.UpsertEvals(structs.MsgTypeTestSetup, h.NextIndex(), []*structs.Evaluation{eval2}))
require.Nil(t, h.Process(NewSystemScheduler, eval2))
require.Equal(t, "complete", h.Evals[1].Status)
// Ensure no new plans
require.Len(t, h.Plans, 1)
// Ensure all NodeAllocations are from first Eval
for _, allocs := range h.Plans[0].NodeAllocation {
require.Len(t, allocs, 1)
require.Equal(t, eval.ID, allocs[0].EvalID)
}
// Add a new node Class-B
nodeBTwo := mock.Node()
nodeBTwo.NodeClass = "Class-B"
require.NoError(t, nodeBTwo.ComputeClass())
require.Nil(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), nodeBTwo))
// Evaluate the new node
eval3 := &structs.Evaluation{
Namespace: structs.DefaultNamespace,
ID: uuid.Generate(),
Priority: 50,
TriggeredBy: structs.EvalTriggerNodeUpdate,
NodeID: nodeBTwo.ID,
JobID: job.ID,
Status: structs.EvalStatusPending,
}
// Ensure New eval is complete
require.Nil(t, h.State.UpsertEvals(structs.MsgTypeTestSetup, h.NextIndex(), []*structs.Evaluation{eval3}))
require.Nil(t, h.Process(NewSystemScheduler, eval3))
require.Equal(t, "complete", h.Evals[2].Status)
// Ensure `groupA` fails to be placed due to its constraint, but `groupB` doesn't
require.Len(t, h.Evals[2].FailedTGAllocs, 1)
require.Contains(t, h.Evals[2].FailedTGAllocs, "groupA")
require.NotContains(t, h.Evals[2].FailedTGAllocs, "groupB")
require.Len(t, h.Plans, 2)
require.Len(t, h.Plans[1].NodeAllocation, 1)
// Ensure all NodeAllocations are from the new (third) eval
for _, allocs := range h.Plans[1].NodeAllocation {
require.Len(t, allocs, 1)
require.Equal(t, eval3.ID, allocs[0].EvalID)
}
ws := memdb.NewWatchSet()
allocsNodeOne, err := h.State.AllocsByNode(ws, node.ID)
require.NoError(t, err)
require.Len(t, allocsNodeOne, 1)
allocsNodeTwo, err := h.State.AllocsByNode(ws, nodeB.ID)
require.NoError(t, err)
require.Len(t, allocsNodeTwo, 1)
allocsNodeThree, err := h.State.AllocsByNode(ws, nodeBTwo.ID)
require.NoError(t, err)
require.Len(t, allocsNodeThree, 1)
}
// No errors are reported when a lack of eligible nodes prevents placement
func TestSystemSched_ExistingAllocNoNodes(t *testing.T) {
ci.Parallel(t)
h := NewHarness(t)
// Create a node
node := mock.Node()
require.NoError(t, node.ComputeClass())
require.Nil(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node))
// Make a job
job := mock.SystemJob()
require.Nil(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), nil, job))
// Evaluate the job
eval := &structs.Evaluation{
Namespace: structs.DefaultNamespace,
ID: uuid.Generate(),
Priority: job.Priority,
TriggeredBy: structs.EvalTriggerJobRegister,
JobID: job.ID,
Status: structs.EvalStatusPending,
}
require.Nil(t, h.State.UpsertEvals(structs.MsgTypeTestSetup, h.NextIndex(), []*structs.Evaluation{eval}))
require.Nil(t, h.Process(NewSystemScheduler, eval))
require.Equal(t, "complete", h.Evals[0].Status)
// QueuedAllocations is drained
val, ok := h.Evals[0].QueuedAllocations["web"]
require.True(t, ok)
require.Equal(t, 0, val)
// The plan has one NodeAllocation
require.Equal(t, 1, len(h.Plans))
// Mark the node as ineligible
node.SchedulingEligibility = structs.NodeSchedulingIneligible
// Evaluate the job
eval2 := &structs.Evaluation{
Namespace: structs.DefaultNamespace,
ID: uuid.Generate(),
Priority: job.Priority,
TriggeredBy: structs.EvalTriggerNodeUpdate,
JobID: job.ID,
NodeID: node.ID,
Status: structs.EvalStatusPending,
}
require.Nil(t, h.State.UpsertEvals(structs.MsgTypeTestSetup, h.NextIndex(), []*structs.Evaluation{eval2}))
require.Nil(t, h.Process(NewSystemScheduler, eval2))
require.Equal(t, "complete", h.Evals[1].Status)
// Create a new job version, deploy
job2 := job.Copy()
job2.Meta["version"] = "2"
require.Nil(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), nil, job2))
// Run evaluation as a plan
eval3 := &structs.Evaluation{
Namespace: structs.DefaultNamespace,
ID: uuid.Generate(),
Priority: job2.Priority,
TriggeredBy: structs.EvalTriggerJobRegister,
JobID: job2.ID,
Status: structs.EvalStatusPending,
AnnotatePlan: true,
}
// Ensure New eval is complete
require.Nil(t, h.State.UpsertEvals(structs.MsgTypeTestSetup, h.NextIndex(), []*structs.Evaluation{eval3}))
require.Nil(t, h.Process(NewSystemScheduler, eval3))
require.Equal(t, "complete", h.Evals[2].Status)
// Ensure there are no FailedTGAllocs
require.Equal(t, 0, len(h.Evals[2].FailedTGAllocs))
require.Equal(t, 0, h.Evals[2].QueuedAllocations[job2.Name])
}
// No errors reported when constraints prevent placement
func TestSystemSched_ConstraintErrors(t *testing.T) {
ci.Parallel(t)
h := NewHarness(t)
var node *structs.Node
// Register some nodes
// the tag "aaaaaa" is hashed so that the nodes are processed
// in an order other than good, good, bad
for _, tag := range []string{"aaaaaa", "foo", "foo", "foo"} {
node = mock.Node()
node.Meta["tag"] = tag
require.NoError(t, node.ComputeClass())
require.Nil(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node))
}
// Mark the last node as ineligible
node.SchedulingEligibility = structs.NodeSchedulingIneligible
// Make a job with a constraint that matches a subset of the nodes
job := mock.SystemJob()
job.Constraints = append(job.Constraints,
&structs.Constraint{
LTarget: "${meta.tag}",
RTarget: "foo",
Operand: "=",
})
require.Nil(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), nil, job))
// Evaluate the job
eval := &structs.Evaluation{
Namespace: structs.DefaultNamespace,
ID: uuid.Generate(),
Priority: job.Priority,
TriggeredBy: structs.EvalTriggerJobRegister,
JobID: job.ID,
Status: structs.EvalStatusPending,
}
require.Nil(t, h.State.UpsertEvals(structs.MsgTypeTestSetup, h.NextIndex(), []*structs.Evaluation{eval}))
require.Nil(t, h.Process(NewSystemScheduler, eval))
require.Equal(t, "complete", h.Evals[0].Status)
// QueuedAllocations is drained
val, ok := h.Evals[0].QueuedAllocations["web"]
require.True(t, ok)
require.Equal(t, 0, val)
// The plan has two NodeAllocations
require.Equal(t, 1, len(h.Plans))
require.Nil(t, h.Plans[0].Annotations)
require.Equal(t, 2, len(h.Plans[0].NodeAllocation))
// Two nodes were allocated and are running
ws := memdb.NewWatchSet()
as, err := h.State.AllocsByJob(ws, structs.DefaultNamespace, job.ID, false)
require.Nil(t, err)
running := 0
for _, a := range as {
if "running" == a.Job.Status {
running++
}
}
require.Equal(t, 2, len(as))
require.Equal(t, 2, running)
// Failed allocations is empty
require.Equal(t, 0, len(h.Evals[0].FailedTGAllocs))
}
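// TestSystemSched_ChainedAlloc verifies that allocations created by a job
// update are chained to the allocations they replace via PreviousAllocation.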
func TestSystemSched_ChainedAlloc(t *testing.T) {
ci.Parallel(t)
h := NewHarness(t)
// Create some nodes
_ = createNodes(t, h, 10)
// Create a job
job := mock.SystemJob()
require.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), nil, job))
// Create a mock evaluation to register the job
eval := &structs.Evaluation{
Namespace: structs.DefaultNamespace,
ID: uuid.Generate(),
Priority: job.Priority,
TriggeredBy: structs.EvalTriggerJobRegister,
JobID: job.ID,
Status: structs.EvalStatusPending,
}
require.NoError(t, h.State.UpsertEvals(structs.MsgTypeTestSetup, h.NextIndex(), []*structs.Evaluation{eval}))
// Process the evaluation
err := h.Process(NewSystemScheduler, eval)
require.NoError(t, err)
var allocIDs []string
for _, allocList := range h.Plans[0].NodeAllocation {
for _, alloc := range allocList {
allocIDs = append(allocIDs, alloc.ID)
}
}
sort.Strings(allocIDs)
// Create a new harness to invoke the scheduler again
h1 := NewHarnessWithState(t, h.State)
job1 := mock.SystemJob()
job1.ID = job.ID
job1.TaskGroups[0].Tasks[0].Env = make(map[string]string)
job1.TaskGroups[0].Tasks[0].Env["foo"] = "bar"
require.NoError(t, h1.State.UpsertJob(structs.MsgTypeTestSetup, h1.NextIndex(), nil, job1))
// Insert two more nodes
for i := 0; i < 2; i++ {
node := mock.Node()
require.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node))
}
// Create a mock evaluation to update the job
eval1 := &structs.Evaluation{
Namespace: structs.DefaultNamespace,
ID: uuid.Generate(),
Priority: job1.Priority,
TriggeredBy: structs.EvalTriggerJobRegister,
JobID: job1.ID,
Status: structs.EvalStatusPending,
}
require.NoError(t, h.State.UpsertEvals(structs.MsgTypeTestSetup, h.NextIndex(), []*structs.Evaluation{eval1}))
// Process the evaluation
if err := h1.Process(NewSystemScheduler, eval1); err != nil {
t.Fatalf("err: %v", err)
}
require.Len(t, h.Plans, 1)
plan := h1.Plans[0]
// Collect all the chained allocation ids and the new allocations which
// don't have any chained allocations
var prevAllocs []string
var newAllocs []string
for _, allocList := range plan.NodeAllocation {
for _, alloc := range allocList {
if alloc.PreviousAllocation == "" {
newAllocs = append(newAllocs, alloc.ID)
continue
}
prevAllocs = append(prevAllocs, alloc.PreviousAllocation)
}
}
sort.Strings(prevAllocs)
// Ensure that the new allocations have their corresponding original
// allocation IDs
require.Equal(t, allocIDs, prevAllocs)
// Ensuring two new allocations don't have any chained allocations
require.Len(t, newAllocs, 2)
}
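// TestSystemSched_PlanWithDrainedNode verifies that draining a node stops its
// allocation without placing a replacement when no other node satisfies the
// task group's constraint.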
func TestSystemSched_PlanWithDrainedNode(t *testing.T) {
ci.Parallel(t)
h := NewHarness(t)
// Register two nodes with two different classes
node := mock.DrainNode()
node.NodeClass = "green"
require.NoError(t, node.ComputeClass())
require.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node))
node2 := mock.Node()
node2.NodeClass = "blue"
require.NoError(t, node2.ComputeClass())
require.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node2))
// Create a Job with two task groups, each constrained on node class
job := mock.SystemJob()
tg1 := job.TaskGroups[0]
tg1.Constraints = append(tg1.Constraints,
&structs.Constraint{
LTarget: "${node.class}",
RTarget: "green",
Operand: "==",
})
tg2 := tg1.Copy()
tg2.Name = "web2"
tg2.Constraints[0].RTarget = "blue"
job.TaskGroups = append(job.TaskGroups, tg2)
require.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), nil, job))
// Create an allocation on each node
alloc := mock.Alloc()
alloc.Job = job
alloc.JobID = job.ID
alloc.NodeID = node.ID
alloc.Name = "my-job.web[0]"
alloc.DesiredTransition.Migrate = pointer.Of(true)
alloc.TaskGroup = "web"
alloc2 := mock.Alloc()
alloc2.Job = job
alloc2.JobID = job.ID
alloc2.NodeID = node2.ID
alloc2.Name = "my-job.web2[0]"
alloc2.TaskGroup = "web2"
require.NoError(t, h.State.UpsertAllocs(structs.MsgTypeTestSetup, h.NextIndex(), []*structs.Allocation{alloc, alloc2}))
// Create a mock evaluation to deal with drain
eval := &structs.Evaluation{
Namespace: structs.DefaultNamespace,
ID: uuid.Generate(),
Priority: 50,
TriggeredBy: structs.EvalTriggerNodeUpdate,
JobID: job.ID,
NodeID: node.ID,
Status: structs.EvalStatusPending,
}
require.NoError(t, h.State.UpsertEvals(structs.MsgTypeTestSetup, h.NextIndex(), []*structs.Evaluation{eval}))
// Process the evaluation
err := h.Process(NewSystemScheduler, eval)
require.NoError(t, err)
// Ensure a single plan
require.Len(t, h.Plans, 1)
plan := h.Plans[0]
// Ensure the plan evicted the alloc on the drained node
planned := plan.NodeUpdate[node.ID]
require.Len(t, plan.NodeUpdate[node.ID], 1)
// Ensure the plan didn't place
require.Empty(t, plan.NodeAllocation)
// Ensure the allocation is stopped
require.Equal(t, structs.AllocDesiredStatusStop, planned[0].DesiredStatus)
h.AssertEvalStatus(t, structs.EvalStatusComplete)
}
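// TestSystemSched_QueuedAllocsMultTG verifies that a node update for a job
// with multiple task groups leaves no queued allocations.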
func TestSystemSched_QueuedAllocsMultTG(t *testing.T) {
ci.Parallel(t)
h := NewHarness(t)
// Register two nodes with two different classes
node := mock.Node()
node.NodeClass = "green"
require.NoError(t, node.ComputeClass())
require.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node))
node2 := mock.Node()
node2.NodeClass = "blue"
require.NoError(t, node2.ComputeClass())
require.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node2))
// Create a Job with two task groups, each constrained on node class
job := mock.SystemJob()
tg1 := job.TaskGroups[0]
tg1.Constraints = append(tg1.Constraints,
&structs.Constraint{
LTarget: "${node.class}",
RTarget: "green",
Operand: "==",
})
tg2 := tg1.Copy()
tg2.Name = "web2"
tg2.Constraints[0].RTarget = "blue"
job.TaskGroups = append(job.TaskGroups, tg2)
require.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), nil, job))
// Create a mock evaluation to deal with drain
eval := &structs.Evaluation{
Namespace: structs.DefaultNamespace,
ID: uuid.Generate(),
Priority: 50,
TriggeredBy: structs.EvalTriggerNodeUpdate,
JobID: job.ID,
NodeID: node.ID,
Status: structs.EvalStatusPending,
}
require.NoError(t, h.State.UpsertEvals(structs.MsgTypeTestSetup, h.NextIndex(), []*structs.Evaluation{eval}))
// Process the evaluation
err := h.Process(NewSystemScheduler, eval)
require.NoError(t, err)
// Ensure a single plan
require.Len(t, h.Plans, 1)
	qa := h.Evals[0].QueuedAllocations
	require.Zero(t, qa["web"])
	require.Zero(t, qa["web2"])
h.AssertEvalStatus(t, structs.EvalStatusComplete)
}

func TestSystemSched_Preemption(t *testing.T) {
ci.Parallel(t)
h := NewHarness(t)
// Create nodes
nodes := make([]*structs.Node, 0)
for i := 0; i < 2; i++ {
node := mock.Node()
// TODO: remove in 0.11
node.Resources = &structs.Resources{
CPU: 3072,
MemoryMB: 5034,
DiskMB: 20 * 1024,
Networks: []*structs.NetworkResource{{
Device: "eth0",
CIDR: "192.168.0.100/32",
MBits: 1000,
}},
}
node.NodeResources = &structs.NodeResources{
Cpu: structs.NodeCpuResources{CpuShares: 3072},
Memory: structs.NodeMemoryResources{MemoryMB: 5034},
Disk: structs.NodeDiskResources{DiskMB: 20 * 1024},
Networks: []*structs.NetworkResource{{
Device: "eth0",
CIDR: "192.168.0.100/32",
MBits: 1000,
}},
NodeNetworks: []*structs.NodeNetworkResource{{
Mode: "host",
Device: "eth0",
Addresses: []structs.NodeNetworkAddress{{
Family: structs.NodeNetworkAF_IPv4,
Alias: "default",
Address: "192.168.0.100",
}},
}},
}
require.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node))
nodes = append(nodes, node)
}
// Enable Preemption
err := h.State.SchedulerSetConfig(h.NextIndex(), &structs.SchedulerConfiguration{
PreemptionConfig: structs.PreemptionConfig{
SystemSchedulerEnabled: true,
},
})
require.NoError(t, err)
// Create some low priority batch jobs and allocations for them
// One job uses a reserved port
job1 := mock.BatchJob()
job1.Type = structs.JobTypeBatch
job1.Priority = 20
job1.TaskGroups[0].Tasks[0].Resources = &structs.Resources{
CPU: 512,
MemoryMB: 1024,
Networks: []*structs.NetworkResource{{
MBits: 200,
ReservedPorts: []structs.Port{{
Label: "web",
Value: 80,
}},
}},
}
alloc1 := mock.Alloc()
alloc1.Job = job1
alloc1.JobID = job1.ID
alloc1.NodeID = nodes[0].ID
alloc1.Name = "my-job[0]"
alloc1.TaskGroup = job1.TaskGroups[0].Name
alloc1.AllocatedResources = &structs.AllocatedResources{
Tasks: map[string]*structs.AllocatedTaskResources{
"web": {
Cpu: structs.AllocatedCpuResources{CpuShares: 512},
Memory: structs.AllocatedMemoryResources{MemoryMB: 1024},
Networks: []*structs.NetworkResource{{
Device: "eth0",
IP: "192.168.0.100",
MBits: 200,
}},
},
},
Shared: structs.AllocatedSharedResources{DiskMB: 5 * 1024},
}
require.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), nil, job1))
job2 := mock.BatchJob()
job2.Type = structs.JobTypeBatch
job2.Priority = 20
job2.TaskGroups[0].Tasks[0].Resources = &structs.Resources{
CPU: 512,
MemoryMB: 1024,
Networks: []*structs.NetworkResource{{MBits: 200}},
}
alloc2 := mock.Alloc()
alloc2.Job = job2
alloc2.JobID = job2.ID
alloc2.NodeID = nodes[0].ID
alloc2.Name = "my-job[2]"
alloc2.TaskGroup = job2.TaskGroups[0].Name
alloc2.AllocatedResources = &structs.AllocatedResources{
Tasks: map[string]*structs.AllocatedTaskResources{
"web": {
Cpu: structs.AllocatedCpuResources{CpuShares: 512},
Memory: structs.AllocatedMemoryResources{MemoryMB: 1024},
Networks: []*structs.NetworkResource{{
Device: "eth0",
IP: "192.168.0.100",
MBits: 200,
}},
},
},
Shared: structs.AllocatedSharedResources{DiskMB: 5 * 1024},
}
require.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), nil, job2))
job3 := mock.Job()
job3.Type = structs.JobTypeBatch
job3.Priority = 40
job3.TaskGroups[0].Tasks[0].Resources = &structs.Resources{
CPU: 1024,
MemoryMB: 2048,
Networks: []*structs.NetworkResource{{
Device: "eth0",
MBits: 400,
}},
}
alloc3 := mock.Alloc()
alloc3.Job = job3
alloc3.JobID = job3.ID
alloc3.NodeID = nodes[0].ID
alloc3.Name = "my-job[0]"
alloc3.TaskGroup = job3.TaskGroups[0].Name
alloc3.AllocatedResources = &structs.AllocatedResources{
Tasks: map[string]*structs.AllocatedTaskResources{
"web": {
Cpu: structs.AllocatedCpuResources{CpuShares: 1024},
Memory: structs.AllocatedMemoryResources{MemoryMB: 25},
Networks: []*structs.NetworkResource{{
Device: "eth0",
IP: "192.168.0.100",
MBits: 400,
}},
},
},
Shared: structs.AllocatedSharedResources{DiskMB: 5 * 1024},
}
require.NoError(t, h.State.UpsertAllocs(structs.MsgTypeTestSetup, h.NextIndex(), []*structs.Allocation{alloc1, alloc2, alloc3}))
// Create a high priority job and allocs for it
// These allocs should not be preempted
job4 := mock.BatchJob()
job4.Type = structs.JobTypeBatch
job4.Priority = 100
job4.TaskGroups[0].Tasks[0].Resources = &structs.Resources{
CPU: 1024,
MemoryMB: 2048,
Networks: []*structs.NetworkResource{{MBits: 100}},
}
alloc4 := mock.Alloc()
alloc4.Job = job4
alloc4.JobID = job4.ID
alloc4.NodeID = nodes[0].ID
alloc4.Name = "my-job4[0]"
alloc4.TaskGroup = job4.TaskGroups[0].Name
alloc4.AllocatedResources = &structs.AllocatedResources{
Tasks: map[string]*structs.AllocatedTaskResources{
"web": {
Cpu: structs.AllocatedCpuResources{
CpuShares: 1024,
},
Memory: structs.AllocatedMemoryResources{
MemoryMB: 2048,
},
Networks: []*structs.NetworkResource{
{
Device: "eth0",
IP: "192.168.0.100",
ReservedPorts: []structs.Port{{Label: "web", Value: 80}},
MBits: 100,
},
},
},
},
Shared: structs.AllocatedSharedResources{
DiskMB: 2 * 1024,
},
}
require.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), nil, job4))
require.NoError(t, h.State.UpsertAllocs(structs.MsgTypeTestSetup, h.NextIndex(), []*structs.Allocation{alloc4}))
	// Create a system job such that it would need to preempt the lower priority allocs to succeed
job := mock.SystemJob()
job.TaskGroups[0].Tasks[0].Resources = &structs.Resources{
CPU: 1948,
MemoryMB: 256,
Networks: []*structs.NetworkResource{{
MBits: 800,
DynamicPorts: []structs.Port{{Label: "http"}},
}},
}
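	// Every bit of the 3072 CPU on nodes[0] is already allocated
	// (512+512+1024+1024), so this job can only fit there if the scheduler
	// preempts the lower priority allocs from job1, job2, and job3 while
	// leaving job4's higher priority alloc untouched.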
require.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), nil, job))
// Create a mock evaluation to register the job
eval := &structs.Evaluation{
Namespace: structs.DefaultNamespace,
ID: uuid.Generate(),
Priority: job.Priority,
TriggeredBy: structs.EvalTriggerJobRegister,
JobID: job.ID,
Status: structs.EvalStatusPending,
}
require.NoError(t, h.State.UpsertEvals(structs.MsgTypeTestSetup, h.NextIndex(), []*structs.Evaluation{eval}))
// Process the evaluation
err = h.Process(NewSystemScheduler, eval)
	require.NoError(t, err)
// Ensure a single plan
require.Equal(t, 1, len(h.Plans))
plan := h.Plans[0]
// Ensure the plan doesn't have annotations
require.Nil(t, plan.Annotations)
// Ensure the plan allocated on both nodes
var planned []*structs.Allocation
preemptingAllocId := ""
require.Equal(t, 2, len(plan.NodeAllocation))
// The alloc that got placed on node 1 is the preemptor
for _, allocList := range plan.NodeAllocation {
planned = append(planned, allocList...)
for _, alloc := range allocList {
if alloc.NodeID == nodes[0].ID {
preemptingAllocId = alloc.ID
}
}
}
// Lookup the allocations by JobID
ws := memdb.NewWatchSet()
out, err := h.State.AllocsByJob(ws, job.Namespace, job.ID, false)
require.NoError(t, err)
// Ensure all allocations placed
require.Equal(t, 2, len(out))
// Verify that one node has preempted allocs
require.NotNil(t, plan.NodePreemptions[nodes[0].ID])
preemptedAllocs := plan.NodePreemptions[nodes[0].ID]
// Verify that three jobs have preempted allocs
require.Equal(t, 3, len(preemptedAllocs))
expectedPreemptedJobIDs := []string{job1.ID, job2.ID, job3.ID}
// We expect job1, job2 and job3 to have preempted allocations
// job4 should not have any allocs preempted
for _, alloc := range preemptedAllocs {
require.Contains(t, expectedPreemptedJobIDs, alloc.JobID)
}
// Look up the preempted allocs by job ID
ws = memdb.NewWatchSet()
for _, jobId := range expectedPreemptedJobIDs {
out, err = h.State.AllocsByJob(ws, structs.DefaultNamespace, jobId, false)
require.NoError(t, err)
for _, alloc := range out {
require.Equal(t, structs.AllocDesiredStatusEvict, alloc.DesiredStatus)
require.Equal(t, fmt.Sprintf("Preempted by alloc ID %v", preemptingAllocId), alloc.DesiredDescription)
}
}
h.AssertEvalStatus(t, structs.EvalStatusComplete)
}

func TestSystemSched_canHandle(t *testing.T) {
ci.Parallel(t)
s := SystemScheduler{sysbatch: false}
t.Run("system register", func(t *testing.T) {
require.True(t, s.canHandle(structs.EvalTriggerJobRegister))
})
t.Run("system scheduled", func(t *testing.T) {
require.False(t, s.canHandle(structs.EvalTriggerScheduled))
})
t.Run("system periodic", func(t *testing.T) {
require.False(t, s.canHandle(structs.EvalTriggerPeriodicJob))
})
}

func TestSystemSched_NodeDisconnected(t *testing.T) {
ci.Parallel(t)
systemJob := mock.SystemJob()
systemAlloc := mock.SystemAlloc()
systemAlloc.Name = fmt.Sprintf("my-job.%s[0]", systemJob.TaskGroups[0].Name)
sysBatchJob := mock.SystemBatchJob()
sysBatchJob.TaskGroups[0].Tasks[0].Env = make(map[string]string)
sysBatchJob.TaskGroups[0].Tasks[0].Env["foo"] = "bar"
sysBatchAlloc := mock.SysBatchAlloc()
sysBatchAlloc.Name = fmt.Sprintf("my-sysbatch.%s[0]", sysBatchJob.TaskGroups[0].Name)
now := time.Now().UTC()
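	// unknownAllocState models an alloc that has just been marked unknown
	// because its client disconnected.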
unknownAllocState := []*structs.AllocState{{
Field: structs.AllocStateFieldClientStatus,
Value: structs.AllocClientStatusUnknown,
Time: now,
}}
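	// expiredAllocState models an alloc that went unknown 60s ago, well past
	// the 5s MaxClientDisconnect configured on the jobs below, so its
	// disconnect window has expired.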
expiredAllocState := []*structs.AllocState{{
Field: structs.AllocStateFieldClientStatus,
Value: structs.AllocClientStatusUnknown,
Time: now.Add(-60 * time.Second),
}}
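	// reconnectedAllocState models an alloc that went unknown and then
	// reported running again after its client reconnected.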
reconnectedAllocState := []*structs.AllocState{
{
Field: structs.AllocStateFieldClientStatus,
Value: structs.AllocClientStatusUnknown,
Time: now.Add(-60 * time.Second),
},
{
Field: structs.AllocStateFieldClientStatus,
Value: structs.AllocClientStatusRunning,
Time: now,
},
}
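	// successTaskState marks the task as dead without failure, i.e. a sysbatch
	// task that completed successfully.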
successTaskState := map[string]*structs.TaskState{
systemJob.TaskGroups[0].Tasks[0].Name: {
State: structs.TaskStateDead,
Failed: false,
},
}
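	// testCase describes the node, job, and existing allocation state under
	// test, along with the plan the scheduler is expected to produce.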
type testCase struct {
name string
jobType string
exists bool
required bool
migrate bool
draining bool
targeted bool
modifyJob bool
previousTerminal bool
nodeStatus string
clientStatus string
desiredStatus string
allocState []*structs.AllocState
taskState map[string]*structs.TaskState
expectedPlanCount int
expectedNodeAllocation map[string]*structs.Allocation
expectedNodeUpdate map[string]*structs.Allocation
}
testCases := []testCase{
{
name: "system-running-disconnect",
jobType: structs.JobTypeSystem,
exists: true,
required: true,
nodeStatus: structs.NodeStatusDisconnected,
migrate: false,
draining: false,
targeted: true,
modifyJob: false,
previousTerminal: false,
clientStatus: structs.AllocClientStatusRunning,
desiredStatus: structs.AllocDesiredStatusRun,
allocState: nil,
expectedPlanCount: 1,
expectedNodeAllocation: map[string]*structs.Allocation{
"id": {
ClientStatus: structs.AllocClientStatusUnknown,
DesiredStatus: structs.AllocDesiredStatusRun,
},
},
expectedNodeUpdate: nil,
},
{
name: "system-running-reconnect",
jobType: structs.JobTypeSystem,
required: true,
exists: true,
nodeStatus: structs.NodeStatusReady,
migrate: false,
draining: false,
targeted: true,
modifyJob: false,
previousTerminal: false,
clientStatus: structs.AllocClientStatusRunning,
desiredStatus: structs.AllocDesiredStatusRun,
allocState: reconnectedAllocState,
expectedPlanCount: 0,
expectedNodeAllocation: nil,
expectedNodeUpdate: nil,
},
{
name: "system-unknown-expired",
jobType: structs.JobTypeSystem,
required: true,
exists: true,
nodeStatus: structs.NodeStatusDisconnected,
migrate: false,
draining: false,
targeted: true,
modifyJob: false,
previousTerminal: false,
clientStatus: structs.AllocClientStatusUnknown,
desiredStatus: structs.AllocDesiredStatusRun,
allocState: expiredAllocState,
expectedPlanCount: 1,
expectedNodeAllocation: nil,
expectedNodeUpdate: map[string]*structs.Allocation{
"id": {
ClientStatus: structs.AllocClientStatusLost,
DesiredStatus: structs.AllocDesiredStatusStop,
},
},
},
{
name: "system-migrate",
jobType: structs.JobTypeSysBatch,
required: true,
exists: true,
nodeStatus: structs.NodeStatusReady,
migrate: true,
draining: true,
targeted: true,
modifyJob: false,
previousTerminal: false,
clientStatus: structs.AllocClientStatusRunning,
desiredStatus: structs.AllocDesiredStatusRun,
allocState: nil,
expectedPlanCount: 1,
expectedNodeAllocation: nil,
expectedNodeUpdate: map[string]*structs.Allocation{
"id": {
ClientStatus: structs.AllocClientStatusRunning,
DesiredStatus: structs.AllocDesiredStatusStop,
},
},
},
{
name: "sysbatch-running-unknown",
jobType: structs.JobTypeSysBatch,
required: true,
exists: true,
nodeStatus: structs.NodeStatusDisconnected,
migrate: false,
draining: false,
targeted: true,
modifyJob: false,
previousTerminal: false,
clientStatus: structs.AllocClientStatusRunning,
desiredStatus: structs.AllocDesiredStatusRun,
allocState: nil,
expectedPlanCount: 1,
expectedNodeAllocation: map[string]*structs.Allocation{
"id": {
ClientStatus: structs.AllocClientStatusUnknown,
DesiredStatus: structs.AllocDesiredStatusRun,
},
},
expectedNodeUpdate: nil,
},
{
name: "system-ignore-unknown",
jobType: structs.JobTypeSystem,
required: true,
exists: true,
nodeStatus: structs.NodeStatusDisconnected,
migrate: false,
draining: false,
targeted: true,
modifyJob: false,
previousTerminal: false,
clientStatus: structs.AllocClientStatusUnknown,
desiredStatus: structs.AllocDesiredStatusRun,
allocState: unknownAllocState,
expectedPlanCount: 0,
expectedNodeAllocation: nil,
expectedNodeUpdate: nil,
},
{
name: "sysbatch-ignore-unknown",
jobType: structs.JobTypeSysBatch,
required: true,
exists: true,
nodeStatus: structs.NodeStatusDisconnected,
migrate: false,
draining: false,
targeted: true,
modifyJob: false,
previousTerminal: false,
clientStatus: structs.AllocClientStatusUnknown,
desiredStatus: structs.AllocDesiredStatusRun,
allocState: unknownAllocState,
expectedPlanCount: 0,
expectedNodeAllocation: nil,
expectedNodeUpdate: nil,
},
{
name: "sysbatch-ignore-complete-disconnected",
jobType: structs.JobTypeSysBatch,
required: true,
exists: true,
nodeStatus: structs.NodeStatusDisconnected,
migrate: false,
draining: false,
targeted: true,
modifyJob: false,
previousTerminal: false,
clientStatus: structs.AllocClientStatusComplete,
desiredStatus: structs.AllocDesiredStatusRun,
allocState: unknownAllocState,
expectedPlanCount: 0,
expectedNodeAllocation: nil,
expectedNodeUpdate: nil,
},
{
name: "sysbatch-running-reconnect",
jobType: structs.JobTypeSysBatch,
required: true,
exists: true,
nodeStatus: structs.NodeStatusReady,
migrate: false,
draining: false,
targeted: true,
modifyJob: false,
previousTerminal: false,
clientStatus: structs.AllocClientStatusRunning,
desiredStatus: structs.AllocDesiredStatusRun,
allocState: reconnectedAllocState,
expectedPlanCount: 0,
expectedNodeAllocation: nil,
expectedNodeUpdate: nil,
},
{
name: "sysbatch-failed-reconnect",
jobType: structs.JobTypeSysBatch,
required: true,
exists: true,
nodeStatus: structs.NodeStatusReady,
migrate: false,
draining: false,
targeted: true,
modifyJob: false,
previousTerminal: false,
clientStatus: structs.AllocClientStatusFailed,
desiredStatus: structs.AllocDesiredStatusRun,
allocState: reconnectedAllocState,
expectedPlanCount: 0,
expectedNodeAllocation: nil,
expectedNodeUpdate: nil,
},
{
name: "sysbatch-complete-reconnect",
jobType: structs.JobTypeSysBatch,
required: true,
exists: true,
nodeStatus: structs.NodeStatusReady,
migrate: false,
draining: false,
targeted: true,
modifyJob: false,
previousTerminal: false,
clientStatus: structs.AllocClientStatusComplete,
desiredStatus: structs.AllocDesiredStatusRun,
allocState: reconnectedAllocState,
expectedPlanCount: 0,
expectedNodeAllocation: nil,
expectedNodeUpdate: nil,
},
{
name: "sysbatch-unknown-expired",
jobType: structs.JobTypeSysBatch,
required: true,
exists: true,
nodeStatus: structs.NodeStatusReady,
migrate: false,
draining: false,
targeted: true,
modifyJob: false,
previousTerminal: false,
clientStatus: structs.AllocClientStatusUnknown,
desiredStatus: structs.AllocDesiredStatusRun,
allocState: expiredAllocState,
expectedPlanCount: 1,
expectedNodeAllocation: nil,
expectedNodeUpdate: map[string]*structs.Allocation{
"id": {
ClientStatus: structs.AllocClientStatusLost,
DesiredStatus: structs.AllocDesiredStatusStop,
},
},
},
{
name: "sysbatch-migrate",
jobType: structs.JobTypeSysBatch,
required: true,
exists: true,
nodeStatus: structs.NodeStatusDown,
migrate: true,
draining: true,
targeted: true,
modifyJob: false,
previousTerminal: false,
clientStatus: structs.AllocClientStatusRunning,
desiredStatus: structs.AllocDesiredStatusRun,
allocState: nil,
expectedPlanCount: 1,
expectedNodeAllocation: nil,
expectedNodeUpdate: map[string]*structs.Allocation{
"id": {
ClientStatus: structs.AllocClientStatusRunning,
DesiredStatus: structs.AllocDesiredStatusStop,
},
},
},
{
name: "system-stopped",
jobType: structs.JobTypeSysBatch,
required: false,
exists: true,
nodeStatus: structs.NodeStatusDown,
migrate: false,
draining: false,
targeted: true,
modifyJob: false,
previousTerminal: false,
clientStatus: structs.AllocClientStatusRunning,
desiredStatus: structs.AllocDesiredStatusRun,
allocState: nil,
expectedPlanCount: 1,
expectedNodeAllocation: nil,
expectedNodeUpdate: map[string]*structs.Allocation{
"id": {
ClientStatus: structs.AllocClientStatusRunning,
DesiredStatus: structs.AllocDesiredStatusStop,
},
},
},
{
name: "system-lost",
jobType: structs.JobTypeSystem,
required: true,
exists: true,
nodeStatus: structs.NodeStatusDown,
migrate: false,
draining: false,
targeted: true,
modifyJob: false,
previousTerminal: false,
clientStatus: structs.AllocClientStatusRunning,
desiredStatus: structs.AllocDesiredStatusRun,
allocState: nil,
expectedPlanCount: 1,
expectedNodeAllocation: nil,
expectedNodeUpdate: map[string]*structs.Allocation{
"id": {
ClientStatus: structs.AllocClientStatusLost,
DesiredStatus: structs.AllocDesiredStatusStop,
},
},
},
{
name: "sysbatch-lost",
jobType: structs.JobTypeSysBatch,
required: true,
exists: true,
nodeStatus: structs.NodeStatusDown,
migrate: false,
draining: false,
targeted: true,
modifyJob: false,
previousTerminal: false,
clientStatus: structs.AllocClientStatusRunning,
desiredStatus: structs.AllocDesiredStatusRun,
allocState: nil,
expectedPlanCount: 1,
expectedNodeAllocation: nil,
expectedNodeUpdate: map[string]*structs.Allocation{
"id": {
ClientStatus: structs.AllocClientStatusLost,
DesiredStatus: structs.AllocDesiredStatusStop,
},
},
},
{
name: "system-node-draining",
jobType: structs.JobTypeSystem,
required: true,
exists: true,
nodeStatus: structs.NodeStatusReady,
migrate: false,
draining: true,
targeted: true,
modifyJob: false,
previousTerminal: false,
clientStatus: structs.AllocClientStatusRunning,
desiredStatus: structs.AllocDesiredStatusRun,
expectedPlanCount: 0,
expectedNodeAllocation: nil,
expectedNodeUpdate: nil,
},
{
name: "sysbatch-node-draining",
jobType: structs.JobTypeSysBatch,
required: true,
exists: true,
nodeStatus: structs.NodeStatusReady,
migrate: false,
draining: true,
targeted: true,
modifyJob: false,
previousTerminal: false,
clientStatus: structs.AllocClientStatusRunning,
desiredStatus: structs.AllocDesiredStatusRun,
expectedPlanCount: 0,
expectedNodeAllocation: nil,
expectedNodeUpdate: nil,
},
{
name: "system-node-down-complete",
jobType: structs.JobTypeSystem,
required: true,
exists: true,
nodeStatus: structs.NodeStatusDown,
migrate: false,
draining: false,
targeted: true,
modifyJob: false,
previousTerminal: false,
clientStatus: structs.AllocClientStatusComplete,
desiredStatus: structs.AllocDesiredStatusRun,
expectedPlanCount: 0,
expectedNodeAllocation: nil,
expectedNodeUpdate: nil,
},
{
name: "sysbatch-node-down-complete",
jobType: structs.JobTypeSysBatch,
required: true,
exists: true,
nodeStatus: structs.NodeStatusDown,
migrate: false,
draining: false,
targeted: true,
modifyJob: false,
previousTerminal: false,
clientStatus: structs.AllocClientStatusComplete,
desiredStatus: structs.AllocDesiredStatusRun,
expectedPlanCount: 0,
expectedNodeAllocation: nil,
expectedNodeUpdate: nil,
},
{
name: "sysbatch-ignore-terminal",
jobType: structs.JobTypeSysBatch,
required: true,
exists: true,
nodeStatus: structs.NodeStatusReady,
migrate: false,
draining: false,
targeted: true,
modifyJob: false,
previousTerminal: false,
clientStatus: structs.AllocClientStatusRunning,
desiredStatus: structs.AllocDesiredStatusEvict,
expectedPlanCount: 0,
expectedNodeAllocation: nil,
expectedNodeUpdate: nil,
},
{
name: "system-ignore-ineligible",
jobType: structs.JobTypeSystem,
required: true,
exists: true,
nodeStatus: structs.NodeStatusDisconnected,
migrate: false,
draining: false,
targeted: true,
modifyJob: false,
previousTerminal: false,
clientStatus: structs.AllocClientStatusPending,
desiredStatus: structs.AllocDesiredStatusRun,
expectedPlanCount: 0,
expectedNodeAllocation: nil,
expectedNodeUpdate: nil,
},
{
name: "sysbatch-ignore-ineligible",
jobType: structs.JobTypeSysBatch,
required: true,
exists: true,
nodeStatus: structs.NodeStatusDisconnected,
migrate: false,
draining: false,
targeted: true,
modifyJob: false,
previousTerminal: false,
clientStatus: structs.AllocClientStatusPending,
desiredStatus: structs.AllocDesiredStatusRun,
expectedPlanCount: 0,
expectedNodeAllocation: nil,
expectedNodeUpdate: nil,
},
{
name: "system-stop-not-targeted",
jobType: structs.JobTypeSystem,
required: true,
exists: true,
nodeStatus: structs.NodeStatusReady,
migrate: false,
draining: false,
targeted: false,
modifyJob: false,
previousTerminal: false,
clientStatus: structs.AllocClientStatusRunning,
desiredStatus: structs.AllocDesiredStatusRun,
expectedPlanCount: 1,
expectedNodeAllocation: nil,
expectedNodeUpdate: map[string]*structs.Allocation{
"id": {
ClientStatus: structs.AllocClientStatusRunning,
DesiredStatus: structs.AllocDesiredStatusStop,
},
},
},
{
name: "sysbatch-stop-not-targeted",
jobType: structs.JobTypeSysBatch,
required: true,
exists: true,
nodeStatus: structs.NodeStatusReady,
migrate: false,
draining: false,
targeted: false,
modifyJob: false,
previousTerminal: false,
clientStatus: structs.AllocClientStatusRunning,
desiredStatus: structs.AllocDesiredStatusRun,
allocState: nil,
expectedPlanCount: 1,
expectedNodeAllocation: nil,
expectedNodeUpdate: map[string]*structs.Allocation{
"id": {
ClientStatus: structs.AllocClientStatusRunning,
DesiredStatus: structs.AllocDesiredStatusStop,
},
},
},
{
name: "system-update-job-version",
jobType: structs.JobTypeSystem,
required: true,
exists: true,
nodeStatus: structs.NodeStatusReady,
migrate: false,
draining: false,
targeted: true,
modifyJob: true,
previousTerminal: false,
clientStatus: structs.AllocClientStatusRunning,
desiredStatus: structs.AllocDesiredStatusRun,
allocState: nil,
expectedPlanCount: 1,
expectedNodeAllocation: map[string]*structs.Allocation{
"id": {
ClientStatus: structs.AllocClientStatusPending,
DesiredStatus: structs.AllocDesiredStatusRun,
},
},
expectedNodeUpdate: map[string]*structs.Allocation{
"id": {
ClientStatus: structs.AllocClientStatusRunning,
DesiredStatus: structs.AllocDesiredStatusStop,
},
},
},
{
name: "sysbatch-update-job-version",
jobType: structs.JobTypeSysBatch,
required: true,
exists: true,
nodeStatus: structs.NodeStatusReady,
migrate: false,
draining: false,
targeted: true,
modifyJob: true,
previousTerminal: false,
clientStatus: structs.AllocClientStatusRunning,
desiredStatus: structs.AllocDesiredStatusRun,
allocState: nil,
expectedPlanCount: 1,
expectedNodeAllocation: map[string]*structs.Allocation{
"id": {
ClientStatus: structs.AllocClientStatusPending,
DesiredStatus: structs.AllocDesiredStatusRun,
},
},
expectedNodeUpdate: map[string]*structs.Allocation{
"id": {
ClientStatus: structs.AllocClientStatusRunning,
DesiredStatus: structs.AllocDesiredStatusStop,
},
},
},
{
name: "sysbatch-ignore-successful-tainted",
jobType: structs.JobTypeSysBatch,
required: true,
exists: true,
nodeStatus: structs.NodeStatusDown,
migrate: false,
draining: false,
targeted: true,
modifyJob: false,
previousTerminal: false,
clientStatus: structs.AllocClientStatusRunning,
desiredStatus: structs.AllocDesiredStatusRun,
allocState: nil,
taskState: successTaskState,
expectedPlanCount: 0,
expectedNodeAllocation: nil,
expectedNodeUpdate: nil,
},
{
name: "sysbatch-annotate-when-not-existing",
jobType: structs.JobTypeSysBatch,
required: true,
exists: false,
nodeStatus: structs.NodeStatusReady,
migrate: false,
draining: false,
targeted: true,
modifyJob: false,
previousTerminal: false,
clientStatus: structs.AllocClientStatusRunning,
desiredStatus: structs.AllocDesiredStatusRun,
allocState: nil,
expectedPlanCount: 1,
expectedNodeAllocation: map[string]*structs.Allocation{
"id": {
ClientStatus: structs.AllocClientStatusPending,
DesiredStatus: structs.AllocDesiredStatusRun,
},
},
expectedNodeUpdate: nil,
},
{
name: "sysbatch-update-modified-terminal-when-not-existing",
jobType: structs.JobTypeSysBatch,
required: true,
exists: false,
nodeStatus: structs.NodeStatusReady,
migrate: false,
draining: false,
targeted: true,
modifyJob: true,
previousTerminal: true,
clientStatus: structs.AllocClientStatusRunning,
desiredStatus: structs.AllocDesiredStatusRun,
allocState: nil,
expectedPlanCount: 1,
expectedNodeAllocation: map[string]*structs.Allocation{
"id": {
ClientStatus: structs.AllocClientStatusPending,
DesiredStatus: structs.AllocDesiredStatusRun,
},
},
expectedNodeUpdate: map[string]*structs.Allocation{
"id": {
ClientStatus: structs.AllocClientStatusComplete,
DesiredStatus: structs.AllocDesiredStatusStop,
},
},
},
{
name: "sysbatch-ignore-unmodified-terminal-when-not-existing",
jobType: structs.JobTypeSysBatch,
required: true,
exists: false,
nodeStatus: structs.NodeStatusReady,
migrate: false,
draining: false,
targeted: true,
modifyJob: false,
previousTerminal: true,
clientStatus: structs.AllocClientStatusRunning,
desiredStatus: structs.AllocDesiredStatusRun,
allocState: nil,
expectedPlanCount: 0,
expectedNodeAllocation: nil,
expectedNodeUpdate: nil,
},
}
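// For each case: build a node, job, and allocation according to the case
// flags, process a node-update evaluation, and verify the allocations and
// updates in the resulting plan.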
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
h := NewHarness(t)
// Register a node
node := mock.Node()
node.Status = tc.nodeStatus
if tc.draining {
node.SchedulingEligibility = structs.NodeSchedulingIneligible
}
require.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node))
// Generate a fake job and an allocation for it on that node.
var job *structs.Job
var alloc *structs.Allocation
switch tc.jobType {
case structs.JobTypeSystem:
job = systemJob.Copy()
alloc = systemAlloc.Copy()
case structs.JobTypeSysBatch:
job = sysBatchJob.Copy()
alloc = sysBatchAlloc.Copy()
default:
require.FailNow(t, "invalid jobType")
}
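// MaxClientDisconnect lets allocations on a node that stops heartbeating be
// marked unknown (and later reconnect) instead of being replaced immediately.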
job.TaskGroups[0].MaxClientDisconnect = pointer.Of(5 * time.Second)
if !tc.required {
job.Stop = true
}
// If the node should no longer be targeted, point the job at a different datacenter
if !tc.targeted {
job.Datacenters = []string{"not-targeted"}
}
require.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), nil, job))
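// Attach the allocation to the node and job with the case's client and desired statuses.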
alloc.Job = job.Copy()
alloc.JobID = job.ID
alloc.NodeID = node.ID
alloc.TaskGroup = job.TaskGroups[0].Name
alloc.ClientStatus = tc.clientStatus
alloc.DesiredStatus = tc.desiredStatus
alloc.DesiredTransition.Migrate = pointer.Of(tc.migrate)
alloc.AllocStates = tc.allocState
alloc.TaskStates = tc.taskState
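// Only write the allocation to state when the case expects it to already exist.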
if tc.exists {
require.NoError(t, h.State.UpsertAllocs(structs.MsgTypeTestSetup, h.NextIndex(), []*structs.Allocation{alloc}))
}
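// Optionally change the job (or the allocation's copy of it) so the existing
// allocation no longer matches the latest job version.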
if tc.modifyJob {
if tc.jobType == structs.JobTypeSystem {
job.TaskGroups[0].Tasks[0].Resources.Networks[0].DynamicPorts = []structs.Port{{Label: "grpc"}}
}
if tc.jobType == structs.JobTypeSysBatch {
alloc.Job.TaskGroups[0].Tasks[0].Driver = "raw_exec"
}
require.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), nil, job))
}
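// Optionally add an older, terminal allocation for the same job on this node.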
if tc.previousTerminal {
prev := alloc.Copy()
if tc.modifyJob {
prev.Job.JobModifyIndex = alloc.Job.JobModifyIndex - 1
}
prev.ClientStatus = structs.AllocClientStatusComplete
prev.DesiredStatus = structs.AllocDesiredStatusRun
require.NoError(t, h.State.UpsertAllocs(structs.MsgTypeTestSetup, h.NextIndex(), []*structs.Allocation{prev}))
}
// Create a mock node-update evaluation to trigger the scheduler for this node
eval := &structs.Evaluation{
Namespace: structs.DefaultNamespace,
ID: uuid.Generate(),
Priority: 50,
TriggeredBy: structs.EvalTriggerNodeUpdate,
JobID: job.ID,
NodeID: node.ID,
Status: structs.EvalStatusPending,
}
require.NoError(t, h.State.UpsertEvals(structs.MsgTypeTestSetup, h.NextIndex(), []*structs.Evaluation{eval}))
// Process the evaluation
err := h.Process(NewSystemScheduler, eval)
require.NoError(t, err)
// Ensure the expected number of plans
require.Len(t, h.Plans, tc.expectedPlanCount)
if tc.expectedPlanCount == 0 {
return
}
plan := h.Plans[0]
// Ensure the plan has the expected number of allocations and node updates
require.Len(t, plan.NodeAllocation[node.ID], len(tc.expectedNodeAllocation))
require.Len(t, plan.NodeUpdate[node.ID], len(tc.expectedNodeUpdate))
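// Check that at least one planned allocation matches the expected
// client/desired status pair.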
foundMatch := false
for _, plannedNodeAllocs := range plan.NodeAllocation {
for _, actual := range plannedNodeAllocs {
for _, expected := range tc.expectedNodeAllocation {
if expected.ClientStatus == actual.ClientStatus &&
expected.DesiredStatus == actual.DesiredStatus {
foundMatch = true
break
}
}
}
}
if len(tc.expectedNodeAllocation) > 0 {
require.True(t, foundMatch, "NodeAllocation did not match")
}
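// Repeat the same check for planned node updates (allocations to stop).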
foundMatch = false
for _, plannedNodeUpdates := range plan.NodeUpdate {
for _, actual := range plannedNodeUpdates {
for _, expected := range tc.expectedNodeUpdate {
if expected.ClientStatus == actual.ClientStatus &&
expected.DesiredStatus == actual.DesiredStatus {
foundMatch = true
break
}
}
}
}
if len(tc.expectedNodeUpdate) > 0 {
require.True(t, foundMatch, "NodeUpdate did not match")
}
h.AssertEvalStatus(t, structs.EvalStatusComplete)
})
}
}
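
// TestSystemSched_CSITopology exercises system job placement when each node's
// CSI plugin reports a different zone topology and the requested volume is
// restricted to a single zone.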
func TestSystemSched_CSITopology(t *testing.T) {
ci.Parallel(t)
h := NewHarness(t)
zones := []string{"zone-0", "zone-1", "zone-2", "zone-3"}
// Create some nodes, each running a CSI plugin with topology for
// a different "zone"
for i := 0; i < 12; i++ {
node := mock.Node()
node.Datacenter = zones[i%4]
node.CSINodePlugins = map[string]*structs.CSIInfo{
"test-plugin-" + zones[i%4]: {
PluginID: "test-plugin-" + zones[i%4],
Healthy: true,
NodeInfo: &structs.CSINodeInfo{
MaxVolumes: 3,
AccessibleTopology: &structs.CSITopology{
Segments: map[string]string{"zone": zones[i%4]}},
},
},
}
must.NoError(t, h.State.UpsertNode(
structs.MsgTypeTestSetup, h.NextIndex(), node))
}
// Create a non-default namespace for the job and volume
ns := "non-default-namespace"
must.NoError(t, h.State.UpsertNamespaces(h.NextIndex(),
[]*structs.Namespace{{Name: ns}}))
// Create a volume that lives in one zone
vol0 := structs.NewCSIVolume("myvolume", 0)
vol0.PluginID = "test-plugin-zone-0"
vol0.Namespace = ns
vol0.AccessMode = structs.CSIVolumeAccessModeMultiNodeMultiWriter
vol0.AttachmentMode = structs.CSIVolumeAttachmentModeFilesystem
vol0.RequestedTopologies = &structs.CSITopologyRequest{
Required: []*structs.CSITopology{{
Segments: map[string]string{"zone": "zone-0"},
}},
}
must.NoError(t, h.State.UpsertCSIVolume(
h.NextIndex(), []*structs.CSIVolume{vol0}))
// Create a job that uses that volume
job := mock.SystemJob()
job.Datacenters = zones
job.Namespace = ns
job.TaskGroups[0].Volumes = map[string]*structs.VolumeRequest{
"myvolume": {
Type: "csi",
Name: "unique",
Source: "myvolume",
},
}
must.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), nil, job))
// Create a mock evaluation to register the job
eval := &structs.Evaluation{
Namespace: ns,
ID: uuid.Generate(),
Priority: job.Priority,
TriggeredBy: structs.EvalTriggerJobRegister,
JobID: job.ID,
Status: structs.EvalStatusPending,
}
must.NoError(t, h.State.UpsertEvals(structs.MsgTypeTestSetup,
h.NextIndex(), []*structs.Evaluation{eval}))
// Process the evaluation and expect a single plan without annotations
err := h.Process(NewSystemScheduler, eval)
must.NoError(t, err)
must.Len(t, 1, h.Plans, must.Sprint("expected one plan"))
must.Nil(t, h.Plans[0].Annotations, must.Sprint("expected no annotations"))
// Expect the eval not to have spawned a blocked eval
must.Eq(t, 0, len(h.CreateEvals))
must.Eq(t, "", h.Evals[0].BlockedEval, must.Sprint("did not expect a blocked eval"))
must.Eq(t, structs.EvalStatusComplete, h.Evals[0].Status)
}