Pull out in-place updating into a passed-in function; reduce inputs to the reconciler

Alex Dadgar 2017-05-31 16:55:40 -07:00
parent c77944ed29
commit d111dd5c10
3 changed files with 123 additions and 104 deletions

View File

@ -383,7 +383,9 @@ func (s *GenericScheduler) computeJobAllocs() error {
// Filter out the allocations in a terminal state
allocs, _ = s.filterCompleteAllocs(allocs)
reconciler := NewAllocReconciler(s.ctx, s.stack, s.batch, s.eval, s.job, s.deployment, allocs, tainted)
reconciler := NewAllocReconciler(s.ctx.Logger(),
newAllocUpdateFn(s.ctx, s.stack, s.eval.ID),
s.batch, s.eval.JobID, s.job, s.deployment, allocs, tainted)
results := reconciler.Compute()
if s.eval.AnnotatePlan {

View File

@ -1,32 +1,42 @@
package scheduler
import (
memdb "github.com/hashicorp/go-memdb"
"log"
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/nomad/structs"
)
// allocUpdateType takes an existing allocation and a new job definition and
// returns whether the change can be ignored, requires a destructive update, or
// can be applied in place. If it can be applied in place, an updated allocation
// with the new resources and alloc metrics attached is returned.
type allocUpdateType func(existing *structs.Allocation, newJob *structs.Job, newTG *structs.TaskGroup) (ignore, destructive bool, updated *structs.Allocation)
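Since the update decision is now just a function value, reconciler unit tests can inject trivial stand-ins instead of a full scheduler Context and Stack. A minimal sketch; the names (allocUpdateFnIgnore, allocUpdateFnDestructive, allocUpdateFnInplace) are illustrative assumptions, not part of this diff:
// Hypothetical stub update functions for reconciler unit tests (illustrative only).
var allocUpdateFnIgnore allocUpdateType = func(*structs.Allocation, *structs.Job, *structs.TaskGroup) (bool, bool, *structs.Allocation) {
    // Always report that the change can be ignored
    return true, false, nil
}

var allocUpdateFnDestructive allocUpdateType = func(*structs.Allocation, *structs.Job, *structs.TaskGroup) (bool, bool, *structs.Allocation) {
    // Always force a destructive update
    return false, true, nil
}

var allocUpdateFnInplace allocUpdateType = func(existing *structs.Allocation, _ *structs.Job, newTG *structs.TaskGroup) (bool, bool, *structs.Allocation) {
    // Report an in-place update: shallow copy the allocation and pick up the
    // new task group's per-task resources.
    updated := new(structs.Allocation)
    *updated = *existing
    updated.TaskResources = make(map[string]*structs.Resources, len(newTG.Tasks))
    for _, task := range newTG.Tasks {
        updated.TaskResources[task.Name] = task.Resources
    }
    return false, false, updated
}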
// allocReconciler is used to determine the set of allocations that require
// placement, inplace updating or stopping given the job specification and
// existing cluster state. The reconciler should only be used for batch and
// service jobs.
type allocReconciler struct {
// ctx gives access to the state store and logger
ctx Context
// logger is used to log debug information. Logging should be kept to a
// minimum here
logger *log.Logger
// stack allows checking for the ability to do an in-place update
stack Stack
// allocUpdateFn is used to determine how an existing allocation should be
// updated: ignored, updated in place, or replaced destructively
allocUpdateFn allocUpdateType
// batch marks whether the job is a batch job
batch bool
// eval is the evaluation triggering the scheduling event
eval *structs.Evaluation
// job is the job being operated on; it may be nil if the job is being
// stopped via a purge
job *structs.Job
// jobID is the ID of the job being operated on. The job may be nil if it is
// being stopped so we require this separately.
jobID string
// deployment is the current deployment for the job
deployment *structs.Deployment
@ -87,15 +97,15 @@ type allocStopResult struct {
// NewAllocReconciler creates a new reconciler that should be used to determine
// the changes required to bring the cluster state in line with the declared jobspec
func NewAllocReconciler(ctx Context, stack Stack, batch bool,
eval *structs.Evaluation, job *structs.Job, deployment *structs.Deployment,
func NewAllocReconciler(logger *log.Logger, allocUpdateFn allocUpdateType, batch bool,
jobID string, job *structs.Job, deployment *structs.Deployment,
existingAllocs []*structs.Allocation, taintedNodes map[string]*structs.Node) *allocReconciler {
a := &allocReconciler{
ctx: ctx,
stack: stack,
eval: eval,
logger: logger,
allocUpdateFn: allocUpdateFn,
batch: batch,
jobID: jobID,
job: job,
deployment: deployment,
existingAllocs: existingAllocs,
@ -150,11 +160,11 @@ func (a *allocReconciler) Compute() *reconcileResults {
if a.deployment == nil && !stopped && a.job.HasUpdateStrategy() {
a.deployment = structs.NewDeployment(a.job)
a.result.createDeployment = a.deployment
a.ctx.Logger().Printf("ALEX: MADE DEPLOYMENT %q", a.deployment.ID)
a.logger.Printf("ALEX: MADE DEPLOYMENT %q", a.deployment.ID)
}
if a.deployment != nil {
a.ctx.Logger().Printf("ALEX: CURRENT DEPLOYMENT %q", a.deployment.ID)
a.logger.Printf("ALEX: CURRENT DEPLOYMENT %q", a.deployment.ID)
}
m := newAllocMatrix(a.job, a.existingAllocs)
@ -202,7 +212,7 @@ func (a *allocReconciler) computeGroup(group string, as allocSet) {
// If the task group is nil, then the task group has been removed so all we
// need to do is stop everything
if tg == nil {
a.ctx.Logger().Printf("RECONCILER -- STOPPING ALL")
a.logger.Printf("RECONCILER -- STOPPING ALL")
a.markStop(untainted, "", allocNotNeeded)
a.markStop(migrate, "", allocNotNeeded)
a.markStop(lost, structs.AllocClientStatusLost, allocLost)
@ -227,8 +237,8 @@ func (a *allocReconciler) computeGroup(group string, as allocSet) {
// Track the lost and migrating
desiredChanges.Migrate += uint64(len(migrate) + len(lost))
a.ctx.Logger().Printf("RECONCILER -- untainted (%d); migrate (%d); lost (%d)", len(untainted), len(migrate), len(lost))
a.ctx.Logger().Printf("RECONCILER -- untainted %#v", untainted)
a.logger.Printf("RECONCILER -- untainted (%d); migrate (%d); lost (%d)", len(untainted), len(migrate), len(lost))
a.logger.Printf("RECONCILER -- untainted %#v", untainted)
// Mark all lost allocations for stop. Previous allocation doesn't matter
// here since it is on a lost node
@ -250,8 +260,8 @@ func (a *allocReconciler) computeGroup(group string, as allocSet) {
a.markStop(older, "", allocNotNeeded)
desiredChanges.Stop += uint64(len(older))
a.ctx.Logger().Printf("RECONCILER -- older canaries %#v", older)
a.ctx.Logger().Printf("RECONCILER -- current canaries %#v", current)
a.logger.Printf("RECONCILER -- older canaries %#v", older)
a.logger.Printf("RECONCILER -- current canaries %#v", current)
untainted = untainted.difference(older)
canaries = current
@ -263,11 +273,11 @@ func (a *allocReconciler) computeGroup(group string, as allocSet) {
untainted = untainted.difference(canaries)
canaries = nil
}
a.ctx.Logger().Printf("RECONCILER -- untainted - remove canaries %#v", untainted)
a.logger.Printf("RECONCILER -- untainted - remove canaries %#v", untainted)
}
// Create a structure for choosing names
nameIndex := newAllocNameIndex(a.eval.JobID, group, tg.Count, untainted)
nameIndex := newAllocNameIndex(a.jobID, group, tg.Count, untainted)
// Stop any unneeded allocations and update the untainted set to not
// include stopped allocations. We ignore canaries since that can push us
@ -293,8 +303,8 @@ func (a *allocReconciler) computeGroup(group string, as allocSet) {
desiredChanges.InPlaceUpdate += uint64(len(inplace))
desiredChanges.DestructiveUpdate += uint64(len(destructive))
a.ctx.Logger().Printf("RECONCILER -- Stopping (%d)", len(stop))
a.ctx.Logger().Printf("RECONCILER -- Inplace (%d); Destructive (%d)", len(inplace), len(destructive))
a.logger.Printf("RECONCILER -- Stopping (%d)", len(stop))
a.logger.Printf("RECONCILER -- Inplace (%d); Destructive (%d)", len(inplace), len(destructive))
// Get the update strategy of the group
strategy := tg.Update
@ -312,7 +322,7 @@ func (a *allocReconciler) computeGroup(group string, as allocSet) {
dstate.DesiredTotal += strategy.Canary
}
a.ctx.Logger().Printf("RECONCILER -- Canary (%d)", number)
a.logger.Printf("RECONCILER -- Canary (%d)", number)
for _, name := range nameIndex.NextCanaries(uint(number), canaries, destructive) {
a.result.place = append(a.result.place, allocPlaceResult{
name: name,
@ -325,7 +335,7 @@ func (a *allocReconciler) computeGroup(group string, as allocSet) {
// Determine how many we can place
haveCanaries := dstate != nil && dstate.DesiredCanaries != 0
limit := a.computeLimit(tg, untainted, destructive, haveCanaries)
a.ctx.Logger().Printf("RECONCILER -- LIMIT %v", limit)
a.logger.Printf("RECONCILER -- LIMIT %v", limit)
// Place if:
// * The deployment is not paused
@ -342,7 +352,7 @@ func (a *allocReconciler) computeGroup(group string, as allocSet) {
desiredChanges.Place += uint64(len(place))
// Place all new allocations
a.ctx.Logger().Printf("RECONCILER -- Placing (%d)", len(place))
a.logger.Printf("RECONCILER -- Placing (%d)", len(place))
for _, p := range place {
a.result.place = append(a.result.place, p)
}
@ -350,7 +360,7 @@ func (a *allocReconciler) computeGroup(group string, as allocSet) {
// Do all destructive updates
min := helper.IntMin(len(destructive), limit)
i := 0
a.ctx.Logger().Printf("RECONCILER -- Destructive Updating (%d)", min)
a.logger.Printf("RECONCILER -- Destructive Updating (%d)", min)
for _, alloc := range destructive {
if i == min {
break
@ -371,7 +381,7 @@ func (a *allocReconciler) computeGroup(group string, as allocSet) {
}
// TODO Migrations should be done using a stagger and max_parallel.
a.ctx.Logger().Printf("RECONCILER -- Migrating (%d)", len(migrate))
a.logger.Printf("RECONCILER -- Migrating (%d)", len(migrate))
for _, alloc := range migrate {
a.result.stop = append(a.result.stop, allocStopResult{
alloc: alloc,
@ -480,84 +490,16 @@ func (a *allocReconciler) computeUpdates(group *structs.TaskGroup, untainted all
inplace = make(map[string]*structs.Allocation)
destructive = make(map[string]*structs.Allocation)
ws := memdb.NewWatchSet()
for _, alloc := range untainted {
if alloc.Job.JobModifyIndex == a.job.JobModifyIndex {
ignoreChange, destructiveChange, inplaceAlloc := a.allocUpdateFn(alloc, a.job, group)
if ignoreChange {
ignore[alloc.ID] = alloc
continue
}
// Check if the task drivers or config have changed; this requires
// a destructive upgrade since it cannot be done in-place.
if tasksUpdated(a.job, alloc.Job, group.Name) {
} else if destructiveChange {
destructive[alloc.ID] = alloc
continue
} else {
inplace[alloc.ID] = alloc
a.result.inplaceUpdate = append(a.result.inplaceUpdate, inplaceAlloc)
}
// Terminal batch allocations are not filtered when they complete
// successfully. If the only change would be an in-place update, skip
// adding the allocation to the plan; that avoids both additional data
// in the plan and extra work for the clients.
if alloc.TerminalStatus() {
ignore[alloc.ID] = alloc
continue
}
// Get the existing node
node, err := a.ctx.State().NodeByID(ws, alloc.NodeID)
if err != nil {
a.ctx.Logger().Printf("[ERR] sched: %#v failed to get node '%s': %v", a.eval, alloc.NodeID, err)
continue
}
if node == nil {
destructive[alloc.ID] = alloc
continue
}
// Set the existing node as the base set
a.stack.SetNodes([]*structs.Node{node})
// Stage an eviction of the current allocation. This is done so that
// the current allocation is discounted when checking for feasibility.
// Otherwise we would be trying to fit both the task's current resources
// and its updated resources. After select is called we can remove the evict.
a.ctx.Plan().AppendUpdate(alloc, structs.AllocDesiredStatusStop, allocInPlace, "")
// Attempt to match the task group
option, _ := a.stack.Select(group)
// Pop the allocation
a.ctx.Plan().PopUpdate(alloc)
// Skip if we could not do an in-place update
if option == nil {
destructive[alloc.ID] = alloc
continue
}
// Restore the network offers from the existing allocation.
// We do not allow network resources (reserved/dynamic ports)
// to be updated. This is guarded in tasksUpdated, so we can
// safely restore those here.
for task, resources := range option.TaskResources {
existing := alloc.TaskResources[task]
resources.Networks = existing.Networks
}
// Create a shallow copy
newAlloc := new(structs.Allocation)
*newAlloc = *alloc
// Update the allocation
newAlloc.EvalID = a.eval.ID
newAlloc.Job = nil // Use the Job in the Plan
newAlloc.Resources = nil // Computed in Plan Apply
newAlloc.TaskResources = option.TaskResources
newAlloc.Metrics = a.ctx.Metrics()
// Add this to the result and the tracking allocSet
inplace[alloc.ID] = alloc
a.result.inplaceUpdate = append(a.result.inplaceUpdate, newAlloc)
}
return
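Because the removed and added lines are interleaved above, the shape of the rewritten method is easier to see spelled out. This is roughly what computeUpdates reduces to once the decision is delegated to allocUpdateFn, a sketch assembled from the added lines; the return declaration is inferred since the hunk header truncates the signature:
func (a *allocReconciler) computeUpdates(group *structs.TaskGroup, untainted allocSet) (ignore, inplace, destructive allocSet) {
    // Determine the set of allocations that need to be updated
    ignore = make(map[string]*structs.Allocation)
    inplace = make(map[string]*structs.Allocation)
    destructive = make(map[string]*structs.Allocation)

    for _, alloc := range untainted {
        // Ask the injected update function how this allocation should be handled
        ignoreChange, destructiveChange, inplaceAlloc := a.allocUpdateFn(alloc, a.job, group)
        if ignoreChange {
            ignore[alloc.ID] = alloc
        } else if destructiveChange {
            destructive[alloc.ID] = alloc
        } else {
            // In-place update: track the allocation and record the updated
            // version for the plan
            inplace[alloc.ID] = alloc
            a.result.inplaceUpdate = append(a.result.inplaceUpdate, inplaceAlloc)
        }
    }

    return
}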

View File

@ -732,3 +732,78 @@ func updateNonTerminalAllocsToLost(plan *structs.Plan, tainted map[string]*struc
}
}
}
func newAllocUpdateFn(ctx Context, stack Stack, evalID string) allocUpdateType {
return func(existing *structs.Allocation, newJob *structs.Job, newTG *structs.TaskGroup) (ignore, destructive bool, updated *structs.Allocation) {
// Same index, so nothing to do
if existing.Job.JobModifyIndex == newJob.JobModifyIndex {
return true, false, nil
}
// Check if the task drivers or config have changed; this requires
// a destructive upgrade since it cannot be done in-place.
if tasksUpdated(newJob, existing.Job, newTG.Name) {
return false, true, nil
}
// Terminal batch allocations are not filtered when they complete
// successfully. If the only change would be an in-place update, skip
// adding the allocation to the plan; that avoids both additional data
// in the plan and extra work for the clients.
if existing.TerminalStatus() {
return true, false, nil
}
// Get the existing node
ws := memdb.NewWatchSet()
node, err := ctx.State().NodeByID(ws, existing.NodeID)
if err != nil {
ctx.Logger().Printf("[ERR] sched: eval %q failed to get node '%s': %v", evalID, existing.NodeID, err)
return true, false, nil
}
if node == nil {
return false, true, nil
}
// Set the existing node as the base set
stack.SetNodes([]*structs.Node{node})
// Stage an eviction of the current allocation. This is done so that
// the current allocation is discounted when checking for feasibility.
// Otherwise we would be trying to fit both the task's current resources
// and its updated resources. After select is called we can remove the evict.
ctx.Plan().AppendUpdate(existing, structs.AllocDesiredStatusStop, allocInPlace, "")
// Attempt to match the task group
option, _ := stack.Select(newTG)
// Pop the allocation
ctx.Plan().PopUpdate(existing)
// Require destructive if we could not do an in-place update
if option == nil {
return false, true, nil
}
// Restore the network offers from the existing allocation.
// We do not allow network resources (reserved/dynamic ports)
// to be updated. This is guarded in tasksUpdated, so we can
// safely restore those here.
for task, resources := range option.TaskResources {
existingResources := existing.TaskResources[task]
resources.Networks = existingResources.Networks
}
// Create a shallow copy
newAlloc := new(structs.Allocation)
*newAlloc = *existing
// Update the allocation
newAlloc.EvalID = evalID
newAlloc.Job = nil // Use the Job in the Plan
newAlloc.Resources = nil // Computed in Plan Apply
newAlloc.TaskResources = option.TaskResources
newAlloc.Metrics = ctx.Metrics()
return false, false, newAlloc
}
}