Open source Preemption code

Nomad 0.12 OSS is to include preemption feature.

This commit moves the private code for managing preemption to OSS
repository.
This commit is contained in:
Mahmood Ali 2020-05-27 15:02:01 -04:00
parent 65b2672f16
commit d9792777d9
6 changed files with 366 additions and 106 deletions

View File

@ -642,3 +642,49 @@ func (s *GenericScheduler) findPreferredNode(place placementResult) (*structs.No
}
return nil, nil
}
// selectNextOption calls the stack to get a node for placement
func (s *GenericScheduler) selectNextOption(tg *structs.TaskGroup, selectOptions *SelectOptions) *RankedNode {
option := s.stack.Select(tg, selectOptions)
_, schedConfig, _ := s.ctx.State().SchedulerConfig()
// Check if preemption is enabled, defaults to true
enablePreemption := true
if schedConfig != nil {
if s.job.Type == structs.JobTypeBatch {
enablePreemption = schedConfig.PreemptionConfig.BatchSchedulerEnabled
} else {
enablePreemption = schedConfig.PreemptionConfig.ServiceSchedulerEnabled
}
}
// Run stack again with preemption enabled
if option == nil && enablePreemption {
selectOptions.Preempt = true
option = s.stack.Select(tg, selectOptions)
}
return option
}
// handlePreemptions sets relevant preeemption related fields. In OSS this is a no op.
func (s *GenericScheduler) handlePreemptions(option *RankedNode, alloc *structs.Allocation, missing placementResult) {
if option.PreemptedAllocs == nil {
return
}
// If this placement involves preemption, set DesiredState to evict for those allocations
var preemptedAllocIDs []string
for _, stop := range option.PreemptedAllocs {
s.plan.AppendPreemptedAlloc(stop, alloc.ID)
preemptedAllocIDs = append(preemptedAllocIDs, stop.ID)
if s.eval.AnnotatePlan && s.plan.Annotations != nil {
s.plan.Annotations.PreemptedAllocs = append(s.plan.Annotations.PreemptedAllocs, stop.Stub())
if s.plan.Annotations.DesiredTGUpdates != nil {
desired := s.plan.Annotations.DesiredTGUpdates[missing.TaskGroup().Name]
desired.Preemptions += 1
}
}
}
alloc.PreemptedAllocations = preemptedAllocIDs
}

View File

@ -1,15 +0,0 @@
// +build !ent
package scheduler
import "github.com/hashicorp/nomad/nomad/structs"
// selectNextOption calls the stack to get a node for placement
func (s *GenericScheduler) selectNextOption(tg *structs.TaskGroup, selectOptions *SelectOptions) *RankedNode {
return s.stack.Select(tg, selectOptions)
}
// handlePreemptions sets relevant preeemption related fields. In OSS this is a no op.
func (s *GenericScheduler) handlePreemptions(option *RankedNode, alloc *structs.Allocation, missing placementResult) {
}

View File

@ -5183,3 +5183,161 @@ func Test_updateRescheduleTracker(t *testing.T) {
}
}
func TestServiceSched_Preemption(t *testing.T) {
require := require.New(t)
h := NewHarness(t)
// Create a node
node := mock.Node()
node.Resources = nil
node.ReservedResources = nil
node.NodeResources = &structs.NodeResources{
Cpu: structs.NodeCpuResources{
CpuShares: 1000,
},
Memory: structs.NodeMemoryResources{
MemoryMB: 2048,
},
Disk: structs.NodeDiskResources{
DiskMB: 100 * 1024,
},
Networks: []*structs.NetworkResource{
{
Device: "eth0",
CIDR: "192.168.0.100/32",
MBits: 1000,
},
},
}
node.ReservedResources = &structs.NodeReservedResources{
Cpu: structs.NodeReservedCpuResources{
CpuShares: 50,
},
Memory: structs.NodeReservedMemoryResources{
MemoryMB: 256,
},
Disk: structs.NodeReservedDiskResources{
DiskMB: 4 * 1024,
},
Networks: structs.NodeReservedNetworkResources{
ReservedHostPorts: "22",
},
}
require.NoError(h.State.UpsertNode(h.NextIndex(), node))
// Create a couple of jobs and schedule them
job1 := mock.Job()
job1.TaskGroups[0].Count = 1
job1.Priority = 30
r1 := job1.TaskGroups[0].Tasks[0].Resources
r1.CPU = 500
r1.MemoryMB = 1024
r1.Networks = nil
require.NoError(h.State.UpsertJob(h.NextIndex(), job1))
job2 := mock.Job()
job2.TaskGroups[0].Count = 1
job2.Priority = 50
r2 := job2.TaskGroups[0].Tasks[0].Resources
r2.CPU = 350
r2.MemoryMB = 512
r2.Networks = nil
require.NoError(h.State.UpsertJob(h.NextIndex(), job2))
// Create a mock evaluation to register the jobs
eval1 := &structs.Evaluation{
Namespace: structs.DefaultNamespace,
ID: uuid.Generate(),
Priority: job1.Priority,
TriggeredBy: structs.EvalTriggerJobRegister,
JobID: job1.ID,
Status: structs.EvalStatusPending,
}
eval2 := &structs.Evaluation{
Namespace: structs.DefaultNamespace,
ID: uuid.Generate(),
Priority: job2.Priority,
TriggeredBy: structs.EvalTriggerJobRegister,
JobID: job2.ID,
Status: structs.EvalStatusPending,
}
require.NoError(h.State.UpsertEvals(h.NextIndex(), []*structs.Evaluation{eval1, eval2}))
expectedPreemptedAllocs := make(map[string]struct{})
// Process the two evals for job1 and job2 and make sure they allocated
for index, eval := range []*structs.Evaluation{eval1, eval2} {
// Process the evaluation
err := h.Process(NewServiceScheduler, eval)
require.Nil(err)
plan := h.Plans[index]
// Ensure the plan doesn't have annotations.
require.Nil(plan.Annotations)
// Ensure the eval has no spawned blocked eval
require.Equal(0, len(h.CreateEvals))
// Ensure the plan allocated
var planned []*structs.Allocation
for _, allocList := range plan.NodeAllocation {
planned = append(planned, allocList...)
}
require.Equal(1, len(planned))
expectedPreemptedAllocs[planned[0].ID] = struct{}{}
}
// Create a higher priority job
job3 := mock.Job()
job3.Priority = 100
job3.TaskGroups[0].Count = 1
r3 := job3.TaskGroups[0].Tasks[0].Resources
r3.CPU = 900
r3.MemoryMB = 1700
r3.Networks = nil
require.NoError(h.State.UpsertJob(h.NextIndex(), job3))
// Create a mock evaluation to register the job
eval := &structs.Evaluation{
Namespace: structs.DefaultNamespace,
ID: uuid.Generate(),
Priority: job3.Priority,
TriggeredBy: structs.EvalTriggerJobRegister,
JobID: job3.ID,
Status: structs.EvalStatusPending,
}
require.NoError(h.State.UpsertEvals(h.NextIndex(), []*structs.Evaluation{eval}))
// Process the evaluation
err := h.Process(NewServiceScheduler, eval)
require.Nil(err)
// New plan should be the third one in the harness
plan := h.Plans[2]
// Ensure the eval has no spawned blocked eval
require.Equal(0, len(h.CreateEvals))
// Ensure the plan allocated
var planned []*structs.Allocation
for _, allocList := range plan.NodeAllocation {
planned = append(planned, allocList...)
}
require.Equal(1, len(planned))
// Lookup the allocations by JobID
ws := memdb.NewWatchSet()
out, err := h.State.AllocsByJob(ws, job3.Namespace, job3.ID, false)
require.NoError(err)
// Ensure all allocations placed
require.Equal(1, len(out))
actualPreemptedAllocs := make(map[string]struct{})
for _, id := range out[0].PreemptedAllocations {
actualPreemptedAllocs[id] = struct{}{}
}
require.Equal(expectedPreemptedAllocs, actualPreemptedAllocs)
}

View File

@ -11,6 +11,13 @@ const (
// binPackingMaxFitScore is the maximum possible bin packing fitness score.
// This is used to normalize bin packing score to a value between 0 and 1
binPackingMaxFitScore = 18.0
// These values were chosen such that a net priority of 2048 would get a preemption score of 0.5
// rate is the decay parameter of the logistic function used in scoring preemption options
rate = 0.0048
// origin controls the inflection point of the logistic function used in scoring preemption options
origin = 2048.0
)
// Rank is used to provide a score and various ranking metadata
@ -705,3 +712,69 @@ func (iter *ScoreNormalizationIterator) Next() *RankedNode {
iter.ctx.Metrics().ScoreNode(option.Node, "normalized-score", option.FinalScore)
return option
}
// PreemptionScoringIterator is used to score nodes according to the
// combination of preemptible allocations in them
type PreemptionScoringIterator struct {
ctx Context
source RankIterator
}
// PreemptionScoringIterator is used to create a score based on net aggregate priority
// of preempted allocations
func NewPreemptionScoringIterator(ctx Context, source RankIterator) RankIterator {
return &PreemptionScoringIterator{
ctx: ctx,
source: source,
}
}
func (iter *PreemptionScoringIterator) Reset() {
iter.source.Reset()
}
func (iter *PreemptionScoringIterator) Next() *RankedNode {
option := iter.source.Next()
if option == nil || option.PreemptedAllocs == nil {
return option
}
netPriority := netPriority(option.PreemptedAllocs)
// preemption score is inversely proportional to netPriority
preemptionScore := preemptionScore(netPriority)
option.Scores = append(option.Scores, preemptionScore)
iter.ctx.Metrics().ScoreNode(option.Node, "preemption", preemptionScore)
return option
}
// netPriority is a scoring heuristic that represents a combination of two factors.
// First factor is the max priority in the set of allocations, with
// an additional factor that takes into account the individual priorities of allocations
func netPriority(allocs []*structs.Allocation) float64 {
sumPriority := 0
max := 0.0
for _, alloc := range allocs {
if float64(alloc.Job.Priority) > max {
max = float64(alloc.Job.Priority)
}
sumPriority += alloc.Job.Priority
}
// We use the maximum priority across all allocations
// with an additional penalty that increases proportional to the
// ratio of the sum by max
// This ensures that we penalize nodes that have a low max but a high
// number of preemptible allocations
ret := max + (float64(sumPriority) / max)
return ret
}
// preemptionScore is calculated using a logistic function
// see https://www.desmos.com/calculator/alaeiuaiey for a visual representation of the curve.
// Lower values of netPriority get a score closer to 1 and the inflection point is around 2048
// The score is modelled to be between 0 and 1 because its combined with other
// scoring factors like bin packing
func preemptionScore(netPriority float64) float64 {
// This function manifests as an s curve that asympotically moves towards zero for large values of netPriority
return 1.0 / (1 + math.Exp(rate*(netPriority-origin)))
}

View File

@ -296,3 +296,92 @@ func (s *SystemStack) Select(tg *structs.TaskGroup, options *SelectOptions) *Ran
s.ctx.Metrics().AllocationTime = time.Since(start)
return option
}
// NewGenericStack constructs a stack used for selecting service placements
func NewGenericStack(batch bool, ctx Context) *GenericStack {
// Create a new stack
s := &GenericStack{
batch: batch,
ctx: ctx,
}
// Create the source iterator. We randomize the order we visit nodes
// to reduce collisions between schedulers and to do a basic load
// balancing across eligible nodes.
s.source = NewRandomIterator(ctx, nil)
// Create the quota iterator to determine if placements would result in the
// quota attached to the namespace of the job to go over.
s.quota = NewQuotaIterator(ctx, s.source)
// Attach the job constraints. The job is filled in later.
s.jobConstraint = NewConstraintChecker(ctx, nil)
// Filter on task group drivers first as they are faster
s.taskGroupDrivers = NewDriverChecker(ctx, nil)
// Filter on task group constraints second
s.taskGroupConstraint = NewConstraintChecker(ctx, nil)
// Filter on task group devices
s.taskGroupDevices = NewDeviceChecker(ctx)
// Filter on task group host volumes
s.taskGroupHostVolumes = NewHostVolumeChecker(ctx)
// Filter on available, healthy CSI plugins
s.taskGroupCSIVolumes = NewCSIVolumeChecker(ctx)
// Create the feasibility wrapper which wraps all feasibility checks in
// which feasibility checking can be skipped if the computed node class has
// previously been marked as eligible or ineligible. Generally this will be
// checks that only needs to examine the single node to determine feasibility.
jobs := []FeasibilityChecker{s.jobConstraint}
tgs := []FeasibilityChecker{s.taskGroupDrivers,
s.taskGroupConstraint,
s.taskGroupHostVolumes,
s.taskGroupDevices}
avail := []FeasibilityChecker{s.taskGroupCSIVolumes}
s.wrappedChecks = NewFeasibilityWrapper(ctx, s.quota, jobs, tgs, avail)
// Filter on distinct host constraints.
s.distinctHostsConstraint = NewDistinctHostsIterator(ctx, s.wrappedChecks)
// Filter on distinct property constraints.
s.distinctPropertyConstraint = NewDistinctPropertyIterator(ctx, s.distinctHostsConstraint)
// Upgrade from feasible to rank iterator
rankSource := NewFeasibleRankIterator(ctx, s.distinctPropertyConstraint)
// Apply the bin packing, this depends on the resources needed
// by a particular task group.
_, schedConfig, _ := ctx.State().SchedulerConfig()
s.binPack = NewBinPackIterator(ctx, rankSource, false, 0, schedConfig.EffectiveSchedulerAlgorithm())
// Apply the job anti-affinity iterator. This is to avoid placing
// multiple allocations on the same node for this job.
s.jobAntiAff = NewJobAntiAffinityIterator(ctx, s.binPack, "")
// Apply node rescheduling penalty. This tries to avoid placing on a
// node where the allocation failed previously
s.nodeReschedulingPenalty = NewNodeReschedulingPenaltyIterator(ctx, s.jobAntiAff)
// Apply scores based on affinity stanza
s.nodeAffinity = NewNodeAffinityIterator(ctx, s.nodeReschedulingPenalty)
// Apply scores based on spread stanza
s.spread = NewSpreadIterator(ctx, s.nodeAffinity)
// Add the preemption options scoring iterator
preemptionScorer := NewPreemptionScoringIterator(ctx, s.spread)
// Normalizes scores by averaging them across various scorers
s.scoreNorm = NewScoreNormalizationIterator(ctx, preemptionScorer)
// Apply a limit function. This is to avoid scanning *every* possible node.
s.limit = NewLimitIterator(ctx, s.scoreNorm, 2, skipScoreThreshold, maxSkip)
// Select the node with the maximum score for placement
s.maxScore = NewMaxScoreIterator(ctx, s.limit)
return s
}

View File

@ -1,91 +0,0 @@
// +build !ent
package scheduler
// NewGenericStack constructs a stack used for selecting service placements
func NewGenericStack(batch bool, ctx Context) *GenericStack {
// Create a new stack
s := &GenericStack{
batch: batch,
ctx: ctx,
}
// Create the source iterator. We randomize the order we visit nodes
// to reduce collisions between schedulers and to do a basic load
// balancing across eligible nodes.
s.source = NewRandomIterator(ctx, nil)
// Create the quota iterator to determine if placements would result in the
// quota attached to the namespace of the job to go over.
s.quota = NewQuotaIterator(ctx, s.source)
// Attach the job constraints. The job is filled in later.
s.jobConstraint = NewConstraintChecker(ctx, nil)
// Filter on task group drivers first as they are faster
s.taskGroupDrivers = NewDriverChecker(ctx, nil)
// Filter on task group constraints second
s.taskGroupConstraint = NewConstraintChecker(ctx, nil)
// Filter on task group devices
s.taskGroupDevices = NewDeviceChecker(ctx)
// Filter on task group host volumes
s.taskGroupHostVolumes = NewHostVolumeChecker(ctx)
// Filter on available, healthy CSI plugins
s.taskGroupCSIVolumes = NewCSIVolumeChecker(ctx)
// Create the feasibility wrapper which wraps all feasibility checks in
// which feasibility checking can be skipped if the computed node class has
// previously been marked as eligible or ineligible. Generally this will be
// checks that only needs to examine the single node to determine feasibility.
jobs := []FeasibilityChecker{s.jobConstraint}
tgs := []FeasibilityChecker{s.taskGroupDrivers,
s.taskGroupConstraint,
s.taskGroupHostVolumes,
s.taskGroupDevices}
avail := []FeasibilityChecker{s.taskGroupCSIVolumes}
s.wrappedChecks = NewFeasibilityWrapper(ctx, s.quota, jobs, tgs, avail)
// Filter on distinct host constraints.
s.distinctHostsConstraint = NewDistinctHostsIterator(ctx, s.wrappedChecks)
// Filter on distinct property constraints.
s.distinctPropertyConstraint = NewDistinctPropertyIterator(ctx, s.distinctHostsConstraint)
// Upgrade from feasible to rank iterator
rankSource := NewFeasibleRankIterator(ctx, s.distinctPropertyConstraint)
// Apply the bin packing, this depends on the resources needed
// by a particular task group.
_, schedConfig, _ := s.ctx.State().SchedulerConfig()
schedulerAlgorithm := schedConfig.EffectiveSchedulerAlgorithm()
s.binPack = NewBinPackIterator(ctx, rankSource, false, 0, schedulerAlgorithm)
// Apply the job anti-affinity iterator. This is to avoid placing
// multiple allocations on the same node for this job.
s.jobAntiAff = NewJobAntiAffinityIterator(ctx, s.binPack, "")
// Apply node rescheduling penalty. This tries to avoid placing on a
// node where the allocation failed previously
s.nodeReschedulingPenalty = NewNodeReschedulingPenaltyIterator(ctx, s.jobAntiAff)
// Apply scores based on affinity stanza
s.nodeAffinity = NewNodeAffinityIterator(ctx, s.nodeReschedulingPenalty)
// Apply scores based on spread stanza
s.spread = NewSpreadIterator(ctx, s.nodeAffinity)
// Normalizes scores by averaging them across various scorers
s.scoreNorm = NewScoreNormalizationIterator(ctx, s.spread)
// Apply a limit function. This is to avoid scanning *every* possible node.
s.limit = NewLimitIterator(ctx, s.scoreNorm, 2, skipScoreThreshold, maxSkip)
// Select the node with the maximum score for placement
s.maxScore = NewMaxScoreIterator(ctx, s.limit)
return s
}