diff --git a/.changelog/11391.txt b/.changelog/11391.txt new file mode 100644 index 000000000..6afb8a976 --- /dev/null +++ b/.changelog/11391.txt @@ -0,0 +1,3 @@ +```release-note:bug +core: Fix a bug to stop running system job allocations once their datacenters are removed from the job +``` diff --git a/scheduler/generic_sched.go b/scheduler/generic_sched.go index e5273c00d..69fcfbddb 100644 --- a/scheduler/generic_sched.go +++ b/scheduler/generic_sched.go @@ -471,7 +471,7 @@ func (s *GenericScheduler) downgradedJobForPlacement(p placementResult) (string, // destructive updates to place and the set of new placements to place. func (s *GenericScheduler) computePlacements(destructive, place []placementResult) error { // Get the base nodes - nodes, byDC, err := readyNodesInDCs(s.state, s.job.Datacenters) + nodes, _, byDC, err := readyNodesInDCs(s.state, s.job.Datacenters) if err != nil { return err } diff --git a/scheduler/scheduler_system.go b/scheduler/scheduler_system.go index 2b0da38bd..5d0278eae 100644 --- a/scheduler/scheduler_system.go +++ b/scheduler/scheduler_system.go @@ -37,8 +37,9 @@ type SystemScheduler struct { ctx *EvalContext stack *SystemStack - nodes []*structs.Node - nodesByDC map[string]int + nodes []*structs.Node + notReadyNodes map[string]struct{} + nodesByDC map[string]int limitReached bool nextEval *structs.Evaluation @@ -122,7 +123,7 @@ func (s *SystemScheduler) process() (bool, error) { // Get the ready nodes in the required datacenters if !s.job.Stopped() { - s.nodes, s.nodesByDC, err = readyNodesInDCs(s.state, s.job.Datacenters) + s.nodes, s.notReadyNodes, s.nodesByDC, err = readyNodesInDCs(s.state, s.job.Datacenters) if err != nil { return false, fmt.Errorf("failed to get ready nodes: %v", err) } @@ -219,7 +220,7 @@ func (s *SystemScheduler) computeJobAllocs() error { live, term := structs.SplitTerminalAllocs(allocs) // Diff the required and existing allocations - diff := diffSystemAllocs(s.job, s.nodes, tainted, live, term) + diff := diffSystemAllocs(s.job, s.nodes, s.notReadyNodes, tainted, live, term) s.logger.Debug("reconciled current state with desired state", "place", len(diff.place), "update", len(diff.update), "migrate", len(diff.migrate), "stop", len(diff.stop), diff --git a/scheduler/scheduler_system_test.go b/scheduler/scheduler_system_test.go index b46bbc9e4..ac9c3725e 100644 --- a/scheduler/scheduler_system_test.go +++ b/scheduler/scheduler_system_test.go @@ -766,6 +766,91 @@ func TestSystemSched_JobModify_InPlace(t *testing.T) { } } +func TestSystemSched_JobModify_RemoveDC(t *testing.T) { + h := NewHarness(t) + + // Create some nodes + node1 := mock.Node() + node1.Datacenter = "dc1" + require.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node1)) + + node2 := mock.Node() + node2.Datacenter = "dc2" + require.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node2)) + + fmt.Println("DC1 node: ", node1.ID) + fmt.Println("DC2 node: ", node2.ID) + nodes := []*structs.Node{node1, node2} + + // Generate a fake job with allocations + job := mock.SystemJob() + job.Datacenters = []string{"dc1", "dc2"} + require.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), job)) + + var allocs []*structs.Allocation + for _, node := range nodes { + alloc := mock.Alloc() + alloc.Job = job + alloc.JobID = job.ID + alloc.NodeID = node.ID + alloc.Name = "my-job.web[0]" + allocs = append(allocs, alloc) + } + require.NoError(t, h.State.UpsertAllocs(structs.MsgTypeTestSetup, h.NextIndex(), allocs)) + + // Update the job + job2 := job.Copy() + job2.Datacenters = []string{"dc1"} + require.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), job2)) + + // Create a mock evaluation to deal with update + eval := &structs.Evaluation{ + Namespace: structs.DefaultNamespace, + ID: uuid.Generate(), + Priority: 50, + TriggeredBy: structs.EvalTriggerJobRegister, + JobID: job.ID, + Status: structs.EvalStatusPending, + } + require.NoError(t, h.State.UpsertEvals(structs.MsgTypeTestSetup, h.NextIndex(), []*structs.Evaluation{eval})) + + // Process the evaluation + err := h.Process(NewSystemScheduler, eval) + require.NoError(t, err) + + // Ensure a single plan + require.Len(t, h.Plans, 1) + plan := h.Plans[0] + + // Ensure the plan did not evict any allocs + var update []*structs.Allocation + for _, updateList := range plan.NodeUpdate { + update = append(update, updateList...) + } + require.Len(t, update, 1) + + // Ensure the plan updated the existing allocs + var planned []*structs.Allocation + for _, allocList := range plan.NodeAllocation { + planned = append(planned, allocList...) + } + require.Len(t, planned, 1) + + for _, p := range planned { + require.Equal(t, job2, p.Job, "should update job") + } + + // Lookup the allocations by JobID + ws := memdb.NewWatchSet() + out, err := h.State.AllocsByJob(ws, job.Namespace, job.ID, false) + require.NoError(t, err) + + // Ensure all allocations placed + require.Len(t, out, 2) + h.AssertEvalStatus(t, structs.EvalStatusComplete) + +} + func TestSystemSched_JobDeregister_Purged(t *testing.T) { h := NewHarness(t) diff --git a/scheduler/util.go b/scheduler/util.go index 112993a57..869442b78 100644 --- a/scheduler/util.go +++ b/scheduler/util.go @@ -65,10 +65,11 @@ func diffSystemAllocsForNode( job *structs.Job, // job whose allocs are going to be diff-ed nodeID string, eligibleNodes map[string]*structs.Node, - taintedNodes map[string]*structs.Node, // nodes which are down or in drain (by node name) + notReadyNodes map[string]struct{}, // nodes that are not ready, e.g. draining + taintedNodes map[string]*structs.Node, // nodes which are down (by node id) required map[string]*structs.TaskGroup, // set of allocations that must exist allocs []*structs.Allocation, // non-terminal allocations that exist - terminal structs.TerminalByNodeByName, // latest terminal allocations (by node, name) + terminal structs.TerminalByNodeByName, // latest terminal allocations (by node, id) ) *diffResult { result := new(diffResult) @@ -139,10 +140,21 @@ func diffSystemAllocsForNode( // For an existing allocation, if the nodeID is no longer // eligible, the diff should be ignored - if _, ok := eligibleNodes[nodeID]; !ok { + if _, ok := notReadyNodes[nodeID]; ok { goto IGNORE } + // Existing allocations on nodes that are no longer targeted + // should be stopped + if _, ok := eligibleNodes[nodeID]; !ok { + result.stop = append(result.stop, allocTuple{ + Name: name, + TaskGroup: tg, + Alloc: exist, + }) + continue + } + // If the definition is updated we need to update if job.JobModifyIndex != exist.Job.JobModifyIndex { result.update = append(result.update, allocTuple{ @@ -229,21 +241,21 @@ func diffSystemAllocsForNode( // diffResult contain the specific nodeID they should be allocated on. func diffSystemAllocs( job *structs.Job, // jobs whose allocations are going to be diff-ed - nodes []*structs.Node, // list of nodes in the ready state - taintedNodes map[string]*structs.Node, // nodes which are down or drain mode (by name) + readyNodes []*structs.Node, // list of nodes in the ready state + notReadyNodes map[string]struct{}, // list of nodes in DC but not ready, e.g. draining + taintedNodes map[string]*structs.Node, // nodes which are down or drain mode (by node id) allocs []*structs.Allocation, // non-terminal allocations - terminal structs.TerminalByNodeByName, // latest terminal allocations (by name) + terminal structs.TerminalByNodeByName, // latest terminal allocations (by node id) ) *diffResult { // Build a mapping of nodes to all their allocs. nodeAllocs := make(map[string][]*structs.Allocation, len(allocs)) for _, alloc := range allocs { - nallocs := append(nodeAllocs[alloc.NodeID], alloc) //nolint:gocritic - nodeAllocs[alloc.NodeID] = nallocs + nodeAllocs[alloc.NodeID] = append(nodeAllocs[alloc.NodeID], alloc) } eligibleNodes := make(map[string]*structs.Node) - for _, node := range nodes { + for _, node := range readyNodes { if _, ok := nodeAllocs[node.ID]; !ok { nodeAllocs[node.ID] = nil } @@ -255,7 +267,7 @@ func diffSystemAllocs( result := new(diffResult) for nodeID, allocs := range nodeAllocs { - diff := diffSystemAllocsForNode(job, nodeID, eligibleNodes, taintedNodes, required, allocs, terminal) + diff := diffSystemAllocsForNode(job, nodeID, eligibleNodes, notReadyNodes, taintedNodes, required, allocs, terminal) result.Append(diff) } @@ -264,7 +276,7 @@ func diffSystemAllocs( // readyNodesInDCs returns all the ready nodes in the given datacenters and a // mapping of each data center to the count of ready nodes. -func readyNodesInDCs(state State, dcs []string) ([]*structs.Node, map[string]int, error) { +func readyNodesInDCs(state State, dcs []string) ([]*structs.Node, map[string]struct{}, map[string]int, error) { // Index the DCs dcMap := make(map[string]int, len(dcs)) for _, dc := range dcs { @@ -274,9 +286,10 @@ func readyNodesInDCs(state State, dcs []string) ([]*structs.Node, map[string]int // Scan the nodes ws := memdb.NewWatchSet() var out []*structs.Node + notReady := map[string]struct{}{} iter, err := state.Nodes(ws) if err != nil { - return nil, nil, err + return nil, nil, nil, err } for { raw := iter.Next() @@ -287,6 +300,7 @@ func readyNodesInDCs(state State, dcs []string) ([]*structs.Node, map[string]int // Filter on datacenter and status node := raw.(*structs.Node) if !node.Ready() { + notReady[node.ID] = struct{}{} continue } if _, ok := dcMap[node.Datacenter]; !ok { @@ -295,7 +309,7 @@ func readyNodesInDCs(state State, dcs []string) ([]*structs.Node, map[string]int out = append(out, node) dcMap[node.Datacenter]++ } - return out, dcMap, nil + return out, notReady, dcMap, nil } // retryMax is used to retry a callback until it returns success or diff --git a/scheduler/util_test.go b/scheduler/util_test.go index eba4227e6..7064c50ae 100644 --- a/scheduler/util_test.go +++ b/scheduler/util_test.go @@ -61,7 +61,7 @@ func TestDiffSystemAllocsForNode_Sysbatch_terminal(t *testing.T) { }, } - diff := diffSystemAllocsForNode(job, "node1", eligible, tainted, required, live, terminal) + diff := diffSystemAllocsForNode(job, "node1", eligible, nil, tainted, required, live, terminal) require.Empty(t, diff.place) require.Empty(t, diff.update) require.Empty(t, diff.stop) @@ -87,9 +87,9 @@ func TestDiffSystemAllocsForNode_Sysbatch_terminal(t *testing.T) { expAlloc := terminal["node1"]["my-sysbatch.pinger[0]"] expAlloc.NodeID = "node1" - diff := diffSystemAllocsForNode(job, "node1", eligible, tainted, required, live, terminal) + diff := diffSystemAllocsForNode(job, "node1", eligible, nil, tainted, required, live, terminal) require.Empty(t, diff.place) - require.Equal(t, 1, len(diff.update)) + require.Len(t, diff.update, 1) require.Empty(t, diff.stop) require.Empty(t, diff.migrate) require.Empty(t, diff.lost) @@ -191,31 +191,30 @@ func TestDiffSystemAllocsForNode(t *testing.T) { }, } - diff := diffSystemAllocsForNode(job, "zip", eligible, tainted, required, allocs, terminal) - place := diff.place - update := diff.update - migrate := diff.migrate - stop := diff.stop - ignore := diff.ignore - lost := diff.lost + diff := diffSystemAllocsForNode(job, "zip", eligible, nil, tainted, required, allocs, terminal) // We should update the first alloc - require.True(t, len(update) == 1 && update[0].Alloc == allocs[0]) + require.Len(t, diff.update, 1) + require.Equal(t, allocs[0], diff.update[0].Alloc) // We should ignore the second alloc - require.True(t, len(ignore) == 1 && ignore[0].Alloc == allocs[1]) + require.Len(t, diff.ignore, 1) + require.Equal(t, allocs[1], diff.ignore[0].Alloc) // We should stop the 3rd alloc - require.True(t, len(stop) == 1 && stop[0].Alloc == allocs[2]) + require.Len(t, diff.stop, 1) + require.Equal(t, allocs[2], diff.stop[0].Alloc) // We should migrate the 4rd alloc - require.True(t, len(migrate) == 1 && migrate[0].Alloc == allocs[3]) + require.Len(t, diff.migrate, 1) + require.Equal(t, allocs[3], diff.migrate[0].Alloc) // We should mark the 5th alloc as lost - require.True(t, len(lost) == 1 && lost[0].Alloc == allocs[4]) + require.Len(t, diff.lost, 1) + require.Equal(t, allocs[4], diff.lost[0].Alloc) // We should place 6 - require.Equal(t, 6, len(place)) + require.Len(t, diff.place, 6) // Ensure that the allocations which are replacements of terminal allocs are // annotated. @@ -223,8 +222,7 @@ func TestDiffSystemAllocsForNode(t *testing.T) { for _, alloc := range m { for _, tuple := range diff.place { if alloc.Name == tuple.Name { - require.True(t, reflect.DeepEqual(alloc, tuple.Alloc), - "expected: %#v, actual: %#v", alloc, tuple.Alloc) + require.Equal(t, alloc, tuple.Alloc) } } } @@ -274,20 +272,14 @@ func TestDiffSystemAllocsForNode_ExistingAllocIneligibleNode(t *testing.T) { // No terminal allocs terminal := make(structs.TerminalByNodeByName) - diff := diffSystemAllocsForNode(job, eligibleNode.ID, eligible, tainted, required, allocs, terminal) - place := diff.place - update := diff.update - migrate := diff.migrate - stop := diff.stop - ignore := diff.ignore - lost := diff.lost + diff := diffSystemAllocsForNode(job, eligibleNode.ID, eligible, nil, tainted, required, allocs, terminal) - require.Len(t, place, 0) - require.Len(t, update, 1) - require.Len(t, migrate, 0) - require.Len(t, stop, 0) - require.Len(t, ignore, 1) - require.Len(t, lost, 0) + require.Len(t, diff.place, 0) + require.Len(t, diff.update, 1) + require.Len(t, diff.migrate, 0) + require.Len(t, diff.stop, 0) + require.Len(t, diff.ignore, 1) + require.Len(t, diff.lost, 0) } func TestDiffSystemAllocs(t *testing.T) { @@ -360,31 +352,29 @@ func TestDiffSystemAllocs(t *testing.T) { }, } - diff := diffSystemAllocs(job, nodes, tainted, allocs, terminal) - place := diff.place - update := diff.update - migrate := diff.migrate - stop := diff.stop - ignore := diff.ignore - lost := diff.lost + diff := diffSystemAllocs(job, nodes, nil, tainted, allocs, terminal) // We should update the first alloc - require.True(t, len(update) == 1 && update[0].Alloc == allocs[0]) + require.Len(t, diff.update, 1) + require.Equal(t, allocs[0], diff.update[0].Alloc) // We should ignore the second alloc - require.True(t, len(ignore) == 1 && ignore[0].Alloc == allocs[1]) + require.Len(t, diff.ignore, 1) + require.Equal(t, allocs[1], diff.ignore[0].Alloc) // We should stop the third alloc - require.Empty(t, stop) + require.Empty(t, diff.stop) // There should be no migrates. - require.True(t, len(migrate) == 1 && migrate[0].Alloc == allocs[2]) + require.Len(t, diff.migrate, 1) + require.Equal(t, allocs[2], diff.migrate[0].Alloc) // We should mark the 5th alloc as lost - require.True(t, len(lost) == 1 && lost[0].Alloc == allocs[3]) + require.Len(t, diff.lost, 1) + require.Equal(t, allocs[3], diff.lost[0].Alloc) - // We should place 1 - require.Equal(t, 2, len(place)) + // We should place 2 + require.Len(t, diff.place, 2) // Ensure that the allocations which are replacements of terminal allocs are // annotated. @@ -392,8 +382,7 @@ func TestDiffSystemAllocs(t *testing.T) { for _, alloc := range m { for _, tuple := range diff.place { if alloc.NodeID == tuple.Alloc.NodeID { - require.True(t, reflect.DeepEqual(alloc, tuple.Alloc), - "expected: %#v, actual: %#v", alloc, tuple.Alloc) + require.Equal(t, alloc, tuple.Alloc) } } } @@ -415,15 +404,19 @@ func TestReadyNodesInDCs(t *testing.T) { require.NoError(t, state.UpsertNode(structs.MsgTypeTestSetup, 1002, node3)) require.NoError(t, state.UpsertNode(structs.MsgTypeTestSetup, 1003, node4)) - nodes, dc, err := readyNodesInDCs(state, []string{"dc1", "dc2"}) + nodes, notReady, dc, err := readyNodesInDCs(state, []string{"dc1", "dc2"}) require.NoError(t, err) require.Equal(t, 2, len(nodes)) - require.True(t, nodes[0].ID != node3.ID && nodes[1].ID != node3.ID) + require.NotEqual(t, node3.ID, nodes[0].ID) + require.NotEqual(t, node3.ID, nodes[1].ID) require.Contains(t, dc, "dc1") require.Equal(t, 1, dc["dc1"]) require.Contains(t, dc, "dc2") require.Equal(t, 1, dc["dc2"]) + + require.Contains(t, notReady, node3.ID) + require.Contains(t, notReady, node4.ID) } func TestRetryMax(t *testing.T) {