2018-07-17 22:25:38 +00:00
|
|
|
package scheduler
|
|
|
|
|
|
|
|
import (
|
|
|
|
"github.com/hashicorp/nomad/nomad/structs"
|
|
|
|
)
|
|
|
|
|
2018-07-26 15:32:06 +00:00
|
|
|
const (
|
|
|
|
// ImplicitTarget is used to represent any remaining attribute values
|
|
|
|
// when target percentages don't add up to 100
|
|
|
|
ImplicitTarget = "*"
|
2018-07-26 22:14:27 +00:00
|
|
|
|
|
|
|
// evenSpreadBoost is a positive boost used when the job has
|
|
|
|
// even spread over an attribute. Any nodes whose attribute value is
|
|
|
|
// equal to the minimum count over all possible attributes gets boosted
|
|
|
|
evenSpreadBoost = 0.01
|
|
|
|
|
|
|
|
// evenSpreadPenality is a penalty used when the job has
|
|
|
|
// even spread over an attribute. Any nodes whose attribute value is
|
|
|
|
// greater than the minimum count over all possible attributes gets this penalty.
|
|
|
|
// This is to ensure that other nodes are preferred when one of the values
|
|
|
|
// has a larger number of allocations
|
|
|
|
evenSpreadPenalty = -0.5
|
2018-07-26 15:32:06 +00:00
|
|
|
)
|
|
|
|
|
2018-07-17 22:25:38 +00:00
|
|
|
// SpreadIterator is used to spread allocations across a specified attribute
|
|
|
|
// according to preset weights
|
|
|
|
type SpreadIterator struct {
|
2018-07-26 00:28:36 +00:00
|
|
|
ctx Context
|
|
|
|
source RankIterator
|
|
|
|
job *structs.Job
|
|
|
|
tg *structs.TaskGroup
|
|
|
|
jobSpreads []*structs.Spread
|
|
|
|
// tgSpreadInfo is a map per task group with precomputed
|
|
|
|
// values for desired counts and weight
|
|
|
|
tgSpreadInfo map[string]spreadAttributeMap
|
|
|
|
// sumSpreadWeights tracks the total weight across all spread
|
|
|
|
// stanzas
|
|
|
|
sumSpreadWeights int
|
|
|
|
hasSpread bool
|
|
|
|
// groupProperySets is a memoized map from task group to property sets.
|
|
|
|
// existing allocs are computed once, and allocs from the plan are updated
|
|
|
|
// when Reset is called
|
2018-07-17 22:25:38 +00:00
|
|
|
groupPropertySets map[string][]*propertySet
|
|
|
|
}
|
|
|
|
|
|
|
|
type spreadAttributeMap map[string]*spreadInfo
|
|
|
|
|
|
|
|
type spreadInfo struct {
|
|
|
|
weight int
|
|
|
|
desiredCounts map[string]float64
|
|
|
|
}
|
|
|
|
|
|
|
|
func NewSpreadIterator(ctx Context, source RankIterator) *SpreadIterator {
|
|
|
|
iter := &SpreadIterator{
|
|
|
|
ctx: ctx,
|
|
|
|
source: source,
|
|
|
|
groupPropertySets: make(map[string][]*propertySet),
|
|
|
|
tgSpreadInfo: make(map[string]spreadAttributeMap),
|
|
|
|
}
|
|
|
|
return iter
|
|
|
|
}
|
|
|
|
|
|
|
|
func (iter *SpreadIterator) Reset() {
|
|
|
|
iter.source.Reset()
|
|
|
|
for _, sets := range iter.groupPropertySets {
|
|
|
|
for _, ps := range sets {
|
|
|
|
ps.PopulateProposed()
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func (iter *SpreadIterator) SetJob(job *structs.Job) {
|
|
|
|
iter.job = job
|
|
|
|
if job.Spreads != nil {
|
|
|
|
iter.jobSpreads = job.Spreads
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func (iter *SpreadIterator) SetTaskGroup(tg *structs.TaskGroup) {
|
|
|
|
iter.tg = tg
|
|
|
|
|
|
|
|
// Build the property set at the taskgroup level
|
|
|
|
if _, ok := iter.groupPropertySets[tg.Name]; !ok {
|
|
|
|
// First add property sets that are at the job level for this task group
|
|
|
|
for _, spread := range iter.jobSpreads {
|
|
|
|
pset := NewPropertySet(iter.ctx, iter.job)
|
|
|
|
pset.SetTargetAttribute(spread.Attribute, tg.Name)
|
|
|
|
iter.groupPropertySets[tg.Name] = append(iter.groupPropertySets[tg.Name], pset)
|
|
|
|
}
|
|
|
|
|
|
|
|
// Include property sets at the task group level
|
|
|
|
for _, spread := range tg.Spreads {
|
|
|
|
pset := NewPropertySet(iter.ctx, iter.job)
|
|
|
|
pset.SetTargetAttribute(spread.Attribute, tg.Name)
|
|
|
|
iter.groupPropertySets[tg.Name] = append(iter.groupPropertySets[tg.Name], pset)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2018-07-26 00:28:36 +00:00
|
|
|
// Check if there are any spreads configured
|
2018-07-17 22:25:38 +00:00
|
|
|
iter.hasSpread = len(iter.groupPropertySets[tg.Name]) != 0
|
|
|
|
|
|
|
|
// Build tgSpreadInfo at the task group level
|
|
|
|
if _, ok := iter.tgSpreadInfo[tg.Name]; !ok {
|
|
|
|
iter.computeSpreadInfo(tg)
|
|
|
|
}
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
func (iter *SpreadIterator) hasSpreads() bool {
|
|
|
|
return iter.hasSpread
|
|
|
|
}
|
|
|
|
|
|
|
|
func (iter *SpreadIterator) Next() *RankedNode {
|
|
|
|
for {
|
|
|
|
option := iter.source.Next()
|
|
|
|
|
|
|
|
// Hot path if there is nothing to check
|
|
|
|
if option == nil || !iter.hasSpreads() {
|
|
|
|
return option
|
|
|
|
}
|
|
|
|
|
|
|
|
tgName := iter.tg.Name
|
|
|
|
propertySets := iter.groupPropertySets[tgName]
|
|
|
|
// Iterate over each spread attribute's property set and add a weighted score
|
|
|
|
totalSpreadScore := 0.0
|
|
|
|
for _, pset := range propertySets {
|
|
|
|
nValue, errorMsg, usedCount := pset.UsedCount(option.Node, tgName)
|
2018-07-18 15:53:37 +00:00
|
|
|
// Skip if there was errors in resolving this attribute to compute used counts
|
2018-07-17 22:25:38 +00:00
|
|
|
if errorMsg != "" {
|
2018-07-26 15:32:06 +00:00
|
|
|
iter.ctx.Logger().Printf("[WARN] sched: error building spread attributes for task group %v:%v", tgName, errorMsg)
|
2018-07-17 22:25:38 +00:00
|
|
|
}
|
|
|
|
spreadAttributeMap := iter.tgSpreadInfo[tgName]
|
|
|
|
spreadDetails := spreadAttributeMap[pset.targetAttribute]
|
2018-07-26 22:14:27 +00:00
|
|
|
|
|
|
|
if len(spreadDetails.desiredCounts) == 0 {
|
|
|
|
// When desired counts map is empty the user didn't specify any targets
|
|
|
|
// Treat this as a special case
|
|
|
|
scoreBoost := evenSpreadScoreBoost(pset, option.Node)
|
|
|
|
totalSpreadScore += scoreBoost
|
|
|
|
} else {
|
|
|
|
// Get the desired count
|
|
|
|
desiredCount, ok := spreadDetails.desiredCounts[nValue]
|
2018-07-26 15:32:06 +00:00
|
|
|
if !ok {
|
2018-07-26 22:14:27 +00:00
|
|
|
// See if there is an implicit target
|
|
|
|
desiredCount, ok = spreadDetails.desiredCounts[ImplicitTarget]
|
|
|
|
if !ok {
|
|
|
|
// The desired count for this attribute is zero if it gets here
|
|
|
|
// don't boost the score
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
}
|
|
|
|
if float64(usedCount) < desiredCount {
|
|
|
|
// Calculate the relative weight of this specific spread attribute
|
|
|
|
spreadWeight := float64(spreadDetails.weight) / float64(iter.sumSpreadWeights)
|
|
|
|
// Score Boost is proportional the difference between current and desired count
|
|
|
|
// It is multiplied with the spread weight to account for cases where the job has
|
|
|
|
// more than one spread attribute
|
|
|
|
scoreBoost := ((desiredCount - float64(usedCount)) / desiredCount) * spreadWeight
|
|
|
|
totalSpreadScore += scoreBoost
|
2018-07-26 15:32:06 +00:00
|
|
|
}
|
2018-07-17 22:25:38 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
if totalSpreadScore != 0.0 {
|
|
|
|
option.Scores = append(option.Scores, totalSpreadScore)
|
|
|
|
iter.ctx.Metrics().ScoreNode(option.Node, "allocation-spread", totalSpreadScore)
|
|
|
|
}
|
|
|
|
return option
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2018-07-26 22:14:27 +00:00
|
|
|
// evenSpreadScoreBoost is a scoring helper that calculates the score
|
|
|
|
// for the option when even spread is desired (all attribute values get equal preference)
|
|
|
|
func evenSpreadScoreBoost(pset *propertySet, option *structs.Node) float64 {
|
|
|
|
combinedUseMap := pset.GetCombinedUseMap()
|
|
|
|
if len(combinedUseMap) == 0 {
|
|
|
|
// Nothing placed yet, so return 0 as the score
|
|
|
|
return 0.0
|
|
|
|
}
|
|
|
|
// Get the nodes property value
|
|
|
|
nValue, ok := getProperty(option, pset.targetAttribute)
|
|
|
|
currentAttributeCount := uint64(0)
|
|
|
|
if ok {
|
|
|
|
currentAttributeCount = combinedUseMap[nValue]
|
|
|
|
}
|
|
|
|
minCount := uint64(0)
|
|
|
|
maxCount := uint64(0)
|
|
|
|
for _, value := range combinedUseMap {
|
|
|
|
if minCount == 0 || value < minCount {
|
|
|
|
minCount = value
|
|
|
|
}
|
|
|
|
if maxCount == 0 || value > maxCount {
|
|
|
|
maxCount = value
|
|
|
|
}
|
|
|
|
}
|
|
|
|
if currentAttributeCount < minCount {
|
|
|
|
// Small positive boost for attributes with min count
|
|
|
|
return evenSpreadBoost
|
|
|
|
} else if currentAttributeCount > minCount {
|
|
|
|
// Negative boost if attribute count is greater than minimum
|
|
|
|
// This is so that other nodes will get a preference over this one
|
|
|
|
return evenSpreadPenalty
|
|
|
|
} else {
|
|
|
|
// When min and max are same the current distribution is even
|
|
|
|
// so we penalize
|
|
|
|
if minCount == maxCount {
|
|
|
|
return evenSpreadPenalty
|
|
|
|
} else {
|
|
|
|
return evenSpreadBoost
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2018-07-17 22:25:38 +00:00
|
|
|
// computeSpreadInfo computes and stores percentages and total values
|
|
|
|
// from all spreads that apply to a specific task group
|
|
|
|
func (iter *SpreadIterator) computeSpreadInfo(tg *structs.TaskGroup) {
|
|
|
|
spreadInfos := make(spreadAttributeMap, len(tg.Spreads))
|
|
|
|
totalCount := tg.Count
|
2018-07-18 15:53:37 +00:00
|
|
|
|
|
|
|
// Always combine any spread stanzas defined at the job level here
|
|
|
|
combinedSpreads := make([]*structs.Spread, 0, len(tg.Spreads)+len(iter.jobSpreads))
|
|
|
|
combinedSpreads = append(combinedSpreads, tg.Spreads...)
|
|
|
|
combinedSpreads = append(combinedSpreads, iter.jobSpreads...)
|
|
|
|
for _, spread := range combinedSpreads {
|
2018-07-26 15:32:06 +00:00
|
|
|
si := &spreadInfo{weight: spread.Weight, desiredCounts: make(map[string]float64)}
|
|
|
|
sumDesiredCounts := 0.0
|
2018-07-17 22:25:38 +00:00
|
|
|
for _, st := range spread.SpreadTarget {
|
2018-07-26 15:32:06 +00:00
|
|
|
desiredCount := (float64(st.Percent) / float64(100)) * float64(totalCount)
|
2018-07-17 22:25:38 +00:00
|
|
|
si.desiredCounts[st.Value] = desiredCount
|
2018-07-26 15:32:06 +00:00
|
|
|
sumDesiredCounts += desiredCount
|
|
|
|
}
|
2018-07-26 22:14:27 +00:00
|
|
|
// Account for remaining count only if there is any spread targets
|
|
|
|
if sumDesiredCounts > 0 && sumDesiredCounts < float64(totalCount) {
|
2018-07-26 15:32:06 +00:00
|
|
|
remainingCount := float64(totalCount) - sumDesiredCounts
|
|
|
|
si.desiredCounts[ImplicitTarget] = remainingCount
|
2018-07-17 22:25:38 +00:00
|
|
|
}
|
|
|
|
spreadInfos[spread.Attribute] = si
|
|
|
|
iter.sumSpreadWeights += spread.Weight
|
|
|
|
}
|
|
|
|
iter.tgSpreadInfo[tg.Name] = spreadInfos
|
|
|
|
}
|