scheduler: stop allocs in unrelated nodes (#11391)

The system scheduler should leave allocs on draining nodes as-is, but
stop allocs on nodes that are no longer part of the job's datacenters.

Previously, the scheduler did not make this distinction and left system
job allocs intact if they were already running.

I've added a failing test first, which you can see in https://app.circleci.com/jobs/github/hashicorp/nomad/179661.

Fixes https://github.com/hashicorp/nomad/issues/11373
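
In short, when an existing system alloc's node is missing from the ready set, the scheduler now distinguishes a node that is merely not ready (for example, draining), where the alloc is left in place, from a node that is no longer in the job's datacenters at all, where the alloc is stopped. Below is a minimal, self-contained sketch of that decision; `decideExistingAlloc` and its map arguments are hypothetical stand-ins for the real logic in `diffSystemAllocsForNode` shown in the diff further down, not Nomad's actual API.

```go
package main

import "fmt"

// decision is a simplified stand-in for the diffResult buckets the scheduler
// sorts existing allocs into (ignore vs. stop).
type decision string

const (
	ignoreAlloc decision = "ignore" // leave the existing alloc as-is
	stopAlloc   decision = "stop"   // mark the existing alloc for stopping
)

// decideExistingAlloc sketches the new branch: eligible holds ready nodes the
// job still targets; notReady holds nodes in the job's datacenters that are
// not ready (e.g. draining). Both maps are illustrative simplifications.
func decideExistingAlloc(nodeID string, eligible, notReady map[string]bool) decision {
	if eligible[nodeID] {
		// Node is still targeted and ready: fall through to the normal
		// update / in-place handling (represented here as "ignore").
		return ignoreAlloc
	}
	if notReady[nodeID] {
		// Node is draining or otherwise not ready: leave the alloc alone.
		return ignoreAlloc
	}
	// Node is neither ready nor merely draining, i.e. it is no longer part of
	// the job's datacenters, so the alloc should be stopped.
	return stopAlloc
}

func main() {
	eligible := map[string]bool{"node-in-dc1": true}
	notReady := map[string]bool{"node-draining": true}

	fmt.Println(decideExistingAlloc("node-in-dc1", eligible, notReady))   // ignore
	fmt.Println(decideExistingAlloc("node-draining", eligible, notReady)) // ignore
	fmt.Println(decideExistingAlloc("node-in-dc2", eligible, notReady))   // stop
}
```
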
Mahmood Ali 2021-10-27 07:04:13 -07:00 committed by GitHub
parent f03d65062d
commit e06ff1d613
6 changed files with 163 additions and 67 deletions

.changelog/11391.txt (new file)

@@ -0,0 +1,3 @@
```release-note:bug
core: Fix a bug to stop running system job allocations once their datacenters are removed from the job
```


@@ -471,7 +471,7 @@ func (s *GenericScheduler) downgradedJobForPlacement(p placementResult) (string,
// destructive updates to place and the set of new placements to place.
func (s *GenericScheduler) computePlacements(destructive, place []placementResult) error {
// Get the base nodes
nodes, byDC, err := readyNodesInDCs(s.state, s.job.Datacenters)
nodes, _, byDC, err := readyNodesInDCs(s.state, s.job.Datacenters)
if err != nil {
return err
}


@@ -38,6 +38,7 @@ type SystemScheduler struct {
stack *SystemStack
nodes []*structs.Node
notReadyNodes map[string]struct{}
nodesByDC map[string]int
limitReached bool
@@ -122,7 +123,7 @@ func (s *SystemScheduler) process() (bool, error) {
// Get the ready nodes in the required datacenters
if !s.job.Stopped() {
s.nodes, s.nodesByDC, err = readyNodesInDCs(s.state, s.job.Datacenters)
s.nodes, s.notReadyNodes, s.nodesByDC, err = readyNodesInDCs(s.state, s.job.Datacenters)
if err != nil {
return false, fmt.Errorf("failed to get ready nodes: %v", err)
}
@@ -219,7 +220,7 @@ func (s *SystemScheduler) computeJobAllocs() error {
live, term := structs.SplitTerminalAllocs(allocs)
// Diff the required and existing allocations
diff := diffSystemAllocs(s.job, s.nodes, tainted, live, term)
diff := diffSystemAllocs(s.job, s.nodes, s.notReadyNodes, tainted, live, term)
s.logger.Debug("reconciled current state with desired state",
"place", len(diff.place), "update", len(diff.update),
"migrate", len(diff.migrate), "stop", len(diff.stop),


@@ -766,6 +766,91 @@ func TestSystemSched_JobModify_InPlace(t *testing.T) {
}
}
func TestSystemSched_JobModify_RemoveDC(t *testing.T) {
h := NewHarness(t)
// Create some nodes
node1 := mock.Node()
node1.Datacenter = "dc1"
require.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node1))
node2 := mock.Node()
node2.Datacenter = "dc2"
require.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node2))
fmt.Println("DC1 node: ", node1.ID)
fmt.Println("DC2 node: ", node2.ID)
nodes := []*structs.Node{node1, node2}
// Generate a fake job with allocations
job := mock.SystemJob()
job.Datacenters = []string{"dc1", "dc2"}
require.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), job))
var allocs []*structs.Allocation
for _, node := range nodes {
alloc := mock.Alloc()
alloc.Job = job
alloc.JobID = job.ID
alloc.NodeID = node.ID
alloc.Name = "my-job.web[0]"
allocs = append(allocs, alloc)
}
require.NoError(t, h.State.UpsertAllocs(structs.MsgTypeTestSetup, h.NextIndex(), allocs))
// Update the job
job2 := job.Copy()
job2.Datacenters = []string{"dc1"}
require.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), job2))
// Create a mock evaluation to deal with update
eval := &structs.Evaluation{
Namespace: structs.DefaultNamespace,
ID: uuid.Generate(),
Priority: 50,
TriggeredBy: structs.EvalTriggerJobRegister,
JobID: job.ID,
Status: structs.EvalStatusPending,
}
require.NoError(t, h.State.UpsertEvals(structs.MsgTypeTestSetup, h.NextIndex(), []*structs.Evaluation{eval}))
// Process the evaluation
err := h.Process(NewSystemScheduler, eval)
require.NoError(t, err)
// Ensure a single plan
require.Len(t, h.Plans, 1)
plan := h.Plans[0]
// Ensure the plan stopped the alloc on the node in the removed datacenter
var update []*structs.Allocation
for _, updateList := range plan.NodeUpdate {
update = append(update, updateList...)
}
require.Len(t, update, 1)
// Ensure the plan updated the existing alloc on the remaining node
var planned []*structs.Allocation
for _, allocList := range plan.NodeAllocation {
planned = append(planned, allocList...)
}
require.Len(t, planned, 1)
for _, p := range planned {
require.Equal(t, job2, p.Job, "should update job")
}
// Lookup the allocations by JobID
ws := memdb.NewWatchSet()
out, err := h.State.AllocsByJob(ws, job.Namespace, job.ID, false)
require.NoError(t, err)
// Ensure all allocations placed
require.Len(t, out, 2)
h.AssertEvalStatus(t, structs.EvalStatusComplete)
}
func TestSystemSched_JobDeregister_Purged(t *testing.T) {
h := NewHarness(t)


@@ -65,10 +65,11 @@ func diffSystemAllocsForNode(
job *structs.Job, // job whose allocs are going to be diff-ed
nodeID string,
eligibleNodes map[string]*structs.Node,
taintedNodes map[string]*structs.Node, // nodes which are down or in drain (by node name)
notReadyNodes map[string]struct{}, // nodes that are not ready, e.g. draining
taintedNodes map[string]*structs.Node, // nodes which are down (by node id)
required map[string]*structs.TaskGroup, // set of allocations that must exist
allocs []*structs.Allocation, // non-terminal allocations that exist
terminal structs.TerminalByNodeByName, // latest terminal allocations (by node, name)
terminal structs.TerminalByNodeByName, // latest terminal allocations (by node, id)
) *diffResult {
result := new(diffResult)
@@ -139,10 +140,21 @@ func diffSystemAllocsForNode(
// For an existing allocation, if the nodeID is no longer
// eligible, the diff should be ignored
if _, ok := eligibleNodes[nodeID]; !ok {
if _, ok := notReadyNodes[nodeID]; ok {
goto IGNORE
}
// Existing allocations on nodes that are no longer targeted
// should be stopped
if _, ok := eligibleNodes[nodeID]; !ok {
result.stop = append(result.stop, allocTuple{
Name: name,
TaskGroup: tg,
Alloc: exist,
})
continue
}
// If the definition is updated we need to update
if job.JobModifyIndex != exist.Job.JobModifyIndex {
result.update = append(result.update, allocTuple{
@@ -229,21 +241,21 @@ func diffSystemAllocsForNode(
// diffResult contain the specific nodeID they should be allocated on.
func diffSystemAllocs(
job *structs.Job, // jobs whose allocations are going to be diff-ed
nodes []*structs.Node, // list of nodes in the ready state
taintedNodes map[string]*structs.Node, // nodes which are down or drain mode (by name)
readyNodes []*structs.Node, // list of nodes in the ready state
notReadyNodes map[string]struct{}, // list of nodes in DC but not ready, e.g. draining
taintedNodes map[string]*structs.Node, // nodes which are down or drain mode (by node id)
allocs []*structs.Allocation, // non-terminal allocations
terminal structs.TerminalByNodeByName, // latest terminal allocations (by name)
terminal structs.TerminalByNodeByName, // latest terminal allocations (by node id)
) *diffResult {
// Build a mapping of nodes to all their allocs.
nodeAllocs := make(map[string][]*structs.Allocation, len(allocs))
for _, alloc := range allocs {
nallocs := append(nodeAllocs[alloc.NodeID], alloc) //nolint:gocritic
nodeAllocs[alloc.NodeID] = nallocs
nodeAllocs[alloc.NodeID] = append(nodeAllocs[alloc.NodeID], alloc)
}
eligibleNodes := make(map[string]*structs.Node)
for _, node := range nodes {
for _, node := range readyNodes {
if _, ok := nodeAllocs[node.ID]; !ok {
nodeAllocs[node.ID] = nil
}
@@ -255,7 +267,7 @@ func diffSystemAllocs(
result := new(diffResult)
for nodeID, allocs := range nodeAllocs {
diff := diffSystemAllocsForNode(job, nodeID, eligibleNodes, taintedNodes, required, allocs, terminal)
diff := diffSystemAllocsForNode(job, nodeID, eligibleNodes, notReadyNodes, taintedNodes, required, allocs, terminal)
result.Append(diff)
}
@@ -264,7 +276,7 @@ func diffSystemAllocs(
// readyNodesInDCs returns all the ready nodes in the given datacenters and a
// mapping of each data center to the count of ready nodes.
func readyNodesInDCs(state State, dcs []string) ([]*structs.Node, map[string]int, error) {
func readyNodesInDCs(state State, dcs []string) ([]*structs.Node, map[string]struct{}, map[string]int, error) {
// Index the DCs
dcMap := make(map[string]int, len(dcs))
for _, dc := range dcs {
@@ -274,9 +286,10 @@ func readyNodesInDCs(state State, dcs []string) ([]*structs.Node, map[string]int
// Scan the nodes
ws := memdb.NewWatchSet()
var out []*structs.Node
notReady := map[string]struct{}{}
iter, err := state.Nodes(ws)
if err != nil {
return nil, nil, err
return nil, nil, nil, err
}
for {
raw := iter.Next()
@@ -287,6 +300,7 @@ func readyNodesInDCs(state State, dcs []string) ([]*structs.Node, map[string]int
// Filter on datacenter and status
node := raw.(*structs.Node)
if !node.Ready() {
notReady[node.ID] = struct{}{}
continue
}
if _, ok := dcMap[node.Datacenter]; !ok {
@@ -295,7 +309,7 @@ func readyNodesInDCs(state State, dcs []string) ([]*structs.Node, map[string]int
out = append(out, node)
dcMap[node.Datacenter]++
}
return out, dcMap, nil
return out, notReady, dcMap, nil
}
// retryMax is used to retry a callback until it returns success or


@@ -61,7 +61,7 @@ func TestDiffSystemAllocsForNode_Sysbatch_terminal(t *testing.T) {
},
}
diff := diffSystemAllocsForNode(job, "node1", eligible, tainted, required, live, terminal)
diff := diffSystemAllocsForNode(job, "node1", eligible, nil, tainted, required, live, terminal)
require.Empty(t, diff.place)
require.Empty(t, diff.update)
require.Empty(t, diff.stop)
@@ -87,9 +87,9 @@ func TestDiffSystemAllocsForNode_Sysbatch_terminal(t *testing.T) {
expAlloc := terminal["node1"]["my-sysbatch.pinger[0]"]
expAlloc.NodeID = "node1"
diff := diffSystemAllocsForNode(job, "node1", eligible, tainted, required, live, terminal)
diff := diffSystemAllocsForNode(job, "node1", eligible, nil, tainted, required, live, terminal)
require.Empty(t, diff.place)
require.Equal(t, 1, len(diff.update))
require.Len(t, diff.update, 1)
require.Empty(t, diff.stop)
require.Empty(t, diff.migrate)
require.Empty(t, diff.lost)
@@ -191,31 +191,30 @@ func TestDiffSystemAllocsForNode(t *testing.T) {
},
}
diff := diffSystemAllocsForNode(job, "zip", eligible, tainted, required, allocs, terminal)
place := diff.place
update := diff.update
migrate := diff.migrate
stop := diff.stop
ignore := diff.ignore
lost := diff.lost
diff := diffSystemAllocsForNode(job, "zip", eligible, nil, tainted, required, allocs, terminal)
// We should update the first alloc
require.True(t, len(update) == 1 && update[0].Alloc == allocs[0])
require.Len(t, diff.update, 1)
require.Equal(t, allocs[0], diff.update[0].Alloc)
// We should ignore the second alloc
require.True(t, len(ignore) == 1 && ignore[0].Alloc == allocs[1])
require.Len(t, diff.ignore, 1)
require.Equal(t, allocs[1], diff.ignore[0].Alloc)
// We should stop the 3rd alloc
require.True(t, len(stop) == 1 && stop[0].Alloc == allocs[2])
require.Len(t, diff.stop, 1)
require.Equal(t, allocs[2], diff.stop[0].Alloc)
// We should migrate the 4rd alloc
require.True(t, len(migrate) == 1 && migrate[0].Alloc == allocs[3])
require.Len(t, diff.migrate, 1)
require.Equal(t, allocs[3], diff.migrate[0].Alloc)
// We should mark the 5th alloc as lost
require.True(t, len(lost) == 1 && lost[0].Alloc == allocs[4])
require.Len(t, diff.lost, 1)
require.Equal(t, allocs[4], diff.lost[0].Alloc)
// We should place 6
require.Equal(t, 6, len(place))
require.Len(t, diff.place, 6)
// Ensure that the allocations which are replacements of terminal allocs are
// annotated.
@@ -223,8 +222,7 @@ func TestDiffSystemAllocsForNode(t *testing.T) {
for _, alloc := range m {
for _, tuple := range diff.place {
if alloc.Name == tuple.Name {
require.True(t, reflect.DeepEqual(alloc, tuple.Alloc),
"expected: %#v, actual: %#v", alloc, tuple.Alloc)
require.Equal(t, alloc, tuple.Alloc)
}
}
}
@@ -274,20 +272,14 @@ func TestDiffSystemAllocsForNode_ExistingAllocIneligibleNode(t *testing.T) {
// No terminal allocs
terminal := make(structs.TerminalByNodeByName)
diff := diffSystemAllocsForNode(job, eligibleNode.ID, eligible, tainted, required, allocs, terminal)
place := diff.place
update := diff.update
migrate := diff.migrate
stop := diff.stop
ignore := diff.ignore
lost := diff.lost
diff := diffSystemAllocsForNode(job, eligibleNode.ID, eligible, nil, tainted, required, allocs, terminal)
require.Len(t, place, 0)
require.Len(t, update, 1)
require.Len(t, migrate, 0)
require.Len(t, stop, 0)
require.Len(t, ignore, 1)
require.Len(t, lost, 0)
require.Len(t, diff.place, 0)
require.Len(t, diff.update, 1)
require.Len(t, diff.migrate, 0)
require.Len(t, diff.stop, 0)
require.Len(t, diff.ignore, 1)
require.Len(t, diff.lost, 0)
}
func TestDiffSystemAllocs(t *testing.T) {
@@ -360,31 +352,29 @@ func TestDiffSystemAllocs(t *testing.T) {
},
}
diff := diffSystemAllocs(job, nodes, tainted, allocs, terminal)
place := diff.place
update := diff.update
migrate := diff.migrate
stop := diff.stop
ignore := diff.ignore
lost := diff.lost
diff := diffSystemAllocs(job, nodes, nil, tainted, allocs, terminal)
// We should update the first alloc
require.True(t, len(update) == 1 && update[0].Alloc == allocs[0])
require.Len(t, diff.update, 1)
require.Equal(t, allocs[0], diff.update[0].Alloc)
// We should ignore the second alloc
require.True(t, len(ignore) == 1 && ignore[0].Alloc == allocs[1])
require.Len(t, diff.ignore, 1)
require.Equal(t, allocs[1], diff.ignore[0].Alloc)
// We should stop the third alloc
require.Empty(t, stop)
require.Empty(t, diff.stop)
// There should be no migrates.
require.True(t, len(migrate) == 1 && migrate[0].Alloc == allocs[2])
require.Len(t, diff.migrate, 1)
require.Equal(t, allocs[2], diff.migrate[0].Alloc)
// We should mark the 5th alloc as lost
require.True(t, len(lost) == 1 && lost[0].Alloc == allocs[3])
require.Len(t, diff.lost, 1)
require.Equal(t, allocs[3], diff.lost[0].Alloc)
// We should place 1
require.Equal(t, 2, len(place))
// We should place 2
require.Len(t, diff.place, 2)
// Ensure that the allocations which are replacements of terminal allocs are
// annotated.
@@ -392,8 +382,7 @@ func TestDiffSystemAllocs(t *testing.T) {
for _, alloc := range m {
for _, tuple := range diff.place {
if alloc.NodeID == tuple.Alloc.NodeID {
require.True(t, reflect.DeepEqual(alloc, tuple.Alloc),
"expected: %#v, actual: %#v", alloc, tuple.Alloc)
require.Equal(t, alloc, tuple.Alloc)
}
}
}
@@ -415,15 +404,19 @@ func TestReadyNodesInDCs(t *testing.T) {
require.NoError(t, state.UpsertNode(structs.MsgTypeTestSetup, 1002, node3))
require.NoError(t, state.UpsertNode(structs.MsgTypeTestSetup, 1003, node4))
nodes, dc, err := readyNodesInDCs(state, []string{"dc1", "dc2"})
nodes, notReady, dc, err := readyNodesInDCs(state, []string{"dc1", "dc2"})
require.NoError(t, err)
require.Equal(t, 2, len(nodes))
require.True(t, nodes[0].ID != node3.ID && nodes[1].ID != node3.ID)
require.NotEqual(t, node3.ID, nodes[0].ID)
require.NotEqual(t, node3.ID, nodes[1].ID)
require.Contains(t, dc, "dc1")
require.Equal(t, 1, dc["dc1"])
require.Contains(t, dc, "dc2")
require.Equal(t, 1, dc["dc2"])
require.Contains(t, notReady, node3.ID)
require.Contains(t, notReady, node4.ID)
}
func TestRetryMax(t *testing.T) {