From fbaf4c8b69d8cb3edacb302c441fb38b3b795666 Mon Sep 17 00:00:00 2001
From: Tim Gross
Date: Wed, 7 Jun 2023 10:39:03 -0400
Subject: [PATCH] node pools: implement support in scheduler (#17443)

Implement scheduler support for node pools:

* When a scheduler is invoked, we get a set of the ready nodes in the DCs
  that are allowed for that job. Extend the filter to include the node pool.
* Ensure that changes to a job's node pool are picked up as destructive
  allocation updates.
* Add `NodesInPool` as a metric to all reporting done by the scheduler.
* Add the node-in-pool filter to the `Node.Register` RPC so that we don't
  generate spurious evals for nodes in the wrong pool.
---
 api/allocations.go                       |  1 +
 command/monitor_test.go                  |  1 +
 nomad/node_endpoint.go                   | 11 ++-
 nomad/structs/structs.go                 |  9 +++
 scheduler/generic_sched.go               |  3 +-
 scheduler/generic_sched_test.go          |  5 ++
 scheduler/scheduler.go                   |  3 +
 scheduler/scheduler_sysbatch_test.go     |  3 +
 scheduler/scheduler_system.go            |  5 +-
 scheduler/scheduler_system_test.go       |  5 ++
 scheduler/util.go                        | 23 +++++-
 scheduler/util_test.go                   | 98 ++++++++++++++++++++++--
 website/content/api-docs/allocations.mdx |  1 +
 website/content/api-docs/evaluations.mdx |  1 +
 website/content/api-docs/jobs.mdx        |  1 +
 website/content/api-docs/nodes.mdx       |  1 +
 16 files changed, 153 insertions(+), 18 deletions(-)

diff --git a/api/allocations.go b/api/allocations.go
index 87f7d9b11..1cf66aba0 100644
--- a/api/allocations.go
+++ b/api/allocations.go
@@ -283,6 +283,7 @@ type Allocation struct {
 type AllocationMetric struct {
 	NodesEvaluated     int
 	NodesFiltered      int
+	NodesInPool        int
 	NodesAvailable     map[string]int
 	ClassFiltered      map[string]int
 	ConstraintFiltered map[string]int
diff --git a/command/monitor_test.go b/command/monitor_test.go
index a9b7bd34f..4d416110e 100644
--- a/command/monitor_test.go
+++ b/command/monitor_test.go
@@ -235,6 +235,7 @@ func TestMonitor_formatAllocMetric(t *testing.T) {
 			Name: "display all possible scores",
 			Metrics: &api.AllocationMetric{
 				NodesEvaluated: 3,
+				NodesInPool:    3,
 				ScoreMetaData: []*api.NodeScoreMeta{
 					{
 						NodeID: "node-1",
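The `NodesInPool` field added above is surfaced through the Go API client. As a rough sketch of where a consumer would read it — assuming a reachable Nomad agent, and with a hypothetical allocation ID:

package main

import (
	"fmt"
	"log"

	"github.com/hashicorp/nomad/api"
)

func main() {
	client, err := api.NewClient(api.DefaultConfig())
	if err != nil {
		log.Fatal(err)
	}

	allocID := "8ba85cd9-26dc-40fd-8512-f9c7219cbf21" // hypothetical
	alloc, _, err := client.Allocations().Info(allocID, nil)
	if err != nil {
		log.Fatal(err)
	}

	// Per the struct comment in this patch, NodesInPool is the number of
	// nodes in the node pool used by the job, counted before scoring.
	if alloc.Metrics != nil {
		fmt.Println("nodes in pool:", alloc.Metrics.NodesInPool)
	}
}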
diff --git a/nomad/node_endpoint.go b/nomad/node_endpoint.go
index 4c0de6503..29cd571f8 100644
--- a/nomad/node_endpoint.go
+++ b/nomad/node_endpoint.go
@@ -1643,12 +1643,11 @@ func (n *Node) createNodeEvals(node *structs.Node, nodeIndex uint64) ([]string,
 	var sysJobs []*structs.Job
 	for jobI := sysJobsIter.Next(); jobI != nil; jobI = sysJobsIter.Next() {
 		job := jobI.(*structs.Job)
-		// Avoid creating evals for jobs that don't run in this
-		// datacenter. We could perform an entire feasibility check
-		// here, but datacenter is a good optimization to start with as
-		// datacenter cardinality tends to be low so the check
-		// shouldn't add much work.
-		if node.IsInAnyDC(job.Datacenters) {
+		// Avoid creating evals for jobs that don't run in this datacenter or
+		// node pool. We could perform an entire feasibility check here, but
+		// datacenter/pool is a good optimization to start with as their
+		// cardinality tends to be low so the check shouldn't add much work.
+		if node.IsInPool(job.NodePool) && node.IsInAnyDC(job.Datacenters) {
 			sysJobs = append(sysJobs, job)
 		}
 	}
diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go
index 33812e35c..f2bb1c648 100644
--- a/nomad/structs/structs.go
+++ b/nomad/structs/structs.go
@@ -2357,6 +2357,12 @@ func (n *Node) IsInAnyDC(datacenters []string) bool {
 	return false
 }
 
+// IsInPool returns true if the node is in the pool argument or if the pool
+// argument is the special "all" pool
+func (n *Node) IsInPool(pool string) bool {
+	return pool == NodePoolAll || n.NodePool == pool
+}
+
 // Stub returns a summarized version of the node
 func (n *Node) Stub(fields *NodeStubFields) *NodeListStub {
@@ -11141,6 +11147,9 @@ type AllocMetric struct {
 	// NodesFiltered is the number of nodes filtered due to a constraint
 	NodesFiltered int
 
+	// NodesInPool is the number of nodes in the node pool used by the job.
+	NodesInPool int
+
 	// NodesAvailable is the number of nodes available for evaluation per DC.
 	NodesAvailable map[string]int
diff --git a/scheduler/generic_sched.go b/scheduler/generic_sched.go
index 0bfc844c9..3d04d9c5b 100644
--- a/scheduler/generic_sched.go
+++ b/scheduler/generic_sched.go
@@ -509,7 +509,7 @@ func (s *GenericScheduler) downgradedJobForPlacement(p placementResult) (string,
 // destructive updates to place and the set of new placements to place.
 func (s *GenericScheduler) computePlacements(destructive, place []placementResult) error {
 	// Get the base nodes
-	nodes, _, byDC, err := readyNodesInDCs(s.state, s.job.Datacenters)
+	nodes, _, byDC, err := readyNodesInDCsAndPool(s.state, s.job.Datacenters, s.job.NodePool)
 	if err != nil {
 		return err
 	}
@@ -591,6 +591,7 @@ func (s *GenericScheduler) computePlacements(destructive, place []placementResul
 
 			// Store the available nodes by datacenter
 			s.ctx.Metrics().NodesAvailable = byDC
+			s.ctx.Metrics().NodesInPool = len(nodes)
 
 			// Compute top K scoring node metadata
 			s.ctx.Metrics().PopulateScoreMetaData()
diff --git a/scheduler/generic_sched_test.go b/scheduler/generic_sched_test.go
index 7c125e7fb..20bbbb462 100644
--- a/scheduler/generic_sched_test.go
+++ b/scheduler/generic_sched_test.go
@@ -1129,6 +1129,9 @@ func TestServiceSched_JobRegister_AllocFail(t *testing.T) {
 	must.False(t, ok, must.Sprintf(
 		"expected NodesAvailable metric to be unpopulated when there are no nodes"))
 
+	must.Zero(t, metrics.NodesInPool, must.Sprint(
+		"expected NodesInPool metric to be unpopulated when there are no nodes"))
+
 	// Check queued allocations
 	queued := outEval.QueuedAllocations["web"]
 	if queued != 10 {
@@ -1234,6 +1237,8 @@ func TestServiceSched_JobRegister_CreateBlockedEval(t *testing.T) {
 		t.Fatalf("bad: %#v", metrics)
 	}
 
+	must.Eq(t, 2, metrics.NodesInPool, must.Sprint("expected NodesInPool metric to be set"))
+
 	h.AssertEvalStatus(t, structs.EvalStatusComplete)
 }
diff --git a/scheduler/scheduler.go b/scheduler/scheduler.go
index 8c06a0931..6a8d2bda9 100644
--- a/scheduler/scheduler.go
+++ b/scheduler/scheduler.go
@@ -75,6 +75,9 @@ type State interface {
 	// The type of each result is *structs.Node
 	Nodes(ws memdb.WatchSet) (memdb.ResultIterator, error)
 
+	// NodesByNodePool returns an iterator over all nodes in the node pool
+	NodesByNodePool(ws memdb.WatchSet, poolName string) (memdb.ResultIterator, error)
+
 	// AllocsByJob returns the allocations by JobID
 	AllocsByJob(ws memdb.WatchSet, namespace, jobID string, all bool) ([]*structs.Allocation, error)
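The `IsInPool` helper above carries most of the matching behavior, so its semantics are worth spelling out: the built-in "all" pool matches every node; anything else requires an exact pool-name match. A standalone sketch of that truth table (`nodePoolAll` mirrors `structs.NodePoolAll`; this is not code from the patch):

package main

import "fmt"

// nodePoolAll mirrors structs.NodePoolAll, the built-in pool that
// matches every node.
const nodePoolAll = "all"

type node struct{ nodePool string }

// isInPool mirrors the patch's Node.IsInPool: the "all" pool matches
// any node; otherwise the pool names must be equal.
func (n node) isInPool(pool string) bool {
	return pool == nodePoolAll || n.nodePool == pool
}

func main() {
	n := node{nodePool: "default"}
	fmt.Println(n.isInPool("all"))     // true: "all" matches every node
	fmt.Println(n.isInPool("default")) // true: exact match
	fmt.Println(n.isInPool("gpu"))     // false: different pool
}

Keeping the check this cheap is what makes it safe to run inside the `Node.Register` eval loop above, before any full feasibility check.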
diff --git a/scheduler/scheduler_sysbatch_test.go b/scheduler/scheduler_sysbatch_test.go
index 212e6efa6..5b750d14e 100644
--- a/scheduler/scheduler_sysbatch_test.go
+++ b/scheduler/scheduler_sysbatch_test.go
@@ -83,6 +83,9 @@ func TestSysBatch_JobRegister(t *testing.T) {
 	require.True(t, ok)
 	require.Equal(t, 10, count, "bad metrics %#v:", out[0].Metrics)
 
+	must.Eq(t, 10, out[0].Metrics.NodesInPool,
+		must.Sprint("expected NodesInPool metric to be set"))
+
 	// Ensure no allocations are queued
 	queued := h.Evals[0].QueuedAllocations["my-sysbatch"]
 	require.Equal(t, 0, queued, "unexpected queued allocations")
diff --git a/scheduler/scheduler_system.go b/scheduler/scheduler_system.go
index 3fd367e29..1471701eb 100644
--- a/scheduler/scheduler_system.go
+++ b/scheduler/scheduler_system.go
@@ -137,7 +137,8 @@ func (s *SystemScheduler) process() (bool, error) {
 
 	// Get the ready nodes in the required datacenters
 	if !s.job.Stopped() {
-		s.nodes, s.notReadyNodes, s.nodesByDC, err = readyNodesInDCs(s.state, s.job.Datacenters)
+		s.nodes, s.notReadyNodes, s.nodesByDC, err = readyNodesInDCsAndPool(
+			s.state, s.job.Datacenters, s.job.NodePool)
 		if err != nil {
 			return false, fmt.Errorf("failed to get ready nodes: %v", err)
 		}
@@ -396,6 +397,7 @@ func (s *SystemScheduler) computePlacements(place []allocTuple) error {
 
 			// Store the available nodes by datacenter
 			s.ctx.Metrics().NodesAvailable = s.nodesByDC
+			s.ctx.Metrics().NodesInPool = len(s.nodes)
 
 			// Compute top K scoring node metadata
 			s.ctx.Metrics().PopulateScoreMetaData()
@@ -417,6 +419,7 @@ func (s *SystemScheduler) computePlacements(place []allocTuple) error {
 
 		// Store the available nodes by datacenter
 		s.ctx.Metrics().NodesAvailable = s.nodesByDC
+		s.ctx.Metrics().NodesInPool = len(s.nodes)
 
 		// Compute top K scoring node metadata
 		s.ctx.Metrics().PopulateScoreMetaData()
diff --git a/scheduler/scheduler_system_test.go b/scheduler/scheduler_system_test.go
index 923fe13ac..57b6e984d 100644
--- a/scheduler/scheduler_system_test.go
+++ b/scheduler/scheduler_system_test.go
@@ -84,6 +84,9 @@ func TestSystemSched_JobRegister(t *testing.T) {
 	require.True(t, ok)
 	require.Equal(t, 10, count, "bad metrics %#v:", out[0].Metrics)
 
+	must.Eq(t, 10, out[0].Metrics.NodesInPool,
+		must.Sprint("expected NodesInPool metric to be set"))
+
 	// Ensure no allocations are queued
 	queued := h.Evals[0].QueuedAllocations["web"]
 	require.Equal(t, 0, queued, "unexpected queued allocations")
@@ -391,6 +394,8 @@ func TestSystemSched_JobRegister_Annotate(t *testing.T) {
 	if count, ok := out[0].Metrics.NodesAvailable["dc1"]; !ok || count != 10 {
 		t.Fatalf("bad: %#v", out[0].Metrics)
 	}
+	must.Eq(t, 10, out[0].Metrics.NodesInPool,
+		must.Sprint("expected NodesInPool metric to be set"))
 
 	h.AssertEvalStatus(t, structs.EvalStatusComplete)
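Both schedulers now record the metric the same way: `NodesInPool` is `len(nodes)`, the flat length of the pool-filtered ready-node slice, while `NodesAvailable` stays the per-datacenter breakdown of those same nodes — so the former should equal the sum of the latter's values. A small illustration with hypothetical counts:

package main

import "fmt"

func main() {
	// Hypothetical snapshot after readyNodesInDCsAndPool has filtered by
	// both datacenter and pool: NodesAvailable keeps the per-DC view...
	nodesAvailable := map[string]int{"dc1": 2, "dc2": 2}

	// ...while NodesInPool is the flat count of the same ready nodes
	// across all allowed datacenters.
	nodesInPool := 0
	for _, c := range nodesAvailable {
		nodesInPool += c
	}
	fmt.Println(nodesInPool) // 4
}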
diff --git a/scheduler/util.go b/scheduler/util.go
index 3ce6cdb62..ebf3bbce3 100644
--- a/scheduler/util.go
+++ b/scheduler/util.go
@@ -44,9 +44,9 @@ func (d *diffResult) Append(other *diffResult) {
 	d.reconnecting = append(d.reconnecting, other.reconnecting...)
 }
 
-// readyNodesInDCs returns all the ready nodes in the given datacenters and a
-// mapping of each data center to the count of ready nodes.
-func readyNodesInDCs(state State, dcs []string) ([]*structs.Node, map[string]struct{}, map[string]int, error) {
+// readyNodesInDCsAndPool returns all the ready nodes in the given datacenters
+// and pool, and a mapping of each data center to the count of ready nodes.
+func readyNodesInDCsAndPool(state State, dcs []string, pool string) ([]*structs.Node, map[string]struct{}, map[string]int, error) {
 	// Index the DCs
 	dcMap := make(map[string]int)
 
@@ -54,7 +54,15 @@ func readyNodesInDCs(state State, dcs []string) ([]*structs.Node, map[string]str
 	ws := memdb.NewWatchSet()
 	var out []*structs.Node
 	notReady := map[string]struct{}{}
-	iter, err := state.Nodes(ws)
+
+	var iter memdb.ResultIterator
+	var err error
+
+	if pool == structs.NodePoolAll || pool == "" {
+		iter, err = state.Nodes(ws)
+	} else {
+		iter, err = state.NodesByNodePool(ws, pool)
+	}
 	if err != nil {
 		return nil, nil, nil, err
 	}
@@ -631,6 +639,10 @@ func inplaceUpdate(ctx Context, eval *structs.Evaluation, job *structs.Job,
 		if !node.IsInAnyDC(job.Datacenters) {
 			continue
 		}
+		// The alloc is on a node that's now in an ineligible node pool
+		if !node.IsInPool(job.NodePool) {
+			continue
+		}
 
 		// Set the existing node as the base set
 		stack.SetNodes([]*structs.Node{node})
@@ -875,6 +887,9 @@ func genericAllocUpdateFn(ctx Context, stack Stack, evalID string) allocUpdateTy
 	if !node.IsInAnyDC(newJob.Datacenters) {
 		return false, true, nil
 	}
+	if !node.IsInPool(newJob.NodePool) {
+		return false, true, nil
+	}
 
 	// Set the existing node as the base set
 	stack.SetNodes([]*structs.Node{node})
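The two hunks in `inplaceUpdate` and `genericAllocUpdateFn` are what make a node-pool change destructive: an allocation whose node no longer belongs to the job's pool cannot be updated in place. A standalone sketch of that decision, under the assumption that the update function's tuple is `(ignore, destructive, updatedAlloc)` as elsewhere in this scheduler package:

package main

import "fmt"

// needsDestructiveUpdate mirrors the new pool check in
// genericAllocUpdateFn: when the allocation's node is no longer in the
// job's pool, the scheduler returns (false, true, nil) -- not ignored,
// destructively replaced -- rather than attempting an in-place update.
func needsDestructiveUpdate(nodePool, jobPool string) bool {
	isInPool := jobPool == "all" || nodePool == jobPool
	return !isInPool
}

func main() {
	fmt.Println(needsDestructiveUpdate("default", "default")) // false: in-place still possible
	fmt.Println(needsDestructiveUpdate("other", "default"))   // true: stop and replace
}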
diff --git a/scheduler/util_test.go b/scheduler/util_test.go
index 37fbc140d..947e55478 100644
--- a/scheduler/util_test.go
+++ b/scheduler/util_test.go
@@ -35,7 +35,7 @@ func newNode(name string) *structs.Node {
 	return n
 }
 
-func TestReadyNodesInDCs(t *testing.T) {
+func TestReadyNodesInDCsAndPool(t *testing.T) {
 	ci.Parallel(t)
 
 	state := state.TestStateStore(t)
@@ -48,39 +48,77 @@
 	node4 := mock.DrainNode()
 	node5 := mock.Node()
 	node5.Datacenter = "not-this-dc"
+	node6 := mock.Node()
+	node6.Datacenter = "dc1"
+	node6.NodePool = "other"
+	node7 := mock.Node()
+	node7.Datacenter = "dc2"
+	node7.NodePool = "other"
+	node8 := mock.Node()
+	node8.Datacenter = "dc1"
+	node8.NodePool = "other"
+	node8.Status = structs.NodeStatusDown
+	node9 := mock.DrainNode()
+	node9.Datacenter = "dc2"
+	node9.NodePool = "other"
 
 	must.NoError(t, state.UpsertNode(structs.MsgTypeTestSetup, 1000, node1)) // dc1 ready
 	must.NoError(t, state.UpsertNode(structs.MsgTypeTestSetup, 1001, node2)) // dc2 ready
 	must.NoError(t, state.UpsertNode(structs.MsgTypeTestSetup, 1002, node3)) // dc2 not ready
 	must.NoError(t, state.UpsertNode(structs.MsgTypeTestSetup, 1003, node4)) // dc2 not ready
 	must.NoError(t, state.UpsertNode(structs.MsgTypeTestSetup, 1004, node5)) // ready never match
+	must.NoError(t, state.UpsertNode(structs.MsgTypeTestSetup, 1005, node6)) // dc1 other pool
+	must.NoError(t, state.UpsertNode(structs.MsgTypeTestSetup, 1006, node7)) // dc2 other pool
+	must.NoError(t, state.UpsertNode(structs.MsgTypeTestSetup, 1007, node8)) // dc1 other not ready
+	must.NoError(t, state.UpsertNode(structs.MsgTypeTestSetup, 1008, node9)) // dc2 other not ready
 
 	testCases := []struct {
 		name           string
 		datacenters    []string
+		pool           string
 		expectReady    []*structs.Node
 		expectNotReady map[string]struct{}
 		expectIndex    map[string]int
 	}{
 		{
-			name:        "no wildcards",
+			name:        "no wildcards in all pool",
+			datacenters: []string{"dc1", "dc2"},
+			pool:        structs.NodePoolAll,
+			expectReady: []*structs.Node{node1, node2, node6, node7},
+			expectNotReady: map[string]struct{}{
+				node3.ID: {}, node4.ID: {}, node8.ID: {}, node9.ID: {}},
+			expectIndex: map[string]int{"dc1": 2, "dc2": 2},
+		},
+		{
+			name:        "with wildcard in all pool",
+			datacenters: []string{"dc*"},
+			pool:        structs.NodePoolAll,
+			expectReady: []*structs.Node{node1, node2, node6, node7},
+			expectNotReady: map[string]struct{}{
+				node3.ID: {}, node4.ID: {}, node8.ID: {}, node9.ID: {}},
+			expectIndex: map[string]int{"dc1": 2, "dc2": 2},
+		},
+		{
+			name:        "no wildcards in default pool",
 			datacenters: []string{"dc1", "dc2"},
+			pool:        structs.NodePoolDefault,
 			expectReady: []*structs.Node{node1, node2},
-			expectNotReady: map[string]struct{}{node3.ID: struct{}{}, node4.ID: struct{}{}},
+			expectNotReady: map[string]struct{}{node3.ID: {}, node4.ID: {}},
 			expectIndex: map[string]int{"dc1": 1, "dc2": 1},
 		},
 		{
-			name:        "with wildcard",
+			name:        "with wildcard in default pool",
 			datacenters: []string{"dc*"},
+			pool:        structs.NodePoolDefault,
 			expectReady: []*structs.Node{node1, node2},
-			expectNotReady: map[string]struct{}{node3.ID: struct{}{}, node4.ID: struct{}{}},
+			expectNotReady: map[string]struct{}{node3.ID: {}, node4.ID: {}},
 			expectIndex: map[string]int{"dc1": 1, "dc2": 1},
 		},
 	}
 
 	for _, tc := range testCases {
 		t.Run(tc.name, func(t *testing.T) {
-			ready, notReady, dcIndex, err := readyNodesInDCs(state, tc.datacenters)
+			ready, notReady, dcIndex, err := readyNodesInDCsAndPool(state, tc.datacenters, tc.pool)
 			must.NoError(t, err)
 			must.SliceContainsAll(t, tc.expectReady, ready, must.Sprint("expected ready to match"))
 			must.Eq(t, tc.expectNotReady, notReady, must.Sprint("expected not-ready to match"))
@@ -1009,6 +1047,54 @@ func TestInplaceUpdate_WildcardDatacenters(t *testing.T) {
 		must.Sprintf("inplaceUpdate should have an inplace update"))
 }
 
+func TestInplaceUpdate_NodePools(t *testing.T) {
+	ci.Parallel(t)
+
+	store, ctx := testContext(t)
+	eval := mock.Eval()
+	job := mock.Job()
+	job.Datacenters = []string{"*"}
+
+	node1 := mock.Node()
+	must.NoError(t, store.UpsertNode(structs.MsgTypeTestSetup, 1000, node1))
+
+	node2 := mock.Node()
+	node2.NodePool = "other"
+	must.NoError(t, store.UpsertNode(structs.MsgTypeTestSetup, 1001, node2))
+
+	// Register an alloc
+	alloc1 := mock.AllocForNode(node1)
+	alloc1.Job = job
+	alloc1.JobID = job.ID
+	must.NoError(t, store.UpsertJobSummary(1002, mock.JobSummary(alloc1.JobID)))
+
+	alloc2 := mock.AllocForNode(node2)
+	alloc2.Job = job
+	alloc2.JobID = job.ID
+	must.NoError(t, store.UpsertJobSummary(1003, mock.JobSummary(alloc2.JobID)))
+
+	t.Logf("alloc1=%s alloc2=%s", alloc1.ID, alloc2.ID)
+
+	must.NoError(t, store.UpsertAllocs(structs.MsgTypeTestSetup, 1004,
+		[]*structs.Allocation{alloc1, alloc2}))
+
+	updates := []allocTuple{
+		{Alloc: alloc1, TaskGroup: job.TaskGroups[0]},
+		{Alloc: alloc2, TaskGroup: job.TaskGroups[0]},
+	}
+	stack := NewGenericStack(false, ctx)
+	destructive, inplace := inplaceUpdate(ctx, eval, job, stack, updates)
+
+	must.Len(t, 1, inplace, must.Sprint("should have an inplace update"))
+	must.Eq(t, alloc1.ID, inplace[0].Alloc.ID)
+	must.Len(t, 1, ctx.plan.NodeAllocation[node1.ID],
+		must.Sprint("NodeAllocation should have an inplace update for node1"))
+
+	// note that NodeUpdate with the new alloc won't be populated here yet
+	must.Len(t, 1, destructive, must.Sprint("should have a destructive update"))
+	must.Eq(t, alloc2.ID, destructive[0].Alloc.ID)
+}
+
 func TestUtil_connectUpdated(t *testing.T) {
 	ci.Parallel(t)
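The documentation samples below all gain the same field. For consumers decoding the eval or alloc JSON by hand rather than through the api package, it unmarshals as a plain integer — a minimal sketch using a trimmed, hypothetical mirror of `api.AllocationMetric` (only the fields shown in the doc samples):

package main

import (
	"encoding/json"
	"fmt"
)

// allocMetric is a trimmed, hypothetical stand-in for api.AllocationMetric.
type allocMetric struct {
	NodesEvaluated int
	NodesFiltered  int
	NodesInPool    int
	NodesAvailable map[string]int
}

func main() {
	// Fragment in the shape of the updated API doc samples.
	raw := `{
		"NodesEvaluated": 2,
		"NodesFiltered": 0,
		"NodesInPool": 2,
		"NodesAvailable": {"dc1": 1, "dc2": 1}
	}`
	var m allocMetric
	if err := json.Unmarshal([]byte(raw), &m); err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", m)
}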
"ConstraintFiltered": null, "NodesExhausted": 0, diff --git a/website/content/api-docs/evaluations.mdx b/website/content/api-docs/evaluations.mdx index db1bce0e4..2984cdaf1 100644 --- a/website/content/api-docs/evaluations.mdx +++ b/website/content/api-docs/evaluations.mdx @@ -161,6 +161,7 @@ $ curl \ "NodesEvaluated": 0, "NodesExhausted": 0, "NodesFiltered": 0, + "NodesInPool": 0, "QuotaExhausted": null, "ResourcesExhausted": null, "ScoreMetaData": null, diff --git a/website/content/api-docs/jobs.mdx b/website/content/api-docs/jobs.mdx index 83213a741..f8ea1f134 100644 --- a/website/content/api-docs/jobs.mdx +++ b/website/content/api-docs/jobs.mdx @@ -2156,6 +2156,7 @@ $ curl \ "NodesAvailable": { "dc1": 1 }, + "NodesInPool": 1, "ClassFiltered": null, "ConstraintFiltered": null, "NodesExhausted": 1, diff --git a/website/content/api-docs/nodes.mdx b/website/content/api-docs/nodes.mdx index bb73e9c76..5688040c0 100644 --- a/website/content/api-docs/nodes.mdx +++ b/website/content/api-docs/nodes.mdx @@ -722,6 +722,7 @@ $ curl \ "NodesEvaluated": 2, "NodesExhausted": 0, "NodesFiltered": 0, + "NodesInPool": 2, "QuotaExhausted": null, "Scores": { "46f1c6c4-a0e5-21f6-fd5c-d76c3d84e806.binpack": 2.6950883117541586,