refactor preemption code to use method recievers and setters for common fields

This commit is contained in:
Preetha Appan 2018-10-29 12:34:03 -05:00
parent 1a5421f5d7
commit 047af5141e
No known key found for this signature in database
GPG Key ID: 9F7C19990A50EAFC
2 changed files with 270 additions and 210 deletions

View File

@ -12,89 +12,100 @@ import (
// number of allocations being preempted exceeds max_parallel value in the job's migrate stanza // number of allocations being preempted exceeds max_parallel value in the job's migrate stanza
const maxParallelPenalty = 50.0 const maxParallelPenalty = 50.0
// basicResourceDistance computes a distance using a coordinate system. It compares resource fields like CPU/Memory and Disk. type groupedAllocs struct {
// Values emitted are in the range [0, maxFloat] priority int
func basicResourceDistance(resourceAsk *structs.ComparableResources, resourceUsed *structs.ComparableResources) float64 { allocs []*structs.Allocation
memoryCoord, cpuCoord, diskMBCoord := 0.0, 0.0, 0.0
if resourceAsk.Flattened.Memory.MemoryMB > 0 {
memoryCoord = (float64(resourceAsk.Flattened.Memory.MemoryMB) - float64(resourceUsed.Flattened.Memory.MemoryMB)) / float64(resourceAsk.Flattened.Memory.MemoryMB)
}
if resourceAsk.Flattened.Cpu.CpuShares > 0 {
cpuCoord = (float64(resourceAsk.Flattened.Cpu.CpuShares) - float64(resourceUsed.Flattened.Cpu.CpuShares)) / float64(resourceAsk.Flattened.Cpu.CpuShares)
}
if resourceAsk.Shared.DiskMB > 0 {
diskMBCoord = (float64(resourceAsk.Shared.DiskMB) - float64(resourceUsed.Shared.DiskMB)) / float64(resourceAsk.Shared.DiskMB)
}
originDist := math.Sqrt(
math.Pow(memoryCoord, 2) +
math.Pow(cpuCoord, 2) +
math.Pow(diskMBCoord, 2))
return originDist
} }
// networkResourceDistance returns a distance based only on network megabits // Preemptor is used to track existing allocations
func networkResourceDistance(resourceUsed *structs.NetworkResource, resourceNeeded *structs.NetworkResource) float64 { // and find suitable allocations to preempt
networkCoord := math.MaxFloat64 type Preemptor struct {
if resourceUsed != nil && resourceNeeded != nil { // currentPreemptions is a map computed when SetPreemptions is called
networkCoord = float64(resourceNeeded.MBits-resourceUsed.MBits) / float64(resourceNeeded.MBits) // it tracks the number of preempted allocations per job/taskgroup
currentPreemptions map[structs.NamespacedID]map[string]int
// jobPriority is the priority of the job being preempted
jobPriority int
// nodeRemainingResources tracks remaining available resources on the node
nodeRemainingResources *structs.ComparableResources
// currentAllocs is the candidate set used to find preemptible allocations
currentAllocs []*structs.Allocation
} }
originDist := math.Sqrt( func NewPreemptor(jobPriority int) *Preemptor {
math.Pow(networkCoord, 2)) return &Preemptor{
return originDist currentPreemptions: make(map[structs.NamespacedID]map[string]int),
jobPriority: jobPriority,
}
} }
// getPreemptionScoreForTaskGroupResources is used to calculate a score (lower is better) based on the distance between // SetNode sets the node
// the needed resource and requirements. A penalty is added when the choice already has some existing func (p *Preemptor) SetNode(node *structs.Node) {
// allocations in the plan that are being preempted.
func getPreemptionScoreForTaskGroupResources(resourceAsk *structs.ComparableResources, resourceUsed *structs.ComparableResources, maxParallel int, numPreemptedAllocs int) float64 {
maxParallelScorePenalty := 0.0
if maxParallel > 0 && numPreemptedAllocs >= maxParallel {
maxParallelScorePenalty = float64((numPreemptedAllocs+1)-maxParallel) * maxParallelPenalty
}
return basicResourceDistance(resourceAsk, resourceUsed) + maxParallelScorePenalty
}
// getPreemptionScoreForNetwork is similar to getPreemptionScoreForTaskGroupResources
// but only uses network Mbits to calculate a preemption score
func getPreemptionScoreForNetwork(resourceUsed *structs.NetworkResource, resourceNeeded *structs.NetworkResource, maxParallel int, numPreemptedAllocs int) float64 {
if resourceUsed == nil || resourceNeeded == nil {
return math.MaxFloat64
}
maxParallelScorePenalty := 0.0
if maxParallel > 0 && numPreemptedAllocs >= maxParallel {
maxParallelScorePenalty = float64((numPreemptedAllocs+1)-maxParallel) * maxParallelPenalty
}
return networkResourceDistance(resourceUsed, resourceNeeded) + maxParallelScorePenalty
}
// findPreemptibleAllocationsForTaskGroup computes a list of allocations to preempt to accommodate
// the resources asked for. Only allocs with a job priority < 10 of jobPriority are considered
// This method is meant only for finding preemptible allocations based on CPU/Memory/Disk
func findPreemptibleAllocationsForTaskGroup(jobPriority int, current []*structs.Allocation, resourceAsk *structs.AllocatedResources, node *structs.Node, currentPreemptions []*structs.Allocation) []*structs.Allocation {
resourcesNeeded := resourceAsk.Comparable()
allocsByPriority := filterAndGroupPreemptibleAllocs(jobPriority, current)
var bestAllocs []*structs.Allocation
allRequirementsMet := false
var preemptedResources *structs.ComparableResources
//TODO(preetha): should add some debug logging
nodeRemainingResources := node.ComparableResources() nodeRemainingResources := node.ComparableResources()
// Initialize nodeRemainingResources with the remaining resources
// after accounting for reserved resources and all allocations
// Subtract the reserved resources of the node // Subtract the reserved resources of the node
if node.ComparableReservedResources() != nil { if node.ComparableReservedResources() != nil {
nodeRemainingResources.Subtract(node.ComparableReservedResources()) nodeRemainingResources.Subtract(node.ComparableReservedResources())
} }
p.nodeRemainingResources = nodeRemainingResources
}
// SetCandidates initializes the candidate set from which preemptions are chosen
func (p *Preemptor) SetCandidates(allocs []*structs.Allocation) {
p.currentAllocs = allocs
}
// SetPreemptions initializes a map tracking existing counts of preempted allocations
// per job/task group. This is used while scoring preemption options
func (p *Preemptor) SetPreemptions(allocs []*structs.Allocation) {
// Clear out existing values
for k := range p.currentPreemptions {
delete(p.currentPreemptions, k)
}
// Initialize counts
for _, alloc := range allocs {
id := structs.NamespacedID{alloc.JobID, alloc.Namespace}
countMap, ok := p.currentPreemptions[id]
if !ok {
countMap = make(map[string]int)
}
c := countMap[alloc.TaskGroup]
countMap[alloc.TaskGroup] = c + 1
p.currentPreemptions[id] = countMap
}
}
// getNumPreemptions counts the number of other allocations being preempted that match the job and task group of
// the alloc under consideration. This is used as a scoring factor to minimize too many allocs of the same job being preempted at once
func (p *Preemptor) getNumPreemptions(alloc *structs.Allocation) int {
numCurrentPreemptionsForJob := 0
countMap := p.currentPreemptions[structs.NamespacedID{alloc.JobID, alloc.Namespace}]
if countMap != nil {
numCurrentPreemptionsForJob = countMap[alloc.TaskGroup]
}
return numCurrentPreemptionsForJob
}
// preemptForTaskGroup computes a list of allocations to preempt to accommodate
// the resources asked for. Only allocs with a job priority < 10 of jobPriority are considered
// This method is meant only for finding preemptible allocations based on CPU/Memory/Disk
func (p *Preemptor) preemptForTaskGroup(resourceAsk *structs.AllocatedResources) []*structs.Allocation {
resourcesNeeded := resourceAsk.Comparable()
// Subtract current allocations // Subtract current allocations
for _, alloc := range current { for _, alloc := range p.currentAllocs {
nodeRemainingResources.Subtract(alloc.ComparableResources()) p.nodeRemainingResources.Subtract(alloc.ComparableResources())
} }
// Group candidates by priority, filter out ineligible allocs
allocsByPriority := filterAndGroupPreemptibleAllocs(p.jobPriority, p.currentAllocs)
var bestAllocs []*structs.Allocation
allRequirementsMet := false
var preemptedResources *structs.ComparableResources
// Iterate over allocations grouped by priority to find preemptible allocations // Iterate over allocations grouped by priority to find preemptible allocations
for _, allocGrp := range allocsByPriority { for _, allocGrp := range allocsByPriority {
for len(allocGrp.allocs) > 0 && !allRequirementsMet { for len(allocGrp.allocs) > 0 && !allRequirementsMet {
@ -102,13 +113,13 @@ func findPreemptibleAllocationsForTaskGroup(jobPriority int, current []*structs.
bestDistance := math.MaxFloat64 bestDistance := math.MaxFloat64
// find the alloc with the closest distance // find the alloc with the closest distance
for index, alloc := range allocGrp.allocs { for index, alloc := range allocGrp.allocs {
currentPreemptionCount := computeCurrentPreemptions(alloc, currentPreemptions) currentPreemptionCount := p.getNumPreemptions(alloc)
maxParallel := 0 maxParallel := 0
tg := alloc.Job.LookupTaskGroup(alloc.TaskGroup) tg := alloc.Job.LookupTaskGroup(alloc.TaskGroup)
if tg != nil && tg.Migrate != nil { if tg != nil && tg.Migrate != nil {
maxParallel = tg.Migrate.MaxParallel maxParallel = tg.Migrate.MaxParallel
} }
distance := getPreemptionScoreForTaskGroupResources(resourcesNeeded, alloc.ComparableResources(), maxParallel, currentPreemptionCount) distance := scoreForTaskGroup(resourcesNeeded, alloc.ComparableResources(), maxParallel, currentPreemptionCount)
if distance < bestDistance { if distance < bestDistance {
bestDistance = distance bestDistance = distance
closestAllocIndex = index closestAllocIndex = index
@ -122,7 +133,7 @@ func findPreemptibleAllocationsForTaskGroup(jobPriority int, current []*structs.
preemptedResources.Add(closestAlloc.ComparableResources()) preemptedResources.Add(closestAlloc.ComparableResources())
} }
availableResources := preemptedResources.Copy() availableResources := preemptedResources.Copy()
availableResources.Add(nodeRemainingResources) availableResources.Add(p.nodeRemainingResources)
// This step needs the original resources asked for as the second arg, can't use the running total // This step needs the original resources asked for as the second arg, can't use the running total
allRequirementsMet, _ = availableResources.Superset(resourceAsk.Comparable()) allRequirementsMet, _ = availableResources.Superset(resourceAsk.Comparable())
@ -156,141 +167,18 @@ func findPreemptibleAllocationsForTaskGroup(jobPriority int, current []*structs.
return distance1 > distance2 return distance1 > distance2
}) })
filteredBestAllocs := eliminateSuperSetAllocationsForTaskGroup(bestAllocs, nodeRemainingResources, resourcesNeeded) filteredBestAllocs := filterSupersetTaskGroup(bestAllocs, p.nodeRemainingResources, resourcesNeeded)
return filteredBestAllocs return filteredBestAllocs
} }
// computeCurrentPreemptions counts the number of other allocations being preempted that match the job and task group of // preemptForNetwork tries to find allocations to preempt to meet network resources.
// the alloc under consideration. This is used as a scoring factor to minimize too many allocs of the same job being preempted at once
func computeCurrentPreemptions(currentAlloc *structs.Allocation, currentPreemptions []*structs.Allocation) int {
numCurrentPreemptionsForJob := 0
for _, alloc := range currentPreemptions {
if alloc.JobID == currentAlloc.JobID && alloc.Namespace == currentAlloc.Namespace && alloc.TaskGroup == currentAlloc.TaskGroup {
numCurrentPreemptionsForJob++
}
}
return numCurrentPreemptionsForJob
}
// meetsNetworkRequirements checks if the first resource meets or exceeds the second resource's network MBits requirements
func meetsNetworkRequirements(firstMbits int, secondMbits int) bool {
if firstMbits == 0 || secondMbits == 0 {
return false
}
return firstMbits >= secondMbits
}
type groupedAllocs struct {
priority int
allocs []*structs.Allocation
}
// filterAndGroupPreemptibleAllocs groups allocations by priority after removing any from jobs of
// a higher priority than jobPriority
func filterAndGroupPreemptibleAllocs(jobPriority int, current []*structs.Allocation) []*groupedAllocs {
allocsByPriority := make(map[int][]*structs.Allocation)
for _, alloc := range current {
// Why is alloc.Job even nil though?
if alloc.Job == nil {
continue
}
// Skip allocs whose priority is within a delta of 10
// This also skips any allocs of the current job
// for which we are attempting preemption
if jobPriority-alloc.Job.Priority < 10 {
continue
}
grpAllocs, ok := allocsByPriority[alloc.Job.Priority]
if !ok {
grpAllocs = make([]*structs.Allocation, 0)
}
grpAllocs = append(grpAllocs, alloc)
allocsByPriority[alloc.Job.Priority] = grpAllocs
}
var groupedSortedAllocs []*groupedAllocs
for priority, allocs := range allocsByPriority {
groupedSortedAllocs = append(groupedSortedAllocs, &groupedAllocs{
priority: priority,
allocs: allocs})
}
// Sort by priority
sort.Slice(groupedSortedAllocs, func(i, j int) bool {
return groupedSortedAllocs[i].priority < groupedSortedAllocs[j].priority
})
return groupedSortedAllocs
}
// eliminateSuperSetAllocationsForTaskGroup is used as a final step to remove
// any allocations that meet a superset of requirements from the set of allocations
// to preempt
func eliminateSuperSetAllocationsForTaskGroup(bestAllocs []*structs.Allocation,
nodeRemainingResources *structs.ComparableResources,
resourceAsk *structs.ComparableResources) []*structs.Allocation {
var preemptedResources *structs.ComparableResources
var filteredBestAllocs []*structs.Allocation
// Do another pass to eliminate allocations that are a superset of other allocations
// in the preemption set
for _, alloc := range bestAllocs {
if preemptedResources == nil {
preemptedResources = alloc.ComparableResources().Copy()
} else {
preemptedResources.Add(alloc.ComparableResources().Copy())
}
filteredBestAllocs = append(filteredBestAllocs, alloc)
availableResources := preemptedResources.Copy()
availableResources.Add(nodeRemainingResources)
requirementsMet, _ := availableResources.Superset(resourceAsk)
if requirementsMet {
break
}
}
return filteredBestAllocs
}
// eliminateSuperSetAllocationsForNetwork is similar to eliminateSuperSetAllocationsForTaskGroup but only
// considers network Mbits
func eliminateSuperSetAllocationsForNetwork(bestAllocs []*structs.Allocation, networkResourcesAsk *structs.NetworkResource,
nodeRemainingResources *structs.ComparableResources) []*structs.Allocation {
var preemptedResources *structs.ComparableResources
var filteredBestAllocs []*structs.Allocation
// Do another pass to eliminate allocations that are a superset of other allocations
// in the preemption set
for _, alloc := range bestAllocs {
if preemptedResources == nil {
preemptedResources = alloc.ComparableResources().Copy()
} else {
preemptedResources.Add(alloc.ComparableResources().Copy())
}
filteredBestAllocs = append(filteredBestAllocs, alloc)
availableResources := preemptedResources.Copy()
availableResources.Add(nodeRemainingResources)
requirementsMet := meetsNetworkRequirements(availableResources.Flattened.Networks[0].MBits, networkResourcesAsk.MBits)
if requirementsMet {
break
}
}
return filteredBestAllocs
}
// preemptForNetworkResourceAsk tries to find allocations to preempt to meet network resources.
// This is called once per task when assigning a network to the task. While finding allocations // This is called once per task when assigning a network to the task. While finding allocations
// to preempt, this only considers allocations that share the same network device // to preempt, this only considers allocations that share the same network device
func preemptForNetworkResourceAsk(jobPriority int, currentAllocs []*structs.Allocation, networkResourceAsk *structs.NetworkResource, func (p *Preemptor) preemptForNetwork(networkResourceAsk *structs.NetworkResource, netIdx *structs.NetworkIndex) []*structs.Allocation {
netIdx *structs.NetworkIndex, currentPreemptions []*structs.Allocation) []*structs.Allocation {
// Early return if there are no current allocs // Early return if there are no current allocs
if len(currentAllocs) == 0 { if len(p.currentAllocs) == 0 {
return nil return nil
} }
@ -304,12 +192,12 @@ func preemptForNetworkResourceAsk(jobPriority int, currentAllocs []*structs.Allo
// //
// This step also filters out high priority allocations and allocations // This step also filters out high priority allocations and allocations
// that are not using any network resources // that are not using any network resources
for _, alloc := range currentAllocs { for _, alloc := range p.currentAllocs {
if alloc.Job == nil { if alloc.Job == nil {
continue continue
} }
if jobPriority-alloc.Job.Priority < 10 { if p.jobPriority-alloc.Job.Priority < 10 {
continue continue
} }
networks := alloc.CompatibleNetworkResources() networks := alloc.CompatibleNetworkResources()
@ -379,7 +267,7 @@ func preemptForNetworkResourceAsk(jobPriority int, currentAllocs []*structs.Allo
} }
// Split by priority // Split by priority
allocsByPriority := filterAndGroupPreemptibleAllocs(jobPriority, currentAllocs) allocsByPriority := filterAndGroupPreemptibleAllocs(p.jobPriority, currentAllocs)
for _, allocsGrp := range allocsByPriority { for _, allocsGrp := range allocsByPriority {
allocs := allocsGrp.allocs allocs := allocsGrp.allocs
@ -388,7 +276,7 @@ func preemptForNetworkResourceAsk(jobPriority int, currentAllocs []*structs.Allo
// as well as penalty for preempting an allocation // as well as penalty for preempting an allocation
// whose task group already has existing preemptions // whose task group already has existing preemptions
sort.Slice(allocs, func(i, j int) bool { sort.Slice(allocs, func(i, j int) bool {
return distanceComparatorForNetwork(allocs, currentPreemptions, networkResourceAsk, i, j) return p.distanceComparatorForNetwork(allocs, networkResourceAsk, i, j)
}) })
for _, alloc := range allocs { for _, alloc := range allocs {
@ -452,16 +340,177 @@ func preemptForNetworkResourceAsk(jobPriority int, currentAllocs []*structs.Allo
}) })
// Do a final pass to eliminate any superset allocations // Do a final pass to eliminate any superset allocations
filteredBestAllocs := eliminateSuperSetAllocationsForNetwork(allocsToPreempt, networkResourceAsk, nodeRemainingResources) filteredBestAllocs := filterSupersetNetwork(allocsToPreempt, networkResourceAsk, nodeRemainingResources)
return filteredBestAllocs
}
// basicResourceDistance computes a distance using a coordinate system. It compares resource fields like CPU/Memory and Disk.
// Values emitted are in the range [0, maxFloat]
func basicResourceDistance(resourceAsk *structs.ComparableResources, resourceUsed *structs.ComparableResources) float64 {
memoryCoord, cpuCoord, diskMBCoord := 0.0, 0.0, 0.0
if resourceAsk.Flattened.Memory.MemoryMB > 0 {
memoryCoord = (float64(resourceAsk.Flattened.Memory.MemoryMB) - float64(resourceUsed.Flattened.Memory.MemoryMB)) / float64(resourceAsk.Flattened.Memory.MemoryMB)
}
if resourceAsk.Flattened.Cpu.CpuShares > 0 {
cpuCoord = (float64(resourceAsk.Flattened.Cpu.CpuShares) - float64(resourceUsed.Flattened.Cpu.CpuShares)) / float64(resourceAsk.Flattened.Cpu.CpuShares)
}
if resourceAsk.Shared.DiskMB > 0 {
diskMBCoord = (float64(resourceAsk.Shared.DiskMB) - float64(resourceUsed.Shared.DiskMB)) / float64(resourceAsk.Shared.DiskMB)
}
originDist := math.Sqrt(
math.Pow(memoryCoord, 2) +
math.Pow(cpuCoord, 2) +
math.Pow(diskMBCoord, 2))
return originDist
}
// networkResourceDistance returns a distance based only on network megabits
func networkResourceDistance(resourceUsed *structs.NetworkResource, resourceNeeded *structs.NetworkResource) float64 {
networkCoord := math.MaxFloat64
if resourceUsed != nil && resourceNeeded != nil {
networkCoord = float64(resourceNeeded.MBits-resourceUsed.MBits) / float64(resourceNeeded.MBits)
}
originDist := math.Sqrt(
math.Pow(networkCoord, 2))
return originDist
}
// scoreForTaskGroup is used to calculate a score (lower is better) based on the distance between
// the needed resource and requirements. A penalty is added when the choice already has some existing
// allocations in the plan that are being preempted.
func scoreForTaskGroup(resourceAsk *structs.ComparableResources, resourceUsed *structs.ComparableResources, maxParallel int, numPreemptedAllocs int) float64 {
maxParallelScorePenalty := 0.0
if maxParallel > 0 && numPreemptedAllocs >= maxParallel {
maxParallelScorePenalty = float64((numPreemptedAllocs+1)-maxParallel) * maxParallelPenalty
}
return basicResourceDistance(resourceAsk, resourceUsed) + maxParallelScorePenalty
}
// scoreForNetwork is similar to scoreForTaskGroup
// but only uses network Mbits to calculate a preemption score
func scoreForNetwork(resourceUsed *structs.NetworkResource, resourceNeeded *structs.NetworkResource, maxParallel int, numPreemptedAllocs int) float64 {
if resourceUsed == nil || resourceNeeded == nil {
return math.MaxFloat64
}
maxParallelScorePenalty := 0.0
if maxParallel > 0 && numPreemptedAllocs >= maxParallel {
maxParallelScorePenalty = float64((numPreemptedAllocs+1)-maxParallel) * maxParallelPenalty
}
return networkResourceDistance(resourceUsed, resourceNeeded) + maxParallelScorePenalty
}
// meetsNetworkRequirements checks if the first resource meets or exceeds the second resource's network MBits requirements
func meetsNetworkRequirements(firstMbits int, secondMbits int) bool {
if firstMbits == 0 || secondMbits == 0 {
return false
}
return firstMbits >= secondMbits
}
// filterAndGroupPreemptibleAllocs groups allocations by priority after removing any from jobs of
// a higher priority than jobPriority
func filterAndGroupPreemptibleAllocs(jobPriority int, current []*structs.Allocation) []*groupedAllocs {
allocsByPriority := make(map[int][]*structs.Allocation)
for _, alloc := range current {
// Why is alloc.Job even nil though?
if alloc.Job == nil {
continue
}
// Skip allocs whose priority is within a delta of 10
// This also skips any allocs of the current job
// for which we are attempting preemption
if jobPriority-alloc.Job.Priority < 10 {
continue
}
grpAllocs, ok := allocsByPriority[alloc.Job.Priority]
if !ok {
grpAllocs = make([]*structs.Allocation, 0)
}
grpAllocs = append(grpAllocs, alloc)
allocsByPriority[alloc.Job.Priority] = grpAllocs
}
var groupedSortedAllocs []*groupedAllocs
for priority, allocs := range allocsByPriority {
groupedSortedAllocs = append(groupedSortedAllocs, &groupedAllocs{
priority: priority,
allocs: allocs})
}
// Sort by priority
sort.Slice(groupedSortedAllocs, func(i, j int) bool {
return groupedSortedAllocs[i].priority < groupedSortedAllocs[j].priority
})
return groupedSortedAllocs
}
// filterSupersetTaskGroup is used as a final step to remove
// any allocations that meet a superset of requirements from
// the set of allocations to preempt
func filterSupersetTaskGroup(bestAllocs []*structs.Allocation,
nodeRemainingResources *structs.ComparableResources,
resourceAsk *structs.ComparableResources) []*structs.Allocation {
var preemptedResources *structs.ComparableResources
var filteredBestAllocs []*structs.Allocation
// Do another pass to eliminate allocations that are a superset of other allocations
// in the preemption set
for _, alloc := range bestAllocs {
if preemptedResources == nil {
preemptedResources = alloc.ComparableResources().Copy()
} else {
preemptedResources.Add(alloc.ComparableResources().Copy())
}
filteredBestAllocs = append(filteredBestAllocs, alloc)
availableResources := preemptedResources.Copy()
availableResources.Add(nodeRemainingResources)
requirementsMet, _ := availableResources.Superset(resourceAsk)
if requirementsMet {
break
}
}
return filteredBestAllocs
}
// filterSupersetNetwork is similar to filterSupersetTaskGroup but only
// considers network Mbits
func filterSupersetNetwork(bestAllocs []*structs.Allocation, networkResourcesAsk *structs.NetworkResource,
nodeRemainingResources *structs.ComparableResources) []*structs.Allocation {
var preemptedResources *structs.ComparableResources
var filteredBestAllocs []*structs.Allocation
// Do another pass to eliminate allocations that are a superset of other allocations
// in the preemption set
for _, alloc := range bestAllocs {
if preemptedResources == nil {
preemptedResources = alloc.ComparableResources().Copy()
} else {
preemptedResources.Add(alloc.ComparableResources().Copy())
}
filteredBestAllocs = append(filteredBestAllocs, alloc)
availableResources := preemptedResources.Copy()
availableResources.Add(nodeRemainingResources)
requirementsMet := meetsNetworkRequirements(availableResources.Flattened.Networks[0].MBits, networkResourcesAsk.MBits)
if requirementsMet {
break
}
}
return filteredBestAllocs return filteredBestAllocs
} }
// distanceComparatorForNetwork is used as the sorting function when finding allocations to preempt. It uses // distanceComparatorForNetwork is used as the sorting function when finding allocations to preempt. It uses
// both a coordinayte distance function based on Mbits needed, and a penalty if the allocation under consideration // both a coordinayte distance function based on Mbits needed, and a penalty if the allocation under consideration
// belongs to a job that already has more preempted allocations // belongs to a job that already has more preempted allocations
func distanceComparatorForNetwork(allocs []*structs.Allocation, currentPreemptions []*structs.Allocation, networkResourceAsk *structs.NetworkResource, i int, j int) bool { func (p *Preemptor) distanceComparatorForNetwork(allocs []*structs.Allocation, networkResourceAsk *structs.NetworkResource, i int, j int) bool {
firstAlloc := allocs[i] firstAlloc := allocs[i]
currentPreemptionCount1 := computeCurrentPreemptions(firstAlloc, currentPreemptions) currentPreemptionCount1 := p.getNumPreemptions(firstAlloc)
// Look up configured maxParallel value for these allocation's task groups // Look up configured maxParallel value for these allocation's task groups
var maxParallel1, maxParallel2 int var maxParallel1, maxParallel2 int
tg1 := allocs[i].Job.LookupTaskGroup(firstAlloc.TaskGroup) tg1 := allocs[i].Job.LookupTaskGroup(firstAlloc.TaskGroup)
@ -475,10 +524,10 @@ func distanceComparatorForNetwork(allocs []*structs.Allocation, currentPreemptio
firstAllocNetResourceUsed = firstAllocNetworks[0] firstAllocNetResourceUsed = firstAllocNetworks[0]
} }
distance1 := getPreemptionScoreForNetwork(firstAllocNetResourceUsed, networkResourceAsk, maxParallel1, currentPreemptionCount1) distance1 := scoreForNetwork(firstAllocNetResourceUsed, networkResourceAsk, maxParallel1, currentPreemptionCount1)
secondAlloc := allocs[j] secondAlloc := allocs[j]
currentPreemptionCount2 := computeCurrentPreemptions(secondAlloc, currentPreemptions) currentPreemptionCount2 := p.getNumPreemptions(secondAlloc)
tg2 := secondAlloc.Job.LookupTaskGroup(secondAlloc.TaskGroup) tg2 := secondAlloc.Job.LookupTaskGroup(secondAlloc.TaskGroup)
if tg2 != nil && tg2.Migrate != nil { if tg2 != nil && tg2.Migrate != nil {
maxParallel2 = tg2.Migrate.MaxParallel maxParallel2 = tg2.Migrate.MaxParallel
@ -490,6 +539,6 @@ func distanceComparatorForNetwork(allocs []*structs.Allocation, currentPreemptio
secondAllocNetResourceUsed = secondAllocNetworks[0] secondAllocNetResourceUsed = secondAllocNetworks[0]
} }
distance2 := getPreemptionScoreForNetwork(secondAllocNetResourceUsed, networkResourceAsk, maxParallel2, currentPreemptionCount2) distance2 := scoreForNetwork(secondAllocNetResourceUsed, networkResourceAsk, maxParallel2, currentPreemptionCount2)
return distance1 < distance2 return distance1 < distance2
} }

View File

@ -202,12 +202,17 @@ OUTER:
var allocsToPreempt []*structs.Allocation var allocsToPreempt []*structs.Allocation
// Initialize preemptor with node
preemptor := NewPreemptor(iter.priority)
preemptor.SetNode(option.Node)
// Count the number of existing preemptions // Count the number of existing preemptions
allPreemptions := iter.ctx.Plan().NodePreemptions allPreemptions := iter.ctx.Plan().NodePreemptions
var currentPreemptions []*structs.Allocation var currentPreemptions []*structs.Allocation
for _, allocs := range allPreemptions { for _, allocs := range allPreemptions {
currentPreemptions = append(currentPreemptions, allocs...) currentPreemptions = append(currentPreemptions, allocs...)
} }
preemptor.SetPreemptions(currentPreemptions)
for _, task := range iter.taskGroup.Tasks { for _, task := range iter.taskGroup.Tasks {
// Allocate the resources // Allocate the resources
@ -234,7 +239,9 @@ OUTER:
} }
// Look for preemptible allocations to satisfy the network resource for this task // Look for preemptible allocations to satisfy the network resource for this task
netPreemptions := preemptForNetworkResourceAsk(iter.priority, proposed, ask, netIdx, currentPreemptions) preemptor.SetCandidates(proposed)
netPreemptions := preemptor.preemptForNetwork(ask, netIdx)
if netPreemptions == nil { if netPreemptions == nil {
iter.ctx.Metrics().ExhaustedNode(option.Node, iter.ctx.Metrics().ExhaustedNode(option.Node,
fmt.Sprintf("unable to meet network resource %v after preemption", ask)) fmt.Sprintf("unable to meet network resource %v after preemption", ask))
@ -292,12 +299,16 @@ OUTER:
// If eviction is enabled and the node doesn't fit the alloc, check if // If eviction is enabled and the node doesn't fit the alloc, check if
// any allocs can be preempted // any allocs can be preempted
preemptForTaskGroup := findPreemptibleAllocationsForTaskGroup(iter.priority, current, total, option.Node, currentPreemptions)
allocsToPreempt = append(allocsToPreempt, preemptForTaskGroup...) // Initialize preemptor with candidate set
preemptor.SetCandidates(current)
preemptedAllocs := preemptor.preemptForTaskGroup(total)
allocsToPreempt = append(allocsToPreempt, preemptedAllocs...)
// If we were unable to find preempted allocs to meet these requirements // If we were unable to find preempted allocs to meet these requirements
// mark as exhausted and continue // mark as exhausted and continue
if len(preemptForTaskGroup) == 0 { if len(preemptedAllocs) == 0 {
iter.ctx.Metrics().ExhaustedNode(option.Node, dim) iter.ctx.Metrics().ExhaustedNode(option.Node, dim)
continue continue
} }