6c788fdccd
* use msgtype in upsert node adds message type to signature for upsert node, update tests, remove placeholder method * UpsertAllocs msg type test setup * use upsertallocs with msg type in signature update test usage of delete node delete placeholder msgtype method * add msgtype to upsert evals signature, update test call sites with test setup msg type handle snapshot upsert eval outside of FSM and ignore eval event remove placeholder upsertevalsmsgtype handle job plan rpc and prevent event creation for plan msgtype cleanup upsertnodeevents updatenodedrain msgtype msg type 0 is a node registration event, so set the default to the ignore type * fix named import * fix signature ordering on upsertnode to match
571 lines
14 KiB
Go
571 lines
14 KiB
Go
package scheduler
|
|
|
|
import (
|
|
"math"
|
|
"testing"
|
|
|
|
"fmt"
|
|
|
|
"github.com/hashicorp/nomad/helper/uuid"
|
|
"github.com/hashicorp/nomad/nomad/mock"
|
|
"github.com/hashicorp/nomad/nomad/structs"
|
|
"github.com/stretchr/testify/require"
|
|
)
|
|
|
|
func TestSpreadIterator_SingleAttribute(t *testing.T) {
|
|
state, ctx := testContext(t)
|
|
dcs := []string{"dc1", "dc2", "dc1", "dc1"}
|
|
var nodes []*RankedNode
|
|
|
|
// Add these nodes to the state store
|
|
for i, dc := range dcs {
|
|
node := mock.Node()
|
|
node.Datacenter = dc
|
|
if err := state.UpsertNode(structs.MsgTypeTestSetup, uint64(100+i), node); err != nil {
|
|
t.Fatalf("failed to upsert node: %v", err)
|
|
}
|
|
nodes = append(nodes, &RankedNode{Node: node})
|
|
}
|
|
|
|
static := NewStaticRankIterator(ctx, nodes)
|
|
|
|
job := mock.Job()
|
|
tg := job.TaskGroups[0]
|
|
job.TaskGroups[0].Count = 10
|
|
// add allocs to nodes in dc1
|
|
upserting := []*structs.Allocation{
|
|
{
|
|
Namespace: structs.DefaultNamespace,
|
|
TaskGroup: tg.Name,
|
|
JobID: job.ID,
|
|
Job: job,
|
|
ID: uuid.Generate(),
|
|
EvalID: uuid.Generate(),
|
|
NodeID: nodes[0].Node.ID,
|
|
},
|
|
{
|
|
Namespace: structs.DefaultNamespace,
|
|
TaskGroup: tg.Name,
|
|
JobID: job.ID,
|
|
Job: job,
|
|
ID: uuid.Generate(),
|
|
EvalID: uuid.Generate(),
|
|
NodeID: nodes[2].Node.ID,
|
|
},
|
|
}
|
|
|
|
if err := state.UpsertAllocs(structs.MsgTypeTestSetup, 1000, upserting); err != nil {
|
|
t.Fatalf("failed to UpsertAllocs: %v", err)
|
|
}
|
|
|
|
// Create spread target of 80% in dc1
|
|
// Implicitly, this means 20% in dc2
|
|
spread := &structs.Spread{
|
|
Weight: 100,
|
|
Attribute: "${node.datacenter}",
|
|
SpreadTarget: []*structs.SpreadTarget{
|
|
{
|
|
Value: "dc1",
|
|
Percent: 80,
|
|
},
|
|
},
|
|
}
|
|
tg.Spreads = []*structs.Spread{spread}
|
|
spreadIter := NewSpreadIterator(ctx, static)
|
|
spreadIter.SetJob(job)
|
|
spreadIter.SetTaskGroup(tg)
|
|
|
|
scoreNorm := NewScoreNormalizationIterator(ctx, spreadIter)
|
|
|
|
out := collectRanked(scoreNorm)
|
|
|
|
// Expect nodes in dc1 with existing allocs to get a boost
|
|
// Boost should be ((desiredCount-actual)/desired)*spreadWeight
|
|
// For this test, that becomes dc1 = ((8-3)/8 ) = 0.5, and dc2=(2-1)/2
|
|
expectedScores := map[string]float64{
|
|
"dc1": 0.625,
|
|
"dc2": 0.5,
|
|
}
|
|
for _, rn := range out {
|
|
require.Equal(t, expectedScores[rn.Node.Datacenter], rn.FinalScore)
|
|
}
|
|
|
|
// Update the plan to add more allocs to nodes in dc1
|
|
// After this step there are enough allocs to meet the desired count in dc1
|
|
ctx.plan.NodeAllocation[nodes[0].Node.ID] = []*structs.Allocation{
|
|
{
|
|
Namespace: structs.DefaultNamespace,
|
|
TaskGroup: tg.Name,
|
|
JobID: job.ID,
|
|
Job: job,
|
|
ID: uuid.Generate(),
|
|
NodeID: nodes[0].Node.ID,
|
|
},
|
|
{
|
|
Namespace: structs.DefaultNamespace,
|
|
TaskGroup: tg.Name,
|
|
JobID: job.ID,
|
|
Job: job,
|
|
ID: uuid.Generate(),
|
|
NodeID: nodes[0].Node.ID,
|
|
},
|
|
// Should be ignored as it is a different job.
|
|
{
|
|
Namespace: structs.DefaultNamespace,
|
|
TaskGroup: "bbb",
|
|
JobID: "ignore 2",
|
|
Job: job,
|
|
ID: uuid.Generate(),
|
|
NodeID: nodes[0].Node.ID,
|
|
},
|
|
}
|
|
ctx.plan.NodeAllocation[nodes[3].Node.ID] = []*structs.Allocation{
|
|
{
|
|
Namespace: structs.DefaultNamespace,
|
|
TaskGroup: tg.Name,
|
|
JobID: job.ID,
|
|
Job: job,
|
|
ID: uuid.Generate(),
|
|
NodeID: nodes[3].Node.ID,
|
|
},
|
|
{
|
|
Namespace: structs.DefaultNamespace,
|
|
TaskGroup: tg.Name,
|
|
JobID: job.ID,
|
|
Job: job,
|
|
ID: uuid.Generate(),
|
|
NodeID: nodes[3].Node.ID,
|
|
},
|
|
{
|
|
Namespace: structs.DefaultNamespace,
|
|
TaskGroup: tg.Name,
|
|
JobID: job.ID,
|
|
Job: job,
|
|
ID: uuid.Generate(),
|
|
NodeID: nodes[3].Node.ID,
|
|
},
|
|
}
|
|
|
|
// Reset the scores
|
|
for _, node := range nodes {
|
|
node.Scores = nil
|
|
node.FinalScore = 0
|
|
}
|
|
static = NewStaticRankIterator(ctx, nodes)
|
|
spreadIter = NewSpreadIterator(ctx, static)
|
|
spreadIter.SetJob(job)
|
|
spreadIter.SetTaskGroup(tg)
|
|
scoreNorm = NewScoreNormalizationIterator(ctx, spreadIter)
|
|
out = collectRanked(scoreNorm)
|
|
|
|
// Expect nodes in dc2 with existing allocs to get a boost
|
|
// DC1 nodes are not boosted because there are enough allocs to meet
|
|
// the desired count
|
|
expectedScores = map[string]float64{
|
|
"dc1": 0,
|
|
"dc2": 0.5,
|
|
}
|
|
for _, rn := range out {
|
|
require.Equal(t, expectedScores[rn.Node.Datacenter], rn.FinalScore)
|
|
}
|
|
}
|
|
|
|
func TestSpreadIterator_MultipleAttributes(t *testing.T) {
|
|
state, ctx := testContext(t)
|
|
dcs := []string{"dc1", "dc2", "dc1", "dc1"}
|
|
rack := []string{"r1", "r1", "r2", "r2"}
|
|
var nodes []*RankedNode
|
|
|
|
// Add these nodes to the state store
|
|
for i, dc := range dcs {
|
|
node := mock.Node()
|
|
node.Datacenter = dc
|
|
node.Meta["rack"] = rack[i]
|
|
if err := state.UpsertNode(structs.MsgTypeTestSetup, uint64(100+i), node); err != nil {
|
|
t.Fatalf("failed to upsert node: %v", err)
|
|
}
|
|
nodes = append(nodes, &RankedNode{Node: node})
|
|
}
|
|
|
|
static := NewStaticRankIterator(ctx, nodes)
|
|
|
|
job := mock.Job()
|
|
tg := job.TaskGroups[0]
|
|
job.TaskGroups[0].Count = 10
|
|
// add allocs to nodes in dc1
|
|
upserting := []*structs.Allocation{
|
|
{
|
|
Namespace: structs.DefaultNamespace,
|
|
TaskGroup: tg.Name,
|
|
JobID: job.ID,
|
|
Job: job,
|
|
ID: uuid.Generate(),
|
|
EvalID: uuid.Generate(),
|
|
NodeID: nodes[0].Node.ID,
|
|
},
|
|
{
|
|
Namespace: structs.DefaultNamespace,
|
|
TaskGroup: tg.Name,
|
|
JobID: job.ID,
|
|
Job: job,
|
|
ID: uuid.Generate(),
|
|
EvalID: uuid.Generate(),
|
|
NodeID: nodes[2].Node.ID,
|
|
},
|
|
}
|
|
|
|
if err := state.UpsertAllocs(structs.MsgTypeTestSetup, 1000, upserting); err != nil {
|
|
t.Fatalf("failed to UpsertAllocs: %v", err)
|
|
}
|
|
|
|
spread1 := &structs.Spread{
|
|
Weight: 100,
|
|
Attribute: "${node.datacenter}",
|
|
SpreadTarget: []*structs.SpreadTarget{
|
|
{
|
|
Value: "dc1",
|
|
Percent: 60,
|
|
},
|
|
{
|
|
Value: "dc2",
|
|
Percent: 40,
|
|
},
|
|
},
|
|
}
|
|
|
|
spread2 := &structs.Spread{
|
|
Weight: 50,
|
|
Attribute: "${meta.rack}",
|
|
SpreadTarget: []*structs.SpreadTarget{
|
|
{
|
|
Value: "r1",
|
|
Percent: 40,
|
|
},
|
|
{
|
|
Value: "r2",
|
|
Percent: 60,
|
|
},
|
|
},
|
|
}
|
|
|
|
tg.Spreads = []*structs.Spread{spread1, spread2}
|
|
spreadIter := NewSpreadIterator(ctx, static)
|
|
spreadIter.SetJob(job)
|
|
spreadIter.SetTaskGroup(tg)
|
|
|
|
scoreNorm := NewScoreNormalizationIterator(ctx, spreadIter)
|
|
|
|
out := collectRanked(scoreNorm)
|
|
|
|
// Score comes from combining two different spread factors
|
|
// Second node should have the highest score because it has no allocs and its in dc2/r1
|
|
expectedScores := map[string]float64{
|
|
nodes[0].Node.ID: 0.500,
|
|
nodes[1].Node.ID: 0.667,
|
|
nodes[2].Node.ID: 0.556,
|
|
nodes[3].Node.ID: 0.556,
|
|
}
|
|
for _, rn := range out {
|
|
require.Equal(t, fmt.Sprintf("%.3f", expectedScores[rn.Node.ID]), fmt.Sprintf("%.3f", rn.FinalScore))
|
|
}
|
|
|
|
}
|
|
|
|
func TestSpreadIterator_EvenSpread(t *testing.T) {
|
|
state, ctx := testContext(t)
|
|
dcs := []string{"dc1", "dc2", "dc1", "dc2", "dc1", "dc2", "dc2", "dc1", "dc1", "dc1"}
|
|
var nodes []*RankedNode
|
|
|
|
// Add these nodes to the state store
|
|
for i, dc := range dcs {
|
|
node := mock.Node()
|
|
node.Datacenter = dc
|
|
if err := state.UpsertNode(structs.MsgTypeTestSetup, uint64(100+i), node); err != nil {
|
|
t.Fatalf("failed to upsert node: %v", err)
|
|
}
|
|
nodes = append(nodes, &RankedNode{Node: node})
|
|
}
|
|
|
|
static := NewStaticRankIterator(ctx, nodes)
|
|
job := mock.Job()
|
|
tg := job.TaskGroups[0]
|
|
job.TaskGroups[0].Count = 10
|
|
|
|
// Configure even spread across node.datacenter
|
|
spread := &structs.Spread{
|
|
Weight: 100,
|
|
Attribute: "${node.datacenter}",
|
|
}
|
|
tg.Spreads = []*structs.Spread{spread}
|
|
spreadIter := NewSpreadIterator(ctx, static)
|
|
spreadIter.SetJob(job)
|
|
spreadIter.SetTaskGroup(tg)
|
|
|
|
scoreNorm := NewScoreNormalizationIterator(ctx, spreadIter)
|
|
|
|
out := collectRanked(scoreNorm)
|
|
|
|
// Nothing placed so both dc nodes get 0 as the score
|
|
expectedScores := map[string]float64{
|
|
"dc1": 0,
|
|
"dc2": 0,
|
|
}
|
|
for _, rn := range out {
|
|
require.Equal(t, fmt.Sprintf("%.3f", expectedScores[rn.Node.Datacenter]), fmt.Sprintf("%.3f", rn.FinalScore))
|
|
|
|
}
|
|
|
|
// Update the plan to add allocs to nodes in dc1
|
|
// After this step dc2 nodes should get boosted
|
|
ctx.plan.NodeAllocation[nodes[0].Node.ID] = []*structs.Allocation{
|
|
{
|
|
Namespace: structs.DefaultNamespace,
|
|
TaskGroup: tg.Name,
|
|
JobID: job.ID,
|
|
Job: job,
|
|
ID: uuid.Generate(),
|
|
NodeID: nodes[0].Node.ID,
|
|
},
|
|
}
|
|
ctx.plan.NodeAllocation[nodes[2].Node.ID] = []*structs.Allocation{
|
|
{
|
|
Namespace: structs.DefaultNamespace,
|
|
TaskGroup: tg.Name,
|
|
JobID: job.ID,
|
|
Job: job,
|
|
ID: uuid.Generate(),
|
|
NodeID: nodes[2].Node.ID,
|
|
},
|
|
}
|
|
|
|
// Reset the scores
|
|
for _, node := range nodes {
|
|
node.Scores = nil
|
|
node.FinalScore = 0
|
|
}
|
|
static = NewStaticRankIterator(ctx, nodes)
|
|
spreadIter = NewSpreadIterator(ctx, static)
|
|
spreadIter.SetJob(job)
|
|
spreadIter.SetTaskGroup(tg)
|
|
scoreNorm = NewScoreNormalizationIterator(ctx, spreadIter)
|
|
out = collectRanked(scoreNorm)
|
|
|
|
// Expect nodes in dc2 with existing allocs to get a boost
|
|
// dc1 nodes are penalized because they have allocs
|
|
expectedScores = map[string]float64{
|
|
"dc1": -1,
|
|
"dc2": 1,
|
|
}
|
|
for _, rn := range out {
|
|
require.Equal(t, expectedScores[rn.Node.Datacenter], rn.FinalScore)
|
|
}
|
|
|
|
// Update the plan to add more allocs to nodes in dc2
|
|
// After this step dc1 nodes should get boosted
|
|
ctx.plan.NodeAllocation[nodes[1].Node.ID] = []*structs.Allocation{
|
|
{
|
|
Namespace: structs.DefaultNamespace,
|
|
TaskGroup: tg.Name,
|
|
JobID: job.ID,
|
|
Job: job,
|
|
ID: uuid.Generate(),
|
|
NodeID: nodes[1].Node.ID,
|
|
},
|
|
{
|
|
Namespace: structs.DefaultNamespace,
|
|
TaskGroup: tg.Name,
|
|
JobID: job.ID,
|
|
Job: job,
|
|
ID: uuid.Generate(),
|
|
NodeID: nodes[1].Node.ID,
|
|
},
|
|
}
|
|
ctx.plan.NodeAllocation[nodes[3].Node.ID] = []*structs.Allocation{
|
|
{
|
|
Namespace: structs.DefaultNamespace,
|
|
TaskGroup: tg.Name,
|
|
JobID: job.ID,
|
|
Job: job,
|
|
ID: uuid.Generate(),
|
|
NodeID: nodes[3].Node.ID,
|
|
},
|
|
}
|
|
|
|
// Reset the scores
|
|
for _, node := range nodes {
|
|
node.Scores = nil
|
|
node.FinalScore = 0
|
|
}
|
|
static = NewStaticRankIterator(ctx, nodes)
|
|
spreadIter = NewSpreadIterator(ctx, static)
|
|
spreadIter.SetJob(job)
|
|
spreadIter.SetTaskGroup(tg)
|
|
scoreNorm = NewScoreNormalizationIterator(ctx, spreadIter)
|
|
out = collectRanked(scoreNorm)
|
|
|
|
// Expect nodes in dc2 to be penalized because there are 3 allocs there now
|
|
// dc1 nodes are boosted because that has 2 allocs
|
|
expectedScores = map[string]float64{
|
|
"dc1": 0.5,
|
|
"dc2": -0.5,
|
|
}
|
|
for _, rn := range out {
|
|
require.Equal(t, fmt.Sprintf("%3.3f", expectedScores[rn.Node.Datacenter]), fmt.Sprintf("%3.3f", rn.FinalScore))
|
|
}
|
|
|
|
// Add another node in dc3
|
|
node := mock.Node()
|
|
node.Datacenter = "dc3"
|
|
if err := state.UpsertNode(structs.MsgTypeTestSetup, uint64(1111), node); err != nil {
|
|
t.Fatalf("failed to upsert node: %v", err)
|
|
}
|
|
nodes = append(nodes, &RankedNode{Node: node})
|
|
|
|
// Add another alloc to dc1, now its count matches dc2
|
|
ctx.plan.NodeAllocation[nodes[4].Node.ID] = []*structs.Allocation{
|
|
{
|
|
Namespace: structs.DefaultNamespace,
|
|
TaskGroup: tg.Name,
|
|
JobID: job.ID,
|
|
Job: job,
|
|
ID: uuid.Generate(),
|
|
NodeID: nodes[4].Node.ID,
|
|
},
|
|
}
|
|
|
|
// Reset scores
|
|
for _, node := range nodes {
|
|
node.Scores = nil
|
|
node.FinalScore = 0
|
|
}
|
|
static = NewStaticRankIterator(ctx, nodes)
|
|
spreadIter = NewSpreadIterator(ctx, static)
|
|
spreadIter.SetJob(job)
|
|
spreadIter.SetTaskGroup(tg)
|
|
scoreNorm = NewScoreNormalizationIterator(ctx, spreadIter)
|
|
out = collectRanked(scoreNorm)
|
|
|
|
// Expect dc1 and dc2 to be penalized because they have 3 allocs
|
|
// dc3 should get a boost because it has 0 allocs
|
|
expectedScores = map[string]float64{
|
|
"dc1": -1,
|
|
"dc2": -1,
|
|
"dc3": 1,
|
|
}
|
|
for _, rn := range out {
|
|
require.Equal(t, fmt.Sprintf("%.3f", expectedScores[rn.Node.Datacenter]), fmt.Sprintf("%.3f", rn.FinalScore))
|
|
}
|
|
|
|
}
|
|
|
|
// Test scenarios where the spread iterator sets maximum penalty (-1.0)
|
|
func TestSpreadIterator_MaxPenalty(t *testing.T) {
|
|
state, ctx := testContext(t)
|
|
var nodes []*RankedNode
|
|
|
|
// Add nodes in dc3 to the state store
|
|
for i := 0; i < 5; i++ {
|
|
node := mock.Node()
|
|
node.Datacenter = "dc3"
|
|
if err := state.UpsertNode(structs.MsgTypeTestSetup, uint64(100+i), node); err != nil {
|
|
t.Fatalf("failed to upsert node: %v", err)
|
|
}
|
|
nodes = append(nodes, &RankedNode{Node: node})
|
|
}
|
|
|
|
static := NewStaticRankIterator(ctx, nodes)
|
|
|
|
job := mock.Job()
|
|
tg := job.TaskGroups[0]
|
|
job.TaskGroups[0].Count = 5
|
|
|
|
// Create spread target of 80% in dc1
|
|
// and 20% in dc2
|
|
spread := &structs.Spread{
|
|
Weight: 100,
|
|
Attribute: "${node.datacenter}",
|
|
SpreadTarget: []*structs.SpreadTarget{
|
|
{
|
|
Value: "dc1",
|
|
Percent: 80,
|
|
},
|
|
{
|
|
Value: "dc2",
|
|
Percent: 20,
|
|
},
|
|
},
|
|
}
|
|
tg.Spreads = []*structs.Spread{spread}
|
|
spreadIter := NewSpreadIterator(ctx, static)
|
|
spreadIter.SetJob(job)
|
|
spreadIter.SetTaskGroup(tg)
|
|
|
|
scoreNorm := NewScoreNormalizationIterator(ctx, spreadIter)
|
|
|
|
out := collectRanked(scoreNorm)
|
|
|
|
// All nodes are in dc3 so score should be -1
|
|
for _, rn := range out {
|
|
require.Equal(t, -1.0, rn.FinalScore)
|
|
}
|
|
|
|
// Reset scores
|
|
for _, node := range nodes {
|
|
node.Scores = nil
|
|
node.FinalScore = 0
|
|
}
|
|
|
|
// Create spread on attribute that doesn't exist on any nodes
|
|
spread = &structs.Spread{
|
|
Weight: 100,
|
|
Attribute: "${meta.foo}",
|
|
SpreadTarget: []*structs.SpreadTarget{
|
|
{
|
|
Value: "bar",
|
|
Percent: 80,
|
|
},
|
|
{
|
|
Value: "baz",
|
|
Percent: 20,
|
|
},
|
|
},
|
|
}
|
|
|
|
tg.Spreads = []*structs.Spread{spread}
|
|
static = NewStaticRankIterator(ctx, nodes)
|
|
spreadIter = NewSpreadIterator(ctx, static)
|
|
spreadIter.SetJob(job)
|
|
spreadIter.SetTaskGroup(tg)
|
|
scoreNorm = NewScoreNormalizationIterator(ctx, spreadIter)
|
|
out = collectRanked(scoreNorm)
|
|
|
|
// All nodes don't have the spread attribute so score should be -1
|
|
for _, rn := range out {
|
|
require.Equal(t, -1.0, rn.FinalScore)
|
|
}
|
|
|
|
}
|
|
|
|
func Test_evenSpreadScoreBoost(t *testing.T) {
|
|
pset := &propertySet{
|
|
existingValues: map[string]uint64{},
|
|
proposedValues: map[string]uint64{
|
|
"dc2": 1,
|
|
"dc1": 1,
|
|
"dc3": 1,
|
|
},
|
|
clearedValues: map[string]uint64{
|
|
"dc2": 1,
|
|
"dc3": 1,
|
|
},
|
|
targetAttribute: "${node.datacenter}",
|
|
}
|
|
|
|
opt := &structs.Node{
|
|
Datacenter: "dc2",
|
|
}
|
|
boost := evenSpreadScoreBoost(pset, opt)
|
|
require.False(t, math.IsInf(boost, 1))
|
|
require.Equal(t, 1.0, boost)
|
|
}
|