handle annotations

This commit is contained in:
Alex Dadgar 2017-05-23 13:02:47 -07:00
parent a46f7c3eb8
commit cf5baba808
4 changed files with 57 additions and 15 deletions

View File

@ -791,6 +791,7 @@ type DesiredUpdates struct {
Stop uint64
InPlaceUpdate uint64
DestructiveUpdate uint64
Canary uint64
}
type JobDispatchRequest struct {

View File

@ -4713,6 +4713,7 @@ type DesiredUpdates struct {
Stop uint64
InPlaceUpdate uint64
DestructiveUpdate uint64
Canary uint64
}
// msgpackHandle is a shared handle for encoding/decoding of structs

View File

@ -386,6 +386,12 @@ func (s *GenericScheduler) computeJobAllocs2() error {
reconciler := NewAllocReconciler(s.ctx, s.stack, s.batch, s.eval, s.job, s.deployment, allocs, tainted)
results := reconciler.Compute()
if s.eval.AnnotatePlan {
s.plan.Annotations = &structs.PlanAnnotations{
DesiredTGUpdates: results.desiredTGUpdates,
}
}
// Add the deployment changes to the plan
s.plan.CreatedDeployment = results.createDeployment
s.plan.DeploymentUpdates = results.deploymentUpdates
@ -405,6 +411,21 @@ func (s *GenericScheduler) computeJobAllocs2() error {
s.ctx.Plan().AppendAlloc(update)
}
// Nothing remaining to do if placement is not required
if len(results.place) == 0 {
if !s.job.Stopped() {
for _, tg := range s.job.TaskGroups {
s.queuedAllocs[tg.Name] = 0
}
}
return nil
}
// Record the number of allocations that needs to be placed per Task Group
for _, place := range results.place {
s.queuedAllocs[place.taskGroup.Name] += 1
}
// Compute the placements
return s.computePlacements2(results.place)
}

View File

@ -13,8 +13,6 @@ import (
* between the existing allocations and handles canaries replacing certain
* allocations.
* 2) Need to populate the desired state of a created deployment
* 3) Need to capture the overall desired transformations so that annotated
* plans work.
*/
// allocReconciler is used to determine the set of allocations that require
@ -75,7 +73,9 @@ type reconcileResults struct {
// stop is the set of allocations to stop
stop []allocStopResult
// TODO track the desired of the deployment
// desiredTGUpdates captures the desired set of changes to make for each
// task group.
desiredTGUpdates map[string]*structs.DesiredUpdates
}
// allocPlaceResult contains the information required to place a single
@ -109,7 +109,9 @@ func NewAllocReconciler(ctx Context, stack Stack, batch bool,
deployment: deployment,
existingAllocs: existingAllocs,
taintedNodes: taintedNodes,
result: new(reconcileResults),
result: &reconcileResults{
desiredTGUpdates: make(map[string]*structs.DesiredUpdates),
},
}
// Detect if the deployment is paused
@ -195,6 +197,10 @@ func (a *allocReconciler) markStop(allocs allocSet, clientStatus, statusDescript
// computeGroup reconciles state for a particular task group.
func (a *allocReconciler) computeGroup(group string, as allocSet) {
// Create the desired update object for the group
desiredChanges := new(structs.DesiredUpdates)
a.result.desiredTGUpdates[group] = desiredChanges
// Get the task group. The task group may be nil if the job was updates such
// that the task group no longer exists
tg := a.job.LookupTaskGroup(group)
@ -202,9 +208,6 @@ func (a *allocReconciler) computeGroup(group string, as allocSet) {
// Determine what set of alloations are on tainted nodes
untainted, migrate, lost := as.filterByTainted(a.taintedNodes)
a.ctx.Logger().Printf("RECONCILER -- untainted (%d); migrate (%d); lost (%d)", len(untainted), len(migrate), len(lost))
a.ctx.Logger().Printf("RECONCILER -- untainted %#v", untainted)
// If the task group is nil, then the task group has been removed so all we
// need to do is stop everything
if tg == nil {
@ -212,14 +215,15 @@ func (a *allocReconciler) computeGroup(group string, as allocSet) {
a.markStop(untainted, "", allocNotNeeded)
a.markStop(migrate, "", allocNotNeeded)
a.markStop(lost, structs.AllocClientStatusLost, allocLost)
desiredChanges.Stop = uint64(len(untainted) + len(migrate) + len(lost))
return
}
// Get the deployment state for the group
var dstate *structs.DeploymentState
if a.deployment != nil {
dstate = a.deployment.TaskGroups[group]
}
// Track the lost and migrating
desiredChanges.Migrate += uint64(len(migrate) + len(lost))
a.ctx.Logger().Printf("RECONCILER -- untainted (%d); migrate (%d); lost (%d)", len(untainted), len(migrate), len(lost))
a.ctx.Logger().Printf("RECONCILER -- untainted %#v", untainted)
// Mark all lost allocations for stop. Previous allocation doesn't matter
// here since it is on a lost node
@ -239,6 +243,7 @@ func (a *allocReconciler) computeGroup(group string, as allocSet) {
if a.deployment != nil {
current, older := canaries.filterByDeployment(a.deployment.ID)
a.markStop(older, "", allocNotNeeded)
desiredChanges.Stop += uint64(len(older))
a.ctx.Logger().Printf("RECONCILER -- older canaries %#v", older)
a.ctx.Logger().Printf("RECONCILER -- current canaries %#v", current)
@ -250,6 +255,7 @@ func (a *allocReconciler) computeGroup(group string, as allocSet) {
// We don't need any of those canaries since there no longer is a
// deployment
a.markStop(canaries, "", allocNotNeeded)
desiredChanges.Stop += uint64(len(canaries))
untainted = untainted.difference(canaries)
canaries = nil
a.ctx.Logger().Printf("RECONCILER -- untainted - remove canaries %#v", untainted)
@ -260,11 +266,15 @@ func (a *allocReconciler) computeGroup(group string, as allocSet) {
// included stopped allocations
keep, stop := a.computeStop(tg, untainted)
a.markStop(stop, "", allocNotNeeded)
desiredChanges.Stop += uint64(len(stop))
untainted = keep
// Do inplace upgrades where possible and capture the set of upgrades that
// need to be done destructively.
_, inplace, destructive := a.computeUpdates(tg, untainted)
ignore, inplace, destructive := a.computeUpdates(tg, untainted)
desiredChanges.Ignore += uint64(len(ignore))
desiredChanges.InPlaceUpdate += uint64(len(inplace))
desiredChanges.DestructiveUpdate += uint64(len(destructive))
a.ctx.Logger().Printf("RECONCILER -- Stopping (%d); Untainted (%d)", len(stop), len(keep))
a.ctx.Logger().Printf("RECONCILER -- Inplace (%d); Destructive (%d)", len(inplace), len(destructive))
@ -279,8 +289,10 @@ func (a *allocReconciler) computeGroup(group string, as allocSet) {
requireCanary := len(destructive) != 0 && strategy != nil && len(canaries) < strategy.Canary
placeCanaries := requireCanary && !a.deploymentPaused
if placeCanaries {
a.ctx.Logger().Printf("RECONCILER -- Canary (%d)", strategy.Canary-len(canaries))
for i := len(canaries); i < strategy.Canary; i++ {
number := strategy.Canary - len(canaries)
desiredChanges.Canary += uint64(number)
a.ctx.Logger().Printf("RECONCILER -- Canary (%d)", number)
for i := 0; i < number; i++ {
a.result.place = append(a.result.place, allocPlaceResult{
// XXX Pick better name
name: structs.GenerateUUID(),
@ -295,6 +307,12 @@ func (a *allocReconciler) computeGroup(group string, as allocSet) {
limit := a.computeLimit(tg, untainted, haveCanaries)
a.ctx.Logger().Printf("RECONCILER -- LIMIT %v", limit)
// Get the deployment state for the group
var dstate *structs.DeploymentState
if a.deployment != nil {
dstate = a.deployment.TaskGroups[group]
}
// Place if:
// * The deployment is not paused
// * Not placing any canaries
@ -305,6 +323,7 @@ func (a *allocReconciler) computeGroup(group string, as allocSet) {
if canPlace {
// Place all new allocations
place := a.computePlacements(tg, untainted)
desiredChanges.Place += uint64(len(place))
a.ctx.Logger().Printf("RECONCILER -- Placing (%d)", len(place))
for _, p := range place {
a.result.place = append(a.result.place, p)