scheduler: fix quadratic performance with spread blocks (#11712)
When the scheduler picks a node for each evaluation, the `LimitIterator` provides at most 2 eligible nodes for the `MaxScoreIterator` to choose from. This keeps scheduling fast while producing acceptable results because the results are binpacked. Jobs with a `spread` block (or node affinity) remove this limit in order to produce correct spread scoring. This means that every allocation within a job with a `spread` block is evaluated against _all_ eligible nodes. Operators of large clusters have reported that jobs with `spread` blocks that are eligible on a large number of nodes can take longer than the nack timeout to evaluate (60s). Typical evaluations are processed in milliseconds. In practice, it's not necessary to evaluate every eligible node for every allocation on large clusters, because the `RandomIterator` at the base of the scheduler stack produces enough variation in each pass that the likelihood of an uneven spread is negligible. Note that feasibility is checked before the limit, so this only impacts the number of _eligible_ nodes available for scoring, not the total number of nodes. This changeset sets the iterator limit for "large" `spread` block and node affinity jobs to be equal to the number of desired allocations. This brings an example problematic job evaluation down from ~3min to ~10s. The included tests ensure that we have acceptable spread results across a variety of large cluster topologies.
This commit is contained in:
parent
8ba4e063e2
commit
b0c3b99b03
|
@ -0,0 +1,3 @@
|
|||
```release-note:bug
|
||||
scheduler: Fixed a performance bug where `spread` and node affinity can cause a job to take longer than the nack timeout to be evaluated.
|
||||
```
|
|
@ -2,7 +2,10 @@ package scheduler
|
|||
|
||||
import (
|
||||
"math"
|
||||
"math/rand"
|
||||
"sort"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"fmt"
|
||||
|
||||
|
@ -568,3 +571,243 @@ func Test_evenSpreadScoreBoost(t *testing.T) {
|
|||
require.False(t, math.IsInf(boost, 1))
|
||||
require.Equal(t, 1.0, boost)
|
||||
}
|
||||
|
||||
// TestSpreadOnLargeCluster exercises potentially quadratic
|
||||
// performance cases with spread scheduling when we have a large
|
||||
// number of eligible nodes unless we limit the number that each
|
||||
// MaxScore attempt considers. By reducing the total from MaxInt, we
|
||||
// can prevent quadratic performance but then we need this test to
|
||||
// verify we have satisfactory spread results.
|
||||
func TestSpreadOnLargeCluster(t *testing.T) {
|
||||
t.Parallel()
|
||||
cases := []struct {
|
||||
name string
|
||||
nodeCount int
|
||||
racks map[string]int
|
||||
allocs int
|
||||
}{
|
||||
{
|
||||
name: "nodes=10k even racks=100 allocs=500",
|
||||
nodeCount: 10000,
|
||||
racks: generateEvenRacks(10000, 100),
|
||||
allocs: 500,
|
||||
},
|
||||
{
|
||||
name: "nodes=10k even racks=100 allocs=50",
|
||||
nodeCount: 10000,
|
||||
racks: generateEvenRacks(10000, 100),
|
||||
allocs: 50,
|
||||
},
|
||||
{
|
||||
name: "nodes=10k even racks=10 allocs=500",
|
||||
nodeCount: 10000,
|
||||
racks: generateEvenRacks(10000, 10),
|
||||
allocs: 500,
|
||||
},
|
||||
{
|
||||
name: "nodes=10k even racks=10 allocs=50",
|
||||
nodeCount: 10000,
|
||||
racks: generateEvenRacks(10000, 10),
|
||||
allocs: 500,
|
||||
},
|
||||
{
|
||||
name: "nodes=10k small uneven racks allocs=500",
|
||||
nodeCount: 10000,
|
||||
racks: generateUnevenRacks(t, 10000, 50),
|
||||
allocs: 500,
|
||||
},
|
||||
{
|
||||
name: "nodes=10k small uneven racks allocs=50",
|
||||
nodeCount: 10000,
|
||||
racks: generateUnevenRacks(t, 10000, 50),
|
||||
allocs: 500,
|
||||
},
|
||||
{
|
||||
name: "nodes=10k many uneven racks allocs=500",
|
||||
nodeCount: 10000,
|
||||
racks: generateUnevenRacks(t, 10000, 500),
|
||||
allocs: 500,
|
||||
},
|
||||
{
|
||||
name: "nodes=10k many uneven racks allocs=50",
|
||||
nodeCount: 10000,
|
||||
racks: generateUnevenRacks(t, 10000, 500),
|
||||
allocs: 50,
|
||||
},
|
||||
}
|
||||
|
||||
for i := range cases {
|
||||
tc := cases[i]
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
t.Parallel()
|
||||
h := NewHarness(t)
|
||||
err := upsertNodes(h, tc.nodeCount, tc.racks)
|
||||
require.NoError(t, err)
|
||||
job := generateJob(tc.allocs)
|
||||
eval, err := upsertJob(h, job)
|
||||
require.NoError(t, err)
|
||||
|
||||
start := time.Now()
|
||||
err = h.Process(NewServiceScheduler, eval)
|
||||
require.NoError(t, err)
|
||||
require.LessOrEqual(t, time.Since(start), time.Duration(60*time.Second),
|
||||
"time to evaluate exceeded EvalNackTimeout")
|
||||
|
||||
require.Len(t, h.Plans, 1)
|
||||
require.False(t, h.Plans[0].IsNoOp())
|
||||
require.NoError(t, validateEqualSpread(h))
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// generateUnevenRacks creates a map of rack names to a count of nodes
|
||||
// evenly distributed in those racks
|
||||
func generateEvenRacks(nodes int, rackCount int) map[string]int {
|
||||
racks := map[string]int{}
|
||||
for i := 0; i < nodes; i++ {
|
||||
racks[fmt.Sprintf("r%d", i%rackCount)]++
|
||||
}
|
||||
return racks
|
||||
}
|
||||
|
||||
// generateUnevenRacks creates a random map of rack names to a count
|
||||
// of nodes in that rack
|
||||
func generateUnevenRacks(t *testing.T, nodes int, rackCount int) map[string]int {
|
||||
rackNames := []string{}
|
||||
for i := 0; i < rackCount; i++ {
|
||||
rackNames = append(rackNames, fmt.Sprintf("r%d", i))
|
||||
}
|
||||
|
||||
// print this so that any future test flakes can be more easily
|
||||
// reproduced
|
||||
seed := time.Now().UnixNano()
|
||||
rand.Seed(seed)
|
||||
t.Logf("nodes=%d racks=%d seed=%d\n", nodes, rackCount, seed)
|
||||
|
||||
racks := map[string]int{}
|
||||
for i := 0; i < nodes; i++ {
|
||||
idx := rand.Intn(len(rackNames))
|
||||
racks[rackNames[idx]]++
|
||||
}
|
||||
return racks
|
||||
}
|
||||
|
||||
// upsertNodes creates a collection of Nodes in the state store,
|
||||
// distributed among the racks
|
||||
func upsertNodes(h *Harness, count int, racks map[string]int) error {
|
||||
|
||||
datacenters := []string{"dc-1", "dc-2"}
|
||||
rackAssignments := []string{}
|
||||
for rack, count := range racks {
|
||||
for i := 0; i < count; i++ {
|
||||
rackAssignments = append(rackAssignments, rack)
|
||||
}
|
||||
}
|
||||
|
||||
for i := 0; i < count; i++ {
|
||||
node := mock.Node()
|
||||
node.Datacenter = datacenters[i%2]
|
||||
node.Meta = map[string]string{}
|
||||
node.Meta["rack"] = fmt.Sprintf("r%s", rackAssignments[i])
|
||||
node.NodeResources.Cpu.CpuShares = 14000
|
||||
node.NodeResources.Memory.MemoryMB = 32000
|
||||
err := h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func generateJob(jobSize int) *structs.Job {
|
||||
job := mock.Job()
|
||||
job.Datacenters = []string{"dc-1", "dc-2"}
|
||||
job.Spreads = []*structs.Spread{{Attribute: "${meta.rack}"}}
|
||||
job.Constraints = []*structs.Constraint{}
|
||||
job.TaskGroups[0].Count = jobSize
|
||||
job.TaskGroups[0].Networks = nil
|
||||
job.TaskGroups[0].Services = []*structs.Service{}
|
||||
job.TaskGroups[0].Tasks[0].Resources = &structs.Resources{
|
||||
CPU: 6000,
|
||||
MemoryMB: 6000,
|
||||
}
|
||||
return job
|
||||
}
|
||||
|
||||
func upsertJob(h *Harness, job *structs.Job) (*structs.Evaluation, error) {
|
||||
err := h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), job)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
eval := &structs.Evaluation{
|
||||
Namespace: structs.DefaultNamespace,
|
||||
ID: uuid.Generate(),
|
||||
Priority: job.Priority,
|
||||
TriggeredBy: structs.EvalTriggerJobRegister,
|
||||
JobID: job.ID,
|
||||
Status: structs.EvalStatusPending,
|
||||
}
|
||||
err = h.State.UpsertEvals(structs.MsgTypeTestSetup,
|
||||
h.NextIndex(), []*structs.Evaluation{eval})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return eval, nil
|
||||
}
|
||||
|
||||
// validateEqualSpread compares the resulting plan to the node
|
||||
// metadata to verify that each group of spread targets has an equal
|
||||
// distribution.
|
||||
func validateEqualSpread(h *Harness) error {
|
||||
|
||||
iter, err := h.State.Nodes(nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
i := 0
|
||||
nodesToRacks := map[string]string{}
|
||||
racksToAllocCount := map[string]int{}
|
||||
for {
|
||||
raw := iter.Next()
|
||||
if raw == nil {
|
||||
break
|
||||
}
|
||||
node := raw.(*structs.Node)
|
||||
rack, ok := node.Meta["rack"]
|
||||
if ok {
|
||||
nodesToRacks[node.ID] = rack
|
||||
racksToAllocCount[rack] = 0
|
||||
}
|
||||
i++
|
||||
}
|
||||
|
||||
// Collapse the count of allocations per node into a list of
|
||||
// counts. The results should be clustered within one of each
|
||||
// other.
|
||||
for nodeID, nodeAllocs := range h.Plans[0].NodeAllocation {
|
||||
racksToAllocCount[nodesToRacks[nodeID]] += len(nodeAllocs)
|
||||
}
|
||||
countSet := map[int]int{}
|
||||
for _, count := range racksToAllocCount {
|
||||
countSet[count]++
|
||||
}
|
||||
|
||||
countSlice := []int{}
|
||||
for count := range countSet {
|
||||
countSlice = append(countSlice, count)
|
||||
}
|
||||
|
||||
switch len(countSlice) {
|
||||
case 1:
|
||||
return nil
|
||||
case 2, 3:
|
||||
sort.Ints(countSlice)
|
||||
for i := 1; i < len(countSlice); i++ {
|
||||
if countSlice[i] != countSlice[i-1]+1 {
|
||||
return fmt.Errorf("expected even distributon of allocs to racks, but got:\n%+v", countSet)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
return fmt.Errorf("expected even distributon of allocs to racks, but got:\n%+v", countSet)
|
||||
}
|
||||
|
|
|
@ -163,7 +163,14 @@ func (s *GenericStack) Select(tg *structs.TaskGroup, options *SelectOptions) *Ra
|
|||
s.spread.SetTaskGroup(tg)
|
||||
|
||||
if s.nodeAffinity.hasAffinities() || s.spread.hasSpreads() {
|
||||
s.limit.SetLimit(math.MaxInt32)
|
||||
// scoring spread across all nodes has quadratic behavior, so
|
||||
// we need to consider a subset of nodes to keep evaluaton times
|
||||
// reasonable but enough to ensure spread is correct. this
|
||||
// value was empirically determined.
|
||||
s.limit.SetLimit(tg.Count)
|
||||
if tg.Count < 100 {
|
||||
s.limit.SetLimit(100)
|
||||
}
|
||||
}
|
||||
|
||||
if contextual, ok := s.quota.(ContextualIterator); ok {
|
||||
|
|
|
@ -54,8 +54,12 @@ spread stanza. Spread scores are combined with other scoring factors such as bin
|
|||
|
||||
A job or task group can have more than one spread criteria, with weights to express relative preference.
|
||||
|
||||
Spread criteria are treated as a soft preference by the Nomad scheduler.
|
||||
If no nodes match a given spread criteria, placement is still successful.
|
||||
Spread criteria are treated as a soft preference by the Nomad
|
||||
scheduler. If no nodes match a given spread criteria, placement is
|
||||
still successful. To avoid scoring every node for every placement,
|
||||
allocations may not be perfectly spread. Spread works best on
|
||||
attributes with similar number of nodes: identically configured racks
|
||||
or similarly configured datacenters.
|
||||
|
||||
Spread may be expressed on [attributes][interpolation] or [client metadata][client-meta].
|
||||
Additionally, spread may be specified at the [job][job] and [group][group] levels for ultimate flexibility. Job level spread criteria are inherited by all task groups in the job.
|
||||
|
|
Loading…
Reference in New Issue