Merge pull request #4640 from hashicorp/f-affinities-spread

Affinities and spread
This commit is contained in:
Preetha 2018-09-04 16:25:14 -06:00 committed by GitHub
commit 94e4a5345e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
39 changed files with 3674 additions and 766 deletions

View File

@ -109,9 +109,19 @@ type AllocationMetric struct {
ClassExhausted map[string]int
DimensionExhausted map[string]int
QuotaExhausted []string
Scores map[string]float64
AllocationTime time.Duration
CoalescedFailures int
// Deprecated, replaced with ScoreMetaData
Scores map[string]float64
AllocationTime time.Duration
CoalescedFailures int
ScoreMetaData []*NodeScoreMeta
}
// NodeScoreMeta is used to serialize node scoring metadata
// displayed in the CLI during verbose mode
type NodeScoreMeta struct {
NodeID string
Scores map[string]float64
NormScore float64
}
// AllocationListStub is used to return a subset of an allocation

View File

@ -29,8 +29,13 @@ func TestCompose(t *testing.T) {
})
// Compose a task group
st1 := NewSpreadTarget("dc1", 80)
st2 := NewSpreadTarget("dc2", 20)
grp := NewTaskGroup("grp1", 2).
Constrain(NewConstraint("kernel.name", "=", "linux")).
AddAffinity(NewAffinity("${node.class}", "=", "large", 50)).
AddSpread(NewSpread("${node.datacenter}", 30, []*SpreadTarget{st1, st2})).
SetMeta("foo", "bar").
AddTask(task)
@ -72,6 +77,30 @@ func TestCompose(t *testing.T) {
Operand: "=",
},
},
Affinities: []*Affinity{
{
LTarget: "${node.class}",
RTarget: "large",
Operand: "=",
Weight: 50,
},
},
Spreads: []*Spread{
{
Attribute: "${node.datacenter}",
Weight: 30,
SpreadTarget: []*SpreadTarget{
{
Value: "dc1",
Percent: 80,
},
{
Value: "dc2",
Percent: 20,
},
},
},
},
Tasks: []*Task{
{
Name: "task1",

View File

@ -610,8 +610,10 @@ type Job struct {
AllAtOnce *bool `mapstructure:"all_at_once"`
Datacenters []string
Constraints []*Constraint
Affinities []*Affinity
TaskGroups []*TaskGroup
Update *UpdateStrategy
Spreads []*Spread
Periodic *PeriodicConfig
ParameterizedJob *ParameterizedJobConfig
Dispatched bool
@ -836,6 +838,12 @@ func (j *Job) Constrain(c *Constraint) *Job {
return j
}
// AddAffinity is used to add an affinity to a job.
func (j *Job) AddAffinity(a *Affinity) *Job {
j.Affinities = append(j.Affinities, a)
return j
}
// AddTaskGroup adds a task group to an existing job.
func (j *Job) AddTaskGroup(grp *TaskGroup) *Job {
j.TaskGroups = append(j.TaskGroups, grp)
@ -848,6 +856,11 @@ func (j *Job) AddPeriodicConfig(cfg *PeriodicConfig) *Job {
return j
}
func (j *Job) AddSpread(s *Spread) *Job {
j.Spreads = append(j.Spreads, s)
return j
}
type WriteRequest struct {
// The target region for this write
Region string

View File

@ -1332,6 +1332,42 @@ func TestJobs_Constrain(t *testing.T) {
}
}
func TestJobs_AddAffinity(t *testing.T) {
t.Parallel()
job := &Job{Affinities: nil}
// Create and add an affinity
out := job.AddAffinity(NewAffinity("kernel.version", "=", "4.6", 100))
if n := len(job.Affinities); n != 1 {
t.Fatalf("expected 1 affinity, got: %d", n)
}
// Check that the job was returned
if job != out {
t.Fatalf("expect: %#v, got: %#v", job, out)
}
// Adding another affinity preserves the original
job.AddAffinity(NewAffinity("${node.datacenter}", "=", "dc2", 50))
expect := []*Affinity{
{
LTarget: "kernel.version",
RTarget: "4.6",
Operand: "=",
Weight: 100,
},
{
LTarget: "${node.datacenter}",
RTarget: "dc2",
Operand: "=",
Weight: 50,
},
}
if !reflect.DeepEqual(job.Affinities, expect) {
t.Fatalf("expect: %#v, got: %#v", expect, job.Affinities)
}
}
func TestJobs_Sort(t *testing.T) {
t.Parallel()
jobs := []*JobListStub{
@ -1351,6 +1387,57 @@ func TestJobs_Sort(t *testing.T) {
}
}
func TestJobs_AddSpread(t *testing.T) {
t.Parallel()
job := &Job{Spreads: nil}
// Create and add a Spread
spreadTarget := NewSpreadTarget("r1", 50)
spread := NewSpread("${meta.rack}", 100, []*SpreadTarget{spreadTarget})
out := job.AddSpread(spread)
if n := len(job.Spreads); n != 1 {
t.Fatalf("expected 1 spread, got: %d", n)
}
// Check that the job was returned
if job != out {
t.Fatalf("expect: %#v, got: %#v", job, out)
}
// Adding another spread preserves the original
spreadTarget2 := NewSpreadTarget("dc1", 100)
spread2 := NewSpread("${node.datacenter}", 100, []*SpreadTarget{spreadTarget2})
job.AddSpread(spread2)
expect := []*Spread{
{
Attribute: "${meta.rack}",
Weight: 100,
SpreadTarget: []*SpreadTarget{
{
Value: "r1",
Percent: 50,
},
},
},
{
Attribute: "${node.datacenter}",
Weight: 100,
SpreadTarget: []*SpreadTarget{
{
Value: "dc1",
Percent: 100,
},
},
},
}
if !reflect.DeepEqual(job.Spreads, expect) {
t.Fatalf("expect: %#v, got: %#v", expect, job.Spreads)
}
}
func TestJobs_Summary_WithACL(t *testing.T) {
t.Parallel()
assert := assert.New(t)

View File

@ -148,6 +148,23 @@ func (r *ReschedulePolicy) Canonicalize(jobType string) {
}
}
// Affinity is used to serialize task group affinities
type Affinity struct {
LTarget string // Left-hand target
RTarget string // Right-hand target
Operand string // Constraint operand (<=, <, =, !=, >, >=), set_contains_all, set_contains_any
Weight float64 // Weight applied to nodes that match the affinity. Can be negative
}
func NewAffinity(LTarget string, Operand string, RTarget string, Weight float64) *Affinity {
return &Affinity{
LTarget: LTarget,
RTarget: RTarget,
Operand: Operand,
Weight: Weight,
}
}
func NewDefaultReschedulePolicy(jobType string) *ReschedulePolicy {
var dp *ReschedulePolicy
switch jobType {
@ -202,6 +219,34 @@ func (p *ReschedulePolicy) String() string {
return fmt.Sprintf("%v in %v with %v delay, max_delay = %v", *p.Attempts, *p.Interval, *p.DelayFunction, *p.MaxDelay)
}
// Spread is used to serialize task group allocation spread preferences
type Spread struct {
Attribute string
Weight int
SpreadTarget []*SpreadTarget
}
// SpreadTarget is used to serialize target allocation spread percentages
type SpreadTarget struct {
Value string
Percent uint32
}
func NewSpreadTarget(value string, percent uint32) *SpreadTarget {
return &SpreadTarget{
Value: value,
Percent: percent,
}
}
func NewSpread(attribute string, weight int, spreadTargets []*SpreadTarget) *Spread {
return &Spread{
Attribute: attribute,
Weight: weight,
SpreadTarget: spreadTargets,
}
}
// CheckRestart describes if and when a task should be restarted based on
// failing health checks.
type CheckRestart struct {
@ -413,7 +458,9 @@ type TaskGroup struct {
Name *string
Count *int
Constraints []*Constraint
Affinities []*Affinity
Tasks []*Task
Spreads []*Spread
RestartPolicy *RestartPolicy
ReschedulePolicy *ReschedulePolicy
EphemeralDisk *EphemeralDisk
@ -543,12 +590,24 @@ func (g *TaskGroup) AddTask(t *Task) *TaskGroup {
return g
}
// AddAffinity is used to add a new affinity to a task group.
func (g *TaskGroup) AddAffinity(a *Affinity) *TaskGroup {
g.Affinities = append(g.Affinities, a)
return g
}
// RequireDisk adds a ephemeral disk to the task group
func (g *TaskGroup) RequireDisk(disk *EphemeralDisk) *TaskGroup {
g.EphemeralDisk = disk
return g
}
// AddSpread is used to add a new spread preference to a task group.
func (g *TaskGroup) AddSpread(s *Spread) *TaskGroup {
g.Spreads = append(g.Spreads, s)
return g
}
// LogConfig provides configuration for log rotation
type LogConfig struct {
MaxFiles *int `mapstructure:"max_files"`
@ -583,6 +642,7 @@ type Task struct {
User string
Config map[string]interface{}
Constraints []*Constraint
Affinities []*Affinity
Env map[string]string
Services []*Service
Resources *Resources
@ -771,6 +831,12 @@ func (t *Task) Constrain(c *Constraint) *Task {
return t
}
// AddAffinity adds a new affinity to a single task.
func (t *Task) AddAffinity(a *Affinity) *Task {
t.Affinities = append(t.Affinities, a)
return t
}
// SetLogConfig sets a log config to a task
func (t *Task) SetLogConfig(l *LogConfig) *Task {
t.LogConfig = l

View File

@ -8,6 +8,7 @@ import (
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestTaskGroup_NewTaskGroup(t *testing.T) {
@ -56,6 +57,42 @@ func TestTaskGroup_Constrain(t *testing.T) {
}
}
func TestTaskGroup_AddAffinity(t *testing.T) {
t.Parallel()
grp := NewTaskGroup("grp1", 1)
// Add an affinity to the group
out := grp.AddAffinity(NewAffinity("kernel.version", "=", "4.6", 100))
if n := len(grp.Affinities); n != 1 {
t.Fatalf("expected 1 affinity, got: %d", n)
}
// Check that the group was returned
if out != grp {
t.Fatalf("expected: %#v, got: %#v", grp, out)
}
// Add a second affinity
grp.AddAffinity(NewAffinity("${node.affinity}", "=", "dc2", 50))
expect := []*Affinity{
{
LTarget: "kernel.version",
RTarget: "4.6",
Operand: "=",
Weight: 100,
},
{
LTarget: "${node.affinity}",
RTarget: "dc2",
Operand: "=",
Weight: 50,
},
}
if !reflect.DeepEqual(grp.Affinities, expect) {
t.Fatalf("expect: %#v, got: %#v", expect, grp.Constraints)
}
}
func TestTaskGroup_SetMeta(t *testing.T) {
t.Parallel()
grp := NewTaskGroup("grp1", 1)
@ -79,6 +116,57 @@ func TestTaskGroup_SetMeta(t *testing.T) {
}
}
func TestTaskGroup_AddSpread(t *testing.T) {
t.Parallel()
grp := NewTaskGroup("grp1", 1)
// Create and add spread
spreadTarget := NewSpreadTarget("r1", 50)
spread := NewSpread("${meta.rack}", 100, []*SpreadTarget{spreadTarget})
out := grp.AddSpread(spread)
if n := len(grp.Spreads); n != 1 {
t.Fatalf("expected 1 spread, got: %d", n)
}
// Check that the group was returned
if out != grp {
t.Fatalf("expected: %#v, got: %#v", grp, out)
}
// Add a second spread
spreadTarget2 := NewSpreadTarget("dc1", 100)
spread2 := NewSpread("${node.datacenter}", 100, []*SpreadTarget{spreadTarget2})
grp.AddSpread(spread2)
expect := []*Spread{
{
Attribute: "${meta.rack}",
Weight: 100,
SpreadTarget: []*SpreadTarget{
{
Value: "r1",
Percent: 50,
},
},
},
{
Attribute: "${node.datacenter}",
Weight: 100,
SpreadTarget: []*SpreadTarget{
{
Value: "dc1",
Percent: 100,
},
},
},
}
if !reflect.DeepEqual(grp.Spreads, expect) {
t.Fatalf("expect: %#v, got: %#v", expect, grp.Spreads)
}
}
func TestTaskGroup_AddTask(t *testing.T) {
t.Parallel()
grp := NewTaskGroup("grp1", 1)
@ -232,6 +320,41 @@ func TestTask_Constrain(t *testing.T) {
}
}
func TestTask_AddAffinity(t *testing.T) {
t.Parallel()
task := NewTask("task1", "exec")
// Add an affinity to the task
out := task.AddAffinity(NewAffinity("kernel.version", "=", "4.6", 100))
require := require.New(t)
require.Len(out.Affinities, 1)
// Check that the task was returned
if out != task {
t.Fatalf("expected: %#v, got: %#v", task, out)
}
// Add a second affinity
task.AddAffinity(NewAffinity("${node.datacenter}", "=", "dc2", 50))
expect := []*Affinity{
{
LTarget: "kernel.version",
RTarget: "4.6",
Operand: "=",
Weight: 100,
},
{
LTarget: "${node.datacenter}",
RTarget: "dc2",
Operand: "=",
Weight: 50,
},
}
if !reflect.DeepEqual(task.Affinities, expect) {
t.Fatalf("expect: %#v, got: %#v", expect, task.Affinities)
}
}
func TestTask_Artifact(t *testing.T) {
t.Parallel()
a := TaskArtifact{

File diff suppressed because one or more lines are too long

View File

@ -616,6 +616,13 @@ func ApiJobToStructJob(job *api.Job) *structs.Job {
}
}
if l := len(job.Affinities); l != 0 {
j.Affinities = make([]*structs.Affinity, l)
for i, a := range job.Affinities {
j.Affinities[i] = ApiAffinityToStructs(a)
}
}
// COMPAT: Remove in 0.7.0. Update has been pushed into the task groups
if job.Update != nil {
j.Update = structs.UpdateStrategy{}
@ -628,6 +635,13 @@ func ApiJobToStructJob(job *api.Job) *structs.Job {
}
}
if l := len(job.Spreads); l != 0 {
j.Spreads = make([]*structs.Spread, l)
for i, apiSpread := range job.Spreads {
j.Spreads[i] = ApiSpreadToStructs(apiSpread)
}
}
if job.Periodic != nil {
j.Periodic = &structs.PeriodicConfig{
Enabled: *job.Periodic.Enabled,
@ -675,6 +689,13 @@ func ApiTgToStructsTG(taskGroup *api.TaskGroup, tg *structs.TaskGroup) {
}
}
if l := len(taskGroup.Affinities); l != 0 {
tg.Affinities = make([]*structs.Affinity, l)
for k, affinity := range taskGroup.Affinities {
tg.Affinities[k] = ApiAffinityToStructs(affinity)
}
}
tg.RestartPolicy = &structs.RestartPolicy{
Attempts: *taskGroup.RestartPolicy.Attempts,
Interval: *taskGroup.RestartPolicy.Interval,
@ -708,6 +729,13 @@ func ApiTgToStructsTG(taskGroup *api.TaskGroup, tg *structs.TaskGroup) {
Migrate: *taskGroup.EphemeralDisk.Migrate,
}
if l := len(taskGroup.Spreads); l != 0 {
tg.Spreads = make([]*structs.Spread, l)
for k, spread := range taskGroup.Spreads {
tg.Spreads[k] = ApiSpreadToStructs(spread)
}
}
if taskGroup.Update != nil {
tg.Update = &structs.UpdateStrategy{
Stagger: *taskGroup.Update.Stagger,
@ -754,6 +782,13 @@ func ApiTaskToStructsTask(apiTask *api.Task, structsTask *structs.Task) {
}
}
if l := len(apiTask.Affinities); l != 0 {
structsTask.Affinities = make([]*structs.Affinity, l)
for i, a := range apiTask.Affinities {
structsTask.Affinities[i] = ApiAffinityToStructs(a)
}
}
if l := len(apiTask.Services); l != 0 {
structsTask.Services = make([]*structs.Service, l)
for i, service := range apiTask.Services {
@ -892,3 +927,28 @@ func ApiConstraintToStructs(c1 *api.Constraint, c2 *structs.Constraint) {
c2.RTarget = c1.RTarget
c2.Operand = c1.Operand
}
func ApiAffinityToStructs(a1 *api.Affinity) *structs.Affinity {
return &structs.Affinity{
LTarget: a1.LTarget,
Operand: a1.Operand,
RTarget: a1.RTarget,
Weight: a1.Weight,
}
}
func ApiSpreadToStructs(a1 *api.Spread) *structs.Spread {
ret := &structs.Spread{}
ret.Attribute = a1.Attribute
ret.Weight = a1.Weight
if a1.SpreadTarget != nil {
ret.SpreadTarget = make([]*structs.SpreadTarget, len(a1.SpreadTarget))
for i, st := range a1.SpreadTarget {
ret.SpreadTarget[i] = &structs.SpreadTarget{
Value: st.Value,
Percent: st.Percent,
}
}
}
return ret
}

View File

@ -1211,6 +1211,14 @@ func TestJobs_ApiJobToStructsJob(t *testing.T) {
Operand: "c",
},
},
Affinities: []*api.Affinity{
{
LTarget: "a",
RTarget: "b",
Operand: "c",
Weight: 50,
},
},
Update: &api.UpdateStrategy{
Stagger: helper.TimeToPtr(1 * time.Second),
MaxParallel: helper.IntToPtr(5),
@ -1221,6 +1229,18 @@ func TestJobs_ApiJobToStructsJob(t *testing.T) {
AutoRevert: helper.BoolToPtr(false),
Canary: helper.IntToPtr(1),
},
Spreads: []*api.Spread{
{
Attribute: "${meta.rack}",
Weight: 100,
SpreadTarget: []*api.SpreadTarget{
{
Value: "r1",
Percent: 50,
},
},
},
},
Periodic: &api.PeriodicConfig{
Enabled: helper.BoolToPtr(true),
Spec: helper.StringToPtr("spec"),
@ -1248,6 +1268,14 @@ func TestJobs_ApiJobToStructsJob(t *testing.T) {
Operand: "z",
},
},
Affinities: []*api.Affinity{
{
LTarget: "x",
RTarget: "y",
Operand: "z",
Weight: 100,
},
},
RestartPolicy: &api.RestartPolicy{
Interval: helper.TimeToPtr(1 * time.Second),
Attempts: helper.IntToPtr(5),
@ -1268,6 +1296,18 @@ func TestJobs_ApiJobToStructsJob(t *testing.T) {
MinHealthyTime: helper.TimeToPtr(12 * time.Hour),
HealthyDeadline: helper.TimeToPtr(12 * time.Hour),
},
Spreads: []*api.Spread{
{
Attribute: "${node.datacenter}",
Weight: 100,
SpreadTarget: []*api.SpreadTarget{
{
Value: "dc1",
Percent: 100,
},
},
},
},
EphemeralDisk: &api.EphemeralDisk{
SizeMB: helper.IntToPtr(100),
Sticky: helper.BoolToPtr(true),
@ -1303,6 +1343,14 @@ func TestJobs_ApiJobToStructsJob(t *testing.T) {
Operand: "z",
},
},
Affinities: []*api.Affinity{
{
LTarget: "a",
RTarget: "b",
Operand: "c",
Weight: 50,
},
},
Services: []*api.Service{
{
@ -1443,6 +1491,26 @@ func TestJobs_ApiJobToStructsJob(t *testing.T) {
Operand: "c",
},
},
Affinities: []*structs.Affinity{
{
LTarget: "a",
RTarget: "b",
Operand: "c",
Weight: 50,
},
},
Spreads: []*structs.Spread{
{
Attribute: "${meta.rack}",
Weight: 100,
SpreadTarget: []*structs.SpreadTarget{
{
Value: "r1",
Percent: 50,
},
},
},
},
Update: structs.UpdateStrategy{
Stagger: 1 * time.Second,
MaxParallel: 5,
@ -1474,12 +1542,32 @@ func TestJobs_ApiJobToStructsJob(t *testing.T) {
Operand: "z",
},
},
Affinities: []*structs.Affinity{
{
LTarget: "x",
RTarget: "y",
Operand: "z",
Weight: 100,
},
},
RestartPolicy: &structs.RestartPolicy{
Interval: 1 * time.Second,
Attempts: 5,
Delay: 10 * time.Second,
Mode: "delay",
},
Spreads: []*structs.Spread{
{
Attribute: "${node.datacenter}",
Weight: 100,
SpreadTarget: []*structs.SpreadTarget{
{
Value: "dc1",
Percent: 100,
},
},
},
},
ReschedulePolicy: &structs.ReschedulePolicy{
Interval: 12 * time.Hour,
Attempts: 5,
@ -1528,6 +1616,14 @@ func TestJobs_ApiJobToStructsJob(t *testing.T) {
Operand: "z",
},
},
Affinities: []*structs.Affinity{
{
LTarget: "a",
RTarget: "b",
Operand: "c",
Weight: 50,
},
},
Env: map[string]string{
"hello": "world",
},

View File

@ -224,6 +224,65 @@ func TestAllocStatusCommand_RescheduleInfo(t *testing.T) {
require.Regexp(regexp.MustCompile(".*Reschedule Attempts\\s*=\\s*1/2"), out)
}
func TestAllocStatusCommand_ScoreMetrics(t *testing.T) {
t.Parallel()
srv, client, url := testServer(t, true, nil)
defer srv.Shutdown()
// Wait for a node to be ready
testutil.WaitForResult(func() (bool, error) {
nodes, _, err := client.Nodes().List(nil)
if err != nil {
return false, err
}
for _, node := range nodes {
if node.Status == structs.NodeStatusReady {
return true, nil
}
}
return false, fmt.Errorf("no ready nodes")
}, func(err error) {
t.Fatalf("err: %v", err)
})
ui := new(cli.MockUi)
cmd := &AllocStatusCommand{Meta: Meta{Ui: ui}}
// Test reschedule attempt info
require := require.New(t)
state := srv.Agent.Server().State()
a := mock.Alloc()
mockNode1 := mock.Node()
mockNode2 := mock.Node()
a.Metrics = &structs.AllocMetric{
ScoreMetaData: []*structs.NodeScoreMeta{
{
NodeID: mockNode1.ID,
Scores: map[string]float64{
"binpack": 0.77,
"node-affinity": 0.5,
},
},
{
NodeID: mockNode2.ID,
Scores: map[string]float64{
"binpack": 0.75,
"node-affinity": 0.33,
},
},
},
}
require.Nil(state.UpsertAllocs(1000, []*structs.Allocation{a}))
if code := cmd.Run([]string{"-address=" + url, "-verbose", a.ID}); code != 0 {
t.Fatalf("expected exit 0, got: %d", code)
}
out := ui.OutputWriter.String()
require.Contains(out, "Placement Metrics")
require.Contains(out, mockNode1.ID)
require.Contains(out, mockNode2.ID)
require.Contains(out, "Final Score")
}
func TestAllocStatusCommand_AutocompleteArgs(t *testing.T) {
assert := assert.New(t)
t.Parallel()

View File

@ -373,8 +373,30 @@ func formatAllocMetrics(metrics *api.AllocationMetric, scores bool, prefix strin
// Print scores
if scores {
for name, score := range metrics.Scores {
out += fmt.Sprintf("%s* Score %q = %f\n", prefix, name, score)
if len(metrics.ScoreMetaData) > 0 {
scoreOutput := make([]string, len(metrics.ScoreMetaData)+1)
for i, scoreMeta := range metrics.ScoreMetaData {
// Add header as first row
if i == 0 {
scoreOutput[0] = "Node|"
for scorerName := range scoreMeta.Scores {
scoreOutput[0] += fmt.Sprintf("%v|", scorerName)
}
scoreOutput[0] += "Final Score"
}
scoreOutput[i+1] = fmt.Sprintf("%v|", scoreMeta.NodeID)
for _, scoreVal := range scoreMeta.Scores {
scoreOutput[i+1] += fmt.Sprintf("%v|", scoreVal)
}
scoreOutput[i+1] += fmt.Sprintf("%v", scoreMeta.NormScore)
}
out += formatList(scoreOutput)
} else {
// Backwards compatibility for old allocs
for name, score := range metrics.Scores {
out += fmt.Sprintf("%s* Score %q = %f\n", prefix, name, score)
}
}
}

View File

@ -103,6 +103,7 @@ func parseJob(result *api.Job, list *ast.ObjectList) error {
return err
}
delete(m, "constraint")
delete(m, "affinity")
delete(m, "meta")
delete(m, "migrate")
delete(m, "parameterized")
@ -110,6 +111,7 @@ func parseJob(result *api.Job, list *ast.ObjectList) error {
delete(m, "reschedule")
delete(m, "update")
delete(m, "vault")
delete(m, "spread")
// Set the ID and name to the object key
result.ID = helper.StringToPtr(obj.Keys[0].Token.Value().(string))
@ -132,6 +134,8 @@ func parseJob(result *api.Job, list *ast.ObjectList) error {
valid := []string{
"all_at_once",
"constraint",
"affinity",
"spread",
"datacenters",
"group",
"id",
@ -161,6 +165,13 @@ func parseJob(result *api.Job, list *ast.ObjectList) error {
}
}
// Parse affinities
if o := listVal.Filter("affinity"); len(o.Items) > 0 {
if err := parseAffinities(&result.Affinities, o); err != nil {
return multierror.Prefix(err, "affinity ->")
}
}
// If we have an update strategy, then parse that
if o := listVal.Filter("update"); len(o.Items) > 0 {
if err := parseUpdate(&result.Update, o); err != nil {
@ -175,6 +186,13 @@ func parseJob(result *api.Job, list *ast.ObjectList) error {
}
}
// Parse spread
if o := listVal.Filter("spread"); len(o.Items) > 0 {
if err := parseSpread(&result.Spreads, o); err != nil {
return multierror.Prefix(err, "spread ->")
}
}
// If we have a parameterized definition, then parse that
if o := listVal.Filter("parameterized"); len(o.Items) > 0 {
if err := parseParameterizedJob(&result.ParameterizedJob, o); err != nil {
@ -287,6 +305,7 @@ func parseGroups(result *api.Job, list *ast.ObjectList) error {
valid := []string{
"count",
"constraint",
"affinity",
"restart",
"meta",
"task",
@ -295,6 +314,7 @@ func parseGroups(result *api.Job, list *ast.ObjectList) error {
"reschedule",
"vault",
"migrate",
"spread",
}
if err := helper.CheckHCLKeys(listVal, valid); err != nil {
return multierror.Prefix(err, fmt.Sprintf("'%s' ->", n))
@ -305,6 +325,7 @@ func parseGroups(result *api.Job, list *ast.ObjectList) error {
return err
}
delete(m, "constraint")
delete(m, "affinity")
delete(m, "meta")
delete(m, "task")
delete(m, "restart")
@ -312,6 +333,7 @@ func parseGroups(result *api.Job, list *ast.ObjectList) error {
delete(m, "update")
delete(m, "vault")
delete(m, "migrate")
delete(m, "spread")
// Build the group with the basic decode
var g api.TaskGroup
@ -327,6 +349,13 @@ func parseGroups(result *api.Job, list *ast.ObjectList) error {
}
}
// Parse affinities
if o := listVal.Filter("affinity"); len(o.Items) > 0 {
if err := parseAffinities(&g.Affinities, o); err != nil {
return multierror.Prefix(err, fmt.Sprintf("'%s', affinity ->", n))
}
}
// Parse restart policy
if o := listVal.Filter("restart"); len(o.Items) > 0 {
if err := parseRestartPolicy(&g.RestartPolicy, o); err != nil {
@ -334,6 +363,13 @@ func parseGroups(result *api.Job, list *ast.ObjectList) error {
}
}
// Parse spread
if o := listVal.Filter("spread"); len(o.Items) > 0 {
if err := parseSpread(&g.Spreads, o); err != nil {
return multierror.Prefix(err, "spread ->")
}
}
// Parse reschedule policy
if o := listVal.Filter("reschedule"); len(o.Items) > 0 {
if err := parseReschedulePolicy(&g.ReschedulePolicy, o); err != nil {
@ -576,6 +612,82 @@ func parseConstraints(result *[]*api.Constraint, list *ast.ObjectList) error {
return nil
}
func parseAffinities(result *[]*api.Affinity, list *ast.ObjectList) error {
for _, o := range list.Elem().Items {
// Check for invalid keys
valid := []string{
"attribute",
"operator",
"regexp",
"set_contains",
"set_contains_any",
"set_contains_all",
"value",
"version",
"weight",
}
if err := helper.CheckHCLKeys(o.Val, valid); err != nil {
return err
}
var m map[string]interface{}
if err := hcl.DecodeObject(&m, o.Val); err != nil {
return err
}
m["LTarget"] = m["attribute"]
m["RTarget"] = m["value"]
m["Operand"] = m["operator"]
// If "version" is provided, set the operand
// to "version" and the value to the "RTarget"
if affinity, ok := m[structs.ConstraintVersion]; ok {
m["Operand"] = structs.ConstraintVersion
m["RTarget"] = affinity
}
// If "regexp" is provided, set the operand
// to "regexp" and the value to the "RTarget"
if affinity, ok := m[structs.ConstraintRegex]; ok {
m["Operand"] = structs.ConstraintRegex
m["RTarget"] = affinity
}
// If "set_contains_any" is provided, set the operand
// to "set_contains_any" and the value to the "RTarget"
if affinity, ok := m[structs.ConstraintSetContaintsAny]; ok {
m["Operand"] = structs.ConstraintSetContaintsAny
m["RTarget"] = affinity
}
// If "set_contains_all" is provided, set the operand
// to "set_contains_all" and the value to the "RTarget"
if affinity, ok := m[structs.ConstraintSetContainsAll]; ok {
m["Operand"] = structs.ConstraintSetContainsAll
m["RTarget"] = affinity
}
// set_contains is a synonym of set_contains_all
if affinity, ok := m[structs.ConstraintSetContains]; ok {
m["Operand"] = structs.ConstraintSetContains
m["RTarget"] = affinity
}
// Build the affinity
var a api.Affinity
if err := mapstructure.WeakDecode(m, &a); err != nil {
return err
}
if a.Operand == "" {
a.Operand = "="
}
*result = append(*result, &a)
}
return nil
}
func parseEphemeralDisk(result **api.EphemeralDisk, list *ast.ObjectList) error {
list = list.Elem()
if len(list.Items) > 1 {
@ -609,6 +721,95 @@ func parseEphemeralDisk(result **api.EphemeralDisk, list *ast.ObjectList) error
return nil
}
func parseSpread(result *[]*api.Spread, list *ast.ObjectList) error {
for _, o := range list.Elem().Items {
// Check for invalid keys
valid := []string{
"attribute",
"weight",
"target",
}
if err := helper.CheckHCLKeys(o.Val, valid); err != nil {
return err
}
// We need this later
var listVal *ast.ObjectList
if ot, ok := o.Val.(*ast.ObjectType); ok {
listVal = ot.List
} else {
return fmt.Errorf("spread should be an object")
}
var m map[string]interface{}
if err := hcl.DecodeObject(&m, o.Val); err != nil {
return err
}
delete(m, "target")
// Build spread
var s api.Spread
if err := mapstructure.WeakDecode(m, &s); err != nil {
return err
}
// Parse spread target
if o := listVal.Filter("target"); len(o.Items) > 0 {
if err := parseSpreadTarget(&s.SpreadTarget, o); err != nil {
return multierror.Prefix(err, fmt.Sprintf("error parsing spread target"))
}
}
*result = append(*result, &s)
}
return nil
}
func parseSpreadTarget(result *[]*api.SpreadTarget, list *ast.ObjectList) error {
seen := make(map[string]struct{})
for _, item := range list.Items {
n := item.Keys[0].Token.Value().(string)
// Make sure we haven't already found this
if _, ok := seen[n]; ok {
return fmt.Errorf("target '%s' defined more than once", n)
}
seen[n] = struct{}{}
// We need this later
var listVal *ast.ObjectList
if ot, ok := item.Val.(*ast.ObjectType); ok {
listVal = ot.List
} else {
return fmt.Errorf("target should be an object")
}
// Check for invalid keys
valid := []string{
"percent",
"value",
}
if err := helper.CheckHCLKeys(listVal, valid); err != nil {
return multierror.Prefix(err, fmt.Sprintf("'%s' ->", n))
}
var m map[string]interface{}
if err := hcl.DecodeObject(&m, item.Val); err != nil {
return err
}
// Decode spread target
var g api.SpreadTarget
g.Value = n
if err := mapstructure.WeakDecode(m, &g); err != nil {
return err
}
*result = append(*result, &g)
}
return nil
}
// parseBool takes an interface value and tries to convert it to a boolean and
// returns an error if the type can't be converted.
func parseBool(value interface{}) (bool, error) {
@ -656,6 +857,7 @@ func parseTasks(jobName string, taskGroupName string, result *[]*api.Task, list
"artifact",
"config",
"constraint",
"affinity",
"dispatch_payload",
"driver",
"env",
@ -682,6 +884,7 @@ func parseTasks(jobName string, taskGroupName string, result *[]*api.Task, list
delete(m, "artifact")
delete(m, "config")
delete(m, "constraint")
delete(m, "affinity")
delete(m, "dispatch_payload")
delete(m, "env")
delete(m, "logs")
@ -751,6 +954,13 @@ func parseTasks(jobName string, taskGroupName string, result *[]*api.Task, list
}
}
// Parse affinities
if o := listVal.Filter("affinity"); len(o.Items) > 0 {
if err := parseAffinities(&t.Affinities, o); err != nil {
return multierror.Prefix(err, "affinity ->")
}
}
// Parse out meta fields. These are in HCL as a list so we need
// to iterate over them and merge them.
if metaO := listVal.Filter("meta"); len(metaO.Items) > 0 {

View File

@ -46,6 +46,32 @@ func TestParse(t *testing.T) {
},
},
Affinities: []*api.Affinity{
{
LTarget: "${meta.team}",
RTarget: "mobile",
Operand: "=",
Weight: 50,
},
},
Spreads: []*api.Spread{
{
Attribute: "${meta.rack}",
Weight: 100,
SpreadTarget: []*api.SpreadTarget{
{
Value: "r1",
Percent: 40,
},
{
Value: "r2",
Percent: 60,
},
},
},
},
Update: &api.UpdateStrategy{
Stagger: helper.TimeToPtr(60 * time.Second),
MaxParallel: helper.IntToPtr(2),
@ -84,6 +110,14 @@ func TestParse(t *testing.T) {
Operand: "=",
},
},
Affinities: []*api.Affinity{
{
LTarget: "${node.datacenter}",
RTarget: "dc2",
Operand: "=",
Weight: 100,
},
},
Meta: map[string]string{
"elb_mode": "tcp",
"elb_interval": "10",
@ -95,6 +129,26 @@ func TestParse(t *testing.T) {
Delay: helper.TimeToPtr(15 * time.Second),
Mode: helper.StringToPtr("delay"),
},
Spreads: []*api.Spread{
{
Attribute: "${node.datacenter}",
Weight: 50,
SpreadTarget: []*api.SpreadTarget{
{
Value: "dc1",
Percent: 50,
},
{
Value: "dc2",
Percent: 25,
},
{
Value: "dc3",
Percent: 25,
},
},
},
},
ReschedulePolicy: &api.ReschedulePolicy{
Interval: helper.TimeToPtr(12 * time.Hour),
Attempts: helper.IntToPtr(5),
@ -131,6 +185,14 @@ func TestParse(t *testing.T) {
},
},
},
Affinities: []*api.Affinity{
{
LTarget: "${meta.foo}",
RTarget: "a,b,c",
Operand: "set_contains",
Weight: 25,
},
},
Services: []*api.Service{
{
Tags: []string{"foo", "bar"},

View File

@ -16,6 +16,24 @@ job "binstore-storagelocker" {
value = "windows"
}
affinity {
attribute = "${meta.team}"
value = "mobile"
operator = "="
weight = 50
}
spread {
attribute = "${meta.rack}"
weight = 100
target "r1" {
percent = 40
}
target "r2" {
percent = 60
}
}
update {
stagger = "60s"
max_parallel = 2
@ -76,11 +94,40 @@ job "binstore-storagelocker" {
healthy_deadline = "11m"
}
affinity {
attribute = "${node.datacenter}"
value = "dc2"
operator = "="
weight = 100
}
spread {
attribute = "${node.datacenter}"
weight = 50
target "dc1" {
percent = 50
}
target "dc2" {
percent = 25
}
target "dc3" {
percent = 25
}
}
task "binstore" {
driver = "docker"
user = "bob"
leader = true
affinity {
attribute = "${meta.foo}"
value = "a,b,c"
operator = "set_contains"
weight = 25
}
config {
image = "hashicorp/binstore"

View File

@ -1,4 +1,4 @@
package lib
package delayheap
import (
"container/heap"

View File

@ -1,4 +1,4 @@
package lib
package delayheap
import (
"testing"

76
lib/kheap/score_heap.go Normal file
View File

@ -0,0 +1,76 @@
package kheap
import (
"container/heap"
)
// HeapItem is an interface type implemented by objects stored in the ScoreHeap
type HeapItem interface {
Data() interface{} // The data object
Score() float64 // Score to use as the sort criteria
}
// A ScoreHeap implements heap.Interface and is a min heap
// that keeps the top K elements by Score. Push can be called
// with an arbitrary number of values but only the top K are stored
type ScoreHeap struct {
items []HeapItem
capacity int
}
func NewScoreHeap(capacity uint32) *ScoreHeap {
return &ScoreHeap{capacity: int(capacity)}
}
func (pq ScoreHeap) Len() int { return len(pq.items) }
func (pq ScoreHeap) Less(i, j int) bool {
return pq.items[i].Score() < pq.items[j].Score()
}
func (pq ScoreHeap) Swap(i, j int) {
pq.items[i], pq.items[j] = pq.items[j], pq.items[i]
}
// Push implements heap.Interface and only stores
// the top K elements by Score
func (pq *ScoreHeap) Push(x interface{}) {
item := x.(HeapItem)
if len(pq.items) < pq.capacity {
pq.items = append(pq.items, item)
} else {
// Pop the lowest scoring element if this item's Score is
// greater than the min Score so far
minIndex := 0
min := pq.items[minIndex]
if item.Score() > min.Score() {
// Replace min and heapify
pq.items[minIndex] = item
heap.Fix(pq, minIndex)
}
}
}
// Push implements heap.Interface and returns the top K scoring
// elements in increasing order of Score. Callers must reverse the order
// of returned elements to get the top K scoring elements in descending order
func (pq *ScoreHeap) Pop() interface{} {
old := pq.items
n := len(old)
item := old[n-1]
pq.items = old[0 : n-1]
return item
}
// GetItemsReverse returns the items in this min heap in reverse order
// sorted by score descending
func (pq *ScoreHeap) GetItemsReverse() []interface{} {
ret := make([]interface{}, pq.Len())
i := pq.Len() - 1
for pq.Len() > 0 {
item := heap.Pop(pq)
ret[i] = item
i--
}
return ret
}

View File

@ -0,0 +1,92 @@
package kheap
import (
"container/heap"
"testing"
"github.com/stretchr/testify/require"
)
type heapItem struct {
Value string
ScoreVal float64
}
func (h *heapItem) Data() interface{} {
return h.Value
}
func (h *heapItem) Score() float64 {
return h.ScoreVal
}
func TestScoreHeap(t *testing.T) {
type testCase struct {
desc string
items map[string]float64
expected []*heapItem
}
cases := []testCase{
{
desc: "More than K elements",
items: map[string]float64{
"banana": 3.0,
"apple": 2.25,
"pear": 2.32,
"watermelon": 5.45,
"orange": 0.20,
"strawberry": 9.03,
"blueberry": 0.44,
"lemon": 3.9,
"cherry": 0.03,
},
expected: []*heapItem{
{Value: "pear", ScoreVal: 2.32},
{Value: "banana", ScoreVal: 3.0},
{Value: "lemon", ScoreVal: 3.9},
{Value: "watermelon", ScoreVal: 5.45},
{Value: "strawberry", ScoreVal: 9.03},
},
},
{
desc: "Less than K elements",
items: map[string]float64{
"eggplant": 9.0,
"okra": -1.0,
"corn": 0.25,
},
expected: []*heapItem{
{Value: "okra", ScoreVal: -1.0},
{Value: "corn", ScoreVal: 0.25},
{Value: "eggplant", ScoreVal: 9.0},
},
},
}
for _, tc := range cases {
t.Run("", func(t *testing.T) {
// Create Score heap, push elements into it
pq := NewScoreHeap(5)
for value, score := range tc.items {
heapItem := &heapItem{
Value: value,
ScoreVal: score,
}
heap.Push(pq, heapItem)
}
// Take the items out; they arrive in increasing Score order
require := require.New(t)
require.Equal(len(tc.expected), pq.Len())
i := 0
for pq.Len() > 0 {
item := heap.Pop(pq).(*heapItem)
require.Equal(tc.expected[i], item)
i++
}
})
}
}

View File

@ -12,7 +12,7 @@ import (
"github.com/armon/go-metrics"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/lib"
"github.com/hashicorp/nomad/lib/delayheap"
"github.com/hashicorp/nomad/nomad/structs"
)
@ -86,7 +86,7 @@ type EvalBroker struct {
// delayHeap is a heap used to track incoming evaluations that are
// not eligible to enqueue until their WaitTime
delayHeap *lib.DelayHeap
delayHeap *delayheap.DelayHeap
// delayedEvalsUpdateCh is used to trigger notifications for updates
// to the delayHeap
@ -142,7 +142,7 @@ func NewEvalBroker(timeout, initialNackDelay, subsequentNackDelay time.Duration,
timeWait: make(map[string]*time.Timer),
initialNackDelay: initialNackDelay,
subsequentNackDelay: subsequentNackDelay,
delayHeap: lib.NewDelayHeap(),
delayHeap: delayheap.NewDelayHeap(),
delayedEvalsUpdateCh: make(chan struct{}, 1),
}
b.stats.ByScheduler = make(map[string]*SchedulerStats)
@ -719,7 +719,7 @@ func (b *EvalBroker) flush() {
b.ready = make(map[string]PendingEvaluations)
b.unack = make(map[string]*unackEval)
b.timeWait = make(map[string]*time.Timer)
b.delayHeap = lib.NewDelayHeap()
b.delayHeap = delayheap.NewDelayHeap()
}
// evalWrapper satisfies the HeapNode interface

View File

@ -105,6 +105,17 @@ func (j *Job) Diff(other *Job, contextual bool) (*JobDiff, error) {
diff.Objects = append(diff.Objects, conDiff...)
}
// Affinities diff
affinitiesDiff := primitiveObjectSetDiff(
interfaceSlice(j.Affinities),
interfaceSlice(other.Affinities),
[]string{"str"},
"Affinity",
contextual)
if affinitiesDiff != nil {
diff.Objects = append(diff.Objects, affinitiesDiff...)
}
// Task groups diff
tgs, err := taskGroupDiffs(j.TaskGroups, other.TaskGroups, contextual)
if err != nil {
@ -228,6 +239,17 @@ func (tg *TaskGroup) Diff(other *TaskGroup, contextual bool) (*TaskGroupDiff, er
diff.Objects = append(diff.Objects, conDiff...)
}
// Affinities diff
affinitiesDiff := primitiveObjectSetDiff(
interfaceSlice(tg.Affinities),
interfaceSlice(other.Affinities),
[]string{"str"},
"Affinity",
contextual)
if affinitiesDiff != nil {
diff.Objects = append(diff.Objects, affinitiesDiff...)
}
// Restart policy diff
rDiff := primitiveObjectDiff(tg.RestartPolicy, other.RestartPolicy, nil, "RestartPolicy", contextual)
if rDiff != nil {
@ -387,6 +409,17 @@ func (t *Task) Diff(other *Task, contextual bool) (*TaskDiff, error) {
diff.Objects = append(diff.Objects, conDiff...)
}
// Affinities diff
affinitiesDiff := primitiveObjectSetDiff(
interfaceSlice(t.Affinities),
interfaceSlice(other.Affinities),
[]string{"str"},
"Affinity",
contextual)
if affinitiesDiff != nil {
diff.Objects = append(diff.Objects, affinitiesDiff...)
}
// Config diff
if cDiff := configDiff(t.Config, other.Config, contextual); cDiff != nil {
diff.Objects = append(diff.Objects, cDiff)

View File

@ -753,6 +753,110 @@ func TestJobDiff(t *testing.T) {
},
},
},
{
// Affinities edited
Old: &Job{
Affinities: []*Affinity{
{
LTarget: "foo",
RTarget: "foo",
Operand: "foo",
Weight: 20,
str: "foo",
},
{
LTarget: "bar",
RTarget: "bar",
Operand: "bar",
Weight: 20,
str: "bar",
},
},
},
New: &Job{
Affinities: []*Affinity{
{
LTarget: "foo",
RTarget: "foo",
Operand: "foo",
Weight: 20,
str: "foo",
},
{
LTarget: "baz",
RTarget: "baz",
Operand: "baz",
Weight: 20,
str: "baz",
},
},
},
Expected: &JobDiff{
Type: DiffTypeEdited,
Objects: []*ObjectDiff{
{
Type: DiffTypeAdded,
Name: "Affinity",
Fields: []*FieldDiff{
{
Type: DiffTypeAdded,
Name: "LTarget",
Old: "",
New: "baz",
},
{
Type: DiffTypeAdded,
Name: "Operand",
Old: "",
New: "baz",
},
{
Type: DiffTypeAdded,
Name: "RTarget",
Old: "",
New: "baz",
},
{
Type: DiffTypeAdded,
Name: "Weight",
Old: "",
New: "20",
},
},
},
{
Type: DiffTypeDeleted,
Name: "Affinity",
Fields: []*FieldDiff{
{
Type: DiffTypeDeleted,
Name: "LTarget",
Old: "bar",
New: "",
},
{
Type: DiffTypeDeleted,
Name: "Operand",
Old: "bar",
New: "",
},
{
Type: DiffTypeDeleted,
Name: "RTarget",
Old: "bar",
New: "",
},
{
Type: DiffTypeDeleted,
Name: "Weight",
Old: "20",
New: "",
},
},
},
},
},
},
{
// Task groups edited
Old: &Job{
@ -1303,6 +1407,110 @@ func TestTaskGroupDiff(t *testing.T) {
},
},
},
{
// Affinities edited
Old: &TaskGroup{
Affinities: []*Affinity{
{
LTarget: "foo",
RTarget: "foo",
Operand: "foo",
Weight: 20,
str: "foo",
},
{
LTarget: "bar",
RTarget: "bar",
Operand: "bar",
Weight: 20,
str: "bar",
},
},
},
New: &TaskGroup{
Affinities: []*Affinity{
{
LTarget: "foo",
RTarget: "foo",
Operand: "foo",
Weight: 20,
str: "foo",
},
{
LTarget: "baz",
RTarget: "baz",
Operand: "baz",
Weight: 20,
str: "baz",
},
},
},
Expected: &TaskGroupDiff{
Type: DiffTypeEdited,
Objects: []*ObjectDiff{
{
Type: DiffTypeAdded,
Name: "Affinity",
Fields: []*FieldDiff{
{
Type: DiffTypeAdded,
Name: "LTarget",
Old: "",
New: "baz",
},
{
Type: DiffTypeAdded,
Name: "Operand",
Old: "",
New: "baz",
},
{
Type: DiffTypeAdded,
Name: "RTarget",
Old: "",
New: "baz",
},
{
Type: DiffTypeAdded,
Name: "Weight",
Old: "",
New: "20",
},
},
},
{
Type: DiffTypeDeleted,
Name: "Affinity",
Fields: []*FieldDiff{
{
Type: DiffTypeDeleted,
Name: "LTarget",
Old: "bar",
New: "",
},
{
Type: DiffTypeDeleted,
Name: "Operand",
Old: "bar",
New: "",
},
{
Type: DiffTypeDeleted,
Name: "RTarget",
Old: "bar",
New: "",
},
{
Type: DiffTypeDeleted,
Name: "Weight",
Old: "20",
New: "",
},
},
},
},
},
},
{
// RestartPolicy added
Old: &TaskGroup{},
@ -2610,6 +2818,110 @@ func TestTaskDiff(t *testing.T) {
},
},
},
{
Name: "Affinities edited",
Old: &Task{
Affinities: []*Affinity{
{
LTarget: "foo",
RTarget: "foo",
Operand: "foo",
Weight: 20,
str: "foo",
},
{
LTarget: "bar",
RTarget: "bar",
Operand: "bar",
Weight: 20,
str: "bar",
},
},
},
New: &Task{
Affinities: []*Affinity{
{
LTarget: "foo",
RTarget: "foo",
Operand: "foo",
Weight: 20,
str: "foo",
},
{
LTarget: "baz",
RTarget: "baz",
Operand: "baz",
Weight: 20,
str: "baz",
},
},
},
Expected: &TaskDiff{
Type: DiffTypeEdited,
Objects: []*ObjectDiff{
{
Type: DiffTypeAdded,
Name: "Affinity",
Fields: []*FieldDiff{
{
Type: DiffTypeAdded,
Name: "LTarget",
Old: "",
New: "baz",
},
{
Type: DiffTypeAdded,
Name: "Operand",
Old: "",
New: "baz",
},
{
Type: DiffTypeAdded,
Name: "RTarget",
Old: "",
New: "baz",
},
{
Type: DiffTypeAdded,
Name: "Weight",
Old: "",
New: "20",
},
},
},
{
Type: DiffTypeDeleted,
Name: "Affinity",
Fields: []*FieldDiff{
{
Type: DiffTypeDeleted,
Name: "LTarget",
Old: "bar",
New: "",
},
{
Type: DiffTypeDeleted,
Name: "Operand",
Old: "bar",
New: "",
},
{
Type: DiffTypeDeleted,
Name: "RTarget",
Old: "bar",
New: "",
},
{
Type: DiffTypeDeleted,
Name: "Weight",
Old: "20",
New: "",
},
},
},
},
},
},
{
Name: "LogConfig added",
Old: &Task{},

View File

@ -208,6 +208,58 @@ func CopySliceConstraints(s []*Constraint) []*Constraint {
return c
}
func CopySliceAffinities(s []*Affinity) []*Affinity {
l := len(s)
if l == 0 {
return nil
}
c := make([]*Affinity, l)
for i, v := range s {
c[i] = v.Copy()
}
return c
}
func CopySliceSpreads(s []*Spread) []*Spread {
l := len(s)
if l == 0 {
return nil
}
c := make([]*Spread, l)
for i, v := range s {
c[i] = v.Copy()
}
return c
}
func CopySliceSpreadTarget(s []*SpreadTarget) []*SpreadTarget {
l := len(s)
if l == 0 {
return nil
}
c := make([]*SpreadTarget, l)
for i, v := range s {
c[i] = v.Copy()
}
return c
}
func CopySliceNodeScoreMeta(s []*NodeScoreMeta) []*NodeScoreMeta {
l := len(s)
if l == 0 {
return nil
}
c := make([]*NodeScoreMeta, l)
for i, v := range s {
c[i] = v.Copy()
}
return c
}
// VaultPoliciesSet takes the structure returned by VaultPolicies and returns
// the set of required policies
func VaultPoliciesSet(policies map[string]map[string]*Vault) []string {

View File

@ -24,6 +24,9 @@ import (
"golang.org/x/crypto/blake2b"
"container/heap"
"math"
"github.com/gorhill/cronexpr"
"github.com/hashicorp/consul/api"
multierror "github.com/hashicorp/go-multierror"
@ -32,11 +35,10 @@ import (
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/helper/args"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/lib/kheap"
"github.com/mitchellh/copystructure"
"github.com/ugorji/go/codec"
"math"
hcodec "github.com/hashicorp/go-msgpack/codec"
)
@ -133,6 +135,13 @@ const (
// MaxRetainedNodeEvents is the maximum number of node events that will be
// retained for a single node
MaxRetainedNodeEvents = 10
// MaxRetainedNodeScores is the number of top scoring nodes for which we
// retain scoring metadata
MaxRetainedNodeScores = 5
// Normalized scorer name
NormScorerName = "normalized-score"
)
// Context defines the scope in which a search for Nomad object operates, and
@ -2004,6 +2013,14 @@ type Job struct {
// all the task groups and tasks.
Constraints []*Constraint
// Affinities can be specified at the job level to express
// scheduling preferences that apply to all groups and tasks
Affinities []*Affinity
// Spread can be specified at the job level to express spreading
// allocations across a desired attribute, such as datacenter
Spreads []*Spread
// TaskGroups are the collections of task groups that this job needs
// to run. Each task group is an atomic unit of scheduling and placement.
TaskGroups []*TaskGroup
@ -2112,6 +2129,7 @@ func (j *Job) Copy() *Job {
*nj = *j
nj.Datacenters = helper.CopySliceString(nj.Datacenters)
nj.Constraints = CopySliceConstraints(nj.Constraints)
nj.Affinities = CopySliceAffinities(nj.Affinities)
if j.TaskGroups != nil {
tgs := make([]*TaskGroup, len(nj.TaskGroups))
@ -2167,6 +2185,31 @@ func (j *Job) Validate() error {
mErr.Errors = append(mErr.Errors, outer)
}
}
if j.Type == JobTypeSystem {
if j.Affinities != nil {
mErr.Errors = append(mErr.Errors, fmt.Errorf("System jobs may not have an affinity stanza"))
}
} else {
for idx, affinity := range j.Affinities {
if err := affinity.Validate(); err != nil {
outer := fmt.Errorf("Affinity %d validation failed: %s", idx+1, err)
mErr.Errors = append(mErr.Errors, outer)
}
}
}
if j.Type == JobTypeSystem {
if j.Spreads != nil {
mErr.Errors = append(mErr.Errors, fmt.Errorf("System jobs may not have a spread stanza"))
}
} else {
for idx, spread := range j.Spreads {
if err := spread.Validate(); err != nil {
outer := fmt.Errorf("Spread %d validation failed: %s", idx+1, err)
mErr.Errors = append(mErr.Errors, outer)
}
}
}
// Check for duplicate task groups
taskGroups := make(map[string]int)
@ -3315,6 +3358,14 @@ type TaskGroup struct {
// ReschedulePolicy is used to configure how the scheduler should
// retry failed allocations.
ReschedulePolicy *ReschedulePolicy
// Affinities can be specified at the task group level to express
// scheduling preferences.
Affinities []*Affinity
// Spread can be specified at the task group level to express spreading
// allocations across a desired attribute, such as datacenter
Spreads []*Spread
}
func (tg *TaskGroup) Copy() *TaskGroup {
@ -3327,6 +3378,8 @@ func (tg *TaskGroup) Copy() *TaskGroup {
ntg.Constraints = CopySliceConstraints(ntg.Constraints)
ntg.RestartPolicy = ntg.RestartPolicy.Copy()
ntg.ReschedulePolicy = ntg.ReschedulePolicy.Copy()
ntg.Affinities = CopySliceAffinities(ntg.Affinities)
ntg.Spreads = CopySliceSpreads(ntg.Spreads)
if tg.Tasks != nil {
tasks := make([]*Task, len(ntg.Tasks))
@ -3407,6 +3460,18 @@ func (tg *TaskGroup) Validate(j *Job) error {
mErr.Errors = append(mErr.Errors, outer)
}
}
if j.Type == JobTypeSystem {
if tg.Affinities != nil {
mErr.Errors = append(mErr.Errors, fmt.Errorf("System jobs may not have an affinity stanza"))
}
} else {
for idx, affinity := range tg.Affinities {
if err := affinity.Validate(); err != nil {
outer := fmt.Errorf("Affinity %d validation failed: %s", idx+1, err)
mErr.Errors = append(mErr.Errors, outer)
}
}
}
if tg.RestartPolicy != nil {
if err := tg.RestartPolicy.Validate(); err != nil {
@ -3416,6 +3481,19 @@ func (tg *TaskGroup) Validate(j *Job) error {
mErr.Errors = append(mErr.Errors, fmt.Errorf("Task Group %v should have a restart policy", tg.Name))
}
if j.Type == JobTypeSystem {
if tg.Spreads != nil {
mErr.Errors = append(mErr.Errors, fmt.Errorf("System jobs may not have a spread stanza"))
}
} else {
for idx, spread := range tg.Spreads {
if err := spread.Validate(); err != nil {
outer := fmt.Errorf("Spread %d validation failed: %s", idx+1, err)
mErr.Errors = append(mErr.Errors, outer)
}
}
}
if j.Type == JobTypeSystem {
if tg.ReschedulePolicy != nil {
mErr.Errors = append(mErr.Errors, fmt.Errorf("System jobs should not have a reschedule policy"))
@ -3504,7 +3582,7 @@ func (tg *TaskGroup) Validate(j *Job) error {
// Validate the tasks
for _, task := range tg.Tasks {
if err := task.Validate(tg.EphemeralDisk); err != nil {
if err := task.Validate(tg.EphemeralDisk, j.Type); err != nil {
outer := fmt.Errorf("Task %s validation failed: %v", task.Name, err)
mErr.Errors = append(mErr.Errors, outer)
}
@ -4007,6 +4085,10 @@ type Task struct {
// the particular task.
Constraints []*Constraint
// Affinities can be specified at the task level to express
// scheduling preferences
Affinities []*Affinity
// Resources is the resources needed by this task
Resources *Resources
@ -4060,6 +4142,7 @@ func (t *Task) Copy() *Task {
}
nt.Constraints = CopySliceConstraints(nt.Constraints)
nt.Affinities = CopySliceAffinities(nt.Affinities)
nt.Vault = nt.Vault.Copy()
nt.Resources = nt.Resources.Copy()
@ -4135,7 +4218,7 @@ func (t *Task) GoString() string {
}
// Validate is used to sanity check a task
func (t *Task) Validate(ephemeralDisk *EphemeralDisk) error {
func (t *Task) Validate(ephemeralDisk *EphemeralDisk, jobType string) error {
var mErr multierror.Error
if t.Name == "" {
mErr.Errors = append(mErr.Errors, errors.New("Missing task name"))
@ -4189,6 +4272,19 @@ func (t *Task) Validate(ephemeralDisk *EphemeralDisk) error {
}
}
if jobType == JobTypeSystem {
if t.Affinities != nil {
mErr.Errors = append(mErr.Errors, fmt.Errorf("System jobs may not have an affinity stanza"))
}
} else {
for idx, affinity := range t.Affinities {
if err := affinity.Validate(); err != nil {
outer := fmt.Errorf("Affinity %d validation failed: %s", idx+1, err)
mErr.Errors = append(mErr.Errors, outer)
}
}
}
// Validate Services
if err := validateServices(t); err != nil {
mErr.Errors = append(mErr.Errors, err)
@ -5165,6 +5261,8 @@ const (
ConstraintRegex = "regexp"
ConstraintVersion = "version"
ConstraintSetContains = "set_contains"
ConstraintSetContainsAll = "set_contains_all"
ConstraintSetContaintsAny = "set_contains_any"
)
// Constraints are used to restrict placement options.
@ -5251,6 +5349,180 @@ func (c *Constraint) Validate() error {
return mErr.ErrorOrNil()
}
// Affinity is used to score placement options based on a weight
type Affinity struct {
LTarget string // Left-hand target
RTarget string // Right-hand target
Operand string // Affinity operand (<=, <, =, !=, >, >=), set_contains_all, set_contains_any
Weight float64 // Weight applied to nodes that match the affinity. Can be negative
str string // Memoized string
}
// Equal checks if two affinities are equal
func (a *Affinity) Equal(o *Affinity) bool {
return a.LTarget == o.LTarget &&
a.RTarget == o.RTarget &&
a.Operand == o.Operand &&
a.Weight == o.Weight
}
func (a *Affinity) Copy() *Affinity {
if a == nil {
return nil
}
na := new(Affinity)
*na = *a
return na
}
func (a *Affinity) String() string {
if a.str != "" {
return a.str
}
a.str = fmt.Sprintf("%s %s %s %v", a.LTarget, a.Operand, a.RTarget, a.Weight)
return a.str
}
func (a *Affinity) Validate() error {
var mErr multierror.Error
if a.Operand == "" {
mErr.Errors = append(mErr.Errors, errors.New("Missing affinity operand"))
}
// Perform additional validation based on operand
switch a.Operand {
case ConstraintSetContainsAll, ConstraintSetContaintsAny, ConstraintSetContains:
if a.RTarget == "" {
mErr.Errors = append(mErr.Errors, fmt.Errorf("Set contains operators require an RTarget"))
}
case ConstraintRegex:
if _, err := regexp.Compile(a.RTarget); err != nil {
mErr.Errors = append(mErr.Errors, fmt.Errorf("Regular expression failed to compile: %v", err))
}
case ConstraintVersion:
if _, err := version.NewConstraint(a.RTarget); err != nil {
mErr.Errors = append(mErr.Errors, fmt.Errorf("Version affinity is invalid: %v", err))
}
case "=", "==", "is", "!=", "not", "<", "<=", ">", ">=":
if a.RTarget == "" {
mErr.Errors = append(mErr.Errors, fmt.Errorf("Operator %q requires an RTarget", a.Operand))
}
default:
mErr.Errors = append(mErr.Errors, fmt.Errorf("Unknown affinity operator %q", a.Operand))
}
// Ensure we have an LTarget
if a.LTarget == "" {
mErr.Errors = append(mErr.Errors, fmt.Errorf("No LTarget provided but is required"))
}
// Ensure that weight is between -100 and 100, and not zero
if a.Weight == 0 {
mErr.Errors = append(mErr.Errors, fmt.Errorf("Affinity weight cannot be zero"))
}
if a.Weight > 100 || a.Weight < -100 {
mErr.Errors = append(mErr.Errors, fmt.Errorf("Affinity weight must be within the range [-100,100]"))
}
return mErr.ErrorOrNil()
}
// Spread is used to specify desired distribution of allocations according to weight
type Spread struct {
// Attribute is the node attribute used as the spread criteria
Attribute string
// Weight is the relative weight of this spread, useful when there are multiple
// spread and affinities
Weight int
// SpreadTarget is used to describe desired percentages for each attribute value
SpreadTarget []*SpreadTarget
// Memoized string representation
str string
}
func (s *Spread) Copy() *Spread {
if s == nil {
return nil
}
ns := new(Spread)
*ns = *s
ns.SpreadTarget = CopySliceSpreadTarget(s.SpreadTarget)
return ns
}
func (s *Spread) String() string {
if s.str != "" {
return s.str
}
s.str = fmt.Sprintf("%s %s %v", s.Attribute, s.SpreadTarget, s.Weight)
return s.str
}
func (s *Spread) Validate() error {
var mErr multierror.Error
if s.Attribute == "" {
mErr.Errors = append(mErr.Errors, errors.New("Missing spread attribute"))
}
if s.Weight <= 0 || s.Weight > 100 {
mErr.Errors = append(mErr.Errors, errors.New("Spread stanza must have a positive weight from 0 to 100"))
}
seen := make(map[string]struct{})
sumPercent := uint32(0)
for _, target := range s.SpreadTarget {
// Make sure there are no duplicates
_, ok := seen[target.Value]
if !ok {
seen[target.Value] = struct{}{}
} else {
mErr.Errors = append(mErr.Errors, errors.New(fmt.Sprintf("Spread target value %q already defined", target.Value)))
}
if target.Percent < 0 || target.Percent > 100 {
mErr.Errors = append(mErr.Errors, errors.New(fmt.Sprintf("Spread target percentage for value %q must be between 0 and 100", target.Value)))
}
sumPercent += target.Percent
}
if sumPercent > 100 {
mErr.Errors = append(mErr.Errors, errors.New(fmt.Sprintf("Sum of spread target percentages must not be greater than 100%%; got %d%%", sumPercent)))
}
return mErr.ErrorOrNil()
}
// SpreadTarget is used to specify desired percentages for each attribute value
type SpreadTarget struct {
// Value is a single attribute value, like "dc1"
Value string
// Percent is the desired percentage of allocs
Percent uint32
// Memoized string representation
str string
}
func (s *SpreadTarget) Copy() *SpreadTarget {
if s == nil {
return nil
}
ns := new(SpreadTarget)
*ns = *s
return ns
}
func (s *SpreadTarget) String() string {
if s.str != "" {
return s.str
}
s.str = fmt.Sprintf("%q %v%%", s.Value, s.Percent)
return s.str
}
// EphemeralDisk is an ephemeral disk object
type EphemeralDisk struct {
// Sticky indicates whether the allocation is sticky to a node
@ -6226,8 +6498,20 @@ type AllocMetric struct {
// Scores is the scores of the final few nodes remaining
// for placement. The top score is typically selected.
// Deprecated: Replaced by ScoreMetaData in Nomad 0.9
Scores map[string]float64
// ScoreMetaData is a slice of top scoring nodes displayed in the CLI
ScoreMetaData []*NodeScoreMeta
// nodeScoreMeta is used to keep scores for a single node id. It is cleared out after
// we receive normalized score during the last step of the scoring stack.
nodeScoreMeta *NodeScoreMeta
// topScores is used to maintain a heap of the top K nodes with
// the highest normalized score
topScores *kheap.ScoreHeap
// AllocationTime is a measure of how long the allocation
// attempt took. This can affect performance and SLAs.
AllocationTime time.Duration
@ -6252,6 +6536,7 @@ func (a *AllocMetric) Copy() *AllocMetric {
na.DimensionExhausted = helper.CopyMapStringInt(na.DimensionExhausted)
na.QuotaExhausted = helper.CopySliceString(na.QuotaExhausted)
na.Scores = helper.CopyMapStringFloat64(na.Scores)
na.ScoreMetaData = CopySliceNodeScoreMeta(na.ScoreMetaData)
return na
}
@ -6299,12 +6584,77 @@ func (a *AllocMetric) ExhaustQuota(dimensions []string) {
a.QuotaExhausted = append(a.QuotaExhausted, dimensions...)
}
// ScoreNode is used to gather top K scoring nodes in a heap
func (a *AllocMetric) ScoreNode(node *Node, name string, score float64) {
if a.Scores == nil {
a.Scores = make(map[string]float64)
// Create nodeScoreMeta lazily if its the first time or if its a new node
if a.nodeScoreMeta == nil || a.nodeScoreMeta.NodeID != node.ID {
a.nodeScoreMeta = &NodeScoreMeta{
NodeID: node.ID,
Scores: make(map[string]float64),
}
}
key := fmt.Sprintf("%s.%s", node.ID, name)
a.Scores[key] = score
if name == NormScorerName {
a.nodeScoreMeta.NormScore = score
// Once we have the normalized score we can push to the heap
// that tracks top K by normalized score
// Create the heap if its not there already
if a.topScores == nil {
a.topScores = kheap.NewScoreHeap(MaxRetainedNodeScores)
}
heap.Push(a.topScores, a.nodeScoreMeta)
// Clear out this entry because its now in the heap
a.nodeScoreMeta = nil
} else {
a.nodeScoreMeta.Scores[name] = score
}
}
// PopulateScoreMetaData populates a map of scorer to scoring metadata
// The map is populated by popping elements from a heap of top K scores
// maintained per scorer
func (a *AllocMetric) PopulateScoreMetaData() {
if a.topScores == nil {
return
}
if a.ScoreMetaData == nil {
a.ScoreMetaData = make([]*NodeScoreMeta, a.topScores.Len())
}
heapItems := a.topScores.GetItemsReverse()
for i, item := range heapItems {
a.ScoreMetaData[i] = item.(*NodeScoreMeta)
}
}
// NodeScoreMeta captures scoring meta data derived from
// different scoring factors.
type NodeScoreMeta struct {
NodeID string
Scores map[string]float64
NormScore float64
}
func (s *NodeScoreMeta) Copy() *NodeScoreMeta {
if s == nil {
return nil
}
ns := new(NodeScoreMeta)
*ns = *s
return ns
}
func (s *NodeScoreMeta) String() string {
return fmt.Sprintf("%s %f %v", s.NodeID, s.NormScore, s.Scores)
}
func (s *NodeScoreMeta) Score() float64 {
return s.NormScore
}
func (s *NodeScoreMeta) Data() interface{} {
return s
}
// AllocDeploymentStatus captures the status of the allocation as part of the

View File

@ -384,6 +384,42 @@ func TestJob_SystemJob_Validate(t *testing.T) {
if err := j.Validate(); err != nil {
t.Fatalf("unexpected err: %v", err)
}
// Add affinities at job, task group and task level, that should fail validation
j.Affinities = []*Affinity{{
Operand: "=",
LTarget: "${node.datacenter}",
RTarget: "dc1",
}}
j.TaskGroups[0].Affinities = []*Affinity{{
Operand: "=",
LTarget: "${meta.rack}",
RTarget: "r1",
}}
j.TaskGroups[0].Tasks[0].Affinities = []*Affinity{{
Operand: "=",
LTarget: "${meta.rack}",
RTarget: "r1",
}}
err = j.Validate()
require.NotNil(t, err)
require.Contains(t, err.Error(), "System jobs may not have an affinity stanza")
// Add spread at job and task group level, that should fail validation
j.Spreads = []*Spread{{
Attribute: "${node.datacenter}",
Weight: 100,
}}
j.TaskGroups[0].Spreads = []*Spread{{
Attribute: "${node.datacenter}",
Weight: 100,
}}
err = j.Validate()
require.NotNil(t, err)
require.Contains(t, err.Error(), "System jobs may not have a spread stanza")
}
func TestJob_VaultPolicies(t *testing.T) {
@ -739,7 +775,7 @@ func TestTaskGroup_Validate(t *testing.T) {
func TestTask_Validate(t *testing.T) {
task := &Task{}
ephemeralDisk := DefaultEphemeralDisk()
err := task.Validate(ephemeralDisk)
err := task.Validate(ephemeralDisk, JobTypeBatch)
mErr := err.(*multierror.Error)
if !strings.Contains(mErr.Errors[0].Error(), "task name") {
t.Fatalf("err: %s", err)
@ -752,7 +788,7 @@ func TestTask_Validate(t *testing.T) {
}
task = &Task{Name: "web/foo"}
err = task.Validate(ephemeralDisk)
err = task.Validate(ephemeralDisk, JobTypeBatch)
mErr = err.(*multierror.Error)
if !strings.Contains(mErr.Errors[0].Error(), "slashes") {
t.Fatalf("err: %s", err)
@ -769,7 +805,7 @@ func TestTask_Validate(t *testing.T) {
LogConfig: DefaultLogConfig(),
}
ephemeralDisk.SizeMB = 200
err = task.Validate(ephemeralDisk)
err = task.Validate(ephemeralDisk, JobTypeBatch)
if err != nil {
t.Fatalf("err: %s", err)
}
@ -783,7 +819,7 @@ func TestTask_Validate(t *testing.T) {
LTarget: "${meta.rack}",
})
err = task.Validate(ephemeralDisk)
err = task.Validate(ephemeralDisk, JobTypeBatch)
mErr = err.(*multierror.Error)
if !strings.Contains(mErr.Errors[0].Error(), "task level: distinct_hosts") {
t.Fatalf("err: %s", err)
@ -866,7 +902,7 @@ func TestTask_Validate_Services(t *testing.T) {
},
}
err := task.Validate(ephemeralDisk)
err := task.Validate(ephemeralDisk, JobTypeService)
if err == nil {
t.Fatal("expected an error")
}
@ -887,7 +923,7 @@ func TestTask_Validate_Services(t *testing.T) {
t.Fatalf("err: %v", err)
}
if err = task1.Validate(ephemeralDisk); err != nil {
if err = task1.Validate(ephemeralDisk, JobTypeService); err != nil {
t.Fatalf("err : %v", err)
}
}
@ -946,7 +982,7 @@ func TestTask_Validate_Service_AddressMode_Ok(t *testing.T) {
for _, service := range cases {
task := getTask(service)
t.Run(service.Name, func(t *testing.T) {
if err := task.Validate(ephemeralDisk); err != nil {
if err := task.Validate(ephemeralDisk, JobTypeService); err != nil {
t.Fatalf("unexpected err: %v", err)
}
})
@ -999,7 +1035,7 @@ func TestTask_Validate_Service_AddressMode_Bad(t *testing.T) {
for _, service := range cases {
task := getTask(service)
t.Run(service.Name, func(t *testing.T) {
err := task.Validate(ephemeralDisk)
err := task.Validate(ephemeralDisk, JobTypeService)
if err == nil {
t.Fatalf("expected an error")
}
@ -1320,7 +1356,7 @@ func TestTask_Validate_LogConfig(t *testing.T) {
SizeMB: 1,
}
err := task.Validate(ephemeralDisk)
err := task.Validate(ephemeralDisk, JobTypeService)
mErr := err.(*multierror.Error)
if !strings.Contains(mErr.Errors[3].Error(), "log storage") {
t.Fatalf("err: %s", err)
@ -1337,7 +1373,7 @@ func TestTask_Validate_Template(t *testing.T) {
SizeMB: 1,
}
err := task.Validate(ephemeralDisk)
err := task.Validate(ephemeralDisk, JobTypeService)
if !strings.Contains(err.Error(), "Template 1 validation failed") {
t.Fatalf("err: %s", err)
}
@ -1350,7 +1386,7 @@ func TestTask_Validate_Template(t *testing.T) {
}
task.Templates = []*Template{good, good}
err = task.Validate(ephemeralDisk)
err = task.Validate(ephemeralDisk, JobTypeService)
if !strings.Contains(err.Error(), "same destination as") {
t.Fatalf("err: %s", err)
}
@ -1363,7 +1399,7 @@ func TestTask_Validate_Template(t *testing.T) {
},
}
err = task.Validate(ephemeralDisk)
err = task.Validate(ephemeralDisk, JobTypeService)
if err == nil {
t.Fatalf("expected error from Template.Validate")
}
@ -1563,6 +1599,94 @@ func TestConstraint_Validate(t *testing.T) {
}
}
func TestAffinity_Validate(t *testing.T) {
type tc struct {
affinity *Affinity
err error
name string
}
testCases := []tc{
{
affinity: &Affinity{},
err: fmt.Errorf("Missing affinity operand"),
},
{
affinity: &Affinity{
Operand: "foo",
LTarget: "${meta.node_class}",
Weight: 10,
},
err: fmt.Errorf("Unknown affinity operator \"foo\""),
},
{
affinity: &Affinity{
Operand: "=",
LTarget: "${meta.node_class}",
Weight: 10,
},
err: fmt.Errorf("Operator \"=\" requires an RTarget"),
},
{
affinity: &Affinity{
Operand: "=",
LTarget: "${meta.node_class}",
RTarget: "c4",
Weight: 0,
},
err: fmt.Errorf("Affinity weight cannot be zero"),
},
{
affinity: &Affinity{
Operand: "=",
LTarget: "${meta.node_class}",
RTarget: "c4",
Weight: 500,
},
err: fmt.Errorf("Affinity weight must be within the range [-100,100]"),
},
{
affinity: &Affinity{
Operand: "=",
LTarget: "${node.class}",
Weight: 10,
},
err: fmt.Errorf("Operator \"=\" requires an RTarget"),
},
{
affinity: &Affinity{
Operand: "version",
LTarget: "${meta.os}",
RTarget: ">>2.0",
Weight: 500,
},
err: fmt.Errorf("Version affinity is invalid"),
},
{
affinity: &Affinity{
Operand: "regexp",
LTarget: "${meta.os}",
RTarget: "\\K2.0",
Weight: 100,
},
err: fmt.Errorf("Regular expression failed to compile"),
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
err := tc.affinity.Validate()
if tc.err != nil {
require.NotNil(t, err)
require.Contains(t, err.Error(), tc.err.Error())
} else {
require.Nil(t, err)
}
})
}
}
func TestUpdateStrategy_Validate(t *testing.T) {
u := &UpdateStrategy{
MaxParallel: 0,
@ -3817,3 +3941,119 @@ func TestNode_Copy(t *testing.T) {
require.Equal(node.DrainStrategy, node2.DrainStrategy)
require.Equal(node.Drivers, node2.Drivers)
}
func TestSpread_Validate(t *testing.T) {
type tc struct {
spread *Spread
err error
name string
}
testCases := []tc{
{
spread: &Spread{},
err: fmt.Errorf("Missing spread attribute"),
name: "empty spread",
},
{
spread: &Spread{
Attribute: "${node.datacenter}",
Weight: -1,
},
err: fmt.Errorf("Spread stanza must have a positive weight from 0 to 100"),
name: "Invalid weight",
},
{
spread: &Spread{
Attribute: "${node.datacenter}",
Weight: 200,
},
err: fmt.Errorf("Spread stanza must have a positive weight from 0 to 100"),
name: "Invalid weight",
},
{
spread: &Spread{
Attribute: "${node.datacenter}",
Weight: 50,
SpreadTarget: []*SpreadTarget{
{
Value: "dc1",
Percent: 25,
},
{
Value: "dc2",
Percent: 150,
},
},
},
err: fmt.Errorf("Spread target percentage for value \"dc2\" must be between 0 and 100"),
name: "Invalid percentages",
},
{
spread: &Spread{
Attribute: "${node.datacenter}",
Weight: 50,
SpreadTarget: []*SpreadTarget{
{
Value: "dc1",
Percent: 75,
},
{
Value: "dc2",
Percent: 75,
},
},
},
err: fmt.Errorf("Sum of spread target percentages must not be greater than 100%%; got %d%%", 150),
name: "Invalid percentages",
},
{
spread: &Spread{
Attribute: "${node.datacenter}",
Weight: 50,
SpreadTarget: []*SpreadTarget{
{
Value: "dc1",
Percent: 25,
},
{
Value: "dc1",
Percent: 50,
},
},
},
err: fmt.Errorf("Spread target value \"dc1\" already defined"),
name: "No spread targets",
},
{
spread: &Spread{
Attribute: "${node.datacenter}",
Weight: 50,
SpreadTarget: []*SpreadTarget{
{
Value: "dc1",
Percent: 25,
},
{
Value: "dc2",
Percent: 50,
},
},
},
err: nil,
name: "Valid spread",
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
err := tc.spread.Validate()
if tc.err != nil {
require.NotNil(t, err)
require.Contains(t, err.Error(), tc.err.Error())
} else {
require.Nil(t, err)
}
})
}
}

View File

@ -34,8 +34,8 @@ type Context interface {
// RegexpCache is a cache of regular expressions
RegexpCache() map[string]*regexp.Regexp
// ConstraintCache is a cache of version constraints
ConstraintCache() map[string]version.Constraints
// VersionConstraintCache is a cache of version constraints
VersionConstraintCache() map[string]version.Constraints
// Eligibility returns a tracker for node eligibility in the context of the
// eval.
@ -54,7 +54,8 @@ func (e *EvalCache) RegexpCache() map[string]*regexp.Regexp {
}
return e.reCache
}
func (e *EvalCache) ConstraintCache() map[string]version.Constraints {
func (e *EvalCache) VersionConstraintCache() map[string]version.Constraints {
if e.constraintCache == nil {
e.constraintCache = make(map[string]version.Constraints)
}

View File

@ -403,11 +403,11 @@ func (c *ConstraintChecker) Feasible(option *structs.Node) bool {
func (c *ConstraintChecker) meetsConstraint(constraint *structs.Constraint, option *structs.Node) bool {
// Resolve the targets
lVal, ok := resolveConstraintTarget(constraint.LTarget, option)
lVal, ok := resolveTarget(constraint.LTarget, option)
if !ok {
return false
}
rVal, ok := resolveConstraintTarget(constraint.RTarget, option)
rVal, ok := resolveTarget(constraint.RTarget, option)
if !ok {
return false
}
@ -416,8 +416,8 @@ func (c *ConstraintChecker) meetsConstraint(constraint *structs.Constraint, opti
return checkConstraint(c.ctx, constraint.Operand, lVal, rVal)
}
// resolveConstraintTarget is used to resolve the LTarget and RTarget of a Constraint
func resolveConstraintTarget(target string, node *structs.Node) (interface{}, bool) {
// resolveTarget is used to resolve the LTarget and RTarget of a Constraint
func resolveTarget(target string, node *structs.Node) (interface{}, bool) {
// If no prefix, this must be a literal value
if !strings.HasPrefix(target, "${") {
return target, true
@ -470,16 +470,28 @@ func checkConstraint(ctx Context, operand string, lVal, rVal interface{}) bool {
case "<", "<=", ">", ">=":
return checkLexicalOrder(operand, lVal, rVal)
case structs.ConstraintVersion:
return checkVersionConstraint(ctx, lVal, rVal)
return checkVersionMatch(ctx, lVal, rVal)
case structs.ConstraintRegex:
return checkRegexpConstraint(ctx, lVal, rVal)
return checkRegexpMatch(ctx, lVal, rVal)
case structs.ConstraintSetContains:
return checkSetContainsConstraint(ctx, lVal, rVal)
return checkSetContainsAll(ctx, lVal, rVal)
default:
return false
}
}
// checkAffinity checks if a specific affinity is satisfied
func checkAffinity(ctx Context, operand string, lVal, rVal interface{}) bool {
switch operand {
case structs.ConstraintSetContaintsAny:
return checkSetContainsAny(lVal, rVal)
case structs.ConstraintSetContainsAll, structs.ConstraintSetContains:
return checkSetContainsAll(ctx, lVal, rVal)
default:
return checkConstraint(ctx, operand, lVal, rVal)
}
}
// checkLexicalOrder is used to check for lexical ordering
func checkLexicalOrder(op string, lVal, rVal interface{}) bool {
// Ensure the values are strings
@ -506,9 +518,9 @@ func checkLexicalOrder(op string, lVal, rVal interface{}) bool {
}
}
// checkVersionConstraint is used to compare a version on the
// checkVersionMatch is used to compare a version on the
// left hand side with a set of constraints on the right hand side
func checkVersionConstraint(ctx Context, lVal, rVal interface{}) bool {
func checkVersionMatch(ctx Context, lVal, rVal interface{}) bool {
// Parse the version
var versionStr string
switch v := lVal.(type) {
@ -533,7 +545,7 @@ func checkVersionConstraint(ctx Context, lVal, rVal interface{}) bool {
}
// Check the cache for a match
cache := ctx.ConstraintCache()
cache := ctx.VersionConstraintCache()
constraints := cache[constraintStr]
// Parse the constraints
@ -549,9 +561,9 @@ func checkVersionConstraint(ctx Context, lVal, rVal interface{}) bool {
return constraints.Check(vers)
}
// checkRegexpConstraint is used to compare a value on the
// checkRegexpMatch is used to compare a value on the
// left hand side with a regexp on the right hand side
func checkRegexpConstraint(ctx Context, lVal, rVal interface{}) bool {
func checkRegexpMatch(ctx Context, lVal, rVal interface{}) bool {
// Ensure left-hand is string
lStr, ok := lVal.(string)
if !ok {
@ -582,9 +594,9 @@ func checkRegexpConstraint(ctx Context, lVal, rVal interface{}) bool {
return re.MatchString(lStr)
}
// checkSetContainsConstraint is used to see if the left hand side contains the
// checkSetContainsAll is used to see if the left hand side contains the
// string on the right hand side
func checkSetContainsConstraint(ctx Context, lVal, rVal interface{}) bool {
func checkSetContainsAll(ctx Context, lVal, rVal interface{}) bool {
// Ensure left-hand is string
lStr, ok := lVal.(string)
if !ok {
@ -614,6 +626,38 @@ func checkSetContainsConstraint(ctx Context, lVal, rVal interface{}) bool {
return true
}
// checkSetContainsAny is used to see if the left hand side contains any
// values on the right hand side
func checkSetContainsAny(lVal, rVal interface{}) bool {
// Ensure left-hand is string
lStr, ok := lVal.(string)
if !ok {
return false
}
// RHS must be a string
rStr, ok := rVal.(string)
if !ok {
return false
}
input := strings.Split(lStr, ",")
lookup := make(map[string]struct{}, len(input))
for _, in := range input {
cleaned := strings.TrimSpace(in)
lookup[cleaned] = struct{}{}
}
for _, r := range strings.Split(rStr, ",") {
cleaned := strings.TrimSpace(r)
if _, ok := lookup[cleaned]; ok {
return true
}
}
return false
}
// FeasibilityWrapper is a FeasibleIterator which wraps both job and task group
// FeasibilityCheckers in which feasibility checking can be skipped if the
// computed node class has previously been marked as eligible or ineligible.

View File

@ -309,7 +309,7 @@ func TestResolveConstraintTarget(t *testing.T) {
}
for _, tc := range cases {
res, ok := resolveConstraintTarget(tc.target, tc.node)
res, ok := resolveTarget(tc.target, tc.node)
if ok != tc.result {
t.Fatalf("TC: %#v, Result: %v %v", tc, res, ok)
}
@ -460,7 +460,7 @@ func TestCheckVersionConstraint(t *testing.T) {
}
for _, tc := range cases {
_, ctx := testContext(t)
if res := checkVersionConstraint(ctx, tc.lVal, tc.rVal); res != tc.result {
if res := checkVersionMatch(ctx, tc.lVal, tc.rVal); res != tc.result {
t.Fatalf("TC: %#v, Result: %v", tc, res)
}
}
@ -495,7 +495,7 @@ func TestCheckRegexpConstraint(t *testing.T) {
}
for _, tc := range cases {
_, ctx := testContext(t)
if res := checkRegexpConstraint(ctx, tc.lVal, tc.rVal); res != tc.result {
if res := checkRegexpMatch(ctx, tc.lVal, tc.rVal); res != tc.result {
t.Fatalf("TC: %#v, Result: %v", tc, res)
}
}
@ -1611,3 +1611,11 @@ func TestFeasibilityWrapper_JobEligible_TgEscaped(t *testing.T) {
t.Fatalf("bad: %v %v", e, ok)
}
}
func TestSetContainsAny(t *testing.T) {
require.True(t, checkSetContainsAny("a", "a"))
require.True(t, checkSetContainsAny("a,b", "a"))
require.True(t, checkSetContainsAny(" a,b ", "a "))
require.True(t, checkSetContainsAny("a", "a"))
require.False(t, checkSetContainsAny("b", "a"))
}

View File

@ -470,6 +470,9 @@ func (s *GenericScheduler) computePlacements(destructive, place []placementResul
// Store the available nodes by datacenter
s.ctx.Metrics().NodesAvailable = byDC
// Compute top K scoring node metadata
s.ctx.Metrics().PopulateScoreMetaData()
// Set fields based on if we found an allocation option
if option != nil {
// Create an allocation for this

View File

@ -602,6 +602,174 @@ func TestServiceSched_JobRegister_DistinctProperty_TaskGroup_Incr(t *testing.T)
h.AssertEvalStatus(t, structs.EvalStatusComplete)
}
// Test job registration with spread configured
func TestServiceSched_Spread(t *testing.T) {
assert := assert.New(t)
start := uint32(100)
step := uint32(10)
for i := 0; i < 10; i++ {
name := fmt.Sprintf("%d%% in dc1", start)
t.Run(name, func(t *testing.T) {
h := NewHarness(t)
remaining := uint32(100 - start)
// Create a job that uses spread over data center
job := mock.Job()
job.Datacenters = []string{"dc1", "dc2"}
job.TaskGroups[0].Count = 10
job.TaskGroups[0].Spreads = append(job.TaskGroups[0].Spreads,
&structs.Spread{
Attribute: "${node.datacenter}",
Weight: 100,
SpreadTarget: []*structs.SpreadTarget{
{
Value: "dc1",
Percent: start,
},
{
Value: "dc2",
Percent: remaining,
},
},
})
assert.Nil(h.State.UpsertJob(h.NextIndex(), job), "UpsertJob")
// Create some nodes, half in dc2
var nodes []*structs.Node
nodeMap := make(map[string]*structs.Node)
for i := 0; i < 10; i++ {
node := mock.Node()
if i%2 == 0 {
node.Datacenter = "dc2"
}
nodes = append(nodes, node)
assert.Nil(h.State.UpsertNode(h.NextIndex(), node), "UpsertNode")
nodeMap[node.ID] = node
}
// Create a mock evaluation to register the job
eval := &structs.Evaluation{
Namespace: structs.DefaultNamespace,
ID: uuid.Generate(),
Priority: job.Priority,
TriggeredBy: structs.EvalTriggerJobRegister,
JobID: job.ID,
Status: structs.EvalStatusPending,
}
noErr(t, h.State.UpsertEvals(h.NextIndex(), []*structs.Evaluation{eval}))
// Process the evaluation
assert.Nil(h.Process(NewServiceScheduler, eval), "Process")
// Ensure a single plan
assert.Len(h.Plans, 1, "Number of plans")
plan := h.Plans[0]
// Ensure the plan doesn't have annotations.
assert.Nil(plan.Annotations, "Plan.Annotations")
// Ensure the eval hasn't spawned blocked eval
assert.Len(h.CreateEvals, 0, "Created Evals")
// Ensure the plan allocated
var planned []*structs.Allocation
dcAllocsMap := make(map[string]int)
for nodeId, allocList := range plan.NodeAllocation {
planned = append(planned, allocList...)
dc := nodeMap[nodeId].Datacenter
c := dcAllocsMap[dc]
c += len(allocList)
dcAllocsMap[dc] = c
}
assert.Len(planned, 10, "Planned Allocations")
expectedCounts := make(map[string]int)
expectedCounts["dc1"] = 10 - i
if i > 0 {
expectedCounts["dc2"] = i
}
require.Equal(t, expectedCounts, dcAllocsMap)
h.AssertEvalStatus(t, structs.EvalStatusComplete)
})
start = start - step
}
}
// Test job registration with even spread across dc
func TestServiceSched_EvenSpread(t *testing.T) {
assert := assert.New(t)
h := NewHarness(t)
// Create a job that uses even spread over data center
job := mock.Job()
job.Datacenters = []string{"dc1", "dc2"}
job.TaskGroups[0].Count = 10
job.TaskGroups[0].Spreads = append(job.TaskGroups[0].Spreads,
&structs.Spread{
Attribute: "${node.datacenter}",
Weight: 100,
})
assert.Nil(h.State.UpsertJob(h.NextIndex(), job), "UpsertJob")
// Create some nodes, half in dc2
var nodes []*structs.Node
nodeMap := make(map[string]*structs.Node)
for i := 0; i < 10; i++ {
node := mock.Node()
if i%2 == 0 {
node.Datacenter = "dc2"
}
nodes = append(nodes, node)
assert.Nil(h.State.UpsertNode(h.NextIndex(), node), "UpsertNode")
nodeMap[node.ID] = node
}
// Create a mock evaluation to register the job
eval := &structs.Evaluation{
Namespace: structs.DefaultNamespace,
ID: uuid.Generate(),
Priority: job.Priority,
TriggeredBy: structs.EvalTriggerJobRegister,
JobID: job.ID,
Status: structs.EvalStatusPending,
}
noErr(t, h.State.UpsertEvals(h.NextIndex(), []*structs.Evaluation{eval}))
// Process the evaluation
assert.Nil(h.Process(NewServiceScheduler, eval), "Process")
// Ensure a single plan
assert.Len(h.Plans, 1, "Number of plans")
plan := h.Plans[0]
// Ensure the plan doesn't have annotations.
assert.Nil(plan.Annotations, "Plan.Annotations")
// Ensure the eval hasn't spawned blocked eval
assert.Len(h.CreateEvals, 0, "Created Evals")
// Ensure the plan allocated
var planned []*structs.Allocation
dcAllocsMap := make(map[string]int)
for nodeId, allocList := range plan.NodeAllocation {
planned = append(planned, allocList...)
dc := nodeMap[nodeId].Datacenter
c := dcAllocsMap[dc]
c += len(allocList)
dcAllocsMap[dc] = c
}
assert.Len(planned, 10, "Planned Allocations")
// Expect even split allocs across datacenter
expectedCounts := make(map[string]int)
expectedCounts["dc1"] = 5
expectedCounts["dc2"] = 5
require.Equal(t, expectedCounts, dcAllocsMap)
h.AssertEvalStatus(t, structs.EvalStatusComplete)
}
func TestServiceSched_JobRegister_Annotate(t *testing.T) {
h := NewHarness(t)

View File

@ -23,8 +23,8 @@ type propertySet struct {
// taskGroup is optionally set if the constraint is for a task group
taskGroup string
// constraint is the constraint this property set is checking
constraint *structs.Constraint
// targetAttribute is the attribute this property set is checking
targetAttribute string
// allowedCount is the allowed number of allocations that can have the
// distinct property
@ -75,14 +75,7 @@ func (p *propertySet) SetTGConstraint(constraint *structs.Constraint, taskGroup
// setConstraint is a shared helper for setting a job or task group constraint.
func (p *propertySet) setConstraint(constraint *structs.Constraint, taskGroup string) {
// Store that this is for a task group
if taskGroup != "" {
p.taskGroup = taskGroup
}
// Store the constraint
p.constraint = constraint
var allowedCount uint64
// Determine the number of allowed allocations with the property.
if v := constraint.RTarget; v != "" {
c, err := strconv.ParseUint(v, 10, 64)
@ -92,14 +85,35 @@ func (p *propertySet) setConstraint(constraint *structs.Constraint, taskGroup st
return
}
p.allowedCount = c
allowedCount = c
} else {
p.allowedCount = 1
allowedCount = 1
}
p.setTargetAttributeWithCount(constraint.LTarget, allowedCount, taskGroup)
}
// SetTargetAttribute is used to populate this property set without also storing allowed count
// This is used when evaluating spread stanzas
func (p *propertySet) SetTargetAttribute(targetAttribute string, taskGroup string) {
p.setTargetAttributeWithCount(targetAttribute, 0, taskGroup)
}
// setTargetAttributeWithCount is a shared helper for setting a job or task group attribute and allowedCount
// allowedCount can be zero when this is used in evaluating spread stanzas
func (p *propertySet) setTargetAttributeWithCount(targetAttribute string, allowedCount uint64, taskGroup string) {
// Store that this is for a task group
if taskGroup != "" {
p.taskGroup = taskGroup
}
// Store the constraint
p.targetAttribute = targetAttribute
p.allowedCount = allowedCount
// Determine the number of existing allocations that are using a property
// value
p.populateExisting(constraint)
p.populateExisting()
// Populate the proposed when setting the constraint. We do this because
// when detecting if we can inplace update an allocation we stage an
@ -110,7 +124,7 @@ func (p *propertySet) setConstraint(constraint *structs.Constraint, taskGroup st
// populateExisting is a helper shared when setting the constraint to populate
// the existing values.
func (p *propertySet) populateExisting(constraint *structs.Constraint) {
func (p *propertySet) populateExisting() {
// Retrieve all previously placed allocations
ws := memdb.NewWatchSet()
allocs, err := p.ctx.State().AllocsByJob(ws, p.namespace, p.jobID, false)
@ -193,19 +207,42 @@ func (p *propertySet) PopulateProposed() {
// placements. If the option does not satisfy the constraints an explanation is
// given.
func (p *propertySet) SatisfiesDistinctProperties(option *structs.Node, tg string) (bool, string) {
nValue, errorMsg, usedCount := p.UsedCount(option, tg)
if errorMsg != "" {
return false, errorMsg
}
// The property value has been used but within the number of allowed
// allocations.
if usedCount < p.allowedCount {
return true, ""
}
return false, fmt.Sprintf("distinct_property: %s=%s used by %d allocs", p.targetAttribute, nValue, usedCount)
}
// UsedCount returns the number of times the value of the attribute being tracked by this
// property set is used across current and proposed allocations. It also returns the resolved
// attribute value for the node, and an error message if it couldn't be resolved correctly
func (p *propertySet) UsedCount(option *structs.Node, tg string) (string, string, uint64) {
// Check if there was an error building
if p.errorBuilding != nil {
return false, p.errorBuilding.Error()
return "", p.errorBuilding.Error(), 0
}
// Get the nodes property value
nValue, ok := getProperty(option, p.constraint.LTarget)
nValue, ok := getProperty(option, p.targetAttribute)
if !ok {
return false, fmt.Sprintf("missing property %q", p.constraint.LTarget)
return nValue, fmt.Sprintf("missing property %q", p.targetAttribute), 0
}
combinedUse := p.GetCombinedUseMap()
usedCount := combinedUse[nValue]
return nValue, "", usedCount
}
// combine the counts of how many times the property has been used by
// existing and proposed allocations
// GetCombinedUseMap counts how many times the property has been used by
// existing and proposed allocations. It also takes into account any stopped
// allocations
func (p *propertySet) GetCombinedUseMap() map[string]uint64 {
combinedUse := make(map[string]uint64, helper.IntMax(len(p.existingValues), len(p.proposedValues)))
for _, usedValues := range []map[string]uint64{p.existingValues, p.proposedValues} {
for propertyValue, usedCount := range usedValues {
@ -228,20 +265,7 @@ func (p *propertySet) SatisfiesDistinctProperties(option *structs.Node, tg strin
combinedUse[propertyValue] = 0
}
}
usedCount, used := combinedUse[nValue]
if !used {
// The property value has never been used so we can use it.
return true, ""
}
// The property value has been used but within the number of allowed
// allocations.
if usedCount < p.allowedCount {
return true, ""
}
return false, fmt.Sprintf("distinct_property: %s=%s used by %d allocs", p.constraint.LTarget, nValue, usedCount)
return combinedUse
}
// filterAllocs filters a set of allocations to just be those that are running
@ -298,7 +322,7 @@ func (p *propertySet) populateProperties(allocs []*structs.Allocation, nodes map
properties map[string]uint64) {
for _, alloc := range allocs {
nProperty, ok := getProperty(nodes[alloc.NodeID], p.constraint.LTarget)
nProperty, ok := getProperty(nodes[alloc.NodeID], p.targetAttribute)
if !ok {
continue
}
@ -313,7 +337,7 @@ func getProperty(n *structs.Node, property string) (string, bool) {
return "", false
}
val, ok := resolveConstraintTarget(property, n)
val, ok := resolveTarget(property, n)
if !ok {
return "", false
}

View File

@ -3,15 +3,24 @@ package scheduler
import (
"fmt"
"math"
"github.com/hashicorp/nomad/nomad/structs"
)
const (
// binPackingMaxFitScore is the maximum possible bin packing fitness score.
// This is used to normalize bin packing score to a value between 0 and 1
binPackingMaxFitScore = 18.0
)
// Rank is used to provide a score and various ranking metadata
// along with a node when iterating. This state can be modified as
// various rank methods are applied.
type RankedNode struct {
Node *structs.Node
Score float64
FinalScore float64
Scores []float64
TaskResources map[string]*structs.Resources
// Allocs is used to cache the proposed allocations on the
@ -20,7 +29,7 @@ type RankedNode struct {
}
func (r *RankedNode) GoString() string {
return fmt.Sprintf("<Node: %s Score: %0.3f>", r.Node.ID, r.Score)
return fmt.Sprintf("<Node: %s Score: %0.3f>", r.Node.ID, r.FinalScore)
}
func (r *RankedNode) ProposedAllocs(ctx Context) ([]*structs.Allocation, error) {
@ -231,8 +240,9 @@ OUTER:
// Score the fit normally otherwise
fitness := structs.ScoreFit(option.Node, util)
option.Score += fitness
iter.ctx.Metrics().ScoreNode(option.Node, "binpack", fitness)
normalizedFit := fitness / binPackingMaxFitScore
option.Scores = append(option.Scores, normalizedFit)
iter.ctx.Metrics().ScoreNode(option.Node, "binpack", normalizedFit)
return option
}
}
@ -245,26 +255,31 @@ func (iter *BinPackIterator) Reset() {
// along side other allocations from this job. This is used to help distribute
// load across the cluster.
type JobAntiAffinityIterator struct {
ctx Context
source RankIterator
penalty float64
jobID string
ctx Context
source RankIterator
jobID string
taskGroup string
desiredCount int
}
// NewJobAntiAffinityIterator is used to create a JobAntiAffinityIterator that
// applies the given penalty for co-placement with allocs from this job.
func NewJobAntiAffinityIterator(ctx Context, source RankIterator, penalty float64, jobID string) *JobAntiAffinityIterator {
func NewJobAntiAffinityIterator(ctx Context, source RankIterator, jobID string) *JobAntiAffinityIterator {
iter := &JobAntiAffinityIterator{
ctx: ctx,
source: source,
penalty: penalty,
jobID: jobID,
ctx: ctx,
source: source,
jobID: jobID,
}
return iter
}
func (iter *JobAntiAffinityIterator) SetJob(jobID string) {
iter.jobID = jobID
func (iter *JobAntiAffinityIterator) SetJob(job *structs.Job) {
iter.jobID = job.ID
}
func (iter *JobAntiAffinityIterator) SetTaskGroup(tg *structs.TaskGroup) {
iter.taskGroup = tg.Name
iter.desiredCount = tg.Count
}
func (iter *JobAntiAffinityIterator) Next() *RankedNode {
@ -286,15 +301,16 @@ func (iter *JobAntiAffinityIterator) Next() *RankedNode {
// Determine the number of collisions
collisions := 0
for _, alloc := range proposed {
if alloc.JobID == iter.jobID {
if alloc.JobID == iter.jobID && alloc.TaskGroup == iter.taskGroup {
collisions += 1
}
}
// Apply a penalty if there are collisions
// Calculate the penalty based on number of collisions
// TODO(preetha): Figure out if batch jobs need a different scoring penalty where collisions matter less
if collisions > 0 {
scorePenalty := -1 * float64(collisions) * iter.penalty
option.Score += scorePenalty
scorePenalty := -1 * float64(collisions+1) / float64(iter.desiredCount)
option.Scores = append(option.Scores, scorePenalty)
iter.ctx.Metrics().ScoreNode(option.Node, "job-anti-affinity", scorePenalty)
}
return option
@ -305,32 +321,30 @@ func (iter *JobAntiAffinityIterator) Reset() {
iter.source.Reset()
}
// NodeAntiAffinityIterator is used to apply a penalty to
// NodeReschedulingPenaltyIterator is used to apply a penalty to
// a node that had a previous failed allocation for the same job.
// This is used when attempting to reschedule a failed alloc
type NodeAntiAffinityIterator struct {
type NodeReschedulingPenaltyIterator struct {
ctx Context
source RankIterator
penalty float64
penaltyNodes map[string]struct{}
}
// NewNodeAntiAffinityIterator is used to create a NodeAntiAffinityIterator that
// applies the given penalty for placement onto nodes in penaltyNodes
func NewNodeAntiAffinityIterator(ctx Context, source RankIterator, penalty float64) *NodeAntiAffinityIterator {
iter := &NodeAntiAffinityIterator{
ctx: ctx,
source: source,
penalty: penalty,
// NewNodeReschedulingPenaltyIterator is used to create a NodeReschedulingPenaltyIterator that
// applies the given scoring penalty for placement onto nodes in penaltyNodes
func NewNodeReschedulingPenaltyIterator(ctx Context, source RankIterator) *NodeReschedulingPenaltyIterator {
iter := &NodeReschedulingPenaltyIterator{
ctx: ctx,
source: source,
}
return iter
}
func (iter *NodeAntiAffinityIterator) SetPenaltyNodes(penaltyNodes map[string]struct{}) {
func (iter *NodeReschedulingPenaltyIterator) SetPenaltyNodes(penaltyNodes map[string]struct{}) {
iter.penaltyNodes = penaltyNodes
}
func (iter *NodeAntiAffinityIterator) Next() *RankedNode {
func (iter *NodeReschedulingPenaltyIterator) Next() *RankedNode {
for {
option := iter.source.Next()
if option == nil {
@ -339,14 +353,144 @@ func (iter *NodeAntiAffinityIterator) Next() *RankedNode {
_, ok := iter.penaltyNodes[option.Node.ID]
if ok {
option.Score -= iter.penalty
iter.ctx.Metrics().ScoreNode(option.Node, "node-anti-affinity", iter.penalty)
option.Scores = append(option.Scores, -1)
iter.ctx.Metrics().ScoreNode(option.Node, "node-reschedule-penalty", -1)
}
return option
}
}
func (iter *NodeAntiAffinityIterator) Reset() {
func (iter *NodeReschedulingPenaltyIterator) Reset() {
iter.penaltyNodes = make(map[string]struct{})
iter.source.Reset()
}
// NodeAffinityIterator is used to resolve any affinity rules in the job or task group,
// and apply a weighted score to nodes if they match.
type NodeAffinityIterator struct {
ctx Context
source RankIterator
jobAffinities []*structs.Affinity
affinities []*structs.Affinity
}
// NewNodeAffinityIterator is used to create a NodeAffinityIterator that
// applies a weighted score according to whether nodes match any
// affinities in the job or task group.
func NewNodeAffinityIterator(ctx Context, source RankIterator) *NodeAffinityIterator {
return &NodeAffinityIterator{
ctx: ctx,
source: source,
}
}
func (iter *NodeAffinityIterator) SetJob(job *structs.Job) {
iter.jobAffinities = job.Affinities
}
func (iter *NodeAffinityIterator) SetTaskGroup(tg *structs.TaskGroup) {
// Merge job affinities
if iter.jobAffinities != nil {
iter.affinities = append(iter.affinities, iter.jobAffinities...)
}
// Merge task group affinities and task affinities
if tg.Affinities != nil {
iter.affinities = append(iter.affinities, tg.Affinities...)
}
for _, task := range tg.Tasks {
if task.Affinities != nil {
iter.affinities = append(iter.affinities, task.Affinities...)
}
}
}
func (iter *NodeAffinityIterator) Reset() {
iter.source.Reset()
// This method is called between each task group, so only reset the merged list
iter.affinities = nil
}
func (iter *NodeAffinityIterator) hasAffinities() bool {
return len(iter.affinities) > 0
}
func (iter *NodeAffinityIterator) Next() *RankedNode {
option := iter.source.Next()
if option == nil {
return nil
}
if !iter.hasAffinities() {
return option
}
// TODO(preetha): we should calculate normalized weights once and reuse it here
sumWeight := 0.0
for _, affinity := range iter.affinities {
sumWeight += math.Abs(affinity.Weight)
}
totalAffinityScore := 0.0
for _, affinity := range iter.affinities {
if matchesAffinity(iter.ctx, affinity, option.Node) {
totalAffinityScore += affinity.Weight
}
}
normScore := totalAffinityScore / sumWeight
if totalAffinityScore != 0.0 {
option.Scores = append(option.Scores, normScore)
iter.ctx.Metrics().ScoreNode(option.Node, "node-affinity", normScore)
}
return option
}
func matchesAffinity(ctx Context, affinity *structs.Affinity, option *structs.Node) bool {
//TODO(preetha): Add a step here that filters based on computed node class for potential speedup
// Resolve the targets
lVal, ok := resolveTarget(affinity.LTarget, option)
if !ok {
return false
}
rVal, ok := resolveTarget(affinity.RTarget, option)
if !ok {
return false
}
// Check if satisfied
return checkAffinity(ctx, affinity.Operand, lVal, rVal)
}
// ScoreNormalizationIterator is used to combine scores from various prior
// iterators and combine them into one final score. The current implementation
// averages the scores together.
type ScoreNormalizationIterator struct {
ctx Context
source RankIterator
}
// NewScoreNormalizationIterator is used to create a ScoreNormalizationIterator that
// averages scores from various iterators into a final score.
func NewScoreNormalizationIterator(ctx Context, source RankIterator) *ScoreNormalizationIterator {
return &ScoreNormalizationIterator{
ctx: ctx,
source: source}
}
func (iter *ScoreNormalizationIterator) Reset() {
iter.source.Reset()
}
func (iter *ScoreNormalizationIterator) Next() *RankedNode {
option := iter.source.Next()
if option == nil || len(option.Scores) == 0 {
return option
}
numScorers := len(option.Scores)
sum := 0.0
for _, score := range option.Scores {
sum += score
}
option.FinalScore = sum / float64(numScorers)
//TODO(preetha): Turn map in allocmetrics into a heap of topK scores
iter.ctx.Metrics().ScoreNode(option.Node, "normalized-score", option.FinalScore)
return option
}

View File

@ -85,7 +85,9 @@ func TestBinPackIterator_NoExistingAlloc(t *testing.T) {
binp := NewBinPackIterator(ctx, static, false, 0)
binp.SetTaskGroup(taskGroup)
out := collectRanked(binp)
scoreNorm := NewScoreNormalizationIterator(ctx, binp)
out := collectRanked(scoreNorm)
if len(out) != 2 {
t.Fatalf("Bad: %v", out)
}
@ -93,11 +95,11 @@ func TestBinPackIterator_NoExistingAlloc(t *testing.T) {
t.Fatalf("Bad: %v", out)
}
if out[0].Score != 18 {
t.Fatalf("Bad: %v", out[0])
if out[0].FinalScore != 1.0 {
t.Fatalf("Bad Score: %v", out[0].FinalScore)
}
if out[1].Score < 10 || out[1].Score > 16 {
t.Fatalf("Bad: %v", out[1])
if out[1].FinalScore < 0.75 || out[1].FinalScore > 0.95 {
t.Fatalf("Bad Score: %v", out[1].FinalScore)
}
}
@ -164,16 +166,18 @@ func TestBinPackIterator_PlannedAlloc(t *testing.T) {
binp := NewBinPackIterator(ctx, static, false, 0)
binp.SetTaskGroup(taskGroup)
out := collectRanked(binp)
scoreNorm := NewScoreNormalizationIterator(ctx, binp)
out := collectRanked(scoreNorm)
if len(out) != 1 {
t.Fatalf("Bad: %#v", out)
}
if out[0] != nodes[1] {
t.Fatalf("Bad: %v", out)
t.Fatalf("Bad Score: %v", out)
}
if out[0].Score != 18 {
t.Fatalf("Bad: %v", out[0])
if out[0].FinalScore != 1.0 {
t.Fatalf("Bad Score: %v", out[0].FinalScore)
}
}
@ -254,15 +258,17 @@ func TestBinPackIterator_ExistingAlloc(t *testing.T) {
binp := NewBinPackIterator(ctx, static, false, 0)
binp.SetTaskGroup(taskGroup)
out := collectRanked(binp)
scoreNorm := NewScoreNormalizationIterator(ctx, binp)
out := collectRanked(scoreNorm)
if len(out) != 1 {
t.Fatalf("Bad: %#v", out)
}
if out[0] != nodes[1] {
t.Fatalf("Bad: %v", out)
}
if out[0].Score != 18 {
t.Fatalf("Bad: %v", out[0])
if out[0].FinalScore != 1.0 {
t.Fatalf("Bad Score: %v", out[0].FinalScore)
}
}
@ -348,18 +354,20 @@ func TestBinPackIterator_ExistingAlloc_PlannedEvict(t *testing.T) {
binp := NewBinPackIterator(ctx, static, false, 0)
binp.SetTaskGroup(taskGroup)
out := collectRanked(binp)
scoreNorm := NewScoreNormalizationIterator(ctx, binp)
out := collectRanked(scoreNorm)
if len(out) != 2 {
t.Fatalf("Bad: %#v", out)
}
if out[0] != nodes[0] || out[1] != nodes[1] {
t.Fatalf("Bad: %v", out)
}
if out[0].Score < 10 || out[0].Score > 16 {
t.Fatalf("Bad: %v", out[0])
if out[0].FinalScore < 0.50 || out[0].FinalScore > 0.95 {
t.Fatalf("Bad Score: %v", out[0].FinalScore)
}
if out[1].Score != 18 {
t.Fatalf("Bad: %v", out[1])
if out[1].FinalScore != 1 {
t.Fatalf("Bad Score: %v", out[1].FinalScore)
}
}
@ -379,16 +387,23 @@ func TestJobAntiAffinity_PlannedAlloc(t *testing.T) {
}
static := NewStaticRankIterator(ctx, nodes)
job := mock.Job()
job.ID = "foo"
tg := job.TaskGroups[0]
tg.Count = 4
// Add a planned alloc to node1 that fills it
plan := ctx.Plan()
plan.NodeAllocation[nodes[0].Node.ID] = []*structs.Allocation{
{
ID: uuid.Generate(),
JobID: "foo",
ID: uuid.Generate(),
JobID: "foo",
TaskGroup: tg.Name,
},
{
ID: uuid.Generate(),
JobID: "foo",
ID: uuid.Generate(),
JobID: "foo",
TaskGroup: tg.Name,
},
}
@ -399,24 +414,29 @@ func TestJobAntiAffinity_PlannedAlloc(t *testing.T) {
},
}
binp := NewJobAntiAffinityIterator(ctx, static, 5.0, "foo")
jobAntiAff := NewJobAntiAffinityIterator(ctx, static, "foo")
jobAntiAff.SetJob(job)
jobAntiAff.SetTaskGroup(tg)
out := collectRanked(binp)
scoreNorm := NewScoreNormalizationIterator(ctx, jobAntiAff)
out := collectRanked(scoreNorm)
if len(out) != 2 {
t.Fatalf("Bad: %#v", out)
}
if out[0] != nodes[0] {
t.Fatalf("Bad: %v", out)
}
if out[0].Score != -10.0 {
t.Fatalf("Bad: %#v", out[0])
// Score should be -(#collissions+1/desired_count) => -(3/4)
if out[0].FinalScore != -0.75 {
t.Fatalf("Bad Score: %#v", out[0].FinalScore)
}
if out[1] != nodes[1] {
t.Fatalf("Bad: %v", out)
}
if out[1].Score != 0.0 {
t.Fatalf("Bad: %v", out[1])
if out[1].FinalScore != 0.0 {
t.Fatalf("Bad Score: %v", out[1].FinalScore)
}
}
@ -450,17 +470,159 @@ func TestNodeAntiAffinity_PenaltyNodes(t *testing.T) {
}
static := NewStaticRankIterator(ctx, nodes)
nodeAntiAffIter := NewNodeAntiAffinityIterator(ctx, static, 50.0)
nodeAntiAffIter := NewNodeReschedulingPenaltyIterator(ctx, static)
nodeAntiAffIter.SetPenaltyNodes(map[string]struct{}{node1.ID: {}})
out := collectRanked(nodeAntiAffIter)
scoreNorm := NewScoreNormalizationIterator(ctx, nodeAntiAffIter)
out := collectRanked(scoreNorm)
require := require.New(t)
require.Equal(2, len(out))
require.Equal(node1.ID, out[0].Node.ID)
require.Equal(-50.0, out[0].Score)
require.Equal(-1.0, out[0].FinalScore)
require.Equal(node2.ID, out[1].Node.ID)
require.Equal(0.0, out[1].Score)
require.Equal(0.0, out[1].FinalScore)
}
func TestScoreNormalizationIterator(t *testing.T) {
// Test normalized scores when there is more than one scorer
_, ctx := testContext(t)
nodes := []*RankedNode{
{
Node: &structs.Node{
ID: uuid.Generate(),
},
},
{
Node: &structs.Node{
ID: uuid.Generate(),
},
},
}
static := NewStaticRankIterator(ctx, nodes)
job := mock.Job()
job.ID = "foo"
tg := job.TaskGroups[0]
tg.Count = 4
// Add a planned alloc to node1 that fills it
plan := ctx.Plan()
plan.NodeAllocation[nodes[0].Node.ID] = []*structs.Allocation{
{
ID: uuid.Generate(),
JobID: "foo",
TaskGroup: tg.Name,
},
{
ID: uuid.Generate(),
JobID: "foo",
TaskGroup: tg.Name,
},
}
// Add a planned alloc to node2 that half fills it
plan.NodeAllocation[nodes[1].Node.ID] = []*structs.Allocation{
{
JobID: "bar",
},
}
jobAntiAff := NewJobAntiAffinityIterator(ctx, static, "foo")
jobAntiAff.SetJob(job)
jobAntiAff.SetTaskGroup(tg)
nodeReschedulePenaltyIter := NewNodeReschedulingPenaltyIterator(ctx, jobAntiAff)
nodeReschedulePenaltyIter.SetPenaltyNodes(map[string]struct{}{nodes[0].Node.ID: {}})
scoreNorm := NewScoreNormalizationIterator(ctx, nodeReschedulePenaltyIter)
out := collectRanked(scoreNorm)
require := require.New(t)
require.Equal(2, len(out))
require.Equal(out[0], nodes[0])
// Score should be averaged between both scorers
// -0.75 from job anti affinity and -1 from node rescheduling penalty
require.Equal(-0.875, out[0].FinalScore)
require.Equal(out[1], nodes[1])
require.Equal(out[1].FinalScore, 0.0)
}
func TestNodeAffinityIterator(t *testing.T) {
_, ctx := testContext(t)
nodes := []*RankedNode{
{Node: mock.Node()},
{Node: mock.Node()},
{Node: mock.Node()},
{Node: mock.Node()},
}
nodes[0].Node.Attributes["kernel.version"] = "4.9"
nodes[1].Node.Datacenter = "dc2"
nodes[2].Node.Datacenter = "dc2"
nodes[2].Node.NodeClass = "large"
affinities := []*structs.Affinity{
{
Operand: "=",
LTarget: "${node.datacenter}",
RTarget: "dc1",
Weight: 200,
},
{
Operand: "=",
LTarget: "${node.datacenter}",
RTarget: "dc2",
Weight: -100,
},
{
Operand: "version",
LTarget: "${attr.kernel.version}",
RTarget: ">4.0",
Weight: 50,
},
{
Operand: "is",
LTarget: "${node.class}",
RTarget: "large",
Weight: 50,
},
}
static := NewStaticRankIterator(ctx, nodes)
job := mock.Job()
job.ID = "foo"
tg := job.TaskGroups[0]
tg.Affinities = affinities
nodeAffinity := NewNodeAffinityIterator(ctx, static)
nodeAffinity.SetTaskGroup(tg)
scoreNorm := NewScoreNormalizationIterator(ctx, nodeAffinity)
out := collectRanked(scoreNorm)
expectedScores := make(map[string]float64)
// Total weight = 400
// Node 0 matches two affinities(dc and kernel version), total weight =250
expectedScores[nodes[0].Node.ID] = 0.625
// Node 1 matches an anti affinity, weight = -100
expectedScores[nodes[1].Node.ID] = -0.25
// Node 2 matches one affinity(node class) with weight 50
expectedScores[nodes[2].Node.ID] = -0.125
// Node 3 matches one affinity (dc) with weight = 200
expectedScores[nodes[3].Node.ID] = 0.5
require := require.New(t)
for _, n := range out {
require.Equal(expectedScores[n.Node.ID], n.FinalScore)
}
}

View File

@ -43,7 +43,7 @@ func (iter *LimitIterator) Next() *RankedNode {
if len(iter.skippedNodes) < iter.maxSkip {
// Try skipping ahead up to maxSkip to find an option with score lesser than the threshold
for option != nil && option.Score <= iter.scoreThreshold && len(iter.skippedNodes) < iter.maxSkip {
for option != nil && option.FinalScore <= iter.scoreThreshold && len(iter.skippedNodes) < iter.maxSkip {
iter.skippedNodes = append(iter.skippedNodes, option)
option = iter.source.Next()
}
@ -104,7 +104,7 @@ func (iter *MaxScoreIterator) Next() *RankedNode {
return iter.max
}
if iter.max == nil || option.Score > iter.max.Score {
if iter.max == nil || option.FinalScore > iter.max.FinalScore {
iter.max = option
}
}

View File

@ -12,16 +12,16 @@ func TestLimitIterator(t *testing.T) {
_, ctx := testContext(t)
nodes := []*RankedNode{
{
Node: mock.Node(),
Score: 1,
Node: mock.Node(),
FinalScore: 1,
},
{
Node: mock.Node(),
Score: 2,
Node: mock.Node(),
FinalScore: 2,
},
{
Node: mock.Node(),
Score: 3,
Node: mock.Node(),
FinalScore: 3,
},
}
static := NewStaticRankIterator(ctx, nodes)
@ -73,26 +73,26 @@ func TestLimitIterator_ScoreThreshold(t *testing.T) {
desc: "Skips one low scoring node",
nodes: []*RankedNode{
{
Node: nodes[0],
Score: -1,
Node: nodes[0],
FinalScore: -1,
},
{
Node: nodes[1],
Score: 2,
Node: nodes[1],
FinalScore: 2,
},
{
Node: nodes[2],
Score: 3,
Node: nodes[2],
FinalScore: 3,
},
},
expectedOut: []*RankedNode{
{
Node: nodes[1],
Score: 2,
Node: nodes[1],
FinalScore: 2,
},
{
Node: nodes[2],
Score: 3,
Node: nodes[2],
FinalScore: 3,
},
},
threshold: -1,
@ -103,30 +103,30 @@ func TestLimitIterator_ScoreThreshold(t *testing.T) {
desc: "Skips maxSkip scoring nodes",
nodes: []*RankedNode{
{
Node: nodes[0],
Score: -1,
Node: nodes[0],
FinalScore: -1,
},
{
Node: nodes[1],
Score: -2,
Node: nodes[1],
FinalScore: -2,
},
{
Node: nodes[2],
Score: 3,
Node: nodes[2],
FinalScore: 3,
},
{
Node: nodes[3],
Score: 4,
Node: nodes[3],
FinalScore: 4,
},
},
expectedOut: []*RankedNode{
{
Node: nodes[2],
Score: 3,
Node: nodes[2],
FinalScore: 3,
},
{
Node: nodes[3],
Score: 4,
Node: nodes[3],
FinalScore: 4,
},
},
threshold: -1,
@ -137,30 +137,30 @@ func TestLimitIterator_ScoreThreshold(t *testing.T) {
desc: "maxSkip limit reached",
nodes: []*RankedNode{
{
Node: nodes[0],
Score: -1,
Node: nodes[0],
FinalScore: -1,
},
{
Node: nodes[1],
Score: -6,
Node: nodes[1],
FinalScore: -6,
},
{
Node: nodes[2],
Score: -3,
Node: nodes[2],
FinalScore: -3,
},
{
Node: nodes[3],
Score: -4,
Node: nodes[3],
FinalScore: -4,
},
},
expectedOut: []*RankedNode{
{
Node: nodes[2],
Score: -3,
Node: nodes[2],
FinalScore: -3,
},
{
Node: nodes[3],
Score: -4,
Node: nodes[3],
FinalScore: -4,
},
},
threshold: -1,
@ -171,22 +171,22 @@ func TestLimitIterator_ScoreThreshold(t *testing.T) {
desc: "draw both from skipped nodes",
nodes: []*RankedNode{
{
Node: nodes[0],
Score: -1,
Node: nodes[0],
FinalScore: -1,
},
{
Node: nodes[1],
Score: -6,
Node: nodes[1],
FinalScore: -6,
},
},
expectedOut: []*RankedNode{
{
Node: nodes[0],
Score: -1,
Node: nodes[0],
FinalScore: -1,
},
{
Node: nodes[1],
Score: -6,
Node: nodes[1],
FinalScore: -6,
},
},
threshold: -1,
@ -196,22 +196,22 @@ func TestLimitIterator_ScoreThreshold(t *testing.T) {
desc: "one node above threshold, one skipped node",
nodes: []*RankedNode{
{
Node: nodes[0],
Score: -1,
Node: nodes[0],
FinalScore: -1,
},
{
Node: nodes[1],
Score: 5,
Node: nodes[1],
FinalScore: 5,
},
},
expectedOut: []*RankedNode{
{
Node: nodes[1],
Score: 5,
Node: nodes[1],
FinalScore: 5,
},
{
Node: nodes[0],
Score: -1,
Node: nodes[0],
FinalScore: -1,
},
},
threshold: -1,
@ -222,30 +222,30 @@ func TestLimitIterator_ScoreThreshold(t *testing.T) {
desc: "low scoring nodes interspersed",
nodes: []*RankedNode{
{
Node: nodes[0],
Score: -1,
Node: nodes[0],
FinalScore: -1,
},
{
Node: nodes[1],
Score: 5,
Node: nodes[1],
FinalScore: 5,
},
{
Node: nodes[2],
Score: -2,
Node: nodes[2],
FinalScore: -2,
},
{
Node: nodes[3],
Score: 2,
Node: nodes[3],
FinalScore: 2,
},
},
expectedOut: []*RankedNode{
{
Node: nodes[1],
Score: 5,
Node: nodes[1],
FinalScore: 5,
},
{
Node: nodes[3],
Score: 2,
Node: nodes[3],
FinalScore: 2,
},
},
threshold: -1,
@ -256,14 +256,14 @@ func TestLimitIterator_ScoreThreshold(t *testing.T) {
desc: "only one node, score below threshold",
nodes: []*RankedNode{
{
Node: nodes[0],
Score: -1,
Node: nodes[0],
FinalScore: -1,
},
},
expectedOut: []*RankedNode{
{
Node: nodes[0],
Score: -1,
Node: nodes[0],
FinalScore: -1,
},
},
threshold: -1,
@ -274,22 +274,22 @@ func TestLimitIterator_ScoreThreshold(t *testing.T) {
desc: "maxSkip is more than available nodes",
nodes: []*RankedNode{
{
Node: nodes[0],
Score: -2,
Node: nodes[0],
FinalScore: -2,
},
{
Node: nodes[1],
Score: 1,
Node: nodes[1],
FinalScore: 1,
},
},
expectedOut: []*RankedNode{
{
Node: nodes[1],
Score: 1,
Node: nodes[1],
FinalScore: 1,
},
{
Node: nodes[0],
Score: -2,
Node: nodes[0],
FinalScore: -2,
},
},
threshold: -1,
@ -320,16 +320,16 @@ func TestMaxScoreIterator(t *testing.T) {
_, ctx := testContext(t)
nodes := []*RankedNode{
{
Node: mock.Node(),
Score: 1,
Node: mock.Node(),
FinalScore: 1,
},
{
Node: mock.Node(),
Score: 2,
Node: mock.Node(),
FinalScore: 2,
},
{
Node: mock.Node(),
Score: 3,
Node: mock.Node(),
FinalScore: 3,
},
}
static := NewStaticRankIterator(ctx, nodes)

252
scheduler/spread.go Normal file
View File

@ -0,0 +1,252 @@
package scheduler
import (
"github.com/hashicorp/nomad/nomad/structs"
)
const (
// implicitTarget is used to represent any remaining attribute values
// when target percentages don't add up to 100
implicitTarget = "*"
)
// SpreadIterator is used to spread allocations across a specified attribute
// according to preset weights
type SpreadIterator struct {
ctx Context
source RankIterator
job *structs.Job
tg *structs.TaskGroup
// jobSpreads is a slice of spread stored at the job level which apply
// to all task groups
jobSpreads []*structs.Spread
// tgSpreadInfo is a map per task group with precomputed
// values for desired counts and weight
tgSpreadInfo map[string]spreadAttributeMap
// sumSpreadWeights tracks the total weight across all spread
// stanzas
sumSpreadWeights int
// hasSpread is used to early return when the job/task group
// does not have spread configured
hasSpread bool
// groupProperySets is a memoized map from task group to property sets.
// existing allocs are computed once, and allocs from the plan are updated
// when Reset is called
groupPropertySets map[string][]*propertySet
}
type spreadAttributeMap map[string]*spreadInfo
type spreadInfo struct {
weight int
desiredCounts map[string]float64
}
func NewSpreadIterator(ctx Context, source RankIterator) *SpreadIterator {
iter := &SpreadIterator{
ctx: ctx,
source: source,
groupPropertySets: make(map[string][]*propertySet),
tgSpreadInfo: make(map[string]spreadAttributeMap),
}
return iter
}
func (iter *SpreadIterator) Reset() {
iter.source.Reset()
for _, sets := range iter.groupPropertySets {
for _, ps := range sets {
ps.PopulateProposed()
}
}
}
func (iter *SpreadIterator) SetJob(job *structs.Job) {
iter.job = job
if job.Spreads != nil {
iter.jobSpreads = job.Spreads
}
}
func (iter *SpreadIterator) SetTaskGroup(tg *structs.TaskGroup) {
iter.tg = tg
// Build the property set at the taskgroup level
if _, ok := iter.groupPropertySets[tg.Name]; !ok {
// First add property sets that are at the job level for this task group
for _, spread := range iter.jobSpreads {
pset := NewPropertySet(iter.ctx, iter.job)
pset.SetTargetAttribute(spread.Attribute, tg.Name)
iter.groupPropertySets[tg.Name] = append(iter.groupPropertySets[tg.Name], pset)
}
// Include property sets at the task group level
for _, spread := range tg.Spreads {
pset := NewPropertySet(iter.ctx, iter.job)
pset.SetTargetAttribute(spread.Attribute, tg.Name)
iter.groupPropertySets[tg.Name] = append(iter.groupPropertySets[tg.Name], pset)
}
}
// Check if there are any spreads configured
iter.hasSpread = len(iter.groupPropertySets[tg.Name]) != 0
// Build tgSpreadInfo at the task group level
if _, ok := iter.tgSpreadInfo[tg.Name]; !ok {
iter.computeSpreadInfo(tg)
}
}
func (iter *SpreadIterator) hasSpreads() bool {
return iter.hasSpread
}
func (iter *SpreadIterator) Next() *RankedNode {
for {
option := iter.source.Next()
// Hot path if there is nothing to check
if option == nil || !iter.hasSpreads() {
return option
}
tgName := iter.tg.Name
propertySets := iter.groupPropertySets[tgName]
// Iterate over each spread attribute's property set and add a weighted score
totalSpreadScore := 0.0
for _, pset := range propertySets {
nValue, errorMsg, usedCount := pset.UsedCount(option.Node, tgName)
// Add one to include placement on this node in the scoring calculation
usedCount += 1
// Set score to -1 if there were errors in building this attribute
if errorMsg != "" {
iter.ctx.Logger().Printf("[WARN] sched: error building spread attributes for task group %v:%v", tgName, errorMsg)
totalSpreadScore -= 1.0
continue
}
spreadAttributeMap := iter.tgSpreadInfo[tgName]
spreadDetails := spreadAttributeMap[pset.targetAttribute]
if len(spreadDetails.desiredCounts) == 0 {
// When desired counts map is empty the user didn't specify any targets
// Use even spreading scoring algorithm for this scenario
scoreBoost := evenSpreadScoreBoost(pset, option.Node)
totalSpreadScore += scoreBoost
} else {
// Get the desired count
desiredCount, ok := spreadDetails.desiredCounts[nValue]
if !ok {
// See if there is an implicit target
desiredCount, ok = spreadDetails.desiredCounts[implicitTarget]
if !ok {
// The desired count for this attribute is zero if it gets here
// so use the maximum possible penalty for this node
totalSpreadScore -= 1.0
continue
}
}
// Calculate the relative weight of this specific spread attribute
spreadWeight := float64(spreadDetails.weight) / float64(iter.sumSpreadWeights)
// Score Boost is proportional the difference between current and desired count
// It is negative when the used count is greater than the desired count
// It is multiplied with the spread weight to account for cases where the job has
// more than one spread attribute
scoreBoost := ((desiredCount - float64(usedCount)) / desiredCount) * spreadWeight
totalSpreadScore += scoreBoost
}
}
if totalSpreadScore != 0.0 {
option.Scores = append(option.Scores, totalSpreadScore)
iter.ctx.Metrics().ScoreNode(option.Node, "allocation-spread", totalSpreadScore)
}
return option
}
}
// evenSpreadScoreBoost is a scoring helper that calculates the score
// for the option when even spread is desired (all attribute values get equal preference)
func evenSpreadScoreBoost(pset *propertySet, option *structs.Node) float64 {
combinedUseMap := pset.GetCombinedUseMap()
if len(combinedUseMap) == 0 {
// Nothing placed yet, so return 0 as the score
return 0.0
}
// Get the nodes property value
nValue, ok := getProperty(option, pset.targetAttribute)
// Maximum possible penalty when the attribute isn't set on the node
if !ok {
return -1.0
}
currentAttributeCount := combinedUseMap[nValue]
minCount := uint64(0)
maxCount := uint64(0)
for _, value := range combinedUseMap {
if minCount == 0 || value < minCount {
minCount = value
}
if maxCount == 0 || value > maxCount {
maxCount = value
}
}
// calculate boost based on delta between the current and the minimum
var deltaBoost float64
if minCount == 0 {
deltaBoost = -1.0
} else {
delta := int(minCount - currentAttributeCount)
deltaBoost = float64(delta) / float64(minCount)
}
if currentAttributeCount != minCount {
// Boost based on delta between current and min
return deltaBoost
} else if minCount == maxCount {
// Maximum possible penalty when the distribution is even
return -1.0
}
// Penalty based on delta from max value
delta := int(maxCount - minCount)
deltaBoost = float64(delta) / float64(minCount)
return deltaBoost
}
// computeSpreadInfo computes and stores percentages and total values
// from all spreads that apply to a specific task group
func (iter *SpreadIterator) computeSpreadInfo(tg *structs.TaskGroup) {
spreadInfos := make(spreadAttributeMap, len(tg.Spreads))
totalCount := tg.Count
// Always combine any spread stanzas defined at the job level here
combinedSpreads := make([]*structs.Spread, 0, len(tg.Spreads)+len(iter.jobSpreads))
combinedSpreads = append(combinedSpreads, tg.Spreads...)
combinedSpreads = append(combinedSpreads, iter.jobSpreads...)
for _, spread := range combinedSpreads {
si := &spreadInfo{weight: spread.Weight, desiredCounts: make(map[string]float64)}
sumDesiredCounts := 0.0
for _, st := range spread.SpreadTarget {
desiredCount := (float64(st.Percent) / float64(100)) * float64(totalCount)
si.desiredCounts[st.Value] = desiredCount
sumDesiredCounts += desiredCount
}
// Account for remaining count only if there is any spread targets
if sumDesiredCounts > 0 && sumDesiredCounts < float64(totalCount) {
remainingCount := float64(totalCount) - sumDesiredCounts
si.desiredCounts[implicitTarget] = remainingCount
}
spreadInfos[spread.Attribute] = si
iter.sumSpreadWeights += spread.Weight
}
iter.tgSpreadInfo[tg.Name] = spreadInfos
}

545
scheduler/spread_test.go Normal file
View File

@ -0,0 +1,545 @@
package scheduler
import (
"testing"
"fmt"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/stretchr/testify/require"
)
func TestSpreadIterator_SingleAttribute(t *testing.T) {
state, ctx := testContext(t)
dcs := []string{"dc1", "dc2", "dc1", "dc1"}
var nodes []*RankedNode
// Add these nodes to the state store
for i, dc := range dcs {
node := mock.Node()
node.Datacenter = dc
if err := state.UpsertNode(uint64(100+i), node); err != nil {
t.Fatalf("failed to upsert node: %v", err)
}
nodes = append(nodes, &RankedNode{Node: node})
}
static := NewStaticRankIterator(ctx, nodes)
job := mock.Job()
tg := job.TaskGroups[0]
job.TaskGroups[0].Count = 10
// add allocs to nodes in dc1
upserting := []*structs.Allocation{
{
Namespace: structs.DefaultNamespace,
TaskGroup: tg.Name,
JobID: job.ID,
Job: job,
ID: uuid.Generate(),
EvalID: uuid.Generate(),
NodeID: nodes[0].Node.ID,
},
{
Namespace: structs.DefaultNamespace,
TaskGroup: tg.Name,
JobID: job.ID,
Job: job,
ID: uuid.Generate(),
EvalID: uuid.Generate(),
NodeID: nodes[2].Node.ID,
},
}
if err := state.UpsertAllocs(1000, upserting); err != nil {
t.Fatalf("failed to UpsertAllocs: %v", err)
}
// Create spread target of 80% in dc1
// Implicitly, this means 20% in dc2
spread := &structs.Spread{
Weight: 100,
Attribute: "${node.datacenter}",
SpreadTarget: []*structs.SpreadTarget{
{
Value: "dc1",
Percent: 80,
},
},
}
tg.Spreads = []*structs.Spread{spread}
spreadIter := NewSpreadIterator(ctx, static)
spreadIter.SetJob(job)
spreadIter.SetTaskGroup(tg)
scoreNorm := NewScoreNormalizationIterator(ctx, spreadIter)
out := collectRanked(scoreNorm)
// Expect nodes in dc1 with existing allocs to get a boost
// Boost should be ((desiredCount-actual)/desired)*spreadWeight
// For this test, that becomes dc1 = ((8-3)/8 ) = 0.5, and dc2=(2-1)/2
expectedScores := map[string]float64{
"dc1": 0.625,
"dc2": 0.5,
}
for _, rn := range out {
require.Equal(t, expectedScores[rn.Node.Datacenter], rn.FinalScore)
}
// Update the plan to add more allocs to nodes in dc1
// After this step there are enough allocs to meet the desired count in dc1
ctx.plan.NodeAllocation[nodes[0].Node.ID] = []*structs.Allocation{
{
Namespace: structs.DefaultNamespace,
TaskGroup: tg.Name,
JobID: job.ID,
Job: job,
ID: uuid.Generate(),
NodeID: nodes[0].Node.ID,
},
{
Namespace: structs.DefaultNamespace,
TaskGroup: tg.Name,
JobID: job.ID,
Job: job,
ID: uuid.Generate(),
NodeID: nodes[0].Node.ID,
},
// Should be ignored as it is a different job.
{
Namespace: structs.DefaultNamespace,
TaskGroup: "bbb",
JobID: "ignore 2",
Job: job,
ID: uuid.Generate(),
NodeID: nodes[0].Node.ID,
},
}
ctx.plan.NodeAllocation[nodes[3].Node.ID] = []*structs.Allocation{
{
Namespace: structs.DefaultNamespace,
TaskGroup: tg.Name,
JobID: job.ID,
Job: job,
ID: uuid.Generate(),
NodeID: nodes[3].Node.ID,
},
{
Namespace: structs.DefaultNamespace,
TaskGroup: tg.Name,
JobID: job.ID,
Job: job,
ID: uuid.Generate(),
NodeID: nodes[3].Node.ID,
},
{
Namespace: structs.DefaultNamespace,
TaskGroup: tg.Name,
JobID: job.ID,
Job: job,
ID: uuid.Generate(),
NodeID: nodes[3].Node.ID,
},
}
// Reset the scores
for _, node := range nodes {
node.Scores = nil
node.FinalScore = 0
}
static = NewStaticRankIterator(ctx, nodes)
spreadIter = NewSpreadIterator(ctx, static)
spreadIter.SetJob(job)
spreadIter.SetTaskGroup(tg)
scoreNorm = NewScoreNormalizationIterator(ctx, spreadIter)
out = collectRanked(scoreNorm)
// Expect nodes in dc2 with existing allocs to get a boost
// DC1 nodes are not boosted because there are enough allocs to meet
// the desired count
expectedScores = map[string]float64{
"dc1": 0,
"dc2": 0.5,
}
for _, rn := range out {
require.Equal(t, expectedScores[rn.Node.Datacenter], rn.FinalScore)
}
}
func TestSpreadIterator_MultipleAttributes(t *testing.T) {
state, ctx := testContext(t)
dcs := []string{"dc1", "dc2", "dc1", "dc1"}
rack := []string{"r1", "r1", "r2", "r2"}
var nodes []*RankedNode
// Add these nodes to the state store
for i, dc := range dcs {
node := mock.Node()
node.Datacenter = dc
node.Meta["rack"] = rack[i]
if err := state.UpsertNode(uint64(100+i), node); err != nil {
t.Fatalf("failed to upsert node: %v", err)
}
nodes = append(nodes, &RankedNode{Node: node})
}
static := NewStaticRankIterator(ctx, nodes)
job := mock.Job()
tg := job.TaskGroups[0]
job.TaskGroups[0].Count = 10
// add allocs to nodes in dc1
upserting := []*structs.Allocation{
{
Namespace: structs.DefaultNamespace,
TaskGroup: tg.Name,
JobID: job.ID,
Job: job,
ID: uuid.Generate(),
EvalID: uuid.Generate(),
NodeID: nodes[0].Node.ID,
},
{
Namespace: structs.DefaultNamespace,
TaskGroup: tg.Name,
JobID: job.ID,
Job: job,
ID: uuid.Generate(),
EvalID: uuid.Generate(),
NodeID: nodes[2].Node.ID,
},
}
if err := state.UpsertAllocs(1000, upserting); err != nil {
t.Fatalf("failed to UpsertAllocs: %v", err)
}
spread1 := &structs.Spread{
Weight: 100,
Attribute: "${node.datacenter}",
SpreadTarget: []*structs.SpreadTarget{
{
Value: "dc1",
Percent: 60,
},
{
Value: "dc2",
Percent: 40,
},
},
}
spread2 := &structs.Spread{
Weight: 50,
Attribute: "${meta.rack}",
SpreadTarget: []*structs.SpreadTarget{
{
Value: "r1",
Percent: 40,
},
{
Value: "r2",
Percent: 60,
},
},
}
tg.Spreads = []*structs.Spread{spread1, spread2}
spreadIter := NewSpreadIterator(ctx, static)
spreadIter.SetJob(job)
spreadIter.SetTaskGroup(tg)
scoreNorm := NewScoreNormalizationIterator(ctx, spreadIter)
out := collectRanked(scoreNorm)
// Score comes from combining two different spread factors
// Second node should have the highest score because it has no allocs and its in dc2/r1
expectedScores := map[string]float64{
nodes[0].Node.ID: 0.500,
nodes[1].Node.ID: 0.667,
nodes[2].Node.ID: 0.556,
nodes[3].Node.ID: 0.556,
}
for _, rn := range out {
require.Equal(t, fmt.Sprintf("%.3f", expectedScores[rn.Node.ID]), fmt.Sprintf("%.3f", rn.FinalScore))
}
}
func TestSpreadIterator_EvenSpread(t *testing.T) {
state, ctx := testContext(t)
dcs := []string{"dc1", "dc2", "dc1", "dc2", "dc1", "dc2", "dc2", "dc1", "dc1", "dc1"}
var nodes []*RankedNode
// Add these nodes to the state store
for i, dc := range dcs {
node := mock.Node()
node.Datacenter = dc
if err := state.UpsertNode(uint64(100+i), node); err != nil {
t.Fatalf("failed to upsert node: %v", err)
}
nodes = append(nodes, &RankedNode{Node: node})
}
static := NewStaticRankIterator(ctx, nodes)
job := mock.Job()
tg := job.TaskGroups[0]
job.TaskGroups[0].Count = 10
// Configure even spread across node.datacenter
spread := &structs.Spread{
Weight: 100,
Attribute: "${node.datacenter}",
}
tg.Spreads = []*structs.Spread{spread}
spreadIter := NewSpreadIterator(ctx, static)
spreadIter.SetJob(job)
spreadIter.SetTaskGroup(tg)
scoreNorm := NewScoreNormalizationIterator(ctx, spreadIter)
out := collectRanked(scoreNorm)
// Nothing placed so both dc nodes get 0 as the score
expectedScores := map[string]float64{
"dc1": 0,
"dc2": 0,
}
for _, rn := range out {
require.Equal(t, fmt.Sprintf("%.3f", expectedScores[rn.Node.Datacenter]), fmt.Sprintf("%.3f", rn.FinalScore))
}
// Update the plan to add allocs to nodes in dc1
// After this step dc2 nodes should get boosted
ctx.plan.NodeAllocation[nodes[0].Node.ID] = []*structs.Allocation{
{
Namespace: structs.DefaultNamespace,
TaskGroup: tg.Name,
JobID: job.ID,
Job: job,
ID: uuid.Generate(),
NodeID: nodes[0].Node.ID,
},
}
ctx.plan.NodeAllocation[nodes[2].Node.ID] = []*structs.Allocation{
{
Namespace: structs.DefaultNamespace,
TaskGroup: tg.Name,
JobID: job.ID,
Job: job,
ID: uuid.Generate(),
NodeID: nodes[2].Node.ID,
},
}
// Reset the scores
for _, node := range nodes {
node.Scores = nil
node.FinalScore = 0
}
static = NewStaticRankIterator(ctx, nodes)
spreadIter = NewSpreadIterator(ctx, static)
spreadIter.SetJob(job)
spreadIter.SetTaskGroup(tg)
scoreNorm = NewScoreNormalizationIterator(ctx, spreadIter)
out = collectRanked(scoreNorm)
// Expect nodes in dc2 with existing allocs to get a boost
// dc1 nodes are penalized because they have allocs
expectedScores = map[string]float64{
"dc1": -1,
"dc2": 1,
}
for _, rn := range out {
require.Equal(t, expectedScores[rn.Node.Datacenter], rn.FinalScore)
}
// Update the plan to add more allocs to nodes in dc2
// After this step dc1 nodes should get boosted
ctx.plan.NodeAllocation[nodes[1].Node.ID] = []*structs.Allocation{
{
Namespace: structs.DefaultNamespace,
TaskGroup: tg.Name,
JobID: job.ID,
Job: job,
ID: uuid.Generate(),
NodeID: nodes[1].Node.ID,
},
{
Namespace: structs.DefaultNamespace,
TaskGroup: tg.Name,
JobID: job.ID,
Job: job,
ID: uuid.Generate(),
NodeID: nodes[1].Node.ID,
},
}
ctx.plan.NodeAllocation[nodes[3].Node.ID] = []*structs.Allocation{
{
Namespace: structs.DefaultNamespace,
TaskGroup: tg.Name,
JobID: job.ID,
Job: job,
ID: uuid.Generate(),
NodeID: nodes[3].Node.ID,
},
}
// Reset the scores
for _, node := range nodes {
node.Scores = nil
node.FinalScore = 0
}
static = NewStaticRankIterator(ctx, nodes)
spreadIter = NewSpreadIterator(ctx, static)
spreadIter.SetJob(job)
spreadIter.SetTaskGroup(tg)
scoreNorm = NewScoreNormalizationIterator(ctx, spreadIter)
out = collectRanked(scoreNorm)
// Expect nodes in dc2 to be penalized because there are 3 allocs there now
// dc1 nodes are boosted because that has 2 allocs
expectedScores = map[string]float64{
"dc1": 0.5,
"dc2": -0.5,
}
for _, rn := range out {
require.Equal(t, fmt.Sprintf("%3.3f", expectedScores[rn.Node.Datacenter]), fmt.Sprintf("%3.3f", rn.FinalScore))
}
// Add another node in dc3
node := mock.Node()
node.Datacenter = "dc3"
if err := state.UpsertNode(uint64(1111), node); err != nil {
t.Fatalf("failed to upsert node: %v", err)
}
nodes = append(nodes, &RankedNode{Node: node})
// Add another alloc to dc1, now its count matches dc2
ctx.plan.NodeAllocation[nodes[4].Node.ID] = []*structs.Allocation{
{
Namespace: structs.DefaultNamespace,
TaskGroup: tg.Name,
JobID: job.ID,
Job: job,
ID: uuid.Generate(),
NodeID: nodes[4].Node.ID,
},
}
// Reset scores
for _, node := range nodes {
node.Scores = nil
node.FinalScore = 0
}
static = NewStaticRankIterator(ctx, nodes)
spreadIter = NewSpreadIterator(ctx, static)
spreadIter.SetJob(job)
spreadIter.SetTaskGroup(tg)
scoreNorm = NewScoreNormalizationIterator(ctx, spreadIter)
out = collectRanked(scoreNorm)
// Expect dc1 and dc2 to be penalized because they have 3 allocs
// dc3 should get a boost because it has 0 allocs
expectedScores = map[string]float64{
"dc1": -1,
"dc2": -1,
"dc3": 1,
}
for _, rn := range out {
require.Equal(t, fmt.Sprintf("%.3f", expectedScores[rn.Node.Datacenter]), fmt.Sprintf("%.3f", rn.FinalScore))
}
}
// Test scenarios where the spread iterator sets maximum penalty (-1.0)
func TestSpreadIterator_MaxPenalty(t *testing.T) {
state, ctx := testContext(t)
var nodes []*RankedNode
// Add nodes in dc3 to the state store
for i := 0; i < 5; i++ {
node := mock.Node()
node.Datacenter = "dc3"
if err := state.UpsertNode(uint64(100+i), node); err != nil {
t.Fatalf("failed to upsert node: %v", err)
}
nodes = append(nodes, &RankedNode{Node: node})
}
static := NewStaticRankIterator(ctx, nodes)
job := mock.Job()
tg := job.TaskGroups[0]
job.TaskGroups[0].Count = 5
// Create spread target of 80% in dc1
// and 20% in dc2
spread := &structs.Spread{
Weight: 100,
Attribute: "${node.datacenter}",
SpreadTarget: []*structs.SpreadTarget{
{
Value: "dc1",
Percent: 80,
},
{
Value: "dc2",
Percent: 20,
},
},
}
tg.Spreads = []*structs.Spread{spread}
spreadIter := NewSpreadIterator(ctx, static)
spreadIter.SetJob(job)
spreadIter.SetTaskGroup(tg)
scoreNorm := NewScoreNormalizationIterator(ctx, spreadIter)
out := collectRanked(scoreNorm)
// All nodes are in dc3 so score should be -1
for _, rn := range out {
require.Equal(t, -1.0, rn.FinalScore)
}
// Reset scores
for _, node := range nodes {
node.Scores = nil
node.FinalScore = 0
}
// Create spread on attribute that doesn't exist on any nodes
spread = &structs.Spread{
Weight: 100,
Attribute: "${meta.foo}",
SpreadTarget: []*structs.SpreadTarget{
{
Value: "bar",
Percent: 80,
},
{
Value: "baz",
Percent: 20,
},
},
}
tg.Spreads = []*structs.Spread{spread}
static = NewStaticRankIterator(ctx, nodes)
spreadIter = NewSpreadIterator(ctx, static)
spreadIter.SetJob(job)
spreadIter.SetTaskGroup(tg)
scoreNorm = NewScoreNormalizationIterator(ctx, spreadIter)
out = collectRanked(scoreNorm)
// All nodes don't have the spread attribute so score should be -1
for _, rn := range out {
require.Equal(t, -1.0, rn.FinalScore)
}
}

View File

@ -8,23 +8,10 @@ import (
)
const (
// serviceJobAntiAffinityPenalty is the penalty applied
// to the score for placing an alloc on a node that
// already has an alloc for this job.
serviceJobAntiAffinityPenalty = 20.0
// batchJobAntiAffinityPenalty is the same as the
// serviceJobAntiAffinityPenalty but for batch type jobs.
batchJobAntiAffinityPenalty = 10.0
// previousFailedAllocNodePenalty is a scoring penalty for nodes
// that a failed allocation was previously run on
previousFailedAllocNodePenalty = 50.0
// skipScoreThreshold is a threshold used in the limit iterator to skip nodes
// that have a score lower than this. -10 is the highest possible score for a
// node with penalty (based on batchJobAntiAffinityPenalty)
skipScoreThreshold = -10.0
// that have a score lower than this. -1 is the lowest possible score for a
// node with penalties (based on job anti affinity and node rescheduling penalties
skipScoreThreshold = 0.0
// maxSkip limits the number of nodes that can be skipped in the limit iterator
maxSkip = 3
@ -66,9 +53,12 @@ type GenericStack struct {
distinctPropertyConstraint *DistinctPropertyIterator
binPack *BinPackIterator
jobAntiAff *JobAntiAffinityIterator
nodeAntiAff *NodeAntiAffinityIterator
nodeReschedulingPenalty *NodeReschedulingPenaltyIterator
limit *LimitIterator
maxScore *MaxScoreIterator
nodeAffinity *NodeAffinityIterator
spread *SpreadIterator
scoreNorm *ScoreNormalizationIterator
}
// NewGenericStack constructs a stack used for selecting service placements
@ -121,18 +111,19 @@ func NewGenericStack(batch bool, ctx Context) *GenericStack {
s.binPack = NewBinPackIterator(ctx, rankSource, evict, 0)
// Apply the job anti-affinity iterator. This is to avoid placing
// multiple allocations on the same node for this job. The penalty
// is less for batch jobs as it matters less.
penalty := serviceJobAntiAffinityPenalty
if batch {
penalty = batchJobAntiAffinityPenalty
}
s.jobAntiAff = NewJobAntiAffinityIterator(ctx, s.binPack, penalty, "")
// multiple allocations on the same node for this job.
s.jobAntiAff = NewJobAntiAffinityIterator(ctx, s.binPack, "")
s.nodeAntiAff = NewNodeAntiAffinityIterator(ctx, s.jobAntiAff, previousFailedAllocNodePenalty)
s.nodeReschedulingPenalty = NewNodeReschedulingPenaltyIterator(ctx, s.jobAntiAff)
s.nodeAffinity = NewNodeAffinityIterator(ctx, s.nodeReschedulingPenalty)
s.spread = NewSpreadIterator(ctx, s.nodeAffinity)
s.scoreNorm = NewScoreNormalizationIterator(ctx, s.spread)
// Apply a limit function. This is to avoid scanning *every* possible node.
s.limit = NewLimitIterator(ctx, s.nodeAntiAff, 2, skipScoreThreshold, maxSkip)
s.limit = NewLimitIterator(ctx, s.scoreNorm, 2, skipScoreThreshold, maxSkip)
// Select the node with the maximum score for placement
s.maxScore = NewMaxScoreIterator(ctx, s.limit)
@ -166,7 +157,9 @@ func (s *GenericStack) SetJob(job *structs.Job) {
s.distinctHostsConstraint.SetJob(job)
s.distinctPropertyConstraint.SetJob(job)
s.binPack.SetPriority(job.Priority)
s.jobAntiAff.SetJob(job.ID)
s.jobAntiAff.SetJob(job)
s.nodeAffinity.SetJob(job)
s.spread.SetJob(job)
s.ctx.Eligibility().SetJob(job)
if contextual, ok := s.quota.(ContextualIterator); ok {
@ -206,8 +199,15 @@ func (s *GenericStack) Select(tg *structs.TaskGroup, options *SelectOptions) (*R
s.distinctPropertyConstraint.SetTaskGroup(tg)
s.wrappedChecks.SetTaskGroup(tg.Name)
s.binPack.SetTaskGroup(tg)
s.jobAntiAff.SetTaskGroup(tg)
if options != nil {
s.nodeAntiAff.SetPenaltyNodes(options.PenaltyNodeIDs)
s.nodeReschedulingPenalty.SetPenaltyNodes(options.PenaltyNodeIDs)
}
s.nodeAffinity.SetTaskGroup(tg)
s.spread.SetTaskGroup(tg)
if s.nodeAffinity.hasAffinities() || s.spread.hasSpreads() {
s.limit.SetLimit(math.MaxInt32)
}
if contextual, ok := s.quota.(ContextualIterator); ok {
@ -241,6 +241,7 @@ type SystemStack struct {
taskGroupConstraint *ConstraintChecker
distinctPropertyConstraint *DistinctPropertyIterator
binPack *BinPackIterator
scoreNorm *ScoreNormalizationIterator
}
// NewSystemStack constructs a stack used for selecting service placements
@ -283,6 +284,9 @@ func NewSystemStack(ctx Context) *SystemStack {
// by a particular task group. Enable eviction as system jobs are high
// priority.
s.binPack = NewBinPackIterator(ctx, rankSource, true, 0)
// Apply score normalization
s.scoreNorm = NewScoreNormalizationIterator(ctx, s.binPack)
return s
}
@ -304,7 +308,7 @@ func (s *SystemStack) SetJob(job *structs.Job) {
func (s *SystemStack) Select(tg *structs.TaskGroup, options *SelectOptions) (*RankedNode, *structs.Resources) {
// Reset the binpack selector and context
s.binPack.Reset()
s.scoreNorm.Reset()
s.ctx.Reset()
start := time.Now()
@ -323,7 +327,7 @@ func (s *SystemStack) Select(tg *structs.TaskGroup, options *SelectOptions) (*Ra
}
// Get the next option that satisfies the constraints.
option := s.binPack.Next()
option := s.scoreNorm.Next()
// Ensure that the task resources were specified
if option != nil && len(option.TaskResources) != len(tg.Tasks) {

View File

@ -295,6 +295,7 @@ func TestServiceStack_Select_BinPack_Overflow(t *testing.T) {
stack.SetJob(job)
selectOptions := &SelectOptions{}
node, _ := stack.Select(job.TaskGroups[0], selectOptions)
ctx.Metrics().PopulateScoreMetaData()
if node == nil {
t.Fatalf("missing node %#v", ctx.Metrics())
}
@ -310,7 +311,8 @@ func TestServiceStack_Select_BinPack_Overflow(t *testing.T) {
if met.ClassExhausted["linux-medium-pci"] != 1 {
t.Fatalf("bad: %#v", met)
}
if len(met.Scores) != 1 {
// Expect score metadata for one node
if len(met.ScoreMetaData) != 1 {
t.Fatalf("bad: %#v", met)
}
}
@ -516,6 +518,7 @@ func TestSystemStack_Select_BinPack_Overflow(t *testing.T) {
selectOptions := &SelectOptions{}
node, _ := stack.Select(job.TaskGroups[0], selectOptions)
ctx.Metrics().PopulateScoreMetaData()
if node == nil {
t.Fatalf("missing node %#v", ctx.Metrics())
}
@ -531,7 +534,8 @@ func TestSystemStack_Select_BinPack_Overflow(t *testing.T) {
if met.ClassExhausted["linux-medium-pci"] != 1 {
t.Fatalf("bad: %#v", met)
}
if len(met.Scores) != 1 {
// Should have two scores, one from bin packing and one from normalization
if len(met.ScoreMetaData) != 1 {
t.Fatalf("bad: %#v", met)
}
}

View File

@ -304,6 +304,9 @@ func (s *SystemScheduler) computePlacements(place []allocTuple) error {
// Store the available nodes by datacenter
s.ctx.Metrics().NodesAvailable = s.nodesByDC
// Compute top K scoring node metadata
s.ctx.Metrics().PopulateScoreMetaData()
// Set fields based on if we found an allocation option
if option != nil {
// Create an allocation for this