2023-04-10 15:36:59 +00:00
|
|
|
// Copyright (c) HashiCorp, Inc.
|
|
|
|
// SPDX-License-Identifier: MPL-2.0
|
|
|
|
|
2015-08-12 01:27:54 +00:00
|
|
|
package scheduler
|
|
|
|
|
2015-08-13 20:08:15 +00:00
|
|
|
import (
|
|
|
|
"fmt"
|
2018-07-16 13:47:18 +00:00
|
|
|
"math"
|
|
|
|
|
2021-03-19 04:29:07 +00:00
|
|
|
"github.com/hashicorp/nomad/lib/cpuset"
|
|
|
|
|
2015-08-13 20:08:15 +00:00
|
|
|
"github.com/hashicorp/nomad/nomad/structs"
|
|
|
|
)
|
2015-08-12 01:27:54 +00:00
|
|
|
|
2018-07-16 13:47:18 +00:00
|
|
|
const (
|
|
|
|
// binPackingMaxFitScore is the maximum possible bin packing fitness score.
|
|
|
|
// This is used to normalize bin packing score to a value between 0 and 1
|
|
|
|
binPackingMaxFitScore = 18.0
|
|
|
|
)
|
|
|
|
|
2015-08-12 01:27:54 +00:00
|
|
|
// Rank is used to provide a score and various ranking metadata
|
|
|
|
// along with a node when iterating. This state can be modified as
|
|
|
|
// various rank methods are applied.
|
|
|
|
type RankedNode struct {
|
2019-06-11 04:29:12 +00:00
|
|
|
Node *structs.Node
|
|
|
|
FinalScore float64
|
|
|
|
Scores []float64
|
|
|
|
TaskResources map[string]*structs.AllocatedTaskResources
|
2019-12-16 20:34:58 +00:00
|
|
|
TaskLifecycles map[string]*structs.TaskLifecycleConfig
|
2019-06-18 04:55:43 +00:00
|
|
|
AllocResources *structs.AllocatedSharedResources
|
2015-08-16 17:28:58 +00:00
|
|
|
|
2020-10-09 21:31:38 +00:00
|
|
|
// Proposed is used to cache the proposed allocations on the
|
2015-08-16 17:28:58 +00:00
|
|
|
// node. This can be shared between iterators that require it.
|
|
|
|
Proposed []*structs.Allocation
|
2018-09-21 21:05:00 +00:00
|
|
|
|
|
|
|
// PreemptedAllocs is used by the BinpackIterator to identify allocs
|
|
|
|
// that should be preempted in order to make the placement
|
|
|
|
PreemptedAllocs []*structs.Allocation
|
2015-08-12 01:27:54 +00:00
|
|
|
}
|
|
|
|
|
2015-08-13 20:08:15 +00:00
|
|
|
func (r *RankedNode) GoString() string {
|
2018-07-16 13:47:18 +00:00
|
|
|
return fmt.Sprintf("<Node: %s Score: %0.3f>", r.Node.ID, r.FinalScore)
|
2015-08-13 20:08:15 +00:00
|
|
|
}
|
|
|
|
|
2015-09-07 23:19:21 +00:00
|
|
|
func (r *RankedNode) ProposedAllocs(ctx Context) ([]*structs.Allocation, error) {
|
|
|
|
if r.Proposed != nil {
|
|
|
|
return r.Proposed, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
p, err := ctx.ProposedAllocs(r.Node.ID)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
r.Proposed = p
|
|
|
|
return p, nil
|
|
|
|
}
|
|
|
|
|
2015-09-13 21:31:32 +00:00
|
|
|
func (r *RankedNode) SetTaskResources(task *structs.Task,
|
2018-10-02 20:36:04 +00:00
|
|
|
resource *structs.AllocatedTaskResources) {
|
2015-09-13 21:31:32 +00:00
|
|
|
if r.TaskResources == nil {
|
2018-10-02 20:36:04 +00:00
|
|
|
r.TaskResources = make(map[string]*structs.AllocatedTaskResources)
|
2019-12-16 20:34:58 +00:00
|
|
|
r.TaskLifecycles = make(map[string]*structs.TaskLifecycleConfig)
|
2015-09-13 21:31:32 +00:00
|
|
|
}
|
|
|
|
r.TaskResources[task.Name] = resource
|
2019-12-16 20:34:58 +00:00
|
|
|
r.TaskLifecycles[task.Name] = task.Lifecycle
|
2015-09-13 21:31:32 +00:00
|
|
|
}
|
|
|
|
|
2020-10-09 21:31:38 +00:00
|
|
|
// RankIterator is used to iteratively yield nodes along
|
2015-08-12 01:27:54 +00:00
|
|
|
// with ranking metadata. The iterators may manage some state for
|
|
|
|
// performance optimizations.
|
|
|
|
type RankIterator interface {
|
2015-08-13 22:01:02 +00:00
|
|
|
// Next yields a ranked option or nil if exhausted
|
2015-08-12 01:27:54 +00:00
|
|
|
Next() *RankedNode
|
2015-08-13 22:01:02 +00:00
|
|
|
|
|
|
|
// Reset is invoked when an allocation has been placed
|
|
|
|
// to reset any stale state.
|
|
|
|
Reset()
|
2015-08-12 01:27:54 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// FeasibleRankIterator is used to consume from a FeasibleIterator
|
|
|
|
// and return an unranked node with base ranking.
|
|
|
|
type FeasibleRankIterator struct {
|
2015-08-12 01:30:45 +00:00
|
|
|
ctx Context
|
2015-08-12 01:27:54 +00:00
|
|
|
source FeasibleIterator
|
|
|
|
}
|
|
|
|
|
|
|
|
// NewFeasibleRankIterator is used to return a new FeasibleRankIterator
|
|
|
|
// from a FeasibleIterator source.
|
|
|
|
func NewFeasibleRankIterator(ctx Context, source FeasibleIterator) *FeasibleRankIterator {
|
|
|
|
iter := &FeasibleRankIterator{
|
2015-08-12 01:30:45 +00:00
|
|
|
ctx: ctx,
|
2015-08-12 01:27:54 +00:00
|
|
|
source: source,
|
|
|
|
}
|
|
|
|
return iter
|
|
|
|
}
|
|
|
|
|
|
|
|
func (iter *FeasibleRankIterator) Next() *RankedNode {
|
|
|
|
option := iter.source.Next()
|
2015-08-13 17:13:11 +00:00
|
|
|
if option == nil {
|
|
|
|
return nil
|
|
|
|
}
|
2015-08-12 01:27:54 +00:00
|
|
|
ranked := &RankedNode{
|
|
|
|
Node: option,
|
|
|
|
}
|
|
|
|
return ranked
|
|
|
|
}
|
|
|
|
|
2015-08-13 22:01:02 +00:00
|
|
|
func (iter *FeasibleRankIterator) Reset() {
|
|
|
|
iter.source.Reset()
|
|
|
|
}
|
|
|
|
|
2015-08-12 01:30:45 +00:00
|
|
|
// StaticRankIterator is a RankIterator that returns a static set of results.
|
|
|
|
// This is largely only useful for testing.
|
|
|
|
type StaticRankIterator struct {
|
|
|
|
ctx Context
|
|
|
|
nodes []*RankedNode
|
|
|
|
offset int
|
2015-08-13 22:01:02 +00:00
|
|
|
seen int
|
2015-08-12 01:30:45 +00:00
|
|
|
}
|
|
|
|
|
2015-08-13 17:05:54 +00:00
|
|
|
// NewStaticRankIterator returns a new static rank iterator over the given nodes
|
|
|
|
func NewStaticRankIterator(ctx Context, nodes []*RankedNode) *StaticRankIterator {
|
|
|
|
iter := &StaticRankIterator{
|
|
|
|
ctx: ctx,
|
|
|
|
nodes: nodes,
|
|
|
|
}
|
|
|
|
return iter
|
|
|
|
}
|
|
|
|
|
2015-08-12 01:30:45 +00:00
|
|
|
func (iter *StaticRankIterator) Next() *RankedNode {
|
|
|
|
// Check if exhausted
|
2015-08-13 22:01:02 +00:00
|
|
|
n := len(iter.nodes)
|
|
|
|
if iter.offset == n || iter.seen == n {
|
|
|
|
if iter.seen != n {
|
|
|
|
iter.offset = 0
|
|
|
|
} else {
|
|
|
|
return nil
|
|
|
|
}
|
2015-08-12 01:30:45 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// Return the next offset
|
|
|
|
offset := iter.offset
|
|
|
|
iter.offset += 1
|
2015-08-13 22:01:02 +00:00
|
|
|
iter.seen += 1
|
2015-08-12 01:30:45 +00:00
|
|
|
return iter.nodes[offset]
|
|
|
|
}
|
|
|
|
|
2015-08-13 22:01:02 +00:00
|
|
|
func (iter *StaticRankIterator) Reset() {
|
|
|
|
iter.seen = 0
|
|
|
|
}
|
|
|
|
|
2015-08-12 01:27:54 +00:00
|
|
|
// BinPackIterator is a RankIterator that scores potential options
|
|
|
|
// based on a bin-packing algorithm.
|
|
|
|
type BinPackIterator struct {
|
2021-04-30 02:09:56 +00:00
|
|
|
ctx Context
|
|
|
|
source RankIterator
|
|
|
|
evict bool
|
|
|
|
priority int
|
2021-08-18 13:50:37 +00:00
|
|
|
jobId structs.NamespacedID
|
2021-04-30 02:09:56 +00:00
|
|
|
taskGroup *structs.TaskGroup
|
|
|
|
memoryOversubscription bool
|
|
|
|
scoreFit func(*structs.Node, *structs.ComparableResources) float64
|
2015-08-12 01:27:54 +00:00
|
|
|
}
|
|
|
|
|
2015-09-13 21:31:32 +00:00
|
|
|
// NewBinPackIterator returns a BinPackIterator which tries to fit tasks
|
|
|
|
// potentially evicting other tasks based on a given priority.
|
2023-06-22 00:31:50 +00:00
|
|
|
func NewBinPackIterator(ctx Context, source RankIterator, evict bool, priority int) *BinPackIterator {
|
|
|
|
return &BinPackIterator{
|
|
|
|
ctx: ctx,
|
|
|
|
source: source,
|
|
|
|
evict: evict,
|
|
|
|
priority: priority,
|
|
|
|
|
|
|
|
// These are default values that may be overwritten by
|
|
|
|
// SetSchedulerConfiguration.
|
|
|
|
memoryOversubscription: false,
|
|
|
|
scoreFit: structs.ScoreFitBinPack,
|
2015-08-12 01:27:54 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2019-04-11 01:20:22 +00:00
|
|
|
func (iter *BinPackIterator) SetJob(job *structs.Job) {
|
|
|
|
iter.priority = job.Priority
|
|
|
|
iter.jobId = job.NamespacedID()
|
2015-08-14 00:48:26 +00:00
|
|
|
}
|
|
|
|
|
2016-08-25 17:27:19 +00:00
|
|
|
func (iter *BinPackIterator) SetTaskGroup(taskGroup *structs.TaskGroup) {
|
|
|
|
iter.taskGroup = taskGroup
|
2015-09-13 21:31:32 +00:00
|
|
|
}
|
|
|
|
|
2023-06-22 00:31:50 +00:00
|
|
|
func (iter *BinPackIterator) SetSchedulerConfiguration(schedConfig *structs.SchedulerConfiguration) {
|
|
|
|
// Set scoring function.
|
|
|
|
algorithm := schedConfig.EffectiveSchedulerAlgorithm()
|
|
|
|
scoreFn := structs.ScoreFitBinPack
|
|
|
|
if algorithm == structs.SchedulerAlgorithmSpread {
|
|
|
|
scoreFn = structs.ScoreFitSpread
|
|
|
|
}
|
|
|
|
iter.scoreFit = scoreFn
|
|
|
|
|
|
|
|
// Set memory oversubscription.
|
|
|
|
iter.memoryOversubscription = schedConfig != nil && schedConfig.MemoryOversubscriptionEnabled
|
|
|
|
}
|
|
|
|
|
2015-08-12 01:27:54 +00:00
|
|
|
func (iter *BinPackIterator) Next() *RankedNode {
|
2015-09-13 21:31:32 +00:00
|
|
|
OUTER:
|
2015-08-12 01:27:54 +00:00
|
|
|
for {
|
2015-08-13 18:54:59 +00:00
|
|
|
// Get the next potential option
|
2015-08-12 01:27:54 +00:00
|
|
|
option := iter.source.Next()
|
|
|
|
if option == nil {
|
|
|
|
return nil
|
|
|
|
}
|
2015-08-13 18:54:59 +00:00
|
|
|
|
scheduling: prevent self-collision in dynamic port network offerings (#16401)
When the scheduler tries to find a placement for a new allocation, it iterates
over a subset of nodes. For each node, we populate a `NetworkIndex` bitmap with
the ports of all existing allocations and any other allocations already proposed
as part of this same evaluation via its `SetAllocs` method. Then we make an
"ask" of the `NetworkIndex` in `AssignPorts` for any ports we need and receive
an "offer" in return. The offer will include both static ports and any dynamic
port assignments.
The `AssignPorts` method was written to support group networks, and it shares
code that selects dynamic ports with the original `AssignTaskNetwork`
code. `AssignTaskNetwork` can request multiple ports from the bitmap at a
time. But `AssignPorts` requests them one at a time and does not account for
possible collisions, and doesn't return an error in that case.
What happens next varies:
1. If the scheduler doesn't place the allocation on that node, the port
conflict is thrown away and there's no problem.
2. If the node is picked and this is the only allocation (or last allocation),
the plan applier will reject the plan when it calls `SetAllocs`, as we'd expect.
3. If the node is picked and there are additional allocations in the same eval
that iterate over the same node, their call to `SetAllocs` will detect the
impossible state and the node will be rejected. This can have the puzzling
behavior where a second task group for the job without any networking at all
can hit a port collision error!
It looks like this bug has existed since we implemented group networks, but
there are several factors that add up to making the issue rare for many users
yet frustratingly frequent for others:
* You're more likely to hit this bug the more tightly packed your range for
dynamic ports is. With 12000 ports in the range by default, many clusters can
avoid this for a long time.
* You're more likely to hit case (3) for jobs with lots of allocations or if a
scheduler has to iterate over a large number of nodes, such as with system jobs,
jobs with `spread` blocks, or (sometimes) jobs using `unique` constraints.
For unlucky combinations of these factors, it's possible that case (3) happens
repeatedly, preventing scheduling of a given job until a client state
change (ex. restarting the agent so all its allocations are rescheduled
elsewhere) re-opens the range of dynamic ports available.
This changeset:
* Fixes the bug by accounting for collisions in dynamic port selection in
`AssignPorts`.
* Adds test coverage for `AssignPorts`, expands coverage of this case for the
deprecated `AssignTaskNetwork`, and tightens the dynamic port range in a
scheduler test for spread scheduling to more easily detect this kind of problem
in the future.
* Adds a `String()` method to `Bitmap` so that any future "screaming" log lines
have a human-readable list of used ports.
2023-03-09 15:09:54 +00:00
|
|
|
// Get the allocations that already exist on the node + those allocs
|
|
|
|
// that have been placed as part of this same evaluation
|
2015-09-07 23:19:21 +00:00
|
|
|
proposed, err := option.ProposedAllocs(iter.ctx)
|
|
|
|
if err != nil {
|
2018-09-15 23:23:13 +00:00
|
|
|
iter.ctx.Logger().Named("binpack").Error("failed retrieving proposed allocations", "error", err)
|
2015-09-07 23:19:21 +00:00
|
|
|
continue
|
2015-08-13 18:54:59 +00:00
|
|
|
}
|
2015-08-13 18:28:02 +00:00
|
|
|
|
2022-01-15 01:09:14 +00:00
|
|
|
// Index the existing network usage.
|
|
|
|
// This should never collide, since it represents the current state of
|
|
|
|
// the node. If it does collide though, it means we found a bug! So
|
|
|
|
// collect as much information as possible.
|
2015-09-13 21:37:09 +00:00
|
|
|
netIdx := structs.NewNetworkIndex()
|
2022-07-12 21:40:25 +00:00
|
|
|
if err := netIdx.SetNode(option.Node); err != nil {
|
2022-01-15 01:09:14 +00:00
|
|
|
iter.ctx.SendEvent(&PortCollisionEvent{
|
2022-07-12 21:40:25 +00:00
|
|
|
Reason: err.Error(),
|
2022-01-15 01:09:14 +00:00
|
|
|
NetIndex: netIdx.Copy(),
|
|
|
|
Node: option.Node,
|
|
|
|
})
|
2022-07-12 21:40:25 +00:00
|
|
|
iter.ctx.Metrics().ExhaustedNode(option.Node, "network: invalid node")
|
2022-01-15 01:09:14 +00:00
|
|
|
continue
|
|
|
|
}
|
|
|
|
if collide, reason := netIdx.AddAllocs(proposed); collide {
|
|
|
|
event := &PortCollisionEvent{
|
|
|
|
Reason: reason,
|
|
|
|
NetIndex: netIdx.Copy(),
|
|
|
|
Node: option.Node,
|
|
|
|
Allocations: make([]*structs.Allocation, len(proposed)),
|
|
|
|
}
|
|
|
|
for i, alloc := range proposed {
|
|
|
|
event.Allocations[i] = alloc.Copy()
|
|
|
|
}
|
|
|
|
iter.ctx.SendEvent(event)
|
|
|
|
iter.ctx.Metrics().ExhaustedNode(option.Node, "network: port collision")
|
|
|
|
continue
|
|
|
|
}
|
2015-09-13 21:31:32 +00:00
|
|
|
|
2018-10-15 22:15:46 +00:00
|
|
|
// Create a device allocator
|
|
|
|
devAllocator := newDeviceAllocator(iter.ctx, option.Node)
|
|
|
|
devAllocator.AddAllocs(proposed)
|
|
|
|
|
2018-10-17 18:04:54 +00:00
|
|
|
// Track the affinities of the devices
|
2018-11-08 23:01:58 +00:00
|
|
|
totalDeviceAffinityWeight := 0.0
|
|
|
|
sumMatchingAffinities := 0.0
|
2018-10-17 18:04:54 +00:00
|
|
|
|
2015-09-13 21:31:32 +00:00
|
|
|
// Assign the resources for each task
|
2018-10-02 20:36:04 +00:00
|
|
|
total := &structs.AllocatedResources{
|
|
|
|
Tasks: make(map[string]*structs.AllocatedTaskResources,
|
|
|
|
len(iter.taskGroup.Tasks)),
|
2019-12-16 20:34:58 +00:00
|
|
|
TaskLifecycles: make(map[string]*structs.TaskLifecycleConfig,
|
|
|
|
len(iter.taskGroup.Tasks)),
|
2018-10-02 20:36:04 +00:00
|
|
|
Shared: structs.AllocatedSharedResources{
|
2018-10-16 22:34:32 +00:00
|
|
|
DiskMB: int64(iter.taskGroup.EphemeralDisk.SizeMB),
|
2018-10-02 20:36:04 +00:00
|
|
|
},
|
2016-08-25 17:27:19 +00:00
|
|
|
}
|
2018-09-21 21:05:00 +00:00
|
|
|
|
|
|
|
var allocsToPreempt []*structs.Allocation
|
|
|
|
|
2018-10-29 17:34:03 +00:00
|
|
|
// Initialize preemptor with node
|
2021-08-18 13:50:37 +00:00
|
|
|
preemptor := NewPreemptor(iter.priority, iter.ctx, &iter.jobId)
|
2018-10-29 17:34:03 +00:00
|
|
|
preemptor.SetNode(option.Node)
|
|
|
|
|
2018-09-21 21:05:00 +00:00
|
|
|
// Count the number of existing preemptions
|
|
|
|
allPreemptions := iter.ctx.Plan().NodePreemptions
|
|
|
|
var currentPreemptions []*structs.Allocation
|
|
|
|
for _, allocs := range allPreemptions {
|
|
|
|
currentPreemptions = append(currentPreemptions, allocs...)
|
|
|
|
}
|
2018-10-29 17:34:03 +00:00
|
|
|
preemptor.SetPreemptions(currentPreemptions)
|
2018-09-21 21:05:00 +00:00
|
|
|
|
2019-06-11 04:29:12 +00:00
|
|
|
// Check if we need task group network resource
|
|
|
|
if len(iter.taskGroup.Networks) > 0 {
|
|
|
|
ask := iter.taskGroup.Networks[0].Copy()
|
2021-04-13 13:53:05 +00:00
|
|
|
for i, port := range ask.DynamicPorts {
|
|
|
|
if port.HostNetwork != "" {
|
|
|
|
if hostNetworkValue, hostNetworkOk := resolveTarget(port.HostNetwork, option.Node); hostNetworkOk {
|
2022-07-12 21:40:25 +00:00
|
|
|
ask.DynamicPorts[i].HostNetwork = hostNetworkValue
|
2021-04-13 13:53:05 +00:00
|
|
|
} else {
|
|
|
|
iter.ctx.Logger().Named("binpack").Error(fmt.Sprintf("Invalid template for %s host network in port %s", port.HostNetwork, port.Label))
|
|
|
|
netIdx.Release()
|
|
|
|
continue OUTER
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
for i, port := range ask.ReservedPorts {
|
|
|
|
if port.HostNetwork != "" {
|
|
|
|
if hostNetworkValue, hostNetworkOk := resolveTarget(port.HostNetwork, option.Node); hostNetworkOk {
|
2022-07-12 21:40:25 +00:00
|
|
|
ask.ReservedPorts[i].HostNetwork = hostNetworkValue
|
2021-04-13 13:53:05 +00:00
|
|
|
} else {
|
|
|
|
iter.ctx.Logger().Named("binpack").Error(fmt.Sprintf("Invalid template for %s host network in port %s", port.HostNetwork, port.Label))
|
|
|
|
netIdx.Release()
|
|
|
|
continue OUTER
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
2020-06-16 15:53:10 +00:00
|
|
|
offer, err := netIdx.AssignPorts(ask)
|
|
|
|
if err != nil {
|
2019-06-11 04:29:12 +00:00
|
|
|
// If eviction is not enabled, mark this node as exhausted and continue
|
|
|
|
if !iter.evict {
|
|
|
|
iter.ctx.Metrics().ExhaustedNode(option.Node,
|
|
|
|
fmt.Sprintf("network: %s", err))
|
|
|
|
netIdx.Release()
|
|
|
|
continue OUTER
|
|
|
|
}
|
|
|
|
|
|
|
|
// Look for preemptible allocations to satisfy the network resource for this task
|
|
|
|
preemptor.SetCandidates(proposed)
|
|
|
|
|
|
|
|
netPreemptions := preemptor.PreemptForNetwork(ask, netIdx)
|
|
|
|
if netPreemptions == nil {
|
2019-12-12 18:05:29 +00:00
|
|
|
iter.ctx.Logger().Named("binpack").Debug("preemption not possible ", "network_resource", ask)
|
2019-06-11 04:29:12 +00:00
|
|
|
netIdx.Release()
|
|
|
|
continue OUTER
|
|
|
|
}
|
|
|
|
allocsToPreempt = append(allocsToPreempt, netPreemptions...)
|
|
|
|
|
|
|
|
// First subtract out preempted allocations
|
|
|
|
proposed = structs.RemoveAllocs(proposed, netPreemptions)
|
|
|
|
|
|
|
|
// Reset the network index and try the offer again
|
|
|
|
netIdx.Release()
|
|
|
|
netIdx = structs.NewNetworkIndex()
|
|
|
|
netIdx.SetNode(option.Node)
|
|
|
|
netIdx.AddAllocs(proposed)
|
|
|
|
|
2020-06-16 15:53:10 +00:00
|
|
|
offer, err = netIdx.AssignPorts(ask)
|
|
|
|
if err != nil {
|
2019-12-12 21:50:16 +00:00
|
|
|
iter.ctx.Logger().Named("binpack").Debug("unexpected error, unable to create network offer after considering preemption", "error", err)
|
2019-06-11 04:29:12 +00:00
|
|
|
netIdx.Release()
|
|
|
|
continue OUTER
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// Reserve this to prevent another task from colliding
|
2020-06-16 15:53:10 +00:00
|
|
|
netIdx.AddReservedPorts(offer)
|
2019-06-11 04:29:12 +00:00
|
|
|
|
|
|
|
// Update the network ask to the offer
|
2020-07-06 22:51:46 +00:00
|
|
|
nwRes := structs.AllocatedPortsToNetworkResouce(ask, offer, option.Node.NodeResources)
|
2020-06-16 15:53:10 +00:00
|
|
|
total.Shared.Networks = []*structs.NetworkResource{nwRes}
|
2020-06-25 19:16:01 +00:00
|
|
|
total.Shared.Ports = offer
|
2019-06-18 04:55:43 +00:00
|
|
|
option.AllocResources = &structs.AllocatedSharedResources{
|
2020-06-16 15:53:10 +00:00
|
|
|
Networks: []*structs.NetworkResource{nwRes},
|
2019-06-11 04:29:12 +00:00
|
|
|
DiskMB: int64(iter.taskGroup.EphemeralDisk.SizeMB),
|
2020-06-16 15:53:10 +00:00
|
|
|
Ports: offer,
|
2019-06-11 04:29:12 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
}
|
|
|
|
|
2016-08-25 17:27:19 +00:00
|
|
|
for _, task := range iter.taskGroup.Tasks {
|
2018-10-02 20:36:04 +00:00
|
|
|
// Allocate the resources
|
|
|
|
taskResources := &structs.AllocatedTaskResources{
|
|
|
|
Cpu: structs.AllocatedCpuResources{
|
2018-10-16 22:34:32 +00:00
|
|
|
CpuShares: int64(task.Resources.CPU),
|
2018-10-02 20:36:04 +00:00
|
|
|
},
|
|
|
|
Memory: structs.AllocatedMemoryResources{
|
2021-04-30 02:09:56 +00:00
|
|
|
MemoryMB: int64(task.Resources.MemoryMB),
|
2018-10-02 20:36:04 +00:00
|
|
|
},
|
|
|
|
}
|
2021-04-30 02:09:56 +00:00
|
|
|
if iter.memoryOversubscription {
|
|
|
|
taskResources.Memory.MemoryMaxMB = int64(task.Resources.MemoryMaxMB)
|
|
|
|
}
|
2015-09-13 21:31:32 +00:00
|
|
|
|
|
|
|
// Check if we need a network resource
|
2018-10-02 20:36:04 +00:00
|
|
|
if len(task.Resources.Networks) > 0 {
|
|
|
|
ask := task.Resources.Networks[0].Copy()
|
2022-07-12 21:40:25 +00:00
|
|
|
offer, err := netIdx.AssignTaskNetwork(ask)
|
2015-09-13 21:31:32 +00:00
|
|
|
if offer == nil {
|
2018-09-21 21:05:00 +00:00
|
|
|
// If eviction is not enabled, mark this node as exhausted and continue
|
|
|
|
if !iter.evict {
|
|
|
|
iter.ctx.Metrics().ExhaustedNode(option.Node,
|
|
|
|
fmt.Sprintf("network: %s", err))
|
|
|
|
netIdx.Release()
|
|
|
|
continue OUTER
|
|
|
|
}
|
|
|
|
|
|
|
|
// Look for preemptible allocations to satisfy the network resource for this task
|
2018-10-29 17:34:03 +00:00
|
|
|
preemptor.SetCandidates(proposed)
|
|
|
|
|
2018-10-30 14:14:56 +00:00
|
|
|
netPreemptions := preemptor.PreemptForNetwork(ask, netIdx)
|
2018-10-18 04:49:37 +00:00
|
|
|
if netPreemptions == nil {
|
2019-12-12 18:05:29 +00:00
|
|
|
iter.ctx.Logger().Named("binpack").Debug("preemption not possible ", "network_resource", ask)
|
2018-09-21 21:05:00 +00:00
|
|
|
netIdx.Release()
|
|
|
|
continue OUTER
|
|
|
|
}
|
2018-10-18 04:49:37 +00:00
|
|
|
allocsToPreempt = append(allocsToPreempt, netPreemptions...)
|
2018-09-21 21:05:00 +00:00
|
|
|
|
|
|
|
// First subtract out preempted allocations
|
2018-10-18 04:49:37 +00:00
|
|
|
proposed = structs.RemoveAllocs(proposed, netPreemptions)
|
2018-09-21 21:05:00 +00:00
|
|
|
|
|
|
|
// Reset the network index and try the offer again
|
2016-02-20 20:18:22 +00:00
|
|
|
netIdx.Release()
|
2018-09-21 21:05:00 +00:00
|
|
|
netIdx = structs.NewNetworkIndex()
|
|
|
|
netIdx.SetNode(option.Node)
|
|
|
|
netIdx.AddAllocs(proposed)
|
|
|
|
|
2022-07-12 21:40:25 +00:00
|
|
|
offer, err = netIdx.AssignTaskNetwork(ask)
|
2018-09-21 21:05:00 +00:00
|
|
|
if offer == nil {
|
2019-12-12 21:50:16 +00:00
|
|
|
iter.ctx.Logger().Named("binpack").Debug("unexpected error, unable to create network offer after considering preemption", "error", err)
|
2018-09-21 21:05:00 +00:00
|
|
|
netIdx.Release()
|
|
|
|
continue OUTER
|
|
|
|
}
|
2015-09-13 21:31:32 +00:00
|
|
|
}
|
|
|
|
// Reserve this to prevent another task from colliding
|
|
|
|
netIdx.AddReserved(offer)
|
|
|
|
|
|
|
|
// Update the network ask to the offer
|
|
|
|
taskResources.Networks = []*structs.NetworkResource{offer}
|
|
|
|
}
|
|
|
|
|
2018-10-15 22:15:46 +00:00
|
|
|
// Check if we need to assign devices
|
|
|
|
for _, req := range task.Resources.Devices {
|
2018-11-08 23:01:58 +00:00
|
|
|
offer, sumAffinities, err := devAllocator.AssignDevice(req)
|
2018-10-15 22:15:46 +00:00
|
|
|
if offer == nil {
|
2018-11-15 03:23:38 +00:00
|
|
|
// If eviction is not enabled, mark this node as exhausted and continue
|
|
|
|
if !iter.evict {
|
|
|
|
iter.ctx.Metrics().ExhaustedNode(option.Node, fmt.Sprintf("devices: %s", err))
|
|
|
|
continue OUTER
|
|
|
|
}
|
|
|
|
|
|
|
|
// Attempt preemption
|
|
|
|
preemptor.SetCandidates(proposed)
|
|
|
|
devicePreemptions := preemptor.PreemptForDevice(req, devAllocator)
|
|
|
|
|
|
|
|
if devicePreemptions == nil {
|
2019-12-12 18:05:29 +00:00
|
|
|
iter.ctx.Logger().Named("binpack").Debug("preemption not possible", "requested_device", req)
|
2018-11-15 03:23:38 +00:00
|
|
|
netIdx.Release()
|
|
|
|
continue OUTER
|
|
|
|
}
|
|
|
|
allocsToPreempt = append(allocsToPreempt, devicePreemptions...)
|
|
|
|
|
|
|
|
// First subtract out preempted allocations
|
|
|
|
proposed = structs.RemoveAllocs(proposed, allocsToPreempt)
|
|
|
|
|
|
|
|
// Reset the device allocator with new set of proposed allocs
|
|
|
|
devAllocator := newDeviceAllocator(iter.ctx, option.Node)
|
|
|
|
devAllocator.AddAllocs(proposed)
|
|
|
|
|
|
|
|
// Try offer again
|
|
|
|
offer, sumAffinities, err = devAllocator.AssignDevice(req)
|
|
|
|
if offer == nil {
|
2019-12-12 21:50:16 +00:00
|
|
|
iter.ctx.Logger().Named("binpack").Debug("unexpected error, unable to create device offer after considering preemption", "error", err)
|
2018-11-15 03:23:38 +00:00
|
|
|
continue OUTER
|
|
|
|
}
|
2018-10-15 22:15:46 +00:00
|
|
|
}
|
2018-10-17 18:04:54 +00:00
|
|
|
|
|
|
|
// Store the resource
|
2018-10-15 22:15:46 +00:00
|
|
|
devAllocator.AddReserved(offer)
|
|
|
|
taskResources.Devices = append(taskResources.Devices, offer)
|
2018-10-17 18:04:54 +00:00
|
|
|
|
|
|
|
// Add the scores
|
|
|
|
if len(req.Affinities) != 0 {
|
2018-11-08 23:01:58 +00:00
|
|
|
for _, a := range req.Affinities {
|
2019-01-30 20:20:38 +00:00
|
|
|
totalDeviceAffinityWeight += math.Abs(float64(a.Weight))
|
2018-11-08 23:01:58 +00:00
|
|
|
}
|
|
|
|
sumMatchingAffinities += sumAffinities
|
2018-10-17 18:04:54 +00:00
|
|
|
}
|
2018-10-15 22:15:46 +00:00
|
|
|
}
|
|
|
|
|
2021-03-19 04:29:07 +00:00
|
|
|
// Check if we need to allocate any reserved cores
|
|
|
|
if task.Resources.Cores > 0 {
|
|
|
|
// set of reservable CPUs for the node
|
|
|
|
nodeCPUSet := cpuset.New(option.Node.NodeResources.Cpu.ReservableCpuCores...)
|
|
|
|
// set of all reserved CPUs on the node
|
|
|
|
allocatedCPUSet := cpuset.New()
|
|
|
|
for _, alloc := range proposed {
|
|
|
|
allocatedCPUSet = allocatedCPUSet.Union(cpuset.New(alloc.ComparableResources().Flattened.Cpu.ReservedCores...))
|
|
|
|
}
|
|
|
|
|
|
|
|
// add any cores that were reserved for other tasks
|
|
|
|
for _, tr := range total.Tasks {
|
|
|
|
allocatedCPUSet = allocatedCPUSet.Union(cpuset.New(tr.Cpu.ReservedCores...))
|
|
|
|
}
|
|
|
|
|
|
|
|
// set of CPUs not yet reserved on the node
|
|
|
|
availableCPUSet := nodeCPUSet.Difference(allocatedCPUSet)
|
|
|
|
|
|
|
|
// If not enough cores are available mark the node as exhausted
|
|
|
|
if availableCPUSet.Size() < task.Resources.Cores {
|
|
|
|
// TODO preemption
|
|
|
|
iter.ctx.Metrics().ExhaustedNode(option.Node, "cores")
|
|
|
|
continue OUTER
|
|
|
|
}
|
|
|
|
|
|
|
|
// Set the task's reserved cores
|
|
|
|
taskResources.Cpu.ReservedCores = availableCPUSet.ToSlice()[0:task.Resources.Cores]
|
|
|
|
// Total CPU usage on the node is still tracked by CPUShares. Even though the task will have the entire
|
|
|
|
// core reserved, we still track overall usage by cpu shares.
|
|
|
|
taskResources.Cpu.CpuShares = option.Node.NodeResources.Cpu.SharesPerCore() * int64(task.Resources.Cores)
|
|
|
|
}
|
|
|
|
|
2015-09-13 21:31:32 +00:00
|
|
|
// Store the task resource
|
|
|
|
option.SetTaskResources(task, taskResources)
|
|
|
|
|
|
|
|
// Accumulate the total resource requirement
|
2018-10-02 20:36:04 +00:00
|
|
|
total.Tasks[task.Name] = taskResources
|
2019-12-16 20:34:58 +00:00
|
|
|
total.TaskLifecycles[task.Name] = task.Lifecycle
|
2015-09-13 21:31:32 +00:00
|
|
|
}
|
|
|
|
|
2018-10-18 04:49:37 +00:00
|
|
|
// Store current set of running allocs before adding resources for the task group
|
|
|
|
current := proposed
|
|
|
|
|
2015-08-13 18:54:59 +00:00
|
|
|
// Add the resources we are trying to fit
|
2018-10-02 20:36:04 +00:00
|
|
|
proposed = append(proposed, &structs.Allocation{AllocatedResources: total})
|
2015-08-13 18:28:02 +00:00
|
|
|
|
2018-10-17 22:03:38 +00:00
|
|
|
// Check if these allocations fit, if they do not, simply skip this node
|
|
|
|
fit, dim, util, _ := structs.AllocsFit(option.Node, proposed, netIdx, false)
|
2016-02-20 20:18:22 +00:00
|
|
|
netIdx.Release()
|
2015-08-13 18:54:59 +00:00
|
|
|
if !fit {
|
2018-09-21 21:05:00 +00:00
|
|
|
// Skip the node if evictions are not enabled
|
|
|
|
if !iter.evict {
|
|
|
|
iter.ctx.Metrics().ExhaustedNode(option.Node, dim)
|
|
|
|
continue
|
|
|
|
}
|
2018-10-18 05:07:56 +00:00
|
|
|
|
2018-09-21 21:05:00 +00:00
|
|
|
// If eviction is enabled and the node doesn't fit the alloc, check if
|
|
|
|
// any allocs can be preempted
|
2018-10-29 17:34:03 +00:00
|
|
|
|
|
|
|
// Initialize preemptor with candidate set
|
|
|
|
preemptor.SetCandidates(current)
|
2015-08-13 18:54:59 +00:00
|
|
|
|
2018-10-30 14:14:56 +00:00
|
|
|
preemptedAllocs := preemptor.PreemptForTaskGroup(total)
|
2018-10-29 17:34:03 +00:00
|
|
|
allocsToPreempt = append(allocsToPreempt, preemptedAllocs...)
|
2018-10-18 04:49:37 +00:00
|
|
|
|
2018-09-21 21:05:00 +00:00
|
|
|
// If we were unable to find preempted allocs to meet these requirements
|
|
|
|
// mark as exhausted and continue
|
2018-10-29 17:34:03 +00:00
|
|
|
if len(preemptedAllocs) == 0 {
|
2018-09-21 21:05:00 +00:00
|
|
|
iter.ctx.Metrics().ExhaustedNode(option.Node, dim)
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
}
|
|
|
|
if len(allocsToPreempt) > 0 {
|
|
|
|
option.PreemptedAllocs = allocsToPreempt
|
2015-08-13 18:54:59 +00:00
|
|
|
}
|
2015-08-13 19:02:42 +00:00
|
|
|
|
2015-08-13 18:54:59 +00:00
|
|
|
// Score the fit normally otherwise
|
2020-04-24 14:47:43 +00:00
|
|
|
fitness := iter.scoreFit(option.Node, util)
|
2018-07-20 14:33:47 +00:00
|
|
|
normalizedFit := fitness / binPackingMaxFitScore
|
2018-07-16 13:47:18 +00:00
|
|
|
option.Scores = append(option.Scores, normalizedFit)
|
|
|
|
iter.ctx.Metrics().ScoreNode(option.Node, "binpack", normalizedFit)
|
2018-10-17 18:04:54 +00:00
|
|
|
|
|
|
|
// Score the device affinity
|
2018-11-08 23:01:58 +00:00
|
|
|
if totalDeviceAffinityWeight != 0 {
|
|
|
|
sumMatchingAffinities /= totalDeviceAffinityWeight
|
|
|
|
option.Scores = append(option.Scores, sumMatchingAffinities)
|
|
|
|
iter.ctx.Metrics().ScoreNode(option.Node, "devices", sumMatchingAffinities)
|
2018-10-17 18:04:54 +00:00
|
|
|
}
|
|
|
|
|
2015-08-13 18:54:59 +00:00
|
|
|
return option
|
2015-08-13 18:28:02 +00:00
|
|
|
}
|
|
|
|
}
|
2015-08-13 22:01:02 +00:00
|
|
|
|
|
|
|
func (iter *BinPackIterator) Reset() {
|
|
|
|
iter.source.Reset()
|
|
|
|
}
|
2015-08-16 17:32:25 +00:00
|
|
|
|
|
|
|
// JobAntiAffinityIterator is used to apply an anti-affinity to allocating
|
|
|
|
// along side other allocations from this job. This is used to help distribute
|
|
|
|
// load across the cluster.
|
|
|
|
type JobAntiAffinityIterator struct {
|
2018-07-16 13:47:18 +00:00
|
|
|
ctx Context
|
|
|
|
source RankIterator
|
|
|
|
jobID string
|
|
|
|
taskGroup string
|
|
|
|
desiredCount int
|
2015-08-16 17:32:25 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// NewJobAntiAffinityIterator is used to create a JobAntiAffinityIterator that
|
|
|
|
// applies the given penalty for co-placement with allocs from this job.
|
2018-07-16 13:47:18 +00:00
|
|
|
func NewJobAntiAffinityIterator(ctx Context, source RankIterator, jobID string) *JobAntiAffinityIterator {
|
2015-08-16 17:32:25 +00:00
|
|
|
iter := &JobAntiAffinityIterator{
|
2018-07-16 13:47:18 +00:00
|
|
|
ctx: ctx,
|
|
|
|
source: source,
|
|
|
|
jobID: jobID,
|
2015-08-16 17:32:25 +00:00
|
|
|
}
|
|
|
|
return iter
|
|
|
|
}
|
|
|
|
|
2018-07-16 13:47:18 +00:00
|
|
|
func (iter *JobAntiAffinityIterator) SetJob(job *structs.Job) {
|
|
|
|
iter.jobID = job.ID
|
|
|
|
}
|
|
|
|
|
|
|
|
func (iter *JobAntiAffinityIterator) SetTaskGroup(tg *structs.TaskGroup) {
|
|
|
|
iter.taskGroup = tg.Name
|
|
|
|
iter.desiredCount = tg.Count
|
2015-08-16 17:32:25 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
func (iter *JobAntiAffinityIterator) Next() *RankedNode {
|
|
|
|
for {
|
|
|
|
option := iter.source.Next()
|
|
|
|
if option == nil {
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// Get the proposed allocations
|
2015-09-07 23:19:21 +00:00
|
|
|
proposed, err := option.ProposedAllocs(iter.ctx)
|
|
|
|
if err != nil {
|
2018-09-15 23:23:13 +00:00
|
|
|
iter.ctx.Logger().Named("job_anti_affinity").Error("failed retrieving proposed allocations", "error", err)
|
2015-09-07 23:19:21 +00:00
|
|
|
continue
|
2015-08-16 17:32:25 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// Determine the number of collisions
|
|
|
|
collisions := 0
|
|
|
|
for _, alloc := range proposed {
|
2018-07-16 13:47:18 +00:00
|
|
|
if alloc.JobID == iter.jobID && alloc.TaskGroup == iter.taskGroup {
|
2015-08-16 17:32:25 +00:00
|
|
|
collisions += 1
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2018-07-16 13:47:18 +00:00
|
|
|
// Calculate the penalty based on number of collisions
|
|
|
|
// TODO(preetha): Figure out if batch jobs need a different scoring penalty where collisions matter less
|
2015-08-16 17:32:25 +00:00
|
|
|
if collisions > 0 {
|
2018-07-24 15:49:50 +00:00
|
|
|
scorePenalty := -1 * float64(collisions+1) / float64(iter.desiredCount)
|
2018-07-16 13:47:18 +00:00
|
|
|
option.Scores = append(option.Scores, scorePenalty)
|
2015-08-16 17:32:25 +00:00
|
|
|
iter.ctx.Metrics().ScoreNode(option.Node, "job-anti-affinity", scorePenalty)
|
2018-10-18 02:06:24 +00:00
|
|
|
} else {
|
|
|
|
iter.ctx.Metrics().ScoreNode(option.Node, "job-anti-affinity", 0)
|
2015-08-16 17:32:25 +00:00
|
|
|
}
|
|
|
|
return option
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func (iter *JobAntiAffinityIterator) Reset() {
|
|
|
|
iter.source.Reset()
|
|
|
|
}
|
2018-01-14 22:47:21 +00:00
|
|
|
|
2018-07-16 13:47:18 +00:00
|
|
|
// NodeReschedulingPenaltyIterator is used to apply a penalty to
|
2018-01-14 22:47:21 +00:00
|
|
|
// a node that had a previous failed allocation for the same job.
|
|
|
|
// This is used when attempting to reschedule a failed alloc
|
2018-07-16 13:47:18 +00:00
|
|
|
type NodeReschedulingPenaltyIterator struct {
|
2018-01-14 22:47:21 +00:00
|
|
|
ctx Context
|
|
|
|
source RankIterator
|
|
|
|
penaltyNodes map[string]struct{}
|
|
|
|
}
|
|
|
|
|
2018-07-16 13:47:18 +00:00
|
|
|
// NewNodeReschedulingPenaltyIterator is used to create a NodeReschedulingPenaltyIterator that
|
|
|
|
// applies the given scoring penalty for placement onto nodes in penaltyNodes
|
|
|
|
func NewNodeReschedulingPenaltyIterator(ctx Context, source RankIterator) *NodeReschedulingPenaltyIterator {
|
|
|
|
iter := &NodeReschedulingPenaltyIterator{
|
|
|
|
ctx: ctx,
|
|
|
|
source: source,
|
2018-01-14 22:47:21 +00:00
|
|
|
}
|
|
|
|
return iter
|
|
|
|
}
|
|
|
|
|
2018-07-16 13:47:18 +00:00
|
|
|
func (iter *NodeReschedulingPenaltyIterator) SetPenaltyNodes(penaltyNodes map[string]struct{}) {
|
2018-01-14 22:47:21 +00:00
|
|
|
iter.penaltyNodes = penaltyNodes
|
|
|
|
}
|
|
|
|
|
2018-07-16 13:47:18 +00:00
|
|
|
func (iter *NodeReschedulingPenaltyIterator) Next() *RankedNode {
|
2020-12-10 15:29:18 +00:00
|
|
|
option := iter.source.Next()
|
|
|
|
if option == nil {
|
|
|
|
return nil
|
|
|
|
}
|
2018-01-14 22:47:21 +00:00
|
|
|
|
2020-12-10 15:29:18 +00:00
|
|
|
_, ok := iter.penaltyNodes[option.Node.ID]
|
|
|
|
if ok {
|
|
|
|
option.Scores = append(option.Scores, -1)
|
|
|
|
iter.ctx.Metrics().ScoreNode(option.Node, "node-reschedule-penalty", -1)
|
|
|
|
} else {
|
|
|
|
iter.ctx.Metrics().ScoreNode(option.Node, "node-reschedule-penalty", 0)
|
2018-01-14 22:47:21 +00:00
|
|
|
}
|
2020-12-10 15:29:18 +00:00
|
|
|
|
|
|
|
return option
|
2018-01-14 22:47:21 +00:00
|
|
|
}
|
|
|
|
|
2018-07-16 13:47:18 +00:00
|
|
|
func (iter *NodeReschedulingPenaltyIterator) Reset() {
|
2018-01-19 14:41:53 +00:00
|
|
|
iter.penaltyNodes = make(map[string]struct{})
|
2018-01-14 22:47:21 +00:00
|
|
|
iter.source.Reset()
|
|
|
|
}
|
2018-07-16 13:47:18 +00:00
|
|
|
|
2018-07-18 20:25:45 +00:00
|
|
|
// NodeAffinityIterator is used to resolve any affinity rules in the job or task group,
|
|
|
|
// and apply a weighted score to nodes if they match.
|
2018-07-16 13:47:18 +00:00
|
|
|
type NodeAffinityIterator struct {
|
|
|
|
ctx Context
|
|
|
|
source RankIterator
|
|
|
|
jobAffinities []*structs.Affinity
|
|
|
|
affinities []*structs.Affinity
|
|
|
|
}
|
|
|
|
|
2018-07-18 20:25:45 +00:00
|
|
|
// NewNodeAffinityIterator is used to create a NodeAffinityIterator that
|
|
|
|
// applies a weighted score according to whether nodes match any
|
|
|
|
// affinities in the job or task group.
|
2018-07-16 13:47:18 +00:00
|
|
|
func NewNodeAffinityIterator(ctx Context, source RankIterator) *NodeAffinityIterator {
|
|
|
|
return &NodeAffinityIterator{
|
|
|
|
ctx: ctx,
|
|
|
|
source: source,
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func (iter *NodeAffinityIterator) SetJob(job *structs.Job) {
|
2018-07-18 20:25:45 +00:00
|
|
|
iter.jobAffinities = job.Affinities
|
2018-07-16 13:47:18 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
func (iter *NodeAffinityIterator) SetTaskGroup(tg *structs.TaskGroup) {
|
|
|
|
// Merge job affinities
|
|
|
|
if iter.jobAffinities != nil {
|
|
|
|
iter.affinities = append(iter.affinities, iter.jobAffinities...)
|
|
|
|
}
|
|
|
|
|
|
|
|
// Merge task group affinities and task affinities
|
|
|
|
if tg.Affinities != nil {
|
|
|
|
iter.affinities = append(iter.affinities, tg.Affinities...)
|
|
|
|
}
|
|
|
|
for _, task := range tg.Tasks {
|
|
|
|
if task.Affinities != nil {
|
|
|
|
iter.affinities = append(iter.affinities, task.Affinities...)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func (iter *NodeAffinityIterator) Reset() {
|
|
|
|
iter.source.Reset()
|
|
|
|
// This method is called between each task group, so only reset the merged list
|
|
|
|
iter.affinities = nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (iter *NodeAffinityIterator) hasAffinities() bool {
|
|
|
|
return len(iter.affinities) > 0
|
|
|
|
}
|
|
|
|
|
|
|
|
func (iter *NodeAffinityIterator) Next() *RankedNode {
|
|
|
|
option := iter.source.Next()
|
|
|
|
if option == nil {
|
|
|
|
return nil
|
|
|
|
}
|
2018-07-18 20:25:45 +00:00
|
|
|
if !iter.hasAffinities() {
|
2018-10-18 02:06:24 +00:00
|
|
|
iter.ctx.Metrics().ScoreNode(option.Node, "node-affinity", 0)
|
2018-07-16 13:47:18 +00:00
|
|
|
return option
|
|
|
|
}
|
|
|
|
// TODO(preetha): we should calculate normalized weights once and reuse it here
|
|
|
|
sumWeight := 0.0
|
|
|
|
for _, affinity := range iter.affinities {
|
2019-01-30 20:20:38 +00:00
|
|
|
sumWeight += math.Abs(float64(affinity.Weight))
|
2018-07-16 13:47:18 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
totalAffinityScore := 0.0
|
|
|
|
for _, affinity := range iter.affinities {
|
|
|
|
if matchesAffinity(iter.ctx, affinity, option.Node) {
|
2019-01-30 20:20:38 +00:00
|
|
|
totalAffinityScore += float64(affinity.Weight)
|
2018-07-16 13:47:18 +00:00
|
|
|
}
|
|
|
|
}
|
2018-07-24 15:49:50 +00:00
|
|
|
normScore := totalAffinityScore / sumWeight
|
2018-07-16 13:47:18 +00:00
|
|
|
if totalAffinityScore != 0.0 {
|
2018-07-24 15:49:50 +00:00
|
|
|
option.Scores = append(option.Scores, normScore)
|
|
|
|
iter.ctx.Metrics().ScoreNode(option.Node, "node-affinity", normScore)
|
2018-07-16 13:47:18 +00:00
|
|
|
}
|
|
|
|
return option
|
|
|
|
}
|
|
|
|
|
|
|
|
func matchesAffinity(ctx Context, affinity *structs.Affinity, option *structs.Node) bool {
|
2018-07-18 20:25:45 +00:00
|
|
|
//TODO(preetha): Add a step here that filters based on computed node class for potential speedup
|
2018-07-16 13:47:18 +00:00
|
|
|
// Resolve the targets
|
2018-11-13 23:57:59 +00:00
|
|
|
lVal, lOk := resolveTarget(affinity.LTarget, option)
|
|
|
|
rVal, rOk := resolveTarget(affinity.RTarget, option)
|
2018-07-16 13:47:18 +00:00
|
|
|
|
|
|
|
// Check if satisfied
|
2018-11-13 23:57:59 +00:00
|
|
|
return checkAffinity(ctx, affinity.Operand, lVal, rVal, lOk, rOk)
|
2018-07-16 13:47:18 +00:00
|
|
|
}
|
|
|
|
|
2018-07-18 20:25:45 +00:00
|
|
|
// ScoreNormalizationIterator is used to combine scores from various prior
|
|
|
|
// iterators and combine them into one final score. The current implementation
|
|
|
|
// averages the scores together.
|
2018-07-16 13:47:18 +00:00
|
|
|
type ScoreNormalizationIterator struct {
|
|
|
|
ctx Context
|
|
|
|
source RankIterator
|
|
|
|
}
|
|
|
|
|
2018-07-18 20:25:45 +00:00
|
|
|
// NewScoreNormalizationIterator is used to create a ScoreNormalizationIterator that
|
|
|
|
// averages scores from various iterators into a final score.
|
2018-07-16 13:47:18 +00:00
|
|
|
func NewScoreNormalizationIterator(ctx Context, source RankIterator) *ScoreNormalizationIterator {
|
|
|
|
return &ScoreNormalizationIterator{
|
|
|
|
ctx: ctx,
|
|
|
|
source: source}
|
|
|
|
}
|
|
|
|
|
|
|
|
func (iter *ScoreNormalizationIterator) Reset() {
|
|
|
|
iter.source.Reset()
|
|
|
|
}
|
|
|
|
|
|
|
|
func (iter *ScoreNormalizationIterator) Next() *RankedNode {
|
|
|
|
option := iter.source.Next()
|
2018-07-18 20:25:45 +00:00
|
|
|
if option == nil || len(option.Scores) == 0 {
|
|
|
|
return option
|
2018-07-16 13:47:18 +00:00
|
|
|
}
|
|
|
|
numScorers := len(option.Scores)
|
2018-07-18 20:25:45 +00:00
|
|
|
sum := 0.0
|
|
|
|
for _, score := range option.Scores {
|
|
|
|
sum += score
|
2018-07-16 13:47:18 +00:00
|
|
|
}
|
2018-07-18 20:25:45 +00:00
|
|
|
option.FinalScore = sum / float64(numScorers)
|
2018-07-16 13:47:18 +00:00
|
|
|
//TODO(preetha): Turn map in allocmetrics into a heap of topK scores
|
|
|
|
iter.ctx.Metrics().ScoreNode(option.Node, "normalized-score", option.FinalScore)
|
|
|
|
return option
|
|
|
|
}
|
2020-05-27 19:02:01 +00:00
|
|
|
|
|
|
|
// PreemptionScoringIterator is used to score nodes according to the
|
|
|
|
// combination of preemptible allocations in them
|
|
|
|
type PreemptionScoringIterator struct {
|
|
|
|
ctx Context
|
|
|
|
source RankIterator
|
|
|
|
}
|
|
|
|
|
2021-08-30 09:08:12 +00:00
|
|
|
// NewPreemptionScoringIterator is used to create a score based on net
|
|
|
|
// aggregate priority of preempted allocations.
|
2020-05-27 19:02:01 +00:00
|
|
|
func NewPreemptionScoringIterator(ctx Context, source RankIterator) RankIterator {
|
|
|
|
return &PreemptionScoringIterator{
|
|
|
|
ctx: ctx,
|
|
|
|
source: source,
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func (iter *PreemptionScoringIterator) Reset() {
|
|
|
|
iter.source.Reset()
|
|
|
|
}
|
|
|
|
|
|
|
|
func (iter *PreemptionScoringIterator) Next() *RankedNode {
|
|
|
|
option := iter.source.Next()
|
|
|
|
if option == nil || option.PreemptedAllocs == nil {
|
|
|
|
return option
|
|
|
|
}
|
|
|
|
|
|
|
|
netPriority := netPriority(option.PreemptedAllocs)
|
|
|
|
// preemption score is inversely proportional to netPriority
|
|
|
|
preemptionScore := preemptionScore(netPriority)
|
|
|
|
option.Scores = append(option.Scores, preemptionScore)
|
|
|
|
iter.ctx.Metrics().ScoreNode(option.Node, "preemption", preemptionScore)
|
|
|
|
|
|
|
|
return option
|
|
|
|
}
|
|
|
|
|
|
|
|
// netPriority is a scoring heuristic that represents a combination of two factors.
|
|
|
|
// First factor is the max priority in the set of allocations, with
|
|
|
|
// an additional factor that takes into account the individual priorities of allocations
|
|
|
|
func netPriority(allocs []*structs.Allocation) float64 {
|
|
|
|
sumPriority := 0
|
|
|
|
max := 0.0
|
|
|
|
for _, alloc := range allocs {
|
|
|
|
if float64(alloc.Job.Priority) > max {
|
|
|
|
max = float64(alloc.Job.Priority)
|
|
|
|
}
|
|
|
|
sumPriority += alloc.Job.Priority
|
|
|
|
}
|
|
|
|
// We use the maximum priority across all allocations
|
|
|
|
// with an additional penalty that increases proportional to the
|
|
|
|
// ratio of the sum by max
|
|
|
|
// This ensures that we penalize nodes that have a low max but a high
|
|
|
|
// number of preemptible allocations
|
|
|
|
ret := max + (float64(sumPriority) / max)
|
|
|
|
return ret
|
|
|
|
}
|
|
|
|
|
|
|
|
// preemptionScore is calculated using a logistic function
|
|
|
|
// see https://www.desmos.com/calculator/alaeiuaiey for a visual representation of the curve.
|
|
|
|
// Lower values of netPriority get a score closer to 1 and the inflection point is around 2048
|
|
|
|
// The score is modelled to be between 0 and 1 because its combined with other
|
|
|
|
// scoring factors like bin packing
|
|
|
|
func preemptionScore(netPriority float64) float64 {
|
2020-05-27 19:13:19 +00:00
|
|
|
// These values were chosen such that a net priority of 2048 would get a preemption score of 0.5
|
|
|
|
// rate is the decay parameter of the logistic function used in scoring preemption options
|
|
|
|
const rate = 0.0048
|
|
|
|
|
|
|
|
// origin controls the inflection point of the logistic function used in scoring preemption options
|
|
|
|
const origin = 2048.0
|
|
|
|
|
2020-05-27 19:02:01 +00:00
|
|
|
// This function manifests as an s curve that asympotically moves towards zero for large values of netPriority
|
|
|
|
return 1.0 / (1 + math.Exp(rate*(netPriority-origin)))
|
|
|
|
}
|