db0d95b09c
Added this as a new step in the stack and some unit tests
347 lines
11 KiB
Go
347 lines
11 KiB
Go
package scheduler
|
|
|
|
import (
|
|
"fmt"
|
|
"strconv"
|
|
|
|
memdb "github.com/hashicorp/go-memdb"
|
|
"github.com/hashicorp/nomad/helper"
|
|
"github.com/hashicorp/nomad/nomad/structs"
|
|
)
|
|
|
|
// propertySet is used to track the values used for a particular property.
|
|
type propertySet struct {
|
|
// ctx is used to lookup the plan and state
|
|
ctx Context
|
|
|
|
// jobID is the job we are operating on
|
|
jobID string
|
|
|
|
// namespace is the namespace of the job we are operating on
|
|
namespace string
|
|
|
|
// taskGroup is optionally set if the constraint is for a task group
|
|
taskGroup string
|
|
|
|
// targetAttribute is the attribute this property set is checking
|
|
targetAttribute string
|
|
|
|
// allowedCount is the allowed number of allocations that can have the
|
|
// distinct property
|
|
allowedCount uint64
|
|
|
|
// errorBuilding marks whether there was an error when building the property
|
|
// set
|
|
errorBuilding error
|
|
|
|
// existingValues is a mapping of the values of a property to the number of
|
|
// times the value has been used by pre-existing allocations.
|
|
existingValues map[string]uint64
|
|
|
|
// proposedValues is a mapping of the values of a property to the number of
|
|
// times the value has been used by proposed allocations.
|
|
proposedValues map[string]uint64
|
|
|
|
// clearedValues is a mapping of the values of a property to the number of
|
|
// times the value has been used by proposed stopped allocations.
|
|
clearedValues map[string]uint64
|
|
}
|
|
|
|
// NewPropertySet returns a new property set used to guarantee unique property
|
|
// values for new allocation placements.
|
|
func NewPropertySet(ctx Context, job *structs.Job) *propertySet {
|
|
p := &propertySet{
|
|
ctx: ctx,
|
|
jobID: job.ID,
|
|
namespace: job.Namespace,
|
|
existingValues: make(map[string]uint64),
|
|
}
|
|
|
|
return p
|
|
}
|
|
|
|
// SetJobConstraintAttribute is used to parameterize the property set for a
|
|
// distinct_property constraint set at the job level.
|
|
func (p *propertySet) SetJobConstraint(constraint *structs.Constraint) {
|
|
p.setConstraint(constraint, "")
|
|
}
|
|
|
|
// SetTGConstraint is used to parameterize the property set for a
|
|
// distinct_property constraint set at the task group level. The inputs are the
|
|
// constraint and the task group name.
|
|
func (p *propertySet) SetTGConstraint(constraint *structs.Constraint, taskGroup string) {
|
|
p.setConstraint(constraint, taskGroup)
|
|
}
|
|
|
|
// setConstraint is a shared helper for setting a job or task group constraint.
|
|
func (p *propertySet) setConstraint(constraint *structs.Constraint, taskGroup string) {
|
|
allowedCount := uint64(0)
|
|
// Determine the number of allowed allocations with the property.
|
|
if v := constraint.RTarget; v != "" {
|
|
c, err := strconv.ParseUint(v, 10, 64)
|
|
if err != nil {
|
|
p.errorBuilding = fmt.Errorf("failed to convert RTarget %q to uint64: %v", v, err)
|
|
p.ctx.Logger().Printf("[ERR] scheduler.dynamic-constraint: %v", p.errorBuilding)
|
|
return
|
|
}
|
|
|
|
allowedCount = c
|
|
} else {
|
|
allowedCount = 1
|
|
}
|
|
p.setPropertySetInner(constraint.LTarget, allowedCount, taskGroup)
|
|
}
|
|
|
|
// SetTargetAttribute is used to populate this property set without also storing allowed count
|
|
// This is used when evaluating spread stanzas
|
|
func (p *propertySet) SetTargetAttribute(targetAttribute string, taskGroup string) {
|
|
p.setPropertySetInner(targetAttribute, 0, taskGroup)
|
|
}
|
|
|
|
// setConstraint is a shared helper for setting a job or task group attribute and allowedCount
|
|
// allowedCount can be zero when this is used in evaluating spread stanzas
|
|
func (p *propertySet) setPropertySetInner(targetAttribute string, allowedCount uint64, taskGroup string) {
|
|
// Store that this is for a task group
|
|
if taskGroup != "" {
|
|
p.taskGroup = taskGroup
|
|
}
|
|
|
|
// Store the constraint
|
|
p.targetAttribute = targetAttribute
|
|
|
|
p.allowedCount = allowedCount
|
|
|
|
// Determine the number of existing allocations that are using a property
|
|
// value
|
|
p.populateExisting(targetAttribute)
|
|
|
|
// Populate the proposed when setting the constraint. We do this because
|
|
// when detecting if we can inplace update an allocation we stage an
|
|
// eviction and then select. This means the plan has an eviction before a
|
|
// single select has finished.
|
|
p.PopulateProposed()
|
|
}
|
|
|
|
// populateExisting is a helper shared when setting the constraint to populate
|
|
// the existing values.
|
|
func (p *propertySet) populateExisting(targetAttribute string) {
|
|
// Retrieve all previously placed allocations
|
|
ws := memdb.NewWatchSet()
|
|
allocs, err := p.ctx.State().AllocsByJob(ws, p.namespace, p.jobID, false)
|
|
if err != nil {
|
|
p.errorBuilding = fmt.Errorf("failed to get job's allocations: %v", err)
|
|
p.ctx.Logger().Printf("[ERR] scheduler.dynamic-constraint: %v", p.errorBuilding)
|
|
return
|
|
}
|
|
|
|
// Filter to the correct set of allocs
|
|
allocs = p.filterAllocs(allocs, true)
|
|
|
|
// Get all the nodes that have been used by the allocs
|
|
nodes, err := p.buildNodeMap(allocs)
|
|
if err != nil {
|
|
p.errorBuilding = err
|
|
p.ctx.Logger().Printf("[ERR] scheduler.dynamic-constraint: %v", err)
|
|
return
|
|
}
|
|
|
|
// Build existing properties map
|
|
p.populateProperties(allocs, nodes, p.existingValues)
|
|
}
|
|
|
|
// PopulateProposed populates the proposed values and recomputes any cleared
|
|
// value. It should be called whenever the plan is updated to ensure correct
|
|
// results when checking an option.
|
|
func (p *propertySet) PopulateProposed() {
|
|
|
|
// Reset the proposed properties
|
|
p.proposedValues = make(map[string]uint64)
|
|
p.clearedValues = make(map[string]uint64)
|
|
|
|
// Gather the set of proposed stops.
|
|
var stopping []*structs.Allocation
|
|
for _, updates := range p.ctx.Plan().NodeUpdate {
|
|
stopping = append(stopping, updates...)
|
|
}
|
|
stopping = p.filterAllocs(stopping, false)
|
|
|
|
// Gather the proposed allocations
|
|
var proposed []*structs.Allocation
|
|
for _, pallocs := range p.ctx.Plan().NodeAllocation {
|
|
proposed = append(proposed, pallocs...)
|
|
}
|
|
proposed = p.filterAllocs(proposed, true)
|
|
|
|
// Get the used nodes
|
|
both := make([]*structs.Allocation, 0, len(stopping)+len(proposed))
|
|
both = append(both, stopping...)
|
|
both = append(both, proposed...)
|
|
nodes, err := p.buildNodeMap(both)
|
|
if err != nil {
|
|
p.errorBuilding = err
|
|
p.ctx.Logger().Printf("[ERR] scheduler.dynamic-constraint: %v", err)
|
|
return
|
|
}
|
|
|
|
// Populate the cleared values
|
|
p.populateProperties(stopping, nodes, p.clearedValues)
|
|
|
|
// Populate the proposed values
|
|
p.populateProperties(proposed, nodes, p.proposedValues)
|
|
|
|
// Remove any cleared value that is now being used by the proposed allocs
|
|
for value := range p.proposedValues {
|
|
current, ok := p.clearedValues[value]
|
|
if !ok {
|
|
continue
|
|
} else if current == 0 {
|
|
delete(p.clearedValues, value)
|
|
} else if current > 1 {
|
|
p.clearedValues[value]--
|
|
}
|
|
}
|
|
}
|
|
|
|
// SatisfiesDistinctProperties checks if the option satisfies the
|
|
// distinct_property constraints given the existing placements and proposed
|
|
// placements. If the option does not satisfy the constraints an explanation is
|
|
// given.
|
|
func (p *propertySet) SatisfiesDistinctProperties(option *structs.Node, tg string) (bool, string) {
|
|
nValue, errorMsg, usedCount := p.UsedCount(option, tg)
|
|
if errorMsg != "" {
|
|
return false, errorMsg
|
|
}
|
|
// The property value has been used but within the number of allowed
|
|
// allocations.
|
|
if usedCount < p.allowedCount {
|
|
return true, ""
|
|
}
|
|
|
|
return false, fmt.Sprintf("distinct_property: %s=%s used by %d allocs", p.targetAttribute, nValue, usedCount)
|
|
}
|
|
|
|
// UsedCount returns the number of times the value of the attribute being tracked by this
|
|
// property set is used across current and proposed allocations. It also returns the resolved
|
|
// attribute value for the node, and an error message if it couldn't be resolved correctly
|
|
func (p *propertySet) UsedCount(option *structs.Node, tg string) (string, string, uint64) {
|
|
// Check if there was an error building
|
|
if p.errorBuilding != nil {
|
|
return "", p.errorBuilding.Error(), 0
|
|
}
|
|
|
|
// Get the nodes property value
|
|
nValue, ok := getProperty(option, p.targetAttribute)
|
|
if !ok {
|
|
return nValue, fmt.Sprintf("missing property %q", p.targetAttribute), 0
|
|
}
|
|
|
|
// combine the counts of how many times the property has been used by
|
|
// existing and proposed allocations
|
|
combinedUse := make(map[string]uint64, helper.IntMax(len(p.existingValues), len(p.proposedValues)))
|
|
for _, usedValues := range []map[string]uint64{p.existingValues, p.proposedValues} {
|
|
for propertyValue, usedCount := range usedValues {
|
|
combinedUse[propertyValue] += usedCount
|
|
}
|
|
}
|
|
|
|
// Go through and discount the combined count when the value has been
|
|
// cleared by a proposed stop.
|
|
for propertyValue, clearedCount := range p.clearedValues {
|
|
combined, ok := combinedUse[propertyValue]
|
|
if !ok {
|
|
continue
|
|
}
|
|
|
|
// Don't clear below 0.
|
|
if combined >= clearedCount {
|
|
combinedUse[propertyValue] = combined - clearedCount
|
|
} else {
|
|
combinedUse[propertyValue] = 0
|
|
}
|
|
}
|
|
|
|
usedCount := combinedUse[nValue]
|
|
return nValue, "", usedCount
|
|
}
|
|
|
|
// filterAllocs filters a set of allocations to just be those that are running
|
|
// and if the property set is operation at a task group level, for allocations
|
|
// for that task group
|
|
func (p *propertySet) filterAllocs(allocs []*structs.Allocation, filterTerminal bool) []*structs.Allocation {
|
|
n := len(allocs)
|
|
for i := 0; i < n; i++ {
|
|
remove := false
|
|
if filterTerminal {
|
|
remove = allocs[i].TerminalStatus()
|
|
}
|
|
|
|
// If the constraint is on the task group filter the allocations to just
|
|
// those on the task group
|
|
if p.taskGroup != "" {
|
|
remove = remove || allocs[i].TaskGroup != p.taskGroup
|
|
}
|
|
|
|
if remove {
|
|
allocs[i], allocs[n-1] = allocs[n-1], nil
|
|
i--
|
|
n--
|
|
}
|
|
}
|
|
return allocs[:n]
|
|
}
|
|
|
|
// buildNodeMap takes a list of allocations and returns a map of the nodes used
|
|
// by those allocations
|
|
func (p *propertySet) buildNodeMap(allocs []*structs.Allocation) (map[string]*structs.Node, error) {
|
|
// Get all the nodes that have been used by the allocs
|
|
nodes := make(map[string]*structs.Node)
|
|
ws := memdb.NewWatchSet()
|
|
for _, alloc := range allocs {
|
|
if _, ok := nodes[alloc.NodeID]; ok {
|
|
continue
|
|
}
|
|
|
|
node, err := p.ctx.State().NodeByID(ws, alloc.NodeID)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to lookup node ID %q: %v", alloc.NodeID, err)
|
|
}
|
|
|
|
nodes[alloc.NodeID] = node
|
|
}
|
|
|
|
return nodes, nil
|
|
}
|
|
|
|
// populateProperties goes through all allocations and builds up the used
|
|
// properties from the nodes storing the results in the passed properties map.
|
|
func (p *propertySet) populateProperties(allocs []*structs.Allocation, nodes map[string]*structs.Node,
|
|
properties map[string]uint64) {
|
|
|
|
for _, alloc := range allocs {
|
|
nProperty, ok := getProperty(nodes[alloc.NodeID], p.targetAttribute)
|
|
if !ok {
|
|
continue
|
|
}
|
|
|
|
properties[nProperty]++
|
|
}
|
|
}
|
|
|
|
// getProperty is used to lookup the property value on the node
|
|
func getProperty(n *structs.Node, property string) (string, bool) {
|
|
if n == nil || property == "" {
|
|
return "", false
|
|
}
|
|
|
|
val, ok := resolveTarget(property, n)
|
|
if !ok {
|
|
return "", false
|
|
}
|
|
nodeValue, ok := val.(string)
|
|
if !ok {
|
|
return "", false
|
|
}
|
|
|
|
return nodeValue, true
|
|
}
|