open-nomad/scheduler/spread_test.go

1158 lines
29 KiB
Go

// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: MPL-2.0
package scheduler
import (
"fmt"
"math"
"math/rand"
"sort"
"testing"
"time"
"github.com/shoenig/test"
"github.com/shoenig/test/must"
"github.com/stretchr/testify/require"
"github.com/hashicorp/go-set"
"github.com/hashicorp/nomad/ci"
"github.com/hashicorp/nomad/helper/pointer"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
)
func TestSpreadIterator_SingleAttribute(t *testing.T) {
ci.Parallel(t)
state, ctx := testContext(t)
dcs := []string{"dc1", "dc2", "dc1", "dc1"}
var nodes []*RankedNode
// Add these nodes to the state store
for i, dc := range dcs {
node := mock.Node()
node.Datacenter = dc
if err := state.UpsertNode(structs.MsgTypeTestSetup, uint64(100+i), node); err != nil {
t.Fatalf("failed to upsert node: %v", err)
}
nodes = append(nodes, &RankedNode{Node: node})
}
static := NewStaticRankIterator(ctx, nodes)
job := mock.Job()
tg := job.TaskGroups[0]
job.TaskGroups[0].Count = 10
// add allocs to nodes in dc1
upserting := []*structs.Allocation{
{
Namespace: structs.DefaultNamespace,
TaskGroup: tg.Name,
JobID: job.ID,
Job: job,
ID: uuid.Generate(),
EvalID: uuid.Generate(),
NodeID: nodes[0].Node.ID,
},
{
Namespace: structs.DefaultNamespace,
TaskGroup: tg.Name,
JobID: job.ID,
Job: job,
ID: uuid.Generate(),
EvalID: uuid.Generate(),
NodeID: nodes[2].Node.ID,
},
}
if err := state.UpsertAllocs(structs.MsgTypeTestSetup, 1000, upserting); err != nil {
t.Fatalf("failed to UpsertAllocs: %v", err)
}
// Create spread target of 80% in dc1
// Implicitly, this means 20% in dc2
spread := &structs.Spread{
Weight: 100,
Attribute: "${node.datacenter}",
SpreadTarget: []*structs.SpreadTarget{
{
Value: "dc1",
Percent: 80,
},
},
}
tg.Spreads = []*structs.Spread{spread}
spreadIter := NewSpreadIterator(ctx, static)
spreadIter.SetJob(job)
spreadIter.SetTaskGroup(tg)
scoreNorm := NewScoreNormalizationIterator(ctx, spreadIter)
out := collectRanked(scoreNorm)
// Expect nodes in dc1 with existing allocs to get a boost
// Boost should be ((desiredCount-actual)/desired)*spreadWeight
// For this test, that becomes dc1 = ((8-3)/8 ) = 0.5, and dc2=(2-1)/2
expectedScores := map[string]float64{
"dc1": 0.625,
"dc2": 0.5,
}
for _, rn := range out {
require.Equal(t, expectedScores[rn.Node.Datacenter], rn.FinalScore)
}
// Update the plan to add more allocs to nodes in dc1
// After this step there are enough allocs to meet the desired count in dc1
ctx.plan.NodeAllocation[nodes[0].Node.ID] = []*structs.Allocation{
{
Namespace: structs.DefaultNamespace,
TaskGroup: tg.Name,
JobID: job.ID,
Job: job,
ID: uuid.Generate(),
NodeID: nodes[0].Node.ID,
},
{
Namespace: structs.DefaultNamespace,
TaskGroup: tg.Name,
JobID: job.ID,
Job: job,
ID: uuid.Generate(),
NodeID: nodes[0].Node.ID,
},
// Should be ignored as it is a different job.
{
Namespace: structs.DefaultNamespace,
TaskGroup: "bbb",
JobID: "ignore 2",
Job: job,
ID: uuid.Generate(),
NodeID: nodes[0].Node.ID,
},
}
ctx.plan.NodeAllocation[nodes[3].Node.ID] = []*structs.Allocation{
{
Namespace: structs.DefaultNamespace,
TaskGroup: tg.Name,
JobID: job.ID,
Job: job,
ID: uuid.Generate(),
NodeID: nodes[3].Node.ID,
},
{
Namespace: structs.DefaultNamespace,
TaskGroup: tg.Name,
JobID: job.ID,
Job: job,
ID: uuid.Generate(),
NodeID: nodes[3].Node.ID,
},
{
Namespace: structs.DefaultNamespace,
TaskGroup: tg.Name,
JobID: job.ID,
Job: job,
ID: uuid.Generate(),
NodeID: nodes[3].Node.ID,
},
}
// Reset the scores
for _, node := range nodes {
node.Scores = nil
node.FinalScore = 0
}
static = NewStaticRankIterator(ctx, nodes)
spreadIter = NewSpreadIterator(ctx, static)
spreadIter.SetJob(job)
spreadIter.SetTaskGroup(tg)
scoreNorm = NewScoreNormalizationIterator(ctx, spreadIter)
out = collectRanked(scoreNorm)
// Expect nodes in dc2 with existing allocs to get a boost
// DC1 nodes are not boosted because there are enough allocs to meet
// the desired count
expectedScores = map[string]float64{
"dc1": 0,
"dc2": 0.5,
}
for _, rn := range out {
require.Equal(t, expectedScores[rn.Node.Datacenter], rn.FinalScore)
}
}
func TestSpreadIterator_MultipleAttributes(t *testing.T) {
ci.Parallel(t)
state, ctx := testContext(t)
dcs := []string{"dc1", "dc2", "dc1", "dc1"}
rack := []string{"r1", "r1", "r2", "r2"}
var nodes []*RankedNode
// Add these nodes to the state store
for i, dc := range dcs {
node := mock.Node()
node.Datacenter = dc
node.Meta["rack"] = rack[i]
if err := state.UpsertNode(structs.MsgTypeTestSetup, uint64(100+i), node); err != nil {
t.Fatalf("failed to upsert node: %v", err)
}
nodes = append(nodes, &RankedNode{Node: node})
}
static := NewStaticRankIterator(ctx, nodes)
job := mock.Job()
tg := job.TaskGroups[0]
job.TaskGroups[0].Count = 10
// add allocs to nodes in dc1
upserting := []*structs.Allocation{
{
Namespace: structs.DefaultNamespace,
TaskGroup: tg.Name,
JobID: job.ID,
Job: job,
ID: uuid.Generate(),
EvalID: uuid.Generate(),
NodeID: nodes[0].Node.ID,
},
{
Namespace: structs.DefaultNamespace,
TaskGroup: tg.Name,
JobID: job.ID,
Job: job,
ID: uuid.Generate(),
EvalID: uuid.Generate(),
NodeID: nodes[2].Node.ID,
},
}
if err := state.UpsertAllocs(structs.MsgTypeTestSetup, 1000, upserting); err != nil {
t.Fatalf("failed to UpsertAllocs: %v", err)
}
spread1 := &structs.Spread{
Weight: 100,
Attribute: "${node.datacenter}",
SpreadTarget: []*structs.SpreadTarget{
{
Value: "dc1",
Percent: 60,
},
{
Value: "dc2",
Percent: 40,
},
},
}
spread2 := &structs.Spread{
Weight: 50,
Attribute: "${meta.rack}",
SpreadTarget: []*structs.SpreadTarget{
{
Value: "r1",
Percent: 40,
},
{
Value: "r2",
Percent: 60,
},
},
}
tg.Spreads = []*structs.Spread{spread1, spread2}
spreadIter := NewSpreadIterator(ctx, static)
spreadIter.SetJob(job)
spreadIter.SetTaskGroup(tg)
scoreNorm := NewScoreNormalizationIterator(ctx, spreadIter)
out := collectRanked(scoreNorm)
// Score comes from combining two different spread factors
// Second node should have the highest score because it has no allocs and its in dc2/r1
expectedScores := map[string]float64{
nodes[0].Node.ID: 0.500,
nodes[1].Node.ID: 0.667,
nodes[2].Node.ID: 0.556,
nodes[3].Node.ID: 0.556,
}
for _, rn := range out {
require.Equal(t, fmt.Sprintf("%.3f", expectedScores[rn.Node.ID]), fmt.Sprintf("%.3f", rn.FinalScore))
}
}
func TestSpreadIterator_EvenSpread(t *testing.T) {
ci.Parallel(t)
state, ctx := testContext(t)
dcs := []string{"dc1", "dc2", "dc1", "dc2", "dc1", "dc2", "dc2", "dc1", "dc1", "dc1"}
var nodes []*RankedNode
// Add these nodes to the state store
for i, dc := range dcs {
node := mock.Node()
node.Datacenter = dc
if err := state.UpsertNode(structs.MsgTypeTestSetup, uint64(100+i), node); err != nil {
t.Fatalf("failed to upsert node: %v", err)
}
nodes = append(nodes, &RankedNode{Node: node})
}
static := NewStaticRankIterator(ctx, nodes)
job := mock.Job()
tg := job.TaskGroups[0]
job.TaskGroups[0].Count = 10
// Configure even spread across node.datacenter
spread := &structs.Spread{
Weight: 100,
Attribute: "${node.datacenter}",
}
tg.Spreads = []*structs.Spread{spread}
spreadIter := NewSpreadIterator(ctx, static)
spreadIter.SetJob(job)
spreadIter.SetTaskGroup(tg)
scoreNorm := NewScoreNormalizationIterator(ctx, spreadIter)
out := collectRanked(scoreNorm)
// Nothing placed so both dc nodes get 0 as the score
expectedScores := map[string]float64{
"dc1": 0,
"dc2": 0,
}
for _, rn := range out {
require.Equal(t, fmt.Sprintf("%.3f", expectedScores[rn.Node.Datacenter]), fmt.Sprintf("%.3f", rn.FinalScore))
}
// Update the plan to add allocs to nodes in dc1
// After this step dc2 nodes should get boosted
ctx.plan.NodeAllocation[nodes[0].Node.ID] = []*structs.Allocation{
{
Namespace: structs.DefaultNamespace,
TaskGroup: tg.Name,
JobID: job.ID,
Job: job,
ID: uuid.Generate(),
NodeID: nodes[0].Node.ID,
},
}
ctx.plan.NodeAllocation[nodes[2].Node.ID] = []*structs.Allocation{
{
Namespace: structs.DefaultNamespace,
TaskGroup: tg.Name,
JobID: job.ID,
Job: job,
ID: uuid.Generate(),
NodeID: nodes[2].Node.ID,
},
}
// Reset the scores
for _, node := range nodes {
node.Scores = nil
node.FinalScore = 0
}
static = NewStaticRankIterator(ctx, nodes)
spreadIter = NewSpreadIterator(ctx, static)
spreadIter.SetJob(job)
spreadIter.SetTaskGroup(tg)
scoreNorm = NewScoreNormalizationIterator(ctx, spreadIter)
out = collectRanked(scoreNorm)
// Expect nodes in dc2 with existing allocs to get a boost
// dc1 nodes are penalized because they have allocs
expectedScores = map[string]float64{
"dc1": -1,
"dc2": 1,
}
for _, rn := range out {
require.Equal(t, expectedScores[rn.Node.Datacenter], rn.FinalScore)
}
// Update the plan to add more allocs to nodes in dc2
// After this step dc1 nodes should get boosted
ctx.plan.NodeAllocation[nodes[1].Node.ID] = []*structs.Allocation{
{
Namespace: structs.DefaultNamespace,
TaskGroup: tg.Name,
JobID: job.ID,
Job: job,
ID: uuid.Generate(),
NodeID: nodes[1].Node.ID,
},
{
Namespace: structs.DefaultNamespace,
TaskGroup: tg.Name,
JobID: job.ID,
Job: job,
ID: uuid.Generate(),
NodeID: nodes[1].Node.ID,
},
}
ctx.plan.NodeAllocation[nodes[3].Node.ID] = []*structs.Allocation{
{
Namespace: structs.DefaultNamespace,
TaskGroup: tg.Name,
JobID: job.ID,
Job: job,
ID: uuid.Generate(),
NodeID: nodes[3].Node.ID,
},
}
// Reset the scores
for _, node := range nodes {
node.Scores = nil
node.FinalScore = 0
}
static = NewStaticRankIterator(ctx, nodes)
spreadIter = NewSpreadIterator(ctx, static)
spreadIter.SetJob(job)
spreadIter.SetTaskGroup(tg)
scoreNorm = NewScoreNormalizationIterator(ctx, spreadIter)
out = collectRanked(scoreNorm)
// Expect nodes in dc2 to be penalized because there are 3 allocs there now
// dc1 nodes are boosted because that has 2 allocs
expectedScores = map[string]float64{
"dc1": 0.5,
"dc2": -0.5,
}
for _, rn := range out {
require.Equal(t, fmt.Sprintf("%3.3f", expectedScores[rn.Node.Datacenter]), fmt.Sprintf("%3.3f", rn.FinalScore))
}
// Add another node in dc3
node := mock.Node()
node.Datacenter = "dc3"
if err := state.UpsertNode(structs.MsgTypeTestSetup, uint64(1111), node); err != nil {
t.Fatalf("failed to upsert node: %v", err)
}
nodes = append(nodes, &RankedNode{Node: node})
// Add another alloc to dc1, now its count matches dc2
ctx.plan.NodeAllocation[nodes[4].Node.ID] = []*structs.Allocation{
{
Namespace: structs.DefaultNamespace,
TaskGroup: tg.Name,
JobID: job.ID,
Job: job,
ID: uuid.Generate(),
NodeID: nodes[4].Node.ID,
},
}
// Reset scores
for _, node := range nodes {
node.Scores = nil
node.FinalScore = 0
}
static = NewStaticRankIterator(ctx, nodes)
spreadIter = NewSpreadIterator(ctx, static)
spreadIter.SetJob(job)
spreadIter.SetTaskGroup(tg)
scoreNorm = NewScoreNormalizationIterator(ctx, spreadIter)
out = collectRanked(scoreNorm)
// Expect dc1 and dc2 to be penalized because they have 3 allocs
// dc3 should get a boost because it has 0 allocs
expectedScores = map[string]float64{
"dc1": -1,
"dc2": -1,
"dc3": 1,
}
for _, rn := range out {
require.Equal(t, fmt.Sprintf("%.3f", expectedScores[rn.Node.Datacenter]), fmt.Sprintf("%.3f", rn.FinalScore))
}
}
// Test scenarios where the spread iterator sets maximum penalty (-1.0)
func TestSpreadIterator_MaxPenalty(t *testing.T) {
ci.Parallel(t)
state, ctx := testContext(t)
var nodes []*RankedNode
// Add nodes in dc3 to the state store
for i := 0; i < 5; i++ {
node := mock.Node()
node.Datacenter = "dc3"
if err := state.UpsertNode(structs.MsgTypeTestSetup, uint64(100+i), node); err != nil {
t.Fatalf("failed to upsert node: %v", err)
}
nodes = append(nodes, &RankedNode{Node: node})
}
static := NewStaticRankIterator(ctx, nodes)
job := mock.Job()
tg := job.TaskGroups[0]
job.TaskGroups[0].Count = 5
// Create spread target of 80% in dc1
// and 20% in dc2
spread := &structs.Spread{
Weight: 100,
Attribute: "${node.datacenter}",
SpreadTarget: []*structs.SpreadTarget{
{
Value: "dc1",
Percent: 80,
},
{
Value: "dc2",
Percent: 20,
},
},
}
tg.Spreads = []*structs.Spread{spread}
spreadIter := NewSpreadIterator(ctx, static)
spreadIter.SetJob(job)
spreadIter.SetTaskGroup(tg)
scoreNorm := NewScoreNormalizationIterator(ctx, spreadIter)
out := collectRanked(scoreNorm)
// All nodes are in dc3 so score should be -1
for _, rn := range out {
require.Equal(t, -1.0, rn.FinalScore)
}
// Reset scores
for _, node := range nodes {
node.Scores = nil
node.FinalScore = 0
}
// Create spread on attribute that doesn't exist on any nodes
spread = &structs.Spread{
Weight: 100,
Attribute: "${meta.foo}",
SpreadTarget: []*structs.SpreadTarget{
{
Value: "bar",
Percent: 80,
},
{
Value: "baz",
Percent: 20,
},
},
}
tg.Spreads = []*structs.Spread{spread}
static = NewStaticRankIterator(ctx, nodes)
spreadIter = NewSpreadIterator(ctx, static)
spreadIter.SetJob(job)
spreadIter.SetTaskGroup(tg)
scoreNorm = NewScoreNormalizationIterator(ctx, spreadIter)
out = collectRanked(scoreNorm)
// All nodes don't have the spread attribute so score should be -1
for _, rn := range out {
require.Equal(t, -1.0, rn.FinalScore)
}
}
func TestSpreadIterator_NoInfinity(t *testing.T) {
ci.Parallel(t)
store, ctx := testContext(t)
var nodes []*RankedNode
// Add 3 nodes in different DCs to the state store
for i := 1; i < 4; i++ {
node := mock.Node()
node.Datacenter = fmt.Sprintf("dc%d", i)
must.NoError(t, store.UpsertNode(structs.MsgTypeTestSetup, uint64(100+i), node))
nodes = append(nodes, &RankedNode{Node: node})
}
static := NewStaticRankIterator(ctx, nodes)
job := mock.Job()
tg := job.TaskGroups[0]
job.TaskGroups[0].Count = 8
// Create spread target of 50% in dc1, 50% in dc2, and 0% in the implicit target
spread := &structs.Spread{
Weight: 100,
Attribute: "${node.datacenter}",
SpreadTarget: []*structs.SpreadTarget{
{
Value: "dc1",
Percent: 50,
},
{
Value: "dc2",
Percent: 50,
},
{
Value: "*",
Percent: 0,
},
},
}
tg.Spreads = []*structs.Spread{spread}
spreadIter := NewSpreadIterator(ctx, static)
spreadIter.SetJob(job)
spreadIter.SetTaskGroup(tg)
scoreNorm := NewScoreNormalizationIterator(ctx, spreadIter)
out := collectRanked(scoreNorm)
// Scores should be even between dc1 and dc2 nodes, without an -Inf on dc3
must.Len(t, 3, out)
test.Eq(t, 0.75, out[0].FinalScore)
test.Eq(t, 0.75, out[1].FinalScore)
test.Eq(t, -1, out[2].FinalScore)
// Reset scores
for _, node := range nodes {
node.Scores = nil
node.FinalScore = 0
}
// Create very unbalanced spread target to force large negative scores
spread = &structs.Spread{
Weight: 100,
Attribute: "${node.datacenter}",
SpreadTarget: []*structs.SpreadTarget{
{
Value: "dc1",
Percent: 99,
},
{
Value: "dc2",
Percent: 1,
},
{
Value: "*",
Percent: 0,
},
},
}
tg.Spreads = []*structs.Spread{spread}
static = NewStaticRankIterator(ctx, nodes)
spreadIter = NewSpreadIterator(ctx, static)
spreadIter.SetJob(job)
spreadIter.SetTaskGroup(tg)
scoreNorm = NewScoreNormalizationIterator(ctx, spreadIter)
out = collectRanked(scoreNorm)
// Scores should heavily favor dc1, with an -Inf on dc3
must.Len(t, 3, out)
desired := 8 * 0.99 // 8 allocs * 99%
test.Eq(t, (desired-1)/desired, out[0].FinalScore)
test.Eq(t, -11.5, out[1].FinalScore)
test.LessEq(t, out[1].FinalScore, out[2].FinalScore,
test.Sprintf("expected implicit dc3 to be <= dc2"))
}
func Test_evenSpreadScoreBoost(t *testing.T) {
ci.Parallel(t)
pset := &propertySet{
existingValues: map[string]uint64{},
proposedValues: map[string]uint64{
"dc2": 1,
"dc1": 1,
"dc3": 1,
},
clearedValues: map[string]uint64{
"dc2": 1,
"dc3": 1,
},
targetAttribute: "${node.datacenter}",
targetValues: &set.Set[string]{},
}
opt := &structs.Node{
Datacenter: "dc2",
}
boost := evenSpreadScoreBoost(pset, opt)
require.False(t, math.IsInf(boost, 1))
require.Equal(t, 1.0, boost)
}
// TestSpreadOnLargeCluster exercises potentially quadratic
// performance cases with spread scheduling when we have a large
// number of eligible nodes unless we limit the number that each
// MaxScore attempt considers. By reducing the total from MaxInt, we
// can prevent quadratic performance but then we need this test to
// verify we have satisfactory spread results.
func TestSpreadOnLargeCluster(t *testing.T) {
ci.Parallel(t)
cases := []struct {
name string
nodeCount int
racks map[string]int
allocs int
}{
{
name: "nodes=10k even racks=100 allocs=500",
nodeCount: 10000,
racks: generateEvenRacks(10000, 100),
allocs: 500,
},
{
name: "nodes=10k even racks=100 allocs=50",
nodeCount: 10000,
racks: generateEvenRacks(10000, 100),
allocs: 50,
},
{
name: "nodes=10k even racks=10 allocs=500",
nodeCount: 10000,
racks: generateEvenRacks(10000, 10),
allocs: 500,
},
{
name: "nodes=10k even racks=10 allocs=50",
nodeCount: 10000,
racks: generateEvenRacks(10000, 10),
allocs: 500,
},
{
name: "nodes=10k small uneven racks allocs=500",
nodeCount: 10000,
racks: generateUnevenRacks(t, 10000, 50),
allocs: 500,
},
{
name: "nodes=10k small uneven racks allocs=50",
nodeCount: 10000,
racks: generateUnevenRacks(t, 10000, 50),
allocs: 500,
},
{
name: "nodes=10k many uneven racks allocs=500",
nodeCount: 10000,
racks: generateUnevenRacks(t, 10000, 500),
allocs: 500,
},
{
name: "nodes=10k many uneven racks allocs=50",
nodeCount: 10000,
racks: generateUnevenRacks(t, 10000, 500),
allocs: 50,
},
}
for i := range cases {
tc := cases[i]
t.Run(tc.name, func(t *testing.T) {
ci.Parallel(t)
h := NewHarness(t)
err := upsertNodes(h, tc.nodeCount, tc.racks)
require.NoError(t, err)
job := generateJob(tc.allocs)
eval, err := upsertJob(h, job)
require.NoError(t, err)
start := time.Now()
err = h.Process(NewServiceScheduler, eval)
require.NoError(t, err)
require.LessOrEqual(t, time.Since(start), time.Duration(60*time.Second),
"time to evaluate exceeded EvalNackTimeout")
require.Len(t, h.Plans, 1)
require.False(t, h.Plans[0].IsNoOp())
require.NoError(t, validateEqualSpread(h))
})
}
}
// generateUnevenRacks creates a map of rack names to a count of nodes
// evenly distributed in those racks
func generateEvenRacks(nodes int, rackCount int) map[string]int {
racks := map[string]int{}
for i := 0; i < nodes; i++ {
racks[fmt.Sprintf("r%d", i%rackCount)]++
}
return racks
}
// generateUnevenRacks creates a random map of rack names to a count
// of nodes in that rack
func generateUnevenRacks(t *testing.T, nodes int, rackCount int) map[string]int {
rackNames := []string{}
for i := 0; i < rackCount; i++ {
rackNames = append(rackNames, fmt.Sprintf("r%d", i))
}
// print this so that any future test flakes can be more easily
// reproduced
seed := time.Now().Unix()
random := rand.NewSource(seed)
t.Logf("nodes=%d racks=%d seed=%d\n", nodes, rackCount, seed)
racks := map[string]int{}
for i := 0; i < nodes; i++ {
idx := int(random.Int63()) % len(rackNames)
racks[rackNames[idx]]++
}
return racks
}
// upsertNodes creates a collection of Nodes in the state store,
// distributed among the racks
func upsertNodes(h *Harness, count int, racks map[string]int) error {
datacenters := []string{"dc-1", "dc-2"}
rackAssignments := []string{}
for rack, count := range racks {
for i := 0; i < count; i++ {
rackAssignments = append(rackAssignments, rack)
}
}
for i := 0; i < count; i++ {
node := mock.Node()
node.Datacenter = datacenters[i%2]
node.Meta = map[string]string{}
node.Meta["rack"] = fmt.Sprintf("r%s", rackAssignments[i])
node.NodeResources.Cpu.CpuShares = 14000
node.NodeResources.Memory.MemoryMB = 32000
err := h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node)
if err != nil {
return err
}
}
return nil
}
func generateJob(jobSize int) *structs.Job {
job := mock.Job()
job.Datacenters = []string{"dc-1", "dc-2"}
job.Spreads = []*structs.Spread{{Attribute: "${meta.rack}"}}
job.Constraints = []*structs.Constraint{}
job.TaskGroups[0].Count = jobSize
job.TaskGroups[0].Networks = nil
job.TaskGroups[0].Services = []*structs.Service{}
job.TaskGroups[0].Tasks[0].Resources = &structs.Resources{
CPU: 6000,
MemoryMB: 6000,
}
return job
}
func upsertJob(h *Harness, job *structs.Job) (*structs.Evaluation, error) {
err := h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), nil, job)
if err != nil {
return nil, err
}
eval := &structs.Evaluation{
Namespace: structs.DefaultNamespace,
ID: uuid.Generate(),
Priority: job.Priority,
TriggeredBy: structs.EvalTriggerJobRegister,
JobID: job.ID,
Status: structs.EvalStatusPending,
}
err = h.State.UpsertEvals(structs.MsgTypeTestSetup,
h.NextIndex(), []*structs.Evaluation{eval})
if err != nil {
return nil, err
}
return eval, nil
}
// validateEqualSpread compares the resulting plan to the node
// metadata to verify that each group of spread targets has an equal
// distribution.
func validateEqualSpread(h *Harness) error {
iter, err := h.State.Nodes(nil)
if err != nil {
return err
}
i := 0
nodesToRacks := map[string]string{}
racksToAllocCount := map[string]int{}
for {
raw := iter.Next()
if raw == nil {
break
}
node := raw.(*structs.Node)
rack, ok := node.Meta["rack"]
if ok {
nodesToRacks[node.ID] = rack
racksToAllocCount[rack] = 0
}
i++
}
// Collapse the count of allocations per node into a list of
// counts. The results should be clustered within one of each
// other.
for nodeID, nodeAllocs := range h.Plans[0].NodeAllocation {
racksToAllocCount[nodesToRacks[nodeID]] += len(nodeAllocs)
}
countSet := map[int]int{}
for _, count := range racksToAllocCount {
countSet[count]++
}
countSlice := []int{}
for count := range countSet {
countSlice = append(countSlice, count)
}
switch len(countSlice) {
case 1:
return nil
case 2, 3:
sort.Ints(countSlice)
for i := 1; i < len(countSlice); i++ {
if countSlice[i] != countSlice[i-1]+1 {
return fmt.Errorf("expected even distributon of allocs to racks, but got:\n%+v", countSet)
}
}
return nil
}
return fmt.Errorf("expected even distributon of allocs to racks, but got:\n%+v", countSet)
}
func TestSpreadPanicDowngrade(t *testing.T) {
ci.Parallel(t)
h := NewHarness(t)
nodes := []*structs.Node{}
for i := 0; i < 5; i++ {
node := mock.Node()
nodes = append(nodes, node)
err := h.State.UpsertNode(structs.MsgTypeTestSetup,
h.NextIndex(), node)
require.NoError(t, err)
}
// job version 1
// max_parallel = 0, canary = 1, spread != nil, 1 failed alloc
job1 := mock.Job()
job1.Spreads = []*structs.Spread{
{
Attribute: "${node.unique.name}",
Weight: 50,
SpreadTarget: []*structs.SpreadTarget{},
},
}
job1.Update = structs.UpdateStrategy{
Stagger: time.Duration(30 * time.Second),
MaxParallel: 0,
}
job1.Status = structs.JobStatusRunning
job1.TaskGroups[0].Count = 4
job1.TaskGroups[0].Update = &structs.UpdateStrategy{
Stagger: time.Duration(30 * time.Second),
MaxParallel: 1,
HealthCheck: "checks",
MinHealthyTime: time.Duration(30 * time.Second),
HealthyDeadline: time.Duration(9 * time.Minute),
ProgressDeadline: time.Duration(10 * time.Minute),
AutoRevert: true,
Canary: 1,
}
job1.Version = 1
job1.TaskGroups[0].Count = 5
err := h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), nil, job1)
require.NoError(t, err)
allocs := []*structs.Allocation{}
for i := 0; i < 4; i++ {
alloc := mock.Alloc()
alloc.Job = job1
alloc.JobID = job1.ID
alloc.NodeID = nodes[i].ID
alloc.DeploymentStatus = &structs.AllocDeploymentStatus{
Healthy: pointer.Of(true),
Timestamp: time.Now(),
Canary: false,
ModifyIndex: h.NextIndex(),
}
if i == 0 {
alloc.DeploymentStatus.Canary = true
}
if i == 1 {
alloc.ClientStatus = structs.AllocClientStatusFailed
}
allocs = append(allocs, alloc)
}
err = h.State.UpsertAllocs(structs.MsgTypeTestSetup, h.NextIndex(), allocs)
require.NoError(t, err)
// job version 2
// max_parallel = 0, canary = 1, spread == nil
job2 := job1.Copy()
job2.Version = 2
job2.Spreads = nil
err = h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), nil, job2)
require.NoError(t, err)
eval := &structs.Evaluation{
Namespace: job2.Namespace,
ID: uuid.Generate(),
Priority: job2.Priority,
TriggeredBy: structs.EvalTriggerJobRegister,
JobID: job2.ID,
Status: structs.EvalStatusPending,
}
err = h.State.UpsertEvals(structs.MsgTypeTestSetup,
h.NextIndex(), []*structs.Evaluation{eval})
require.NoError(t, err)
processErr := h.Process(NewServiceScheduler, eval)
require.NoError(t, processErr, "failed to process eval")
require.Len(t, h.Plans, 1)
}
func TestSpread_ImplicitTargets(t *testing.T) {
dcs := []string{"dc1", "dc2", "dc3"}
setupNodes := func(h *Harness) map[string]string {
nodesToDcs := map[string]string{}
var nodes []*RankedNode
for i, dc := range dcs {
for n := 0; n < 4; n++ {
node := mock.Node()
node.Datacenter = dc
must.NoError(t, h.State.UpsertNode(
structs.MsgTypeTestSetup, uint64(100+i), node))
nodes = append(nodes, &RankedNode{Node: node})
nodesToDcs[node.ID] = node.Datacenter
}
}
return nodesToDcs
}
setupJob := func(h *Harness, testCaseSpread *structs.Spread) *structs.Evaluation {
job := mock.MinJob()
job.Datacenters = dcs
job.TaskGroups[0].Count = 12
job.TaskGroups[0].Spreads = []*structs.Spread{testCaseSpread}
must.NoError(t, h.State.UpsertJob(
structs.MsgTypeTestSetup, h.NextIndex(), nil, job))
eval := &structs.Evaluation{
Namespace: structs.DefaultNamespace,
ID: uuid.Generate(),
Priority: job.Priority,
TriggeredBy: structs.EvalTriggerJobRegister,
JobID: job.ID,
Status: structs.EvalStatusPending,
}
must.NoError(t, h.State.UpsertEvals(structs.MsgTypeTestSetup,
h.NextIndex(), []*structs.Evaluation{eval}))
return eval
}
testCases := []struct {
name string
spread *structs.Spread
expect map[string]int
}{
{
name: "empty implicit target",
spread: &structs.Spread{
Weight: 100,
Attribute: "${node.datacenter}",
SpreadTarget: []*structs.SpreadTarget{
{
Value: "dc1",
Percent: 50,
},
},
},
expect: map[string]int{"dc1": 6},
},
{
name: "wildcard implicit target",
spread: &structs.Spread{
Weight: 100,
Attribute: "${node.datacenter}",
SpreadTarget: []*structs.SpreadTarget{
{
Value: "dc1",
Percent: 50,
},
{
Value: "*",
Percent: 50,
},
},
},
expect: map[string]int{"dc1": 6},
},
{
name: "explicit targets",
spread: &structs.Spread{
Weight: 100,
Attribute: "${node.datacenter}",
SpreadTarget: []*structs.SpreadTarget{
{
Value: "dc1",
Percent: 50,
},
{
Value: "dc2",
Percent: 25,
},
{
Value: "dc3",
Percent: 25,
},
},
},
expect: map[string]int{"dc1": 6, "dc2": 3, "dc3": 3},
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
h := NewHarness(t)
nodesToDcs := setupNodes(h)
eval := setupJob(h, tc.spread)
must.NoError(t, h.Process(NewServiceScheduler, eval))
must.Len(t, 1, h.Plans)
plan := h.Plans[0]
must.False(t, plan.IsNoOp())
dcCounts := map[string]int{}
for node, allocs := range plan.NodeAllocation {
dcCounts[nodesToDcs[node]] += len(allocs)
}
for dc, expectVal := range tc.expect {
// not using test.MapEqual here because we have incomplete
// expectations for the implicit DCs on some tests.
test.Eq(t, expectVal, dcCounts[dc],
test.Sprintf("expected %d in %q", expectVal, dc))
}
})
}
}