5fc63ace0b
When calculating the score in the `SpreadIterator`, the score boost is proportional to the difference between the current and desired count. But when there are implicit spread targets, the current count is the sum of the possible implicit targets, which results in incorrect scoring unless there's only one implicit target. This changeset updates the `propertySet` struct to accept a set of explicit target values so it can detect when a property value falls into the implicit set and should be combined with other implicit values. Fixes: #11823
376 lines
12 KiB
Go
376 lines
12 KiB
Go
// Copyright (c) HashiCorp, Inc.
|
|
// SPDX-License-Identifier: MPL-2.0
|
|
|
|
package scheduler
|
|
|
|
import (
|
|
"fmt"
|
|
"strconv"
|
|
|
|
log "github.com/hashicorp/go-hclog"
|
|
memdb "github.com/hashicorp/go-memdb"
|
|
"github.com/hashicorp/go-set"
|
|
|
|
"github.com/hashicorp/nomad/helper"
|
|
"github.com/hashicorp/nomad/nomad/structs"
|
|
)
|
|
|
|
// propertySet is used to track the values used for a particular property.
|
|
type propertySet struct {
|
|
// ctx is used to lookup the plan and state
|
|
ctx Context
|
|
|
|
// logger is the logger for the property set
|
|
logger log.Logger
|
|
|
|
// jobID is the job we are operating on
|
|
jobID string
|
|
|
|
// namespace is the namespace of the job we are operating on
|
|
namespace string
|
|
|
|
// taskGroup is optionally set if the constraint is for a task group
|
|
taskGroup string
|
|
|
|
// targetAttribute is the attribute this property set is checking
|
|
targetAttribute string
|
|
|
|
// targetValues are the set of attribute values that are explicitly expected,
|
|
// so we can combine the count of values that belong to any implicit targets.
|
|
targetValues *set.Set[string]
|
|
|
|
// allowedCount is the allowed number of allocations that can have the
|
|
// distinct property
|
|
allowedCount uint64
|
|
|
|
// errorBuilding marks whether there was an error when building the property
|
|
// set
|
|
errorBuilding error
|
|
|
|
// existingValues is a mapping of the values of a property to the number of
|
|
// times the value has been used by pre-existing allocations.
|
|
existingValues map[string]uint64
|
|
|
|
// proposedValues is a mapping of the values of a property to the number of
|
|
// times the value has been used by proposed allocations.
|
|
proposedValues map[string]uint64
|
|
|
|
// clearedValues is a mapping of the values of a property to the number of
|
|
// times the value has been used by proposed stopped allocations.
|
|
clearedValues map[string]uint64
|
|
}
|
|
|
|
// NewPropertySet returns a new property set used to guarantee unique property
|
|
// values for new allocation placements.
|
|
func NewPropertySet(ctx Context, job *structs.Job) *propertySet {
|
|
p := &propertySet{
|
|
ctx: ctx,
|
|
jobID: job.ID,
|
|
namespace: job.Namespace,
|
|
existingValues: make(map[string]uint64),
|
|
targetValues: set.From([]string{}),
|
|
logger: ctx.Logger().Named("property_set"),
|
|
}
|
|
|
|
return p
|
|
}
|
|
|
|
// SetJobConstraint is used to parameterize the property set for a
|
|
// distinct_property constraint set at the job level.
|
|
func (p *propertySet) SetJobConstraint(constraint *structs.Constraint) {
|
|
p.setConstraint(constraint, "")
|
|
}
|
|
|
|
// SetTGConstraint is used to parameterize the property set for a
|
|
// distinct_property constraint set at the task group level. The inputs are the
|
|
// constraint and the task group name.
|
|
func (p *propertySet) SetTGConstraint(constraint *structs.Constraint, taskGroup string) {
|
|
p.setConstraint(constraint, taskGroup)
|
|
}
|
|
|
|
// setConstraint is a shared helper for setting a job or task group constraint.
|
|
func (p *propertySet) setConstraint(constraint *structs.Constraint, taskGroup string) {
|
|
var allowedCount uint64
|
|
// Determine the number of allowed allocations with the property.
|
|
if v := constraint.RTarget; v != "" {
|
|
c, err := strconv.ParseUint(v, 10, 64)
|
|
if err != nil {
|
|
p.errorBuilding = fmt.Errorf("failed to convert RTarget %q to uint64: %v", v, err)
|
|
p.logger.Error("failed to convert RTarget to uint64", "RTarget", v, "error", err)
|
|
return
|
|
}
|
|
|
|
allowedCount = c
|
|
} else {
|
|
allowedCount = 1
|
|
}
|
|
p.setTargetAttributeWithCount(constraint.LTarget, allowedCount, taskGroup)
|
|
}
|
|
|
|
// SetTargetAttribute is used to populate this property set without also storing allowed count
|
|
// This is used when evaluating spread blocks
|
|
func (p *propertySet) SetTargetAttribute(targetAttribute string, taskGroup string) {
|
|
p.setTargetAttributeWithCount(targetAttribute, 0, taskGroup)
|
|
}
|
|
|
|
// setTargetAttributeWithCount is a shared helper for setting a job or task group attribute and allowedCount
|
|
// allowedCount can be zero when this is used in evaluating spread blocks
|
|
func (p *propertySet) setTargetAttributeWithCount(targetAttribute string, allowedCount uint64, taskGroup string) {
|
|
// Store that this is for a task group
|
|
if taskGroup != "" {
|
|
p.taskGroup = taskGroup
|
|
}
|
|
|
|
// Store the constraint
|
|
p.targetAttribute = targetAttribute
|
|
|
|
p.allowedCount = allowedCount
|
|
|
|
// Determine the number of existing allocations that are using a property
|
|
// value
|
|
p.populateExisting()
|
|
|
|
// Populate the proposed when setting the constraint. We do this because
|
|
// when detecting if we can inplace update an allocation we stage an
|
|
// eviction and then select. This means the plan has an eviction before a
|
|
// single select has finished.
|
|
p.PopulateProposed()
|
|
}
|
|
|
|
func (p *propertySet) SetTargetValues(values []string) {
|
|
p.targetValues = set.From(values)
|
|
}
|
|
|
|
// populateExisting is a helper shared when setting the constraint to populate
|
|
// the existing values.
|
|
func (p *propertySet) populateExisting() {
|
|
// Retrieve all previously placed allocations
|
|
ws := memdb.NewWatchSet()
|
|
allocs, err := p.ctx.State().AllocsByJob(ws, p.namespace, p.jobID, false)
|
|
if err != nil {
|
|
p.errorBuilding = fmt.Errorf("failed to get job's allocations: %v", err)
|
|
p.logger.Error("failed to get job's allocations", "job", p.jobID, "namespace", p.namespace, "error", err)
|
|
return
|
|
}
|
|
|
|
// Filter to the correct set of allocs
|
|
allocs = p.filterAllocs(allocs, true)
|
|
|
|
// Get all the nodes that have been used by the allocs
|
|
nodes, err := p.buildNodeMap(allocs)
|
|
if err != nil {
|
|
p.errorBuilding = err
|
|
p.logger.Error("failed to build node map", "error", err)
|
|
return
|
|
}
|
|
|
|
// Build existing properties map
|
|
p.populateProperties(allocs, nodes, p.existingValues)
|
|
}
|
|
|
|
// PopulateProposed populates the proposed values and recomputes any cleared
|
|
// value. It should be called whenever the plan is updated to ensure correct
|
|
// results when checking an option.
|
|
func (p *propertySet) PopulateProposed() {
|
|
|
|
// Reset the proposed properties
|
|
p.proposedValues = make(map[string]uint64)
|
|
p.clearedValues = make(map[string]uint64)
|
|
|
|
// Gather the set of proposed stops.
|
|
var stopping []*structs.Allocation
|
|
for _, updates := range p.ctx.Plan().NodeUpdate {
|
|
stopping = append(stopping, updates...)
|
|
}
|
|
stopping = p.filterAllocs(stopping, false)
|
|
|
|
// Gather the proposed allocations
|
|
var proposed []*structs.Allocation
|
|
for _, pallocs := range p.ctx.Plan().NodeAllocation {
|
|
proposed = append(proposed, pallocs...)
|
|
}
|
|
proposed = p.filterAllocs(proposed, true)
|
|
|
|
// Get the used nodes
|
|
both := make([]*structs.Allocation, 0, len(stopping)+len(proposed))
|
|
both = append(both, stopping...)
|
|
both = append(both, proposed...)
|
|
nodes, err := p.buildNodeMap(both)
|
|
if err != nil {
|
|
p.errorBuilding = err
|
|
p.logger.Error("failed to build node map", "error", err)
|
|
return
|
|
}
|
|
|
|
// Populate the cleared values
|
|
p.populateProperties(stopping, nodes, p.clearedValues)
|
|
|
|
// Populate the proposed values
|
|
p.populateProperties(proposed, nodes, p.proposedValues)
|
|
|
|
// Remove any cleared value that is now being used by the proposed allocs
|
|
for value := range p.proposedValues {
|
|
current, ok := p.clearedValues[value]
|
|
if !ok {
|
|
continue
|
|
} else if current == 0 {
|
|
delete(p.clearedValues, value)
|
|
} else if current > 1 {
|
|
p.clearedValues[value]--
|
|
}
|
|
}
|
|
}
|
|
|
|
// SatisfiesDistinctProperties checks if the option satisfies the
|
|
// distinct_property constraints given the existing placements and proposed
|
|
// placements. If the option does not satisfy the constraints an explanation is
|
|
// given.
|
|
func (p *propertySet) SatisfiesDistinctProperties(option *structs.Node, tg string) (bool, string) {
|
|
nValue, errorMsg, usedCount := p.UsedCount(option, tg)
|
|
if errorMsg != "" {
|
|
return false, errorMsg
|
|
}
|
|
// The property value has been used but within the number of allowed
|
|
// allocations.
|
|
if usedCount < p.allowedCount {
|
|
return true, ""
|
|
}
|
|
|
|
return false, fmt.Sprintf("distinct_property: %s=%s used by %d allocs", p.targetAttribute, nValue, usedCount)
|
|
}
|
|
|
|
// UsedCount returns the number of times the value of the attribute being tracked by this
|
|
// property set is used across current and proposed allocations. It also returns the resolved
|
|
// attribute value for the node, and an error message if it couldn't be resolved correctly
|
|
func (p *propertySet) UsedCount(option *structs.Node, _ string) (string, string, uint64) {
|
|
// Check if there was an error building
|
|
if p.errorBuilding != nil {
|
|
return "", p.errorBuilding.Error(), 0
|
|
}
|
|
|
|
// Get the nodes property value
|
|
nValue, ok := getProperty(option, p.targetAttribute)
|
|
targetPropertyValue := p.targetedPropertyValue(nValue)
|
|
if !ok {
|
|
return nValue, fmt.Sprintf("missing property %q", p.targetAttribute), 0
|
|
}
|
|
combinedUse := p.GetCombinedUseMap()
|
|
usedCount := combinedUse[targetPropertyValue]
|
|
return targetPropertyValue, "", usedCount
|
|
}
|
|
|
|
// GetCombinedUseMap counts how many times the property has been used by
|
|
// existing and proposed allocations. It also takes into account any stopped
|
|
// allocations
|
|
func (p *propertySet) GetCombinedUseMap() map[string]uint64 {
|
|
combinedUse := make(map[string]uint64, helper.Max(len(p.existingValues), len(p.proposedValues)))
|
|
for _, usedValues := range []map[string]uint64{p.existingValues, p.proposedValues} {
|
|
for propertyValue, usedCount := range usedValues {
|
|
targetPropertyValue := p.targetedPropertyValue(propertyValue)
|
|
combinedUse[targetPropertyValue] += usedCount
|
|
}
|
|
}
|
|
|
|
// Go through and discount the combined count when the value has been
|
|
// cleared by a proposed stop.
|
|
for propertyValue, clearedCount := range p.clearedValues {
|
|
targetPropertyValue := p.targetedPropertyValue(propertyValue)
|
|
combined, ok := combinedUse[targetPropertyValue]
|
|
if !ok {
|
|
continue
|
|
}
|
|
|
|
// Don't clear below 0.
|
|
if combined >= clearedCount {
|
|
combinedUse[targetPropertyValue] = combined - clearedCount
|
|
} else {
|
|
combinedUse[targetPropertyValue] = 0
|
|
}
|
|
}
|
|
return combinedUse
|
|
}
|
|
|
|
// filterAllocs filters a set of allocations to just be those that are running
|
|
// and if the property set is operation at a task group level, for allocations
|
|
// for that task group
|
|
func (p *propertySet) filterAllocs(allocs []*structs.Allocation, filterTerminal bool) []*structs.Allocation {
|
|
n := len(allocs)
|
|
for i := 0; i < n; i++ {
|
|
remove := false
|
|
if filterTerminal {
|
|
remove = allocs[i].TerminalStatus()
|
|
}
|
|
|
|
// If the constraint is on the task group filter the allocations to just
|
|
// those on the task group
|
|
if p.taskGroup != "" {
|
|
remove = remove || allocs[i].TaskGroup != p.taskGroup
|
|
}
|
|
|
|
if remove {
|
|
allocs[i], allocs[n-1] = allocs[n-1], nil
|
|
i--
|
|
n--
|
|
}
|
|
}
|
|
return allocs[:n]
|
|
}
|
|
|
|
// buildNodeMap takes a list of allocations and returns a map of the nodes used
|
|
// by those allocations
|
|
func (p *propertySet) buildNodeMap(allocs []*structs.Allocation) (map[string]*structs.Node, error) {
|
|
// Get all the nodes that have been used by the allocs
|
|
nodes := make(map[string]*structs.Node)
|
|
ws := memdb.NewWatchSet()
|
|
for _, alloc := range allocs {
|
|
if _, ok := nodes[alloc.NodeID]; ok {
|
|
continue
|
|
}
|
|
|
|
node, err := p.ctx.State().NodeByID(ws, alloc.NodeID)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to lookup node ID %q: %v", alloc.NodeID, err)
|
|
}
|
|
|
|
nodes[alloc.NodeID] = node
|
|
}
|
|
|
|
return nodes, nil
|
|
}
|
|
|
|
// populateProperties goes through all allocations and builds up the used
|
|
// properties from the nodes storing the results in the passed properties map.
|
|
func (p *propertySet) populateProperties(allocs []*structs.Allocation, nodes map[string]*structs.Node,
|
|
properties map[string]uint64) {
|
|
|
|
for _, alloc := range allocs {
|
|
nProperty, ok := getProperty(nodes[alloc.NodeID], p.targetAttribute)
|
|
if !ok {
|
|
continue
|
|
}
|
|
|
|
targetPropertyValue := p.targetedPropertyValue(nProperty)
|
|
properties[targetPropertyValue]++
|
|
}
|
|
}
|
|
|
|
// getProperty is used to lookup the property value on the node
|
|
func getProperty(n *structs.Node, property string) (string, bool) {
|
|
if n == nil || property == "" {
|
|
return "", false
|
|
}
|
|
|
|
return resolveTarget(property, n)
|
|
}
|
|
|
|
// targetedPropertyValue transforms the property value to combine all implicit
|
|
// target values into a single wildcard placeholder so that we get accurate
|
|
// counts when we compare an explicitly-defined target against multiple implicit
|
|
// targets.
|
|
func (p *propertySet) targetedPropertyValue(propertyValue string) string {
|
|
if p.targetValues.Empty() || p.targetValues.Contains(propertyValue) {
|
|
return propertyValue
|
|
}
|
|
return "*"
|
|
}
|