diff --git a/scheduler/generic_sched.go b/scheduler/generic_sched.go
index 786012e05..b0c1ddcc4 100644
--- a/scheduler/generic_sched.go
+++ b/scheduler/generic_sched.go
@@ -383,7 +383,9 @@ func (s *GenericScheduler) computeJobAllocs() error {
 	// Filter out the allocations in a terminal state
 	allocs, _ = s.filterCompleteAllocs(allocs)
 
-	reconciler := NewAllocReconciler(s.ctx, s.stack, s.batch, s.eval, s.job, s.deployment, allocs, tainted)
+	reconciler := NewAllocReconciler(s.ctx.Logger(),
+		newAllocUpdateFn(s.ctx, s.stack, s.eval.ID),
+		s.batch, s.eval.JobID, s.job, s.deployment, allocs, tainted)
 	results := reconciler.Compute()
 
 	if s.eval.AnnotatePlan {
diff --git a/scheduler/reconcile.go b/scheduler/reconcile.go
index 7d1be5357..f6983689d 100644
--- a/scheduler/reconcile.go
+++ b/scheduler/reconcile.go
@@ -1,32 +1,42 @@
 package scheduler
 
 import (
-	memdb "github.com/hashicorp/go-memdb"
+	"log"
+
 	"github.com/hashicorp/nomad/helper"
 	"github.com/hashicorp/nomad/nomad/structs"
 )
 
+// allocUpdateType takes an existing allocation and a new job definition and
+// returns whether the allocation can ignore the change, requires a destructive
+// update, or can be inplace updated. If it can be inplace updated, an updated
+// allocation that has the new resources and alloc metrics attached will be
+// returned.
+type allocUpdateType func(existing *structs.Allocation, newJob *structs.Job, newTG *structs.TaskGroup) (ignore, destructive bool, updated *structs.Allocation)
+
 // allocReconciler is used to determine the set of allocations that require
 // placement, inplace updating or stopping given the job specification and
 // existing cluster state. The reconciler should only be used for batch and
 // service jobs.
 type allocReconciler struct {
-	// ctx gives access to the state store and logger
-	ctx Context
+	// logger is used to log debug information. Logging should be kept to a
+	// minimum here
+	logger *log.Logger
 
-	// stack allows checking for the ability to do an in-place update
-	stack Stack
+	// allocUpdateFn is used to check if the allocation can be in-place updated
+	allocUpdateFn allocUpdateType
 
 	// batch marks whether the job is a batch job
 	batch bool
 
-	// eval is the evaluation triggering the scheduling event
-	eval *structs.Evaluation
-
 	// job is the job being operated on, it may be nil if the job is being
 	// stopped via a purge
 	job *structs.Job
 
+	// jobID is the ID of the job being operated on. The job may be nil if it is
+	// being stopped so we require this separately.
+	jobID string
+
 	// deployment is the current deployment for the job
 	deployment *structs.Deployment
 
@@ -87,15 +97,15 @@ type allocStopResult struct {
 
 // NewAllocReconciler creates a new reconciler that should be used to determine
 // the changes required to bring the cluster state inline with the declared jobspec
-func NewAllocReconciler(ctx Context, stack Stack, batch bool,
-	eval *structs.Evaluation, job *structs.Job, deployment *structs.Deployment,
+func NewAllocReconciler(logger *log.Logger, allocUpdateFn allocUpdateType, batch bool,
+	jobID string, job *structs.Job, deployment *structs.Deployment,
 	existingAllocs []*structs.Allocation, taintedNodes map[string]*structs.Node) *allocReconciler {
 
 	a := &allocReconciler{
-		ctx:            ctx,
-		stack:          stack,
-		eval:           eval,
+		logger:         logger,
+		allocUpdateFn:  allocUpdateFn,
 		batch:          batch,
+		jobID:          jobID,
 		job:            job,
 		deployment:     deployment,
 		existingAllocs: existingAllocs,
@@ -150,11 +160,11 @@ func (a *allocReconciler) Compute() *reconcileResults {
 	if a.deployment == nil && !stopped && a.job.HasUpdateStrategy() {
 		a.deployment = structs.NewDeployment(a.job)
 		a.result.createDeployment = a.deployment
-		a.ctx.Logger().Printf("ALEX: MADE DEPLOYMENT %q", a.deployment.ID)
+		a.logger.Printf("ALEX: MADE DEPLOYMENT %q", a.deployment.ID)
 	}
 
 	if a.deployment != nil {
-		a.ctx.Logger().Printf("ALEX: CURRENT DEPLOYMENT %q", a.deployment.ID)
+		a.logger.Printf("ALEX: CURRENT DEPLOYMENT %q", a.deployment.ID)
 	}
 
 	m := newAllocMatrix(a.job, a.existingAllocs)
@@ -202,7 +212,7 @@ func (a *allocReconciler) computeGroup(group string, as allocSet) {
 	// If the task group is nil, then the task group has been removed so all we
 	// need to do is stop everything
 	if tg == nil {
-		a.ctx.Logger().Printf("RECONCILER -- STOPPING ALL")
+		a.logger.Printf("RECONCILER -- STOPPING ALL")
 		a.markStop(untainted, "", allocNotNeeded)
 		a.markStop(migrate, "", allocNotNeeded)
 		a.markStop(lost, structs.AllocClientStatusLost, allocLost)
@@ -227,8 +237,8 @@ func (a *allocReconciler) computeGroup(group string, as allocSet) {
 	// Track the lost and migrating
 	desiredChanges.Migrate += uint64(len(migrate) + len(lost))
 
-	a.ctx.Logger().Printf("RECONCILER -- untainted (%d); migrate (%d); lost (%d)", len(untainted), len(migrate), len(lost))
-	a.ctx.Logger().Printf("RECONCILER -- untainted %#v", untainted)
+	a.logger.Printf("RECONCILER -- untainted (%d); migrate (%d); lost (%d)", len(untainted), len(migrate), len(lost))
+	a.logger.Printf("RECONCILER -- untainted %#v", untainted)
 
 	// Mark all lost allocations for stop. Previous allocation doesn't matter
 	// here since it is on a lost node
@@ -250,8 +260,8 @@ func (a *allocReconciler) computeGroup(group string, as allocSet) {
 			a.markStop(older, "", allocNotNeeded)
 			desiredChanges.Stop += uint64(len(older))
 
-			a.ctx.Logger().Printf("RECONCILER -- older canaries %#v", older)
-			a.ctx.Logger().Printf("RECONCILER -- current canaries %#v", current)
+			a.logger.Printf("RECONCILER -- older canaries %#v", older)
+			a.logger.Printf("RECONCILER -- current canaries %#v", current)
 
 			untainted = untainted.difference(older)
 			canaries = current
@@ -263,11 +273,11 @@ func (a *allocReconciler) computeGroup(group string, as allocSet) {
 			untainted = untainted.difference(canaries)
 			canaries = nil
 		}
-		a.ctx.Logger().Printf("RECONCILER -- untainted - remove canaries %#v", untainted)
+		a.logger.Printf("RECONCILER -- untainted - remove canaries %#v", untainted)
 	}
 
 	// Create a structure for choosing names
-	nameIndex := newAllocNameIndex(a.eval.JobID, group, tg.Count, untainted)
+	nameIndex := newAllocNameIndex(a.jobID, group, tg.Count, untainted)
 
 	// Stop any unneeded allocations and update the untainted set to not
 	// included stopped allocations. We ignore canaries since that can push us
@@ -293,8 +303,8 @@ func (a *allocReconciler) computeGroup(group string, as allocSet) {
 	desiredChanges.InPlaceUpdate += uint64(len(inplace))
 	desiredChanges.DestructiveUpdate += uint64(len(destructive))
 
-	a.ctx.Logger().Printf("RECONCILER -- Stopping (%d)", len(stop))
-	a.ctx.Logger().Printf("RECONCILER -- Inplace (%d); Destructive (%d)", len(inplace), len(destructive))
+	a.logger.Printf("RECONCILER -- Stopping (%d)", len(stop))
+	a.logger.Printf("RECONCILER -- Inplace (%d); Destructive (%d)", len(inplace), len(destructive))
 
 	// Get the update strategy of the group
 	strategy := tg.Update
@@ -312,7 +322,7 @@ func (a *allocReconciler) computeGroup(group string, as allocSet) {
 			dstate.DesiredTotal += strategy.Canary
 		}
 
-		a.ctx.Logger().Printf("RECONCILER -- Canary (%d)", number)
+		a.logger.Printf("RECONCILER -- Canary (%d)", number)
 		for _, name := range nameIndex.NextCanaries(uint(number), canaries, destructive) {
 			a.result.place = append(a.result.place, allocPlaceResult{
 				name: name,
@@ -325,7 +335,7 @@ func (a *allocReconciler) computeGroup(group string, as allocSet) {
 	// Determine how many we can place
 	haveCanaries := dstate != nil && dstate.DesiredCanaries != 0
 	limit := a.computeLimit(tg, untainted, destructive, haveCanaries)
-	a.ctx.Logger().Printf("RECONCILER -- LIMIT %v", limit)
+	a.logger.Printf("RECONCILER -- LIMIT %v", limit)
 
 	// Place if:
 	// * The deployment is not paused
@@ -342,7 +352,7 @@ func (a *allocReconciler) computeGroup(group string, as allocSet) {
 		desiredChanges.Place += uint64(len(place))
 
 		// Place all new allocations
-		a.ctx.Logger().Printf("RECONCILER -- Placing (%d)", len(place))
+		a.logger.Printf("RECONCILER -- Placing (%d)", len(place))
 		for _, p := range place {
 			a.result.place = append(a.result.place, p)
 		}
@@ -350,7 +360,7 @@ func (a *allocReconciler) computeGroup(group string, as allocSet) {
 	// Do all destructive updates
 	min := helper.IntMin(len(destructive), limit)
 	i := 0
-	a.ctx.Logger().Printf("RECONCILER -- Destructive Updating (%d)", min)
+	a.logger.Printf("RECONCILER -- Destructive Updating (%d)", min)
 	for _, alloc := range destructive {
 		if i == min {
 			break
@@ -371,7 +381,7 @@ func (a *allocReconciler) computeGroup(group string, as allocSet) {
 	}
 
 	// TODO Migrations should be done using a stagger and max_parallel.
-	a.ctx.Logger().Printf("RECONCILER -- Migrating (%d)", len(migrate))
+	a.logger.Printf("RECONCILER -- Migrating (%d)", len(migrate))
 	for _, alloc := range migrate {
 		a.result.stop = append(a.result.stop, allocStopResult{
 			alloc: alloc,
@@ -480,84 +490,16 @@ func (a *allocReconciler) computeUpdates(group *structs.TaskGroup, untainted all
 	inplace = make(map[string]*structs.Allocation)
 	destructive = make(map[string]*structs.Allocation)
 
-	ws := memdb.NewWatchSet()
 	for _, alloc := range untainted {
-		if alloc.Job.JobModifyIndex == a.job.JobModifyIndex {
+		ignoreChange, destructiveChange, inplaceAlloc := a.allocUpdateFn(alloc, a.job, group)
+		if ignoreChange {
 			ignore[alloc.ID] = alloc
-			continue
-		}
-
-		// Check if the task drivers or config has changed, requires
-		// a destructive upgrade since that cannot be done in-place.
-		if tasksUpdated(a.job, alloc.Job, group.Name) {
+		} else if destructiveChange {
 			destructive[alloc.ID] = alloc
-			continue
+		} else {
+			inplace[alloc.ID] = alloc
+			a.result.inplaceUpdate = append(a.result.inplaceUpdate, inplaceAlloc)
 		}
-
-		// Terminal batch allocations are not filtered when they are completed
-		// successfully. We should avoid adding the allocation to the plan in
-		// the case that it is an in-place update to avoid both additional data
-		// in the plan and work for the clients.
-		if alloc.TerminalStatus() {
-			ignore[alloc.ID] = alloc
-			continue
-		}
-
-		// Get the existing node
-		node, err := a.ctx.State().NodeByID(ws, alloc.NodeID)
-		if err != nil {
-			a.ctx.Logger().Printf("[ERR] sched: %#v failed to get node '%s': %v", a.eval, alloc.NodeID, err)
-			continue
-		}
-		if node == nil {
-			destructive[alloc.ID] = alloc
-			continue
-		}
-
-		// Set the existing node as the base set
-		a.stack.SetNodes([]*structs.Node{node})
-
-		// Stage an eviction of the current allocation. This is done so that
-		// the current allocation is discounted when checking for feasability.
-		// Otherwise we would be trying to fit the tasks current resources and
-		// updated resources. After select is called we can remove the evict.
-		a.ctx.Plan().AppendUpdate(alloc, structs.AllocDesiredStatusStop, allocInPlace, "")
-
-		// Attempt to match the task group
-		option, _ := a.stack.Select(group)
-
-		// Pop the allocation
-		a.ctx.Plan().PopUpdate(alloc)
-
-		// Skip if we could not do an in-place update
-		if option == nil {
-			destructive[alloc.ID] = alloc
-			continue
-		}
-
-		// Restore the network offers from the existing allocation.
-		// We do not allow network resources (reserved/dynamic ports)
-		// to be updated. This is guarded in taskUpdated, so we can
-		// safely restore those here.
-		for task, resources := range option.TaskResources {
-			existing := alloc.TaskResources[task]
-			resources.Networks = existing.Networks
-		}
-
-		// Create a shallow copy
-		newAlloc := new(structs.Allocation)
-		*newAlloc = *alloc
-
-		// Update the allocation
-		newAlloc.EvalID = a.eval.ID
-		newAlloc.Job = nil       // Use the Job in the Plan
-		newAlloc.Resources = nil // Computed in Plan Apply
-		newAlloc.TaskResources = option.TaskResources
-		newAlloc.Metrics = a.ctx.Metrics()
-
-		// Add this to the result and the tracking allocSet
-		inplace[alloc.ID] = alloc
-		a.result.inplaceUpdate = append(a.result.inplaceUpdate, newAlloc)
 	}
 
 	return
diff --git a/scheduler/util.go b/scheduler/util.go
index c84283b1c..4f6bd5f92 100644
--- a/scheduler/util.go
+++ b/scheduler/util.go
@@ -732,3 +732,78 @@ func updateNonTerminalAllocsToLost(plan *structs.Plan, tainted map[string]*struc
 		}
 	}
 }
+
+func newAllocUpdateFn(ctx Context, stack Stack, evalID string) allocUpdateType {
+	return func(existing *structs.Allocation, newJob *structs.Job, newTG *structs.TaskGroup) (ignore, destructive bool, updated *structs.Allocation) {
+		// Same index, so nothing to do
+		if existing.Job.JobModifyIndex == newJob.JobModifyIndex {
+			return true, false, nil
+		}
+
+		// Check if the task drivers or config has changed, requires
+		// a destructive upgrade since that cannot be done in-place.
+		if tasksUpdated(newJob, existing.Job, newTG.Name) {
+			return false, true, nil
+		}
+
+		// Terminal batch allocations are not filtered when they are completed
+		// successfully. We should avoid adding the allocation to the plan in
+		// the case that it is an in-place update to avoid both additional data
+		// in the plan and work for the clients.
+		if existing.TerminalStatus() {
+			return true, false, nil
+		}
+
+		// Get the existing node
+		ws := memdb.NewWatchSet()
+		node, err := ctx.State().NodeByID(ws, existing.NodeID)
+		if err != nil {
+			ctx.Logger().Printf("[ERR] sched: %#v failed to get node '%s': %v", evalID, existing.NodeID, err)
+			return true, false, nil
+		}
+		if node == nil {
+			return false, true, nil
+		}
+
+		// Set the existing node as the base set
+		stack.SetNodes([]*structs.Node{node})
+
+		// Stage an eviction of the current allocation. This is done so that
+		// the current allocation is discounted when checking for feasibility.
+		// Otherwise we would be trying to fit the tasks current resources and
+		// updated resources. After select is called we can remove the evict.
+		ctx.Plan().AppendUpdate(existing, structs.AllocDesiredStatusStop, allocInPlace, "")
+
+		// Attempt to match the task group
+		option, _ := stack.Select(newTG)
+
+		// Pop the allocation
+		ctx.Plan().PopUpdate(existing)
+
+		// Require a destructive update if we could not do an in-place update
+		if option == nil {
+			return false, true, nil
+		}
+
+		// Restore the network offers from the existing allocation.
+		// We do not allow network resources (reserved/dynamic ports)
+		// to be updated. This is guarded in taskUpdated, so we can
+		// safely restore those here.
+		for task, resources := range option.TaskResources {
+			existingResources := existing.TaskResources[task]
+			resources.Networks = existingResources.Networks
+		}
+
+		// Create a shallow copy
+		newAlloc := new(structs.Allocation)
+		*newAlloc = *existing
+
+		// Update the allocation
+		newAlloc.EvalID = evalID
+		newAlloc.Job = nil       // Use the Job in the Plan
+		newAlloc.Resources = nil // Computed in Plan Apply
+		newAlloc.TaskResources = option.TaskResources
+		newAlloc.Metrics = ctx.Metrics()
+		return false, false, newAlloc
+	}
+}
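Illustrative note (not part of the patch): because the reconciler now takes an allocUpdateType function instead of a Context and Stack, the update decision can be stubbed out when exercising the reconciler in isolation. The sketch below lives in package scheduler, assumes only the signatures introduced above, and the stub and wrapper names (allocUpdateFnDestructive, reconcileWithStub) are hypothetical.

package scheduler

import (
	"log"

	"github.com/hashicorp/nomad/nomad/structs"
)

// allocUpdateFnDestructive is a hypothetical stub that reports every existing
// allocation as needing a destructive update. It satisfies the allocUpdateType
// contract: (ignore, destructive, updated in-place allocation).
func allocUpdateFnDestructive(existing *structs.Allocation, newJob *structs.Job,
	newTG *structs.TaskGroup) (bool, bool, *structs.Allocation) {
	return false, true, nil
}

// reconcileWithStub is a hypothetical wrapper showing how the new constructor
// is wired: the logger and update function are injected directly, so no
// scheduler Context, Stack, or Evaluation is needed to run Compute.
func reconcileWithStub(logger *log.Logger, job *structs.Job,
	allocs []*structs.Allocation) *reconcileResults {
	r := NewAllocReconciler(logger, allocUpdateFnDestructive, false,
		job.ID, job, nil, allocs, nil)
	return r.Compute()
}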