diff --git a/scheduler/generic_sched.go b/scheduler/generic_sched.go
index 6e42c297a..4da71ccf7 100644
--- a/scheduler/generic_sched.go
+++ b/scheduler/generic_sched.go
@@ -642,3 +642,49 @@ func (s *GenericScheduler) findPreferredNode(place placementResult) (*structs.No
 	}
 	return nil, nil
 }
+
+// selectNextOption calls the stack to get a node for placement
+func (s *GenericScheduler) selectNextOption(tg *structs.TaskGroup, selectOptions *SelectOptions) *RankedNode {
+	option := s.stack.Select(tg, selectOptions)
+	_, schedConfig, _ := s.ctx.State().SchedulerConfig()
+
+	// Check if preemption is enabled; it defaults to true
+	enablePreemption := true
+	if schedConfig != nil {
+		if s.job.Type == structs.JobTypeBatch {
+			enablePreemption = schedConfig.PreemptionConfig.BatchSchedulerEnabled
+		} else {
+			enablePreemption = schedConfig.PreemptionConfig.ServiceSchedulerEnabled
+		}
+	}
+	// Run the stack again with preemption enabled
+	if option == nil && enablePreemption {
+		selectOptions.Preempt = true
+		option = s.stack.Select(tg, selectOptions)
+	}
+	return option
+}
+
+// handlePreemptions sets relevant preemption-related fields when a placement preempts other allocations
+func (s *GenericScheduler) handlePreemptions(option *RankedNode, alloc *structs.Allocation, missing placementResult) {
+	if option.PreemptedAllocs == nil {
+		return
+	}
+
+	// If this placement involves preemption, set DesiredState to evict for those allocations
+	var preemptedAllocIDs []string
+	for _, stop := range option.PreemptedAllocs {
+		s.plan.AppendPreemptedAlloc(stop, alloc.ID)
+		preemptedAllocIDs = append(preemptedAllocIDs, stop.ID)
+
+		if s.eval.AnnotatePlan && s.plan.Annotations != nil {
+			s.plan.Annotations.PreemptedAllocs = append(s.plan.Annotations.PreemptedAllocs, stop.Stub())
+			if s.plan.Annotations.DesiredTGUpdates != nil {
+				desired := s.plan.Annotations.DesiredTGUpdates[missing.TaskGroup().Name]
+				desired.Preemptions += 1
+			}
+		}
+	}
+
+	alloc.PreemptedAllocations = preemptedAllocIDs
+}
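selectNextOption only falls back to a second Select pass with preemption turned on when the first pass finds no feasible node and preemption has not been disabled for the job's scheduler type. Below is a minimal sketch of just that defaulting rule, using simplified stand-in types rather than Nomad's (the PreemptionConfig field names come from the hunk above; everything else is illustrative):

package main

import "fmt"

// Simplified stand-ins for the persisted scheduler configuration.
type PreemptionConfig struct {
	BatchSchedulerEnabled   bool
	ServiceSchedulerEnabled bool
}

type SchedulerConfig struct {
	PreemptionConfig PreemptionConfig
}

// preemptionEnabled mirrors the defaulting above: preemption is on when no
// scheduler config has been persisted yet; otherwise the per-scheduler flag decides.
func preemptionEnabled(cfg *SchedulerConfig, jobType string) bool {
	if cfg == nil {
		return true
	}
	if jobType == "batch" {
		return cfg.PreemptionConfig.BatchSchedulerEnabled
	}
	return cfg.PreemptionConfig.ServiceSchedulerEnabled
}

func main() {
	fmt.Println(preemptionEnabled(nil, "service")) // true: nothing persisted, default applies
	cfg := &SchedulerConfig{PreemptionConfig: PreemptionConfig{ServiceSchedulerEnabled: true}}
	fmt.Println(preemptionEnabled(cfg, "batch"))   // false: batch preemption left disabled
	fmt.Println(preemptionEnabled(cfg, "service")) // true: explicitly enabled
}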
diff --git a/scheduler/generic_sched_oss.go b/scheduler/generic_sched_oss.go
deleted file mode 100644
index 9215a726c..000000000
--- a/scheduler/generic_sched_oss.go
+++ /dev/null
@@ -1,15 +0,0 @@
-// +build !ent
-
-package scheduler
-
-import "github.com/hashicorp/nomad/nomad/structs"
-
-// selectNextOption calls the stack to get a node for placement
-func (s *GenericScheduler) selectNextOption(tg *structs.TaskGroup, selectOptions *SelectOptions) *RankedNode {
-	return s.stack.Select(tg, selectOptions)
-}
-
-// handlePreemptions sets relevant preeemption related fields. In OSS this is a no op.
-func (s *GenericScheduler) handlePreemptions(option *RankedNode, alloc *structs.Allocation, missing placementResult) {
-
-}
diff --git a/scheduler/generic_sched_test.go b/scheduler/generic_sched_test.go
index 779c38819..0a45a48da 100644
--- a/scheduler/generic_sched_test.go
+++ b/scheduler/generic_sched_test.go
@@ -5183,3 +5183,161 @@ func Test_updateRescheduleTracker(t *testing.T) {
 		})
 	}
 }
+
+func TestServiceSched_Preemption(t *testing.T) {
+	require := require.New(t)
+	h := NewHarness(t)
+
+	// Create a node
+	node := mock.Node()
+	node.Resources = nil
+	node.ReservedResources = nil
+	node.NodeResources = &structs.NodeResources{
+		Cpu: structs.NodeCpuResources{
+			CpuShares: 1000,
+		},
+		Memory: structs.NodeMemoryResources{
+			MemoryMB: 2048,
+		},
+		Disk: structs.NodeDiskResources{
+			DiskMB: 100 * 1024,
+		},
+		Networks: []*structs.NetworkResource{
+			{
+				Device: "eth0",
+				CIDR:   "192.168.0.100/32",
+				MBits:  1000,
+			},
+		},
+	}
+	node.ReservedResources = &structs.NodeReservedResources{
+		Cpu: structs.NodeReservedCpuResources{
+			CpuShares: 50,
+		},
+		Memory: structs.NodeReservedMemoryResources{
+			MemoryMB: 256,
+		},
+		Disk: structs.NodeReservedDiskResources{
+			DiskMB: 4 * 1024,
+		},
+		Networks: structs.NodeReservedNetworkResources{
+			ReservedHostPorts: "22",
+		},
+	}
+	require.NoError(h.State.UpsertNode(h.NextIndex(), node))
+
+	// Create a couple of jobs and schedule them
+	job1 := mock.Job()
+	job1.TaskGroups[0].Count = 1
+	job1.Priority = 30
+	r1 := job1.TaskGroups[0].Tasks[0].Resources
+	r1.CPU = 500
+	r1.MemoryMB = 1024
+	r1.Networks = nil
+	require.NoError(h.State.UpsertJob(h.NextIndex(), job1))
+
+	job2 := mock.Job()
+	job2.TaskGroups[0].Count = 1
+	job2.Priority = 50
+	r2 := job2.TaskGroups[0].Tasks[0].Resources
+	r2.CPU = 350
+	r2.MemoryMB = 512
+	r2.Networks = nil
+	require.NoError(h.State.UpsertJob(h.NextIndex(), job2))
+
+	// Create a mock evaluation to register the jobs
+	eval1 := &structs.Evaluation{
+		Namespace:   structs.DefaultNamespace,
+		ID:          uuid.Generate(),
+		Priority:    job1.Priority,
+		TriggeredBy: structs.EvalTriggerJobRegister,
+		JobID:       job1.ID,
+		Status:      structs.EvalStatusPending,
+	}
+	eval2 := &structs.Evaluation{
+		Namespace:   structs.DefaultNamespace,
+		ID:          uuid.Generate(),
+		Priority:    job2.Priority,
+		TriggeredBy: structs.EvalTriggerJobRegister,
+		JobID:       job2.ID,
+		Status:      structs.EvalStatusPending,
+	}
+
+	require.NoError(h.State.UpsertEvals(h.NextIndex(), []*structs.Evaluation{eval1, eval2}))
+
+	expectedPreemptedAllocs := make(map[string]struct{})
+	// Process the two evals for job1 and job2 and make sure they allocated
+	for index, eval := range []*structs.Evaluation{eval1, eval2} {
+		// Process the evaluation
+		err := h.Process(NewServiceScheduler, eval)
+		require.Nil(err)
+
+		plan := h.Plans[index]
+
+		// Ensure the plan doesn't have annotations.
+		require.Nil(plan.Annotations)
+
+		// Ensure the eval has no spawned blocked eval
+		require.Equal(0, len(h.CreateEvals))
+
+		// Ensure the plan allocated
+		var planned []*structs.Allocation
+		for _, allocList := range plan.NodeAllocation {
+			planned = append(planned, allocList...)
+		}
+		require.Equal(1, len(planned))
+		expectedPreemptedAllocs[planned[0].ID] = struct{}{}
+	}
+
+	// Create a higher priority job
+	job3 := mock.Job()
+	job3.Priority = 100
+	job3.TaskGroups[0].Count = 1
+	r3 := job3.TaskGroups[0].Tasks[0].Resources
+	r3.CPU = 900
+	r3.MemoryMB = 1700
+	r3.Networks = nil
+	require.NoError(h.State.UpsertJob(h.NextIndex(), job3))
+
+	// Create a mock evaluation to register the job
+	eval := &structs.Evaluation{
+		Namespace:   structs.DefaultNamespace,
+		ID:          uuid.Generate(),
+		Priority:    job3.Priority,
+		TriggeredBy: structs.EvalTriggerJobRegister,
+		JobID:       job3.ID,
+		Status:      structs.EvalStatusPending,
+	}
+
+	require.NoError(h.State.UpsertEvals(h.NextIndex(), []*structs.Evaluation{eval}))
+
+	// Process the evaluation
+	err := h.Process(NewServiceScheduler, eval)
+	require.Nil(err)
+
+	// New plan should be the third one in the harness
+	plan := h.Plans[2]
+
+	// Ensure the eval has no spawned blocked eval
+	require.Equal(0, len(h.CreateEvals))
+
+	// Ensure the plan allocated
+	var planned []*structs.Allocation
+	for _, allocList := range plan.NodeAllocation {
+		planned = append(planned, allocList...)
+	}
+	require.Equal(1, len(planned))
+
+	// Lookup the allocations by JobID
+	ws := memdb.NewWatchSet()
+	out, err := h.State.AllocsByJob(ws, job3.Namespace, job3.ID, false)
+	require.NoError(err)
+
+	// Ensure all allocations placed
+	require.Equal(1, len(out))
+	actualPreemptedAllocs := make(map[string]struct{})
+	for _, id := range out[0].PreemptedAllocations {
+		actualPreemptedAllocs[id] = struct{}{}
+	}
+	require.Equal(expectedPreemptedAllocs, actualPreemptedAllocs)
+}
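The resource numbers in this test are picked so that the priority-100 job can only fit by preempting both existing allocations. A quick back-of-the-envelope check of that headroom (plain arithmetic over the values above, not the scheduler API):

package main

import "fmt"

func main() {
	// Usable capacity is node resources minus reserved resources.
	cpu := 1000 - 50  // 950 CPU shares free
	mem := 2048 - 256 // 1792 MB free

	// job1 (500 CPU / 1024 MB) and job2 (350 CPU / 512 MB) are already placed.
	cpu -= 500 + 350  // 100 left
	mem -= 1024 + 512 // 256 left

	// job3 needs 900 CPU / 1700 MB, so neither existing alloc alone frees enough;
	// both of them end up in expectedPreemptedAllocs.
	fmt.Println(cpu+500 >= 900, mem+1024 >= 1700)         // preempt only job1: false false
	fmt.Println(cpu+350 >= 900, mem+512 >= 1700)          // preempt only job2: false false
	fmt.Println(cpu+500+350 >= 900, mem+1024+512 >= 1700) // preempt both: true true
}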
diff --git a/scheduler/rank.go b/scheduler/rank.go
index 06ef9c452..dccd77f93 100644
--- a/scheduler/rank.go
+++ b/scheduler/rank.go
@@ -11,6 +11,13 @@ const (
 	// binPackingMaxFitScore is the maximum possible bin packing fitness score.
 	// This is used to normalize bin packing score to a value between 0 and 1
 	binPackingMaxFitScore = 18.0
+
+	// These values were chosen such that a net priority of 2048 would get a preemption score of 0.5
+	// rate is the decay parameter of the logistic function used in scoring preemption options
+	rate = 0.0048
+
+	// origin controls the inflection point of the logistic function used in scoring preemption options
+	origin = 2048.0
 )
 
 // Rank is used to provide a score and various ranking metadata
@@ -705,3 +712,69 @@ func (iter *ScoreNormalizationIterator) Next() *RankedNode {
 	iter.ctx.Metrics().ScoreNode(option.Node, "normalized-score", option.FinalScore)
 	return option
 }
+
+// PreemptionScoringIterator is used to score nodes according to the
+// combination of preemptible allocations in them
+type PreemptionScoringIterator struct {
+	ctx    Context
+	source RankIterator
+}
+
+// NewPreemptionScoringIterator returns an iterator that scores nodes based on
+// the net aggregate priority of their preempted allocations
+func NewPreemptionScoringIterator(ctx Context, source RankIterator) RankIterator {
+	return &PreemptionScoringIterator{
+		ctx:    ctx,
+		source: source,
+	}
+}
+
+func (iter *PreemptionScoringIterator) Reset() {
+	iter.source.Reset()
+}
+
+func (iter *PreemptionScoringIterator) Next() *RankedNode {
+	option := iter.source.Next()
+	if option == nil || option.PreemptedAllocs == nil {
+		return option
+	}
+
+	netPriority := netPriority(option.PreemptedAllocs)
+	// The preemption score decreases as netPriority increases
+	preemptionScore := preemptionScore(netPriority)
+	option.Scores = append(option.Scores, preemptionScore)
+	iter.ctx.Metrics().ScoreNode(option.Node, "preemption", preemptionScore)
+
+	return option
+}
+
+// netPriority is a scoring heuristic that combines two factors: the maximum
+// priority in the set of allocations, plus an additional term that accounts
+// for the individual priorities of all the allocations
+func netPriority(allocs []*structs.Allocation) float64 {
+	sumPriority := 0
+	max := 0.0
+	for _, alloc := range allocs {
+		if float64(alloc.Job.Priority) > max {
+			max = float64(alloc.Job.Priority)
+		}
+		sumPriority += alloc.Job.Priority
+	}
+	// We use the maximum priority across all allocations
+	// with an additional penalty that increases proportionally with the
+	// ratio of the sum to the max.
+	// This ensures that we penalize nodes that have a low max but a high
+	// number of preemptible allocations
+	ret := max + (float64(sumPriority) / max)
+	return ret
+}
+
+// preemptionScore is calculated using a logistic function;
+// see https://www.desmos.com/calculator/alaeiuaiey for a visual representation of the curve.
+// Lower values of netPriority get a score closer to 1, and the inflection point is around 2048.
+// The score is modelled to be between 0 and 1 because it is combined with other
+// scoring factors like bin packing.
+func preemptionScore(netPriority float64) float64 {
+	// This function manifests as an S-curve that asymptotically approaches zero for large values of netPriority
+	return 1.0 / (1 + math.Exp(rate*(netPriority-origin)))
+}
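The constants and the curve can be sanity-checked numerically: a netPriority equal to origin scores exactly 0.5, while the one- or two-allocation cases exercised by the test above land very close to 1, so preemption only mildly reduces a node's normalized score until a large amount of priority would be displaced. Here is a standalone re-statement of the two helpers (same formula and constants as the hunk above, but taking plain priorities instead of *structs.Allocation so it can be run in isolation):

package main

import (
	"fmt"
	"math"
)

const (
	rate   = 0.0048
	origin = 2048.0
)

// netPriority re-stated over plain ints: max priority plus the ratio of the sum to the max.
func netPriority(priorities []int) float64 {
	sum, max := 0, 0.0
	for _, p := range priorities {
		if float64(p) > max {
			max = float64(p)
		}
		sum += p
	}
	return max + float64(sum)/max
}

// preemptionScore is the same logistic function as above.
func preemptionScore(netPriority float64) float64 {
	return 1.0 / (1 + math.Exp(rate*(netPriority-origin)))
}

func main() {
	fmt.Printf("%.4f\n", preemptionScore(netPriority([]int{50, 30}))) // ~0.9999: the two allocs preempted in the test
	fmt.Printf("%.4f\n", preemptionScore(origin))                     // 0.5000: the inflection point
	fmt.Printf("%.4f\n", preemptionScore(2*origin))                   // ~0.0001: heavily penalized
}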
diff --git a/scheduler/stack.go b/scheduler/stack.go
index b9ceb19bb..b381e4c58 100644
--- a/scheduler/stack.go
+++ b/scheduler/stack.go
@@ -296,3 +296,92 @@ func (s *SystemStack) Select(tg *structs.TaskGroup, options *SelectOptions) *Ran
 	s.ctx.Metrics().AllocationTime = time.Since(start)
 	return option
 }
+
+// NewGenericStack constructs a stack used for selecting service placements
+func NewGenericStack(batch bool, ctx Context) *GenericStack {
+	// Create a new stack
+	s := &GenericStack{
+		batch: batch,
+		ctx:   ctx,
+	}
+
+	// Create the source iterator. We randomize the order we visit nodes
+	// to reduce collisions between schedulers and to do a basic load
+	// balancing across eligible nodes.
+	s.source = NewRandomIterator(ctx, nil)
+
+	// Create the quota iterator to determine whether placements would cause
+	// the quota attached to the job's namespace to be exceeded.
+	s.quota = NewQuotaIterator(ctx, s.source)
+
+	// Attach the job constraints. The job is filled in later.
+	s.jobConstraint = NewConstraintChecker(ctx, nil)
+
+	// Filter on task group drivers first as they are faster
+	s.taskGroupDrivers = NewDriverChecker(ctx, nil)
+
+	// Filter on task group constraints second
+	s.taskGroupConstraint = NewConstraintChecker(ctx, nil)
+
+	// Filter on task group devices
+	s.taskGroupDevices = NewDeviceChecker(ctx)
+
+	// Filter on task group host volumes
+	s.taskGroupHostVolumes = NewHostVolumeChecker(ctx)
+
+	// Filter on available, healthy CSI plugins
+	s.taskGroupCSIVolumes = NewCSIVolumeChecker(ctx)
+
+	// Create the feasibility wrapper, which allows feasibility checking to be
+	// skipped if the computed node class has previously been marked as eligible
+	// or ineligible. Generally these are checks that only need to examine a
+	// single node to determine feasibility.
+	jobs := []FeasibilityChecker{s.jobConstraint}
+	tgs := []FeasibilityChecker{s.taskGroupDrivers,
+		s.taskGroupConstraint,
+		s.taskGroupHostVolumes,
+		s.taskGroupDevices}
+	avail := []FeasibilityChecker{s.taskGroupCSIVolumes}
+	s.wrappedChecks = NewFeasibilityWrapper(ctx, s.quota, jobs, tgs, avail)
+
+	// Filter on distinct host constraints.
+	s.distinctHostsConstraint = NewDistinctHostsIterator(ctx, s.wrappedChecks)
+
+	// Filter on distinct property constraints.
+	s.distinctPropertyConstraint = NewDistinctPropertyIterator(ctx, s.distinctHostsConstraint)
+
+	// Upgrade from feasible to rank iterator
+	rankSource := NewFeasibleRankIterator(ctx, s.distinctPropertyConstraint)
+
+	// Apply the bin packing, which depends on the resources needed
+	// by a particular task group.
+	_, schedConfig, _ := ctx.State().SchedulerConfig()
+	s.binPack = NewBinPackIterator(ctx, rankSource, false, 0, schedConfig.EffectiveSchedulerAlgorithm())
+
+	// Apply the job anti-affinity iterator. This is to avoid placing
+	// multiple allocations on the same node for this job.
+	s.jobAntiAff = NewJobAntiAffinityIterator(ctx, s.binPack, "")
+
+	// Apply the node rescheduling penalty. This tries to avoid placing on a
+	// node where the allocation failed previously.
+	s.nodeReschedulingPenalty = NewNodeReschedulingPenaltyIterator(ctx, s.jobAntiAff)
+
+	// Apply scores based on affinity stanza
+	s.nodeAffinity = NewNodeAffinityIterator(ctx, s.nodeReschedulingPenalty)
+
+	// Apply scores based on spread stanza
+	s.spread = NewSpreadIterator(ctx, s.nodeAffinity)
+
+	// Add the preemption options scoring iterator
+	preemptionScorer := NewPreemptionScoringIterator(ctx, s.spread)
+
+	// Normalizes scores by averaging them across various scorers
+	s.scoreNorm = NewScoreNormalizationIterator(ctx, preemptionScorer)
+
+	// Apply a limit function. This is to avoid scanning *every* possible node.
+	s.limit = NewLimitIterator(ctx, s.scoreNorm, 2, skipScoreThreshold, maxSkip)
+
+	// Select the node with the maximum score for placement
+	s.maxScore = NewMaxScoreIterator(ctx, s.limit)
+	return s
+}
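Because NewPreemptionScoringIterator is chained in just before the score normalization step, its score is averaged together with the other scoring iterators rather than acting as a hard filter; a node that needs heavy preemption can still win if it is otherwise a much better fit. A toy illustration of that averaging, assuming a plain arithmetic mean as the comment above describes (the scores themselves are made up):

package main

import "fmt"

// average mirrors the normalization step: the final score is the mean of
// whatever scores the upstream iterators attached to the node.
func average(scores []float64) float64 {
	sum := 0.0
	for _, s := range scores {
		sum += s
	}
	return sum / float64(len(scores))
}

func main() {
	// Node A: no preemption required, middling bin-packing score.
	fmt.Printf("%.3f\n", average([]float64{0.6})) // 0.600
	// Node B: better bin-packing fit, but the preemption score it picked up
	// drags its average down below node A.
	fmt.Printf("%.3f\n", average([]float64{0.9, 0.2})) // 0.550
}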
diff --git a/scheduler/stack_oss.go b/scheduler/stack_oss.go
deleted file mode 100644
index a705b8c73..000000000
--- a/scheduler/stack_oss.go
+++ /dev/null
@@ -1,91 +0,0 @@
-// +build !ent
-
-package scheduler
-
-// NewGenericStack constructs a stack used for selecting service placements
-func NewGenericStack(batch bool, ctx Context) *GenericStack {
-	// Create a new stack
-	s := &GenericStack{
-		batch: batch,
-		ctx:   ctx,
-	}
-
-	// Create the source iterator. We randomize the order we visit nodes
-	// to reduce collisions between schedulers and to do a basic load
-	// balancing across eligible nodes.
-	s.source = NewRandomIterator(ctx, nil)
-
-	// Create the quota iterator to determine if placements would result in the
-	// quota attached to the namespace of the job to go over.
-	s.quota = NewQuotaIterator(ctx, s.source)
-
-	// Attach the job constraints. The job is filled in later.
-	s.jobConstraint = NewConstraintChecker(ctx, nil)
-
-	// Filter on task group drivers first as they are faster
-	s.taskGroupDrivers = NewDriverChecker(ctx, nil)
-
-	// Filter on task group constraints second
-	s.taskGroupConstraint = NewConstraintChecker(ctx, nil)
-
-	// Filter on task group devices
-	s.taskGroupDevices = NewDeviceChecker(ctx)
-
-	// Filter on task group host volumes
-	s.taskGroupHostVolumes = NewHostVolumeChecker(ctx)
-
-	// Filter on available, healthy CSI plugins
-	s.taskGroupCSIVolumes = NewCSIVolumeChecker(ctx)
-
-	// Create the feasibility wrapper which wraps all feasibility checks in
-	// which feasibility checking can be skipped if the computed node class has
-	// previously been marked as eligible or ineligible. Generally this will be
-	// checks that only needs to examine the single node to determine feasibility.
-	jobs := []FeasibilityChecker{s.jobConstraint}
-	tgs := []FeasibilityChecker{s.taskGroupDrivers,
-		s.taskGroupConstraint,
-		s.taskGroupHostVolumes,
-		s.taskGroupDevices}
-	avail := []FeasibilityChecker{s.taskGroupCSIVolumes}
-	s.wrappedChecks = NewFeasibilityWrapper(ctx, s.quota, jobs, tgs, avail)
-
-	// Filter on distinct host constraints.
-	s.distinctHostsConstraint = NewDistinctHostsIterator(ctx, s.wrappedChecks)
-
-	// Filter on distinct property constraints.
-	s.distinctPropertyConstraint = NewDistinctPropertyIterator(ctx, s.distinctHostsConstraint)
-
-	// Upgrade from feasible to rank iterator
-	rankSource := NewFeasibleRankIterator(ctx, s.distinctPropertyConstraint)
-
-	// Apply the bin packing, this depends on the resources needed
-	// by a particular task group.
-	_, schedConfig, _ := s.ctx.State().SchedulerConfig()
-	schedulerAlgorithm := schedConfig.EffectiveSchedulerAlgorithm()
-
-	s.binPack = NewBinPackIterator(ctx, rankSource, false, 0, schedulerAlgorithm)
-
-	// Apply the job anti-affinity iterator. This is to avoid placing
-	// multiple allocations on the same node for this job.
-	s.jobAntiAff = NewJobAntiAffinityIterator(ctx, s.binPack, "")
-
-	// Apply node rescheduling penalty. This tries to avoid placing on a
-	// node where the allocation failed previously
-	s.nodeReschedulingPenalty = NewNodeReschedulingPenaltyIterator(ctx, s.jobAntiAff)
-
-	// Apply scores based on affinity stanza
-	s.nodeAffinity = NewNodeAffinityIterator(ctx, s.nodeReschedulingPenalty)
-
-	// Apply scores based on spread stanza
-	s.spread = NewSpreadIterator(ctx, s.nodeAffinity)
-
-	// Normalizes scores by averaging them across various scorers
-	s.scoreNorm = NewScoreNormalizationIterator(ctx, s.spread)
-
-	// Apply a limit function. This is to avoid scanning *every* possible node.
-	s.limit = NewLimitIterator(ctx, s.scoreNorm, 2, skipScoreThreshold, maxSkip)
-
-	// Select the node with the maximum score for placement
-	s.maxScore = NewMaxScoreIterator(ctx, s.limit)
-	return s
-}