open-nomad/scheduler/stack.go

package scheduler

import (
    "math"
    "time"

    "github.com/hashicorp/nomad/nomad/structs"
)

const (
    // skipScoreThreshold is a threshold used in the limit iterator to skip nodes
    // that have a score lower than this. -1 is the lowest possible score for a
    // node with penalties (based on job anti-affinity and node rescheduling
    // penalties).
    skipScoreThreshold = 0.0

    // maxSkip limits the number of nodes that can be skipped in the limit iterator
    maxSkip = 3
)
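
// Taken together, the two constants above mean the limit iterator may pass
// over a handful of penalized options (nodes whose normalized score has
// dropped below skipScoreThreshold) while searching for a better one, but
// never more than maxSkip of them.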

// Stack is a chained collection of iterators. The stack is used to
// make placement decisions. Different schedulers may customize the
// stack they use to vary the way placements are made.
type Stack interface {
    // SetNodes is used to set the base set of potential nodes
    SetNodes([]*structs.Node)

    // SetJob is used to set the job for selection
    SetJob(job *structs.Job)

    // Select is used to select a node for the task group
    Select(tg *structs.TaskGroup, options *SelectOptions) *RankedNode
}

// SelectOptions is used to parameterize a single call to Select.
type SelectOptions struct {
    // PenaltyNodeIDs is a set of node IDs that should be penalized during
    // scoring, typically nodes where the allocation previously failed.
    PenaltyNodeIDs map[string]struct{}
    // PreferredNodes, if set, are tried before the regular node set.
    PreferredNodes []*structs.Node
    // Preempt enables preemption-aware bin packing.
    Preempt bool
    // AllocName is the name of the allocation being placed; it is used by the
    // CSI volume checker.
    AllocName string
}
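
// A minimal sketch of how a scheduler drives a Stack, assuming ctx, nodes,
// job, and tg are already available; the names below are illustrative only:
//
//    stack := NewGenericStack(false, ctx) // false => service-style, not batch
//    stack.SetNodes(nodes)                // candidate nodes for this evaluation
//    stack.SetJob(job)                    // job-level constraints, affinities, spreads
//    if option := stack.Select(tg, &SelectOptions{}); option != nil {
//        // option.Node is the chosen node for this task group.
//    }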

// GenericStack is the Stack used for the Generic scheduler. It is
// designed to make better placement decisions at the cost of performance.
type GenericStack struct {
    batch  bool
    ctx    Context
    source *StaticIterator

    wrappedChecks *FeasibilityWrapper
    quota         FeasibleIterator
    jobVersion    *uint64
    jobConstraint *ConstraintChecker

    taskGroupDrivers     *DriverChecker
    taskGroupConstraint  *ConstraintChecker
    taskGroupDevices     *DeviceChecker
    taskGroupHostVolumes *HostVolumeChecker
    taskGroupCSIVolumes  *CSIVolumeChecker
    taskGroupNetwork     *NetworkChecker

    distinctHostsConstraint    *DistinctHostsIterator
    distinctPropertyConstraint *DistinctPropertyIterator
    binPack                    *BinPackIterator
    jobAntiAff                 *JobAntiAffinityIterator
    nodeReschedulingPenalty    *NodeReschedulingPenaltyIterator
    limit                      *LimitIterator
    maxScore                   *MaxScoreIterator
    nodeAffinity               *NodeAffinityIterator
    spread                     *SpreadIterator
    scoreNorm                  *ScoreNormalizationIterator
}

func (s *GenericStack) SetNodes(baseNodes []*structs.Node) {
    // Shuffle base nodes
    shuffleNodes(baseNodes)

    // Update the set of base nodes
    s.source.SetNodes(baseNodes)

    // Apply a limit function. This is to avoid scanning *every* possible node.
    // For batch jobs we only need to evaluate 2 options and depend on the
    // power of two choices. For service jobs we need to visit "enough".
    // Using a log of the total number of nodes is a good restriction, with
    // at least 2 as the floor (see the worked example below).
    limit := 2
    if n := len(baseNodes); !s.batch && n > 0 {
        logLimit := int(math.Ceil(math.Log2(float64(n))))
        if logLimit > limit {
            limit = logLimit
        }
    }
    s.limit.SetLimit(limit)
}
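
// For example, with 5,000 eligible nodes a service job scores at most
// ceil(log2(5000)) = 13 feasible candidates per Select call, while a batch
// job always stops after 2 and relies on the power of two choices.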

func (s *GenericStack) SetJob(job *structs.Job) {
    if s.jobVersion != nil && *s.jobVersion == job.Version {
        return
    }
    jobVer := job.Version
    s.jobVersion = &jobVer

    s.jobConstraint.SetConstraints(job.Constraints)
    s.distinctHostsConstraint.SetJob(job)
    s.distinctPropertyConstraint.SetJob(job)
    s.binPack.SetJob(job)
    s.jobAntiAff.SetJob(job)
    s.nodeAffinity.SetJob(job)
    s.spread.SetJob(job)
    s.ctx.Eligibility().SetJob(job)
    s.taskGroupCSIVolumes.SetNamespace(job.Namespace)
    s.taskGroupCSIVolumes.SetJobID(job.ID)

    if contextual, ok := s.quota.(ContextualIterator); ok {
        contextual.SetJob(job)
    }
}

func (s *GenericStack) Select(tg *structs.TaskGroup, options *SelectOptions) *RankedNode {
    // This block handles selecting from preferred nodes if the options specify
    // them, and restores the original node set before returning.
    if options != nil && len(options.PreferredNodes) > 0 {
        originalNodes := s.source.nodes
        s.source.SetNodes(options.PreferredNodes)
        optionsNew := *options
        optionsNew.PreferredNodes = nil
        if option := s.Select(tg, &optionsNew); option != nil {
            s.source.SetNodes(originalNodes)
            return option
        }
        s.source.SetNodes(originalNodes)
        return s.Select(tg, &optionsNew)
    }

    // Reset the max selector and context
    s.maxScore.Reset()
    s.ctx.Reset()
    start := time.Now()

    // Get the task group's constraints.
    tgConstr := taskGroupConstraints(tg)

    // Update the parameters of iterators
    s.taskGroupDrivers.SetDrivers(tgConstr.drivers)
    s.taskGroupConstraint.SetConstraints(tgConstr.constraints)
    s.taskGroupDevices.SetTaskGroup(tg)
    s.taskGroupHostVolumes.SetVolumes(tg.Volumes)
    s.taskGroupCSIVolumes.SetVolumes(options.AllocName, tg.Volumes)
    if len(tg.Networks) > 0 {
        s.taskGroupNetwork.SetNetwork(tg.Networks[0])
    }
    s.distinctHostsConstraint.SetTaskGroup(tg)
    s.distinctPropertyConstraint.SetTaskGroup(tg)
    s.wrappedChecks.SetTaskGroup(tg.Name)
    s.binPack.SetTaskGroup(tg)
    if options != nil {
        s.binPack.evict = options.Preempt
    }
    s.jobAntiAff.SetTaskGroup(tg)
    if options != nil {
        s.nodeReschedulingPenalty.SetPenaltyNodes(options.PenaltyNodeIDs)
    }
    s.nodeAffinity.SetTaskGroup(tg)
    s.spread.SetTaskGroup(tg)

    // Evaluate all feasible nodes when affinities or spreads are present.
    if s.nodeAffinity.hasAffinities() || s.spread.hasSpreads() {
        s.limit.SetLimit(math.MaxInt32)
    }

    if contextual, ok := s.quota.(ContextualIterator); ok {
        contextual.SetTaskGroup(tg)
    }

    // Find the node with the max score
    option := s.maxScore.Next()

    // Store the compute time
    s.ctx.Metrics().AllocationTime = time.Since(start)
    return option
}
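
// A hedged usage sketch for the options handled above: when rescheduling a
// failed allocation, the caller can penalize the node it failed on and name
// the allocation for the CSI volume checks. prevNodeID and the allocation
// name here are made up for illustration:
//
//    option := stack.Select(tg, &SelectOptions{
//        PenaltyNodeIDs: map[string]struct{}{prevNodeID: {}},
//        AllocName:      "example.cache[0]",
//    })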

// SystemStack is the Stack used for the System scheduler. It is designed to
// attempt to make placements on all nodes.
type SystemStack struct {
    ctx    Context
    source *StaticIterator

    wrappedChecks *FeasibilityWrapper
    quota         FeasibleIterator
    jobConstraint *ConstraintChecker

    taskGroupDrivers     *DriverChecker
    taskGroupConstraint  *ConstraintChecker
    taskGroupDevices     *DeviceChecker
    taskGroupHostVolumes *HostVolumeChecker
    taskGroupCSIVolumes  *CSIVolumeChecker
    taskGroupNetwork     *NetworkChecker

    distinctPropertyConstraint *DistinctPropertyIterator
    binPack                    *BinPackIterator
    scoreNorm                  *ScoreNormalizationIterator
}

// NewSystemStack constructs a stack used for selecting system job placements.
func NewSystemStack(ctx Context) *SystemStack {
    // Create a new stack
    s := &SystemStack{ctx: ctx}

    // Create the source iterator. We visit nodes in a linear order because we
    // have to evaluate on all nodes.
    s.source = NewStaticIterator(ctx, nil)

    // Attach the job constraints. The job is filled in later.
    s.jobConstraint = NewConstraintChecker(ctx, nil)

    // Filter on task group drivers first as they are faster
    s.taskGroupDrivers = NewDriverChecker(ctx, nil)

    // Filter on task group constraints second
    s.taskGroupConstraint = NewConstraintChecker(ctx, nil)

    // Filter on task group host volumes
    s.taskGroupHostVolumes = NewHostVolumeChecker(ctx)

    // Filter on available, healthy CSI plugins
    s.taskGroupCSIVolumes = NewCSIVolumeChecker(ctx)

    // Filter on task group devices
    s.taskGroupDevices = NewDeviceChecker(ctx)

    // Filter on available client networks
    s.taskGroupNetwork = NewNetworkChecker(ctx)

    // Create the feasibility wrapper, which allows feasibility checking to be
    // skipped if the computed node class has previously been marked as
    // eligible or ineligible. Generally these are checks that only need to
    // examine a single node to determine feasibility.
    jobs := []FeasibilityChecker{s.jobConstraint}
    tgs := []FeasibilityChecker{s.taskGroupDrivers, s.taskGroupConstraint,
        s.taskGroupHostVolumes,
        s.taskGroupDevices,
        s.taskGroupNetwork}
    avail := []FeasibilityChecker{s.taskGroupCSIVolumes}
    s.wrappedChecks = NewFeasibilityWrapper(ctx, s.source, jobs, tgs, avail)

    // Filter on distinct property constraints.
    s.distinctPropertyConstraint = NewDistinctPropertyIterator(ctx, s.wrappedChecks)

    // Create the quota iterator to determine whether placements would cause
    // the quota attached to the job's namespace to be exceeded.
    // Note: the quota iterator must be the last feasibility iterator before
    // we upgrade to ranking, or our quota usage will include ineligible
    // nodes!
    s.quota = NewQuotaIterator(ctx, s.distinctPropertyConstraint)

    // Upgrade from feasible to rank iterator
    rankSource := NewFeasibleRankIterator(ctx, s.quota)

    // Apply the bin packing; this depends on the resources needed by a
    // particular task group. Enable eviction as system jobs are high priority.
    _, schedConfig, _ := s.ctx.State().SchedulerConfig()
    enablePreemption := true
    if schedConfig != nil {
        enablePreemption = schedConfig.PreemptionConfig.SystemSchedulerEnabled
    }
    s.binPack = NewBinPackIterator(ctx, rankSource, enablePreemption, 0, schedConfig)

    // Apply score normalization
    s.scoreNorm = NewScoreNormalizationIterator(ctx, s.binPack)

    return s
}
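
// For reference, the chain built above runs, from source to sink:
//
//    source -> wrappedChecks -> distinctPropertyConstraint -> quota
//           -> rankSource -> binPack -> scoreNorm
//
// Select pulls from scoreNorm, so every node that survives the feasibility
// checks is ranked; there is no limit or max-score stage because the system
// scheduler attempts placements on all nodes.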

func (s *SystemStack) SetNodes(baseNodes []*structs.Node) {
    // Update the set of base nodes
    s.source.SetNodes(baseNodes)
}

func (s *SystemStack) SetJob(job *structs.Job) {
    s.jobConstraint.SetConstraints(job.Constraints)
    s.distinctPropertyConstraint.SetJob(job)
    s.binPack.SetJob(job)
    s.ctx.Eligibility().SetJob(job)

    if contextual, ok := s.quota.(ContextualIterator); ok {
        contextual.SetJob(job)
    }
}

func (s *SystemStack) Select(tg *structs.TaskGroup, options *SelectOptions) *RankedNode {
    // Reset the binpack selector and context
    s.scoreNorm.Reset()
    s.ctx.Reset()
    start := time.Now()

    // Get the task group's constraints.
    tgConstr := taskGroupConstraints(tg)

    // Update the parameters of iterators
    s.taskGroupDrivers.SetDrivers(tgConstr.drivers)
    s.taskGroupConstraint.SetConstraints(tgConstr.constraints)
    s.taskGroupDevices.SetTaskGroup(tg)
    s.taskGroupHostVolumes.SetVolumes(tg.Volumes)
    s.taskGroupCSIVolumes.SetVolumes(options.AllocName, tg.Volumes)
    if len(tg.Networks) > 0 {
        s.taskGroupNetwork.SetNetwork(tg.Networks[0])
    }
    s.wrappedChecks.SetTaskGroup(tg.Name)
    s.distinctPropertyConstraint.SetTaskGroup(tg)
    s.binPack.SetTaskGroup(tg)

    if contextual, ok := s.quota.(ContextualIterator); ok {
        contextual.SetTaskGroup(tg)
    }

    // Get the next option that satisfies the constraints.
    option := s.scoreNorm.Next()

    // Store the compute time
    s.ctx.Metrics().AllocationTime = time.Since(start)
    return option
}

// NewGenericStack constructs a stack used for selecting service placements
func NewGenericStack(batch bool, ctx Context) *GenericStack {
    // Create a new stack
    s := &GenericStack{
        batch: batch,
        ctx:   ctx,
    }

    // Create the source iterator. We randomize the order we visit nodes
    // to reduce collisions between schedulers and to do a basic load
    // balancing across eligible nodes.
    s.source = NewRandomIterator(ctx, nil)

    // Attach the job constraints. The job is filled in later.
    s.jobConstraint = NewConstraintChecker(ctx, nil)

    // Filter on task group drivers first as they are faster
    s.taskGroupDrivers = NewDriverChecker(ctx, nil)

    // Filter on task group constraints second
    s.taskGroupConstraint = NewConstraintChecker(ctx, nil)

    // Filter on task group devices
    s.taskGroupDevices = NewDeviceChecker(ctx)

    // Filter on task group host volumes
    s.taskGroupHostVolumes = NewHostVolumeChecker(ctx)

    // Filter on available, healthy CSI plugins
    s.taskGroupCSIVolumes = NewCSIVolumeChecker(ctx)

    // Filter on available client networks
    s.taskGroupNetwork = NewNetworkChecker(ctx)

    // Create the feasibility wrapper, which allows feasibility checking to be
    // skipped if the computed node class has previously been marked as
    // eligible or ineligible. Generally these are checks that only need to
    // examine a single node to determine feasibility.
    jobs := []FeasibilityChecker{s.jobConstraint}
    tgs := []FeasibilityChecker{s.taskGroupDrivers,
        s.taskGroupConstraint,
        s.taskGroupHostVolumes,
        s.taskGroupDevices,
        s.taskGroupNetwork}
    avail := []FeasibilityChecker{s.taskGroupCSIVolumes}
    s.wrappedChecks = NewFeasibilityWrapper(ctx, s.source, jobs, tgs, avail)

    // Filter on distinct host constraints.
    s.distinctHostsConstraint = NewDistinctHostsIterator(ctx, s.wrappedChecks)

    // Filter on distinct property constraints.
    s.distinctPropertyConstraint = NewDistinctPropertyIterator(ctx, s.distinctHostsConstraint)

    // Create the quota iterator to determine whether placements would cause
    // the quota attached to the job's namespace to be exceeded.
    // Note: the quota iterator must be the last feasibility iterator before
    // we upgrade to ranking, or our quota usage will include ineligible
    // nodes!
    s.quota = NewQuotaIterator(ctx, s.distinctPropertyConstraint)

    // Upgrade from feasible to rank iterator
    rankSource := NewFeasibleRankIterator(ctx, s.quota)

    // Apply the bin packing; this depends on the resources needed
    // by a particular task group.
    _, schedConfig, _ := ctx.State().SchedulerConfig()
    s.binPack = NewBinPackIterator(ctx, rankSource, false, 0, schedConfig)

    // Apply the job anti-affinity iterator. This is to avoid placing
    // multiple allocations on the same node for this job.
    s.jobAntiAff = NewJobAntiAffinityIterator(ctx, s.binPack, "")

    // Apply node rescheduling penalty. This tries to avoid placing on a
    // node where the allocation failed previously.
    s.nodeReschedulingPenalty = NewNodeReschedulingPenaltyIterator(ctx, s.jobAntiAff)

    // Apply scores based on affinity stanza
    s.nodeAffinity = NewNodeAffinityIterator(ctx, s.nodeReschedulingPenalty)

    // Apply scores based on spread stanza
    s.spread = NewSpreadIterator(ctx, s.nodeAffinity)

    // Add the preemption options scoring iterator
    preemptionScorer := NewPreemptionScoringIterator(ctx, s.spread)

    // Normalize scores by averaging them across the various scorers
    s.scoreNorm = NewScoreNormalizationIterator(ctx, preemptionScorer)

    // Apply a limit function. This is to avoid scanning *every* possible node.
    s.limit = NewLimitIterator(ctx, s.scoreNorm, 2, skipScoreThreshold, maxSkip)

    // Select the node with the maximum score for placement
    s.maxScore = NewMaxScoreIterator(ctx, s.limit)
    return s
}
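
// For reference, the generic chain built above runs, from source to sink:
//
//    source (random) -> wrappedChecks -> distinctHostsConstraint
//      -> distinctPropertyConstraint -> quota -> rankSource -> binPack
//      -> jobAntiAff -> nodeReschedulingPenalty -> nodeAffinity -> spread
//      -> preemptionScorer -> scoreNorm -> limit -> maxScore
//
// Select pulls from maxScore, so the stack returns the single highest-scoring
// feasible node seen within the configured limit.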