open-nomad/scheduler/spread_test.go

571 lines
14 KiB
Go
Raw Normal View History

package scheduler
import (
"math"
"testing"
"fmt"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/stretchr/testify/require"
)
// TestSpreadIterator_SingleAttribute exercises the spread iterator with a
// single spread on ${node.datacenter} that targets 80% of the task group's
// allocations in dc1 (the remaining 20% implicitly falls to dc2). Nodes are
// scored twice: once with two dc1 allocations in the state store, and once
// more after the plan proposes enough dc1 allocations to reach the target.
func TestSpreadIterator_SingleAttribute(t *testing.T) {
	state, ctx := testContext(t)
	dcs := []string{"dc1", "dc2", "dc1", "dc1"}
	var nodes []*RankedNode

	// Add these nodes to the state store
	for i, dc := range dcs {
		node := mock.Node()
		node.Datacenter = dc
		if err := state.UpsertNode(structs.MsgTypeTestSetup, uint64(100+i), node); err != nil {
			t.Fatalf("failed to upsert node: %v", err)
		}
		nodes = append(nodes, &RankedNode{Node: node})
	}

	// assertScores checks that every node was ranked and that each ranked
	// node carries the score expected for its datacenter. The length and
	// key-presence checks keep the loop from passing vacuously: without them
	// an empty result, or a datacenter missing from the map (which would
	// read back as 0), would go unnoticed.
	assertScores := func(out []*RankedNode, expected map[string]float64) {
		t.Helper()
		require.Len(t, out, len(nodes))
		for _, rn := range out {
			want, ok := expected[rn.Node.Datacenter]
			require.True(t, ok, "no expected score for datacenter %q", rn.Node.Datacenter)
			require.Equal(t, want, rn.FinalScore)
		}
	}

	static := NewStaticRankIterator(ctx, nodes)

	job := mock.Job()
	tg := job.TaskGroups[0]
	job.TaskGroups[0].Count = 10

	// Add one existing alloc to each of two dc1 nodes (nodes[0] and nodes[2])
	upserting := []*structs.Allocation{
		{
			Namespace: structs.DefaultNamespace,
			TaskGroup: tg.Name,
			JobID:     job.ID,
			Job:       job,
			ID:        uuid.Generate(),
			EvalID:    uuid.Generate(),
			NodeID:    nodes[0].Node.ID,
		},
		{
			Namespace: structs.DefaultNamespace,
			TaskGroup: tg.Name,
			JobID:     job.ID,
			Job:       job,
			ID:        uuid.Generate(),
			EvalID:    uuid.Generate(),
			NodeID:    nodes[2].Node.ID,
		},
	}
	if err := state.UpsertAllocs(structs.MsgTypeTestSetup, 1000, upserting); err != nil {
		t.Fatalf("failed to UpsertAllocs: %v", err)
	}

	// Create spread target of 80% in dc1
	// Implicitly, this means 20% in dc2
	spread := &structs.Spread{
		Weight:    100,
		Attribute: "${node.datacenter}",
		SpreadTarget: []*structs.SpreadTarget{
			{
				Value:   "dc1",
				Percent: 80,
			},
		},
	}
	tg.Spreads = []*structs.Spread{spread}

	spreadIter := NewSpreadIterator(ctx, static)
	spreadIter.SetJob(job)
	spreadIter.SetTaskGroup(tg)

	scoreNorm := NewScoreNormalizationIterator(ctx, spreadIter)
	out := collectRanked(scoreNorm)

	// Both datacenters are below their desired count, so all nodes get a boost.
	// Boost should be ((desiredCount-actual)/desired)*spreadWeight
	// For this test, that becomes dc1 = (8-3)/8 = 0.625, and dc2 = (2-1)/2 = 0.5
	// NOTE(review): "actual" appears to include the allocation being placed
	// (2 existing + 1 for dc1, 0 + 1 for dc2) — confirm against the iterator.
	assertScores(out, map[string]float64{
		"dc1": 0.625,
		"dc2": 0.5,
	})

	// Update the plan to add more allocs to nodes in dc1
	// After this step there are enough allocs to meet the desired count in dc1
	ctx.plan.NodeAllocation[nodes[0].Node.ID] = []*structs.Allocation{
		{
			Namespace: structs.DefaultNamespace,
			TaskGroup: tg.Name,
			JobID:     job.ID,
			Job:       job,
			ID:        uuid.Generate(),
			NodeID:    nodes[0].Node.ID,
		},
		{
			Namespace: structs.DefaultNamespace,
			TaskGroup: tg.Name,
			JobID:     job.ID,
			Job:       job,
			ID:        uuid.Generate(),
			NodeID:    nodes[0].Node.ID,
		},
		// Should be ignored as it is a different job.
		{
			Namespace: structs.DefaultNamespace,
			TaskGroup: "bbb",
			JobID:     "ignore 2",
			Job:       job,
			ID:        uuid.Generate(),
			NodeID:    nodes[0].Node.ID,
		},
	}
	ctx.plan.NodeAllocation[nodes[3].Node.ID] = []*structs.Allocation{
		{
			Namespace: structs.DefaultNamespace,
			TaskGroup: tg.Name,
			JobID:     job.ID,
			Job:       job,
			ID:        uuid.Generate(),
			NodeID:    nodes[3].Node.ID,
		},
		{
			Namespace: structs.DefaultNamespace,
			TaskGroup: tg.Name,
			JobID:     job.ID,
			Job:       job,
			ID:        uuid.Generate(),
			NodeID:    nodes[3].Node.ID,
		},
		{
			Namespace: structs.DefaultNamespace,
			TaskGroup: tg.Name,
			JobID:     job.ID,
			Job:       job,
			ID:        uuid.Generate(),
			NodeID:    nodes[3].Node.ID,
		},
	}

	// Reset the scores
	for _, node := range nodes {
		node.Scores = nil
		node.FinalScore = 0
	}
	static = NewStaticRankIterator(ctx, nodes)
	spreadIter = NewSpreadIterator(ctx, static)
	spreadIter.SetJob(job)
	spreadIter.SetTaskGroup(tg)
	scoreNorm = NewScoreNormalizationIterator(ctx, spreadIter)
	out = collectRanked(scoreNorm)

	// dc2 still has no allocs for this job, so its nodes keep their boost.
	// DC1 nodes are not boosted because there are enough allocs to meet
	// the desired count (2 in state + 5 in the plan for this job).
	assertScores(out, map[string]float64{
		"dc1": 0,
		"dc2": 0.5,
	})
}
// TestSpreadIterator_MultipleAttributes scores four nodes against a task group
// that carries two spreads at once: one on ${node.datacenter} (weight 100) and
// one on ${meta.rack} (weight 50). Two allocations already exist in the state
// store, and the final score of every node is compared to three decimals.
func TestSpreadIterator_MultipleAttributes(t *testing.T) {
	state, ctx := testContext(t)

	// Each node is described by a datacenter and a rack at the same index.
	datacenters := []string{"dc1", "dc2", "dc1", "dc1"}
	racks := []string{"r1", "r1", "r2", "r2"}

	// Register the nodes in the state store and keep a ranked wrapper of each.
	var ranked []*RankedNode
	for idx := range datacenters {
		n := mock.Node()
		n.Datacenter = datacenters[idx]
		n.Meta["rack"] = racks[idx]
		err := state.UpsertNode(structs.MsgTypeTestSetup, uint64(100+idx), n)
		if err != nil {
			t.Fatalf("failed to upsert node: %v", err)
		}
		ranked = append(ranked, &RankedNode{Node: n})
	}
	source := NewStaticRankIterator(ctx, ranked)

	job := mock.Job()
	tg := job.TaskGroups[0]
	job.TaskGroups[0].Count = 10

	// existingAlloc builds an allocation of this job's task group on nodeID.
	existingAlloc := func(nodeID string) *structs.Allocation {
		return &structs.Allocation{
			Namespace: structs.DefaultNamespace,
			TaskGroup: tg.Name,
			JobID:     job.ID,
			Job:       job,
			ID:        uuid.Generate(),
			EvalID:    uuid.Generate(),
			NodeID:    nodeID,
		}
	}

	// Seed one allocation on each of two dc1 nodes (dc1/r1 and dc1/r2).
	seeded := []*structs.Allocation{
		existingAlloc(ranked[0].Node.ID),
		existingAlloc(ranked[2].Node.ID),
	}
	if err := state.UpsertAllocs(structs.MsgTypeTestSetup, 1000, seeded); err != nil {
		t.Fatalf("failed to UpsertAllocs: %v", err)
	}

	// First spread: 60/40 across datacenters. Second spread: 40/60 across racks.
	tg.Spreads = []*structs.Spread{
		{
			Weight:    100,
			Attribute: "${node.datacenter}",
			SpreadTarget: []*structs.SpreadTarget{
				{Value: "dc1", Percent: 60},
				{Value: "dc2", Percent: 40},
			},
		},
		{
			Weight:    50,
			Attribute: "${meta.rack}",
			SpreadTarget: []*structs.SpreadTarget{
				{Value: "r1", Percent: 40},
				{Value: "r2", Percent: 60},
			},
		},
	}

	iter := NewSpreadIterator(ctx, source)
	iter.SetJob(job)
	iter.SetTaskGroup(tg)
	results := collectRanked(NewScoreNormalizationIterator(ctx, iter))

	// The score combines both spread factors. The second node is expected to
	// rank highest because it has no allocs and it is in dc2/r1.
	want := map[string]float64{
		ranked[0].Node.ID: 0.500,
		ranked[1].Node.ID: 0.667,
		ranked[2].Node.ID: 0.556,
		ranked[3].Node.ID: 0.556,
	}
	for _, rn := range results {
		// Compare at three decimals rather than on the raw float values.
		wantStr := fmt.Sprintf("%.3f", want[rn.Node.ID])
		gotStr := fmt.Sprintf("%.3f", rn.FinalScore)
		require.Equal(t, wantStr, gotStr)
	}
}
// TestSpreadIterator_EvenSpread covers a spread on ${node.datacenter} that has
// no explicit targets. The nodes are ranked four times, with the plan gaining
// proposed allocations between rounds, and each round asserts which
// datacenters are boosted and which are penalized.
func TestSpreadIterator_EvenSpread(t *testing.T) {
	state, ctx := testContext(t)

	// Register ten nodes: six in dc1 and four in dc2.
	var nodes []*RankedNode
	for idx, dc := range []string{"dc1", "dc2", "dc1", "dc2", "dc1", "dc2", "dc2", "dc1", "dc1", "dc1"} {
		n := mock.Node()
		n.Datacenter = dc
		if err := state.UpsertNode(structs.MsgTypeTestSetup, uint64(100+idx), n); err != nil {
			t.Fatalf("failed to upsert node: %v", err)
		}
		nodes = append(nodes, &RankedNode{Node: n})
	}

	job := mock.Job()
	tg := job.TaskGroups[0]
	job.TaskGroups[0].Count = 10

	// Configure even spread across node.datacenter
	tg.Spreads = []*structs.Spread{
		{
			Weight:    100,
			Attribute: "${node.datacenter}",
		},
	}

	// rank clears any scores from a previous round, rebuilds the iterator
	// chain over the current node list, and returns the ranked nodes. On the
	// first round the scores are still at their zero values, so the reset
	// changes nothing there.
	rank := func() []*RankedNode {
		for _, rn := range nodes {
			rn.Scores = nil
			rn.FinalScore = 0
		}
		iter := NewSpreadIterator(ctx, NewStaticRankIterator(ctx, nodes))
		iter.SetJob(job)
		iter.SetTaskGroup(tg)
		return collectRanked(NewScoreNormalizationIterator(ctx, iter))
	}

	// propose replaces the plan's allocations for nodes[idx] with count
	// freshly generated allocations of this job's task group.
	propose := func(idx, count int) {
		nodeID := nodes[idx].Node.ID
		allocs := make([]*structs.Allocation, 0, count)
		for i := 0; i < count; i++ {
			allocs = append(allocs, &structs.Allocation{
				Namespace: structs.DefaultNamespace,
				TaskGroup: tg.Name,
				JobID:     job.ID,
				Job:       job,
				ID:        uuid.Generate(),
				NodeID:    nodeID,
			})
		}
		ctx.plan.NodeAllocation[nodeID] = allocs
	}

	// assertFormatted compares scores per datacenter after formatting both
	// sides with layout; assertExact compares the raw float values.
	assertFormatted := func(layout string, ranked []*RankedNode, byDC map[string]float64) {
		for _, rn := range ranked {
			require.Equal(t, fmt.Sprintf(layout, byDC[rn.Node.Datacenter]), fmt.Sprintf(layout, rn.FinalScore))
		}
	}
	assertExact := func(ranked []*RankedNode, byDC map[string]float64) {
		for _, rn := range ranked {
			require.Equal(t, byDC[rn.Node.Datacenter], rn.FinalScore)
		}
	}

	// Round 1: nothing placed so both dc nodes get 0 as the score.
	assertFormatted("%.3f", rank(), map[string]float64{
		"dc1": 0,
		"dc2": 0,
	})

	// Round 2: propose one alloc on each of two dc1 nodes. dc2 has nothing
	// placed, so its nodes are boosted while dc1 nodes are penalized.
	propose(0, 1)
	propose(2, 1)
	assertExact(rank(), map[string]float64{
		"dc1": -1,
		"dc2": 1,
	})

	// Round 3: propose three allocs in dc2 (two on nodes[1], one on nodes[3]).
	// dc2 now holds 3 allocs and is penalized; dc1 holds 2 and is boosted.
	propose(1, 2)
	propose(3, 1)
	assertFormatted("%3.3f", rank(), map[string]float64{
		"dc1": 0.5,
		"dc2": -0.5,
	})

	// Round 4: add another node in dc3
	extra := mock.Node()
	extra.Datacenter = "dc3"
	if err := state.UpsertNode(structs.MsgTypeTestSetup, uint64(1111), extra); err != nil {
		t.Fatalf("failed to upsert node: %v", err)
	}
	nodes = append(nodes, &RankedNode{Node: extra})

	// Add another alloc to dc1, now its count matches dc2
	propose(4, 1)

	// dc1 and dc2 are penalized because they each have 3 allocs;
	// dc3 gets a boost because it has 0 allocs.
	assertFormatted("%.3f", rank(), map[string]float64{
		"dc1": -1,
		"dc2": -1,
		"dc3": 1,
	})
}
// TestSpreadIterator_MaxPenalty tests scenarios where the spread iterator sets
// the maximum penalty (-1.0): first when every node sits in a datacenter that
// is not one of the spread targets, then when the spread attribute is not set
// on any node.
func TestSpreadIterator_MaxPenalty(t *testing.T) {
	state, ctx := testContext(t)
	var nodes []*RankedNode

	// Add nodes in dc3 to the state store
	for i := 0; i < 5; i++ {
		node := mock.Node()
		node.Datacenter = "dc3"
		if err := state.UpsertNode(structs.MsgTypeTestSetup, uint64(100+i), node); err != nil {
			t.Fatalf("failed to upsert node: %v", err)
		}
		nodes = append(nodes, &RankedNode{Node: node})
	}

	// assertMaxPenalty checks that every node was ranked and scored -1.
	// The length check keeps the loop from passing vacuously when the
	// iterator chain yields no nodes at all.
	assertMaxPenalty := func(out []*RankedNode) {
		t.Helper()
		require.Len(t, out, len(nodes))
		for _, rn := range out {
			require.Equal(t, -1.0, rn.FinalScore)
		}
	}

	static := NewStaticRankIterator(ctx, nodes)

	job := mock.Job()
	tg := job.TaskGroups[0]
	job.TaskGroups[0].Count = 5

	// Create spread target of 80% in dc1
	// and 20% in dc2
	spread := &structs.Spread{
		Weight:    100,
		Attribute: "${node.datacenter}",
		SpreadTarget: []*structs.SpreadTarget{
			{
				Value:   "dc1",
				Percent: 80,
			},
			{
				Value:   "dc2",
				Percent: 20,
			},
		},
	}
	tg.Spreads = []*structs.Spread{spread}

	spreadIter := NewSpreadIterator(ctx, static)
	spreadIter.SetJob(job)
	spreadIter.SetTaskGroup(tg)

	scoreNorm := NewScoreNormalizationIterator(ctx, spreadIter)
	out := collectRanked(scoreNorm)

	// All nodes are in dc3 so score should be -1
	assertMaxPenalty(out)

	// Reset scores
	for _, node := range nodes {
		node.Scores = nil
		node.FinalScore = 0
	}

	// Create spread on attribute that doesn't exist on any nodes
	spread = &structs.Spread{
		Weight:    100,
		Attribute: "${meta.foo}",
		SpreadTarget: []*structs.SpreadTarget{
			{
				Value:   "bar",
				Percent: 80,
			},
			{
				Value:   "baz",
				Percent: 20,
			},
		},
	}
	tg.Spreads = []*structs.Spread{spread}

	static = NewStaticRankIterator(ctx, nodes)
	spreadIter = NewSpreadIterator(ctx, static)
	spreadIter.SetJob(job)
	spreadIter.SetTaskGroup(tg)
	scoreNorm = NewScoreNormalizationIterator(ctx, spreadIter)
	out = collectRanked(scoreNorm)

	// None of the nodes has the spread attribute, so score should be -1
	assertMaxPenalty(out)
}
// Test_evenSpreadScoreBoost calls evenSpreadScoreBoost for a dc2 node against
// a property set in which dc1, dc2 and dc3 each have one proposed value while
// dc2 and dc3 each have one cleared value. The boost must equal 1.0 and must
// not be positive infinity.
// NOTE(review): presumably a regression guard for a division by zero when the
// cleared values cancel out the proposed ones — confirm against the helper.
func Test_evenSpreadScoreBoost(t *testing.T) {
	proposed := map[string]uint64{
		"dc1": 1,
		"dc2": 1,
		"dc3": 1,
	}
	cleared := map[string]uint64{
		"dc2": 1,
		"dc3": 1,
	}

	props := &propertySet{
		existingValues:  map[string]uint64{},
		proposedValues:  proposed,
		clearedValues:   cleared,
		targetAttribute: "${node.datacenter}",
	}
	candidate := &structs.Node{Datacenter: "dc2"}

	got := evenSpreadScoreBoost(props, candidate)

	// Rule out +Inf explicitly before pinning the exact value.
	require.False(t, math.IsInf(got, 1))
	require.Equal(t, 1.0, got)
}