Implement preemption for system jobs.

This commit implements an allocation selection algorithm for finding
allocations to preempt. Network resource asks are currently special-cased
and scored separately from the other resources (CPU/memory/disk/IOPS).
Preetha Appan 2018-09-21 16:05:00 -05:00
parent d11064d6ba
commit cc295b90de
12 changed files with 1794 additions and 30 deletions

nomad/fsm.go
View File

@ -813,7 +813,7 @@ func (n *nomadFSM) applyPlanResults(buf []byte, index uint64) interface{} {
n.logger.Error("ApplyPlan failed", "error", err)
return err
}
n.handleUpsertedEvals(req.PreemptionEvals)
return nil
}

nomad/plan_apply.go
View File

@ -10,6 +10,7 @@ import (
"github.com/armon/go-metrics"
"github.com/hashicorp/go-multierror"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/state"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/raft"
@ -163,6 +164,7 @@ func (p *planner) applyPlan(plan *structs.Plan, result *structs.PlanResult, snap
Deployment: result.Deployment,
DeploymentUpdates: result.DeploymentUpdates,
EvalID: plan.EvalID,
NodePreemptions: make([]*structs.Allocation, 0, len(result.NodePreemptions)),
}
for _, updateList := range result.NodeUpdate {
req.Alloc = append(req.Alloc, updateList...)
@ -171,6 +173,10 @@ func (p *planner) applyPlan(plan *structs.Plan, result *structs.PlanResult, snap
req.Alloc = append(req.Alloc, allocList...)
}
for _, preemptions := range result.NodePreemptions {
req.NodePreemptions = append(req.NodePreemptions, preemptions...)
}
// Set the time the alloc was applied for the first time. This can be used
// to approximate the scheduling time.
now := time.Now().UTC().UnixNano()
@ -181,6 +187,41 @@ func (p *planner) applyPlan(plan *structs.Plan, result *structs.PlanResult, snap
alloc.ModifyTime = now
}
// Set create and modify time for preempted allocs if any
// Also gather job IDs to create follow-up evals
preemptedJobIDs := make(map[structs.NamespacedID]struct{})
for _, alloc := range req.NodePreemptions {
if alloc.CreateTime == 0 {
alloc.CreateTime = now
}
alloc.ModifyTime = now
id := structs.NamespacedID{Namespace: alloc.Namespace, ID: alloc.JobID}
preemptedJobIDs[id] = struct{}{}
}
var evals []*structs.Evaluation
for preemptedJobID := range preemptedJobIDs {
job, _ := p.State().JobByID(nil, preemptedJobID.Namespace, preemptedJobID.ID) // TODO Fix me
if job != nil {
//TODO(preetha): This eval is missing class eligibility related fields
// need to figure out how to set them per job
eval := &structs.Evaluation{
ID: uuid.Generate(),
Namespace: job.Namespace,
TriggeredBy: structs.EvalTriggerPreemption,
JobID: job.ID,
Type: job.Type,
Priority: job.Priority,
Status: structs.EvalStatusPending,
}
evals = append(evals, eval)
}
}
req.PreemptionEvals = evals
// Dispatch the Raft transaction
future, err := p.raftApplyFuture(structs.ApplyPlanResultsRequestType, &req)
if err != nil {
@ -259,6 +300,7 @@ func evaluatePlanPlacements(pool *EvaluatePool, snap *state.StateSnapshot, plan
NodeAllocation: make(map[string][]*structs.Allocation),
Deployment: plan.Deployment.Copy(),
DeploymentUpdates: plan.DeploymentUpdates,
NodePreemptions: make(map[string][]*structs.Allocation),
}
// Collect all the nodeIDs
@ -304,6 +346,7 @@ func evaluatePlanPlacements(pool *EvaluatePool, snap *state.StateSnapshot, plan
result.NodeAllocation = nil
result.DeploymentUpdates = nil
result.Deployment = nil
result.NodePreemptions = nil
return true
}
@ -318,6 +361,25 @@ func evaluatePlanPlacements(pool *EvaluatePool, snap *state.StateSnapshot, plan
if nodeAlloc := plan.NodeAllocation[nodeID]; len(nodeAlloc) > 0 {
result.NodeAllocation[nodeID] = nodeAlloc
}
if nodePreemptions := plan.NodePreemptions[nodeID]; nodePreemptions != nil {
var filteredNodePreemptions []*structs.Allocation
// Do a pass over preempted allocs in the plan to check
// whether the alloc is already in a terminal state
for _, preemptedAlloc := range nodePreemptions {
alloc, err := snap.AllocByID(nil, preemptedAlloc.ID)
if err != nil {
mErr.Errors = append(mErr.Errors, err)
}
if alloc != nil && !alloc.TerminalStatus() {
filteredNodePreemptions = append(filteredNodePreemptions, preemptedAlloc)
}
}
result.NodePreemptions[nodeID] = filteredNodePreemptions
}
return
}
@ -461,6 +523,14 @@ func evaluateNodePlan(snap *state.StateSnapshot, plan *structs.Plan, nodeID stri
if update := plan.NodeUpdate[nodeID]; len(update) > 0 {
remove = append(remove, update...)
}
// Remove any preempted allocs
if preempted := plan.NodePreemptions[nodeID]; len(preempted) > 0 {
remove = append(remove, preempted...)
}
if updated := plan.NodeAllocation[nodeID]; len(updated) > 0 {
for _, alloc := range updated {
remove = append(remove, alloc)

nomad/state/state_store.go
View File

@ -218,6 +218,43 @@ func (s *StateStore) UpsertPlanResults(index uint64, results *structs.ApplyPlanR
}
}
// TODO(preetha) do a pass to group by jobid
// Prepare preempted allocs in the plan results for update
for _, preemptedAlloc := range results.NodePreemptions {
// Look for existing alloc
existing, err := txn.First("allocs", "id", preemptedAlloc.ID)
if err != nil {
return fmt.Errorf("alloc lookup failed: %v", err)
}
// Nothing to do if this does not exist
if existing == nil {
continue
}
exist := existing.(*structs.Allocation)
// Copy everything from the existing allocation
copyAlloc := exist.Copy()
// Only update the fields set by the scheduler
copyAlloc.DesiredStatus = preemptedAlloc.DesiredStatus
copyAlloc.PreemptedByAllocation = preemptedAlloc.PreemptedByAllocation
copyAlloc.DesiredDescription = preemptedAlloc.DesiredDescription
// Upsert the preempted allocations
if err := s.upsertAllocsImpl(index, []*structs.Allocation{copyAlloc}, txn); err != nil {
return err
}
}
// Upsert followup evals for allocs that were preempted
for _, eval := range results.PreemptionEvals {
if err := s.nestedUpsertEval(txn, index, eval); err != nil {
return err
}
}
txn.Commit()
return nil
}

nomad/structs/structs.go
View File

@ -7796,7 +7796,7 @@ const (
EvalTriggerMaxPlans = "max-plan-attempts"
EvalTriggerRetryFailedAlloc = "alloc-failure"
EvalTriggerQueuedAllocs = "queued-allocs"
EvalTriggerPreemption = "preempted"
EvalTriggerPreemption = "preemption"
)
const (
@ -8177,21 +8177,23 @@ func (p *Plan) AppendUpdate(alloc *Allocation, desiredStatus, desiredDesc, clien
p.NodeUpdate[node] = append(existing, newAlloc)
}
// AppendPreemptedAlloc is used to append an allocation that's being preempted to the plan.
// To minimize the size of the plan, this only sets a minimal set of fields in the allocation
func (p *Plan) AppendPreemptedAlloc(alloc *Allocation, desiredStatus, preemptingAllocID string) {
newAlloc := &Allocation{}
newAlloc.ID = alloc.ID
newAlloc.JobID = alloc.JobID
newAlloc.Namespace = alloc.Namespace
newAlloc.DesiredStatus = desiredStatus
newAlloc.PreemptedByAllocation = preemptingAllocID
desiredDesc := fmt.Sprintf("Preempted by alloc ID %v", preemptingAllocID)
newAlloc.DesiredDescription = desiredDesc
// TaskResources are needed by the plan applier to check if allocations fit
// after removing preempted allocations
newAlloc.TaskResources = alloc.TaskResources
node := alloc.NodeID
// Append this alloc to slice for this node
existing := p.NodePreemptions[node]

scheduler/context_test.go
View File

@ -16,6 +16,7 @@ func testContext(t testing.TB) (*state.StateStore, *EvalContext) {
plan := &structs.Plan{
NodeUpdate: make(map[string][]*structs.Allocation),
NodeAllocation: make(map[string][]*structs.Allocation),
NodePreemptions: make(map[string][]*structs.Allocation),
}
logger := testlog.HCLogger(t)

scheduler/generic_sched.go
View File

@ -130,7 +130,8 @@ func (s *GenericScheduler) Process(eval *structs.Evaluation) error {
structs.EvalTriggerRollingUpdate, structs.EvalTriggerQueuedAllocs,
structs.EvalTriggerPeriodicJob, structs.EvalTriggerMaxPlans,
structs.EvalTriggerDeploymentWatcher, structs.EvalTriggerRetryFailedAlloc,
structs.EvalTriggerFailedFollowUp,
structs.EvalTriggerPreemption:
default:
desc := fmt.Sprintf("scheduler cannot handle '%s' evaluation reason",
eval.TriggeredBy)

scheduler/preemption.go Normal file
View File

@ -0,0 +1,437 @@
package scheduler
import (
"fmt"
"math"
"sort"
"github.com/hashicorp/nomad/nomad/structs"
)
// maxParallelPenalty is a score penalty applied to allocations to avoid preempting
// too many allocations of the same job at once. The penalty kicks in once the number
// of allocations being preempted reaches the max_parallel value in the job's migrate stanza.
const maxParallelPenalty = 50.0
type PreemptionType uint8
const (
NetworkResource PreemptionType = iota
CPUMemoryDiskIOPS
)
// resourceDistance returns how close the resource is to the resource being asked for
// It is calculated by first computing a relative fraction and then measuring how close
// that is to the origin coordinate. Lower values are better
func resourceDistance(resource *structs.Resources, resourceAsk *structs.Resources) float64 {
memoryCoord, cpuCoord, iopsCoord, diskMBCoord := 0.0, 0.0, 0.0, 0.0
if resourceAsk.MemoryMB > 0 {
memoryCoord = float64(resourceAsk.MemoryMB-resource.MemoryMB) / float64(resourceAsk.MemoryMB)
}
if resourceAsk.CPU > 0 {
cpuCoord = float64(resourceAsk.CPU-resource.CPU) / float64(resourceAsk.CPU)
}
if resourceAsk.IOPS > 0 {
iopsCoord = float64(resourceAsk.IOPS-resource.IOPS) / float64(resourceAsk.IOPS)
}
if resourceAsk.DiskMB > 0 {
diskMBCoord = float64(resourceAsk.DiskMB-resource.DiskMB) / float64(resourceAsk.DiskMB)
}
originDist := math.Sqrt(
math.Pow(memoryCoord, 2) +
math.Pow(cpuCoord, 2) +
math.Pow(iopsCoord, 2) +
math.Pow(diskMBCoord, 2))
return originDist
}
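// A worked example, using the second case from TestResourceDistance below:
// with an ask of CPU=2048, MemoryMB=512, IOPS=300, DiskMB=4096 and an alloc
// using CPU=1024, MemoryMB=400, IOPS=200, DiskMB=1024, the coordinates are
// (0.5, 0.21875, 0.333, 0.75), and the distance is
// sqrt(0.5^2 + 0.21875^2 + 0.333^2 + 0.75^2) ≈ 0.986.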
// networkResourceDistance returns distance based on network megabits
func networkResourceDistance(resource *structs.Resources, resourceAsk *structs.Resources) float64 {
networkCoord := math.MaxFloat64
if len(resourceAsk.Networks) > 0 && resourceAsk.Networks[0].MBits > 0 {
networkCoord = float64(resourceAsk.Networks[0].MBits-resource.Networks[0].MBits) / float64(resourceAsk.Networks[0].MBits)
}
originDist := math.Sqrt(
math.Pow(networkCoord, 2))
return originDist
}
// getPreemptionScore is used to calculate a score (lower is better) based on the distance between
// the needed resource and requirements. A penalty is added when the choice already has some existing
// allocations in the plan that are being preempted.
func getPreemptionScore(resource *structs.Resources, resourceAsk *structs.Resources, preemptionType PreemptionType, maxParallel int, numPreemptedAllocs int) float64 {
maxParallelScorePenalty := 0.0
if maxParallel > 0 && numPreemptedAllocs >= maxParallel {
maxParallelScorePenalty = float64((numPreemptedAllocs+1)-maxParallel) * maxParallelPenalty
}
switch preemptionType {
case NetworkResource:
return networkResourceDistance(resource, resourceAsk) + maxParallelScorePenalty
case CPUMemoryDiskIOPS:
return resourceDistance(resource, resourceAsk) + maxParallelScorePenalty
}
panic(fmt.Errorf("Unknown preemption type:%v", preemptionType))
}
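// For example, with maxParallel = 2 and numPreemptedAllocs = 3, the penalty is
// ((3+1)-2) * 50.0 = 100.0, which is added on top of the resource distance.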
// findPreemptibleAllocationsForTaskGroup computes a list of allocations to preempt to accommodate
// the resources asked for. Only allocs with a job priority < 10 of jobPriority are considered
// This method is used after network resource needs have already been met.
func findPreemptibleAllocationsForTaskGroup(jobPriority int, current []*structs.Allocation, resourceAsk *structs.Resources, node *structs.Node, currentPreemptions []*structs.Allocation) []*structs.Allocation {
resourcesNeeded := resourceAsk.Copy()
allocsByPriority := filterAndGroupPreemptibleAllocs(jobPriority, current)
var bestAllocs []*structs.Allocation
allRequirementsMet := false
var preemptedResources *structs.Resources
//TODO(preetha): should add some debug logging
nodeRemainingResources := node.Resources.Copy()
// Initialize nodeRemainingResources with the remaining resources
// after accounting for reserved resources and all allocations
// Subtract the reserved resources of the node
if node.Reserved != nil {
nodeRemainingResources.Subtract(node.Reserved)
}
// Subtract current allocations
for _, alloc := range current {
nodeRemainingResources.Subtract(alloc.Resources)
}
// Iterate over allocations grouped by priority to find preemptible allocations
for _, allocGrp := range allocsByPriority {
for len(allocGrp.allocs) > 0 && !allRequirementsMet {
closestAllocIndex := -1
bestDistance := math.MaxFloat64
// find the alloc with the closest distance
for index, alloc := range allocGrp.allocs {
currentPreemptionCount := computeCurrentPreemptions(alloc, currentPreemptions)
maxParallel := 0
tg := alloc.Job.LookupTaskGroup(alloc.TaskGroup)
if tg != nil && tg.Migrate != nil {
maxParallel = tg.Migrate.MaxParallel
}
distance := getPreemptionScore(alloc.Resources, resourcesNeeded, CPUMemoryDiskIOPS, maxParallel, currentPreemptionCount)
if distance < bestDistance {
bestDistance = distance
closestAllocIndex = index
}
}
closestAlloc := allocGrp.allocs[closestAllocIndex]
if preemptedResources == nil {
preemptedResources = closestAlloc.Resources.Copy()
} else {
preemptedResources.Add(closestAlloc.Resources)
}
availableResources := preemptedResources.Copy()
availableResources.Add(nodeRemainingResources)
allRequirementsMet = meetsNonNetworkRequirements(availableResources, resourceAsk)
bestAllocs = append(bestAllocs, closestAlloc)
allocGrp.allocs[closestAllocIndex] = allocGrp.allocs[len(allocGrp.allocs)-1]
allocGrp.allocs = allocGrp.allocs[:len(allocGrp.allocs)-1]
resourcesNeeded.Subtract(closestAlloc.Resources)
}
if allRequirementsMet {
break
}
}
// Early return if all allocs examined and requirements were not met
if !allRequirementsMet {
return nil
}
// We do another pass to eliminate unnecessary preemptions
// This filters out allocs whose resources are already covered by another alloc
filteredBestAllocs := eliminateSuperSetAllocations(bestAllocs, resourceAsk, nodeRemainingResources, resourceDistance, meetsNonNetworkRequirements)
return filteredBestAllocs
}
// computeCurrentPreemptions counts the number of other allocations being preempted that match the job and task group of
// the alloc under consideration. This is used as a scoring factor to avoid preempting too many allocs of the same job at once
func computeCurrentPreemptions(currentAlloc *structs.Allocation, currentPreemptions []*structs.Allocation) int {
numCurrentPreemptionsForJob := 0
for _, alloc := range currentPreemptions {
if alloc.JobID == currentAlloc.JobID && alloc.Namespace == currentAlloc.Namespace && alloc.TaskGroup == currentAlloc.TaskGroup {
numCurrentPreemptionsForJob++
}
}
return numCurrentPreemptionsForJob
}
// meetsNonNetworkRequirements checks if the first resource meets or exceeds the second resource's requirements
// This intentionally ignores network requirements, those are handled by meetsNetworkRequirements
func meetsNonNetworkRequirements(first *structs.Resources, second *structs.Resources) bool {
if first.CPU < second.CPU {
return false
}
if first.MemoryMB < second.MemoryMB {
return false
}
if first.DiskMB < second.DiskMB {
return false
}
if first.IOPS < second.IOPS {
return false
}
return true
}
// meetsNetworkRequirements checks if the first resource meets or exceeds the second resource's network MBits requirements
func meetsNetworkRequirements(first *structs.Resources, second *structs.Resources) bool {
if len(first.Networks) == 0 || len(second.Networks) == 0 {
return false
}
return first.Networks[0].MBits >= second.Networks[0].MBits
}
type groupedAllocs struct {
priority int
allocs []*structs.Allocation
}
func filterAndGroupPreemptibleAllocs(jobPriority int, current []*structs.Allocation) []*groupedAllocs {
allocsByPriority := make(map[int][]*structs.Allocation)
for _, alloc := range current {
// Skip allocs whose job is nil (defensive; this is not expected to happen)
if alloc.Job == nil {
continue
}
// Skip allocs whose priority is within a delta of 10
// This also skips any allocs of the current job
// for which we are attempting preemption
if jobPriority-alloc.Job.Priority < 10 {
continue
}
grpAllocs, ok := allocsByPriority[alloc.Job.Priority]
if !ok {
grpAllocs = make([]*structs.Allocation, 0)
}
grpAllocs = append(grpAllocs, alloc)
allocsByPriority[alloc.Job.Priority] = grpAllocs
}
var groupedSortedAllocs []*groupedAllocs
for priority, allocs := range allocsByPriority {
groupedSortedAllocs = append(groupedSortedAllocs, &groupedAllocs{
priority: priority,
allocs: allocs})
}
// Sort by priority
sort.Slice(groupedSortedAllocs, func(i, j int) bool {
return groupedSortedAllocs[i].priority < groupedSortedAllocs[j].priority
})
return groupedSortedAllocs
}
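// For example, with jobPriority = 100, only allocs from jobs of priority <= 90
// are preemption candidates, and the resulting groups are examined lowest priority first.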
type distanceFn func(first *structs.Resources, second *structs.Resources) float64
type meetsRequirementsFn func(first *structs.Resources, second *structs.Resources) bool
func eliminateSuperSetAllocations(bestAllocs []*structs.Allocation, resourceAsk *structs.Resources,
nodeRemainingResources *structs.Resources, distanceFunc distanceFn, reqFunc meetsRequirementsFn) []*structs.Allocation {
// Sort by distance reversed to surface any superset allocs first
sort.Slice(bestAllocs, func(i, j int) bool {
distance1 := distanceFunc(bestAllocs[i].Resources, resourceAsk)
distance2 := distanceFunc(bestAllocs[j].Resources, resourceAsk)
return distance1 > distance2
})
var preemptedResources *structs.Resources
var filteredBestAllocs []*structs.Allocation
// Do another pass to eliminate allocations that are a superset of other allocations
// in the preemption set
for _, alloc := range bestAllocs {
if preemptedResources == nil {
// Copy to avoid mutating the alloc's own Resources via Add below
preemptedResources = alloc.Resources.Copy()
} else {
preemptedResources.Add(alloc.Resources)
}
filteredBestAllocs = append(filteredBestAllocs, alloc)
availableResources := preemptedResources.Copy()
availableResources.Add(nodeRemainingResources)
requirementsMet := reqFunc(availableResources, resourceAsk)
if requirementsMet {
break
}
}
return filteredBestAllocs
}
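// As an illustration (mirroring the last case in TestPreemption): if the first
// pass picked one alloc using 1500 CPU and another using 600 CPU for an ask of
// 1000 CPU with no remaining node capacity, the descending sort places the
// 1500 CPU alloc first, it alone satisfies the ask, and the 600 CPU alloc is
// dropped from the preemption set.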
// preemptForNetworkResourceAsk tries to find allocations to preempt to meet network resources.
// This needs to consider network resources at the task level, and for a given task it should
// only preempt allocations that share the same network device.
func preemptForNetworkResourceAsk(jobPriority int, currentAllocs []*structs.Allocation, resourceAsk *structs.Resources,
netIdx *structs.NetworkIndex, currentPreemptions []*structs.Allocation) []*structs.Allocation {
// Early return if there are no current allocs
if len(currentAllocs) == 0 {
return nil
}
networkResourceAsk := resourceAsk.Networks[0]
deviceToAllocs := make(map[string][]*structs.Allocation)
mbitsNeeded := networkResourceAsk.MBits
reservedPortsNeeded := networkResourceAsk.ReservedPorts
// Create a map from each device to allocs
// We do this because to place a task we have to be able to
// preempt allocations that are using the same device.
//
// This step also filters out high priority allocations and allocations
// that are not using any network resources
for _, alloc := range currentAllocs {
if alloc.Job == nil {
continue
}
if jobPriority-alloc.Job.Priority < 10 {
continue
}
if len(alloc.Resources.Networks) > 0 {
device := alloc.Resources.Networks[0].Device
allocsForDevice := deviceToAllocs[device]
allocsForDevice = append(allocsForDevice, alloc)
deviceToAllocs[device] = allocsForDevice
}
}
// If no existing allocations use network resources, return early
if len(deviceToAllocs) == 0 {
return nil
}
var allocsToPreempt []*structs.Allocation
met := false
freeBandwidth := 0
for device, currentAllocs := range deviceToAllocs {
totalBandwidth := netIdx.AvailBandwidth[device]
// If the device doesn't have enough total available bandwidth, skip
if totalBandwidth < MbitsNeeded {
continue
}
// Track how much existing free bandwidth we have before preemption
freeBandwidth = totalBandwidth - netIdx.UsedBandwidth[device]
preemptedBandwidth := 0
// Reset allocsToPreempt since we don't want to preempt across devices for the same task
allocsToPreempt = nil
// Build map from used reserved ports to allocation
usedPortToAlloc := make(map[int]*structs.Allocation)
// First try to satisfy needed reserved ports
if len(reservedPortsNeeded) > 0 {
for _, alloc := range currentAllocs {
for _, tr := range alloc.TaskResources {
reservedPorts := tr.Networks[0].ReservedPorts
for _, p := range reservedPorts {
usedPortToAlloc[p.Value] = alloc
}
}
}
// Look for allocs that are using reserved ports needed
for _, port := range reservedPortsNeeded {
alloc, ok := usedPortToAlloc[port.Value]
if ok {
preemptedBandwidth += alloc.Resources.Networks[0].MBits
allocsToPreempt = append(allocsToPreempt, alloc)
}
}
// Remove allocs that were preempted to satisfy reserved ports
currentAllocs = structs.RemoveAllocs(currentAllocs, allocsToPreempt)
}
// If bandwidth requirements have been met, stop
if preemptedBandwidth+freeBandwidth >= MbitsNeeded {
met = true
break
}
// Split by priority
allocsByPriority := filterAndGroupPreemptibleAllocs(jobPriority, currentAllocs)
for _, allocsGrp := range allocsByPriority {
allocs := allocsGrp.allocs
// Sort by distance function that takes into account needed MBits
// as well as penalty for preempting an allocation
// whose task group already has existing preemptions
sort.Slice(allocs, func(i, j int) bool {
firstAlloc := allocs[i]
currentPreemptionCount1 := computeCurrentPreemptions(firstAlloc, currentPreemptions)
// Look up configured maxParallel value for these allocation's task groups
var maxParallel1, maxParallel2 int
tg1 := allocs[i].Job.LookupTaskGroup(allocs[i].TaskGroup)
if tg1 != nil && tg1.Migrate != nil {
maxParallel1 = tg1.Migrate.MaxParallel
}
distance1 := getPreemptionScore(allocs[i].Resources, resourceAsk, NetworkResource, maxParallel1, currentPreemptionCount1)
secondAlloc := allocs[j]
currentPreemptionCount2 := computeCurrentPreemptions(secondAlloc, currentPreemptions)
tg2 := secondAlloc.Job.LookupTaskGroup(secondAlloc.TaskGroup)
if tg2 != nil && tg2.Migrate != nil {
maxParallel2 = tg2.Migrate.MaxParallel
}
distance2 := getPreemptionScore(secondAlloc.Resources, resourceAsk, NetworkResource, maxParallel2, currentPreemptionCount2)
return distance1 < distance2
})
for _, alloc := range allocs {
preemptedBandwidth += alloc.Resources.Networks[0].MBits
allocsToPreempt = append(allocsToPreempt, alloc)
if preemptedBandwidth+freeBandwidth >= MbitsNeeded {
met = true
break
}
}
if met {
break
}
}
if met {
break
}
}
if len(allocsToPreempt) == 0 {
return nil
}
// Build a resource object with just the network Mbits filled in
// It's safe to use the first preempted allocation's network resource
// here because all allocations preempted will be from the same device
nodeRemainingResources := &structs.Resources{
Networks: []*structs.NetworkResource{
{
Device: allocsToPreempt[0].Resources.Networks[0].Device,
MBits: freeBandwidth,
},
},
}
// Do a final pass to eliminate any superset allocations
filteredBestAllocs := eliminateSuperSetAllocations(allocsToPreempt, resourceAsk, nodeRemainingResources, networkResourceDistance, meetsNetworkRequirements)
return filteredBestAllocs
}
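The two entry points above compose in a fixed order: network asks are satisfied first, then the remaining resources. Below is a minimal sketch of that call pattern, assuming the caller has already computed the proposed allocations and the resource ask; sketchSelectPreemptions is illustrative only and not part of this commit — the real wiring is the BinPackIterator change in scheduler/rank.go further down.

// sketchSelectPreemptions is an illustrative helper (not part of this commit)
// showing the order in which the two preemption helpers compose.
func sketchSelectPreemptions(priority int, proposed []*structs.Allocation,
	ask *structs.Resources, node *structs.Node,
	netIdx *structs.NetworkIndex, current []*structs.Allocation) []*structs.Allocation {
	var picked []*structs.Allocation
	// Phase 1: satisfy the network ask first, preempting only allocs that
	// share a network device with the placement.
	if len(ask.Networks) > 0 {
		netPicks := preemptForNetworkResourceAsk(priority, proposed, ask, netIdx, current)
		if netPicks == nil {
			return nil // network needs cannot be met even with preemption
		}
		picked = append(picked, netPicks...)
		proposed = structs.RemoveAllocs(proposed, netPicks)
	}
	// Phase 2: cover any remaining CPU/memory/disk/IOPS shortfall.
	// (In rank.go this phase only runs when the node still does not fit.)
	picked = append(picked, findPreemptibleAllocationsForTaskGroup(
		priority, proposed, ask, node, current)...)
	return picked
}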

scheduler/preemption_test.go Normal file
View File

@ -0,0 +1,789 @@
package scheduler
import (
"testing"
"fmt"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/stretchr/testify/require"
)
func TestResourceDistance(t *testing.T) {
resourceAsk := &structs.Resources{
CPU: 2048,
MemoryMB: 512,
IOPS: 300,
DiskMB: 4096,
Networks: []*structs.NetworkResource{
{
Device: "eth0",
MBits: 1024,
},
},
}
type testCase struct {
allocResource *structs.Resources
expectedDistance string
}
testCases := []*testCase{
{
&structs.Resources{
CPU: 2048,
MemoryMB: 512,
IOPS: 300,
DiskMB: 4096,
Networks: []*structs.NetworkResource{
{
Device: "eth0",
MBits: 1024,
},
},
},
"0.000",
},
{
&structs.Resources{
CPU: 1024,
MemoryMB: 400,
IOPS: 200,
DiskMB: 1024,
Networks: []*structs.NetworkResource{
{
Device: "eth0",
MBits: 1024,
},
},
},
"0.986",
},
{
&structs.Resources{
CPU: 1024,
MemoryMB: 200,
IOPS: 200,
DiskMB: 1024,
Networks: []*structs.NetworkResource{
{
Device: "eth0",
MBits: 512,
},
},
},
"1.138",
},
{
&structs.Resources{
CPU: 8192,
MemoryMB: 200,
IOPS: 200,
DiskMB: 1024,
Networks: []*structs.NetworkResource{
{
Device: "eth0",
MBits: 512,
},
},
},
"3.169",
},
{
&structs.Resources{
CPU: 2048,
MemoryMB: 500,
IOPS: 300,
DiskMB: 4096,
Networks: []*structs.NetworkResource{
{
Device: "eth0",
MBits: 1024,
},
},
},
"0.023",
},
}
for _, tc := range testCases {
t.Run("", func(t *testing.T) {
require := require.New(t)
require.Equal(tc.expectedDistance, fmt.Sprintf("%3.3f", resourceDistance(tc.allocResource, resourceAsk)))
})
}
}
func TestPreemption(t *testing.T) {
type testCase struct {
desc string
currentAllocations []*structs.Allocation
nodeReservedCapacity *structs.Resources
nodeCapacity *structs.Resources
resourceAsk *structs.Resources
jobPriority int
currentPreemptions []*structs.Allocation
preemptedAllocIDs map[string]struct{}
}
highPrioJob := mock.Job()
highPrioJob.Priority = 100
lowPrioJob := mock.Job()
lowPrioJob.Priority = 30
lowPrioJob2 := mock.Job()
lowPrioJob2.Priority = 30
// Create some persistent alloc ids to use in test cases
allocIDs := []string{uuid.Generate(), uuid.Generate(), uuid.Generate(), uuid.Generate(), uuid.Generate()}
nodeResources := &structs.Resources{
CPU: 4000,
MemoryMB: 8192,
DiskMB: 100 * 1024,
IOPS: 150,
Networks: []*structs.NetworkResource{
{
Device: "eth0",
CIDR: "192.168.0.100/32",
MBits: 1000,
},
},
}
reservedNodeResources := &structs.Resources{
CPU: 100,
MemoryMB: 256,
DiskMB: 4 * 1024,
Networks: []*structs.NetworkResource{
{
Device: "eth0",
IP: "192.168.0.100",
ReservedPorts: []structs.Port{{Label: "ssh", Value: 22}},
MBits: 1,
},
},
}
testCases := []testCase{
{
desc: "No preemption because existing allocs are not low priority",
currentAllocations: []*structs.Allocation{
createAlloc(allocIDs[0], highPrioJob, &structs.Resources{
CPU: 3200,
MemoryMB: 7256,
DiskMB: 4 * 1024,
IOPS: 150,
Networks: []*structs.NetworkResource{
{
Device: "eth0",
IP: "192.168.0.100",
MBits: 50,
},
},
})},
nodeReservedCapacity: reservedNodeResources,
nodeCapacity: nodeResources,
jobPriority: 100,
resourceAsk: &structs.Resources{
CPU: 100,
MemoryMB: 256,
DiskMB: 4 * 1024,
Networks: []*structs.NetworkResource{
{
Device: "eth0",
IP: "192.168.0.100",
ReservedPorts: []structs.Port{{Label: "ssh", Value: 22}},
MBits: 1,
},
},
},
},
{
desc: "Preempting low priority allocs not enough to meet resource ask",
currentAllocations: []*structs.Allocation{
createAlloc(allocIDs[0], lowPrioJob, &structs.Resources{
CPU: 3200,
MemoryMB: 7256,
DiskMB: 4 * 1024,
IOPS: 150,
Networks: []*structs.NetworkResource{
{
Device: "eth0",
IP: "192.168.0.100",
MBits: 50,
},
},
})},
nodeReservedCapacity: reservedNodeResources,
nodeCapacity: nodeResources,
jobPriority: 100,
resourceAsk: &structs.Resources{
CPU: 4000,
MemoryMB: 8192,
DiskMB: 4 * 1024,
IOPS: 300,
Networks: []*structs.NetworkResource{
{
Device: "eth0",
IP: "192.168.0.100",
ReservedPorts: []structs.Port{{Label: "ssh", Value: 22}},
MBits: 1,
},
},
},
},
{
desc: "Combination of high/low priority allocs, without static ports",
currentAllocations: []*structs.Allocation{
createAlloc(allocIDs[0], highPrioJob, &structs.Resources{
CPU: 2800,
MemoryMB: 2256,
DiskMB: 4 * 1024,
IOPS: 150,
Networks: []*structs.NetworkResource{
{
Device: "eth0",
IP: "192.168.0.100",
MBits: 150,
},
},
}),
createAlloc(allocIDs[1], lowPrioJob, &structs.Resources{
CPU: 200,
MemoryMB: 256,
DiskMB: 4 * 1024,
IOPS: 50,
Networks: []*structs.NetworkResource{
{
Device: "eth0",
IP: "192.168.0.200",
MBits: 500,
},
},
}),
createAlloc(allocIDs[2], lowPrioJob, &structs.Resources{
CPU: 200,
MemoryMB: 256,
DiskMB: 4 * 1024,
IOPS: 50,
Networks: []*structs.NetworkResource{
{
Device: "eth0",
IP: "192.168.0.100",
MBits: 300,
},
},
}),
createAlloc(allocIDs[3], lowPrioJob, &structs.Resources{
CPU: 700,
MemoryMB: 256,
DiskMB: 4 * 1024,
}),
},
nodeReservedCapacity: reservedNodeResources,
nodeCapacity: nodeResources,
jobPriority: 100,
resourceAsk: &structs.Resources{
CPU: 1100,
MemoryMB: 1000,
DiskMB: 25 * 1024,
Networks: []*structs.NetworkResource{
{
Device: "eth0",
IP: "192.168.0.100",
MBits: 840,
},
},
},
preemptedAllocIDs: map[string]struct{}{
allocIDs[1]: {},
allocIDs[2]: {},
allocIDs[3]: {},
},
}, {
desc: "Preemption needed for all resources except network",
currentAllocations: []*structs.Allocation{
createAlloc(allocIDs[0], highPrioJob, &structs.Resources{
CPU: 2800,
MemoryMB: 2256,
DiskMB: 40 * 1024,
IOPS: 100,
Networks: []*structs.NetworkResource{
{
Device: "eth0",
IP: "192.168.0.100",
MBits: 150,
},
},
}),
createAlloc(allocIDs[1], lowPrioJob, &structs.Resources{
CPU: 200,
MemoryMB: 256,
DiskMB: 4 * 1024,
IOPS: 50,
Networks: []*structs.NetworkResource{
{
Device: "eth0",
IP: "192.168.0.200",
MBits: 50,
},
},
}),
createAlloc(allocIDs[2], lowPrioJob, &structs.Resources{
CPU: 200,
MemoryMB: 512,
DiskMB: 25 * 1024,
}),
createAlloc(allocIDs[3], lowPrioJob, &structs.Resources{
CPU: 700,
MemoryMB: 276,
DiskMB: 20 * 1024,
}),
},
nodeReservedCapacity: reservedNodeResources,
nodeCapacity: nodeResources,
jobPriority: 100,
resourceAsk: &structs.Resources{
CPU: 1000,
MemoryMB: 3000,
DiskMB: 50 * 1024,
Networks: []*structs.NetworkResource{
{
Device: "eth0",
IP: "192.168.0.100",
MBits: 50,
},
},
},
preemptedAllocIDs: map[string]struct{}{
allocIDs[1]: {},
allocIDs[2]: {},
allocIDs[3]: {},
},
},
{
desc: "Only one low priority alloc needs to be preempted",
currentAllocations: []*structs.Allocation{
createAlloc(allocIDs[0], highPrioJob, &structs.Resources{
CPU: 1200,
MemoryMB: 2256,
DiskMB: 4 * 1024,
IOPS: 50,
Networks: []*structs.NetworkResource{
{
Device: "eth0",
IP: "192.168.0.100",
MBits: 150,
},
},
}),
createAlloc(allocIDs[1], lowPrioJob, &structs.Resources{
CPU: 200,
MemoryMB: 256,
DiskMB: 4 * 1024,
IOPS: 10,
Networks: []*structs.NetworkResource{
{
Device: "eth0",
IP: "192.168.0.100",
MBits: 500,
},
},
}),
createAlloc(allocIDs[2], lowPrioJob, &structs.Resources{
CPU: 200,
MemoryMB: 256,
DiskMB: 4 * 1024,
IOPS: 10,
Networks: []*structs.NetworkResource{
{
Device: "eth0",
IP: "192.168.0.200",
MBits: 300,
},
},
}),
},
nodeReservedCapacity: reservedNodeResources,
nodeCapacity: nodeResources,
jobPriority: 100,
resourceAsk: &structs.Resources{
CPU: 300,
MemoryMB: 500,
DiskMB: 5 * 1024,
Networks: []*structs.NetworkResource{
{
Device: "eth0",
IP: "192.168.0.100",
MBits: 320,
},
},
},
preemptedAllocIDs: map[string]struct{}{
allocIDs[2]: {},
},
},
{
desc: "one alloc meets static port need, another meets remaining mbits needed",
currentAllocations: []*structs.Allocation{
createAlloc(allocIDs[0], highPrioJob, &structs.Resources{
CPU: 1200,
MemoryMB: 2256,
DiskMB: 4 * 1024,
IOPS: 150,
Networks: []*structs.NetworkResource{
{
Device: "eth0",
IP: "192.168.0.100",
MBits: 150,
},
},
}),
createAlloc(allocIDs[1], lowPrioJob, &structs.Resources{
CPU: 200,
MemoryMB: 256,
DiskMB: 4 * 1024,
IOPS: 50,
Networks: []*structs.NetworkResource{
{
Device: "eth0",
IP: "192.168.0.200",
MBits: 500,
ReservedPorts: []structs.Port{
{
Label: "db",
Value: 88,
},
},
},
},
}),
createAlloc(allocIDs[2], lowPrioJob, &structs.Resources{
CPU: 200,
MemoryMB: 256,
DiskMB: 4 * 1024,
IOPS: 50,
Networks: []*structs.NetworkResource{
{
Device: "eth0",
IP: "192.168.0.100",
MBits: 200,
},
},
}),
},
nodeReservedCapacity: reservedNodeResources,
nodeCapacity: nodeResources,
jobPriority: 100,
resourceAsk: &structs.Resources{
CPU: 2700,
MemoryMB: 1000,
DiskMB: 25 * 1024,
Networks: []*structs.NetworkResource{
{
Device: "eth0",
IP: "192.168.0.100",
MBits: 800,
ReservedPorts: []structs.Port{
{
Label: "db",
Value: 88,
},
},
},
},
},
preemptedAllocIDs: map[string]struct{}{
allocIDs[1]: {},
allocIDs[2]: {},
},
},
{
desc: "alloc that meets static port need also meets other needds",
currentAllocations: []*structs.Allocation{
createAlloc(allocIDs[0], highPrioJob, &structs.Resources{
CPU: 1200,
MemoryMB: 2256,
DiskMB: 4 * 1024,
IOPS: 50,
Networks: []*structs.NetworkResource{
{
Device: "eth0",
IP: "192.168.0.100",
MBits: 150,
},
},
}),
createAlloc(allocIDs[1], lowPrioJob, &structs.Resources{
CPU: 200,
MemoryMB: 256,
DiskMB: 4 * 1024,
IOPS: 50,
Networks: []*structs.NetworkResource{
{
Device: "eth0",
IP: "192.168.0.200",
MBits: 600,
ReservedPorts: []structs.Port{
{
Label: "db",
Value: 88,
},
},
},
},
}),
createAlloc(allocIDs[2], lowPrioJob, &structs.Resources{
CPU: 200,
MemoryMB: 256,
DiskMB: 4 * 1024,
IOPS: 50,
Networks: []*structs.NetworkResource{
{
Device: "eth0",
IP: "192.168.0.100",
MBits: 100,
},
},
}),
},
nodeReservedCapacity: reservedNodeResources,
nodeCapacity: nodeResources,
jobPriority: 100,
resourceAsk: &structs.Resources{
CPU: 600,
MemoryMB: 1000,
DiskMB: 25 * 1024,
Networks: []*structs.NetworkResource{
{
Device: "eth0",
IP: "192.168.0.100",
MBits: 700,
ReservedPorts: []structs.Port{
{
Label: "db",
Value: 88,
},
},
},
},
},
preemptedAllocIDs: map[string]struct{}{
allocIDs[1]: {},
},
},
{
desc: "alloc from job that has existing evictions not chosen for preemption",
currentAllocations: []*structs.Allocation{
createAlloc(allocIDs[0], highPrioJob, &structs.Resources{
CPU: 1200,
MemoryMB: 2256,
DiskMB: 4 * 1024,
IOPS: 50,
Networks: []*structs.NetworkResource{
{
Device: "eth0",
IP: "192.168.0.100",
MBits: 150,
},
},
}),
createAlloc(allocIDs[1], lowPrioJob, &structs.Resources{
CPU: 200,
MemoryMB: 256,
DiskMB: 4 * 1024,
IOPS: 10,
Networks: []*structs.NetworkResource{
{
Device: "eth0",
IP: "192.168.0.200",
MBits: 500,
},
},
}),
createAlloc(allocIDs[2], lowPrioJob2, &structs.Resources{
CPU: 200,
MemoryMB: 256,
DiskMB: 4 * 1024,
IOPS: 10,
Networks: []*structs.NetworkResource{
{
Device: "eth0",
IP: "192.168.0.100",
MBits: 300,
},
},
}),
},
nodeReservedCapacity: reservedNodeResources,
nodeCapacity: nodeResources,
jobPriority: 100,
resourceAsk: &structs.Resources{
CPU: 300,
MemoryMB: 500,
DiskMB: 5 * 1024,
Networks: []*structs.NetworkResource{
{
Device: "eth0",
IP: "192.168.0.100",
MBits: 320,
},
},
},
currentPreemptions: []*structs.Allocation{
createAlloc(allocIDs[4], lowPrioJob2, &structs.Resources{
CPU: 200,
MemoryMB: 256,
DiskMB: 4 * 1024,
IOPS: 10,
Networks: []*structs.NetworkResource{
{
Device: "eth0",
IP: "192.168.0.100",
MBits: 300,
},
},
}),
},
preemptedAllocIDs: map[string]struct{}{
allocIDs[1]: {},
},
},
// This test case exercises the code path for a final filtering step that tries to
// minimize the number of preempted allocations
{
desc: "Filter out allocs whose resource usage superset is also in the preemption list",
currentAllocations: []*structs.Allocation{
createAlloc(allocIDs[0], highPrioJob, &structs.Resources{
CPU: 1800,
MemoryMB: 2256,
DiskMB: 4 * 1024,
IOPS: 50,
Networks: []*structs.NetworkResource{
{
Device: "eth0",
IP: "192.168.0.100",
MBits: 150,
},
},
}),
createAlloc(allocIDs[1], lowPrioJob, &structs.Resources{
CPU: 1500,
MemoryMB: 256,
DiskMB: 5 * 1024,
Networks: []*structs.NetworkResource{
{
Device: "eth0",
IP: "192.168.0.100",
MBits: 100,
},
},
}),
createAlloc(allocIDs[2], lowPrioJob, &structs.Resources{
CPU: 600,
MemoryMB: 256,
DiskMB: 5 * 1024,
Networks: []*structs.NetworkResource{
{
Device: "eth0",
IP: "192.168.0.200",
MBits: 300,
},
},
}),
},
nodeReservedCapacity: reservedNodeResources,
nodeCapacity: nodeResources,
jobPriority: 100,
resourceAsk: &structs.Resources{
CPU: 1000,
MemoryMB: 256,
DiskMB: 5 * 1024,
Networks: []*structs.NetworkResource{
{
Device: "eth0",
IP: "192.168.0.100",
MBits: 50,
},
},
},
preemptedAllocIDs: map[string]struct{}{
allocIDs[1]: {},
},
},
}
for _, tc := range testCases {
t.Run(tc.desc, func(t *testing.T) {
node := mock.Node()
node.Resources = tc.nodeCapacity
node.Reserved = tc.nodeReservedCapacity
state, ctx := testContext(t)
nodes := []*RankedNode{
{
Node: node,
},
}
state.UpsertNode(1000, node)
for _, alloc := range tc.currentAllocations {
alloc.NodeID = node.ID
}
require := require.New(t)
err := state.UpsertAllocs(1001, tc.currentAllocations)
require.Nil(err)
if tc.currentPreemptions != nil {
ctx.plan.NodePreemptions[node.ID] = tc.currentPreemptions
}
static := NewStaticRankIterator(ctx, nodes)
binPackIter := NewBinPackIterator(ctx, static, true, tc.jobPriority)
taskGroup := &structs.TaskGroup{
EphemeralDisk: &structs.EphemeralDisk{},
Tasks: []*structs.Task{
{
Name: "web",
Resources: tc.resourceAsk,
},
},
}
binPackIter.SetTaskGroup(taskGroup)
option := binPackIter.Next()
if tc.preemptedAllocIDs == nil {
require.Nil(option)
} else {
require.NotNil(option)
preemptedAllocs := option.PreemptedAllocs
require.Equal(len(tc.preemptedAllocIDs), len(preemptedAllocs))
for _, alloc := range preemptedAllocs {
_, ok := tc.preemptedAllocIDs[alloc.ID]
require.True(ok)
}
}
})
}
}
// helper method to create allocations with given jobs and resources
func createAlloc(id string, job *structs.Job, resource *structs.Resources) *structs.Allocation {
alloc := &structs.Allocation{
ID: id,
Job: job,
JobID: job.ID,
TaskResources: map[string]*structs.Resources{
"web": resource,
},
Resources: resource,
Namespace: structs.DefaultNamespace,
EvalID: uuid.Generate(),
DesiredStatus: structs.AllocDesiredStatusRun,
ClientStatus: structs.AllocClientStatusRunning,
TaskGroup: "web",
}
return alloc
}

scheduler/rank.go
View File

@ -25,6 +25,10 @@ type RankedNode struct {
// Allocs is used to cache the proposed allocations on the
// node. This can be shared between iterators that require it.
Proposed []*structs.Allocation
// PreemptedAllocs is used by the BinpackIterator to identify allocs
// that should be preempted in order to make the placement
PreemptedAllocs []*structs.Allocation
}
func (r *RankedNode) GoString() string {
@ -195,6 +199,16 @@ OUTER:
DiskMB: int64(iter.taskGroup.EphemeralDisk.SizeMB),
},
}
var allocsToPreempt []*structs.Allocation
// Gather the allocations already being preempted in the plan
allPreemptions := iter.ctx.Plan().NodePreemptions
var currentPreemptions []*structs.Allocation
for _, allocs := range allPreemptions {
currentPreemptions = append(currentPreemptions, allocs...)
}
for _, task := range iter.taskGroup.Tasks {
// Allocate the resources
taskResources := &structs.AllocatedTaskResources{
@ -211,12 +225,42 @@ OUTER:
ask := task.Resources.Networks[0].Copy()
offer, err := netIdx.AssignNetwork(ask)
if offer == nil {
// If eviction is not enabled, mark this node as exhausted and continue
if !iter.evict {
iter.ctx.Metrics().ExhaustedNode(option.Node,
fmt.Sprintf("network: %s", err))
netIdx.Release()
continue OUTER
}
// Look for preemptible allocations to satisfy the network resource for this task
preemptedAllocsForTaskNetwork := preemptForNetworkResourceAsk(iter.priority, proposed, taskResources, netIdx, currentPreemptions)
if preemptedAllocsForTaskNetwork == nil {
iter.ctx.Metrics().ExhaustedNode(option.Node,
fmt.Sprintf("unable to meet network resource %v after preemption", ask))
netIdx.Release()
continue OUTER
}
allocsToPreempt = append(allocsToPreempt, preemptedAllocsForTaskNetwork...)
// First subtract out preempted allocations
proposed = structs.RemoveAllocs(proposed, preemptedAllocsForTaskNetwork)
// Reset the network index and try the offer again
netIdx.Release()
netIdx = structs.NewNetworkIndex()
netIdx.SetNode(option.Node)
netIdx.AddAllocs(proposed)
offer, err = netIdx.AssignNetwork(ask)
if offer == nil {
iter.ctx.Metrics().ExhaustedNode(option.Node,
fmt.Sprintf("unexecpted error, unable to create offer after preempting:%v", err))
netIdx.Release()
continue OUTER
}
}
// Reserve this to prevent another task from colliding
netIdx.AddReserved(offer)
@ -234,19 +278,32 @@ OUTER:
// Add the resources we are trying to fit
proposed = append(proposed, &structs.Allocation{AllocatedResources: total})
// Check if these allocations fit
fit, dim, util, _ := structs.AllocsFit(option.Node, proposed, netIdx)
netIdx.Release()
if !fit {
// Skip the node if evictions are not enabled
if !iter.evict {
iter.ctx.Metrics().ExhaustedNode(option.Node, dim)
continue
}
// If eviction is enabled and the node doesn't fit the alloc, check if
// any allocs can be preempted
// Remove the last element containing the current placement from proposed allocs
current := proposed[:len(proposed)-1]
preemptForTaskGroup := findPreemptibleAllocationsForTaskGroup(iter.priority, current, total, option.Node, currentPreemptions)
allocsToPreempt = append(allocsToPreempt, preemptForTaskGroup...)
// If we were unable to find preempted allocs to meet these requirements
// mark as exhausted and continue
if len(preemptForTaskGroup) == 0 {
iter.ctx.Metrics().ExhaustedNode(option.Node, dim)
continue
}
}
if len(allocsToPreempt) > 0 {
option.PreemptedAllocs = allocsToPreempt
}
// Score the fit normally otherwise
fitness := structs.ScoreFit(option.Node, util)
normalizedFit := fitness / binPackingMaxFitScore

scheduler/system_sched.go
View File

@ -60,7 +60,7 @@ func (s *SystemScheduler) Process(eval *structs.Evaluation) error {
// Verify the evaluation trigger reason is understood
switch eval.TriggeredBy {
case structs.EvalTriggerJobRegister, structs.EvalTriggerNodeUpdate, structs.EvalTriggerFailedFollowUp,
structs.EvalTriggerJobDeregister, structs.EvalTriggerRollingUpdate, structs.EvalTriggerPreemption,
structs.EvalTriggerDeploymentWatcher, structs.EvalTriggerNodeDrain:
default:
desc := fmt.Sprintf("scheduler cannot handle '%s' evaluation reason",
@ -347,6 +347,24 @@ func (s *SystemScheduler) computePlacements(place []allocTuple) error {
alloc.PreviousAllocation = missing.Alloc.ID
}
// If this placement involves preemption, set the desired status to evict for those allocations
if option.PreemptedAllocs != nil {
var preemptedAllocIDs []string
for _, stop := range option.PreemptedAllocs {
s.plan.AppendPreemptedAlloc(stop, structs.AllocDesiredStatusEvict, alloc.ID)
preemptedAllocIDs = append(preemptedAllocIDs, stop.ID)
if s.eval.AnnotatePlan && s.plan.Annotations != nil {
s.plan.Annotations.PreemptedAllocs = append(s.plan.Annotations.PreemptedAllocs, stop.Stub())
if s.plan.Annotations.DesiredTGUpdates != nil {
desired := s.plan.Annotations.DesiredTGUpdates[missing.TaskGroup.Name]
desired.Evict++
}
}
}
alloc.PreemptedAllocations = preemptedAllocIDs
}
s.plan.AppendAlloc(alloc)
} else {
// Lazy initialize the failed map

scheduler/system_sched_test.go
View File

@ -6,11 +6,14 @@ import (
"testing"
"time"
"fmt"
memdb "github.com/hashicorp/go-memdb"
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/stretchr/testify/require"
)
func TestSystemSched_JobRegister(t *testing.T) {
@ -218,7 +221,7 @@ func TestSystemSched_JobRegister_EphemeralDiskConstraint(t *testing.T) {
JobID: job1.ID,
Status: structs.EvalStatusPending,
}
noErr(t, h.State.UpsertEvals(h.NextIndex(), []*structs.Evaluation{eval1}))
// Process the evaluation
if err := h1.Process(NewSystemScheduler, eval1); err != nil {
@ -274,15 +277,31 @@ func TestSystemSched_ExhaustResources(t *testing.T) {
JobID: job.ID,
Status: structs.EvalStatusPending,
}
noErr(t, h.State.UpsertEvals(h.NextIndex(), []*structs.Evaluation{eval1}))
// Process the evaluation
if err := h.Process(NewSystemScheduler, eval1); err != nil {
t.Fatalf("err: %v", err)
}
// The system scheduler should preempt the service job's alloc in order to place the system job
require := require.New(t)
newPlan := h.Plans[1]
require.Len(newPlan.NodeAllocation, 1)
require.Len(newPlan.NodePreemptions, 1)
for _, allocList := range newPlan.NodeAllocation {
require.Len(allocList, 1)
require.Equal(job.ID, allocList[0].JobID)
}
for _, allocList := range newPlan.NodePreemptions {
require.Len(allocList, 1)
require.Equal(svcJob.ID, allocList[0].JobID)
}
// Ensure that we have no queued allocations on the second eval
queued := h.Evals[1].QueuedAllocations["web"]
if queued != 0 {
t.Fatalf("expected: %v, actual: %v", 0, queued)
}
}
@ -1529,3 +1548,322 @@ func TestSystemSched_QueuedAllocsMultTG(t *testing.T) {
h.AssertEvalStatus(t, structs.EvalStatusComplete)
}
func TestSystemSched_Preemption(t *testing.T) {
h := NewHarness(t)
// Create nodes
var nodes []*structs.Node
for i := 0; i < 2; i++ {
node := mock.Node()
node.Resources = &structs.Resources{
CPU: 3072,
MemoryMB: 5034,
DiskMB: 20 * 1024,
IOPS: 150,
Networks: []*structs.NetworkResource{
{
Device: "eth0",
CIDR: "192.168.0.100/32",
MBits: 1000,
},
},
}
noErr(t, h.State.UpsertNode(h.NextIndex(), node))
nodes = append(nodes, node)
}
// Create some low priority batch jobs and allocations for them
// One job uses a reserved port
job1 := mock.BatchJob()
job1.Type = structs.JobTypeBatch
job1.Priority = 20
job1.TaskGroups[0].Tasks[0].Resources = &structs.Resources{
CPU: 512,
MemoryMB: 1024,
Networks: []*structs.NetworkResource{
{
MBits: 200,
ReservedPorts: []structs.Port{
{
Label: "web",
Value: 80,
},
},
},
},
}
alloc1 := mock.Alloc()
alloc1.Job = job1
alloc1.JobID = job1.ID
alloc1.NodeID = nodes[0].ID
alloc1.Name = "my-job[0]"
alloc1.TaskGroup = job1.TaskGroups[0].Name
alloc1.Resources = &structs.Resources{
CPU: 512,
MemoryMB: 1024,
DiskMB: 5 * 1024,
Networks: []*structs.NetworkResource{
{
Device: "eth0",
MBits: 200,
ReservedPorts: []structs.Port{
{
Label: "web",
Value: 80,
},
},
},
},
}
alloc1.TaskResources = map[string]*structs.Resources{
"web": {
CPU: 512,
MemoryMB: 1024,
DiskMB: 5 * 1024,
Networks: []*structs.NetworkResource{
{
Device: "eth0",
MBits: 200,
ReservedPorts: []structs.Port{
{
Label: "web",
Value: 80,
},
},
},
},
},
}
noErr(t, h.State.UpsertJob(h.NextIndex(), job1))
job2 := mock.BatchJob()
job2.Type = structs.JobTypeBatch
job2.Priority = 20
job2.TaskGroups[0].Tasks[0].Resources = &structs.Resources{
CPU: 512,
MemoryMB: 1024,
Networks: []*structs.NetworkResource{
{
MBits: 200,
},
},
}
alloc2 := mock.Alloc()
alloc2.Job = job2
alloc2.JobID = job2.ID
alloc2.NodeID = nodes[0].ID
alloc2.Name = "my-job[2]"
alloc2.TaskGroup = job2.TaskGroups[0].Name
alloc2.Resources = &structs.Resources{
CPU: 512,
MemoryMB: 1024,
DiskMB: 5 * 1024,
Networks: []*structs.NetworkResource{
{
Device: "eth0",
MBits: 200,
},
},
}
alloc2.TaskResources = map[string]*structs.Resources{
"web": {
CPU: 512,
MemoryMB: 1024,
DiskMB: 5 * 1024,
Networks: []*structs.NetworkResource{
{
Device: "eth0",
MBits: 200,
},
},
},
}
noErr(t, h.State.UpsertJob(h.NextIndex(), job2))
job3 := mock.Job()
job3.Type = structs.JobTypeBatch
job3.Priority = 40
job3.TaskGroups[0].Tasks[0].Resources = &structs.Resources{
CPU: 1024,
MemoryMB: 2048,
Networks: []*structs.NetworkResource{
{
Device: "eth0",
MBits: 400,
},
},
}
alloc3 := mock.Alloc()
alloc3.Job = job3
alloc3.JobID = job3.ID
alloc3.NodeID = nodes[0].ID
alloc3.Name = "my-job[0]"
alloc3.TaskGroup = job3.TaskGroups[0].Name
alloc3.Resources = &structs.Resources{
CPU: 1024,
MemoryMB: 25,
DiskMB: 5 * 1024,
Networks: []*structs.NetworkResource{
{
Device: "eth0",
MBits: 400,
},
},
}
alloc3.TaskResources = map[string]*structs.Resources{
"web": {
CPU: 1024,
MemoryMB: 25,
DiskMB: 5 * 1024,
Networks: []*structs.NetworkResource{
{
Device: "eth0",
MBits: 400,
},
},
},
}
noErr(t, h.State.UpsertAllocs(h.NextIndex(), []*structs.Allocation{alloc1, alloc2, alloc3}))
// Create a high priority job and allocs for it
// These allocs should not be preempted
job4 := mock.BatchJob()
job4.Type = structs.JobTypeBatch
job4.Priority = 100
job4.TaskGroups[0].Tasks[0].Resources = &structs.Resources{
CPU: 1024,
MemoryMB: 2048,
Networks: []*structs.NetworkResource{
{
MBits: 100,
},
},
}
alloc4 := mock.Alloc()
alloc4.Job = job4
alloc4.JobID = job4.ID
alloc4.NodeID = nodes[0].ID
alloc4.Name = "my-job4[0]"
alloc4.TaskGroup = job4.TaskGroups[0].Name
alloc4.Resources = &structs.Resources{
CPU: 1024,
MemoryMB: 2048,
DiskMB: 2 * 1024,
Networks: []*structs.NetworkResource{
{
Device: "eth0",
MBits: 100,
},
},
}
alloc4.TaskResources = map[string]*structs.Resources{
"web": {
CPU: 1024,
MemoryMB: 2048,
DiskMB: 2 * 1024,
Networks: []*structs.NetworkResource{
{
Device: "eth0",
MBits: 100,
},
},
},
}
noErr(t, h.State.UpsertJob(h.NextIndex(), job4))
noErr(t, h.State.UpsertAllocs(h.NextIndex(), []*structs.Allocation{alloc4}))
// Create a system job such that it would need to preempt existing allocs to succeed
job := mock.SystemJob()
job.TaskGroups[0].Tasks[0].Resources = &structs.Resources{
CPU: 1948,
MemoryMB: 256,
Networks: []*structs.NetworkResource{
{
MBits: 800,
DynamicPorts: []structs.Port{{Label: "http"}},
},
},
}
noErr(t, h.State.UpsertJob(h.NextIndex(), job))
// Create a mock evaluation to register the job
eval := &structs.Evaluation{
Namespace: structs.DefaultNamespace,
ID: uuid.Generate(),
Priority: job.Priority,
TriggeredBy: structs.EvalTriggerJobRegister,
JobID: job.ID,
Status: structs.EvalStatusPending,
}
noErr(t, h.State.UpsertEvals(h.NextIndex(), []*structs.Evaluation{eval}))
// Process the evaluation
err := h.Process(NewSystemScheduler, eval)
require := require.New(t)
require.Nil(err)
// Ensure a single plan
require.Equal(1, len(h.Plans))
plan := h.Plans[0]
// Ensure the plan doesn't have annotations.
require.Nil(plan.Annotations)
// Ensure the plan allocated on both nodes
var planned []*structs.Allocation
preemptingAllocId := ""
require.Equal(2, len(plan.NodeAllocation))
// The alloc that got placed on node 1 is the preemptor
for _, allocList := range plan.NodeAllocation {
planned = append(planned, allocList...)
for _, alloc := range allocList {
if alloc.NodeID == nodes[0].ID {
preemptingAllocID = alloc.ID
}
}
}
// Lookup the allocations by JobID
ws := memdb.NewWatchSet()
out, err := h.State.AllocsByJob(ws, job.Namespace, job.ID, false)
noErr(t, err)
// Ensure all allocations placed
require.Equal(2, len(out))
// Verify that one node has preempted allocs
require.NotNil(plan.NodePreemptions[nodes[0].ID])
preemptedAllocs := plan.NodePreemptions[nodes[0].ID]
// Verify that three jobs have preempted allocs
require.Equal(3, len(preemptedAllocs))
expectedPreemptedJobIDs := []string{job1.ID, job2.ID, job3.ID}
// We expect job1, job2 and job3 to have preempted allocations
// job4 should not have any allocs preempted
for _, alloc := range preemptedAllocs {
require.Contains(expectedPreemptedJobIDs, alloc.JobID)
}
// Look up the preempted allocs by job ID
ws = memdb.NewWatchSet()
for _, jobId := range expectedPreemptedJobIDs {
out, err = h.State.AllocsByJob(ws, structs.DefaultNamespace, jobId, false)
noErr(t, err)
for _, alloc := range out {
require.Equal(structs.AllocDesiredStatusEvict, alloc.DesiredStatus)
require.Equal(fmt.Sprintf("Preempted by alloc ID %v", preemptingAllocId), alloc.DesiredDescription)
}
}
h.AssertEvalStatus(t, structs.EvalStatusComplete)
}

scheduler/testing.go
View File

@ -96,6 +96,7 @@ func (h *Harness) SubmitPlan(plan *structs.Plan) (*structs.PlanResult, State, er
result := new(structs.PlanResult)
result.NodeUpdate = plan.NodeUpdate
result.NodeAllocation = plan.NodeAllocation
result.NodePreemptions = plan.NodePreemptions
result.AllocIndex = index
// Flatten evicts and allocs
@ -116,6 +117,18 @@ func (h *Harness) SubmitPlan(plan *structs.Plan) (*structs.PlanResult, State, er
}
}
// Set create and modify time for preempted allocs and flatten them
var preemptedAllocs []*structs.Allocation
for _, preemptions := range result.NodePreemptions {
for _, alloc := range preemptions {
if alloc.CreateTime == 0 {
alloc.CreateTime = now
}
alloc.ModifyTime = now
preemptedAllocs = append(preemptedAllocs, alloc)
}
}
// Setup the update request
req := structs.ApplyPlanResultsRequest{
AllocUpdateRequest: structs.AllocUpdateRequest{
@ -125,6 +138,7 @@ func (h *Harness) SubmitPlan(plan *structs.Plan) (*structs.PlanResult, State, er
Deployment: plan.Deployment,
DeploymentUpdates: plan.DeploymentUpdates,
EvalID: plan.EvalID,
NodePreemptions: preemptedAllocs,
}
// Apply the full plan