From e06ff1d61322248d52ab81f3477ce44b316a5cc3 Mon Sep 17 00:00:00 2001 From: Mahmood Ali Date: Wed, 27 Oct 2021 07:04:13 -0700 Subject: [PATCH] scheduler: stop allocs in unrelated nodes (#11391) The system scheduler should leave allocs on draining nodes as-is, but stop allocs on nodes that are no longer part of the job datacenters. Previously, the scheduler did not make the distinction and left system job allocs intact if they are already running. I've added a failing test first, which you can see in https://app.circleci.com/jobs/github/hashicorp/nomad/179661 . Fixes https://github.com/hashicorp/nomad/issues/11373 --- .changelog/11391.txt | 3 + scheduler/generic_sched.go | 2 +- scheduler/scheduler_system.go | 9 +-- scheduler/scheduler_system_test.go | 85 ++++++++++++++++++++++++++++ scheduler/util.go | 40 ++++++++----- scheduler/util_test.go | 91 ++++++++++++++---------------- 6 files changed, 163 insertions(+), 67 deletions(-) create mode 100644 .changelog/11391.txt diff --git a/.changelog/11391.txt b/.changelog/11391.txt new file mode 100644 index 000000000..6afb8a976 --- /dev/null +++ b/.changelog/11391.txt @@ -0,0 +1,3 @@ +```release-note:bug +core: Fix a bug to stop running system job allocations once their datacenters are removed from the job +``` diff --git a/scheduler/generic_sched.go b/scheduler/generic_sched.go index e5273c00d..69fcfbddb 100644 --- a/scheduler/generic_sched.go +++ b/scheduler/generic_sched.go @@ -471,7 +471,7 @@ func (s *GenericScheduler) downgradedJobForPlacement(p placementResult) (string, // destructive updates to place and the set of new placements to place. 
func (s *GenericScheduler) computePlacements(destructive, place []placementResult) error { // Get the base nodes - nodes, byDC, err := readyNodesInDCs(s.state, s.job.Datacenters) + nodes, _, byDC, err := readyNodesInDCs(s.state, s.job.Datacenters) if err != nil { return err } diff --git a/scheduler/scheduler_system.go b/scheduler/scheduler_system.go index 2b0da38bd..5d0278eae 100644 --- a/scheduler/scheduler_system.go +++ b/scheduler/scheduler_system.go @@ -37,8 +37,9 @@ type SystemScheduler struct { ctx *EvalContext stack *SystemStack - nodes []*structs.Node - nodesByDC map[string]int + nodes []*structs.Node + notReadyNodes map[string]struct{} + nodesByDC map[string]int limitReached bool nextEval *structs.Evaluation @@ -122,7 +123,7 @@ func (s *SystemScheduler) process() (bool, error) { // Get the ready nodes in the required datacenters if !s.job.Stopped() { - s.nodes, s.nodesByDC, err = readyNodesInDCs(s.state, s.job.Datacenters) + s.nodes, s.notReadyNodes, s.nodesByDC, err = readyNodesInDCs(s.state, s.job.Datacenters) if err != nil { return false, fmt.Errorf("failed to get ready nodes: %v", err) } @@ -219,7 +220,7 @@ func (s *SystemScheduler) computeJobAllocs() error { live, term := structs.SplitTerminalAllocs(allocs) // Diff the required and existing allocations - diff := diffSystemAllocs(s.job, s.nodes, tainted, live, term) + diff := diffSystemAllocs(s.job, s.nodes, s.notReadyNodes, tainted, live, term) s.logger.Debug("reconciled current state with desired state", "place", len(diff.place), "update", len(diff.update), "migrate", len(diff.migrate), "stop", len(diff.stop), diff --git a/scheduler/scheduler_system_test.go b/scheduler/scheduler_system_test.go index b46bbc9e4..ac9c3725e 100644 --- a/scheduler/scheduler_system_test.go +++ b/scheduler/scheduler_system_test.go @@ -766,6 +766,91 @@ func TestSystemSched_JobModify_InPlace(t *testing.T) { } } +func TestSystemSched_JobModify_RemoveDC(t *testing.T) { + h := NewHarness(t) + + // Create some nodes + node1 := 
mock.Node() + node1.Datacenter = "dc1" + require.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node1)) + + node2 := mock.Node() + node2.Datacenter = "dc2" + require.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node2)) + + fmt.Println("DC1 node: ", node1.ID) + fmt.Println("DC2 node: ", node2.ID) + nodes := []*structs.Node{node1, node2} + + // Generate a fake job with allocations + job := mock.SystemJob() + job.Datacenters = []string{"dc1", "dc2"} + require.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), job)) + + var allocs []*structs.Allocation + for _, node := range nodes { + alloc := mock.Alloc() + alloc.Job = job + alloc.JobID = job.ID + alloc.NodeID = node.ID + alloc.Name = "my-job.web[0]" + allocs = append(allocs, alloc) + } + require.NoError(t, h.State.UpsertAllocs(structs.MsgTypeTestSetup, h.NextIndex(), allocs)) + + // Update the job + job2 := job.Copy() + job2.Datacenters = []string{"dc1"} + require.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), job2)) + + // Create a mock evaluation to deal with update + eval := &structs.Evaluation{ + Namespace: structs.DefaultNamespace, + ID: uuid.Generate(), + Priority: 50, + TriggeredBy: structs.EvalTriggerJobRegister, + JobID: job.ID, + Status: structs.EvalStatusPending, + } + require.NoError(t, h.State.UpsertEvals(structs.MsgTypeTestSetup, h.NextIndex(), []*structs.Evaluation{eval})) + + // Process the evaluation + err := h.Process(NewSystemScheduler, eval) + require.NoError(t, err) + + // Ensure a single plan + require.Len(t, h.Plans, 1) + plan := h.Plans[0] + + // Ensure the plan did not evict any allocs + var update []*structs.Allocation + for _, updateList := range plan.NodeUpdate { + update = append(update, updateList...) 
+ } + require.Len(t, update, 1) + + // Ensure the plan updated the existing allocs + var planned []*structs.Allocation + for _, allocList := range plan.NodeAllocation { + planned = append(planned, allocList...) + } + require.Len(t, planned, 1) + + for _, p := range planned { + require.Equal(t, job2, p.Job, "should update job") + } + + // Lookup the allocations by JobID + ws := memdb.NewWatchSet() + out, err := h.State.AllocsByJob(ws, job.Namespace, job.ID, false) + require.NoError(t, err) + + // Ensure all allocations placed + require.Len(t, out, 2) + h.AssertEvalStatus(t, structs.EvalStatusComplete) + +} + func TestSystemSched_JobDeregister_Purged(t *testing.T) { h := NewHarness(t) diff --git a/scheduler/util.go b/scheduler/util.go index 112993a57..869442b78 100644 --- a/scheduler/util.go +++ b/scheduler/util.go @@ -65,10 +65,11 @@ func diffSystemAllocsForNode( job *structs.Job, // job whose allocs are going to be diff-ed nodeID string, eligibleNodes map[string]*structs.Node, - taintedNodes map[string]*structs.Node, // nodes which are down or in drain (by node name) + notReadyNodes map[string]struct{}, // nodes that are not ready, e.g. 
draining + taintedNodes map[string]*structs.Node, // nodes which are down (by node id) required map[string]*structs.TaskGroup, // set of allocations that must exist allocs []*structs.Allocation, // non-terminal allocations that exist - terminal structs.TerminalByNodeByName, // latest terminal allocations (by node, name) + terminal structs.TerminalByNodeByName, // latest terminal allocations (by node, id) ) *diffResult { result := new(diffResult) @@ -139,10 +140,21 @@ func diffSystemAllocsForNode( // For an existing allocation, if the nodeID is no longer // eligible, the diff should be ignored - if _, ok := eligibleNodes[nodeID]; !ok { + if _, ok := notReadyNodes[nodeID]; ok { goto IGNORE } + // Existing allocations on nodes that are no longer targeted + // should be stopped + if _, ok := eligibleNodes[nodeID]; !ok { + result.stop = append(result.stop, allocTuple{ + Name: name, + TaskGroup: tg, + Alloc: exist, + }) + continue + } + // If the definition is updated we need to update if job.JobModifyIndex != exist.Job.JobModifyIndex { result.update = append(result.update, allocTuple{ @@ -229,21 +241,21 @@ func diffSystemAllocsForNode( // diffResult contain the specific nodeID they should be allocated on. func diffSystemAllocs( job *structs.Job, // jobs whose allocations are going to be diff-ed - nodes []*structs.Node, // list of nodes in the ready state - taintedNodes map[string]*structs.Node, // nodes which are down or drain mode (by name) + readyNodes []*structs.Node, // list of nodes in the ready state + notReadyNodes map[string]struct{}, // list of nodes in DC but not ready, e.g. draining + taintedNodes map[string]*structs.Node, // nodes which are down or drain mode (by node id) allocs []*structs.Allocation, // non-terminal allocations - terminal structs.TerminalByNodeByName, // latest terminal allocations (by name) + terminal structs.TerminalByNodeByName, // latest terminal allocations (by node id) ) *diffResult { // Build a mapping of nodes to all their allocs. 
nodeAllocs := make(map[string][]*structs.Allocation, len(allocs)) for _, alloc := range allocs { - nallocs := append(nodeAllocs[alloc.NodeID], alloc) //nolint:gocritic - nodeAllocs[alloc.NodeID] = nallocs + nodeAllocs[alloc.NodeID] = append(nodeAllocs[alloc.NodeID], alloc) } eligibleNodes := make(map[string]*structs.Node) - for _, node := range nodes { + for _, node := range readyNodes { if _, ok := nodeAllocs[node.ID]; !ok { nodeAllocs[node.ID] = nil } @@ -255,7 +267,7 @@ func diffSystemAllocs( result := new(diffResult) for nodeID, allocs := range nodeAllocs { - diff := diffSystemAllocsForNode(job, nodeID, eligibleNodes, taintedNodes, required, allocs, terminal) + diff := diffSystemAllocsForNode(job, nodeID, eligibleNodes, notReadyNodes, taintedNodes, required, allocs, terminal) result.Append(diff) } @@ -264,7 +276,7 @@ func diffSystemAllocs( // readyNodesInDCs returns all the ready nodes in the given datacenters and a // mapping of each data center to the count of ready nodes. -func readyNodesInDCs(state State, dcs []string) ([]*structs.Node, map[string]int, error) { +func readyNodesInDCs(state State, dcs []string) ([]*structs.Node, map[string]struct{}, map[string]int, error) { // Index the DCs dcMap := make(map[string]int, len(dcs)) for _, dc := range dcs { @@ -274,9 +286,10 @@ func readyNodesInDCs(state State, dcs []string) ([]*structs.Node, map[string]int // Scan the nodes ws := memdb.NewWatchSet() var out []*structs.Node + notReady := map[string]struct{}{} iter, err := state.Nodes(ws) if err != nil { - return nil, nil, err + return nil, nil, nil, err } for { raw := iter.Next() @@ -287,6 +300,7 @@ func readyNodesInDCs(state State, dcs []string) ([]*structs.Node, map[string]int // Filter on datacenter and status node := raw.(*structs.Node) if !node.Ready() { + notReady[node.ID] = struct{}{} continue } if _, ok := dcMap[node.Datacenter]; !ok { @@ -295,7 +309,7 @@ func readyNodesInDCs(state State, dcs []string) ([]*structs.Node, map[string]int out = append(out, 
node) dcMap[node.Datacenter]++ } - return out, dcMap, nil + return out, notReady, dcMap, nil } // retryMax is used to retry a callback until it returns success or diff --git a/scheduler/util_test.go b/scheduler/util_test.go index eba4227e6..7064c50ae 100644 --- a/scheduler/util_test.go +++ b/scheduler/util_test.go @@ -61,7 +61,7 @@ func TestDiffSystemAllocsForNode_Sysbatch_terminal(t *testing.T) { }, } - diff := diffSystemAllocsForNode(job, "node1", eligible, tainted, required, live, terminal) + diff := diffSystemAllocsForNode(job, "node1", eligible, nil, tainted, required, live, terminal) require.Empty(t, diff.place) require.Empty(t, diff.update) require.Empty(t, diff.stop) @@ -87,9 +87,9 @@ func TestDiffSystemAllocsForNode_Sysbatch_terminal(t *testing.T) { expAlloc := terminal["node1"]["my-sysbatch.pinger[0]"] expAlloc.NodeID = "node1" - diff := diffSystemAllocsForNode(job, "node1", eligible, tainted, required, live, terminal) + diff := diffSystemAllocsForNode(job, "node1", eligible, nil, tainted, required, live, terminal) require.Empty(t, diff.place) - require.Equal(t, 1, len(diff.update)) + require.Len(t, diff.update, 1) require.Empty(t, diff.stop) require.Empty(t, diff.migrate) require.Empty(t, diff.lost) @@ -191,31 +191,30 @@ func TestDiffSystemAllocsForNode(t *testing.T) { }, } - diff := diffSystemAllocsForNode(job, "zip", eligible, tainted, required, allocs, terminal) - place := diff.place - update := diff.update - migrate := diff.migrate - stop := diff.stop - ignore := diff.ignore - lost := diff.lost + diff := diffSystemAllocsForNode(job, "zip", eligible, nil, tainted, required, allocs, terminal) // We should update the first alloc - require.True(t, len(update) == 1 && update[0].Alloc == allocs[0]) + require.Len(t, diff.update, 1) + require.Equal(t, allocs[0], diff.update[0].Alloc) // We should ignore the second alloc - require.True(t, len(ignore) == 1 && ignore[0].Alloc == allocs[1]) + require.Len(t, diff.ignore, 1) + require.Equal(t, allocs[1], 
diff.ignore[0].Alloc) // We should stop the 3rd alloc - require.True(t, len(stop) == 1 && stop[0].Alloc == allocs[2]) + require.Len(t, diff.stop, 1) + require.Equal(t, allocs[2], diff.stop[0].Alloc) // We should migrate the 4rd alloc - require.True(t, len(migrate) == 1 && migrate[0].Alloc == allocs[3]) + require.Len(t, diff.migrate, 1) + require.Equal(t, allocs[3], diff.migrate[0].Alloc) // We should mark the 5th alloc as lost - require.True(t, len(lost) == 1 && lost[0].Alloc == allocs[4]) + require.Len(t, diff.lost, 1) + require.Equal(t, allocs[4], diff.lost[0].Alloc) // We should place 6 - require.Equal(t, 6, len(place)) + require.Len(t, diff.place, 6) // Ensure that the allocations which are replacements of terminal allocs are // annotated. @@ -223,8 +222,7 @@ func TestDiffSystemAllocsForNode(t *testing.T) { for _, alloc := range m { for _, tuple := range diff.place { if alloc.Name == tuple.Name { - require.True(t, reflect.DeepEqual(alloc, tuple.Alloc), - "expected: %#v, actual: %#v", alloc, tuple.Alloc) + require.Equal(t, alloc, tuple.Alloc) } } } @@ -274,20 +272,14 @@ func TestDiffSystemAllocsForNode_ExistingAllocIneligibleNode(t *testing.T) { // No terminal allocs terminal := make(structs.TerminalByNodeByName) - diff := diffSystemAllocsForNode(job, eligibleNode.ID, eligible, tainted, required, allocs, terminal) - place := diff.place - update := diff.update - migrate := diff.migrate - stop := diff.stop - ignore := diff.ignore - lost := diff.lost + diff := diffSystemAllocsForNode(job, eligibleNode.ID, eligible, nil, tainted, required, allocs, terminal) - require.Len(t, place, 0) - require.Len(t, update, 1) - require.Len(t, migrate, 0) - require.Len(t, stop, 0) - require.Len(t, ignore, 1) - require.Len(t, lost, 0) + require.Len(t, diff.place, 0) + require.Len(t, diff.update, 1) + require.Len(t, diff.migrate, 0) + require.Len(t, diff.stop, 0) + require.Len(t, diff.ignore, 1) + require.Len(t, diff.lost, 0) } func TestDiffSystemAllocs(t *testing.T) { @@ -360,31 
+352,29 @@ func TestDiffSystemAllocs(t *testing.T) { }, } - diff := diffSystemAllocs(job, nodes, tainted, allocs, terminal) - place := diff.place - update := diff.update - migrate := diff.migrate - stop := diff.stop - ignore := diff.ignore - lost := diff.lost + diff := diffSystemAllocs(job, nodes, nil, tainted, allocs, terminal) // We should update the first alloc - require.True(t, len(update) == 1 && update[0].Alloc == allocs[0]) + require.Len(t, diff.update, 1) + require.Equal(t, allocs[0], diff.update[0].Alloc) // We should ignore the second alloc - require.True(t, len(ignore) == 1 && ignore[0].Alloc == allocs[1]) + require.Len(t, diff.ignore, 1) + require.Equal(t, allocs[1], diff.ignore[0].Alloc) // We should stop the third alloc - require.Empty(t, stop) + require.Empty(t, diff.stop) // There should be no migrates. - require.True(t, len(migrate) == 1 && migrate[0].Alloc == allocs[2]) + require.Len(t, diff.migrate, 1) + require.Equal(t, allocs[2], diff.migrate[0].Alloc) // We should mark the 5th alloc as lost - require.True(t, len(lost) == 1 && lost[0].Alloc == allocs[3]) + require.Len(t, diff.lost, 1) + require.Equal(t, allocs[3], diff.lost[0].Alloc) - // We should place 1 - require.Equal(t, 2, len(place)) + // We should place 2 + require.Len(t, diff.place, 2) // Ensure that the allocations which are replacements of terminal allocs are // annotated. 
@@ -392,8 +382,7 @@ func TestDiffSystemAllocs(t *testing.T) { for _, alloc := range m { for _, tuple := range diff.place { if alloc.NodeID == tuple.Alloc.NodeID { - require.True(t, reflect.DeepEqual(alloc, tuple.Alloc), - "expected: %#v, actual: %#v", alloc, tuple.Alloc) + require.Equal(t, alloc, tuple.Alloc) } } } @@ -415,15 +404,19 @@ func TestReadyNodesInDCs(t *testing.T) { require.NoError(t, state.UpsertNode(structs.MsgTypeTestSetup, 1002, node3)) require.NoError(t, state.UpsertNode(structs.MsgTypeTestSetup, 1003, node4)) - nodes, dc, err := readyNodesInDCs(state, []string{"dc1", "dc2"}) + nodes, notReady, dc, err := readyNodesInDCs(state, []string{"dc1", "dc2"}) require.NoError(t, err) require.Equal(t, 2, len(nodes)) - require.True(t, nodes[0].ID != node3.ID && nodes[1].ID != node3.ID) + require.NotEqual(t, node3.ID, nodes[0].ID) + require.NotEqual(t, node3.ID, nodes[1].ID) require.Contains(t, dc, "dc1") require.Equal(t, 1, dc["dc1"]) require.Contains(t, dc, "dc2") require.Equal(t, 1, dc["dc2"]) + + require.Contains(t, notReady, node3.ID) + require.Contains(t, notReady, node4.ID) } func TestRetryMax(t *testing.T) {