open-nomad/scheduler/spread_test.go
Drew Bailey 6c788fdccd
Events/msgtype cleanup (#9117)
* use msgtype in upsert node

adds message type to signature for upsert node, update tests, remove placeholder method

* UpsertAllocs msg type test setup

* use upsertallocs with msg type in signature

update test usage of delete node

delete placeholder msgtype method

* add msgtype to upsert evals signature, update test call sites with test setup msg type

handle snapshot upsert eval outside of FSM and ignore eval event

remove placeholder upsertevalsmsgtype

handle job plan rpc and prevent event creation for plan

msgtype cleanup upsertnodeevents

updatenodedrain msgtype

msg type 0 is a node registration event, so set the default to the ignore type

* fix named import

* fix signature ordering on upsertnode to match
2020-10-19 09:30:15 -04:00

571 lines
14 KiB
Go

package scheduler
import (
"math"
"testing"
"fmt"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/stretchr/testify/require"
)
// TestSpreadIterator_SingleAttribute ranks four nodes (three in dc1, one in
// dc2) for a ten-count task group whose only spread targets 80% in dc1. It
// checks the normalized scores twice: first with two dc1 allocations already
// in the state store, then again after the plan proposes further dc1
// allocations.
func TestSpreadIterator_SingleAttribute(t *testing.T) {
	state, ctx := testContext(t)
	dcs := []string{"dc1", "dc2", "dc1", "dc1"}
	var nodes []*RankedNode

	// Add these nodes to the state store
	for i, dc := range dcs {
		node := mock.Node()
		node.Datacenter = dc
		if err := state.UpsertNode(structs.MsgTypeTestSetup, uint64(100+i), node); err != nil {
			t.Fatalf("failed to upsert node: %v", err)
		}
		nodes = append(nodes, &RankedNode{Node: node})
	}

	static := NewStaticRankIterator(ctx, nodes)

	job := mock.Job()
	tg := job.TaskGroups[0]
	job.TaskGroups[0].Count = 10

	// planAlloc builds an allocation of this job's task group for the node at
	// index idx, in the shape used for plan proposals (no EvalID).
	planAlloc := func(idx int) *structs.Allocation {
		return &structs.Allocation{
			Namespace: structs.DefaultNamespace,
			TaskGroup: tg.Name,
			JobID:     job.ID,
			Job:       job,
			ID:        uuid.Generate(),
			NodeID:    nodes[idx].Node.ID,
		}
	}

	// Add existing allocs to two of the dc1 nodes
	upserting := []*structs.Allocation{
		{
			Namespace: structs.DefaultNamespace,
			TaskGroup: tg.Name,
			JobID:     job.ID,
			Job:       job,
			ID:        uuid.Generate(),
			EvalID:    uuid.Generate(),
			NodeID:    nodes[0].Node.ID,
		},
		{
			Namespace: structs.DefaultNamespace,
			TaskGroup: tg.Name,
			JobID:     job.ID,
			Job:       job,
			ID:        uuid.Generate(),
			EvalID:    uuid.Generate(),
			NodeID:    nodes[2].Node.ID,
		},
	}
	if err := state.UpsertAllocs(structs.MsgTypeTestSetup, 1000, upserting); err != nil {
		t.Fatalf("failed to UpsertAllocs: %v", err)
	}

	// Create spread target of 80% in dc1
	// Implicitly, this means 20% in dc2
	spread := &structs.Spread{
		Weight:    100,
		Attribute: "${node.datacenter}",
		SpreadTarget: []*structs.SpreadTarget{
			{
				Value:   "dc1",
				Percent: 80,
			},
		},
	}
	tg.Spreads = []*structs.Spread{spread}

	spreadIter := NewSpreadIterator(ctx, static)
	spreadIter.SetJob(job)
	spreadIter.SetTaskGroup(tg)

	scoreNorm := NewScoreNormalizationIterator(ctx, spreadIter)

	out := collectRanked(scoreNorm)

	// Every node must come back ranked; without this guard the per-node
	// score checks below would pass vacuously on an empty result.
	require.Len(t, out, len(nodes))

	// Expect nodes in dc1 with existing allocs to get a boost
	// Boost should be ((desiredCount-actual)/desired)*spreadWeight
	// For this test, that becomes dc1 = (8-3)/8 = 0.625, and dc2 = (2-1)/2 = 0.5
	expectedScores := map[string]float64{
		"dc1": 0.625,
		"dc2": 0.5,
	}
	for _, rn := range out {
		require.Equal(t, expectedScores[rn.Node.Datacenter], rn.FinalScore)
	}

	// Update the plan to add more allocs to nodes in dc1
	// After this step there are enough allocs to meet the desired count in dc1
	ctx.plan.NodeAllocation[nodes[0].Node.ID] = []*structs.Allocation{
		planAlloc(0),
		planAlloc(0),
		// Should be ignored as it is a different job.
		{
			Namespace: structs.DefaultNamespace,
			TaskGroup: "bbb",
			JobID:     "ignore 2",
			Job:       job,
			ID:        uuid.Generate(),
			NodeID:    nodes[0].Node.ID,
		},
	}
	ctx.plan.NodeAllocation[nodes[3].Node.ID] = []*structs.Allocation{
		planAlloc(3),
		planAlloc(3),
		planAlloc(3),
	}

	// Reset the scores
	for _, node := range nodes {
		node.Scores = nil
		node.FinalScore = 0
	}
	static = NewStaticRankIterator(ctx, nodes)
	spreadIter = NewSpreadIterator(ctx, static)
	spreadIter.SetJob(job)
	spreadIter.SetTaskGroup(tg)
	scoreNorm = NewScoreNormalizationIterator(ctx, spreadIter)
	out = collectRanked(scoreNorm)

	// Same guard as above for the second ranking pass.
	require.Len(t, out, len(nodes))

	// The dc2 node keeps its boost.
	// DC1 nodes are not boosted because there are enough allocs to meet
	// the desired count
	expectedScores = map[string]float64{
		"dc1": 0,
		"dc2": 0.5,
	}
	for _, rn := range out {
		require.Equal(t, expectedScores[rn.Node.Datacenter], rn.FinalScore)
	}
}
// TestSpreadIterator_MultipleAttributes ranks four nodes against two spreads
// at once: one on the node datacenter (weight 100) and one on the "rack" meta
// key (weight 50). It compares each node's normalized score, rounded to three
// decimals, with a fixed expectation.
func TestSpreadIterator_MultipleAttributes(t *testing.T) {
	state, ctx := testContext(t)

	// One entry per node: the datacenter it lives in and its rack label.
	layout := []struct{ dc, rack string }{
		{"dc1", "r1"},
		{"dc2", "r1"},
		{"dc1", "r2"},
		{"dc1", "r2"},
	}

	// Register every node in the state store and remember it for ranking.
	var nodes []*RankedNode
	for idx, spec := range layout {
		n := mock.Node()
		n.Datacenter = spec.dc
		n.Meta["rack"] = spec.rack
		err := state.UpsertNode(structs.MsgTypeTestSetup, uint64(100+idx), n)
		if err != nil {
			t.Fatalf("failed to upsert node: %v", err)
		}
		nodes = append(nodes, &RankedNode{Node: n})
	}

	job := mock.Job()
	tg := job.TaskGroups[0]
	job.TaskGroups[0].Count = 10

	// existingAlloc builds an allocation of this job's task group that is
	// already placed on the node at index idx.
	existingAlloc := func(idx int) *structs.Allocation {
		return &structs.Allocation{
			Namespace: structs.DefaultNamespace,
			TaskGroup: tg.Name,
			JobID:     job.ID,
			Job:       job,
			ID:        uuid.Generate(),
			EvalID:    uuid.Generate(),
			NodeID:    nodes[idx].Node.ID,
		}
	}

	// Two of the dc1 nodes (one per rack) already run an allocation.
	placed := []*structs.Allocation{existingAlloc(0), existingAlloc(2)}
	if err := state.UpsertAllocs(structs.MsgTypeTestSetup, 1000, placed); err != nil {
		t.Fatalf("failed to UpsertAllocs: %v", err)
	}

	// Datacenter spread: 60% dc1, 40% dc2.
	byDatacenter := &structs.Spread{
		Weight:    100,
		Attribute: "${node.datacenter}",
		SpreadTarget: []*structs.SpreadTarget{
			{Value: "dc1", Percent: 60},
			{Value: "dc2", Percent: 40},
		},
	}
	// Rack spread: 40% r1, 60% r2.
	byRack := &structs.Spread{
		Weight:    50,
		Attribute: "${meta.rack}",
		SpreadTarget: []*structs.SpreadTarget{
			{Value: "r1", Percent: 40},
			{Value: "r2", Percent: 60},
		},
	}
	tg.Spreads = []*structs.Spread{byDatacenter, byRack}

	// Chain: static source -> spread scoring -> score normalization.
	spreadIter := NewSpreadIterator(ctx, NewStaticRankIterator(ctx, nodes))
	spreadIter.SetJob(job)
	spreadIter.SetTaskGroup(tg)
	ranked := collectRanked(NewScoreNormalizationIterator(ctx, spreadIter))

	// Each score combines both spread factors. The second node is expected
	// to score highest: it has no allocs and sits in dc2/r1.
	want := map[string]float64{
		nodes[0].Node.ID: 0.500,
		nodes[1].Node.ID: 0.667,
		nodes[2].Node.ID: 0.556,
		nodes[3].Node.ID: 0.556,
	}
	for _, got := range ranked {
		wantText := fmt.Sprintf("%.3f", want[got.Node.ID])
		gotText := fmt.Sprintf("%.3f", got.FinalScore)
		require.Equal(t, wantText, gotText)
	}
}
// TestSpreadIterator_EvenSpread exercises a spread that names an attribute
// (the node datacenter) but no explicit targets. It re-ranks the same nodes
// after each change to the plan's proposed allocations and checks the
// per-datacenter score at every stage.
func TestSpreadIterator_EvenSpread(t *testing.T) {
	state, ctx := testContext(t)

	// Ten nodes: six in dc1 and four in dc2.
	datacenters := []string{"dc1", "dc2", "dc1", "dc2", "dc1", "dc2", "dc2", "dc1", "dc1", "dc1"}
	var nodes []*RankedNode
	for idx := range datacenters {
		n := mock.Node()
		n.Datacenter = datacenters[idx]
		if err := state.UpsertNode(structs.MsgTypeTestSetup, uint64(100+idx), n); err != nil {
			t.Fatalf("failed to upsert node: %v", err)
		}
		nodes = append(nodes, &RankedNode{Node: n})
	}

	job := mock.Job()
	tg := job.TaskGroups[0]
	job.TaskGroups[0].Count = 10

	// Configure even spread across node.datacenter
	tg.Spreads = []*structs.Spread{
		{
			Weight:    100,
			Attribute: "${node.datacenter}",
		},
	}

	// rank pushes the current node list through a fresh
	// static -> spread -> normalization chain and collects the result.
	rank := func() []*RankedNode {
		spreadIter := NewSpreadIterator(ctx, NewStaticRankIterator(ctx, nodes))
		spreadIter.SetJob(job)
		spreadIter.SetTaskGroup(tg)
		return collectRanked(NewScoreNormalizationIterator(ctx, spreadIter))
	}

	// clearScores wipes the scores left on the nodes by a previous ranking.
	clearScores := func() {
		for _, rn := range nodes {
			rn.Scores = nil
			rn.FinalScore = 0
		}
	}

	// propose sets the plan's allocations for the node at index idx to count
	// fresh allocations of this job's task group.
	propose := func(idx, count int) {
		nodeID := nodes[idx].Node.ID
		allocs := make([]*structs.Allocation, 0, count)
		for i := 0; i < count; i++ {
			allocs = append(allocs, &structs.Allocation{
				Namespace: structs.DefaultNamespace,
				TaskGroup: tg.Name,
				JobID:     job.ID,
				Job:       job,
				ID:        uuid.Generate(),
				NodeID:    nodeID,
			})
		}
		ctx.plan.NodeAllocation[nodeID] = allocs
	}

	// requireFormatted compares scores per datacenter after rendering both
	// sides with the given fmt verb.
	requireFormatted := func(verb string, want map[string]float64, got []*RankedNode) {
		for _, rn := range got {
			require.Equal(t, fmt.Sprintf(verb, want[rn.Node.Datacenter]), fmt.Sprintf(verb, rn.FinalScore))
		}
	}

	// Stage 1: nothing placed, so nodes in both datacenters score 0.
	out := rank()
	requireFormatted("%.3f", map[string]float64{
		"dc1": 0,
		"dc2": 0,
	}, out)

	// Stage 2: the plan puts one alloc on each of two dc1 nodes.
	// dc2 nodes should now be boosted and dc1 nodes penalized.
	propose(0, 1)
	propose(2, 1)
	clearScores()
	out = rank()
	exact := map[string]float64{
		"dc1": -1,
		"dc2": 1,
	}
	for _, rn := range out {
		require.Equal(t, exact[rn.Node.Datacenter], rn.FinalScore)
	}

	// Stage 3: the plan adds three allocs across two dc2 nodes, so dc2 now
	// holds 3 against dc1's 2. dc1 is boosted and dc2 penalized.
	propose(1, 2)
	propose(3, 1)
	clearScores()
	out = rank()
	requireFormatted("%3.3f", map[string]float64{
		"dc1": 0.5,
		"dc2": -0.5,
	}, out)

	// Stage 4: introduce a node in a brand new datacenter, dc3 ...
	extra := mock.Node()
	extra.Datacenter = "dc3"
	if err := state.UpsertNode(structs.MsgTypeTestSetup, uint64(1111), extra); err != nil {
		t.Fatalf("failed to upsert node: %v", err)
	}
	nodes = append(nodes, &RankedNode{Node: extra})

	// ... and give dc1 one more alloc so that its count matches dc2.
	propose(4, 1)
	clearScores()
	out = rank()

	// dc1 and dc2 each hold 3 allocs and are penalized; dc3 holds none and
	// gets the boost.
	requireFormatted("%.3f", map[string]float64{
		"dc1": -1,
		"dc2": -1,
		"dc3": 1,
	}, out)
}
// TestSpreadIterator_MaxPenalty covers the cases in which the spread iterator
// hands out the maximum penalty (-1.0): nodes whose attribute value matches
// none of the spread targets, and nodes that lack the spread attribute.
func TestSpreadIterator_MaxPenalty(t *testing.T) {
	state, ctx := testContext(t)

	// Register five nodes, every one of them in dc3.
	var nodes []*RankedNode
	for idx := 0; idx < 5; idx++ {
		n := mock.Node()
		n.Datacenter = "dc3"
		err := state.UpsertNode(structs.MsgTypeTestSetup, uint64(100+idx), n)
		if err != nil {
			t.Fatalf("failed to upsert node: %v", err)
		}
		nodes = append(nodes, &RankedNode{Node: n})
	}

	job := mock.Job()
	tg := job.TaskGroups[0]
	job.TaskGroups[0].Count = 5

	// rank scores the nodes with whatever spreads tg carries at call time.
	rank := func() []*RankedNode {
		spreadIter := NewSpreadIterator(ctx, NewStaticRankIterator(ctx, nodes))
		spreadIter.SetJob(job)
		spreadIter.SetTaskGroup(tg)
		return collectRanked(NewScoreNormalizationIterator(ctx, spreadIter))
	}

	// requireMaxPenalty asserts that every ranked node scored exactly -1.
	requireMaxPenalty := func(ranked []*RankedNode) {
		for _, rn := range ranked {
			require.Equal(t, -1.0, rn.FinalScore)
		}
	}

	// Case 1: the spread targets 80% in dc1 and 20% in dc2, but all nodes
	// are in dc3, so each of them should score -1.
	tg.Spreads = []*structs.Spread{
		{
			Weight:    100,
			Attribute: "${node.datacenter}",
			SpreadTarget: []*structs.SpreadTarget{
				{Value: "dc1", Percent: 80},
				{Value: "dc2", Percent: 20},
			},
		},
	}
	requireMaxPenalty(rank())

	// Reset scores
	for _, rn := range nodes {
		rn.Scores = nil
		rn.FinalScore = 0
	}

	// Case 2: the spread is on an attribute that no node defines, so each
	// node should again score -1.
	tg.Spreads = []*structs.Spread{
		{
			Weight:    100,
			Attribute: "${meta.foo}",
			SpreadTarget: []*structs.SpreadTarget{
				{Value: "bar", Percent: 80},
				{Value: "baz", Percent: 20},
			},
		},
	}
	requireMaxPenalty(rank())
}
// Test_evenSpreadScoreBoost calls evenSpreadScoreBoost for a dc2 node against
// a property set in which dc1, dc2 and dc3 each have one proposed value and
// dc2 and dc3 each have one cleared value. The boost must be exactly 1.0 and
// must not be +Inf.
func Test_evenSpreadScoreBoost(t *testing.T) {
	proposed := map[string]uint64{
		"dc2": 1,
		"dc1": 1,
		"dc3": 1,
	}
	cleared := map[string]uint64{
		"dc2": 1,
		"dc3": 1,
	}

	pset := &propertySet{
		targetAttribute: "${node.datacenter}",
		existingValues:  map[string]uint64{},
		proposedValues:  proposed,
		clearedValues:   cleared,
	}
	candidate := &structs.Node{Datacenter: "dc2"}

	got := evenSpreadScoreBoost(pset, candidate)

	// Check for +Inf first so that failure is reported distinctly from a
	// plain value mismatch.
	require.False(t, math.IsInf(got, 1))
	require.Equal(t, 1.0, got)
}