scheduler/reconcile: set FollowupEvalID on lost stop_after_client_disconnect (#8105) (#8138)

* scheduler/reconcile: set FollowupEvalID on lost stop_after_client_disconnect

* scheduler/reconcile: thread followupEvalIDs through to results.stop

* scheduler/reconcile: comment typo

* nomad/_test: correct arguments for plan.AppendStoppedAlloc

* scheduler/reconcile: avoid nil, clean up handleDelayed(Lost|Reschedules)
Lang Martin 2020-06-09 17:13:53 -04:00 committed by GitHub
parent 1cca7abcab
commit 069840bef8
10 changed files with 65 additions and 43 deletions
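
Before the per-file hunks, a minimal orientation sketch. The types, IDs, and the 5-minute delay below are invented for illustration and are not Nomad's real structs; they only show the relationship this commit records: a lost allocation stopped because its group sets stop_after_client_disconnect should point at the delayed follow-up evaluation that will reschedule it.

```go
package main

import (
	"fmt"
	"time"
)

// evaluation and allocation are simplified stand-ins for Nomad's structs.
type evaluation struct {
	ID        string
	WaitUntil time.Time
}

type allocation struct {
	ID             string
	ClientStatus   string
	FollowupEvalID string
}

func main() {
	// Hypothetical values: stop_after_client_disconnect = "5m" and the
	// client was last seen at lostAt.
	stopAfter := 5 * time.Minute
	lostAt := time.Now()

	// The scheduler batches a follow-up evaluation that fires after the delay.
	followup := evaluation{ID: "eval-9", WaitUntil: lostAt.Add(stopAfter)}

	// The fix: the lost alloc being stopped now records that eval's ID
	// instead of leaving FollowupEvalID empty.
	stopped := allocation{ID: "alloc-1", ClientStatus: "lost", FollowupEvalID: followup.ID}

	fmt.Printf("eval=%+v\nalloc=%+v\n", followup, stopped)
}
```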


@@ -339,6 +339,7 @@ func normalizeStoppedAlloc(stoppedAlloc *structs.Allocation, now int64) *structs
DesiredDescription: stoppedAlloc.DesiredDescription,
ClientStatus: stoppedAlloc.ClientStatus,
ModifyTime: now,
FollowupEvalID: stoppedAlloc.FollowupEvalID,
}
}


@@ -5379,6 +5379,9 @@ func (s *StateSnapshot) DenormalizeAllocationDiffSlice(allocDiffs []*structs.All
if allocDiff.ClientStatus != "" {
allocCopy.ClientStatus = allocDiff.ClientStatus
}
if allocDiff.FollowupEvalID != "" {
allocCopy.FollowupEvalID = allocDiff.FollowupEvalID
}
}
if allocDiff.ModifyTime != 0 {
allocCopy.ModifyTime = allocDiff.ModifyTime
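
The two hunks above are the normalize/denormalize pair for plan application: stopped allocations are shrunk to a small diff in the plan and later expanded against the state store, so a field dropped on either side silently disappears. A rough sketch of that round trip, using hypothetical stand-in types rather than Nomad's:

```go
package main

import "fmt"

// alloc is a simplified stand-in for structs.Allocation.
type alloc struct {
	ID                 string
	DesiredDescription string
	ClientStatus       string
	FollowupEvalID     string
	ModifyTime         int64
}

// normalizeStopped keeps only the fields needed to apply the stop.
func normalizeStopped(a *alloc, now int64) *alloc {
	return &alloc{
		ID:                 a.ID,
		DesiredDescription: a.DesiredDescription,
		ClientStatus:       a.ClientStatus,
		FollowupEvalID:     a.FollowupEvalID, // newly preserved by this commit
		ModifyTime:         now,
	}
}

// denormalize copies the diff back onto a copy of the stored alloc.
func denormalize(stored, diff *alloc) *alloc {
	out := *stored
	if diff.ClientStatus != "" {
		out.ClientStatus = diff.ClientStatus
	}
	if diff.FollowupEvalID != "" {
		out.FollowupEvalID = diff.FollowupEvalID // newly copied back
	}
	if diff.ModifyTime != 0 {
		out.ModifyTime = diff.ModifyTime
	}
	return &out
}

func main() {
	stored := &alloc{ID: "a1", ClientStatus: "running"}
	stopped := &alloc{ID: "a1", DesiredDescription: "lost", ClientStatus: "lost", FollowupEvalID: "eval-9"}
	fmt.Printf("%+v\n", *denormalize(stored, normalizeStopped(stopped, 42)))
}
```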


@@ -9465,7 +9465,7 @@ type Plan struct {
// AppendStoppedAlloc marks an allocation to be stopped. The clientStatus of the
// allocation may be optionally set by passing in a non-empty value.
func (p *Plan) AppendStoppedAlloc(alloc *Allocation, desiredDesc, clientStatus string) {
func (p *Plan) AppendStoppedAlloc(alloc *Allocation, desiredDesc, clientStatus, followupEvalID string) {
newAlloc := new(Allocation)
*newAlloc = *alloc
@@ -9490,6 +9490,10 @@ func (p *Plan) AppendStoppedAlloc(alloc *Allocation, desiredDesc, clientStatus s
newAlloc.AppendState(AllocStateFieldClientStatus, clientStatus)
if followupEvalID != "" {
newAlloc.FollowupEvalID = followupEvalID
}
node := alloc.NodeID
existing := p.NodeUpdate[node]
p.NodeUpdate[node] = append(existing, newAlloc)
@@ -9564,6 +9568,7 @@ func (p *Plan) NormalizeAllocations() {
ID: alloc.ID,
DesiredDescription: alloc.DesiredDescription,
ClientStatus: alloc.ClientStatus,
FollowupEvalID: alloc.FollowupEvalID,
}
}
}
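
A usage sketch of the widened signature, assuming the structs package as of this commit; the plan literal, alloc, and IDs below are made up for the example, and call sites that have no delayed reschedule simply pass an empty string, as the scheduler hunks further down do.

```go
package main

import (
	"fmt"

	"github.com/hashicorp/nomad/nomad/structs"
)

func main() {
	plan := &structs.Plan{
		NodeUpdate:      make(map[string][]*structs.Allocation),
		NodePreemptions: make(map[string][]*structs.Allocation),
	}
	alloc := &structs.Allocation{ID: "alloc-1", NodeID: "node-1"}

	// New fourth argument: the follow-up evaluation that will reschedule the
	// alloc after its stop_after_client_disconnect delay.
	plan.AppendStoppedAlloc(alloc, "stopped because client is lost",
		structs.AllocClientStatusLost, "eval-9")

	stopped := plan.NodeUpdate["node-1"][0]
	fmt.Println(stopped.ClientStatus, stopped.FollowupEvalID)
}
```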


@@ -3563,7 +3563,7 @@ func TestPlan_NormalizeAllocations(t *testing.T) {
}
stoppedAlloc := MockAlloc()
desiredDesc := "Desired desc"
plan.AppendStoppedAlloc(stoppedAlloc, desiredDesc, AllocClientStatusLost)
plan.AppendStoppedAlloc(stoppedAlloc, desiredDesc, AllocClientStatusLost, "followup-eval-id")
preemptedAlloc := MockAlloc()
preemptingAllocID := uuid.Generate()
plan.AppendPreemptedAlloc(preemptedAlloc, preemptingAllocID)
@@ -3575,6 +3575,7 @@ func TestPlan_NormalizeAllocations(t *testing.T) {
ID: stoppedAlloc.ID,
DesiredDescription: desiredDesc,
ClientStatus: AllocClientStatusLost,
FollowupEvalID: "followup-eval-id",
}
assert.Equal(t, expectedStoppedAlloc, actualStoppedAlloc)
actualPreemptedAlloc := plan.NodePreemptions[preemptedAlloc.NodeID][0]
@@ -3593,7 +3594,7 @@ func TestPlan_AppendStoppedAllocAppendsAllocWithUpdatedAttrs(t *testing.T) {
alloc := MockAlloc()
desiredDesc := "Desired desc"
plan.AppendStoppedAlloc(alloc, desiredDesc, AllocClientStatusLost)
plan.AppendStoppedAlloc(alloc, desiredDesc, AllocClientStatusLost, "")
expectedAlloc := new(Allocation)
*expectedAlloc = *alloc


@@ -437,7 +437,7 @@ func TestWorker_SubmitPlanNormalizedAllocations(t *testing.T) {
NodePreemptions: make(map[string][]*structs.Allocation),
}
desiredDescription := "desired desc"
plan.AppendStoppedAlloc(stoppedAlloc, desiredDescription, structs.AllocClientStatusLost)
plan.AppendStoppedAlloc(stoppedAlloc, desiredDescription, structs.AllocClientStatusLost, "")
preemptingAllocID := uuid.Generate()
plan.AppendPreemptedAlloc(preemptedAlloc, preemptingAllocID)


@@ -378,7 +378,7 @@ func (s *GenericScheduler) computeJobAllocs() error {
// Handle the stop
for _, stop := range results.stop {
s.plan.AppendStoppedAlloc(stop.alloc, stop.statusDescription, stop.clientStatus)
s.plan.AppendStoppedAlloc(stop.alloc, stop.statusDescription, stop.clientStatus, stop.followupEvalID)
}
// Handle the in-place updates
@@ -476,7 +476,7 @@ func (s *GenericScheduler) computePlacements(destructive, place []placementResul
stopPrevAlloc, stopPrevAllocDesc := missing.StopPreviousAlloc()
prevAllocation := missing.PreviousAllocation()
if stopPrevAlloc {
s.plan.AppendStoppedAlloc(prevAllocation, stopPrevAllocDesc, "")
s.plan.AppendStoppedAlloc(prevAllocation, stopPrevAllocDesc, "", "")
}
// Compute penalty nodes for rescheduled allocs


@@ -301,6 +301,19 @@ func (a *allocReconciler) markStop(allocs allocSet, clientStatus, statusDescript
}
}
// markDelayed does markStop, but optionally includes a FollowupEvalID so that we can update
// the stopped alloc with its delayed rescheduling evalID
func (a *allocReconciler) markDelayed(allocs allocSet, clientStatus, statusDescription string, followupEvals map[string]string) {
for _, alloc := range allocs {
a.result.stop = append(a.result.stop, allocStopResult{
alloc: alloc,
clientStatus: clientStatus,
statusDescription: statusDescription,
followupEvalID: followupEvals[alloc.ID],
})
}
}
// computeGroup reconciles state for a particular task group. It returns whether
// the deployment it is for is complete with regards to the task group.
func (a *allocReconciler) computeGroup(group string, all allocSet) bool {
@@ -355,7 +368,7 @@ func (a *allocReconciler) computeGroup(group string, all allocSet) bool {
// Find delays for any lost allocs that have stop_after_client_disconnect
lostLater := lost.delayByStopAfterClientDisconnect()
a.handleDelayedLost(lostLater, all, tg.Name)
lostLaterEvals := a.handleDelayedLost(lostLater, all, tg.Name)
// Create batched follow up evaluations for allocations that are
// reschedulable later and mark the allocations for in place updating
@@ -368,7 +381,7 @@ func (a *allocReconciler) computeGroup(group string, all allocSet) bool {
// Stop any unneeded allocations and update the untainted set to not
// included stopped allocations.
canaryState := dstate != nil && dstate.DesiredCanaries != 0 && !dstate.Promoted
stop := a.computeStop(tg, nameIndex, untainted, migrate, lost, canaries, canaryState)
stop := a.computeStop(tg, nameIndex, untainted, migrate, lost, canaries, canaryState, lostLaterEvals)
desiredChanges.Stop += uint64(len(stop))
untainted = untainted.difference(stop)
@@ -705,13 +718,13 @@ func (a *allocReconciler) computePlacements(group *structs.TaskGroup,
// the group definition, the set of allocations in various states and whether we
// are canarying.
func (a *allocReconciler) computeStop(group *structs.TaskGroup, nameIndex *allocNameIndex,
untainted, migrate, lost, canaries allocSet, canaryState bool) allocSet {
untainted, migrate, lost, canaries allocSet, canaryState bool, followupEvals map[string]string) allocSet {
// Mark all lost allocations for stop. Previous allocation doesn't matter
// here since it is on a lost node
var stop allocSet
stop = stop.union(lost)
a.markStop(lost, structs.AllocClientStatusLost, allocLost)
a.markDelayed(lost, structs.AllocClientStatusLost, allocLost, followupEvals)
// If we are still deploying or creating canaries, don't stop them
if canaryState {
@@ -836,22 +849,33 @@ func (a *allocReconciler) computeUpdates(group *structs.TaskGroup, untainted all
return
}
// handleDelayedReschedules creates batched followup evaluations with the WaitUntil field set
// for allocations that are eligible to be rescheduled later
// handleDelayedReschedules creates batched followup evaluations with the WaitUntil field
// set for allocations that are eligible to be rescheduled later, and marks the alloc with
// the followupEvalID
func (a *allocReconciler) handleDelayedReschedules(rescheduleLater []*delayedRescheduleInfo, all allocSet, tgName string) {
a.handleDelayedReschedulesImpl(rescheduleLater, all, tgName, true)
// followupEvals are created in the same way as for delayed lost allocs
allocIDToFollowupEvalID := a.handleDelayedLost(rescheduleLater, all, tgName)
// Initialize the annotations
if len(allocIDToFollowupEvalID) != 0 && a.result.attributeUpdates == nil {
a.result.attributeUpdates = make(map[string]*structs.Allocation)
}
// Create updates that will be applied to the allocs to mark the FollowupEvalID
for allocID, evalID := range allocIDToFollowupEvalID {
existingAlloc := all[allocID]
updatedAlloc := existingAlloc.Copy()
updatedAlloc.FollowupEvalID = evalID
a.result.attributeUpdates[updatedAlloc.ID] = updatedAlloc
}
}
// handleDelayedLost creates batched followup evaluations with the WaitUntil field set for lost allocations
func (a *allocReconciler) handleDelayedLost(rescheduleLater []*delayedRescheduleInfo, all allocSet, tgName string) {
a.handleDelayedReschedulesImpl(rescheduleLater, all, tgName, false)
}
// handleDelayedReschedulesImpl creates batched followup evaluations with the WaitUntil field set
func (a *allocReconciler) handleDelayedReschedulesImpl(rescheduleLater []*delayedRescheduleInfo, all allocSet, tgName string,
createUpdates bool) {
// handleDelayedLost creates batched followup evaluations with the WaitUntil field set for
// lost allocations. followupEvals are appended to a.result as a side effect, we return a
// map of alloc IDs to their followupEval IDs
func (a *allocReconciler) handleDelayedLost(rescheduleLater []*delayedRescheduleInfo, all allocSet, tgName string) map[string]string {
if len(rescheduleLater) == 0 {
return
return map[string]string{}
}
// Sort by time
@@ -904,18 +928,5 @@ func (a *allocReconciler) handleDelayedReschedulesImpl(rescheduleLater []*delaye
a.result.desiredFollowupEvals[tgName] = evals
// Initialize the annotations
if len(allocIDToFollowupEvalID) != 0 && a.result.attributeUpdates == nil {
a.result.attributeUpdates = make(map[string]*structs.Allocation)
}
// Create in-place updates for every alloc ID that needs to be updated with its follow up eval ID
if createUpdates {
for allocID, evalID := range allocIDToFollowupEvalID {
existingAlloc := all[allocID]
updatedAlloc := existingAlloc.Copy()
updatedAlloc.FollowupEvalID = evalID
a.result.attributeUpdates[updatedAlloc.ID] = updatedAlloc
}
}
return allocIDToFollowupEvalID
}
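
Pulling the reconciler hunks together, a simplified sketch of the new flow; the types and helpers below are hypothetical stand-ins for the unexported allocReconciler, not the real implementation. handleDelayedLost now returns the alloc-to-eval mapping it previously kept internal, and markDelayed stamps each stop result with the matching follow-up eval ID so the ID reaches the plan via AppendStoppedAlloc.

```go
package main

import "fmt"

// stopResult mirrors the idea of allocStopResult with its new followupEvalID field.
type stopResult struct {
	allocID           string
	clientStatus      string
	statusDescription string
	followupEvalID    string
}

type reconcileResult struct {
	stop []stopResult
}

// handleDelayedLost stands in for batching follow-up evals; it returns
// alloc ID -> eval ID so callers can annotate the stops they emit.
func handleDelayedLost(lost []string) map[string]string {
	evals := map[string]string{}
	for i, id := range lost {
		evals[id] = fmt.Sprintf("followup-eval-%d", i)
	}
	return evals
}

// markDelayed is the analogue of the new helper: like markStop, but it
// records each alloc's follow-up eval ID on the stop result.
func markDelayed(r *reconcileResult, lost []string, followupEvals map[string]string) {
	for _, id := range lost {
		r.stop = append(r.stop, stopResult{
			allocID:           id,
			clientStatus:      "lost",
			statusDescription: "alloc was lost",
			followupEvalID:    followupEvals[id], // empty if no delayed reschedule
		})
	}
}

func main() {
	lost := []string{"alloc-1", "alloc-2"}
	evals := handleDelayedLost(lost)

	var result reconcileResult
	markDelayed(&result, lost, evals)
	fmt.Printf("%+v\n", result.stop)
}
```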


@@ -41,6 +41,7 @@ type allocStopResult struct {
alloc *structs.Allocation
clientStatus string
statusDescription string
followupEvalID string
}
// allocPlaceResult contains the information required to place a single


@@ -212,18 +212,18 @@ func (s *SystemScheduler) computeJobAllocs() error {
// Add all the allocs to stop
for _, e := range diff.stop {
s.plan.AppendStoppedAlloc(e.Alloc, allocNotNeeded, "")
s.plan.AppendStoppedAlloc(e.Alloc, allocNotNeeded, "", "")
}
// Add all the allocs to migrate
for _, e := range diff.migrate {
s.plan.AppendStoppedAlloc(e.Alloc, allocNodeTainted, "")
s.plan.AppendStoppedAlloc(e.Alloc, allocNodeTainted, "", "")
}
// Lost allocations should be transitioned to desired status stop and client
// status lost.
for _, e := range diff.lost {
s.plan.AppendStoppedAlloc(e.Alloc, allocLost, structs.AllocClientStatusLost)
s.plan.AppendStoppedAlloc(e.Alloc, allocLost, structs.AllocClientStatusLost, "")
}
// Attempt to do the upgrades in place


@@ -601,7 +601,7 @@ func inplaceUpdate(ctx Context, eval *structs.Evaluation, job *structs.Job,
// the current allocation is discounted when checking for feasibility.
// Otherwise we would be trying to fit the tasks current resources and
// updated resources. After select is called we can remove the evict.
ctx.Plan().AppendStoppedAlloc(update.Alloc, allocInPlace, "")
ctx.Plan().AppendStoppedAlloc(update.Alloc, allocInPlace, "", "")
// Attempt to match the task group
option := stack.Select(update.TaskGroup, nil) // This select only looks at one node so we don't pass selectOptions
@@ -670,7 +670,7 @@ func evictAndPlace(ctx Context, diff *diffResult, allocs []allocTuple, desc stri
n := len(allocs)
for i := 0; i < n && i < *limit; i++ {
a := allocs[i]
ctx.Plan().AppendStoppedAlloc(a.Alloc, desc, "")
ctx.Plan().AppendStoppedAlloc(a.Alloc, desc, "", "")
diff.place = append(diff.place, a)
}
if n <= *limit {
@@ -831,7 +831,7 @@ func updateNonTerminalAllocsToLost(plan *structs.Plan, tainted map[string]*struc
alloc.DesiredStatus == structs.AllocDesiredStatusEvict) &&
(alloc.ClientStatus == structs.AllocClientStatusRunning ||
alloc.ClientStatus == structs.AllocClientStatusPending) {
plan.AppendStoppedAlloc(alloc, allocLost, structs.AllocClientStatusLost)
plan.AppendStoppedAlloc(alloc, allocLost, structs.AllocClientStatusLost, "")
}
}
}
@@ -881,7 +881,7 @@ func genericAllocUpdateFn(ctx Context, stack Stack, evalID string) allocUpdateTy
// the current allocation is discounted when checking for feasibility.
// Otherwise we would be trying to fit the tasks current resources and
// updated resources. After select is called we can remove the evict.
ctx.Plan().AppendStoppedAlloc(existing, allocInPlace, "")
ctx.Plan().AppendStoppedAlloc(existing, allocInPlace, "", "")
// Attempt to match the task group
option := stack.Select(newTG, nil) // This select only looks at one node so we don't pass selectOptions