b0c3b99b03
When the scheduler picks a node for each evaluation, the `LimitIterator` provides at most 2 eligible nodes for the `MaxScoreIterator` to choose from. This keeps scheduling fast while producing acceptable results because the results are binpacked. Jobs with a `spread` block (or node affinity) remove this limit in order to produce correct spread scoring. This means that every allocation within a job with a `spread` block is evaluated against _all_ eligible nodes. Operators of large clusters have reported that jobs with `spread` blocks that are eligible on a large number of nodes can take longer than the nack timeout to evaluate (60s). Typical evaluations are processed in milliseconds. In practice, it's not necessary to evaluate every eligible node for every allocation on large clusters, because the `RandomIterator` at the base of the scheduler stack produces enough variation in each pass that the likelihood of an uneven spread is negligible. Note that feasibility is checked before the limit, so this only impacts the number of _eligible_ nodes available for scoring, not the total number of nodes. This changeset sets the iterator limit for "large" `spread` block and node affinity jobs to be equal to the number of desired allocations. This brings an example problematic job evaluation down from ~3min to ~10s. The included tests ensure that we have acceptable spread results across a variety of large cluster topologies.
814 lines
20 KiB
Go
814 lines
20 KiB
Go
package scheduler
|
|
|
|
import (
|
|
"math"
|
|
"math/rand"
|
|
"sort"
|
|
"testing"
|
|
"time"
|
|
|
|
"fmt"
|
|
|
|
"github.com/hashicorp/nomad/helper/uuid"
|
|
"github.com/hashicorp/nomad/nomad/mock"
|
|
"github.com/hashicorp/nomad/nomad/structs"
|
|
"github.com/stretchr/testify/require"
|
|
)
|
|
|
|
func TestSpreadIterator_SingleAttribute(t *testing.T) {
|
|
state, ctx := testContext(t)
|
|
dcs := []string{"dc1", "dc2", "dc1", "dc1"}
|
|
var nodes []*RankedNode
|
|
|
|
// Add these nodes to the state store
|
|
for i, dc := range dcs {
|
|
node := mock.Node()
|
|
node.Datacenter = dc
|
|
if err := state.UpsertNode(structs.MsgTypeTestSetup, uint64(100+i), node); err != nil {
|
|
t.Fatalf("failed to upsert node: %v", err)
|
|
}
|
|
nodes = append(nodes, &RankedNode{Node: node})
|
|
}
|
|
|
|
static := NewStaticRankIterator(ctx, nodes)
|
|
|
|
job := mock.Job()
|
|
tg := job.TaskGroups[0]
|
|
job.TaskGroups[0].Count = 10
|
|
// add allocs to nodes in dc1
|
|
upserting := []*structs.Allocation{
|
|
{
|
|
Namespace: structs.DefaultNamespace,
|
|
TaskGroup: tg.Name,
|
|
JobID: job.ID,
|
|
Job: job,
|
|
ID: uuid.Generate(),
|
|
EvalID: uuid.Generate(),
|
|
NodeID: nodes[0].Node.ID,
|
|
},
|
|
{
|
|
Namespace: structs.DefaultNamespace,
|
|
TaskGroup: tg.Name,
|
|
JobID: job.ID,
|
|
Job: job,
|
|
ID: uuid.Generate(),
|
|
EvalID: uuid.Generate(),
|
|
NodeID: nodes[2].Node.ID,
|
|
},
|
|
}
|
|
|
|
if err := state.UpsertAllocs(structs.MsgTypeTestSetup, 1000, upserting); err != nil {
|
|
t.Fatalf("failed to UpsertAllocs: %v", err)
|
|
}
|
|
|
|
// Create spread target of 80% in dc1
|
|
// Implicitly, this means 20% in dc2
|
|
spread := &structs.Spread{
|
|
Weight: 100,
|
|
Attribute: "${node.datacenter}",
|
|
SpreadTarget: []*structs.SpreadTarget{
|
|
{
|
|
Value: "dc1",
|
|
Percent: 80,
|
|
},
|
|
},
|
|
}
|
|
tg.Spreads = []*structs.Spread{spread}
|
|
spreadIter := NewSpreadIterator(ctx, static)
|
|
spreadIter.SetJob(job)
|
|
spreadIter.SetTaskGroup(tg)
|
|
|
|
scoreNorm := NewScoreNormalizationIterator(ctx, spreadIter)
|
|
|
|
out := collectRanked(scoreNorm)
|
|
|
|
// Expect nodes in dc1 with existing allocs to get a boost
|
|
// Boost should be ((desiredCount-actual)/desired)*spreadWeight
|
|
// For this test, that becomes dc1 = ((8-3)/8 ) = 0.5, and dc2=(2-1)/2
|
|
expectedScores := map[string]float64{
|
|
"dc1": 0.625,
|
|
"dc2": 0.5,
|
|
}
|
|
for _, rn := range out {
|
|
require.Equal(t, expectedScores[rn.Node.Datacenter], rn.FinalScore)
|
|
}
|
|
|
|
// Update the plan to add more allocs to nodes in dc1
|
|
// After this step there are enough allocs to meet the desired count in dc1
|
|
ctx.plan.NodeAllocation[nodes[0].Node.ID] = []*structs.Allocation{
|
|
{
|
|
Namespace: structs.DefaultNamespace,
|
|
TaskGroup: tg.Name,
|
|
JobID: job.ID,
|
|
Job: job,
|
|
ID: uuid.Generate(),
|
|
NodeID: nodes[0].Node.ID,
|
|
},
|
|
{
|
|
Namespace: structs.DefaultNamespace,
|
|
TaskGroup: tg.Name,
|
|
JobID: job.ID,
|
|
Job: job,
|
|
ID: uuid.Generate(),
|
|
NodeID: nodes[0].Node.ID,
|
|
},
|
|
// Should be ignored as it is a different job.
|
|
{
|
|
Namespace: structs.DefaultNamespace,
|
|
TaskGroup: "bbb",
|
|
JobID: "ignore 2",
|
|
Job: job,
|
|
ID: uuid.Generate(),
|
|
NodeID: nodes[0].Node.ID,
|
|
},
|
|
}
|
|
ctx.plan.NodeAllocation[nodes[3].Node.ID] = []*structs.Allocation{
|
|
{
|
|
Namespace: structs.DefaultNamespace,
|
|
TaskGroup: tg.Name,
|
|
JobID: job.ID,
|
|
Job: job,
|
|
ID: uuid.Generate(),
|
|
NodeID: nodes[3].Node.ID,
|
|
},
|
|
{
|
|
Namespace: structs.DefaultNamespace,
|
|
TaskGroup: tg.Name,
|
|
JobID: job.ID,
|
|
Job: job,
|
|
ID: uuid.Generate(),
|
|
NodeID: nodes[3].Node.ID,
|
|
},
|
|
{
|
|
Namespace: structs.DefaultNamespace,
|
|
TaskGroup: tg.Name,
|
|
JobID: job.ID,
|
|
Job: job,
|
|
ID: uuid.Generate(),
|
|
NodeID: nodes[3].Node.ID,
|
|
},
|
|
}
|
|
|
|
// Reset the scores
|
|
for _, node := range nodes {
|
|
node.Scores = nil
|
|
node.FinalScore = 0
|
|
}
|
|
static = NewStaticRankIterator(ctx, nodes)
|
|
spreadIter = NewSpreadIterator(ctx, static)
|
|
spreadIter.SetJob(job)
|
|
spreadIter.SetTaskGroup(tg)
|
|
scoreNorm = NewScoreNormalizationIterator(ctx, spreadIter)
|
|
out = collectRanked(scoreNorm)
|
|
|
|
// Expect nodes in dc2 with existing allocs to get a boost
|
|
// DC1 nodes are not boosted because there are enough allocs to meet
|
|
// the desired count
|
|
expectedScores = map[string]float64{
|
|
"dc1": 0,
|
|
"dc2": 0.5,
|
|
}
|
|
for _, rn := range out {
|
|
require.Equal(t, expectedScores[rn.Node.Datacenter], rn.FinalScore)
|
|
}
|
|
}
|
|
|
|
func TestSpreadIterator_MultipleAttributes(t *testing.T) {
|
|
state, ctx := testContext(t)
|
|
dcs := []string{"dc1", "dc2", "dc1", "dc1"}
|
|
rack := []string{"r1", "r1", "r2", "r2"}
|
|
var nodes []*RankedNode
|
|
|
|
// Add these nodes to the state store
|
|
for i, dc := range dcs {
|
|
node := mock.Node()
|
|
node.Datacenter = dc
|
|
node.Meta["rack"] = rack[i]
|
|
if err := state.UpsertNode(structs.MsgTypeTestSetup, uint64(100+i), node); err != nil {
|
|
t.Fatalf("failed to upsert node: %v", err)
|
|
}
|
|
nodes = append(nodes, &RankedNode{Node: node})
|
|
}
|
|
|
|
static := NewStaticRankIterator(ctx, nodes)
|
|
|
|
job := mock.Job()
|
|
tg := job.TaskGroups[0]
|
|
job.TaskGroups[0].Count = 10
|
|
// add allocs to nodes in dc1
|
|
upserting := []*structs.Allocation{
|
|
{
|
|
Namespace: structs.DefaultNamespace,
|
|
TaskGroup: tg.Name,
|
|
JobID: job.ID,
|
|
Job: job,
|
|
ID: uuid.Generate(),
|
|
EvalID: uuid.Generate(),
|
|
NodeID: nodes[0].Node.ID,
|
|
},
|
|
{
|
|
Namespace: structs.DefaultNamespace,
|
|
TaskGroup: tg.Name,
|
|
JobID: job.ID,
|
|
Job: job,
|
|
ID: uuid.Generate(),
|
|
EvalID: uuid.Generate(),
|
|
NodeID: nodes[2].Node.ID,
|
|
},
|
|
}
|
|
|
|
if err := state.UpsertAllocs(structs.MsgTypeTestSetup, 1000, upserting); err != nil {
|
|
t.Fatalf("failed to UpsertAllocs: %v", err)
|
|
}
|
|
|
|
spread1 := &structs.Spread{
|
|
Weight: 100,
|
|
Attribute: "${node.datacenter}",
|
|
SpreadTarget: []*structs.SpreadTarget{
|
|
{
|
|
Value: "dc1",
|
|
Percent: 60,
|
|
},
|
|
{
|
|
Value: "dc2",
|
|
Percent: 40,
|
|
},
|
|
},
|
|
}
|
|
|
|
spread2 := &structs.Spread{
|
|
Weight: 50,
|
|
Attribute: "${meta.rack}",
|
|
SpreadTarget: []*structs.SpreadTarget{
|
|
{
|
|
Value: "r1",
|
|
Percent: 40,
|
|
},
|
|
{
|
|
Value: "r2",
|
|
Percent: 60,
|
|
},
|
|
},
|
|
}
|
|
|
|
tg.Spreads = []*structs.Spread{spread1, spread2}
|
|
spreadIter := NewSpreadIterator(ctx, static)
|
|
spreadIter.SetJob(job)
|
|
spreadIter.SetTaskGroup(tg)
|
|
|
|
scoreNorm := NewScoreNormalizationIterator(ctx, spreadIter)
|
|
|
|
out := collectRanked(scoreNorm)
|
|
|
|
// Score comes from combining two different spread factors
|
|
// Second node should have the highest score because it has no allocs and its in dc2/r1
|
|
expectedScores := map[string]float64{
|
|
nodes[0].Node.ID: 0.500,
|
|
nodes[1].Node.ID: 0.667,
|
|
nodes[2].Node.ID: 0.556,
|
|
nodes[3].Node.ID: 0.556,
|
|
}
|
|
for _, rn := range out {
|
|
require.Equal(t, fmt.Sprintf("%.3f", expectedScores[rn.Node.ID]), fmt.Sprintf("%.3f", rn.FinalScore))
|
|
}
|
|
|
|
}
|
|
|
|
func TestSpreadIterator_EvenSpread(t *testing.T) {
|
|
state, ctx := testContext(t)
|
|
dcs := []string{"dc1", "dc2", "dc1", "dc2", "dc1", "dc2", "dc2", "dc1", "dc1", "dc1"}
|
|
var nodes []*RankedNode
|
|
|
|
// Add these nodes to the state store
|
|
for i, dc := range dcs {
|
|
node := mock.Node()
|
|
node.Datacenter = dc
|
|
if err := state.UpsertNode(structs.MsgTypeTestSetup, uint64(100+i), node); err != nil {
|
|
t.Fatalf("failed to upsert node: %v", err)
|
|
}
|
|
nodes = append(nodes, &RankedNode{Node: node})
|
|
}
|
|
|
|
static := NewStaticRankIterator(ctx, nodes)
|
|
job := mock.Job()
|
|
tg := job.TaskGroups[0]
|
|
job.TaskGroups[0].Count = 10
|
|
|
|
// Configure even spread across node.datacenter
|
|
spread := &structs.Spread{
|
|
Weight: 100,
|
|
Attribute: "${node.datacenter}",
|
|
}
|
|
tg.Spreads = []*structs.Spread{spread}
|
|
spreadIter := NewSpreadIterator(ctx, static)
|
|
spreadIter.SetJob(job)
|
|
spreadIter.SetTaskGroup(tg)
|
|
|
|
scoreNorm := NewScoreNormalizationIterator(ctx, spreadIter)
|
|
|
|
out := collectRanked(scoreNorm)
|
|
|
|
// Nothing placed so both dc nodes get 0 as the score
|
|
expectedScores := map[string]float64{
|
|
"dc1": 0,
|
|
"dc2": 0,
|
|
}
|
|
for _, rn := range out {
|
|
require.Equal(t, fmt.Sprintf("%.3f", expectedScores[rn.Node.Datacenter]), fmt.Sprintf("%.3f", rn.FinalScore))
|
|
|
|
}
|
|
|
|
// Update the plan to add allocs to nodes in dc1
|
|
// After this step dc2 nodes should get boosted
|
|
ctx.plan.NodeAllocation[nodes[0].Node.ID] = []*structs.Allocation{
|
|
{
|
|
Namespace: structs.DefaultNamespace,
|
|
TaskGroup: tg.Name,
|
|
JobID: job.ID,
|
|
Job: job,
|
|
ID: uuid.Generate(),
|
|
NodeID: nodes[0].Node.ID,
|
|
},
|
|
}
|
|
ctx.plan.NodeAllocation[nodes[2].Node.ID] = []*structs.Allocation{
|
|
{
|
|
Namespace: structs.DefaultNamespace,
|
|
TaskGroup: tg.Name,
|
|
JobID: job.ID,
|
|
Job: job,
|
|
ID: uuid.Generate(),
|
|
NodeID: nodes[2].Node.ID,
|
|
},
|
|
}
|
|
|
|
// Reset the scores
|
|
for _, node := range nodes {
|
|
node.Scores = nil
|
|
node.FinalScore = 0
|
|
}
|
|
static = NewStaticRankIterator(ctx, nodes)
|
|
spreadIter = NewSpreadIterator(ctx, static)
|
|
spreadIter.SetJob(job)
|
|
spreadIter.SetTaskGroup(tg)
|
|
scoreNorm = NewScoreNormalizationIterator(ctx, spreadIter)
|
|
out = collectRanked(scoreNorm)
|
|
|
|
// Expect nodes in dc2 with existing allocs to get a boost
|
|
// dc1 nodes are penalized because they have allocs
|
|
expectedScores = map[string]float64{
|
|
"dc1": -1,
|
|
"dc2": 1,
|
|
}
|
|
for _, rn := range out {
|
|
require.Equal(t, expectedScores[rn.Node.Datacenter], rn.FinalScore)
|
|
}
|
|
|
|
// Update the plan to add more allocs to nodes in dc2
|
|
// After this step dc1 nodes should get boosted
|
|
ctx.plan.NodeAllocation[nodes[1].Node.ID] = []*structs.Allocation{
|
|
{
|
|
Namespace: structs.DefaultNamespace,
|
|
TaskGroup: tg.Name,
|
|
JobID: job.ID,
|
|
Job: job,
|
|
ID: uuid.Generate(),
|
|
NodeID: nodes[1].Node.ID,
|
|
},
|
|
{
|
|
Namespace: structs.DefaultNamespace,
|
|
TaskGroup: tg.Name,
|
|
JobID: job.ID,
|
|
Job: job,
|
|
ID: uuid.Generate(),
|
|
NodeID: nodes[1].Node.ID,
|
|
},
|
|
}
|
|
ctx.plan.NodeAllocation[nodes[3].Node.ID] = []*structs.Allocation{
|
|
{
|
|
Namespace: structs.DefaultNamespace,
|
|
TaskGroup: tg.Name,
|
|
JobID: job.ID,
|
|
Job: job,
|
|
ID: uuid.Generate(),
|
|
NodeID: nodes[3].Node.ID,
|
|
},
|
|
}
|
|
|
|
// Reset the scores
|
|
for _, node := range nodes {
|
|
node.Scores = nil
|
|
node.FinalScore = 0
|
|
}
|
|
static = NewStaticRankIterator(ctx, nodes)
|
|
spreadIter = NewSpreadIterator(ctx, static)
|
|
spreadIter.SetJob(job)
|
|
spreadIter.SetTaskGroup(tg)
|
|
scoreNorm = NewScoreNormalizationIterator(ctx, spreadIter)
|
|
out = collectRanked(scoreNorm)
|
|
|
|
// Expect nodes in dc2 to be penalized because there are 3 allocs there now
|
|
// dc1 nodes are boosted because that has 2 allocs
|
|
expectedScores = map[string]float64{
|
|
"dc1": 0.5,
|
|
"dc2": -0.5,
|
|
}
|
|
for _, rn := range out {
|
|
require.Equal(t, fmt.Sprintf("%3.3f", expectedScores[rn.Node.Datacenter]), fmt.Sprintf("%3.3f", rn.FinalScore))
|
|
}
|
|
|
|
// Add another node in dc3
|
|
node := mock.Node()
|
|
node.Datacenter = "dc3"
|
|
if err := state.UpsertNode(structs.MsgTypeTestSetup, uint64(1111), node); err != nil {
|
|
t.Fatalf("failed to upsert node: %v", err)
|
|
}
|
|
nodes = append(nodes, &RankedNode{Node: node})
|
|
|
|
// Add another alloc to dc1, now its count matches dc2
|
|
ctx.plan.NodeAllocation[nodes[4].Node.ID] = []*structs.Allocation{
|
|
{
|
|
Namespace: structs.DefaultNamespace,
|
|
TaskGroup: tg.Name,
|
|
JobID: job.ID,
|
|
Job: job,
|
|
ID: uuid.Generate(),
|
|
NodeID: nodes[4].Node.ID,
|
|
},
|
|
}
|
|
|
|
// Reset scores
|
|
for _, node := range nodes {
|
|
node.Scores = nil
|
|
node.FinalScore = 0
|
|
}
|
|
static = NewStaticRankIterator(ctx, nodes)
|
|
spreadIter = NewSpreadIterator(ctx, static)
|
|
spreadIter.SetJob(job)
|
|
spreadIter.SetTaskGroup(tg)
|
|
scoreNorm = NewScoreNormalizationIterator(ctx, spreadIter)
|
|
out = collectRanked(scoreNorm)
|
|
|
|
// Expect dc1 and dc2 to be penalized because they have 3 allocs
|
|
// dc3 should get a boost because it has 0 allocs
|
|
expectedScores = map[string]float64{
|
|
"dc1": -1,
|
|
"dc2": -1,
|
|
"dc3": 1,
|
|
}
|
|
for _, rn := range out {
|
|
require.Equal(t, fmt.Sprintf("%.3f", expectedScores[rn.Node.Datacenter]), fmt.Sprintf("%.3f", rn.FinalScore))
|
|
}
|
|
|
|
}
|
|
|
|
// Test scenarios where the spread iterator sets maximum penalty (-1.0)
|
|
func TestSpreadIterator_MaxPenalty(t *testing.T) {
|
|
state, ctx := testContext(t)
|
|
var nodes []*RankedNode
|
|
|
|
// Add nodes in dc3 to the state store
|
|
for i := 0; i < 5; i++ {
|
|
node := mock.Node()
|
|
node.Datacenter = "dc3"
|
|
if err := state.UpsertNode(structs.MsgTypeTestSetup, uint64(100+i), node); err != nil {
|
|
t.Fatalf("failed to upsert node: %v", err)
|
|
}
|
|
nodes = append(nodes, &RankedNode{Node: node})
|
|
}
|
|
|
|
static := NewStaticRankIterator(ctx, nodes)
|
|
|
|
job := mock.Job()
|
|
tg := job.TaskGroups[0]
|
|
job.TaskGroups[0].Count = 5
|
|
|
|
// Create spread target of 80% in dc1
|
|
// and 20% in dc2
|
|
spread := &structs.Spread{
|
|
Weight: 100,
|
|
Attribute: "${node.datacenter}",
|
|
SpreadTarget: []*structs.SpreadTarget{
|
|
{
|
|
Value: "dc1",
|
|
Percent: 80,
|
|
},
|
|
{
|
|
Value: "dc2",
|
|
Percent: 20,
|
|
},
|
|
},
|
|
}
|
|
tg.Spreads = []*structs.Spread{spread}
|
|
spreadIter := NewSpreadIterator(ctx, static)
|
|
spreadIter.SetJob(job)
|
|
spreadIter.SetTaskGroup(tg)
|
|
|
|
scoreNorm := NewScoreNormalizationIterator(ctx, spreadIter)
|
|
|
|
out := collectRanked(scoreNorm)
|
|
|
|
// All nodes are in dc3 so score should be -1
|
|
for _, rn := range out {
|
|
require.Equal(t, -1.0, rn.FinalScore)
|
|
}
|
|
|
|
// Reset scores
|
|
for _, node := range nodes {
|
|
node.Scores = nil
|
|
node.FinalScore = 0
|
|
}
|
|
|
|
// Create spread on attribute that doesn't exist on any nodes
|
|
spread = &structs.Spread{
|
|
Weight: 100,
|
|
Attribute: "${meta.foo}",
|
|
SpreadTarget: []*structs.SpreadTarget{
|
|
{
|
|
Value: "bar",
|
|
Percent: 80,
|
|
},
|
|
{
|
|
Value: "baz",
|
|
Percent: 20,
|
|
},
|
|
},
|
|
}
|
|
|
|
tg.Spreads = []*structs.Spread{spread}
|
|
static = NewStaticRankIterator(ctx, nodes)
|
|
spreadIter = NewSpreadIterator(ctx, static)
|
|
spreadIter.SetJob(job)
|
|
spreadIter.SetTaskGroup(tg)
|
|
scoreNorm = NewScoreNormalizationIterator(ctx, spreadIter)
|
|
out = collectRanked(scoreNorm)
|
|
|
|
// All nodes don't have the spread attribute so score should be -1
|
|
for _, rn := range out {
|
|
require.Equal(t, -1.0, rn.FinalScore)
|
|
}
|
|
|
|
}
|
|
|
|
func Test_evenSpreadScoreBoost(t *testing.T) {
|
|
pset := &propertySet{
|
|
existingValues: map[string]uint64{},
|
|
proposedValues: map[string]uint64{
|
|
"dc2": 1,
|
|
"dc1": 1,
|
|
"dc3": 1,
|
|
},
|
|
clearedValues: map[string]uint64{
|
|
"dc2": 1,
|
|
"dc3": 1,
|
|
},
|
|
targetAttribute: "${node.datacenter}",
|
|
}
|
|
|
|
opt := &structs.Node{
|
|
Datacenter: "dc2",
|
|
}
|
|
boost := evenSpreadScoreBoost(pset, opt)
|
|
require.False(t, math.IsInf(boost, 1))
|
|
require.Equal(t, 1.0, boost)
|
|
}
|
|
|
|
// TestSpreadOnLargeCluster exercises potentially quadratic
|
|
// performance cases with spread scheduling when we have a large
|
|
// number of eligible nodes unless we limit the number that each
|
|
// MaxScore attempt considers. By reducing the total from MaxInt, we
|
|
// can prevent quadratic performance but then we need this test to
|
|
// verify we have satisfactory spread results.
|
|
func TestSpreadOnLargeCluster(t *testing.T) {
|
|
t.Parallel()
|
|
cases := []struct {
|
|
name string
|
|
nodeCount int
|
|
racks map[string]int
|
|
allocs int
|
|
}{
|
|
{
|
|
name: "nodes=10k even racks=100 allocs=500",
|
|
nodeCount: 10000,
|
|
racks: generateEvenRacks(10000, 100),
|
|
allocs: 500,
|
|
},
|
|
{
|
|
name: "nodes=10k even racks=100 allocs=50",
|
|
nodeCount: 10000,
|
|
racks: generateEvenRacks(10000, 100),
|
|
allocs: 50,
|
|
},
|
|
{
|
|
name: "nodes=10k even racks=10 allocs=500",
|
|
nodeCount: 10000,
|
|
racks: generateEvenRacks(10000, 10),
|
|
allocs: 500,
|
|
},
|
|
{
|
|
name: "nodes=10k even racks=10 allocs=50",
|
|
nodeCount: 10000,
|
|
racks: generateEvenRacks(10000, 10),
|
|
allocs: 500,
|
|
},
|
|
{
|
|
name: "nodes=10k small uneven racks allocs=500",
|
|
nodeCount: 10000,
|
|
racks: generateUnevenRacks(t, 10000, 50),
|
|
allocs: 500,
|
|
},
|
|
{
|
|
name: "nodes=10k small uneven racks allocs=50",
|
|
nodeCount: 10000,
|
|
racks: generateUnevenRacks(t, 10000, 50),
|
|
allocs: 500,
|
|
},
|
|
{
|
|
name: "nodes=10k many uneven racks allocs=500",
|
|
nodeCount: 10000,
|
|
racks: generateUnevenRacks(t, 10000, 500),
|
|
allocs: 500,
|
|
},
|
|
{
|
|
name: "nodes=10k many uneven racks allocs=50",
|
|
nodeCount: 10000,
|
|
racks: generateUnevenRacks(t, 10000, 500),
|
|
allocs: 50,
|
|
},
|
|
}
|
|
|
|
for i := range cases {
|
|
tc := cases[i]
|
|
t.Run(tc.name, func(t *testing.T) {
|
|
t.Parallel()
|
|
h := NewHarness(t)
|
|
err := upsertNodes(h, tc.nodeCount, tc.racks)
|
|
require.NoError(t, err)
|
|
job := generateJob(tc.allocs)
|
|
eval, err := upsertJob(h, job)
|
|
require.NoError(t, err)
|
|
|
|
start := time.Now()
|
|
err = h.Process(NewServiceScheduler, eval)
|
|
require.NoError(t, err)
|
|
require.LessOrEqual(t, time.Since(start), time.Duration(60*time.Second),
|
|
"time to evaluate exceeded EvalNackTimeout")
|
|
|
|
require.Len(t, h.Plans, 1)
|
|
require.False(t, h.Plans[0].IsNoOp())
|
|
require.NoError(t, validateEqualSpread(h))
|
|
})
|
|
}
|
|
}
|
|
|
|
// generateUnevenRacks creates a map of rack names to a count of nodes
|
|
// evenly distributed in those racks
|
|
func generateEvenRacks(nodes int, rackCount int) map[string]int {
|
|
racks := map[string]int{}
|
|
for i := 0; i < nodes; i++ {
|
|
racks[fmt.Sprintf("r%d", i%rackCount)]++
|
|
}
|
|
return racks
|
|
}
|
|
|
|
// generateUnevenRacks creates a random map of rack names to a count
|
|
// of nodes in that rack
|
|
func generateUnevenRacks(t *testing.T, nodes int, rackCount int) map[string]int {
|
|
rackNames := []string{}
|
|
for i := 0; i < rackCount; i++ {
|
|
rackNames = append(rackNames, fmt.Sprintf("r%d", i))
|
|
}
|
|
|
|
// print this so that any future test flakes can be more easily
|
|
// reproduced
|
|
seed := time.Now().UnixNano()
|
|
rand.Seed(seed)
|
|
t.Logf("nodes=%d racks=%d seed=%d\n", nodes, rackCount, seed)
|
|
|
|
racks := map[string]int{}
|
|
for i := 0; i < nodes; i++ {
|
|
idx := rand.Intn(len(rackNames))
|
|
racks[rackNames[idx]]++
|
|
}
|
|
return racks
|
|
}
|
|
|
|
// upsertNodes creates a collection of Nodes in the state store,
|
|
// distributed among the racks
|
|
func upsertNodes(h *Harness, count int, racks map[string]int) error {
|
|
|
|
datacenters := []string{"dc-1", "dc-2"}
|
|
rackAssignments := []string{}
|
|
for rack, count := range racks {
|
|
for i := 0; i < count; i++ {
|
|
rackAssignments = append(rackAssignments, rack)
|
|
}
|
|
}
|
|
|
|
for i := 0; i < count; i++ {
|
|
node := mock.Node()
|
|
node.Datacenter = datacenters[i%2]
|
|
node.Meta = map[string]string{}
|
|
node.Meta["rack"] = fmt.Sprintf("r%s", rackAssignments[i])
|
|
node.NodeResources.Cpu.CpuShares = 14000
|
|
node.NodeResources.Memory.MemoryMB = 32000
|
|
err := h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func generateJob(jobSize int) *structs.Job {
|
|
job := mock.Job()
|
|
job.Datacenters = []string{"dc-1", "dc-2"}
|
|
job.Spreads = []*structs.Spread{{Attribute: "${meta.rack}"}}
|
|
job.Constraints = []*structs.Constraint{}
|
|
job.TaskGroups[0].Count = jobSize
|
|
job.TaskGroups[0].Networks = nil
|
|
job.TaskGroups[0].Services = []*structs.Service{}
|
|
job.TaskGroups[0].Tasks[0].Resources = &structs.Resources{
|
|
CPU: 6000,
|
|
MemoryMB: 6000,
|
|
}
|
|
return job
|
|
}
|
|
|
|
func upsertJob(h *Harness, job *structs.Job) (*structs.Evaluation, error) {
|
|
err := h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), job)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
eval := &structs.Evaluation{
|
|
Namespace: structs.DefaultNamespace,
|
|
ID: uuid.Generate(),
|
|
Priority: job.Priority,
|
|
TriggeredBy: structs.EvalTriggerJobRegister,
|
|
JobID: job.ID,
|
|
Status: structs.EvalStatusPending,
|
|
}
|
|
err = h.State.UpsertEvals(structs.MsgTypeTestSetup,
|
|
h.NextIndex(), []*structs.Evaluation{eval})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return eval, nil
|
|
}
|
|
|
|
// validateEqualSpread compares the resulting plan to the node
|
|
// metadata to verify that each group of spread targets has an equal
|
|
// distribution.
|
|
func validateEqualSpread(h *Harness) error {
|
|
|
|
iter, err := h.State.Nodes(nil)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
i := 0
|
|
nodesToRacks := map[string]string{}
|
|
racksToAllocCount := map[string]int{}
|
|
for {
|
|
raw := iter.Next()
|
|
if raw == nil {
|
|
break
|
|
}
|
|
node := raw.(*structs.Node)
|
|
rack, ok := node.Meta["rack"]
|
|
if ok {
|
|
nodesToRacks[node.ID] = rack
|
|
racksToAllocCount[rack] = 0
|
|
}
|
|
i++
|
|
}
|
|
|
|
// Collapse the count of allocations per node into a list of
|
|
// counts. The results should be clustered within one of each
|
|
// other.
|
|
for nodeID, nodeAllocs := range h.Plans[0].NodeAllocation {
|
|
racksToAllocCount[nodesToRacks[nodeID]] += len(nodeAllocs)
|
|
}
|
|
countSet := map[int]int{}
|
|
for _, count := range racksToAllocCount {
|
|
countSet[count]++
|
|
}
|
|
|
|
countSlice := []int{}
|
|
for count := range countSet {
|
|
countSlice = append(countSlice, count)
|
|
}
|
|
|
|
switch len(countSlice) {
|
|
case 1:
|
|
return nil
|
|
case 2, 3:
|
|
sort.Ints(countSlice)
|
|
for i := 1; i < len(countSlice); i++ {
|
|
if countSlice[i] != countSlice[i-1]+1 {
|
|
return fmt.Errorf("expected even distributon of allocs to racks, but got:\n%+v", countSet)
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
return fmt.Errorf("expected even distributon of allocs to racks, but got:\n%+v", countSet)
|
|
}
|