From 811fe333da765f8f1accfd3888d6051509aea5f3 Mon Sep 17 00:00:00 2001
From: Tim Gross
Date: Fri, 3 Feb 2023 12:29:39 -0500
Subject: [PATCH] scheduler: move utils into files specific to their scheduler
 type (#16051)

Many of the functions in the `util.go` file are specific to a particular
scheduler, and very few of them have guards (or even names) that help avoid
misuse with features specific to a given scheduler type. Move these functions
(and their tests) into files specific to their scheduler type without any
functionality changes to make it clear which bits go with what.
---
 scheduler/reconcile.go        |   5 +
 scheduler/reconcile_util.go   |   5 +
 scheduler/stack.go            |  26 ++
 scheduler/system_util.go      | 348 ++++++++++++++++
 scheduler/system_util_test.go | 580 ++++++++++++++++++++++++++
 scheduler/util.go             | 361 ----------------
 scheduler/util_test.go        | 761 ++++------------------------------
 7 files changed, 1054 insertions(+), 1032 deletions(-)
 create mode 100644 scheduler/system_util.go
 create mode 100644 scheduler/system_util_test.go

diff --git a/scheduler/reconcile.go b/scheduler/reconcile.go
index d309cb30f..b3da414bd 100644
--- a/scheduler/reconcile.go
+++ b/scheduler/reconcile.go
@@ -1,5 +1,10 @@
 package scheduler
 
+// The reconciler is the first stage in the scheduler for service and batch
+// jobs. It compares the existing state to the desired state to determine the
+// set of changes needed. System jobs and sysbatch jobs do not use the
+// reconciler.
+
 import (
 	"fmt"
 	"sort"
diff --git a/scheduler/reconcile_util.go b/scheduler/reconcile_util.go
index e820cfafc..42e8fb8cd 100644
--- a/scheduler/reconcile_util.go
+++ b/scheduler/reconcile_util.go
@@ -1,5 +1,10 @@
 package scheduler
 
+// The structs and helpers in this file are split out of reconciler.go for code
+// manageability and should not be shared with the system schedulers! If you
+// need something here for system/sysbatch jobs, double-check it's safe to use
+// for all scheduler types before moving it into util.go
+
 import (
 	"fmt"
 	"sort"
diff --git a/scheduler/stack.go b/scheduler/stack.go
index cbab8d469..dbade8531 100644
--- a/scheduler/stack.go
+++ b/scheduler/stack.go
@@ -439,3 +439,29 @@ func NewGenericStack(batch bool, ctx Context) *GenericStack {
 	s.maxScore = NewMaxScoreIterator(ctx, s.limit)
 	return s
 }
+
+// taskGroupConstraints collects the constraints, drivers and resources
+// required by each sub-task to aggregate the TaskGroup totals
+func taskGroupConstraints(tg *structs.TaskGroup) tgConstrainTuple {
+	c := tgConstrainTuple{
+		constraints: make([]*structs.Constraint, 0, len(tg.Constraints)),
+		drivers:     make(map[string]struct{}),
+	}
+
+	c.constraints = append(c.constraints, tg.Constraints...)
+	for _, task := range tg.Tasks {
+		c.drivers[task.Driver] = struct{}{}
+		c.constraints = append(c.constraints, task.Constraints...)
+	}
+
+	return c
+}
+
+// tgConstrainTuple is used to store the total constraints of a task group.
+type tgConstrainTuple struct {
+	// Holds the combined constraints of the task group and all its sub-tasks.
+	constraints []*structs.Constraint
+
+	// The set of required drivers within the task group.
+	drivers map[string]struct{}
+}
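taskGroupConstraints lands in stack.go because only the generic stack consumes
it. As a minimal sketch of what it computes (the task group literal below is
illustrative, not taken from this patch): task-level constraints are appended
after the group-level ones, and duplicate drivers collapse into a set.

	tg := &structs.TaskGroup{
		Name: "web",
		Constraints: []*structs.Constraint{
			{LTarget: "${attr.kernel.name}", RTarget: "linux", Operand: "="},
		},
		Tasks: []*structs.Task{
			{Name: "frontend", Driver: "docker"},
			{Name: "logger", Driver: "docker",
				Constraints: []*structs.Constraint{{Operand: structs.ConstraintDistinctHosts}}},
		},
	}

	c := taskGroupConstraints(tg)
	// len(c.constraints) == 2: the group constraint plus the logger's constraint.
	// c.drivers == map[string]struct{}{"docker": {}} — both tasks need the same driver.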
diff --git a/scheduler/system_util.go b/scheduler/system_util.go
new file mode 100644
index 000000000..6c6248d5c
--- /dev/null
+++ b/scheduler/system_util.go
@@ -0,0 +1,348 @@
+package scheduler
+
+// The structs and helpers in this file are split out of scheduler_system.go
+// and shared by the system and sysbatch scheduler. No code in the generic
+// scheduler or reconciler should use anything here! If you need something
+// here for service/batch jobs, double-check it's safe to use for all
+// scheduler types before moving it into util.go
+
+import (
+	"fmt"
+	"time"
+
+	"github.com/hashicorp/nomad/nomad/structs"
+)
+
+// materializeSystemTaskGroups is used to materialize all the task groups
+// a system or sysbatch job requires.
+func materializeSystemTaskGroups(job *structs.Job) map[string]*structs.TaskGroup {
+	out := make(map[string]*structs.TaskGroup)
+	if job.Stopped() {
+		return out
+	}
+
+	for _, tg := range job.TaskGroups {
+		for i := 0; i < tg.Count; i++ {
+			name := fmt.Sprintf("%s.%s[%d]", job.Name, tg.Name, i)
+			out[name] = tg
+		}
+	}
+	return out
+}
+
+// diffSystemAllocsForNode is used to do a set difference between the target
+// allocations and the existing allocations for a particular node. This
+// returns 8 sets of results: the list of named task groups that need to be
+// placed (no existing allocation), the allocations that need to be updated
+// (job definition is newer), allocs that need to be migrated (node is
+// draining), the allocs that need to be evicted (no longer required), those
+// that should be ignored, those that are lost that need to be replaced
+// (running on a lost node), those that are running on a disconnected node but
+// may resume, and those that may still be running on a node that has
+// reconnected.
+func diffSystemAllocsForNode(
+	job *structs.Job, // job whose allocs are going to be diff-ed
+	nodeID string,
+	eligibleNodes map[string]*structs.Node,
+	notReadyNodes map[string]struct{}, // nodes that are not ready, e.g. draining
+	taintedNodes map[string]*structs.Node, // nodes which are down (by node id)
+	required map[string]*structs.TaskGroup, // set of allocations that must exist
+	allocs []*structs.Allocation, // non-terminal allocations that exist
+	terminal structs.TerminalByNodeByName, // latest terminal allocations (by node, name)
+	serverSupportsDisconnectedClients bool, // flag indicating whether to apply disconnected client logic
+) *diffResult {
+	result := new(diffResult)
+
+	// Scan the existing allocations
+	existing := make(map[string]struct{}) // set of alloc names
+	for _, exist := range allocs {
+		// Index the existing node
+		name := exist.Name
+		existing[name] = struct{}{}
+
+		// Check for the definition in the required set
+		tg, ok := required[name]
+
+		// If not required, we stop the alloc
+		if !ok {
+			result.stop = append(result.stop, allocTuple{
+				Name:      name,
+				TaskGroup: tg,
+				Alloc:     exist,
+			})
+			continue
+		}
+
+		supportsDisconnectedClients := exist.SupportsDisconnectedClients(serverSupportsDisconnectedClients)
+
+		reconnect := false
+		expired := false
+
+		// Only compute reconnect for unknown and running since they need to go
+		// through the reconnect process.
+		if supportsDisconnectedClients &&
+			(exist.ClientStatus == structs.AllocClientStatusUnknown ||
+				exist.ClientStatus == structs.AllocClientStatusRunning) {
+			reconnect = exist.NeedsToReconnect()
+			if reconnect {
+				expired = exist.Expired(time.Now())
+			}
+		}
+
+		// If we have been marked for migration and aren't terminal, migrate
+		if !exist.TerminalStatus() && exist.DesiredTransition.ShouldMigrate() {
+			result.migrate = append(result.migrate, allocTuple{
+				Name:      name,
+				TaskGroup: tg,
+				Alloc:     exist,
+			})
+			continue
+		}
+
+		// If we are a sysbatch job and terminal, ignore (or stop?) the alloc
+		if job.Type == structs.JobTypeSysBatch && exist.TerminalStatus() {
+			result.ignore = append(result.ignore, allocTuple{
+				Name:      name,
+				TaskGroup: tg,
+				Alloc:     exist,
+			})
+			continue
+		}
+
+		// Expired unknown allocs are lost. Expired checks that status is unknown.
+		if supportsDisconnectedClients && expired {
+			result.lost = append(result.lost, allocTuple{
+				Name:      name,
+				TaskGroup: tg,
+				Alloc:     exist,
+			})
+			continue
+		}
+
+		// Ignore unknown allocs that we want to reconnect eventually.
+		if supportsDisconnectedClients &&
+			exist.ClientStatus == structs.AllocClientStatusUnknown &&
+			exist.DesiredStatus == structs.AllocDesiredStatusRun {
+			result.ignore = append(result.ignore, allocTuple{
+				Name:      name,
+				TaskGroup: tg,
+				Alloc:     exist,
+			})
+			continue
+		}
+
+		node, nodeIsTainted := taintedNodes[exist.NodeID]
+
+		// Filter allocs on a node that is now re-connected to reconnecting.
+		if supportsDisconnectedClients &&
+			!nodeIsTainted &&
+			reconnect {
+
+			// Record the new ClientStatus to indicate to future evals that the
+			// alloc has already reconnected.
+			reconnecting := exist.Copy()
+			reconnecting.AppendState(structs.AllocStateFieldClientStatus, exist.ClientStatus)
+			result.reconnecting = append(result.reconnecting, allocTuple{
+				Name:      name,
+				TaskGroup: tg,
+				Alloc:     reconnecting,
+			})
+			continue
+		}
+
+		// If we are on a tainted node, we must migrate if we are a service or
+		// if the batch allocation did not finish
+		if nodeIsTainted {
+			// If the job is batch and finished successfully, the fact that the
+			// node is tainted does not mean it should be migrated or marked as
+			// lost as the work was already successfully finished. However for
+			// service/system jobs, tasks should never complete. The check of
+			// batch type defends against client bugs.
+			if exist.Job.Type == structs.JobTypeSysBatch && exist.RanSuccessfully() {
+				goto IGNORE
+			}
+
+			// Filter running allocs on a node that is disconnected to be
+			// marked as unknown.
+			if node != nil &&
+				supportsDisconnectedClients &&
+				node.Status == structs.NodeStatusDisconnected &&
+				exist.ClientStatus == structs.AllocClientStatusRunning {
+
+				disconnect := exist.Copy()
+				disconnect.ClientStatus = structs.AllocClientStatusUnknown
+				disconnect.AppendState(structs.AllocStateFieldClientStatus, structs.AllocClientStatusUnknown)
+				disconnect.ClientDescription = allocUnknown
+				result.disconnecting = append(result.disconnecting, allocTuple{
+					Name:      name,
+					TaskGroup: tg,
+					Alloc:     disconnect,
+				})
+				continue
+			}
+
+			if !exist.TerminalStatus() && (node == nil || node.TerminalStatus()) {
+				result.lost = append(result.lost, allocTuple{
+					Name:      name,
+					TaskGroup: tg,
+					Alloc:     exist,
+				})
+			} else {
+				goto IGNORE
+			}
+
+			continue
+		}
+
+		// For an existing allocation, if the nodeID is no longer
+		// eligible, the diff should be ignored
+		if _, ineligible := notReadyNodes[nodeID]; ineligible {
+			goto IGNORE
+		}
+
+		// Existing allocations on nodes that are no longer targeted
+		// should be stopped
+		if _, eligible := eligibleNodes[nodeID]; !eligible {
+			result.stop = append(result.stop, allocTuple{
+				Name:      name,
+				TaskGroup: tg,
+				Alloc:     exist,
+			})
+			continue
+		}
+
+		// If the definition is updated we need to update
+		if job.JobModifyIndex != exist.Job.JobModifyIndex {
+			result.update = append(result.update, allocTuple{
+				Name:      name,
+				TaskGroup: tg,
+				Alloc:     exist,
+			})
+			continue
+		}
+
+		// Everything is up-to-date
+	IGNORE:
+		result.ignore = append(result.ignore, allocTuple{
+			Name:      name,
+			TaskGroup: tg,
+			Alloc:     exist,
+		})
+	}
+
+	// Scan the required groups
+	for name, tg := range required {
+
+		// Check for an existing allocation
+		if _, ok := existing[name]; !ok {
+
+			// Check for a terminal sysbatch allocation, which should not be
+			// placed again unless the job has been updated.
+			if job.Type == structs.JobTypeSysBatch {
+				if alloc, termExists := terminal.Get(nodeID, name); termExists {
+					// the alloc is terminal, but now the job has been updated
+					if job.JobModifyIndex != alloc.Job.JobModifyIndex {
+						result.update = append(result.update, allocTuple{
+							Name:      name,
+							TaskGroup: tg,
+							Alloc:     alloc,
+						})
+					} else {
+						// alloc is terminal and job unchanged, leave it alone
+						result.ignore = append(result.ignore, allocTuple{
+							Name:      name,
+							TaskGroup: tg,
+							Alloc:     alloc,
+						})
+					}
+					continue
+				}
+			}
+
+			// Require a placement if no existing allocation. If there
+			// is an existing allocation, we would have checked for a potential
+			// update or ignore above. Ignore placements for tainted or
+			// ineligible nodes
+
+			// Tainted and ineligible nodes for a non-existing alloc
+			// should be filtered out and not count towards ignore or place
+			if _, tainted := taintedNodes[nodeID]; tainted {
+				continue
+			}
+			if _, eligible := eligibleNodes[nodeID]; !eligible {
+				continue
+			}
+
+			termOnNode, _ := terminal.Get(nodeID, name)
+			allocTuple := allocTuple{
+				Name:      name,
+				TaskGroup: tg,
+				Alloc:     termOnNode,
+			}
+
+			// If the new allocation isn't annotated with a previous allocation
+			// or if the previous allocation isn't from the same node then we
+			// annotate the allocTuple with a new Allocation
+			if allocTuple.Alloc == nil || allocTuple.Alloc.NodeID != nodeID {
+				allocTuple.Alloc = &structs.Allocation{NodeID: nodeID}
+			}
+
+			result.place = append(result.place, allocTuple)
+		}
+	}
+	return result
+}
+
+// diffSystemAllocs is like diffSystemAllocsForNode, however the allocations
+// in the diffResult contain the specific nodeID they should be allocated on.
+func diffSystemAllocs(
+	job *structs.Job, // jobs whose allocations are going to be diff-ed
+	readyNodes []*structs.Node, // list of nodes in the ready state
+	notReadyNodes map[string]struct{}, // list of nodes in DC but not ready, e.g. draining
+	taintedNodes map[string]*structs.Node, // nodes which are down or drain mode (by node id)
+	allocs []*structs.Allocation, // non-terminal allocations
+	terminal structs.TerminalByNodeByName, // latest terminal allocations (by node id)
+	serverSupportsDisconnectedClients bool, // flag indicating whether to apply disconnected client logic
+) *diffResult {
+
+	// Build a mapping of nodes to all their allocs.
+	nodeAllocs := make(map[string][]*structs.Allocation, len(allocs))
+	for _, alloc := range allocs {
+		nodeAllocs[alloc.NodeID] = append(nodeAllocs[alloc.NodeID], alloc)
+	}
+
+	eligibleNodes := make(map[string]*structs.Node)
+	for _, node := range readyNodes {
+		if _, ok := nodeAllocs[node.ID]; !ok {
+			nodeAllocs[node.ID] = nil
+		}
+		eligibleNodes[node.ID] = node
+	}
+
+	// Create the required task groups.
+	required := materializeSystemTaskGroups(job)
+
+	result := new(diffResult)
+	for nodeID, allocs := range nodeAllocs {
+		diff := diffSystemAllocsForNode(job, nodeID, eligibleNodes, notReadyNodes, taintedNodes, required, allocs, terminal, serverSupportsDisconnectedClients)
+		result.Append(diff)
+	}
+
+	return result
+}
+
+// evictAndPlace is used to mark allocations for eviction and add them to the
+// placement queue. evictAndPlace modifies both the diffResult and the
+// limit. It returns true if the limit has been reached.
+func evictAndPlace(ctx Context, diff *diffResult, allocs []allocTuple, desc string, limit *int) bool {
+	n := len(allocs)
+	for i := 0; i < n && i < *limit; i++ {
+		a := allocs[i]
+		ctx.Plan().AppendStoppedAlloc(a.Alloc, desc, "", "")
+		diff.place = append(diff.place, a)
+	}
+	if n <= *limit {
+		*limit -= n
+		return false
+	}
+	*limit = 0
+	return true
+}
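The names produced by materializeSystemTaskGroups are the keys that
diffSystemAllocsForNode matches existing allocations against, so the naming
scheme matters. A quick sketch of the count expansion, using the mock sysbatch
job the tests below rely on (the count bump is hypothetical, just to show the
indexing):

	job := mock.SystemBatchJob() // job "my-sysbatch" with one task group "pinger"
	job.TaskGroups[0].Count = 2  // illustrative count, not the mock's default

	names := materializeSystemTaskGroups(job)
	// names now has two keys mapping to the same *structs.TaskGroup:
	//   "my-sysbatch.pinger[0]"
	//   "my-sysbatch.pinger[1]"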
diff --git a/scheduler/system_util_test.go b/scheduler/system_util_test.go
new file mode 100644
index 000000000..333d8787c
--- /dev/null
+++ b/scheduler/system_util_test.go
@@ -0,0 +1,580 @@
+package scheduler
+
+import (
+	"fmt"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+
+	"github.com/hashicorp/nomad/ci"
+	"github.com/hashicorp/nomad/helper/pointer"
+	"github.com/hashicorp/nomad/helper/uuid"
+	"github.com/hashicorp/nomad/nomad/mock"
+	"github.com/hashicorp/nomad/nomad/structs"
+)
+
+func TestDiffSystemAllocsForNode_Sysbatch_terminal(t *testing.T) {
+	ci.Parallel(t)
+
+	// For a sysbatch job, the scheduler should not re-place an allocation
+	// that has become terminal, unless the job has been updated.
+
+	job := mock.SystemBatchJob()
+	required := materializeSystemTaskGroups(job)
+
+	eligible := map[string]*structs.Node{
+		"node1": newNode("node1"),
+	}
+
+	var live []*structs.Allocation // empty
+
+	tainted := map[string]*structs.Node(nil)
+
+	t.Run("current job", func(t *testing.T) {
+		terminal := structs.TerminalByNodeByName{
+			"node1": map[string]*structs.Allocation{
+				"my-sysbatch.pinger[0]": {
+					ID:           uuid.Generate(),
+					NodeID:       "node1",
+					Name:         "my-sysbatch.pinger[0]",
+					Job:          job,
+					ClientStatus: structs.AllocClientStatusComplete,
+				},
+			},
+		}
+
+		diff := diffSystemAllocsForNode(job, "node1", eligible, nil, tainted, required, live, terminal, true)
+		require.Empty(t, diff.place)
+		require.Empty(t, diff.update)
+		require.Empty(t, diff.stop)
+		require.Empty(t, diff.migrate)
+		require.Empty(t, diff.lost)
+		require.True(t, len(diff.ignore) == 1 && diff.ignore[0].Alloc == terminal["node1"]["my-sysbatch.pinger[0]"])
+	})
+
+	t.Run("outdated job", func(t *testing.T) {
+		previousJob := job.Copy()
+		previousJob.JobModifyIndex -= 1
+		terminal := structs.TerminalByNodeByName{
+			"node1": map[string]*structs.Allocation{
+				"my-sysbatch.pinger[0]": {
+					ID:     uuid.Generate(),
+					NodeID: "node1",
+					Name:   "my-sysbatch.pinger[0]",
+					Job:    previousJob,
+				},
+			},
+		}
+
+		expAlloc := terminal["node1"]["my-sysbatch.pinger[0]"]
+		expAlloc.NodeID = "node1"
+
+		diff := diffSystemAllocsForNode(job, "node1", eligible, nil, tainted, required, live, terminal, true)
+		require.Empty(t, diff.place)
+		require.Len(t, diff.update, 1)
+		require.Empty(t, diff.stop)
+		require.Empty(t, diff.migrate)
+		require.Empty(t, diff.lost)
+		require.Empty(t, diff.ignore)
+	})
+}
+
+func TestDiffSystemAllocsForNode(t *testing.T) {
+	ci.Parallel(t)
+
+	job := mock.Job()
+	required := materializeSystemTaskGroups(job)
+
+	// The "old" job has a previous modify index
+	oldJob := new(structs.Job)
+	*oldJob = *job
+	oldJob.JobModifyIndex -= 1
+
+	eligibleNode := mock.Node()
+	eligibleNode.ID = "zip"
+
+	drainNode := mock.DrainNode()
+
+	deadNode := mock.Node()
+	deadNode.Status = structs.NodeStatusDown
+
+	tainted := map[string]*structs.Node{
+		"dead":      deadNode,
+		"drainNode": drainNode,
+	}
+
+	eligible := map[string]*structs.Node{
+		eligibleNode.ID: eligibleNode,
+	}
+
+	allocs := []*structs.Allocation{
+		// Update the 1st
+		{
+			ID:     uuid.Generate(),
+			NodeID: "zip",
+			Name:   "my-job.web[0]",
+			Job:    oldJob,
+		},
+
+		// Ignore the 2nd
+		{
+			ID:     uuid.Generate(),
+			NodeID: "zip",
+			Name:   "my-job.web[1]",
+			Job:    job,
+		},
+
+		// Stop the 3rd, which is no longer required
+		{
+			ID:     uuid.Generate(),
+			NodeID: "zip",
+			Name:   "my-job.web[10]",
+			Job:    oldJob,
+		},
+
+		// Migrate the 4th
+		{
+			ID:     uuid.Generate(),
+			NodeID: "drainNode",
+			Name:   "my-job.web[2]",
+			Job:    oldJob,
+			DesiredTransition: structs.DesiredTransition{
+				Migrate: pointer.Of(true),
+			},
+		},
+		// Mark the 5th lost
+		{
+			ID:     uuid.Generate(),
+			NodeID: "dead",
+			Name:   "my-job.web[3]",
+			Job:    oldJob,
+		},
+	}
+
+	// Have three terminal allocs
+	terminal := structs.TerminalByNodeByName{
+		"zip": map[string]*structs.Allocation{
+			"my-job.web[4]": {
+				ID:     uuid.Generate(),
+				NodeID: "zip",
+				Name:   "my-job.web[4]",
+				Job:    job,
+			},
+			"my-job.web[5]": {
+				ID:     uuid.Generate(),
+				NodeID: "zip",
+				Name:   "my-job.web[5]",
+				Job:    job,
+			},
+			"my-job.web[6]": {
+				ID:     uuid.Generate(),
+				NodeID: "zip",
+				Name:   "my-job.web[6]",
+				Job:    job,
+			},
+		},
+	}
+
+	diff := diffSystemAllocsForNode(job, "zip", eligible, nil, tainted, required, allocs, terminal, true)
+
+	// We should update the 1st alloc
+	require.Len(t, diff.update, 1)
+	require.Equal(t, allocs[0], diff.update[0].Alloc)
+
+	// We should ignore the 2nd alloc
+	require.Len(t, diff.ignore, 1)
+	require.Equal(t, allocs[1], diff.ignore[0].Alloc)
+
+	// We should stop the 3rd alloc
+	require.Len(t, diff.stop, 1)
+	require.Equal(t, allocs[2], diff.stop[0].Alloc)
+
+	// We should migrate the 4th alloc
+	require.Len(t, diff.migrate, 1)
+	require.Equal(t, allocs[3], diff.migrate[0].Alloc)
+
+	// We should mark the 5th alloc as lost
+	require.Len(t, diff.lost, 1)
+	require.Equal(t, allocs[4], diff.lost[0].Alloc)
+
+	// We should place 6
+	require.Len(t, diff.place, 6)
+
+	// Ensure that the allocations which are replacements of terminal allocs are
+	// annotated.
+	for _, m := range terminal {
+		for _, alloc := range m {
+			for _, tuple := range diff.place {
+				if alloc.Name == tuple.Name {
+					require.Equal(t, alloc, tuple.Alloc)
+				}
+			}
+		}
+	}
+}
+
+// Test the desired diff for an updated system job running on an
+// ineligible node
+func TestDiffSystemAllocsForNode_ExistingAllocIneligibleNode(t *testing.T) {
+	ci.Parallel(t)
+
+	job := mock.SystemJob()
+	required := materializeSystemTaskGroups(job)
+
+	// The "old" job has a previous modify index
+	oldJob := new(structs.Job)
+	*oldJob = *job
+	oldJob.JobModifyIndex -= 1
+
+	eligibleNode := mock.Node()
+	ineligibleNode := mock.Node()
+	ineligibleNode.SchedulingEligibility = structs.NodeSchedulingIneligible
+
+	tainted := map[string]*structs.Node{}
+
+	eligible := map[string]*structs.Node{
+		eligibleNode.ID: eligibleNode,
+	}
+
+	allocs := []*structs.Allocation{
+		// Update the TG alloc running on eligible node
+		{
+			ID:     uuid.Generate(),
+			NodeID: eligibleNode.ID,
+			Name:   "my-job.web[0]",
+			Job:    oldJob,
+		},
+
+		// Ignore the TG alloc running on ineligible node
+		{
+			ID:     uuid.Generate(),
+			NodeID: ineligibleNode.ID,
+			Name:   "my-job.web[0]",
+			Job:    job,
+		},
+	}
+
+	// No terminal allocs
+	terminal := make(structs.TerminalByNodeByName)
+
+	diff := diffSystemAllocsForNode(job, eligibleNode.ID, eligible, nil, tainted, required, allocs, terminal, true)
+
+	require.Len(t, diff.place, 0)
+	require.Len(t, diff.update, 1)
+	require.Len(t, diff.migrate, 0)
+	require.Len(t, diff.stop, 0)
+	require.Len(t, diff.ignore, 1)
+	require.Len(t, diff.lost, 0)
+}
+
+func TestDiffSystemAllocsForNode_DisconnectedNode(t *testing.T) {
+	ci.Parallel(t)
+
+	// Create job.
+	job := mock.SystemJob()
+	job.TaskGroups[0].MaxClientDisconnect = pointer.Of(time.Hour)
+
+	// Create nodes.
+	readyNode := mock.Node()
+	readyNode.Status = structs.NodeStatusReady
+
+	disconnectedNode := mock.Node()
+	disconnectedNode.Status = structs.NodeStatusDisconnected
+
+	eligibleNodes := map[string]*structs.Node{
+		readyNode.ID: readyNode,
+	}
+
+	taintedNodes := map[string]*structs.Node{
+		disconnectedNode.ID: disconnectedNode,
+	}
+
+	// Create allocs.
+	required := materializeSystemTaskGroups(job)
+	terminal := make(structs.TerminalByNodeByName)
+
+	type diffResultCount struct {
+		place, update, migrate, stop, ignore, lost, disconnecting, reconnecting int
+	}
+
+	testCases := []struct {
+		name    string
+		node    *structs.Node
+		allocFn func(*structs.Allocation)
+		expect  diffResultCount
+	}{
+		{
+			name: "alloc in disconnected client is marked as unknown",
+			node: disconnectedNode,
+			allocFn: func(alloc *structs.Allocation) {
+				alloc.ClientStatus = structs.AllocClientStatusRunning
+			},
+			expect: diffResultCount{
+				disconnecting: 1,
+			},
+		},
+		{
+			name: "disconnected alloc reconnects",
+			node: readyNode,
+			allocFn: func(alloc *structs.Allocation) {
+				alloc.ClientStatus = structs.AllocClientStatusRunning
+
+				alloc.AllocStates = []*structs.AllocState{{
+					Field: structs.AllocStateFieldClientStatus,
+					Value: structs.AllocClientStatusUnknown,
+					Time:  time.Now().Add(-time.Minute),
+				}}
+			},
+			expect: diffResultCount{
+				reconnecting: 1,
+			},
+		},
+		{
+			name: "alloc not reconnecting after it reconnects",
+			node: readyNode,
+			allocFn: func(alloc *structs.Allocation) {
+				alloc.ClientStatus = structs.AllocClientStatusRunning
+
+				alloc.AllocStates = []*structs.AllocState{
+					{
+						Field: structs.AllocStateFieldClientStatus,
+						Value: structs.AllocClientStatusUnknown,
+						Time:  time.Now().Add(-time.Minute),
+					},
+					{
+						Field: structs.AllocStateFieldClientStatus,
+						Value: structs.AllocClientStatusRunning,
+						Time:  time.Now(),
+					},
+				}
+			},
+			expect: diffResultCount{
+				ignore: 1,
+			},
+		},
+		{
+			name: "disconnected alloc is lost after it expires",
+			node: disconnectedNode,
+			allocFn: func(alloc *structs.Allocation) {
+				alloc.ClientStatus = structs.AllocClientStatusUnknown
+
+				alloc.AllocStates = []*structs.AllocState{{
+					Field: structs.AllocStateFieldClientStatus,
+					Value: structs.AllocClientStatusUnknown,
+					Time:  time.Now().Add(-10 * time.Hour),
+				}}
+			},
+			expect: diffResultCount{
+				lost: 1,
+			},
+		},
+		{
+			name: "disconnected allocs are ignored",
+			node: disconnectedNode,
+			allocFn: func(alloc *structs.Allocation) {
+				alloc.ClientStatus = structs.AllocClientStatusUnknown
+
+				alloc.AllocStates = []*structs.AllocState{{
+					Field: structs.AllocStateFieldClientStatus,
+					Value: structs.AllocClientStatusUnknown,
+					Time:  time.Now(),
+				}}
+			},
+			expect: diffResultCount{
+				ignore: 1,
+			},
+		},
+	}
+
+	for _, tc := range testCases {
+		t.Run(tc.name, func(t *testing.T) {
+			alloc := mock.AllocForNode(tc.node)
+			alloc.JobID = job.ID
+			alloc.Job = job
+			alloc.Name = fmt.Sprintf("%s.%s[0]", job.Name, job.TaskGroups[0].Name)
+
+			if tc.allocFn != nil {
+				tc.allocFn(alloc)
+			}
+
+			got := diffSystemAllocsForNode(
+				job, tc.node.ID, eligibleNodes, nil, taintedNodes,
+				required, []*structs.Allocation{alloc}, terminal, true,
+			)
+
+			assert.Len(t, got.place, tc.expect.place, "place")
+			assert.Len(t, got.update, tc.expect.update, "update")
+			assert.Len(t, got.migrate, tc.expect.migrate, "migrate")
+			assert.Len(t, got.stop, tc.expect.stop, "stop")
+			assert.Len(t, got.ignore, tc.expect.ignore, "ignore")
+			assert.Len(t, got.lost, tc.expect.lost, "lost")
+			assert.Len(t, got.disconnecting, tc.expect.disconnecting, "disconnecting")
+			assert.Len(t, got.reconnecting, tc.expect.reconnecting, "reconnecting")
+		})
+	}
+}
+
+func TestDiffSystemAllocs(t *testing.T) {
+	ci.Parallel(t)
+
+	job := mock.SystemJob()
+
+	drainNode := mock.DrainNode()
+
+	deadNode := mock.Node()
+	deadNode.Status = structs.NodeStatusDown
+
+	tainted := map[string]*structs.Node{
+		deadNode.ID:  deadNode,
+		drainNode.ID: drainNode,
+	}
+
+	// Create four alive nodes, plus the draining and dead nodes.
+	nodes := []*structs.Node{{ID: "foo"}, {ID: "bar"}, {ID: "baz"},
+		{ID: "pipe"}, {ID: drainNode.ID}, {ID: deadNode.ID}}
+
+	// The "old" job has a previous modify index
+	oldJob := new(structs.Job)
+	*oldJob = *job
+	oldJob.JobModifyIndex -= 1
+
+	allocs := []*structs.Allocation{
+		// Update allocation on baz
+		{
+			ID:     uuid.Generate(),
+			NodeID: "baz",
+			Name:   "my-job.web[0]",
+			Job:    oldJob,
+		},
+
+		// Ignore allocation on bar
+		{
+			ID:     uuid.Generate(),
+			NodeID: "bar",
+			Name:   "my-job.web[0]",
+			Job:    job,
+		},
+
+		// Migrate allocation on draining node.
+		{
+			ID:     uuid.Generate(),
+			NodeID: drainNode.ID,
+			Name:   "my-job.web[0]",
+			Job:    oldJob,
+			DesiredTransition: structs.DesiredTransition{
+				Migrate: pointer.Of(true),
+			},
+		},
+		// Mark as lost on a dead node
+		{
+			ID:     uuid.Generate(),
+			NodeID: deadNode.ID,
+			Name:   "my-job.web[0]",
+			Job:    oldJob,
+		},
+	}
+
+	// Have one terminal alloc
+	terminal := structs.TerminalByNodeByName{
+		"pipe": map[string]*structs.Allocation{
+			"my-job.web[0]": {
+				ID:     uuid.Generate(),
+				NodeID: "pipe",
+				Name:   "my-job.web[0]",
+				Job:    job,
+			},
+		},
+	}
+
+	diff := diffSystemAllocs(job, nodes, nil, tainted, allocs, terminal, true)
+
+	// We should update the first alloc
+	require.Len(t, diff.update, 1)
+	require.Equal(t, allocs[0], diff.update[0].Alloc)
+
+	// We should ignore the second alloc
+	require.Len(t, diff.ignore, 1)
+	require.Equal(t, allocs[1], diff.ignore[0].Alloc)
+
+	// There should be no stops.
+	require.Empty(t, diff.stop)
+
+	// We should migrate the third alloc.
+	require.Len(t, diff.migrate, 1)
+	require.Equal(t, allocs[2], diff.migrate[0].Alloc)
+
+	// We should mark the fourth alloc as lost
+	require.Len(t, diff.lost, 1)
+	require.Equal(t, allocs[3], diff.lost[0].Alloc)
+
+	// We should place 2
+	require.Len(t, diff.place, 2)
+
+	// Ensure that the allocations which are replacements of terminal allocs are
+	// annotated.
+	for _, m := range terminal {
+		for _, alloc := range m {
+			for _, tuple := range diff.place {
+				if alloc.NodeID == tuple.Alloc.NodeID {
+					require.Equal(t, alloc, tuple.Alloc)
+				}
+			}
+		}
+	}
+}
+
+func TestEvictAndPlace_LimitLessThanAllocs(t *testing.T) {
+	ci.Parallel(t)
+
+	_, ctx := testContext(t)
+	allocs := []allocTuple{
+		{Alloc: &structs.Allocation{ID: uuid.Generate()}},
+		{Alloc: &structs.Allocation{ID: uuid.Generate()}},
+		{Alloc: &structs.Allocation{ID: uuid.Generate()}},
+		{Alloc: &structs.Allocation{ID: uuid.Generate()}},
+	}
+	diff := &diffResult{}
+
+	limit := 2
+	require.True(t, evictAndPlace(ctx, diff, allocs, "", &limit), "evictAndPlace() should have returned true")
+	require.Zero(t, limit, "evictAndPlace() should have decremented limit; got %v; want 0", limit)
+	require.Equal(t, 2, len(diff.place), "evictAndPlace() didn't insert into diffResult properly: %v", diff.place)
+}
+
+func TestEvictAndPlace_LimitEqualToAllocs(t *testing.T) {
+	ci.Parallel(t)
+
+	_, ctx := testContext(t)
+	allocs := []allocTuple{
+		{Alloc: &structs.Allocation{ID: uuid.Generate()}},
+		{Alloc: &structs.Allocation{ID: uuid.Generate()}},
+		{Alloc: &structs.Allocation{ID: uuid.Generate()}},
+		{Alloc: &structs.Allocation{ID: uuid.Generate()}},
+	}
+	diff := &diffResult{}
+
+	limit := 4
+	require.False(t, evictAndPlace(ctx, diff, allocs, "", &limit), "evictAndPlace() should have returned false")
+	require.Zero(t, limit, "evictAndPlace() should have decremented limit; got %v; want 0", limit)
+	require.Equal(t, 4, len(diff.place), "evictAndPlace() didn't insert into diffResult properly: %v", diff.place)
+}
+
+func TestEvictAndPlace_LimitGreaterThanAllocs(t *testing.T) {
+	ci.Parallel(t)
+
+	_, ctx := testContext(t)
+	allocs := []allocTuple{
+		{Alloc: &structs.Allocation{ID: uuid.Generate()}},
+		{Alloc: &structs.Allocation{ID: uuid.Generate()}},
+		{Alloc: &structs.Allocation{ID: uuid.Generate()}},
+		{Alloc: &structs.Allocation{ID: uuid.Generate()}},
+	}
+	diff := &diffResult{}
+
+	limit := 6
+	require.False(t, evictAndPlace(ctx, diff, allocs, "", &limit))
+	require.Equal(t, 2, limit, "evictAndPlace() should have decremented limit")
+	require.Equal(t, 4, len(diff.place), "evictAndPlace() didn't insert into diffResult properly: %v", diff.place)
+}
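The three TestEvictAndPlace cases above pin down the limit contract, which is
worth spelling out because the function both mutates *limit and reports
whether it was exhausted. A worked sketch of the arithmetic (values mirror the
tests; ctx, diff, and allocs as set up there):

	// limit=2, 4 allocs: stops and places allocs[0..1], sets limit to 0, returns true
	// limit=4, 4 allocs: places all 4, sets limit to 0, returns false (exactly consumed)
	// limit=6, 4 allocs: places all 4, leaves limit at 2, returns false (capacity to spare)
	limit := 2
	exhausted := evictAndPlace(ctx, diff, allocs, "", &limit)
	// exhausted == true, limit == 0, and diff.place grew by 2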
diff --git a/scheduler/util.go b/scheduler/util.go
index c02e05840..0a4572a0b 100644
--- a/scheduler/util.go
+++ b/scheduler/util.go
@@ -5,7 +5,6 @@ import (
 	"fmt"
 	"math/rand"
 	"reflect"
-	"time"
 
 	log "github.com/hashicorp/go-hclog"
 	memdb "github.com/hashicorp/go-memdb"
@@ -20,23 +19,6 @@ type allocTuple struct {
 	Alloc     *structs.Allocation
 }
 
-// materializeTaskGroups is used to materialize all the task groups
-// a job requires. This is used to do the count expansion.
-func materializeTaskGroups(job *structs.Job) map[string]*structs.TaskGroup {
-	out := make(map[string]*structs.TaskGroup)
-	if job.Stopped() {
-		return out
-	}
-
-	for _, tg := range job.TaskGroups {
-		for i := 0; i < tg.Count; i++ {
-			name := fmt.Sprintf("%s.%s[%d]", job.Name, tg.Name, i)
-			out[name] = tg
-		}
-	}
-	return out
-}
-
 // diffResult is used to return the sets that result from the diff
 type diffResult struct {
 	place, update, migrate, stop, ignore, lost, disconnecting, reconnecting []allocTuple
@@ -58,305 +40,6 @@ func (d *diffResult) Append(other *diffResult) {
 	d.reconnecting = append(d.reconnecting, other.reconnecting...)
 }
 
-// diffSystemAllocsForNode is used to do a set difference between the target allocations
-// and the existing allocations for a particular node. This returns 8 sets of results,
-// the list of named task groups that need to be placed (no existing allocation), the
-// allocations that need to be updated (job definition is newer), allocs that
-// need to be migrated (node is draining), the allocs that need to be evicted
-// (no longer required), those that should be ignored, those that are lost
-// that need to be replaced (running on a lost node), those that are running on
-// a disconnected node but may resume, and those that may still be running on
-// a node that has resumed reconnected.
-func diffSystemAllocsForNode(
-	job *structs.Job, // job whose allocs are going to be diff-ed
-	nodeID string,
-	eligibleNodes map[string]*structs.Node,
-	notReadyNodes map[string]struct{}, // nodes that are not ready, e.g. draining
-	taintedNodes map[string]*structs.Node, // nodes which are down (by node id)
-	required map[string]*structs.TaskGroup, // set of allocations that must exist
-	allocs []*structs.Allocation, // non-terminal allocations that exist
-	terminal structs.TerminalByNodeByName, // latest terminal allocations (by node, id)
-	serverSupportsDisconnectedClients bool, // flag indicating whether to apply disconnected client logic
-) *diffResult {
-	result := new(diffResult)
-
-	// Scan the existing updates
-	existing := make(map[string]struct{}) // set of alloc names
-	for _, exist := range allocs {
-		// Index the existing node
-		name := exist.Name
-		existing[name] = struct{}{}
-
-		// Check for the definition in the required set
-		tg, ok := required[name]
-
-		// If not required, we stop the alloc
-		if !ok {
-			result.stop = append(result.stop, allocTuple{
-				Name:      name,
-				TaskGroup: tg,
-				Alloc:     exist,
-			})
-			continue
-		}
-
-		supportsDisconnectedClients := exist.SupportsDisconnectedClients(serverSupportsDisconnectedClients)
-
-		reconnect := false
-		expired := false
-
-		// Only compute reconnect for unknown and running since they need to go
-		// through the reconnect process.
-		if supportsDisconnectedClients &&
-			(exist.ClientStatus == structs.AllocClientStatusUnknown ||
-				exist.ClientStatus == structs.AllocClientStatusRunning) {
-			reconnect = exist.NeedsToReconnect()
-			if reconnect {
-				expired = exist.Expired(time.Now())
-			}
-		}
-
-		// If we have been marked for migration and aren't terminal, migrate
-		if !exist.TerminalStatus() && exist.DesiredTransition.ShouldMigrate() {
-			result.migrate = append(result.migrate, allocTuple{
-				Name:      name,
-				TaskGroup: tg,
-				Alloc:     exist,
-			})
-			continue
-		}
-
-		// If we are a sysbatch job and terminal, ignore (or stop?) the alloc
-		if job.Type == structs.JobTypeSysBatch && exist.TerminalStatus() {
-			result.ignore = append(result.ignore, allocTuple{
-				Name:      name,
-				TaskGroup: tg,
-				Alloc:     exist,
-			})
-			continue
-		}
-
-		// Expired unknown allocs are lost. Expired checks that status is unknown.
-		if supportsDisconnectedClients && expired {
-			result.lost = append(result.lost, allocTuple{
-				Name:      name,
-				TaskGroup: tg,
-				Alloc:     exist,
-			})
-			continue
-		}
-
-		// Ignore unknown allocs that we want to reconnect eventually.
-		if supportsDisconnectedClients &&
-			exist.ClientStatus == structs.AllocClientStatusUnknown &&
-			exist.DesiredStatus == structs.AllocDesiredStatusRun {
-			result.ignore = append(result.ignore, allocTuple{
-				Name:      name,
-				TaskGroup: tg,
-				Alloc:     exist,
-			})
-			continue
-		}
-
-		node, nodeIsTainted := taintedNodes[exist.NodeID]
-
-		// Filter allocs on a node that is now re-connected to reconnecting.
-		if supportsDisconnectedClients &&
-			!nodeIsTainted &&
-			reconnect {
-
-			// Record the new ClientStatus to indicate to future evals that the
-			// alloc has already reconnected.
-			reconnecting := exist.Copy()
-			reconnecting.AppendState(structs.AllocStateFieldClientStatus, exist.ClientStatus)
-			result.reconnecting = append(result.reconnecting, allocTuple{
-				Name:      name,
-				TaskGroup: tg,
-				Alloc:     reconnecting,
-			})
-			continue
-		}
-
-		// If we are on a tainted node, we must migrate if we are a service or
-		// if the batch allocation did not finish
-		if nodeIsTainted {
-			// If the job is batch and finished successfully, the fact that the
-			// node is tainted does not mean it should be migrated or marked as
-			// lost as the work was already successfully finished. However for
-			// service/system jobs, tasks should never complete. The check of
-			// batch type, defends against client bugs.
-			if exist.Job.Type == structs.JobTypeSysBatch && exist.RanSuccessfully() {
-				goto IGNORE
-			}
-
-			// Filter running allocs on a node that is disconnected to be marked as unknown.
-			if node != nil &&
-				supportsDisconnectedClients &&
-				node.Status == structs.NodeStatusDisconnected &&
-				exist.ClientStatus == structs.AllocClientStatusRunning {
-
-				disconnect := exist.Copy()
-				disconnect.ClientStatus = structs.AllocClientStatusUnknown
-				disconnect.AppendState(structs.AllocStateFieldClientStatus, structs.AllocClientStatusUnknown)
-				disconnect.ClientDescription = allocUnknown
-				result.disconnecting = append(result.disconnecting, allocTuple{
-					Name:      name,
-					TaskGroup: tg,
-					Alloc:     disconnect,
-				})
-				continue
-			}
-
-			if !exist.TerminalStatus() && (node == nil || node.TerminalStatus()) {
-				result.lost = append(result.lost, allocTuple{
-					Name:      name,
-					TaskGroup: tg,
-					Alloc:     exist,
-				})
-			} else {
-				goto IGNORE
-			}
-
-			continue
-		}
-
-		// For an existing allocation, if the nodeID is no longer
-		// eligible, the diff should be ignored
-		if _, ineligible := notReadyNodes[nodeID]; ineligible {
-			goto IGNORE
-		}
-
-		// Existing allocations on nodes that are no longer targeted
-		// should be stopped
-		if _, eligible := eligibleNodes[nodeID]; !eligible {
-			result.stop = append(result.stop, allocTuple{
-				Name:      name,
-				TaskGroup: tg,
-				Alloc:     exist,
-			})
-			continue
-		}
-
-		// If the definition is updated we need to update
-		if job.JobModifyIndex != exist.Job.JobModifyIndex {
-			result.update = append(result.update, allocTuple{
-				Name:      name,
-				TaskGroup: tg,
-				Alloc:     exist,
-			})
-			continue
-		}
-
-		// Everything is up-to-date
-	IGNORE:
-		result.ignore = append(result.ignore, allocTuple{
-			Name:      name,
-			TaskGroup: tg,
-			Alloc:     exist,
-		})
-	}
-
-	// Scan the required groups
-	for name, tg := range required {
-
-		// Check for an existing allocation
-		if _, ok := existing[name]; !ok {
-
-			// Check for a terminal sysbatch allocation, which should be not placed
-			// again unless the job has been updated.
-			if job.Type == structs.JobTypeSysBatch {
-				if alloc, termExists := terminal.Get(nodeID, name); termExists {
-					// the alloc is terminal, but now the job has been updated
-					if job.JobModifyIndex != alloc.Job.JobModifyIndex {
-						result.update = append(result.update, allocTuple{
-							Name:      name,
-							TaskGroup: tg,
-							Alloc:     alloc,
-						})
-					} else {
-						// alloc is terminal and job unchanged, leave it alone
-						result.ignore = append(result.ignore, allocTuple{
-							Name:      name,
-							TaskGroup: tg,
-							Alloc:     alloc,
-						})
-					}
-					continue
-				}
-			}
-
-			// Require a placement if no existing allocation. If there
-			// is an existing allocation, we would have checked for a potential
-			// update or ignore above. Ignore placements for tainted or
-			// ineligible nodes
-
-			// Tainted and ineligible nodes for a non existing alloc
-			// should be filtered out and not count towards ignore or place
-			if _, tainted := taintedNodes[nodeID]; tainted {
-				continue
-			}
-			if _, eligible := eligibleNodes[nodeID]; !eligible {
-				continue
-			}
-
-			termOnNode, _ := terminal.Get(nodeID, name)
-			allocTuple := allocTuple{
-				Name:      name,
-				TaskGroup: tg,
-				Alloc:     termOnNode,
-			}
-
-			// If the new allocation isn't annotated with a previous allocation
-			// or if the previous allocation isn't from the same node then we
-			// annotate the allocTuple with a new Allocation
-			if allocTuple.Alloc == nil || allocTuple.Alloc.NodeID != nodeID {
-				allocTuple.Alloc = &structs.Allocation{NodeID: nodeID}
-			}
-
-			result.place = append(result.place, allocTuple)
-		}
-	}
-	return result
-}
-
-// diffSystemAllocs is like diffSystemAllocsForNode however, the allocations in the
-// diffResult contain the specific nodeID they should be allocated on.
-func diffSystemAllocs(
-	job *structs.Job, // jobs whose allocations are going to be diff-ed
-	readyNodes []*structs.Node, // list of nodes in the ready state
-	notReadyNodes map[string]struct{}, // list of nodes in DC but not ready, e.g. draining
-	taintedNodes map[string]*structs.Node, // nodes which are down or drain mode (by node id)
-	allocs []*structs.Allocation, // non-terminal allocations
-	terminal structs.TerminalByNodeByName, // latest terminal allocations (by node id)
-	serverSupportsDisconnectedClients bool, // flag indicating whether to apply disconnected client logic
-) *diffResult {
-
-	// Build a mapping of nodes to all their allocs.
-	nodeAllocs := make(map[string][]*structs.Allocation, len(allocs))
-	for _, alloc := range allocs {
-		nodeAllocs[alloc.NodeID] = append(nodeAllocs[alloc.NodeID], alloc)
-	}
-
-	eligibleNodes := make(map[string]*structs.Node)
-	for _, node := range readyNodes {
-		if _, ok := nodeAllocs[node.ID]; !ok {
-			nodeAllocs[node.ID] = nil
-		}
-		eligibleNodes[node.ID] = node
-	}
-
-	// Create the required task groups.
-	required := materializeTaskGroups(job)
-
-	result := new(diffResult)
-	for nodeID, allocs := range nodeAllocs {
-		diff := diffSystemAllocsForNode(job, nodeID, eligibleNodes, notReadyNodes, taintedNodes, required, allocs, terminal, serverSupportsDisconnectedClients)
-		result.Append(diff)
-	}
-
-	return result
-}
-
 // readyNodesInDCs returns all the ready nodes in the given datacenters and a
 // mapping of each data center to the count of ready nodes.
 func readyNodesInDCs(state State, dcs []string) ([]*structs.Node, map[string]struct{}, map[string]int, error) {
@@ -951,50 +634,6 @@ func inplaceUpdate(ctx Context, eval *structs.Evaluation, job *structs.Job,
 	return updates[:n], updates[n:]
 }
 
-// evictAndPlace is used to mark allocations for evicts and add them to the
-// placement queue. evictAndPlace modifies both the diffResult and the
-// limit. It returns true if the limit has been reached.
-func evictAndPlace(ctx Context, diff *diffResult, allocs []allocTuple, desc string, limit *int) bool {
-	n := len(allocs)
-	for i := 0; i < n && i < *limit; i++ {
-		a := allocs[i]
-		ctx.Plan().AppendStoppedAlloc(a.Alloc, desc, "", "")
-		diff.place = append(diff.place, a)
-	}
-	if n <= *limit {
-		*limit -= n
-		return false
-	}
-	*limit = 0
-	return true
-}
-
-// tgConstrainTuple is used to store the total constraints of a task group.
-type tgConstrainTuple struct {
-	// Holds the combined constraints of the task group and all it's sub-tasks.
-	constraints []*structs.Constraint
-
-	// The set of required drivers within the task group.
-	drivers map[string]struct{}
-}
-
-// taskGroupConstraints collects the constraints, drivers and resources required by each
-// sub-task to aggregate the TaskGroup totals
-func taskGroupConstraints(tg *structs.TaskGroup) tgConstrainTuple {
-	c := tgConstrainTuple{
-		constraints: make([]*structs.Constraint, 0, len(tg.Constraints)),
-		drivers:     make(map[string]struct{}),
-	}
-
-	c.constraints = append(c.constraints, tg.Constraints...)
-	for _, task := range tg.Tasks {
-		c.drivers[task.Driver] = struct{}{}
-		c.constraints = append(c.constraints, task.Constraints...)
-	}
-
-	return c
-}
-
 // desiredUpdates takes the diffResult as well as the set of inplace and
 // destructive updates and returns a map of task groups to their set of desired
 // updates.
diff --git a/scheduler/util_test.go b/scheduler/util_test.go
index c8285709a..9936f47a0 100644
--- a/scheduler/util_test.go
+++ b/scheduler/util_test.go
@@ -1,14 +1,12 @@
 package scheduler
 
 import (
-	"fmt"
 	"reflect"
 	"testing"
 	"time"
 
 	"github.com/hashicorp/nomad/ci"
 	"github.com/shoenig/test/must"
-	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 
 	"github.com/hashicorp/nomad/helper/pointer"
@@ -19,537 +17,12 @@ import (
 	"github.com/hashicorp/nomad/nomad/structs"
 )
 
-func TestMaterializeTaskGroups(t *testing.T) {
-	ci.Parallel(t)
-
-	job := mock.Job()
-	index := materializeTaskGroups(job)
-	require.Equal(t, 10, len(index))
-
-	for i := 0; i < 10; i++ {
-		name := fmt.Sprintf("my-job.web[%d]", i)
-		require.Contains(t, index, name)
-		require.Equal(t, job.TaskGroups[0], index[name])
-	}
-}
-
 func newNode(name string) *structs.Node {
 	n := mock.Node()
 	n.Name = name
 	return n
 }
 
-func TestDiffSystemAllocsForNode_Sysbatch_terminal(t *testing.T) {
-	ci.Parallel(t)
-
-	// For a sysbatch job, the scheduler should not re-place an allocation
-	// that has become terminal, unless the job has been updated.
-
-	job := mock.SystemBatchJob()
-	required := materializeTaskGroups(job)
-
-	eligible := map[string]*structs.Node{
-		"node1": newNode("node1"),
-	}
-
-	var live []*structs.Allocation // empty
-
-	tainted := map[string]*structs.Node(nil)
-
-	t.Run("current job", func(t *testing.T) {
-		terminal := structs.TerminalByNodeByName{
-			"node1": map[string]*structs.Allocation{
-				"my-sysbatch.pinger[0]": {
-					ID:           uuid.Generate(),
-					NodeID:       "node1",
-					Name:         "my-sysbatch.pinger[0]",
-					Job:          job,
-					ClientStatus: structs.AllocClientStatusComplete,
-				},
-			},
-		}
-
-		diff := diffSystemAllocsForNode(job, "node1", eligible, nil, tainted, required, live, terminal, true)
-		require.Empty(t, diff.place)
-		require.Empty(t, diff.update)
-		require.Empty(t, diff.stop)
-		require.Empty(t, diff.migrate)
-		require.Empty(t, diff.lost)
-		require.True(t, len(diff.ignore) == 1 && diff.ignore[0].Alloc == terminal["node1"]["my-sysbatch.pinger[0]"])
-	})
-
-	t.Run("outdated job", func(t *testing.T) {
-		previousJob := job.Copy()
-		previousJob.JobModifyIndex -= 1
-		terminal := structs.TerminalByNodeByName{
-			"node1": map[string]*structs.Allocation{
-				"my-sysbatch.pinger[0]": {
-					ID:     uuid.Generate(),
-					NodeID: "node1",
-					Name:   "my-sysbatch.pinger[0]",
-					Job:    previousJob,
-				},
-			},
-		}
-
-		expAlloc := terminal["node1"]["my-sysbatch.pinger[0]"]
-		expAlloc.NodeID = "node1"
-
-		diff := diffSystemAllocsForNode(job, "node1", eligible, nil, tainted, required, live, terminal, true)
-		require.Empty(t, diff.place)
-		require.Len(t, diff.update, 1)
-		require.Empty(t, diff.stop)
-		require.Empty(t, diff.migrate)
-		require.Empty(t, diff.lost)
-		require.Empty(t, diff.ignore)
-	})
-}
-
-func TestDiffSystemAllocsForNode(t *testing.T) {
-	ci.Parallel(t)
-
-	job := mock.Job()
-	required := materializeTaskGroups(job)
-
-	// The "old" job has a previous modify index
-	oldJob := new(structs.Job)
-	*oldJob = *job
-	oldJob.JobModifyIndex -= 1
-
-	eligibleNode := mock.Node()
-	eligibleNode.ID = "zip"
-
-	drainNode := mock.DrainNode()
-
-	deadNode := mock.Node()
-	deadNode.Status = structs.NodeStatusDown
-
-	tainted := map[string]*structs.Node{
-		"dead":      deadNode,
-		"drainNode": drainNode,
-	}
-
-	eligible := map[string]*structs.Node{
-		eligibleNode.ID: eligibleNode,
-	}
-
-	allocs := []*structs.Allocation{
-		// Update the 1st
-		{
-			ID:     uuid.Generate(),
-			NodeID: "zip",
-			Name:   "my-job.web[0]",
-			Job:    oldJob,
-		},
-
-		// Ignore the 2rd
-		{
-			ID:     uuid.Generate(),
-			NodeID: "zip",
-			Name:   "my-job.web[1]",
-			Job:    job,
-		},
-
-		// Evict 11th
-		{
-			ID:     uuid.Generate(),
-			NodeID: "zip",
-			Name:   "my-job.web[10]",
-			Job:    oldJob,
-		},
-
-		// Migrate the 3rd
-		{
-			ID:     uuid.Generate(),
-			NodeID: "drainNode",
-			Name:   "my-job.web[2]",
-			Job:    oldJob,
-			DesiredTransition: structs.DesiredTransition{
-				Migrate: pointer.Of(true),
-			},
-		},
-		// Mark the 4th lost
-		{
-			ID:     uuid.Generate(),
-			NodeID: "dead",
-			Name:   "my-job.web[3]",
-			Job:    oldJob,
-		},
-	}
-
-	// Have three terminal allocs
-	terminal := structs.TerminalByNodeByName{
-		"zip": map[string]*structs.Allocation{
-			"my-job.web[4]": {
-				ID:     uuid.Generate(),
-				NodeID: "zip",
-				Name:   "my-job.web[4]",
-				Job:    job,
-			},
-			"my-job.web[5]": {
-				ID:     uuid.Generate(),
-				NodeID: "zip",
-				Name:   "my-job.web[5]",
-				Job:    job,
-			},
-			"my-job.web[6]": {
-				ID:     uuid.Generate(),
-				NodeID: "zip",
-				Name:   "my-job.web[6]",
-				Job:    job,
-			},
-		},
-	}
-
-	diff := diffSystemAllocsForNode(job, "zip", eligible, nil, tainted, required, allocs, terminal, true)
-
-	// We should update the first alloc
-	require.Len(t, diff.update, 1)
-	require.Equal(t, allocs[0], diff.update[0].Alloc)
-
-	// We should ignore the second alloc
-	require.Len(t, diff.ignore, 1)
-	require.Equal(t, allocs[1], diff.ignore[0].Alloc)
-
-	// We should stop the 3rd alloc
-	require.Len(t, diff.stop, 1)
-	require.Equal(t, allocs[2], diff.stop[0].Alloc)
-
-	// We should migrate the 4rd alloc
-	require.Len(t, diff.migrate, 1)
-	require.Equal(t, allocs[3], diff.migrate[0].Alloc)
-
-	// We should mark the 5th alloc as lost
-	require.Len(t, diff.lost, 1)
-	require.Equal(t, allocs[4], diff.lost[0].Alloc)
-
-	// We should place 6
-	require.Len(t, diff.place, 6)
-
-	// Ensure that the allocations which are replacements of terminal allocs are
-	// annotated.
-	for _, m := range terminal {
-		for _, alloc := range m {
-			for _, tuple := range diff.place {
-				if alloc.Name == tuple.Name {
-					require.Equal(t, alloc, tuple.Alloc)
-				}
-			}
-		}
-	}
-}
-
-// Test the desired diff for an updated system job running on a
-// ineligible node
-func TestDiffSystemAllocsForNode_ExistingAllocIneligibleNode(t *testing.T) {
-	ci.Parallel(t)
-
-	job := mock.Job()
-	job.TaskGroups[0].Count = 1
-	required := materializeTaskGroups(job)
-
-	// The "old" job has a previous modify index
-	oldJob := new(structs.Job)
-	*oldJob = *job
-	oldJob.JobModifyIndex -= 1
-
-	eligibleNode := mock.Node()
-	ineligibleNode := mock.Node()
-	ineligibleNode.SchedulingEligibility = structs.NodeSchedulingIneligible
-
-	tainted := map[string]*structs.Node{}
-
-	eligible := map[string]*structs.Node{
-		eligibleNode.ID: eligibleNode,
-	}
-
-	allocs := []*structs.Allocation{
-		// Update the TG alloc running on eligible node
-		{
-			ID:     uuid.Generate(),
-			NodeID: eligibleNode.ID,
-			Name:   "my-job.web[0]",
-			Job:    oldJob,
-		},
-
-		// Ignore the TG alloc running on ineligible node
-		{
-			ID:     uuid.Generate(),
-			NodeID: ineligibleNode.ID,
-			Name:   "my-job.web[0]",
-			Job:    job,
-		},
-	}
-
-	// No terminal allocs
-	terminal := make(structs.TerminalByNodeByName)
-
-	diff := diffSystemAllocsForNode(job, eligibleNode.ID, eligible, nil, tainted, required, allocs, terminal, true)
-
-	require.Len(t, diff.place, 0)
-	require.Len(t, diff.update, 1)
-	require.Len(t, diff.migrate, 0)
-	require.Len(t, diff.stop, 0)
-	require.Len(t, diff.ignore, 1)
-	require.Len(t, diff.lost, 0)
-}
-
-func TestDiffSystemAllocsForNode_DisconnectedNode(t *testing.T) {
-	ci.Parallel(t)
-
-	// Create job.
-	job := mock.SystemJob()
-	job.TaskGroups[0].MaxClientDisconnect = pointer.Of(time.Hour)
-
-	// Create nodes.
-	readyNode := mock.Node()
-	readyNode.Status = structs.NodeStatusReady
-
-	disconnectedNode := mock.Node()
-	disconnectedNode.Status = structs.NodeStatusDisconnected
-
-	eligibleNodes := map[string]*structs.Node{
-		readyNode.ID: readyNode,
-	}
-
-	taintedNodes := map[string]*structs.Node{
-		disconnectedNode.ID: disconnectedNode,
-	}
-
-	// Create allocs.
-	required := materializeTaskGroups(job)
-	terminal := make(structs.TerminalByNodeByName)
-
-	type diffResultCount struct {
-		place, update, migrate, stop, ignore, lost, disconnecting, reconnecting int
-	}
-
-	testCases := []struct {
-		name    string
-		node    *structs.Node
-		allocFn func(*structs.Allocation)
-		expect  diffResultCount
-	}{
-		{
-			name: "alloc in disconnected client is marked as unknown",
-			node: disconnectedNode,
-			allocFn: func(alloc *structs.Allocation) {
-				alloc.ClientStatus = structs.AllocClientStatusRunning
-			},
-			expect: diffResultCount{
-				disconnecting: 1,
-			},
-		},
-		{
-			name: "disconnected alloc reconnects",
-			node: readyNode,
-			allocFn: func(alloc *structs.Allocation) {
-				alloc.ClientStatus = structs.AllocClientStatusRunning
-
-				alloc.AllocStates = []*structs.AllocState{{
-					Field: structs.AllocStateFieldClientStatus,
-					Value: structs.AllocClientStatusUnknown,
-					Time:  time.Now().Add(-time.Minute),
-				}}
-			},
-			expect: diffResultCount{
-				reconnecting: 1,
-			},
-		},
-		{
-			name: "alloc not reconnecting after it reconnects",
-			node: readyNode,
-			allocFn: func(alloc *structs.Allocation) {
-				alloc.ClientStatus = structs.AllocClientStatusRunning
-
-				alloc.AllocStates = []*structs.AllocState{
-					{
-						Field: structs.AllocStateFieldClientStatus,
-						Value: structs.AllocClientStatusUnknown,
-						Time:  time.Now().Add(-time.Minute),
-					},
-					{
-						Field: structs.AllocStateFieldClientStatus,
-						Value: structs.AllocClientStatusRunning,
-						Time:  time.Now(),
-					},
-				}
-			},
-			expect: diffResultCount{
-				ignore: 1,
-			},
-		},
-		{
-			name: "disconnected alloc is lost after it expires",
-			node: disconnectedNode,
-			allocFn: func(alloc *structs.Allocation) {
-				alloc.ClientStatus = structs.AllocClientStatusUnknown
-
-				alloc.AllocStates = []*structs.AllocState{{
-					Field: structs.AllocStateFieldClientStatus,
-					Value: structs.AllocClientStatusUnknown,
-					Time:  time.Now().Add(-10 * time.Hour),
-				}}
-			},
-			expect: diffResultCount{
-				lost: 1,
-			},
-		},
-		{
-			name: "disconnected allocs are ignored",
-			node: disconnectedNode,
-			allocFn: func(alloc *structs.Allocation) {
-				alloc.ClientStatus = structs.AllocClientStatusUnknown
-
-				alloc.AllocStates = []*structs.AllocState{{
-					Field: structs.AllocStateFieldClientStatus,
-					Value: structs.AllocClientStatusUnknown,
-					Time:  time.Now(),
-				}}
-			},
-			expect: diffResultCount{
-				ignore: 1,
-			},
-		},
-	}
-
-	for _, tc := range testCases {
-		t.Run(tc.name, func(t *testing.T) {
-			alloc := mock.AllocForNode(tc.node)
-			alloc.JobID = job.ID
-			alloc.Job = job
-			alloc.Name = fmt.Sprintf("%s.%s[0]", job.Name, job.TaskGroups[0].Name)
-
-			if tc.allocFn != nil {
-				tc.allocFn(alloc)
-			}
-
-			got := diffSystemAllocsForNode(
-				job, tc.node.ID, eligibleNodes, nil, taintedNodes,
-				required, []*structs.Allocation{alloc}, terminal, true,
-			)
-
-			assert.Len(t, got.place, tc.expect.place, "place")
-			assert.Len(t, got.update, tc.expect.update, "update")
-			assert.Len(t, got.migrate, tc.expect.migrate, "migrate")
-			assert.Len(t, got.stop, tc.expect.stop, "stop")
-			assert.Len(t, got.ignore, tc.expect.ignore, "ignore")
-			assert.Len(t, got.lost, tc.expect.lost, "lost")
-			assert.Len(t, got.disconnecting, tc.expect.disconnecting, "disconnecting")
-			assert.Len(t, got.reconnecting, tc.expect.reconnecting, "reconnecting")
-		})
-	}
-}
-
-func TestDiffSystemAllocs(t *testing.T) {
-	ci.Parallel(t)
-
-	job := mock.SystemJob()
-
-	drainNode := mock.DrainNode()
-
-	deadNode := mock.Node()
-	deadNode.Status = structs.NodeStatusDown
-
-	tainted := map[string]*structs.Node{
-		deadNode.ID:  deadNode,
-		drainNode.ID: drainNode,
-	}
-
-	// Create three alive nodes.
-	nodes := []*structs.Node{{ID: "foo"}, {ID: "bar"}, {ID: "baz"},
-		{ID: "pipe"}, {ID: drainNode.ID}, {ID: deadNode.ID}}
-
-	// The "old" job has a previous modify index
-	oldJob := new(structs.Job)
-	*oldJob = *job
-	oldJob.JobModifyIndex -= 1
-
-	allocs := []*structs.Allocation{
-		// Update allocation on baz
-		{
-			ID:     uuid.Generate(),
-			NodeID: "baz",
-			Name:   "my-job.web[0]",
-			Job:    oldJob,
-		},
-
-		// Ignore allocation on bar
-		{
-			ID:     uuid.Generate(),
-			NodeID: "bar",
-			Name:   "my-job.web[0]",
-			Job:    job,
-		},
-
-		// Stop allocation on draining node.
-		{
-			ID:     uuid.Generate(),
-			NodeID: drainNode.ID,
-			Name:   "my-job.web[0]",
-			Job:    oldJob,
-			DesiredTransition: structs.DesiredTransition{
-				Migrate: pointer.Of(true),
-			},
-		},
-		// Mark as lost on a dead node
-		{
-			ID:     uuid.Generate(),
-			NodeID: deadNode.ID,
-			Name:   "my-job.web[0]",
-			Job:    oldJob,
-		},
-	}
-
-	// Have three (?) terminal allocs
-	terminal := structs.TerminalByNodeByName{
-		"pipe": map[string]*structs.Allocation{
-			"my-job.web[0]": {
-				ID:     uuid.Generate(),
-				NodeID: "pipe",
-				Name:   "my-job.web[0]",
-				Job:    job,
-			},
-		},
-	}
-
-	diff := diffSystemAllocs(job, nodes, nil, tainted, allocs, terminal, true)
-
-	// We should update the first alloc
-	require.Len(t, diff.update, 1)
-	require.Equal(t, allocs[0], diff.update[0].Alloc)
-
-	// We should ignore the second alloc
-	require.Len(t, diff.ignore, 1)
-	require.Equal(t, allocs[1], diff.ignore[0].Alloc)
-
-	// We should stop the third alloc
-	require.Empty(t, diff.stop)
-
-	// There should be no migrates.
-	require.Len(t, diff.migrate, 1)
-	require.Equal(t, allocs[2], diff.migrate[0].Alloc)
-
-	// We should mark the 5th alloc as lost
-	require.Len(t, diff.lost, 1)
-	require.Equal(t, allocs[3], diff.lost[0].Alloc)
-
-	// We should place 2
-	require.Len(t, diff.place, 2)
-
-	// Ensure that the allocations which are replacements of terminal allocs are
-	// annotated.
-	for _, m := range terminal {
-		for _, alloc := range m {
-			for _, tuple := range diff.place {
-				if alloc.NodeID == tuple.Alloc.NodeID {
-					require.Equal(t, alloc, tuple.Alloc)
-				}
-			}
-		}
-	}
-}
-
 func TestReadyNodesInDCs(t *testing.T) {
 	ci.Parallel(t)
 
@@ -1177,42 +650,6 @@ func TestNetworkUpdated(t *testing.T) {
 	}
 }
 
-func TestEvictAndPlace_LimitLessThanAllocs(t *testing.T) {
-	ci.Parallel(t)
-
-	_, ctx := testContext(t)
-	allocs := []allocTuple{
-		{Alloc: &structs.Allocation{ID: uuid.Generate()}},
-		{Alloc: &structs.Allocation{ID: uuid.Generate()}},
-		{Alloc: &structs.Allocation{ID: uuid.Generate()}},
-		{Alloc: &structs.Allocation{ID: uuid.Generate()}},
-	}
-	diff := &diffResult{}
-
-	limit := 2
-	require.True(t, evictAndPlace(ctx, diff, allocs, "", &limit), "evictAndReplace() should have returned true")
-	require.Zero(t, limit, "evictAndReplace() should decremented limit; got %v; want 0", limit)
-	require.Equal(t, 2, len(diff.place), "evictAndReplace() didn't insert into diffResult properly: %v", diff.place)
-}
-
-func TestEvictAndPlace_LimitEqualToAllocs(t *testing.T) {
-	ci.Parallel(t)
-
-	_, ctx := testContext(t)
-	allocs := []allocTuple{
-		{Alloc: &structs.Allocation{ID: uuid.Generate()}},
-		{Alloc: &structs.Allocation{ID: uuid.Generate()}},
-		{Alloc: &structs.Allocation{ID: uuid.Generate()}},
-		{Alloc: &structs.Allocation{ID: uuid.Generate()}},
-	}
-	diff := &diffResult{}
-
-	limit := 4
-	require.False(t, evictAndPlace(ctx, diff, allocs, "", &limit), "evictAndReplace() should have returned false")
-	require.Zero(t, limit, "evictAndReplace() should decremented limit; got %v; want 0", limit)
-	require.Equal(t, 4, len(diff.place), "evictAndReplace() didn't insert into diffResult properly: %v", diff.place)
-}
-
 func TestSetStatus(t *testing.T) {
 	ci.Parallel(t)
 
@@ -1535,22 +972,99 @@ func TestInplaceUpdate_Success(t *testing.T) {
 	}
 }
 
-func TestEvictAndPlace_LimitGreaterThanAllocs(t *testing.T) {
+func TestUtil_connectUpdated(t *testing.T) {
 	ci.Parallel(t)
 
-	_, ctx := testContext(t)
-	allocs := []allocTuple{
-		{Alloc: &structs.Allocation{ID: uuid.Generate()}},
-		{Alloc: &structs.Allocation{ID: uuid.Generate()}},
-		{Alloc: &structs.Allocation{ID: uuid.Generate()}},
-		{Alloc: &structs.Allocation{ID: uuid.Generate()}},
-	}
-	diff := &diffResult{}
+	t.Run("both nil", func(t *testing.T) {
+		require.False(t, connectUpdated(nil, nil))
+	})
 
-	limit := 6
-	require.False(t, evictAndPlace(ctx, diff, allocs, "", &limit))
-	require.Equal(t, 2, limit, "evictAndReplace() should decremented limit")
-	require.Equal(t, 4, len(diff.place), "evictAndReplace() didn't insert into diffResult properly: %v", diff.place)
+	t.Run("one nil", func(t *testing.T) {
+		require.True(t, connectUpdated(nil, new(structs.ConsulConnect)))
+	})
+
+	t.Run("native differ", func(t *testing.T) {
+		a := &structs.ConsulConnect{Native: true}
+		b := &structs.ConsulConnect{Native: false}
+		require.True(t, connectUpdated(a, b))
+	})
+
+	t.Run("gateway differ", func(t *testing.T) {
+		a := &structs.ConsulConnect{Gateway: &structs.ConsulGateway{
+			Ingress: new(structs.ConsulIngressConfigEntry),
+		}}
+		b := &structs.ConsulConnect{Gateway: &structs.ConsulGateway{
+			Terminating: new(structs.ConsulTerminatingConfigEntry),
+		}}
+		require.True(t, connectUpdated(a, b))
+	})
+
+	t.Run("sidecar task differ", func(t *testing.T) {
+		a := &structs.ConsulConnect{SidecarTask: &structs.SidecarTask{
+			Driver: "exec",
+		}}
+		b := &structs.ConsulConnect{SidecarTask: &structs.SidecarTask{
+			Driver: "docker",
+		}}
+		require.True(t, connectUpdated(a, b))
+	})
+
+	t.Run("sidecar service differ", func(t *testing.T) {
+		a := &structs.ConsulConnect{SidecarService: &structs.ConsulSidecarService{
+			Port: "1111",
+		}}
+		b := &structs.ConsulConnect{SidecarService: &structs.ConsulSidecarService{
+			Port: "2222",
+		}}
+		require.True(t, connectUpdated(a, b))
+	})
+
+	t.Run("same", func(t *testing.T) {
+		a := new(structs.ConsulConnect)
+		b := new(structs.ConsulConnect)
+		require.False(t, connectUpdated(a, b))
+	})
+}
+
+func TestUtil_connectSidecarServiceUpdated(t *testing.T) {
+	ci.Parallel(t)
+
+	t.Run("both nil", func(t *testing.T) {
+		require.False(t, connectSidecarServiceUpdated(nil, nil))
+	})
+
+	t.Run("one nil", func(t *testing.T) {
+		require.True(t, connectSidecarServiceUpdated(nil, new(structs.ConsulSidecarService)))
+	})
+
+	t.Run("ports differ", func(t *testing.T) {
+		a := &structs.ConsulSidecarService{Port: "1111"}
+		b := &structs.ConsulSidecarService{Port: "2222"}
+		require.True(t, connectSidecarServiceUpdated(a, b))
+	})
+
+	t.Run("same", func(t *testing.T) {
+		a := &structs.ConsulSidecarService{Port: "1111"}
+		b := &structs.ConsulSidecarService{Port: "1111"}
+		require.False(t, connectSidecarServiceUpdated(a, b))
+	})
+}
+
+func TestTasksUpdated_Identity(t *testing.T) {
+	ci.Parallel(t)
+
+	j1 := mock.Job()
+	name := j1.TaskGroups[0].Name
+	j1.TaskGroups[0].Tasks[0].Identity = nil
+
+	j2 := j1.Copy()
+
+	must.False(t, tasksUpdated(j1, j2, name))
+
+	// Set identity on j1 and assert update
+	j1.TaskGroups[0].Tasks[0].Identity = &structs.WorkloadIdentity{}
+
+	must.True(t, tasksUpdated(j1, j2, name))
 }
 
 func TestTaskGroupConstraints(t *testing.T) {
@@ -1770,98 +1284,3 @@ func TestUtil_UpdateNonTerminalAllocsToLost(t *testing.T) {
 	expected = []string{}
 	require.True(t, reflect.DeepEqual(allocsLost, expected), "actual: %v, expected: %v", allocsLost, expected)
 }
-
-func TestUtil_connectUpdated(t *testing.T) {
-	ci.Parallel(t)
-
-	t.Run("both nil", func(t *testing.T) {
-		require.False(t, connectUpdated(nil, nil))
-	})
-
-	t.Run("one nil", func(t *testing.T) {
-		require.True(t, connectUpdated(nil, new(structs.ConsulConnect)))
-	})
-
-	t.Run("native differ", func(t *testing.T) {
-		a := &structs.ConsulConnect{Native: true}
-		b := &structs.ConsulConnect{Native: false}
-		require.True(t, connectUpdated(a, b))
-	})
-
-	t.Run("gateway differ", func(t *testing.T) {
-		a := &structs.ConsulConnect{Gateway: &structs.ConsulGateway{
-			Ingress: new(structs.ConsulIngressConfigEntry),
-		}}
-		b := &structs.ConsulConnect{Gateway: &structs.ConsulGateway{
-			Terminating: new(structs.ConsulTerminatingConfigEntry),
-		}}
-		require.True(t, connectUpdated(a, b))
-	})
-
-	t.Run("sidecar task differ", func(t *testing.T) {
-		a := &structs.ConsulConnect{SidecarTask: &structs.SidecarTask{
-			Driver: "exec",
-		}}
-		b := &structs.ConsulConnect{SidecarTask: &structs.SidecarTask{
-			Driver: "docker",
-		}}
-		require.True(t, connectUpdated(a, b))
-	})
-
-	t.Run("sidecar service differ", func(t *testing.T) {
-		a := &structs.ConsulConnect{SidecarService: &structs.ConsulSidecarService{
-			Port: "1111",
-		}}
-		b := &structs.ConsulConnect{SidecarService: &structs.ConsulSidecarService{
-			Port: "2222",
-		}}
-		require.True(t, connectUpdated(a, b))
-	})
-
-	t.Run("same", func(t *testing.T) {
-		a := new(structs.ConsulConnect)
-		b := new(structs.ConsulConnect)
-		require.False(t, connectUpdated(a, b))
-	})
-}
-
-func TestUtil_connectSidecarServiceUpdated(t *testing.T) {
-	ci.Parallel(t)
-
-	t.Run("both nil", func(t *testing.T) {
-		require.False(t, connectSidecarServiceUpdated(nil, nil))
-	})
-
-	t.Run("one nil", func(t *testing.T) {
-		require.True(t, connectSidecarServiceUpdated(nil, new(structs.ConsulSidecarService)))
-	})
-
-	t.Run("ports differ", func(t *testing.T) {
-		a := &structs.ConsulSidecarService{Port: "1111"}
-		b := &structs.ConsulSidecarService{Port: "2222"}
-		require.True(t, connectSidecarServiceUpdated(a, b))
-	})
-
-	t.Run("same", func(t *testing.T) {
-		a := &structs.ConsulSidecarService{Port: "1111"}
-		b := &structs.ConsulSidecarService{Port: "1111"}
-		require.False(t, connectSidecarServiceUpdated(a, b))
-	})
-}
-
-func TestTasksUpdated_Identity(t *testing.T) {
-	ci.Parallel(t)
-
-	j1 := mock.Job()
-	name := j1.TaskGroups[0].Name
-	j1.TaskGroups[0].Tasks[0].Identity = nil
-
-	j2 := j1.Copy()
-
-	must.False(t, tasksUpdated(j1, j2, name))
-
-	// Set identity on j1 and assert update
-	j1.TaskGroups[0].Tasks[0].Identity = &structs.WorkloadIdentity{}
-
-	must.True(t, tasksUpdated(j1, j2, name))
-}
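With the move complete, the system and sysbatch schedulers own the whole diff
pipeline in system_util.go. A condensed sketch of how a caller is expected to
wire these helpers together (simplified control flow, not the schedulers'
actual code; the limit source is illustrative):

	// Diff the whole cluster at once; each resulting tuple already carries
	// the node it belongs to, because system allocs are pinned to nodes.
	diff := diffSystemAllocs(job, readyNodes, notReadyNodes, taintedNodes,
		liveAllocs, terminalAllocs, serverSupportsDisconnectedClients)

	// Updates flow through evictAndPlace so a cap can bound how many
	// existing allocations are stopped and replaced in one pass.
	limit := job.Update.MaxParallel // illustrative limit source
	limitReached := evictAndPlace(ctx, diff, diff.update, "alloc is being updated", &limit)
	_ = limitReached // when true, the remaining updates wait for a later eval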