node pools: implement support in scheduler (#17443)

Implement scheduler support for node pools:

* When a scheduler is invoked, we get the set of ready nodes in the DCs that
  are allowed for that job. Extend that filter to include the node pool (see
  the sketch after this list).
* Ensure that changes to a job's node pool are picked up as destructive
  allocation updates.
* Add `NodesInPool` as a metric to all reporting done by the scheduler.
* Add the node-in-pool filter to the `Node.Register` RPC so that we don't
  generate spurious evals for nodes in the wrong pool.
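
To make the first bullet concrete, the sketch below condenses the pool-aware node selection from the `scheduler/util.go` hunk further down. It is illustrative only: the function name is made up, and error handling, scheduling eligibility, the not-ready set, and the per-DC counts from the real `readyNodesInDCsAndPool` are elided.

```go
package scheduler // sketch; mirrors names that appear in the diff below

import (
	memdb "github.com/hashicorp/go-memdb"

	"github.com/hashicorp/nomad/nomad/structs"
)

// readyNodesSketch picks a node iterator based on the job's node pool and
// then keeps only ready nodes in the job's datacenters.
func readyNodesSketch(state State, dcs []string, pool string) ([]*structs.Node, error) {
	ws := memdb.NewWatchSet()

	var iter memdb.ResultIterator
	var err error
	if pool == structs.NodePoolAll || pool == "" {
		// The built-in "all" pool (or an unset pool) still scans every node.
		iter, err = state.Nodes(ws)
	} else {
		// Any other pool uses the node-pool index added to the State interface.
		iter, err = state.NodesByNodePool(ws, pool)
	}
	if err != nil {
		return nil, err
	}

	var out []*structs.Node
	for raw := iter.Next(); raw != nil; raw = iter.Next() {
		node := raw.(*structs.Node)
		// Nodes must still be ready and in one of the job's datacenters.
		if node.Status != structs.NodeStatusReady || !node.IsInAnyDC(dcs) {
			continue
		}
		out = append(out, node)
	}
	return out, nil
}
```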
Tim Gross 2023-06-07 10:39:03 -04:00 committed by GitHub
parent 5878113c41
commit fbaf4c8b69
16 changed files with 153 additions and 18 deletions

View File

@@ -283,6 +283,7 @@ type Allocation struct {
 type AllocationMetric struct {
     NodesEvaluated     int
     NodesFiltered      int
+    NodesInPool        int
     NodesAvailable     map[string]int
     ClassFiltered      map[string]int
     ConstraintFiltered map[string]int

View File

@@ -235,6 +235,7 @@ func TestMonitor_formatAllocMetric(t *testing.T) {
             Name: "display all possible scores",
             Metrics: &api.AllocationMetric{
                 NodesEvaluated: 3,
+                NodesInPool:    3,
                 ScoreMetaData: []*api.NodeScoreMeta{
                     {
                         NodeID: "node-1",

View File

@@ -1643,12 +1643,11 @@ func (n *Node) createNodeEvals(node *structs.Node, nodeIndex uint64) ([]string,
     var sysJobs []*structs.Job
     for jobI := sysJobsIter.Next(); jobI != nil; jobI = sysJobsIter.Next() {
         job := jobI.(*structs.Job)
-        // Avoid creating evals for jobs that don't run in this
-        // datacenter. We could perform an entire feasibility check
-        // here, but datacenter is a good optimization to start with as
-        // datacenter cardinality tends to be low so the check
-        // shouldn't add much work.
-        if node.IsInAnyDC(job.Datacenters) {
+        // Avoid creating evals for jobs that don't run in this datacenter or
+        // node pool. We could perform an entire feasibility check here, but
+        // datacenter/pool is a good optimization to start with as their
+        // cardinality tends to be low so the check shouldn't add much work.
+        if node.IsInPool(job.NodePool) && node.IsInAnyDC(job.Datacenters) {
             sysJobs = append(sysJobs, job)
         }
     }

View File

@@ -2357,6 +2357,12 @@ func (n *Node) IsInAnyDC(datacenters []string) bool {
     return false
 }
+
+// IsInPool returns true if the node is in the pool argument or if the pool
+// argument is the special "all" pool
+func (n *Node) IsInPool(pool string) bool {
+    return pool == NodePoolAll || n.NodePool == pool
+}
 
 // Stub returns a summarized version of the node
 func (n *Node) Stub(fields *NodeStubFields) *NodeListStub {

@@ -11141,6 +11147,9 @@ type AllocMetric struct {
     // NodesFiltered is the number of nodes filtered due to a constraint
     NodesFiltered int
+
+    // NodesInPool is the number of nodes in the node pool used by the job.
+    NodesInPool int
 
     // NodesAvailable is the number of nodes available for evaluation per DC.
     NodesAvailable map[string]int
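
As a quick illustration of the `IsInPool` rule added above, here is a tiny self-contained example; the types are stand-ins for the real `nomad/structs` package, and the `"all"` pool name is assumed to be the value of the built-in `NodePoolAll` constant.

```go
package main

import "fmt"

// Stand-in for structs.NodePoolAll, assumed to name the built-in "all" pool.
const NodePoolAll = "all"

// Stand-in for structs.Node; only the field relevant to the rule is kept.
type Node struct{ NodePool string }

// Same matching rule as the IsInPool method added in this commit.
func (n *Node) IsInPool(pool string) bool {
	return pool == NodePoolAll || n.NodePool == pool
}

func main() {
	n := &Node{NodePool: "gpu"} // hypothetical node registered into a "gpu" pool
	fmt.Println(n.IsInPool("gpu"))       // true: exact pool match
	fmt.Println(n.IsInPool(NodePoolAll)) // true: the "all" pool matches every node
	fmt.Println(n.IsInPool("default"))   // false: different pool
}
```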

View File

@@ -509,7 +509,7 @@ func (s *GenericScheduler) downgradedJobForPlacement(p placementResult) (string,
 // destructive updates to place and the set of new placements to place.
 func (s *GenericScheduler) computePlacements(destructive, place []placementResult) error {
     // Get the base nodes
-    nodes, _, byDC, err := readyNodesInDCs(s.state, s.job.Datacenters)
+    nodes, _, byDC, err := readyNodesInDCsAndPool(s.state, s.job.Datacenters, s.job.NodePool)
     if err != nil {
         return err
     }

@@ -591,6 +591,7 @@ func (s *GenericScheduler) computePlacements(destructive, place []placementResult) error {
     // Store the available nodes by datacenter
     s.ctx.Metrics().NodesAvailable = byDC
+    s.ctx.Metrics().NodesInPool = len(nodes)
 
     // Compute top K scoring node metadata
     s.ctx.Metrics().PopulateScoreMetaData()

View File

@@ -1129,6 +1129,9 @@ func TestServiceSched_JobRegister_AllocFail(t *testing.T) {
     must.False(t, ok, must.Sprintf(
         "expected NodesAvailable metric to be unpopulated when there are no nodes"))
+    must.Zero(t, metrics.NodesInPool, must.Sprint(
+        "expected NodesInPool metric to be unpopulated when there are no nodes"))
+
     // Check queued allocations
     queued := outEval.QueuedAllocations["web"]
     if queued != 10 {

@@ -1234,6 +1237,8 @@ func TestServiceSched_JobRegister_CreateBlockedEval(t *testing.T) {
         t.Fatalf("bad: %#v", metrics)
     }
+    must.Eq(t, 2, metrics.NodesInPool, must.Sprint("expected NodesInPool metric to be set"))
+
     h.AssertEvalStatus(t, structs.EvalStatusComplete)
 }

View File

@@ -75,6 +75,9 @@ type State interface {
     // The type of each result is *structs.Node
     Nodes(ws memdb.WatchSet) (memdb.ResultIterator, error)
+
+    // NodesByNodePool returns an iterator over all nodes in the node pool
+    NodesByNodePool(ws memdb.WatchSet, poolName string) (memdb.ResultIterator, error)
 
     // AllocsByJob returns the allocations by JobID
     AllocsByJob(ws memdb.WatchSet, namespace, jobID string, all bool) ([]*structs.Allocation, error)

View File

@@ -83,6 +83,9 @@ func TestSysBatch_JobRegister(t *testing.T) {
     require.True(t, ok)
     require.Equal(t, 10, count, "bad metrics %#v:", out[0].Metrics)
+    must.Eq(t, 10, out[0].Metrics.NodesInPool,
+        must.Sprint("expected NodesInPool metric to be set"))
+
     // Ensure no allocations are queued
     queued := h.Evals[0].QueuedAllocations["my-sysbatch"]
     require.Equal(t, 0, queued, "unexpected queued allocations")

View File

@@ -137,7 +137,8 @@ func (s *SystemScheduler) process() (bool, error) {
     // Get the ready nodes in the required datacenters
     if !s.job.Stopped() {
-        s.nodes, s.notReadyNodes, s.nodesByDC, err = readyNodesInDCs(s.state, s.job.Datacenters)
+        s.nodes, s.notReadyNodes, s.nodesByDC, err = readyNodesInDCsAndPool(
+            s.state, s.job.Datacenters, s.job.NodePool)
         if err != nil {
             return false, fmt.Errorf("failed to get ready nodes: %v", err)
         }

@@ -396,6 +397,7 @@ func (s *SystemScheduler) computePlacements(place []allocTuple) error {
     // Store the available nodes by datacenter
     s.ctx.Metrics().NodesAvailable = s.nodesByDC
+    s.ctx.Metrics().NodesInPool = len(s.nodes)
 
     // Compute top K scoring node metadata
     s.ctx.Metrics().PopulateScoreMetaData()

@@ -417,6 +419,7 @@ func (s *SystemScheduler) computePlacements(place []allocTuple) error {
     // Store the available nodes by datacenter
     s.ctx.Metrics().NodesAvailable = s.nodesByDC
+    s.ctx.Metrics().NodesInPool = len(s.nodes)
 
     // Compute top K scoring node metadata
     s.ctx.Metrics().PopulateScoreMetaData()

View File

@@ -84,6 +84,9 @@ func TestSystemSched_JobRegister(t *testing.T) {
     require.True(t, ok)
     require.Equal(t, 10, count, "bad metrics %#v:", out[0].Metrics)
+    must.Eq(t, 10, out[0].Metrics.NodesInPool,
+        must.Sprint("expected NodesInPool metric to be set"))
+
     // Ensure no allocations are queued
     queued := h.Evals[0].QueuedAllocations["web"]
     require.Equal(t, 0, queued, "unexpected queued allocations")

@@ -391,6 +394,8 @@ func TestSystemSched_JobRegister_Annotate(t *testing.T) {
     if count, ok := out[0].Metrics.NodesAvailable["dc1"]; !ok || count != 10 {
         t.Fatalf("bad: %#v", out[0].Metrics)
     }
+    must.Eq(t, 10, out[0].Metrics.NodesInPool,
+        must.Sprint("expected NodesInPool metric to be set"))
 
     h.AssertEvalStatus(t, structs.EvalStatusComplete)

View File

@@ -44,9 +44,9 @@ func (d *diffResult) Append(other *diffResult) {
     d.reconnecting = append(d.reconnecting, other.reconnecting...)
 }
 
-// readyNodesInDCs returns all the ready nodes in the given datacenters and a
-// mapping of each data center to the count of ready nodes.
-func readyNodesInDCs(state State, dcs []string) ([]*structs.Node, map[string]struct{}, map[string]int, error) {
+// readyNodesInDCsAndPool returns all the ready nodes in the given datacenters
+// and pool, and a mapping of each data center to the count of ready nodes.
+func readyNodesInDCsAndPool(state State, dcs []string, pool string) ([]*structs.Node, map[string]struct{}, map[string]int, error) {
     // Index the DCs
     dcMap := make(map[string]int)

@@ -54,7 +54,15 @@ func readyNodesInDCs(state State, dcs []string) ([]*structs.Node, map[string]struct{}, map[string]int, error) {
     ws := memdb.NewWatchSet()
     var out []*structs.Node
     notReady := map[string]struct{}{}
-    iter, err := state.Nodes(ws)
+
+    var iter memdb.ResultIterator
+    var err error
+    if pool == structs.NodePoolAll || pool == "" {
+        iter, err = state.Nodes(ws)
+    } else {
+        iter, err = state.NodesByNodePool(ws, pool)
+    }
+
     if err != nil {
         return nil, nil, nil, err
     }

@@ -631,6 +639,10 @@ func inplaceUpdate(ctx Context, eval *structs.Evaluation, job *structs.Job,
         if !node.IsInAnyDC(job.Datacenters) {
             continue
         }
+        // The alloc is on a node that's now in an ineligible node pool
+        if !node.IsInPool(job.NodePool) {
+            continue
+        }
 
         // Set the existing node as the base set
         stack.SetNodes([]*structs.Node{node})

@@ -875,6 +887,9 @@ func genericAllocUpdateFn(ctx Context, stack Stack, evalID string) allocUpdateType {
         if !node.IsInAnyDC(newJob.Datacenters) {
             return false, true, nil
         }
+        if !node.IsInPool(newJob.NodePool) {
+            return false, true, nil
+        }
 
         // Set the existing node as the base set
         stack.SetNodes([]*structs.Node{node})
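
The `inplaceUpdate` and `genericAllocUpdateFn` hunks above are what turn a job's node-pool change into a destructive update, per the commit message: an alloc whose node is no longer in the job's pool is skipped for in-place updates and flagged for replacement. A condensed sketch of that decision follows; the helper name is illustrative, and the real code goes on to run the full feasibility checks (drivers, resources, constraints) before deciding.

```go
package scheduler // sketch; reuses the Node helpers shown in the diff above

import "github.com/hashicorp/nomad/nomad/structs"

// needsReplacement reports whether an existing alloc's node has fallen out
// of the updated job's datacenters or node pool, in which case the alloc
// cannot be updated in place and must be replaced.
func needsReplacement(node *structs.Node, newJob *structs.Job) bool {
	if !node.IsInAnyDC(newJob.Datacenters) {
		return true
	}
	if !node.IsInPool(newJob.NodePool) {
		return true
	}
	return false
}
```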

View File

@@ -35,7 +35,7 @@ func newNode(name string) *structs.Node {
     return n
 }
 
-func TestReadyNodesInDCs(t *testing.T) {
+func TestReadyNodesInDCsAndPool(t *testing.T) {
     ci.Parallel(t)
 
     state := state.TestStateStore(t)
@@ -48,39 +48,77 @@
     node4 := mock.DrainNode()
     node5 := mock.Node()
     node5.Datacenter = "not-this-dc"
+    node6 := mock.Node()
+    node6.Datacenter = "dc1"
+    node6.NodePool = "other"
+    node7 := mock.Node()
+    node7.Datacenter = "dc2"
+    node7.NodePool = "other"
+    node8 := mock.Node()
+    node8.Datacenter = "dc1"
+    node8.NodePool = "other"
+    node8.Status = structs.NodeStatusDown
+    node9 := mock.DrainNode()
+    node9.Datacenter = "dc2"
+    node9.NodePool = "other"
 
     must.NoError(t, state.UpsertNode(structs.MsgTypeTestSetup, 1000, node1)) // dc1 ready
     must.NoError(t, state.UpsertNode(structs.MsgTypeTestSetup, 1001, node2)) // dc2 ready
     must.NoError(t, state.UpsertNode(structs.MsgTypeTestSetup, 1002, node3)) // dc2 not ready
     must.NoError(t, state.UpsertNode(structs.MsgTypeTestSetup, 1003, node4)) // dc2 not ready
     must.NoError(t, state.UpsertNode(structs.MsgTypeTestSetup, 1004, node5)) // ready never match
+    must.NoError(t, state.UpsertNode(structs.MsgTypeTestSetup, 1005, node6)) // dc1 other pool
+    must.NoError(t, state.UpsertNode(structs.MsgTypeTestSetup, 1006, node7)) // dc2 other pool
+    must.NoError(t, state.UpsertNode(structs.MsgTypeTestSetup, 1007, node8)) // dc1 other not ready
+    must.NoError(t, state.UpsertNode(structs.MsgTypeTestSetup, 1008, node9)) // dc2 other not ready
 
     testCases := []struct {
         name           string
         datacenters    []string
+        pool           string
         expectReady    []*structs.Node
         expectNotReady map[string]struct{}
         expectIndex    map[string]int
     }{
         {
-            name:        "no wildcards",
+            name:        "no wildcards in all pool",
+            datacenters: []string{"dc1", "dc2"},
+            pool:        structs.NodePoolAll,
+            expectReady: []*structs.Node{node1, node2, node6, node7},
+            expectNotReady: map[string]struct{}{
+                node3.ID: {}, node4.ID: {}, node8.ID: {}, node9.ID: {}},
+            expectIndex: map[string]int{"dc1": 2, "dc2": 2},
+        },
+        {
+            name:        "with wildcard in all pool",
+            datacenters: []string{"dc*"},
+            pool:        structs.NodePoolAll,
+            expectReady: []*structs.Node{node1, node2, node6, node7},
+            expectNotReady: map[string]struct{}{
+                node3.ID: {}, node4.ID: {}, node8.ID: {}, node9.ID: {}},
+            expectIndex: map[string]int{"dc1": 2, "dc2": 2},
+        },
+        {
+            name:        "no wildcards in default pool",
             datacenters: []string{"dc1", "dc2"},
+            pool:        structs.NodePoolDefault,
             expectReady: []*structs.Node{node1, node2},
-            expectNotReady: map[string]struct{}{node3.ID: struct{}{}, node4.ID: struct{}{}},
+            expectNotReady: map[string]struct{}{node3.ID: {}, node4.ID: {}},
             expectIndex: map[string]int{"dc1": 1, "dc2": 1},
         },
         {
-            name:        "with wildcard",
+            name:        "with wildcard in default pool",
            datacenters: []string{"dc*"},
+            pool:        structs.NodePoolDefault,
             expectReady: []*structs.Node{node1, node2},
-            expectNotReady: map[string]struct{}{node3.ID: struct{}{}, node4.ID: struct{}{}},
+            expectNotReady: map[string]struct{}{node3.ID: {}, node4.ID: {}},
             expectIndex: map[string]int{"dc1": 1, "dc2": 1},
         },
     }
 
     for _, tc := range testCases {
         t.Run(tc.name, func(t *testing.T) {
-            ready, notReady, dcIndex, err := readyNodesInDCs(state, tc.datacenters)
+            ready, notReady, dcIndex, err := readyNodesInDCsAndPool(state, tc.datacenters, tc.pool)
             must.NoError(t, err)
             must.SliceContainsAll(t, tc.expectReady, ready, must.Sprint("expected ready to match"))
             must.Eq(t, tc.expectNotReady, notReady, must.Sprint("expected not-ready to match"))
@@ -1009,6 +1047,54 @@ func TestInplaceUpdate_WildcardDatacenters(t *testing.T) {
         must.Sprintf("inplaceUpdate should have an inplace update"))
 }
 
+func TestInplaceUpdate_NodePools(t *testing.T) {
+    ci.Parallel(t)
+
+    store, ctx := testContext(t)
+    eval := mock.Eval()
+    job := mock.Job()
+    job.Datacenters = []string{"*"}
+
+    node1 := mock.Node()
+    must.NoError(t, store.UpsertNode(structs.MsgTypeTestSetup, 1000, node1))
+
+    node2 := mock.Node()
+    node2.NodePool = "other"
+    must.NoError(t, store.UpsertNode(structs.MsgTypeTestSetup, 1001, node2))
+
+    // Register an alloc
+    alloc1 := mock.AllocForNode(node1)
+    alloc1.Job = job
+    alloc1.JobID = job.ID
+    must.NoError(t, store.UpsertJobSummary(1002, mock.JobSummary(alloc1.JobID)))
+
+    alloc2 := mock.AllocForNode(node2)
+    alloc2.Job = job
+    alloc2.JobID = job.ID
+    must.NoError(t, store.UpsertJobSummary(1003, mock.JobSummary(alloc2.JobID)))
+
+    t.Logf("alloc1=%s alloc2=%s", alloc1.ID, alloc2.ID)
+
+    must.NoError(t, store.UpsertAllocs(structs.MsgTypeTestSetup, 1004,
+        []*structs.Allocation{alloc1, alloc2}))
+
+    updates := []allocTuple{
+        {Alloc: alloc1, TaskGroup: job.TaskGroups[0]},
+        {Alloc: alloc2, TaskGroup: job.TaskGroups[0]},
+    }
+    stack := NewGenericStack(false, ctx)
+
+    destructive, inplace := inplaceUpdate(ctx, eval, job, stack, updates)
+
+    must.Len(t, 1, inplace, must.Sprint("should have an inplace update"))
+    must.Eq(t, alloc1.ID, inplace[0].Alloc.ID)
+    must.Len(t, 1, ctx.plan.NodeAllocation[node1.ID],
+        must.Sprint("NodeAllocation should have an inplace update for node1"))
+
+    // note that NodeUpdate with the new alloc won't be populated here yet
+    must.Len(t, 1, destructive, must.Sprint("should have a destructive update"))
+    must.Eq(t, alloc2.ID, destructive[0].Alloc.ID)
+}
+
 func TestUtil_connectUpdated(t *testing.T) {
     ci.Parallel(t)

View File

@@ -438,6 +438,7 @@ $ curl \
     "NodesAvailable": {
       "dc1": 1
     },
+    "NodesInPool": 1,
     "ClassFiltered": null,
     "ConstraintFiltered": null,
     "NodesExhausted": 0,

View File

@@ -161,6 +161,7 @@ $ curl \
     "NodesEvaluated": 0,
     "NodesExhausted": 0,
     "NodesFiltered": 0,
+    "NodesInPool": 0,
     "QuotaExhausted": null,
     "ResourcesExhausted": null,
     "ScoreMetaData": null,

View File

@@ -2156,6 +2156,7 @@ $ curl \
     "NodesAvailable": {
       "dc1": 1
     },
+    "NodesInPool": 1,
    "ClassFiltered": null,
     "ConstraintFiltered": null,
     "NodesExhausted": 1,

View File

@@ -722,6 +722,7 @@ $ curl \
     "NodesEvaluated": 2,
     "NodesExhausted": 0,
     "NodesFiltered": 0,
+    "NodesInPool": 2,
     "QuotaExhausted": null,
     "Scores": {
       "46f1c6c4-a0e5-21f6-fd5c-d76c3d84e806.binpack": 2.6950883117541586,