open-nomad/scheduler/stack.go

439 lines
14 KiB
Go
Raw Normal View History

2015-08-14 00:48:26 +00:00
package scheduler
import (
"math"
2015-08-14 04:46:33 +00:00
"time"
2015-08-14 00:48:26 +00:00
"github.com/hashicorp/nomad/nomad/structs"
)
const (
	// skipScoreThreshold is a threshold used in the limit iterator to skip nodes
	// that have a score lower than this. -1 is the lowest possible score for a
	// node with penalties (based on job anti affinity and node rescheduling
	// penalties).
	skipScoreThreshold = 0.0

	// maxSkip limits the number of nodes that can be skipped in the limit iterator
	maxSkip = 3
)
2015-08-14 01:44:27 +00:00
// Stack is a chained collection of iterators. The stack is used to
// make placement decisions. Different schedulers may customize the
// stack they use to vary the way placements are made.
2015-08-14 00:48:26 +00:00
type Stack interface {
// SetNodes is used to set the base set of potential nodes
SetNodes([]*structs.Node)
2015-08-14 00:48:26 +00:00
// SetTaskGroup is used to set the job for selection
SetJob(job *structs.Job)
// Select is used to select a node for the task group
2018-10-02 20:36:04 +00:00
Select(tg *structs.TaskGroup, options *SelectOptions) *RankedNode
}
type SelectOptions struct {
PenaltyNodeIDs map[string]struct{}
PreferredNodes []*structs.Node
Preempt bool
AllocName string
2015-08-14 00:48:26 +00:00
}
// GenericStack is the Stack used for the Generic scheduler. It is
2015-08-14 01:44:27 +00:00
// designed to make better placement decisions at the cost of performance.
type GenericStack struct {
batch bool
ctx Context
source *StaticIterator
wrappedChecks *FeasibilityWrapper
quota FeasibleIterator
jobVersion *uint64
jobConstraint *ConstraintChecker
taskGroupDrivers *DriverChecker
taskGroupConstraint *ConstraintChecker
taskGroupDevices *DeviceChecker
taskGroupHostVolumes *HostVolumeChecker
taskGroupCSIVolumes *CSIVolumeChecker
2020-05-15 15:09:01 +00:00
taskGroupNetwork *NetworkChecker
distinctHostsConstraint *DistinctHostsIterator
distinctPropertyConstraint *DistinctPropertyIterator
binPack *BinPackIterator
jobAntiAff *JobAntiAffinityIterator
nodeReschedulingPenalty *NodeReschedulingPenaltyIterator
limit *LimitIterator
maxScore *MaxScoreIterator
nodeAffinity *NodeAffinityIterator
spread *SpreadIterator
scoreNorm *ScoreNormalizationIterator
2015-08-14 00:48:26 +00:00
}
func (s *GenericStack) SetNodes(baseNodes []*structs.Node) {
// Shuffle base nodes
shuffleNodes(baseNodes)
// Update the set of base nodes
s.source.SetNodes(baseNodes)
// Apply a limit function. This is to avoid scanning *every* possible node.
// For batch jobs we only need to evaluate 2 options and depend on the
2015-10-14 23:43:06 +00:00
// power of two choices. For services jobs we need to visit "enough".
// Using a log of the total number of nodes is a good restriction, with
// at least 2 as the floor
limit := 2
if n := len(baseNodes); !s.batch && n > 0 {
logLimit := int(math.Ceil(math.Log2(float64(n))))
if logLimit > limit {
limit = logLimit
}
}
s.limit.SetLimit(limit)
}
func (s *GenericStack) SetJob(job *structs.Job) {
if s.jobVersion != nil && *s.jobVersion == job.Version {
return
}
jobVer := job.Version
s.jobVersion = &jobVer
2015-08-14 01:44:27 +00:00
s.jobConstraint.SetConstraints(job.Constraints)
s.distinctHostsConstraint.SetJob(job)
s.distinctPropertyConstraint.SetJob(job)
s.binPack.SetJob(job)
s.jobAntiAff.SetJob(job)
s.nodeAffinity.SetJob(job)
s.spread.SetJob(job)
s.ctx.Eligibility().SetJob(job)
s.taskGroupCSIVolumes.SetNamespace(job.Namespace)
s.taskGroupCSIVolumes.SetJobID(job.ID)
2017-10-13 21:36:02 +00:00
if contextual, ok := s.quota.(ContextualIterator); ok {
contextual.SetJob(job)
}
2015-08-14 00:48:26 +00:00
}
2018-10-02 20:36:04 +00:00
func (s *GenericStack) Select(tg *structs.TaskGroup, options *SelectOptions) *RankedNode {
// This block handles trying to select from preferred nodes if options specify them
// It also sets back the set of nodes to the original nodes
if options != nil && len(options.PreferredNodes) > 0 {
originalNodes := s.source.nodes
s.source.SetNodes(options.PreferredNodes)
optionsNew := *options
optionsNew.PreferredNodes = nil
2018-10-02 20:36:04 +00:00
if option := s.Select(tg, &optionsNew); option != nil {
s.source.SetNodes(originalNodes)
2018-10-02 20:36:04 +00:00
return option
}
s.source.SetNodes(originalNodes)
return s.Select(tg, &optionsNew)
}
2015-08-14 01:44:27 +00:00
// Reset the max selector and context
s.maxScore.Reset()
s.ctx.Reset()
2015-08-14 04:46:33 +00:00
start := time.Now()
2015-08-14 01:44:27 +00:00
// Get the task groups constraints.
tgConstr := taskGroupConstraints(tg)
2015-08-14 00:48:26 +00:00
// Update the parameters of iterators
s.taskGroupDrivers.SetDrivers(tgConstr.drivers)
s.taskGroupConstraint.SetConstraints(tgConstr.constraints)
2018-10-10 18:32:56 +00:00
s.taskGroupDevices.SetTaskGroup(tg)
s.taskGroupHostVolumes.SetVolumes(tg.Volumes)
s.taskGroupCSIVolumes.SetVolumes(options.AllocName, tg.Volumes)
2020-05-15 15:09:01 +00:00
if len(tg.Networks) > 0 {
2020-06-16 15:53:10 +00:00
s.taskGroupNetwork.SetNetwork(tg.Networks[0])
2020-05-15 15:09:01 +00:00
}
s.distinctHostsConstraint.SetTaskGroup(tg)
s.distinctPropertyConstraint.SetTaskGroup(tg)
s.wrappedChecks.SetTaskGroup(tg.Name)
s.binPack.SetTaskGroup(tg)
if options != nil {
s.binPack.evict = options.Preempt
}
s.jobAntiAff.SetTaskGroup(tg)
if options != nil {
s.nodeReschedulingPenalty.SetPenaltyNodes(options.PenaltyNodeIDs)
}
s.nodeAffinity.SetTaskGroup(tg)
s.spread.SetTaskGroup(tg)
if s.nodeAffinity.hasAffinities() || s.spread.hasSpreads() {
scheduler: fix quadratic performance with spread blocks (#11712) When the scheduler picks a node for each evaluation, the `LimitIterator` provides at most 2 eligible nodes for the `MaxScoreIterator` to choose from. This keeps scheduling fast while producing acceptable results because the results are binpacked. Jobs with a `spread` block (or node affinity) remove this limit in order to produce correct spread scoring. This means that every allocation within a job with a `spread` block is evaluated against _all_ eligible nodes. Operators of large clusters have reported that jobs with `spread` blocks that are eligible on a large number of nodes can take longer than the nack timeout to evaluate (60s). Typical evaluations are processed in milliseconds. In practice, it's not necessary to evaluate every eligible node for every allocation on large clusters, because the `RandomIterator` at the base of the scheduler stack produces enough variation in each pass that the likelihood of an uneven spread is negligible. Note that feasibility is checked before the limit, so this only impacts the number of _eligible_ nodes available for scoring, not the total number of nodes. This changeset sets the iterator limit for "large" `spread` block and node affinity jobs to be equal to the number of desired allocations. This brings an example problematic job evaluation down from ~3min to ~10s. The included tests ensure that we have acceptable spread results across a variety of large cluster topologies.
2021-12-21 15:10:01 +00:00
// scoring spread across all nodes has quadratic behavior, so
// we need to consider a subset of nodes to keep evaluaton times
// reasonable but enough to ensure spread is correct. this
// value was empirically determined.
s.limit.SetLimit(tg.Count)
if tg.Count < 100 {
s.limit.SetLimit(100)
}
}
2015-08-14 01:36:13 +00:00
2017-10-13 21:36:02 +00:00
if contextual, ok := s.quota.(ContextualIterator); ok {
contextual.SetTaskGroup(tg)
}
2015-08-14 04:46:33 +00:00
// Find the node with the max score
option := s.maxScore.Next()
// Store the compute time
s.ctx.Metrics().AllocationTime = time.Since(start)
2018-10-02 20:36:04 +00:00
return option
2015-08-14 00:48:26 +00:00
}
2015-10-14 23:43:06 +00:00
// SystemStack is the Stack used for the System scheduler. It is designed to
// attempt to make placements on all nodes.
type SystemStack struct {
2018-10-10 18:32:56 +00:00
ctx Context
source *StaticIterator
wrappedChecks *FeasibilityWrapper
quota FeasibleIterator
jobConstraint *ConstraintChecker
taskGroupDrivers *DriverChecker
taskGroupConstraint *ConstraintChecker
taskGroupDevices *DeviceChecker
taskGroupHostVolumes *HostVolumeChecker
taskGroupCSIVolumes *CSIVolumeChecker
taskGroupNetwork *NetworkChecker
2018-10-10 18:32:56 +00:00
distinctPropertyConstraint *DistinctPropertyIterator
binPack *BinPackIterator
scoreNorm *ScoreNormalizationIterator
2015-10-14 23:43:06 +00:00
}
// NewSystemStack constructs a stack used for selecting system and sysbatch
// job placements.
//
// sysbatch is used to determine which scheduler config option is used to
// control the use of preemption.
func NewSystemStack(sysbatch bool, ctx Context) *SystemStack {
2015-10-14 23:43:06 +00:00
// Create a new stack
s := &SystemStack{ctx: ctx}
// Create the source iterator. We visit nodes in a linear order because we
// have to evaluate on all nodes.
s.source = NewStaticIterator(ctx, nil)
2015-10-14 23:43:06 +00:00
// Attach the job constraints. The job is filled in later.
s.jobConstraint = NewConstraintChecker(ctx, nil)
2015-10-14 23:43:06 +00:00
// Filter on task group drivers first as they are faster
s.taskGroupDrivers = NewDriverChecker(ctx, nil)
2015-10-14 23:43:06 +00:00
// Filter on task group constraints second
s.taskGroupConstraint = NewConstraintChecker(ctx, nil)
// Filter on task group host volumes
s.taskGroupHostVolumes = NewHostVolumeChecker(ctx)
// Filter on available, healthy CSI plugins
s.taskGroupCSIVolumes = NewCSIVolumeChecker(ctx)
2018-10-10 18:32:56 +00:00
// Filter on task group devices
s.taskGroupDevices = NewDeviceChecker(ctx)
// Filter on available client networks
s.taskGroupNetwork = NewNetworkChecker(ctx)
2016-01-27 00:43:42 +00:00
// Create the feasibility wrapper which wraps all feasibility checks in
// which feasibility checking can be skipped if the computed node class has
// previously been marked as eligible or ineligible. Generally this will be
// checks that only needs to examine the single node to determine feasibility.
jobs := []FeasibilityChecker{s.jobConstraint}
tgs := []FeasibilityChecker{
s.taskGroupDrivers,
s.taskGroupConstraint,
s.taskGroupHostVolumes,
s.taskGroupDevices,
s.taskGroupNetwork,
}
avail := []FeasibilityChecker{s.taskGroupCSIVolumes}
s.wrappedChecks = NewFeasibilityWrapper(ctx, s.source, jobs, tgs, avail)
2015-10-14 23:43:06 +00:00
// Filter on distinct property constraints.
s.distinctPropertyConstraint = NewDistinctPropertyIterator(ctx, s.wrappedChecks)
// Create the quota iterator to determine if placements would result in
// the quota attached to the namespace of the job to go over.
// Note: the quota iterator must be the last feasibility iterator before
// we upgrade to ranking, or our quota usage will include ineligible
// nodes!
s.quota = NewQuotaIterator(ctx, s.distinctPropertyConstraint)
2015-10-14 23:43:06 +00:00
// Upgrade from feasible to rank iterator
rankSource := NewFeasibleRankIterator(ctx, s.quota)
2015-10-14 23:43:06 +00:00
// Apply the bin packing, this depends on the resources needed
// by a particular task group. Enable eviction as system jobs are high
// priority.
_, schedConfig, _ := s.ctx.State().SchedulerConfig()
enablePreemption := true
if schedConfig != nil {
if sysbatch {
enablePreemption = schedConfig.PreemptionConfig.SysBatchSchedulerEnabled
} else {
enablePreemption = schedConfig.PreemptionConfig.SystemSchedulerEnabled
}
}
2020-04-08 19:59:16 +00:00
// Create binpack iterator
Allow configuring memory oversubscription (#10466) Cluster operators want to have better control over memory oversubscription and may want to enable/disable it based on their experience. This PR adds a scheduler configuration field to control memory oversubscription. It's additional field that can be set in the [API via Scheduler Config](https://www.nomadproject.io/api-docs/operator/scheduler), or [the agent server config](https://www.nomadproject.io/docs/configuration/server#configuring-scheduler-config). I opted to have the memory oversubscription be an opt-in, but happy to change it. To enable it, operators should call the API with: ```json { "MemoryOversubscriptionEnabled": true } ``` If memory oversubscription is disabled, submitting jobs specifying `memory_max` will get a "Memory oversubscription is not enabled" warnings, but the jobs will be accepted without them accessing the additional memory. The warning message is like: ``` $ nomad job run /tmp/j Job Warnings: 1 warning(s): * Memory oversubscription is not enabled; Task cache.redis memory_max value will be ignored ==> Monitoring evaluation "7c444157" Evaluation triggered by job "example" ==> Monitoring evaluation "7c444157" Evaluation within deployment: "9d826f13" Allocation "aa5c3cad" created: node "9272088e", group "cache" Evaluation status changed: "pending" -> "complete" ==> Evaluation "7c444157" finished with status "complete" # then you can examine the Alloc AllocatedResources to validate whether the task is allowed to exceed memory: $ nomad alloc status -json aa5c3cad | jq '.AllocatedResources.Tasks["redis"].Memory' { "MemoryMB": 256, "MemoryMaxMB": 0 } ```
2021-04-30 02:09:56 +00:00
s.binPack = NewBinPackIterator(ctx, rankSource, enablePreemption, 0, schedConfig)
// Apply score normalization
s.scoreNorm = NewScoreNormalizationIterator(ctx, s.binPack)
2015-10-14 23:43:06 +00:00
return s
}
// SetNodes installs baseNodes as the node set of the stack's source
// iterator. Unlike GenericStack.SetNodes, it neither shuffles the nodes nor
// applies a limit.
func (s *SystemStack) SetNodes(baseNodes []*structs.Node) {
	// Update the set of base nodes
	s.source.SetNodes(baseNodes)
}
func (s *SystemStack) SetJob(job *structs.Job) {
s.jobConstraint.SetConstraints(job.Constraints)
s.distinctPropertyConstraint.SetJob(job)
s.binPack.SetJob(job)
s.ctx.Eligibility().SetJob(job)
2017-10-13 21:36:02 +00:00
if contextual, ok := s.quota.(ContextualIterator); ok {
contextual.SetJob(job)
}
2015-10-14 23:43:06 +00:00
}
2018-10-02 20:36:04 +00:00
func (s *SystemStack) Select(tg *structs.TaskGroup, options *SelectOptions) *RankedNode {
2015-10-14 23:43:06 +00:00
// Reset the binpack selector and context
s.scoreNorm.Reset()
2015-10-14 23:43:06 +00:00
s.ctx.Reset()
start := time.Now()
// Get the task groups constraints.
tgConstr := taskGroupConstraints(tg)
2015-10-14 23:43:06 +00:00
// Update the parameters of iterators
s.taskGroupDrivers.SetDrivers(tgConstr.drivers)
s.taskGroupConstraint.SetConstraints(tgConstr.constraints)
2018-10-10 18:32:56 +00:00
s.taskGroupDevices.SetTaskGroup(tg)
s.taskGroupHostVolumes.SetVolumes(tg.Volumes)
s.taskGroupCSIVolumes.SetVolumes(options.AllocName, tg.Volumes)
if len(tg.Networks) > 0 {
s.taskGroupNetwork.SetNetwork(tg.Networks[0])
}
s.wrappedChecks.SetTaskGroup(tg.Name)
s.distinctPropertyConstraint.SetTaskGroup(tg)
s.binPack.SetTaskGroup(tg)
2015-10-14 23:43:06 +00:00
2017-10-13 21:36:02 +00:00
if contextual, ok := s.quota.(ContextualIterator); ok {
contextual.SetTaskGroup(tg)
}
2015-10-14 23:43:06 +00:00
// Get the next option that satisfies the constraints.
option := s.scoreNorm.Next()
2015-10-14 23:43:06 +00:00
// Store the compute time
s.ctx.Metrics().AllocationTime = time.Since(start)
2018-10-02 20:36:04 +00:00
return option
2015-10-14 23:43:06 +00:00
}
// NewGenericStack constructs a stack used for selecting service placements
func NewGenericStack(batch bool, ctx Context) *GenericStack {
// Create a new stack
s := &GenericStack{
batch: batch,
ctx: ctx,
}
// Create the source iterator. We randomize the order we visit nodes
// to reduce collisions between schedulers and to do a basic load
// balancing across eligible nodes.
s.source = NewRandomIterator(ctx, nil)
// Attach the job constraints. The job is filled in later.
s.jobConstraint = NewConstraintChecker(ctx, nil)
// Filter on task group drivers first as they are faster
s.taskGroupDrivers = NewDriverChecker(ctx, nil)
// Filter on task group constraints second
s.taskGroupConstraint = NewConstraintChecker(ctx, nil)
// Filter on task group devices
s.taskGroupDevices = NewDeviceChecker(ctx)
// Filter on task group host volumes
s.taskGroupHostVolumes = NewHostVolumeChecker(ctx)
// Filter on available, healthy CSI plugins
s.taskGroupCSIVolumes = NewCSIVolumeChecker(ctx)
2020-05-15 15:09:01 +00:00
// Filter on available client networks
s.taskGroupNetwork = NewNetworkChecker(ctx)
// Create the feasibility wrapper which wraps all feasibility checks in
// which feasibility checking can be skipped if the computed node class has
// previously been marked as eligible or ineligible. Generally this will be
// checks that only needs to examine the single node to determine feasibility.
jobs := []FeasibilityChecker{s.jobConstraint}
tgs := []FeasibilityChecker{
s.taskGroupDrivers,
s.taskGroupConstraint,
s.taskGroupHostVolumes,
2020-05-15 15:09:01 +00:00
s.taskGroupDevices,
s.taskGroupNetwork,
}
avail := []FeasibilityChecker{s.taskGroupCSIVolumes}
s.wrappedChecks = NewFeasibilityWrapper(ctx, s.source, jobs, tgs, avail)
// Filter on distinct host constraints.
s.distinctHostsConstraint = NewDistinctHostsIterator(ctx, s.wrappedChecks)
// Filter on distinct property constraints.
s.distinctPropertyConstraint = NewDistinctPropertyIterator(ctx, s.distinctHostsConstraint)
// Create the quota iterator to determine if placements would result in
// the quota attached to the namespace of the job to go over.
// Note: the quota iterator must be the last feasibility iterator before
// we upgrade to ranking, or our quota usage will include ineligible
// nodes!
s.quota = NewQuotaIterator(ctx, s.distinctPropertyConstraint)
// Upgrade from feasible to rank iterator
rankSource := NewFeasibleRankIterator(ctx, s.quota)
// Apply the bin packing, this depends on the resources needed
// by a particular task group.
_, schedConfig, _ := ctx.State().SchedulerConfig()
Allow configuring memory oversubscription (#10466) Cluster operators want to have better control over memory oversubscription and may want to enable/disable it based on their experience. This PR adds a scheduler configuration field to control memory oversubscription. It's additional field that can be set in the [API via Scheduler Config](https://www.nomadproject.io/api-docs/operator/scheduler), or [the agent server config](https://www.nomadproject.io/docs/configuration/server#configuring-scheduler-config). I opted to have the memory oversubscription be an opt-in, but happy to change it. To enable it, operators should call the API with: ```json { "MemoryOversubscriptionEnabled": true } ``` If memory oversubscription is disabled, submitting jobs specifying `memory_max` will get a "Memory oversubscription is not enabled" warnings, but the jobs will be accepted without them accessing the additional memory. The warning message is like: ``` $ nomad job run /tmp/j Job Warnings: 1 warning(s): * Memory oversubscription is not enabled; Task cache.redis memory_max value will be ignored ==> Monitoring evaluation "7c444157" Evaluation triggered by job "example" ==> Monitoring evaluation "7c444157" Evaluation within deployment: "9d826f13" Allocation "aa5c3cad" created: node "9272088e", group "cache" Evaluation status changed: "pending" -> "complete" ==> Evaluation "7c444157" finished with status "complete" # then you can examine the Alloc AllocatedResources to validate whether the task is allowed to exceed memory: $ nomad alloc status -json aa5c3cad | jq '.AllocatedResources.Tasks["redis"].Memory' { "MemoryMB": 256, "MemoryMaxMB": 0 } ```
2021-04-30 02:09:56 +00:00
s.binPack = NewBinPackIterator(ctx, rankSource, false, 0, schedConfig)
// Apply the job anti-affinity iterator. This is to avoid placing
// multiple allocations on the same node for this job.
s.jobAntiAff = NewJobAntiAffinityIterator(ctx, s.binPack, "")
// Apply node rescheduling penalty. This tries to avoid placing on a
// node where the allocation failed previously
s.nodeReschedulingPenalty = NewNodeReschedulingPenaltyIterator(ctx, s.jobAntiAff)
// Apply scores based on affinity stanza
s.nodeAffinity = NewNodeAffinityIterator(ctx, s.nodeReschedulingPenalty)
// Apply scores based on spread stanza
s.spread = NewSpreadIterator(ctx, s.nodeAffinity)
// Add the preemption options scoring iterator
preemptionScorer := NewPreemptionScoringIterator(ctx, s.spread)
// Normalizes scores by averaging them across various scorers
s.scoreNorm = NewScoreNormalizationIterator(ctx, preemptionScorer)
// Apply a limit function. This is to avoid scanning *every* possible node.
s.limit = NewLimitIterator(ctx, s.scoreNorm, 2, skipScoreThreshold, maxSkip)
// Select the node with the maximum score for placement
s.maxScore = NewMaxScoreIterator(ctx, s.limit)
return s
}