diff --git a/nomad/fsm.go b/nomad/fsm.go index 0111f0c35..ba02bf063 100644 --- a/nomad/fsm.go +++ b/nomad/fsm.go @@ -813,7 +813,7 @@ func (n *nomadFSM) applyPlanResults(buf []byte, index uint64) interface{} { n.logger.Error("ApplyPlan failed", "error", err) return err } - + n.handleUpsertedEvals(req.PreemptionEvals) return nil } diff --git a/nomad/plan_apply.go b/nomad/plan_apply.go index 12f4dc02a..66fd64add 100644 --- a/nomad/plan_apply.go +++ b/nomad/plan_apply.go @@ -10,6 +10,7 @@ import ( "github.com/armon/go-metrics" "github.com/hashicorp/go-multierror" + "github.com/hashicorp/nomad/helper/uuid" "github.com/hashicorp/nomad/nomad/state" "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/raft" @@ -163,6 +164,7 @@ func (p *planner) applyPlan(plan *structs.Plan, result *structs.PlanResult, snap Deployment: result.Deployment, DeploymentUpdates: result.DeploymentUpdates, EvalID: plan.EvalID, + NodePreemptions: make([]*structs.Allocation, 0, len(result.NodePreemptions)), } for _, updateList := range result.NodeUpdate { req.Alloc = append(req.Alloc, updateList...) @@ -171,6 +173,10 @@ func (p *planner) applyPlan(plan *structs.Plan, result *structs.PlanResult, snap req.Alloc = append(req.Alloc, allocList...) } + for _, preemptions := range result.NodePreemptions { + req.NodePreemptions = append(req.NodePreemptions, preemptions...) + } + // Set the time the alloc was applied for the first time. This can be used // to approximate the scheduling time. now := time.Now().UTC().UnixNano() @@ -181,6 +187,41 @@ func (p *planner) applyPlan(plan *structs.Plan, result *structs.PlanResult, snap alloc.ModifyTime = now } + // Set create and modify time for preempted allocs if any + // Also gather jobids to create follow up evals + preemptedJobIDs := make(map[structs.NamespacedID]struct{}) + for _, alloc := range req.NodePreemptions { + if alloc.CreateTime == 0 { + alloc.CreateTime = now + } + alloc.ModifyTime = now + id := structs.NamespacedID{Namespace: alloc.Namespace, ID: alloc.JobID} + _, ok := preemptedJobIDs[id] + if !ok { + preemptedJobIDs[id] = struct{}{} + } + } + + var evals []*structs.Evaluation + for preemptedJobID, _ := range preemptedJobIDs { + job, _ := p.State().JobByID(nil, preemptedJobID.Namespace, preemptedJobID.ID) // TODO Fix me + if job != nil { + //TODO(preetha): This eval is missing class eligibility related fields + // need to figure out how to set them per job + eval := &structs.Evaluation{ + ID: uuid.Generate(), + Namespace: job.Namespace, + TriggeredBy: structs.EvalTriggerPreemption, + JobID: job.ID, + Type: job.Type, + Priority: job.Priority, + Status: structs.EvalStatusPending, + } + evals = append(evals, eval) + } + } + req.PreemptionEvals = evals + // Dispatch the Raft transaction future, err := p.raftApplyFuture(structs.ApplyPlanResultsRequestType, &req) if err != nil { @@ -259,6 +300,7 @@ func evaluatePlanPlacements(pool *EvaluatePool, snap *state.StateSnapshot, plan NodeAllocation: make(map[string][]*structs.Allocation), Deployment: plan.Deployment.Copy(), DeploymentUpdates: plan.DeploymentUpdates, + NodePreemptions: make(map[string][]*structs.Allocation), } // Collect all the nodeIDs @@ -304,6 +346,7 @@ func evaluatePlanPlacements(pool *EvaluatePool, snap *state.StateSnapshot, plan result.NodeAllocation = nil result.DeploymentUpdates = nil result.Deployment = nil + result.NodePreemptions = nil return true } @@ -318,6 +361,25 @@ func evaluatePlanPlacements(pool *EvaluatePool, snap *state.StateSnapshot, plan if nodeAlloc := plan.NodeAllocation[nodeID]; 
len(nodeAlloc) > 0 { result.NodeAllocation[nodeID] = nodeAlloc } + + if nodePreemptions := plan.NodePreemptions[nodeID]; nodePreemptions != nil { + var filteredNodePreemptions []*structs.Allocation + // Do a pass over preempted allocs in the plan to check + // whether the alloc is already in a terminal state + for _, preemptedAlloc := range nodePreemptions { + alloc, err := snap.AllocByID(nil, preemptedAlloc.ID) + if err != nil { + mErr.Errors = append(mErr.Errors, err) + } + if alloc != nil { + if !alloc.TerminalStatus() { + filteredNodePreemptions = append(filteredNodePreemptions, preemptedAlloc) + } + } + } + result.NodePreemptions[nodeID] = filteredNodePreemptions + } + return } @@ -461,6 +523,14 @@ func evaluateNodePlan(snap *state.StateSnapshot, plan *structs.Plan, nodeID stri if update := plan.NodeUpdate[nodeID]; len(update) > 0 { remove = append(remove, update...) } + + // Remove any preempted allocs + if preempted := plan.NodePreemptions[nodeID]; len(preempted) > 0 { + for _, allocs := range preempted { + remove = append(remove, allocs) + } + } + if updated := plan.NodeAllocation[nodeID]; len(updated) > 0 { for _, alloc := range updated { remove = append(remove, alloc) diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index 8885ebab4..d212b508b 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -218,6 +218,43 @@ func (s *StateStore) UpsertPlanResults(index uint64, results *structs.ApplyPlanR } } + // TODO(preetha) do a pass to group by jobid + // Prepare preempted allocs in the plan results for update + for _, preemptedAlloc := range results.NodePreemptions { + // Look for existing alloc + existing, err := txn.First("allocs", "id", preemptedAlloc.ID) + if err != nil { + return fmt.Errorf("alloc lookup failed: %v", err) + } + + // Nothing to do if this does not exist + if existing == nil { + return nil + } + exist := existing.(*structs.Allocation) + + // Copy everything from the existing allocation + copyAlloc := exist.Copy() + + // Only update the fields set by the scheduler + copyAlloc.DesiredStatus = preemptedAlloc.DesiredStatus + copyAlloc.PreemptedByAllocation = preemptedAlloc.PreemptedByAllocation + copyAlloc.DesiredDescription = preemptedAlloc.DesiredDescription + + // Upsert the preempted allocations + if err := s.upsertAllocsImpl(index, []*structs.Allocation{copyAlloc}, txn); err != nil { + return err + } + } + + // Upsert followup evals for allocs that were preempted + + for _, eval := range results.PreemptionEvals { + if err := s.nestedUpsertEval(txn, index, eval); err != nil { + return err + } + } + txn.Commit() return nil } diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index be625dcd7..868514cf9 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -7796,7 +7796,7 @@ const ( EvalTriggerMaxPlans = "max-plan-attempts" EvalTriggerRetryFailedAlloc = "alloc-failure" EvalTriggerQueuedAllocs = "queued-allocs" - EvalTriggerPreemption = "preempted" + EvalTriggerPreemption = "preemption" ) const ( @@ -8177,21 +8177,23 @@ func (p *Plan) AppendUpdate(alloc *Allocation, desiredStatus, desiredDesc, clien p.NodeUpdate[node] = append(existing, newAlloc) } +// AppendPreemptedAlloc is used to append an allocation that's being preempted to the plan. 
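The state store hunk above and the AppendPreemptedAlloc helper whose doc comment begins here are two sides of the same contract: the plan carries only a minimal allocation per preemption, and UpsertPlanResults copies the few scheduler-owned fields onto the stored allocation. A minimal sketch of that merge rule, using a hypothetical helper name rather than anything in the patch:

```go
package state

import "github.com/hashicorp/nomad/nomad/structs"

// Sketch only: the per-alloc merge performed inside UpsertPlanResults for each
// entry in results.NodePreemptions. Everything except the three scheduler-owned
// fields is taken from the copy already in the state store.
func mergePreemptedAlloc(existing, fromPlan *structs.Allocation) *structs.Allocation {
	merged := existing.Copy()
	merged.DesiredStatus = fromPlan.DesiredStatus                 // e.g. structs.AllocDesiredStatusEvict
	merged.PreemptedByAllocation = fromPlan.PreemptedByAllocation // ID of the preempting alloc
	merged.DesiredDescription = fromPlan.DesiredDescription       // "Preempted by alloc ID ..."
	return merged
}
```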
+// To minimize the size of the plan, this only sets a minimal set of fields in the allocation func (p *Plan) AppendPreemptedAlloc(alloc *Allocation, desiredStatus, preemptingAllocID string) { - newAlloc := new(Allocation) - *newAlloc = *alloc - // Normalize the job - newAlloc.Job = nil - - // Strip the resources as it can be rebuilt. - newAlloc.Resources = nil - + newAlloc := &Allocation{} + newAlloc.ID = alloc.ID + newAlloc.JobID = alloc.JobID + newAlloc.Namespace = alloc.Namespace newAlloc.DesiredStatus = desiredStatus newAlloc.PreemptedByAllocation = preemptingAllocID desiredDesc := fmt.Sprintf("Preempted by alloc ID %v", preemptingAllocID) newAlloc.DesiredDescription = desiredDesc + // TaskResources are needed by the plan applier to check if allocations fit + // after removing preempted allocations + newAlloc.TaskResources = alloc.TaskResources + node := alloc.NodeID // Append this alloc to slice for this node existing := p.NodePreemptions[node] diff --git a/scheduler/context_test.go b/scheduler/context_test.go index 28416b047..8eb7792c8 100644 --- a/scheduler/context_test.go +++ b/scheduler/context_test.go @@ -14,8 +14,9 @@ import ( func testContext(t testing.TB) (*state.StateStore, *EvalContext) { state := state.TestStateStore(t) plan := &structs.Plan{ - NodeUpdate: make(map[string][]*structs.Allocation), - NodeAllocation: make(map[string][]*structs.Allocation), + NodeUpdate: make(map[string][]*structs.Allocation), + NodeAllocation: make(map[string][]*structs.Allocation), + NodePreemptions: make(map[string][]*structs.Allocation), } logger := testlog.HCLogger(t) diff --git a/scheduler/generic_sched.go b/scheduler/generic_sched.go index cf6ce977a..541678fd5 100644 --- a/scheduler/generic_sched.go +++ b/scheduler/generic_sched.go @@ -130,7 +130,8 @@ func (s *GenericScheduler) Process(eval *structs.Evaluation) error { structs.EvalTriggerRollingUpdate, structs.EvalTriggerQueuedAllocs, structs.EvalTriggerPeriodicJob, structs.EvalTriggerMaxPlans, structs.EvalTriggerDeploymentWatcher, structs.EvalTriggerRetryFailedAlloc, - structs.EvalTriggerFailedFollowUp: + structs.EvalTriggerFailedFollowUp, + structs.EvalTriggerPreemption: default: desc := fmt.Sprintf("scheduler cannot handle '%s' evaluation reason", eval.TriggeredBy) diff --git a/scheduler/preemption.go b/scheduler/preemption.go new file mode 100644 index 000000000..f3a80d272 --- /dev/null +++ b/scheduler/preemption.go @@ -0,0 +1,437 @@ +package scheduler + +import ( + "fmt" + "math" + "sort" + + "github.com/hashicorp/nomad/nomad/structs" +) + +// maxParallelPenalty is a score penalty applied to allocations to mitigate against +// too many allocations of the same job being preempted. This penalty is applied after the +// number of allocations being preempted exceeds max_parallel value in the job's migrate stanza +const maxParallelPenalty = 50.0 + +type PreemptionType uint8 + +const ( + NetworkResource PreemptionType = iota + CPUMemoryDiskIOPS +) + +// resourceDistance returns how close the resource is to the resource being asked for +// It is calculated by first computing a relative fraction and then measuring how close +// that is to the origin coordinate. 
Lower values are better +func resourceDistance(resource *structs.Resources, resourceAsk *structs.Resources) float64 { + memoryCoord, cpuCoord, iopsCoord, diskMBCoord := 0.0, 0.0, 0.0, 0.0 + if resourceAsk.MemoryMB > 0 { + memoryCoord = float64(resourceAsk.MemoryMB-resource.MemoryMB) / float64(resourceAsk.MemoryMB) + } + if resourceAsk.CPU > 0 { + cpuCoord = float64(resourceAsk.CPU-resource.CPU) / float64(resourceAsk.CPU) + } + if resourceAsk.IOPS > 0 { + iopsCoord = float64(resourceAsk.IOPS-resource.IOPS) / float64(resourceAsk.IOPS) + } + if resourceAsk.DiskMB > 0 { + diskMBCoord = float64(resourceAsk.DiskMB-resource.DiskMB) / float64(resourceAsk.DiskMB) + } + + originDist := math.Sqrt( + math.Pow(memoryCoord, 2) + + math.Pow(cpuCoord, 2) + + math.Pow(iopsCoord, 2) + + math.Pow(diskMBCoord, 2)) + return originDist +} + +// networkResourceDistance returns distance based on network megabits +func networkResourceDistance(resource *structs.Resources, resourceAsk *structs.Resources) float64 { + networkCoord := math.MaxFloat64 + if len(resourceAsk.Networks) > 0 && resourceAsk.Networks[0].MBits > 0 { + networkCoord = float64(resourceAsk.Networks[0].MBits-resource.Networks[0].MBits) / float64(resourceAsk.Networks[0].MBits) + } + + originDist := math.Sqrt( + math.Pow(networkCoord, 2)) + return originDist +} + +// getPreemptionScore is used to calculate a score (lower is better) based on the distance between +// the candidate allocation's resources and the resources being asked for. A penalty is added when the candidate's +// job already has other allocations in the plan that are being preempted. +func getPreemptionScore(resource *structs.Resources, resourceAsk *structs.Resources, preemptionType PreemptionType, maxParallel int, numPreemptedAllocs int) float64 { + maxParallelScorePenalty := 0.0 + if maxParallel > 0 && numPreemptedAllocs >= maxParallel { + maxParallelScorePenalty = float64((numPreemptedAllocs+1)-maxParallel) * maxParallelPenalty + } + switch preemptionType { + case NetworkResource: + return networkResourceDistance(resource, resourceAsk) + maxParallelScorePenalty + case CPUMemoryDiskIOPS: + return resourceDistance(resource, resourceAsk) + maxParallelScorePenalty + } + panic(fmt.Errorf("Unknown preemption type:%v", preemptionType)) +} + +// findPreemptibleAllocationsForTaskGroup computes a list of allocations to preempt to accommodate +// the resources asked for. Only allocs from jobs whose priority is at least 10 below jobPriority are considered. +// This method is used after network resource needs have already been met.
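Before the task-group selection that follows, it may help to see the distance metric on concrete numbers. This is the "0.986" case from TestResourceDistance further down, written as a test-style sketch inside package scheduler (not part of the patch):

```go
package scheduler

import (
	"fmt"
	"testing"

	"github.com/hashicorp/nomad/nomad/structs"
)

// Each dimension contributes (ask-have)/ask and the distance is the Euclidean
// norm of those fractions:
//
//	cpu  = (2048-1024)/2048 = 0.500
//	mem  = (512-400)/512    ≈ 0.219
//	iops = (300-200)/300    ≈ 0.333
//	disk = (4096-1024)/4096 = 0.750
//	sqrt(0.500^2 + 0.219^2 + 0.333^2 + 0.750^2) ≈ 0.986
func TestResourceDistanceWorkedExample(t *testing.T) {
	ask := &structs.Resources{CPU: 2048, MemoryMB: 512, IOPS: 300, DiskMB: 4096}
	have := &structs.Resources{CPU: 1024, MemoryMB: 400, IOPS: 200, DiskMB: 1024}
	if got := fmt.Sprintf("%3.3f", resourceDistance(have, ask)); got != "0.986" {
		t.Fatalf("expected distance 0.986, got %s", got)
	}
}
```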
+func findPreemptibleAllocationsForTaskGroup(jobPriority int, current []*structs.Allocation, resourceAsk *structs.Resources, node *structs.Node, currentPreemptions []*structs.Allocation) []*structs.Allocation { + resourcesNeeded := resourceAsk.Copy() + allocsByPriority := filterAndGroupPreemptibleAllocs(jobPriority, current) + var bestAllocs []*structs.Allocation + allRequirementsMet := false + var preemptedResources *structs.Resources + + //TODO(preetha): should add some debug logging + + nodeRemainingResources := node.Resources.Copy() + + // Initialize nodeRemainingResources with the remaining resources + // after accounting for reserved resources and all allocations + + // Subtract the reserved resources of the node + if node.Reserved != nil { + nodeRemainingResources.Subtract(node.Reserved) + } + + // Subtract current allocations + for _, alloc := range current { + nodeRemainingResources.Subtract(alloc.Resources) + } + + // Iterate over allocations grouped by priority to find preemptible allocations + for _, allocGrp := range allocsByPriority { + for len(allocGrp.allocs) > 0 && !allRequirementsMet { + closestAllocIndex := -1 + bestDistance := math.MaxFloat64 + // find the alloc with the closest distance + for index, alloc := range allocGrp.allocs { + currentPreemptionCount := computeCurrentPreemptions(alloc, currentPreemptions) + maxParallel := 0 + tg := alloc.Job.LookupTaskGroup(alloc.TaskGroup) + if tg != nil && tg.Migrate != nil { + maxParallel = tg.Migrate.MaxParallel + } + distance := getPreemptionScore(alloc.Resources, resourcesNeeded, CPUMemoryDiskIOPS, maxParallel, currentPreemptionCount) + if distance < bestDistance { + bestDistance = distance + closestAllocIndex = index + } + } + closestAlloc := allocGrp.allocs[closestAllocIndex] + + if preemptedResources == nil { + preemptedResources = closestAlloc.Resources.Copy() + } else { + preemptedResources.Add(closestAlloc.Resources) + } + availableResources := preemptedResources.Copy() + availableResources.Add(nodeRemainingResources) + + allRequirementsMet = meetsNonNetworkRequirements(availableResources, resourceAsk) + bestAllocs = append(bestAllocs, closestAlloc) + + allocGrp.allocs[closestAllocIndex] = allocGrp.allocs[len(allocGrp.allocs)-1] + allocGrp.allocs = allocGrp.allocs[:len(allocGrp.allocs)-1] + resourcesNeeded.Subtract(closestAlloc.Resources) + } + if allRequirementsMet { + break + } + } + + // Early return if all allocs examined and requirements were not met + if !allRequirementsMet { + return nil + } + + // We do another pass to eliminate unnecessary preemptions + // This filters out allocs whose resources are already covered by another alloc + filteredBestAllocs := eliminateSuperSetAllocations(bestAllocs, resourceAsk, nodeRemainingResources, resourceDistance, meetsNonNetworkRequirements) + return filteredBestAllocs + +} + +// computeCurrentPreemptions counts the number of other allocations being preempted that match the job and task group of +// the alloc under consideration. 
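The greedy walk above can also be driven directly in a test. A minimal sketch under stated assumptions (it reuses the createAlloc helper added later in preemption_test.go, and the node/ask sizes are made up for illustration): one priority-20 alloc nearly fills the node, so a priority-100 ask can only fit by preempting it.

```go
package scheduler

import (
	"testing"

	"github.com/hashicorp/nomad/helper/uuid"
	"github.com/hashicorp/nomad/nomad/mock"
	"github.com/hashicorp/nomad/nomad/structs"
)

// Sketch only: with 3500 of 4000 CPU and 6000 of 8192 MB in use, an ask for
// 1000 CPU / 4000 MB cannot be met from free capacity, and the 10-point
// priority delta filter passes (100 - 20 >= 10), so the alloc is selected.
func TestFindPreemptibleAllocationsSketch(t *testing.T) {
	node := mock.Node()
	node.Resources = &structs.Resources{CPU: 4000, MemoryMB: 8192, DiskMB: 100 * 1024, IOPS: 150}
	node.Reserved = nil

	lowPrioJob := mock.Job()
	lowPrioJob.Priority = 20
	running := createAlloc(uuid.Generate(), lowPrioJob, &structs.Resources{
		CPU: 3500, MemoryMB: 6000, DiskMB: 10 * 1024,
	})

	ask := &structs.Resources{CPU: 1000, MemoryMB: 4000, DiskMB: 5 * 1024}
	picked := findPreemptibleAllocationsForTaskGroup(100, []*structs.Allocation{running}, ask, node, nil)
	if len(picked) != 1 || picked[0].ID != running.ID {
		t.Fatalf("expected the low priority alloc to be picked, got %v", picked)
	}
}
```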
This is used as a scoring factor to minimize too many allocs of the same job being preempted at once +func computeCurrentPreemptions(currentAlloc *structs.Allocation, currentPreemptions []*structs.Allocation) int { + numCurrentPreemptionsForJob := 0 + for _, alloc := range currentPreemptions { + if alloc.JobID == currentAlloc.JobID && alloc.Namespace == currentAlloc.Namespace && alloc.TaskGroup == currentAlloc.TaskGroup { + numCurrentPreemptionsForJob++ + } + } + return numCurrentPreemptionsForJob +} + +// meetsNonNetworkRequirements checks if the first resource meets or exceeds the second resource's requirements +// This intentionally ignores network requirements, those are handled by meetsNetworkRequirements +func meetsNonNetworkRequirements(first *structs.Resources, second *structs.Resources) bool { + if first.CPU < second.CPU { + return false + } + if first.MemoryMB < second.MemoryMB { + return false + } + if first.DiskMB < second.DiskMB { + return false + } + if first.IOPS < second.IOPS { + return false + } + return true +} + +// meetsNetworkRequirements checks if the first resource meets or exceeds the second resource's network MBits requirements +func meetsNetworkRequirements(first *structs.Resources, second *structs.Resources) bool { + if len(first.Networks) == 0 || len(second.Networks) == 0 { + return false + } + return first.Networks[0].MBits >= second.Networks[0].MBits +} + +type groupedAllocs struct { + priority int + allocs []*structs.Allocation +} + +func filterAndGroupPreemptibleAllocs(jobPriority int, current []*structs.Allocation) []*groupedAllocs { + allocsByPriority := make(map[int][]*structs.Allocation) + for _, alloc := range current { + // Why is alloc.Job even nil though? + if alloc.Job == nil { + continue + } + + // Skip allocs whose priority is within a delta of 10 + // This also skips any allocs of the current job + // for which we are attempting preemption + if jobPriority-alloc.Job.Priority < 10 { + continue + } + grpAllocs, ok := allocsByPriority[alloc.Job.Priority] + if !ok { + grpAllocs = make([]*structs.Allocation, 0) + } + grpAllocs = append(grpAllocs, alloc) + allocsByPriority[alloc.Job.Priority] = grpAllocs + } + + var groupedSortedAllocs []*groupedAllocs + for priority, allocs := range allocsByPriority { + groupedSortedAllocs = append(groupedSortedAllocs, &groupedAllocs{ + priority: priority, + allocs: allocs}) + } + + // Sort by priority + sort.Slice(groupedSortedAllocs, func(i, j int) bool { + return groupedSortedAllocs[i].priority < groupedSortedAllocs[j].priority + }) + + return groupedSortedAllocs +} + +type distanceFn func(first *structs.Resources, second *structs.Resources) float64 + +type meetsRequirementsFn func(first *structs.Resources, second *structs.Resources) bool + +func eliminateSuperSetAllocations(bestAllocs []*structs.Allocation, resourceAsk *structs.Resources, + nodeRemainingResources *structs.Resources, distanceFunc distanceFn, reqFunc meetsRequirementsFn) []*structs.Allocation { + // Sort by distance reversed to surface any superset allocs first + sort.Slice(bestAllocs, func(i, j int) bool { + distance1 := distanceFunc(bestAllocs[i].Resources, resourceAsk) + distance2 := distanceFunc(bestAllocs[j].Resources, resourceAsk) + return distance1 > distance2 + }) + + var preemptedResources *structs.Resources + var filteredBestAllocs []*structs.Allocation + + // Do another pass to eliminate allocations that are a superset of other allocations + // in the preemption set + for _, alloc := range bestAllocs { + if preemptedResources == nil { + 
preemptedResources = alloc.Resources + } else { + preemptedResources.Add(alloc.Resources) + } + filteredBestAllocs = append(filteredBestAllocs, alloc) + availableResources := preemptedResources.Copy() + availableResources.Add(nodeRemainingResources) + + requirementsMet := reqFunc(availableResources, resourceAsk) + if requirementsMet { + break + } + } + return filteredBestAllocs +} + +// preemptForNetworkResourceAsk tries to find allocations to preempt to meet network resources. +// this needs to consider network resources at the task level and for the same task it should +// only preempt allocations that share the same network device +func preemptForNetworkResourceAsk(jobPriority int, currentAllocs []*structs.Allocation, resourceAsk *structs.Resources, + netIdx *structs.NetworkIndex, currentPreemptions []*structs.Allocation) []*structs.Allocation { + + // Early return if there are no current allocs + if len(currentAllocs) == 0 { + return nil + } + + networkResourceAsk := resourceAsk.Networks[0] + deviceToAllocs := make(map[string][]*structs.Allocation) + MbitsNeeded := networkResourceAsk.MBits + reservedPortsNeeded := networkResourceAsk.ReservedPorts + + // Create a map from each device to allocs + // We do this because to place a task we have to be able to + // preempt allocations that are using the same device. + // + // This step also filters out high priority allocations and allocations + // that are not using any network resources + for _, alloc := range currentAllocs { + if alloc.Job == nil { + continue + } + + if jobPriority-alloc.Job.Priority < 10 { + continue + } + if len(alloc.Resources.Networks) > 0 { + device := alloc.Resources.Networks[0].Device + allocsForDevice := deviceToAllocs[device] + allocsForDevice = append(allocsForDevice, alloc) + deviceToAllocs[device] = allocsForDevice + } + } + + // If no existing allocations use network resources, return early + if len(deviceToAllocs) == 0 { + return nil + } + + var allocsToPreempt []*structs.Allocation + + met := false + freeBandwidth := 0 + + for device, currentAllocs := range deviceToAllocs { + totalBandwidth := netIdx.AvailBandwidth[device] + // If the device doesn't have enough total available bandwidth, skip + if totalBandwidth < MbitsNeeded { + continue + } + + // Track how much existing free bandwidth we have before preemption + freeBandwidth = totalBandwidth - netIdx.UsedBandwidth[device] + + preemptedBandwidth := 0 + + // Reset allocsToPreempt since we don't want to preempt across devices for the same task + allocsToPreempt = nil + + // Build map from used reserved ports to allocation + usedPortToAlloc := make(map[int]*structs.Allocation) + + // First try to satisfy needed reserved ports + if len(reservedPortsNeeded) > 0 { + for _, alloc := range currentAllocs { + for _, tr := range alloc.TaskResources { + reservedPorts := tr.Networks[0].ReservedPorts + for _, p := range reservedPorts { + usedPortToAlloc[p.Value] = alloc + } + } + } + + // Look for allocs that are using reserved ports needed + for _, port := range reservedPortsNeeded { + alloc, ok := usedPortToAlloc[port.Value] + if ok { + preemptedBandwidth += alloc.Resources.Networks[0].MBits + allocsToPreempt = append(allocsToPreempt, alloc) + } + } + + // Remove allocs that were preempted to satisfy reserved ports + currentAllocs = structs.RemoveAllocs(currentAllocs, allocsToPreempt) + } + + // If bandwidth requirements have been met, stop + if preemptedBandwidth+freeBandwidth >= MbitsNeeded { + met = true + break + } + + // Split by priority + allocsByPriority := 
filterAndGroupPreemptibleAllocs(jobPriority, currentAllocs) + + for _, allocsGrp := range allocsByPriority { + allocs := allocsGrp.allocs + + // Sort by distance function that takes into account needed MBits + // as well as penalty for preempting an allocation + // whose task group already has existing preemptions + sort.Slice(allocs, func(i, j int) bool { + firstAlloc := allocs[i] + currentPreemptionCount1 := computeCurrentPreemptions(firstAlloc, currentPreemptions) + + // Look up configured maxParallel value for these allocation's task groups + var maxParallel1, maxParallel2 int + tg1 := allocs[i].Job.LookupTaskGroup(allocs[i].TaskGroup) + if tg1 != nil && tg1.Migrate != nil { + maxParallel1 = tg1.Migrate.MaxParallel + } + distance1 := getPreemptionScore(allocs[i].Resources, resourceAsk, NetworkResource, maxParallel1, currentPreemptionCount1) + + secondAlloc := allocs[j] + currentPreemptionCount2 := computeCurrentPreemptions(secondAlloc, currentPreemptions) + tg2 := secondAlloc.Job.LookupTaskGroup(secondAlloc.TaskGroup) + if tg2 != nil && tg2.Migrate != nil { + maxParallel2 = tg2.Migrate.MaxParallel + } + distance2 := getPreemptionScore(secondAlloc.Resources, resourceAsk, NetworkResource, maxParallel2, currentPreemptionCount2) + + return distance1 < distance2 + }) + + for _, alloc := range allocs { + preemptedBandwidth += alloc.Resources.Networks[0].MBits + allocsToPreempt = append(allocsToPreempt, alloc) + if preemptedBandwidth+freeBandwidth >= MbitsNeeded { + met = true + break + } + } + if met { + break + } + } + if met { + break + } + } + if len(allocsToPreempt) == 0 { + return nil + } + + // Build a resource object with just the network Mbits filled in + // Its safe to use the first preempted allocation's network resource + // here because all allocations preempted will be from the same device + nodeRemainingResources := &structs.Resources{ + Networks: []*structs.NetworkResource{ + { + Device: allocsToPreempt[0].Resources.Networks[0].Device, + MBits: freeBandwidth, + }, + }, + } + + // Do a final pass to eliminate any superset allocations + filteredBestAllocs := eliminateSuperSetAllocations(allocsToPreempt, resourceAsk, nodeRemainingResources, networkResourceDistance, meetsNetworkRequirements) + return filteredBestAllocs +} diff --git a/scheduler/preemption_test.go b/scheduler/preemption_test.go new file mode 100644 index 000000000..cacef52bd --- /dev/null +++ b/scheduler/preemption_test.go @@ -0,0 +1,789 @@ +package scheduler + +import ( + "testing" + + "fmt" + + "github.com/hashicorp/nomad/helper/uuid" + "github.com/hashicorp/nomad/nomad/mock" + "github.com/hashicorp/nomad/nomad/structs" + "github.com/stretchr/testify/require" +) + +func TestResourceDistance(t *testing.T) { + resourceAsk := &structs.Resources{ + CPU: 2048, + MemoryMB: 512, + IOPS: 300, + DiskMB: 4096, + Networks: []*structs.NetworkResource{ + { + Device: "eth0", + MBits: 1024, + }, + }, + } + + type testCase struct { + allocResource *structs.Resources + expectedDistance string + } + + testCases := []*testCase{ + { + &structs.Resources{ + CPU: 2048, + MemoryMB: 512, + IOPS: 300, + DiskMB: 4096, + Networks: []*structs.NetworkResource{ + { + Device: "eth0", + MBits: 1024, + }, + }, + }, + "0.000", + }, + { + &structs.Resources{ + CPU: 1024, + MemoryMB: 400, + IOPS: 200, + DiskMB: 1024, + Networks: []*structs.NetworkResource{ + { + Device: "eth0", + MBits: 1024, + }, + }, + }, + "0.986", + }, + { + &structs.Resources{ + CPU: 1024, + MemoryMB: 200, + IOPS: 200, + DiskMB: 1024, + Networks: []*structs.NetworkResource{ + 
{ + Device: "eth0", + MBits: 512, + }, + }, + }, + "1.138", + }, + { + &structs.Resources{ + CPU: 8192, + MemoryMB: 200, + IOPS: 200, + DiskMB: 1024, + Networks: []*structs.NetworkResource{ + { + Device: "eth0", + MBits: 512, + }, + }, + }, + "3.169", + }, + { + &structs.Resources{ + CPU: 2048, + MemoryMB: 500, + IOPS: 300, + DiskMB: 4096, + Networks: []*structs.NetworkResource{ + { + Device: "eth0", + MBits: 1024, + }, + }, + }, + "0.023", + }, + } + + for _, tc := range testCases { + t.Run("", func(t *testing.T) { + require := require.New(t) + require.Equal(tc.expectedDistance, fmt.Sprintf("%3.3f", resourceDistance(tc.allocResource, resourceAsk))) + }) + + } + +} + +func TestPreemption(t *testing.T) { + type testCase struct { + desc string + currentAllocations []*structs.Allocation + nodeReservedCapacity *structs.Resources + nodeCapacity *structs.Resources + resourceAsk *structs.Resources + jobPriority int + currentPreemptions []*structs.Allocation + preemptedAllocIDs map[string]struct{} + } + + highPrioJob := mock.Job() + highPrioJob.Priority = 100 + + lowPrioJob := mock.Job() + lowPrioJob.Priority = 30 + + lowPrioJob2 := mock.Job() + lowPrioJob2.Priority = 30 + + // Create some persistent alloc ids to use in test cases + allocIDs := []string{uuid.Generate(), uuid.Generate(), uuid.Generate(), uuid.Generate(), uuid.Generate()} + + nodeResources := &structs.Resources{ + CPU: 4000, + MemoryMB: 8192, + DiskMB: 100 * 1024, + IOPS: 150, + Networks: []*structs.NetworkResource{ + { + Device: "eth0", + CIDR: "192.168.0.100/32", + MBits: 1000, + }, + }, + } + reservedNodeResources := &structs.Resources{ + CPU: 100, + MemoryMB: 256, + DiskMB: 4 * 1024, + Networks: []*structs.NetworkResource{ + { + Device: "eth0", + IP: "192.168.0.100", + ReservedPorts: []structs.Port{{Label: "ssh", Value: 22}}, + MBits: 1, + }, + }, + } + + testCases := []testCase{ + { + desc: "No preemption because existing allocs are not low priority", + currentAllocations: []*structs.Allocation{ + createAlloc(allocIDs[0], highPrioJob, &structs.Resources{ + CPU: 3200, + MemoryMB: 7256, + DiskMB: 4 * 1024, + IOPS: 150, + Networks: []*structs.NetworkResource{ + { + Device: "eth0", + IP: "192.168.0.100", + MBits: 50, + }, + }, + })}, + nodeReservedCapacity: reservedNodeResources, + nodeCapacity: nodeResources, + jobPriority: 100, + resourceAsk: &structs.Resources{ + CPU: 100, + MemoryMB: 256, + DiskMB: 4 * 1024, + Networks: []*structs.NetworkResource{ + { + Device: "eth0", + IP: "192.168.0.100", + ReservedPorts: []structs.Port{{Label: "ssh", Value: 22}}, + MBits: 1, + }, + }, + }, + }, + { + desc: "Preempting low priority allocs not enough to meet resource ask", + currentAllocations: []*structs.Allocation{ + createAlloc(allocIDs[0], lowPrioJob, &structs.Resources{ + CPU: 3200, + MemoryMB: 7256, + DiskMB: 4 * 1024, + IOPS: 150, + Networks: []*structs.NetworkResource{ + { + Device: "eth0", + IP: "192.168.0.100", + MBits: 50, + }, + }, + })}, + nodeReservedCapacity: reservedNodeResources, + nodeCapacity: nodeResources, + jobPriority: 100, + resourceAsk: &structs.Resources{ + CPU: 4000, + MemoryMB: 8192, + DiskMB: 4 * 1024, + IOPS: 300, + Networks: []*structs.NetworkResource{ + { + Device: "eth0", + IP: "192.168.0.100", + ReservedPorts: []structs.Port{{Label: "ssh", Value: 22}}, + MBits: 1, + }, + }, + }, + }, + { + desc: "Combination of high/low priority allocs, without static ports", + currentAllocations: []*structs.Allocation{ + createAlloc(allocIDs[0], highPrioJob, &structs.Resources{ + CPU: 2800, + MemoryMB: 2256, + DiskMB: 4 * 
1024, + IOPS: 150, + Networks: []*structs.NetworkResource{ + { + Device: "eth0", + IP: "192.168.0.100", + MBits: 150, + }, + }, + }), + createAlloc(allocIDs[1], lowPrioJob, &structs.Resources{ + CPU: 200, + MemoryMB: 256, + DiskMB: 4 * 1024, + IOPS: 50, + Networks: []*structs.NetworkResource{ + { + Device: "eth0", + IP: "192.168.0.200", + MBits: 500, + }, + }, + }), + createAlloc(allocIDs[2], lowPrioJob, &structs.Resources{ + CPU: 200, + MemoryMB: 256, + DiskMB: 4 * 1024, + IOPS: 50, + Networks: []*structs.NetworkResource{ + { + Device: "eth0", + IP: "192.168.0.100", + MBits: 300, + }, + }, + }), + createAlloc(allocIDs[3], lowPrioJob, &structs.Resources{ + CPU: 700, + MemoryMB: 256, + DiskMB: 4 * 1024, + }), + }, + nodeReservedCapacity: reservedNodeResources, + nodeCapacity: nodeResources, + jobPriority: 100, + resourceAsk: &structs.Resources{ + CPU: 1100, + MemoryMB: 1000, + DiskMB: 25 * 1024, + Networks: []*structs.NetworkResource{ + { + Device: "eth0", + IP: "192.168.0.100", + MBits: 840, + }, + }, + }, + preemptedAllocIDs: map[string]struct{}{ + allocIDs[1]: {}, + allocIDs[2]: {}, + allocIDs[3]: {}, + }, + }, { + desc: "Preemption needed for all resources except network", + currentAllocations: []*structs.Allocation{ + createAlloc(allocIDs[0], highPrioJob, &structs.Resources{ + CPU: 2800, + MemoryMB: 2256, + DiskMB: 40 * 1024, + IOPS: 100, + Networks: []*structs.NetworkResource{ + { + Device: "eth0", + IP: "192.168.0.100", + MBits: 150, + }, + }, + }), + createAlloc(allocIDs[1], lowPrioJob, &structs.Resources{ + CPU: 200, + MemoryMB: 256, + DiskMB: 4 * 1024, + IOPS: 50, + Networks: []*structs.NetworkResource{ + { + Device: "eth0", + IP: "192.168.0.200", + MBits: 50, + }, + }, + }), + createAlloc(allocIDs[2], lowPrioJob, &structs.Resources{ + CPU: 200, + MemoryMB: 512, + DiskMB: 25 * 1024, + }), + createAlloc(allocIDs[3], lowPrioJob, &structs.Resources{ + CPU: 700, + MemoryMB: 276, + DiskMB: 20 * 1024, + }), + }, + nodeReservedCapacity: reservedNodeResources, + nodeCapacity: nodeResources, + jobPriority: 100, + resourceAsk: &structs.Resources{ + CPU: 1000, + MemoryMB: 3000, + DiskMB: 50 * 1024, + Networks: []*structs.NetworkResource{ + { + Device: "eth0", + IP: "192.168.0.100", + MBits: 50, + }, + }, + }, + preemptedAllocIDs: map[string]struct{}{ + allocIDs[1]: {}, + allocIDs[2]: {}, + allocIDs[3]: {}, + }, + }, + { + desc: "Only one low priority alloc needs to be preempted", + currentAllocations: []*structs.Allocation{ + createAlloc(allocIDs[0], highPrioJob, &structs.Resources{ + CPU: 1200, + MemoryMB: 2256, + DiskMB: 4 * 1024, + IOPS: 50, + Networks: []*structs.NetworkResource{ + { + Device: "eth0", + IP: "192.168.0.100", + MBits: 150, + }, + }, + }), + createAlloc(allocIDs[1], lowPrioJob, &structs.Resources{ + CPU: 200, + MemoryMB: 256, + DiskMB: 4 * 1024, + IOPS: 10, + Networks: []*structs.NetworkResource{ + { + Device: "eth0", + IP: "192.168.0.100", + MBits: 500, + }, + }, + }), + createAlloc(allocIDs[2], lowPrioJob, &structs.Resources{ + CPU: 200, + MemoryMB: 256, + DiskMB: 4 * 1024, + IOPS: 10, + Networks: []*structs.NetworkResource{ + { + Device: "eth0", + IP: "192.168.0.200", + MBits: 300, + }, + }, + }), + }, + nodeReservedCapacity: reservedNodeResources, + nodeCapacity: nodeResources, + jobPriority: 100, + resourceAsk: &structs.Resources{ + CPU: 300, + MemoryMB: 500, + DiskMB: 5 * 1024, + Networks: []*structs.NetworkResource{ + { + Device: "eth0", + IP: "192.168.0.100", + MBits: 320, + }, + }, + }, + preemptedAllocIDs: map[string]struct{}{ + allocIDs[2]: {}, + }, + }, + { + 
desc: "one alloc meets static port need, another meets remaining mbits needed", + currentAllocations: []*structs.Allocation{ + createAlloc(allocIDs[0], highPrioJob, &structs.Resources{ + CPU: 1200, + MemoryMB: 2256, + DiskMB: 4 * 1024, + IOPS: 150, + Networks: []*structs.NetworkResource{ + { + Device: "eth0", + IP: "192.168.0.100", + MBits: 150, + }, + }, + }), + createAlloc(allocIDs[1], lowPrioJob, &structs.Resources{ + CPU: 200, + MemoryMB: 256, + DiskMB: 4 * 1024, + IOPS: 50, + Networks: []*structs.NetworkResource{ + { + Device: "eth0", + IP: "192.168.0.200", + MBits: 500, + ReservedPorts: []structs.Port{ + { + Label: "db", + Value: 88, + }, + }, + }, + }, + }), + createAlloc(allocIDs[2], lowPrioJob, &structs.Resources{ + CPU: 200, + MemoryMB: 256, + DiskMB: 4 * 1024, + IOPS: 50, + Networks: []*structs.NetworkResource{ + { + Device: "eth0", + IP: "192.168.0.100", + MBits: 200, + }, + }, + }), + }, + nodeReservedCapacity: reservedNodeResources, + nodeCapacity: nodeResources, + jobPriority: 100, + resourceAsk: &structs.Resources{ + CPU: 2700, + MemoryMB: 1000, + DiskMB: 25 * 1024, + Networks: []*structs.NetworkResource{ + { + Device: "eth0", + IP: "192.168.0.100", + MBits: 800, + ReservedPorts: []structs.Port{ + { + Label: "db", + Value: 88, + }, + }, + }, + }, + }, + preemptedAllocIDs: map[string]struct{}{ + allocIDs[1]: {}, + allocIDs[2]: {}, + }, + }, + { + desc: "alloc that meets static port need also meets other needds", + currentAllocations: []*structs.Allocation{ + createAlloc(allocIDs[0], highPrioJob, &structs.Resources{ + CPU: 1200, + MemoryMB: 2256, + DiskMB: 4 * 1024, + IOPS: 50, + Networks: []*structs.NetworkResource{ + { + Device: "eth0", + IP: "192.168.0.100", + MBits: 150, + }, + }, + }), + createAlloc(allocIDs[1], lowPrioJob, &structs.Resources{ + CPU: 200, + MemoryMB: 256, + DiskMB: 4 * 1024, + IOPS: 50, + Networks: []*structs.NetworkResource{ + { + Device: "eth0", + IP: "192.168.0.200", + MBits: 600, + ReservedPorts: []structs.Port{ + { + Label: "db", + Value: 88, + }, + }, + }, + }, + }), + createAlloc(allocIDs[2], lowPrioJob, &structs.Resources{ + CPU: 200, + MemoryMB: 256, + DiskMB: 4 * 1024, + IOPS: 50, + Networks: []*structs.NetworkResource{ + { + Device: "eth0", + IP: "192.168.0.100", + MBits: 100, + }, + }, + }), + }, + nodeReservedCapacity: reservedNodeResources, + nodeCapacity: nodeResources, + jobPriority: 100, + resourceAsk: &structs.Resources{ + CPU: 600, + MemoryMB: 1000, + DiskMB: 25 * 1024, + Networks: []*structs.NetworkResource{ + { + Device: "eth0", + IP: "192.168.0.100", + MBits: 700, + ReservedPorts: []structs.Port{ + { + Label: "db", + Value: 88, + }, + }, + }, + }, + }, + preemptedAllocIDs: map[string]struct{}{ + allocIDs[1]: {}, + }, + }, + { + desc: "alloc from job that has existing evictions not chosen for preemption", + currentAllocations: []*structs.Allocation{ + createAlloc(allocIDs[0], highPrioJob, &structs.Resources{ + CPU: 1200, + MemoryMB: 2256, + DiskMB: 4 * 1024, + IOPS: 50, + Networks: []*structs.NetworkResource{ + { + Device: "eth0", + IP: "192.168.0.100", + MBits: 150, + }, + }, + }), + createAlloc(allocIDs[1], lowPrioJob, &structs.Resources{ + CPU: 200, + MemoryMB: 256, + DiskMB: 4 * 1024, + IOPS: 10, + Networks: []*structs.NetworkResource{ + { + Device: "eth0", + IP: "192.168.0.200", + MBits: 500, + }, + }, + }), + createAlloc(allocIDs[2], lowPrioJob2, &structs.Resources{ + CPU: 200, + MemoryMB: 256, + DiskMB: 4 * 1024, + IOPS: 10, + Networks: []*structs.NetworkResource{ + { + Device: "eth0", + IP: "192.168.0.100", + MBits: 300, + }, 
+ }, + }), + }, + nodeReservedCapacity: reservedNodeResources, + nodeCapacity: nodeResources, + jobPriority: 100, + resourceAsk: &structs.Resources{ + CPU: 300, + MemoryMB: 500, + DiskMB: 5 * 1024, + Networks: []*structs.NetworkResource{ + { + Device: "eth0", + IP: "192.168.0.100", + MBits: 320, + }, + }, + }, + currentPreemptions: []*structs.Allocation{ + createAlloc(allocIDs[4], lowPrioJob2, &structs.Resources{ + CPU: 200, + MemoryMB: 256, + DiskMB: 4 * 1024, + IOPS: 10, + Networks: []*structs.NetworkResource{ + { + Device: "eth0", + IP: "192.168.0.100", + MBits: 300, + }, + }, + }), + }, + preemptedAllocIDs: map[string]struct{}{ + allocIDs[1]: {}, + }, + }, + // This test case exercises the code path for a final filtering step that tries to + // minimize the number of preemptible allocations + { + desc: "Filter out allocs whose resource usage superset is also in the preemption list", + currentAllocations: []*structs.Allocation{ + createAlloc(allocIDs[0], highPrioJob, &structs.Resources{ + CPU: 1800, + MemoryMB: 2256, + DiskMB: 4 * 1024, + IOPS: 50, + Networks: []*structs.NetworkResource{ + { + Device: "eth0", + IP: "192.168.0.100", + MBits: 150, + }, + }, + }), + createAlloc(allocIDs[1], lowPrioJob, &structs.Resources{ + CPU: 1500, + MemoryMB: 256, + DiskMB: 5 * 1024, + Networks: []*structs.NetworkResource{ + { + Device: "eth0", + IP: "192.168.0.100", + MBits: 100, + }, + }, + }), + createAlloc(allocIDs[2], lowPrioJob, &structs.Resources{ + CPU: 600, + MemoryMB: 256, + DiskMB: 5 * 1024, + Networks: []*structs.NetworkResource{ + { + Device: "eth0", + IP: "192.168.0.200", + MBits: 300, + }, + }, + }), + }, + nodeReservedCapacity: reservedNodeResources, + nodeCapacity: nodeResources, + jobPriority: 100, + resourceAsk: &structs.Resources{ + CPU: 1000, + MemoryMB: 256, + DiskMB: 5 * 1024, + Networks: []*structs.NetworkResource{ + { + Device: "eth0", + IP: "192.168.0.100", + MBits: 50, + }, + }, + }, + preemptedAllocIDs: map[string]struct{}{ + allocIDs[1]: {}, + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.desc, func(t *testing.T) { + node := mock.Node() + node.Resources = tc.nodeCapacity + node.Reserved = tc.nodeReservedCapacity + + state, ctx := testContext(t) + nodes := []*RankedNode{ + { + Node: node, + }, + } + state.UpsertNode(1000, node) + for _, alloc := range tc.currentAllocations { + alloc.NodeID = node.ID + } + require := require.New(t) + err := state.UpsertAllocs(1001, tc.currentAllocations) + require.Nil(err) + if tc.currentPreemptions != nil { + ctx.plan.NodePreemptions[node.ID] = tc.currentPreemptions + } + static := NewStaticRankIterator(ctx, nodes) + binPackIter := NewBinPackIterator(ctx, static, true, tc.jobPriority) + + taskGroup := &structs.TaskGroup{ + EphemeralDisk: &structs.EphemeralDisk{}, + Tasks: []*structs.Task{ + { + Name: "web", + Resources: tc.resourceAsk, + }, + }, + } + binPackIter.SetTaskGroup(taskGroup) + option := binPackIter.Next() + if tc.preemptedAllocIDs == nil { + require.Nil(option) + } else { + require.NotNil(option) + preemptedAllocs := option.PreemptedAllocs + require.Equal(len(tc.preemptedAllocIDs), len(preemptedAllocs)) + for _, alloc := range preemptedAllocs { + _, ok := tc.preemptedAllocIDs[alloc.ID] + require.True(ok) + } + } + }) + } +} + +// helper method to create allocations with given jobs and resources +func createAlloc(id string, job *structs.Job, resource *structs.Resources) *structs.Allocation { + alloc := &structs.Allocation{ + ID: id, + Job: job, + JobID: job.ID, + TaskResources: map[string]*structs.Resources{ + "web": 
resource, + }, + Resources: resource, + Namespace: structs.DefaultNamespace, + EvalID: uuid.Generate(), + DesiredStatus: structs.AllocDesiredStatusRun, + ClientStatus: structs.AllocClientStatusRunning, + TaskGroup: "web", + } + return alloc +} diff --git a/scheduler/rank.go b/scheduler/rank.go index d2e70a4e6..3c7adceaa 100644 --- a/scheduler/rank.go +++ b/scheduler/rank.go @@ -25,6 +25,10 @@ type RankedNode struct { // Allocs is used to cache the proposed allocations on the // node. This can be shared between iterators that require it. Proposed []*structs.Allocation + + // PreemptedAllocs is used by the BinpackIterator to identify allocs + // that should be preempted in order to make the placement + PreemptedAllocs []*structs.Allocation } func (r *RankedNode) GoString() string { @@ -195,6 +199,16 @@ OUTER: DiskMB: int64(iter.taskGroup.EphemeralDisk.SizeMB), }, } + + var allocsToPreempt []*structs.Allocation + + // Count the number of existing preemptions + allPreemptions := iter.ctx.Plan().NodePreemptions + var currentPreemptions []*structs.Allocation + for _, allocs := range allPreemptions { + currentPreemptions = append(currentPreemptions, allocs...) + } + for _, task := range iter.taskGroup.Tasks { // Allocate the resources taskResources := &structs.AllocatedTaskResources{ @@ -211,10 +225,40 @@ OUTER: ask := task.Resources.Networks[0].Copy() offer, err := netIdx.AssignNetwork(ask) if offer == nil { - iter.ctx.Metrics().ExhaustedNode(option.Node, - fmt.Sprintf("network: %s", err)) + // If eviction is not enabled, mark this node as exhausted and continue + if !iter.evict { + iter.ctx.Metrics().ExhaustedNode(option.Node, + fmt.Sprintf("network: %s", err)) + netIdx.Release() + continue OUTER + } + + // Look for preemptible allocations to satisfy the network resource for this task + preemptedAllocsForTaskNetwork := preemptForNetworkResourceAsk(iter.priority, proposed, taskResources, netIdx, currentPreemptions) + if preemptedAllocsForTaskNetwork == nil { + iter.ctx.Metrics().ExhaustedNode(option.Node, + fmt.Sprintf("unable to meet network resource %v after preemption", ask)) + netIdx.Release() + continue OUTER + } + allocsToPreempt = append(allocsToPreempt, preemptedAllocsForTaskNetwork...) 
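The currentPreemptions slice gathered at the top of this loop is what feeds the max_parallel penalty in both scoring paths (getPreemptionScore above). A worked instance of the penalty term, assuming a candidate whose task group has a migrate stanza with max_parallel = 2:

```go
// Worked example of the penalty term in getPreemptionScore:
//
//	maxParallel        = 2                       // from the candidate's migrate stanza
//	numPreemptedAllocs = 2                       // from computeCurrentPreemptions
//	penalty            = ((2+1) - 2) * 50 = 50   // maxParallelPenalty = 50.0
//
// A third alloc from that group therefore scores 50 worse than an otherwise
// identical candidate from a group with no preemptions yet, which steers the
// selection toward spreading preemptions across jobs.
```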
+ + // First subtract out preempted allocations + proposed = structs.RemoveAllocs(proposed, preemptedAllocsForTaskNetwork) + + // Reset the network index and try the offer again netIdx.Release() - continue OUTER + netIdx = structs.NewNetworkIndex() + netIdx.SetNode(option.Node) + netIdx.AddAllocs(proposed) + + offer, err = netIdx.AssignNetwork(ask) + if offer == nil { + iter.ctx.Metrics().ExhaustedNode(option.Node, + fmt.Sprintf("unexpected error: unable to create offer after preempting: %v", err)) + netIdx.Release() + continue OUTER + } } // Reserve this to prevent another task from colliding @@ -234,19 +278,32 @@ OUTER: // Add the resources we are trying to fit proposed = append(proposed, &structs.Allocation{AllocatedResources: total}) - // Check if these allocations fit, if they do not, simply skip this node + // Check if these allocations fit fit, dim, util, _ := structs.AllocsFit(option.Node, proposed, netIdx) netIdx.Release() if !fit { - iter.ctx.Metrics().ExhaustedNode(option.Node, dim) - continue + // Skip the node if evictions are not enabled + if !iter.evict { + iter.ctx.Metrics().ExhaustedNode(option.Node, dim) + continue + } + // If eviction is enabled and the node doesn't fit the alloc, check if + // any allocs can be preempted + + // Remove the last element containing the current placement from proposed allocs + current := proposed[:len(proposed)-1] + preemptForTaskGroup := findPreemptibleAllocationsForTaskGroup(iter.priority, current, total, option.Node, currentPreemptions) + allocsToPreempt = append(allocsToPreempt, preemptForTaskGroup...) + // If we were unable to find preempted allocs to meet these requirements + // mark as exhausted and continue + if len(preemptForTaskGroup) == 0 { + iter.ctx.Metrics().ExhaustedNode(option.Node, dim) + continue + } + } + if len(allocsToPreempt) > 0 { + option.PreemptedAllocs = allocsToPreempt } - - // XXX: For now we completely ignore evictions. We should use that flag - // to determine if its possible to evict other lower priority allocations - // to make room. This explodes the search space, so it must be done - // carefully.
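With both fallbacks in place, preemption is only reachable when the iterator is constructed with evict set to true; that lines up with only the system scheduler handling PreemptedAllocs in this patch. A sketch of the wiring, mirroring how the new preemption tests below drive it (fixture values are from the mock package, not claims about production defaults):

```go
package scheduler

import (
	"testing"

	"github.com/hashicorp/nomad/nomad/mock"
)

// Sketch only: the third argument to NewBinPackIterator ("evict") gates both
// new code paths above. With it set to false the node is simply marked
// exhausted, as before this change.
func TestBinPackPreemptionWiringSketch(t *testing.T) {
	state, ctx := testContext(t)
	node := mock.Node()
	if err := state.UpsertNode(1000, node); err != nil {
		t.Fatalf("upserting node: %v", err)
	}

	static := NewStaticRankIterator(ctx, []*RankedNode{{Node: node}})
	binPack := NewBinPackIterator(ctx, static, true, 100) // evict=true turns preemption on
	binPack.SetTaskGroup(mock.Job().TaskGroups[0])

	if option := binPack.Next(); option != nil {
		// Anything the iterator decided to preempt is surfaced here; the
		// scheduler then records it on the plan via AppendPreemptedAlloc.
		_ = option.PreemptedAllocs
	}
}
```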
- // Score the fit normally otherwise fitness := structs.ScoreFit(option.Node, util) normalizedFit := fitness / binPackingMaxFitScore diff --git a/scheduler/system_sched.go index 60202eb68..912440aef 100644 --- a/scheduler/system_sched.go +++ b/scheduler/system_sched.go @@ -60,7 +60,7 @@ func (s *SystemScheduler) Process(eval *structs.Evaluation) error { // Verify the evaluation trigger reason is understood switch eval.TriggeredBy { case structs.EvalTriggerJobRegister, structs.EvalTriggerNodeUpdate, structs.EvalTriggerFailedFollowUp, - structs.EvalTriggerJobDeregister, structs.EvalTriggerRollingUpdate, + structs.EvalTriggerJobDeregister, structs.EvalTriggerRollingUpdate, structs.EvalTriggerPreemption, structs.EvalTriggerDeploymentWatcher, structs.EvalTriggerNodeDrain: default: desc := fmt.Sprintf("scheduler cannot handle '%s' evaluation reason", @@ -347,6 +347,24 @@ func (s *SystemScheduler) computePlacements(place []allocTuple) error { alloc.PreviousAllocation = missing.Alloc.ID } + // If this placement involves preemption, mark the preempted allocations for eviction and record their IDs on the placed alloc + if option.PreemptedAllocs != nil { + var preemptedAllocIDs []string + for _, stop := range option.PreemptedAllocs { + s.plan.AppendPreemptedAlloc(stop, structs.AllocDesiredStatusEvict, alloc.ID) + + preemptedAllocIDs = append(preemptedAllocIDs, stop.ID) + if s.eval.AnnotatePlan && s.plan.Annotations != nil { + s.plan.Annotations.PreemptedAllocs = append(s.plan.Annotations.PreemptedAllocs, stop.Stub()) + if s.plan.Annotations.DesiredTGUpdates != nil { + desired := s.plan.Annotations.DesiredTGUpdates[missing.TaskGroup.Name] + desired.Evict += 1 + } + } + } + alloc.PreemptedAllocations = preemptedAllocIDs + } + s.plan.AppendAlloc(alloc) } else { // Lazy initialize the failed map diff --git a/scheduler/system_sched_test.go index 3d78b7061..d2cf1f5f8 100644 --- a/scheduler/system_sched_test.go +++ b/scheduler/system_sched_test.go @@ -6,11 +6,14 @@ import ( "testing" "time" + "fmt" + memdb "github.com/hashicorp/go-memdb" "github.com/hashicorp/nomad/helper" "github.com/hashicorp/nomad/helper/uuid" "github.com/hashicorp/nomad/nomad/mock" "github.com/hashicorp/nomad/nomad/structs" + "github.com/stretchr/testify/require" ) func TestSystemSched_JobRegister(t *testing.T) { @@ -218,7 +221,7 @@ func TestSystemSched_JobRegister_EphemeralDiskConstraint(t *testing.T) { JobID: job1.ID, Status: structs.EvalStatusPending, } - noErr(t, h.State.UpsertEvals(h.NextIndex(), []*structs.Evaluation{eval})) + noErr(t, h.State.UpsertEvals(h.NextIndex(), []*structs.Evaluation{eval1})) // Process the evaluation if err := h1.Process(NewSystemScheduler, eval1); err != nil { @@ -274,15 +277,31 @@ func TestSystemSched_ExhaustResources(t *testing.T) { JobID: job.ID, Status: structs.EvalStatusPending, } - noErr(t, h.State.UpsertEvals(h.NextIndex(), []*structs.Evaluation{eval})) + noErr(t, h.State.UpsertEvals(h.NextIndex(), []*structs.Evaluation{eval1})) // Process the evaluation if err := h.Process(NewSystemScheduler, eval1); err != nil { t.Fatalf("err: %v", err) } - // Ensure that we have one allocation queued from the system job eval + // The system scheduler should preempt the service job's alloc and place the system job from eval1 + require := require.New(t) + + newPlan := h.Plans[1] + require.Len(newPlan.NodeAllocation, 1) + require.Len(newPlan.NodePreemptions, 1) + + for _, allocList := range newPlan.NodeAllocation { + require.Len(allocList, 1) + require.Equal(job.ID, allocList[0].JobID) + } + + for _, allocList
:= range newPlan.NodePreemptions { + require.Len(allocList, 1) + require.Equal(svcJob.ID, allocList[0].JobID) + } + // Ensure that we have no queued allocations on the second eval queued := h.Evals[1].QueuedAllocations["web"] - if queued != 1 { + if queued != 0 { t.Fatalf("expected: %v, actual: %v", 1, queued) } } @@ -1529,3 +1548,322 @@ func TestSystemSched_QueuedAllocsMultTG(t *testing.T) { h.AssertEvalStatus(t, structs.EvalStatusComplete) } + +func TestSystemSched_Preemption(t *testing.T) { + h := NewHarness(t) + + // Create nodes + var nodes []*structs.Node + for i := 0; i < 2; i++ { + node := mock.Node() + node.Resources = &structs.Resources{ + CPU: 3072, + MemoryMB: 5034, + DiskMB: 20 * 1024, + IOPS: 150, + Networks: []*structs.NetworkResource{ + { + Device: "eth0", + CIDR: "192.168.0.100/32", + MBits: 1000, + }, + }, + } + noErr(t, h.State.UpsertNode(h.NextIndex(), node)) + nodes = append(nodes, node) + } + + // Create some low priority batch jobs and allocations for them + // One job uses a reserved port + job1 := mock.BatchJob() + job1.Type = structs.JobTypeBatch + job1.Priority = 20 + job1.TaskGroups[0].Tasks[0].Resources = &structs.Resources{ + CPU: 512, + MemoryMB: 1024, + Networks: []*structs.NetworkResource{ + { + MBits: 200, + ReservedPorts: []structs.Port{ + { + Label: "web", + Value: 80, + }, + }, + }, + }, + } + + alloc1 := mock.Alloc() + alloc1.Job = job1 + alloc1.JobID = job1.ID + alloc1.NodeID = nodes[0].ID + alloc1.Name = "my-job[0]" + alloc1.TaskGroup = job1.TaskGroups[0].Name + alloc1.Resources = &structs.Resources{ + CPU: 512, + MemoryMB: 1024, + DiskMB: 5 * 1024, + Networks: []*structs.NetworkResource{ + { + Device: "eth0", + MBits: 200, + ReservedPorts: []structs.Port{ + { + Label: "web", + Value: 80, + }, + }, + }, + }, + } + alloc1.TaskResources = map[string]*structs.Resources{ + "web": { + CPU: 512, + MemoryMB: 1024, + DiskMB: 5 * 1024, + Networks: []*structs.NetworkResource{ + { + Device: "eth0", + MBits: 200, + ReservedPorts: []structs.Port{ + { + Label: "web", + Value: 80, + }, + }, + }, + }, + }, + } + noErr(t, h.State.UpsertJob(h.NextIndex(), job1)) + + job2 := mock.BatchJob() + job2.Type = structs.JobTypeBatch + job2.Priority = 20 + job2.TaskGroups[0].Tasks[0].Resources = &structs.Resources{ + CPU: 512, + MemoryMB: 1024, + Networks: []*structs.NetworkResource{ + { + MBits: 200, + }, + }, + } + + alloc2 := mock.Alloc() + alloc2.Job = job2 + alloc2.JobID = job2.ID + alloc2.NodeID = nodes[0].ID + alloc2.Name = "my-job[2]" + alloc2.TaskGroup = job2.TaskGroups[0].Name + alloc2.Resources = &structs.Resources{ + CPU: 512, + MemoryMB: 1024, + DiskMB: 5 * 1024, + Networks: []*structs.NetworkResource{ + { + Device: "eth0", + MBits: 200, + }, + }, + } + alloc2.TaskResources = map[string]*structs.Resources{ + "web": { + CPU: 512, + MemoryMB: 1024, + DiskMB: 5 * 1024, + Networks: []*structs.NetworkResource{ + { + Device: "eth0", + MBits: 200, + }, + }, + }, + } + noErr(t, h.State.UpsertJob(h.NextIndex(), job2)) + + job3 := mock.Job() + job3.Type = structs.JobTypeBatch + job3.Priority = 40 + job3.TaskGroups[0].Tasks[0].Resources = &structs.Resources{ + CPU: 1024, + MemoryMB: 2048, + Networks: []*structs.NetworkResource{ + { + Device: "eth0", + MBits: 400, + }, + }, + } + + alloc3 := mock.Alloc() + alloc3.Job = job3 + alloc3.JobID = job3.ID + alloc3.NodeID = nodes[0].ID + alloc3.Name = "my-job[0]" + alloc3.TaskGroup = job3.TaskGroups[0].Name + alloc3.Resources = &structs.Resources{ + CPU: 1024, + MemoryMB: 25, + DiskMB: 5 * 1024, + Networks: []*structs.NetworkResource{ 
+ { + Device: "eth0", + MBits: 400, + }, + }, + } + alloc3.TaskResources = map[string]*structs.Resources{ + "web": { + CPU: 1024, + MemoryMB: 25, + DiskMB: 5 * 1024, + Networks: []*structs.NetworkResource{ + { + Device: "eth0", + MBits: 400, + }, + }, + }, + } + noErr(t, h.State.UpsertAllocs(h.NextIndex(), []*structs.Allocation{alloc1, alloc2, alloc3})) + + // Create a high priority job and allocs for it + // These allocs should not be preempted + + job4 := mock.BatchJob() + job4.Type = structs.JobTypeBatch + job4.Priority = 100 + job4.TaskGroups[0].Tasks[0].Resources = &structs.Resources{ + CPU: 1024, + MemoryMB: 2048, + Networks: []*structs.NetworkResource{ + { + MBits: 100, + }, + }, + } + + alloc4 := mock.Alloc() + alloc4.Job = job4 + alloc4.JobID = job4.ID + alloc4.NodeID = nodes[0].ID + alloc4.Name = "my-job4[0]" + alloc4.TaskGroup = job4.TaskGroups[0].Name + alloc4.Resources = &structs.Resources{ + CPU: 1024, + MemoryMB: 2048, + DiskMB: 2 * 1024, + Networks: []*structs.NetworkResource{ + { + Device: "eth0", + MBits: 100, + }, + }, + } + alloc4.TaskResources = map[string]*structs.Resources{ + "web": { + CPU: 1024, + MemoryMB: 2048, + DiskMB: 2 * 1024, + Networks: []*structs.NetworkResource{ + { + Device: "eth0", + MBits: 100, + }, + }, + }, + } + noErr(t, h.State.UpsertJob(h.NextIndex(), job4)) + noErr(t, h.State.UpsertAllocs(h.NextIndex(), []*structs.Allocation{alloc4})) + + // Create a system job such that it would need to preempt both allocs to succeed + job := mock.SystemJob() + job.TaskGroups[0].Tasks[0].Resources = &structs.Resources{ + CPU: 1948, + MemoryMB: 256, + Networks: []*structs.NetworkResource{ + { + MBits: 800, + DynamicPorts: []structs.Port{{Label: "http"}}, + }, + }, + } + noErr(t, h.State.UpsertJob(h.NextIndex(), job)) + + // Create a mock evaluation to register the job + eval := &structs.Evaluation{ + Namespace: structs.DefaultNamespace, + ID: uuid.Generate(), + Priority: job.Priority, + TriggeredBy: structs.EvalTriggerJobRegister, + JobID: job.ID, + Status: structs.EvalStatusPending, + } + noErr(t, h.State.UpsertEvals(h.NextIndex(), []*structs.Evaluation{eval})) + + // Process the evaluation + err := h.Process(NewSystemScheduler, eval) + require := require.New(t) + require.Nil(err) + + // Ensure a single plan + require.Equal(1, len(h.Plans)) + plan := h.Plans[0] + + // Ensure the plan doesn't have annotations. + require.Nil(plan.Annotations) + + // Ensure the plan allocated on both nodes + var planned []*structs.Allocation + preemptingAllocId := "" + require.Equal(2, len(plan.NodeAllocation)) + + // The alloc that got placed on node 1 is the preemptor + for _, allocList := range plan.NodeAllocation { + planned = append(planned, allocList...) 
+ for _, alloc := range allocList { + if alloc.NodeID == nodes[0].ID { + preemptingAllocId = alloc.ID + } + } + } + + // Lookup the allocations by JobID + ws := memdb.NewWatchSet() + out, err := h.State.AllocsByJob(ws, job.Namespace, job.ID, false) + noErr(t, err) + + // Ensure all allocations placed + require.Equal(2, len(out)) + + // Verify that one node has preempted allocs + require.NotNil(plan.NodePreemptions[nodes[0].ID]) + preemptedAllocs := plan.NodePreemptions[nodes[0].ID] + + // Verify that three jobs have preempted allocs + require.Equal(3, len(preemptedAllocs)) + + expectedPreemptedJobIDs := []string{job1.ID, job2.ID, job3.ID} + + // We expect job1, job2 and job3 to have preempted allocations + // job4 should not have any allocs preempted + for _, alloc := range preemptedAllocs { + require.Contains(expectedPreemptedJobIDs, alloc.JobID) + } + // Look up the preempted allocs by job ID + ws = memdb.NewWatchSet() + + for _, jobId := range expectedPreemptedJobIDs { + out, err = h.State.AllocsByJob(ws, structs.DefaultNamespace, jobId, false) + noErr(t, err) + for _, alloc := range out { + require.Equal(structs.AllocDesiredStatusEvict, alloc.DesiredStatus) + require.Equal(fmt.Sprintf("Preempted by alloc ID %v", preemptingAllocId), alloc.DesiredDescription) + } + } + + h.AssertEvalStatus(t, structs.EvalStatusComplete) + +} diff --git a/scheduler/testing.go b/scheduler/testing.go index 0410b1190..0a527ede9 100644 --- a/scheduler/testing.go +++ b/scheduler/testing.go @@ -96,6 +96,7 @@ func (h *Harness) SubmitPlan(plan *structs.Plan) (*structs.PlanResult, State, er result := new(structs.PlanResult) result.NodeUpdate = plan.NodeUpdate result.NodeAllocation = plan.NodeAllocation + result.NodePreemptions = plan.NodePreemptions result.AllocIndex = index // Flatten evicts and allocs @@ -116,6 +117,18 @@ func (h *Harness) SubmitPlan(plan *structs.Plan) (*structs.PlanResult, State, er } } + // Set create and modify time for preempted allocs and flatten them + var preemptedAllocs []*structs.Allocation + for _, preemptions := range result.NodePreemptions { + for _, alloc := range preemptions { + if alloc.CreateTime == 0 { + alloc.CreateTime = now + } + alloc.ModifyTime = now + preemptedAllocs = append(preemptedAllocs, alloc) + } + } + // Setup the update request req := structs.ApplyPlanResultsRequest{ AllocUpdateRequest: structs.AllocUpdateRequest{ @@ -125,6 +138,7 @@ func (h *Harness) SubmitPlan(plan *structs.Plan) (*structs.PlanResult, State, er Deployment: plan.Deployment, DeploymentUpdates: plan.DeploymentUpdates, EvalID: plan.EvalID, + NodePreemptions: preemptedAllocs, } // Apply the full plan
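A worked reading of TestSystemSched_Preemption above, with the figures taken from its fixtures, shows why exactly three jobs lose their allocations on the first node:

```go
// Why the test expects preemptions from job1, job2 and job3 only (node 0):
//
//	node capacity:   3072 CPU, 1000 MBits on eth0
//	running allocs:  job1 512 CPU / 200 MBits
//	                 job2 512 CPU / 200 MBits
//	                 job3 1024 CPU / 400 MBits
//	                 job4 1024 CPU / 100 MBits (priority 100)
//	system job ask:  1948 CPU, 800 MBits plus a dynamic port
//
//	free CPU   = 3072 - (512+512+1024+1024) = 0    -> must free at least 1948
//	free MBits = 1000 - (200+200+400+100)   = 100  -> must free at least 700
//
// Preempting the three low-priority allocs frees 2048 CPU and 800 MBits, which
// covers the ask. job4's priority is not at least 10 below the system job's,
// so its alloc is never a preemption candidate.
```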