From 00537c739bb5c1f3d5a8ec08d6345399aa6e617a Mon Sep 17 00:00:00 2001 From: Preetha Appan Date: Tue, 3 Apr 2018 15:49:18 -0500 Subject: [PATCH] Fixes edge cases around timing and task finish time being set more than once --- client/alloc_runner.go | 23 ++- nomad/structs/structs.go | 11 +- nomad/structs/structs_test.go | 16 +- scheduler/generic_sched.go | 2 +- scheduler/reconcile.go | 21 ++- scheduler/reconcile_test.go | 282 ++++++++++++++++++++++++++-------- scheduler/reconcile_util.go | 10 +- 7 files changed, 270 insertions(+), 95 deletions(-) diff --git a/client/alloc_runner.go b/client/alloc_runner.go index 5e42f7329..03401990f 100644 --- a/client/alloc_runner.go +++ b/client/alloc_runner.go @@ -526,6 +526,23 @@ func copyTaskStates(states map[string]*structs.TaskState) map[string]*structs.Ta // Alloc returns the associated allocation func (r *AllocRunner) Alloc() *structs.Allocation { r.allocLock.Lock() + if r.alloc.TerminalStatus() { + group := r.alloc.Job.LookupTaskGroup(r.alloc.TaskGroup) + if r.alloc.TaskStates == nil { + r.alloc.TaskStates = make(map[string]*structs.TaskState) + } + now := time.Now() + for _, task := range group.Tasks { + ts, ok := r.alloc.TaskStates[task.Name] + if !ok { + ts = &structs.TaskState{} + r.alloc.TaskStates[task.Name] = ts + } + if ts.FinishedAt.IsZero() { + ts.FinishedAt = now + } + } + } // Don't do a deep copy of the job alloc := r.alloc.CopySkipJob() @@ -715,8 +732,10 @@ func (r *AllocRunner) setTaskState(taskName, state string, event *structs.TaskEv } } case structs.TaskStateDead: - // Capture the finished time. 
- taskState.FinishedAt = time.Now().UTC() + // Capture the finished time if not already set + if taskState.FinishedAt.IsZero() { + taskState.FinishedAt = time.Now().UTC() + } // Find all tasks that are not the one that is dead and check if the one // that is dead is a leader diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 328ff86a1..b73869fbf 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -5763,16 +5763,7 @@ func (a *Allocation) LastEventTime() time.Time { } } } - // If no tasks have FinsihedAt set, examine task events - if lastEventTime.IsZero() { - for _, s := range a.TaskStates { - for _, e := range s.Events { - if lastEventTime.IsZero() || e.Time > lastEventTime.UnixNano() { - lastEventTime = time.Unix(0, e.Time).UTC() - } - } - } - } + if lastEventTime.IsZero() { return time.Unix(0, a.ModifyTime).UTC() } diff --git a/nomad/structs/structs_test.go b/nomad/structs/structs_test.go index 1f2193b29..b3b37a202 100644 --- a/nomad/structs/structs_test.go +++ b/nomad/structs/structs_test.go @@ -2692,7 +2692,7 @@ func TestAllocation_LastEventTime(t *testing.T) { expectedLastEventTime: t1.Add(-40 * time.Minute), }, { - desc: "No finishedAt set, one task event", + desc: "No finishedAt set, one task event, should use modify time", taskState: map[string]*TaskState{"foo": { State: "run", StartedAt: t1.Add(-2 * time.Hour), @@ -2700,19 +2700,7 @@ func TestAllocation_LastEventTime(t *testing.T) { {Type: "start", Time: t1.Add(-20 * time.Minute).UnixNano()}, }}, }, - expectedLastEventTime: t1.Add(-20 * time.Minute), - }, - { - desc: "No finishedAt set, many task events", - taskState: map[string]*TaskState{"foo": { - State: "run", - StartedAt: t1.Add(-2 * time.Hour), - Events: []*TaskEvent{ - {Type: "start", Time: t1.Add(-20 * time.Minute).UnixNano()}, - {Type: "status change", Time: t1.Add(-10 * time.Minute).UnixNano()}, - }}, - }, - expectedLastEventTime: t1.Add(-10 * time.Minute), + expectedLastEventTime: t1, }, } for _, tc := 
range testCases { diff --git a/scheduler/generic_sched.go b/scheduler/generic_sched.go index a81fbb62b..af6de95cc 100644 --- a/scheduler/generic_sched.go +++ b/scheduler/generic_sched.go @@ -330,7 +330,7 @@ func (s *GenericScheduler) computeJobAllocs() error { reconciler := NewAllocReconciler(s.ctx.Logger(), genericAllocUpdateFn(s.ctx, s.stack, s.eval.ID), - s.batch, s.eval.JobID, s.job, s.deployment, allocs, tainted) + s.batch, s.eval.JobID, s.job, s.deployment, allocs, tainted, s.eval.ID) results := reconciler.Compute() s.logger.Printf("[DEBUG] sched: %#v: %#v", s.eval, results) diff --git a/scheduler/reconcile.go b/scheduler/reconcile.go index 416cee45d..c851f6f8f 100644 --- a/scheduler/reconcile.go +++ b/scheduler/reconcile.go @@ -16,6 +16,11 @@ const ( // batchedFailedAllocWindowSize is the window size used // to batch up failed allocations before creating an eval batchedFailedAllocWindowSize = 5 * time.Second + + // rescheduleTimeLapseWindowSize is the window size relative to + // current time within which reschedulable allocations are placed. + // This helps protect against small clock drifts between servers + rescheduleTimeLapseWindowSize = 1 * time.Second ) // allocUpdateType takes an existing allocation and a new job definition and @@ -67,6 +72,13 @@ type allocReconciler struct { // existingAllocs is non-terminal existing allocations existingAllocs []*structs.Allocation + // evalID is the ID of the evaluation that triggered the reconciler + evalID string + + // now is used to override current time used when determining rescheduling eligibility + // used in unit tests + now time.Time + // result is the results of the reconcile. 
During computation it can be // used to store intermediate state result *reconcileResults @@ -145,7 +157,7 @@ func (r *reconcileResults) Changes() int { // the changes required to bring the cluster state inline with the declared jobspec func NewAllocReconciler(logger *log.Logger, allocUpdateFn allocUpdateType, batch bool, jobID string, job *structs.Job, deployment *structs.Deployment, - existingAllocs []*structs.Allocation, taintedNodes map[string]*structs.Node) *allocReconciler { + existingAllocs []*structs.Allocation, taintedNodes map[string]*structs.Node, evalID string) *allocReconciler { return &allocReconciler{ logger: logger, @@ -156,6 +168,7 @@ func NewAllocReconciler(logger *log.Logger, allocUpdateFn allocUpdateType, batch deployment: deployment.Copy(), existingAllocs: existingAllocs, taintedNodes: taintedNodes, + evalID: evalID, result: &reconcileResults{ desiredTGUpdates: make(map[string]*structs.DesiredUpdates), desiredFollowupEvals: make(map[string][]*structs.Evaluation), @@ -340,8 +353,12 @@ func (a *allocReconciler) computeGroup(group string, all allocSet) bool { // Determine what set of allocations are on tainted nodes untainted, migrate, lost := all.filterByTainted(a.taintedNodes) + if a.now.IsZero() { + a.now = time.Now() + } + // Determine what set of terminal allocations need to be rescheduled - untainted, rescheduleNow, rescheduleLater := untainted.filterByRescheduleable(a.batch) + untainted, rescheduleNow, rescheduleLater := untainted.filterByRescheduleable(a.batch, a.now, a.evalID) // Create batched follow up evaluations for allocations that are // reschedulable later and mark the allocations for in place updating diff --git a/scheduler/reconcile_test.go b/scheduler/reconcile_test.go index 8a24646af..54411c912 100644 --- a/scheduler/reconcile_test.go +++ b/scheduler/reconcile_test.go @@ -346,7 +346,7 @@ func assertResults(t *testing.T, r *reconcileResults, exp *resultExpectation) { // existing allocations func TestReconciler_Place_NoExisting(t 
*testing.T) { job := mock.Job() - reconciler := NewAllocReconciler(testLogger(), allocUpdateFnIgnore, false, job.ID, job, nil, nil, nil) + reconciler := NewAllocReconciler(testLogger(), allocUpdateFnIgnore, false, job.ID, job, nil, nil, nil, "") r := reconciler.Compute() // Assert the correct results @@ -382,7 +382,7 @@ func TestReconciler_Place_Existing(t *testing.T) { allocs = append(allocs, alloc) } - reconciler := NewAllocReconciler(testLogger(), allocUpdateFnIgnore, false, job.ID, job, nil, allocs, nil) + reconciler := NewAllocReconciler(testLogger(), allocUpdateFnIgnore, false, job.ID, job, nil, allocs, nil, "") r := reconciler.Compute() // Assert the correct results @@ -420,7 +420,7 @@ func TestReconciler_ScaleDown_Partial(t *testing.T) { allocs = append(allocs, alloc) } - reconciler := NewAllocReconciler(testLogger(), allocUpdateFnIgnore, false, job.ID, job, nil, allocs, nil) + reconciler := NewAllocReconciler(testLogger(), allocUpdateFnIgnore, false, job.ID, job, nil, allocs, nil, "") r := reconciler.Compute() // Assert the correct results @@ -459,7 +459,7 @@ func TestReconciler_ScaleDown_Zero(t *testing.T) { allocs = append(allocs, alloc) } - reconciler := NewAllocReconciler(testLogger(), allocUpdateFnIgnore, false, job.ID, job, nil, allocs, nil) + reconciler := NewAllocReconciler(testLogger(), allocUpdateFnIgnore, false, job.ID, job, nil, allocs, nil, "") r := reconciler.Compute() // Assert the correct results @@ -499,7 +499,7 @@ func TestReconciler_ScaleDown_Zero_DuplicateNames(t *testing.T) { expectedStopped = append(expectedStopped, i%2) } - reconciler := NewAllocReconciler(testLogger(), allocUpdateFnIgnore, false, job.ID, job, nil, allocs, nil) + reconciler := NewAllocReconciler(testLogger(), allocUpdateFnIgnore, false, job.ID, job, nil, allocs, nil, "") r := reconciler.Compute() // Assert the correct results @@ -534,7 +534,7 @@ func TestReconciler_Inplace(t *testing.T) { allocs = append(allocs, alloc) } - reconciler := 
NewAllocReconciler(testLogger(), allocUpdateFnInplace, false, job.ID, job, nil, allocs, nil) + reconciler := NewAllocReconciler(testLogger(), allocUpdateFnInplace, false, job.ID, job, nil, allocs, nil, "") r := reconciler.Compute() // Assert the correct results @@ -572,7 +572,7 @@ func TestReconciler_Inplace_ScaleUp(t *testing.T) { allocs = append(allocs, alloc) } - reconciler := NewAllocReconciler(testLogger(), allocUpdateFnInplace, false, job.ID, job, nil, allocs, nil) + reconciler := NewAllocReconciler(testLogger(), allocUpdateFnInplace, false, job.ID, job, nil, allocs, nil, "") r := reconciler.Compute() // Assert the correct results @@ -612,7 +612,7 @@ func TestReconciler_Inplace_ScaleDown(t *testing.T) { allocs = append(allocs, alloc) } - reconciler := NewAllocReconciler(testLogger(), allocUpdateFnInplace, false, job.ID, job, nil, allocs, nil) + reconciler := NewAllocReconciler(testLogger(), allocUpdateFnInplace, false, job.ID, job, nil, allocs, nil, "") r := reconciler.Compute() // Assert the correct results @@ -649,7 +649,7 @@ func TestReconciler_Destructive(t *testing.T) { allocs = append(allocs, alloc) } - reconciler := NewAllocReconciler(testLogger(), allocUpdateFnDestructive, false, job.ID, job, nil, allocs, nil) + reconciler := NewAllocReconciler(testLogger(), allocUpdateFnDestructive, false, job.ID, job, nil, allocs, nil, "") r := reconciler.Compute() // Assert the correct results @@ -685,7 +685,7 @@ func TestReconciler_Destructive_ScaleUp(t *testing.T) { allocs = append(allocs, alloc) } - reconciler := NewAllocReconciler(testLogger(), allocUpdateFnDestructive, false, job.ID, job, nil, allocs, nil) + reconciler := NewAllocReconciler(testLogger(), allocUpdateFnDestructive, false, job.ID, job, nil, allocs, nil, "") r := reconciler.Compute() // Assert the correct results @@ -724,7 +724,7 @@ func TestReconciler_Destructive_ScaleDown(t *testing.T) { allocs = append(allocs, alloc) } - reconciler := NewAllocReconciler(testLogger(), allocUpdateFnDestructive, 
false, job.ID, job, nil, allocs, nil) + reconciler := NewAllocReconciler(testLogger(), allocUpdateFnDestructive, false, job.ID, job, nil, allocs, nil, "") r := reconciler.Compute() // Assert the correct results @@ -769,7 +769,7 @@ func TestReconciler_LostNode(t *testing.T) { tainted[n.ID] = n } - reconciler := NewAllocReconciler(testLogger(), allocUpdateFnIgnore, false, job.ID, job, nil, allocs, tainted) + reconciler := NewAllocReconciler(testLogger(), allocUpdateFnIgnore, false, job.ID, job, nil, allocs, tainted, "") r := reconciler.Compute() // Assert the correct results @@ -819,7 +819,7 @@ func TestReconciler_LostNode_ScaleUp(t *testing.T) { tainted[n.ID] = n } - reconciler := NewAllocReconciler(testLogger(), allocUpdateFnIgnore, false, job.ID, job, nil, allocs, tainted) + reconciler := NewAllocReconciler(testLogger(), allocUpdateFnIgnore, false, job.ID, job, nil, allocs, tainted, "") r := reconciler.Compute() // Assert the correct results @@ -869,7 +869,7 @@ func TestReconciler_LostNode_ScaleDown(t *testing.T) { tainted[n.ID] = n } - reconciler := NewAllocReconciler(testLogger(), allocUpdateFnIgnore, false, job.ID, job, nil, allocs, tainted) + reconciler := NewAllocReconciler(testLogger(), allocUpdateFnIgnore, false, job.ID, job, nil, allocs, tainted, "") r := reconciler.Compute() // Assert the correct results @@ -915,7 +915,7 @@ func TestReconciler_DrainNode(t *testing.T) { tainted[n.ID] = n } - reconciler := NewAllocReconciler(testLogger(), allocUpdateFnIgnore, false, job.ID, job, nil, allocs, tainted) + reconciler := NewAllocReconciler(testLogger(), allocUpdateFnIgnore, false, job.ID, job, nil, allocs, tainted, "") r := reconciler.Compute() // Assert the correct results @@ -968,7 +968,7 @@ func TestReconciler_DrainNode_ScaleUp(t *testing.T) { tainted[n.ID] = n } - reconciler := NewAllocReconciler(testLogger(), allocUpdateFnIgnore, false, job.ID, job, nil, allocs, tainted) + reconciler := NewAllocReconciler(testLogger(), allocUpdateFnIgnore, false, job.ID, 
job, nil, allocs, tainted, "") r := reconciler.Compute() // Assert the correct results @@ -1022,7 +1022,7 @@ func TestReconciler_DrainNode_ScaleDown(t *testing.T) { tainted[n.ID] = n } - reconciler := NewAllocReconciler(testLogger(), allocUpdateFnIgnore, false, job.ID, job, nil, allocs, tainted) + reconciler := NewAllocReconciler(testLogger(), allocUpdateFnIgnore, false, job.ID, job, nil, allocs, tainted, "") r := reconciler.Compute() // Assert the correct results @@ -1067,7 +1067,7 @@ func TestReconciler_RemovedTG(t *testing.T) { newName := "different" job.TaskGroups[0].Name = newName - reconciler := NewAllocReconciler(testLogger(), allocUpdateFnIgnore, false, job.ID, job, nil, allocs, nil) + reconciler := NewAllocReconciler(testLogger(), allocUpdateFnIgnore, false, job.ID, job, nil, allocs, nil, "") r := reconciler.Compute() // Assert the correct results @@ -1129,7 +1129,7 @@ func TestReconciler_JobStopped(t *testing.T) { allocs = append(allocs, alloc) } - reconciler := NewAllocReconciler(testLogger(), allocUpdateFnIgnore, false, c.jobID, c.job, nil, allocs, nil) + reconciler := NewAllocReconciler(testLogger(), allocUpdateFnIgnore, false, c.jobID, c.job, nil, allocs, nil, "") r := reconciler.Compute() // Assert the correct results @@ -1169,7 +1169,7 @@ func TestReconciler_MultiTG(t *testing.T) { allocs = append(allocs, alloc) } - reconciler := NewAllocReconciler(testLogger(), allocUpdateFnIgnore, false, job.ID, job, nil, allocs, nil) + reconciler := NewAllocReconciler(testLogger(), allocUpdateFnIgnore, false, job.ID, job, nil, allocs, nil, "") r := reconciler.Compute() // Assert the correct results @@ -1248,7 +1248,7 @@ func TestReconciler_RescheduleLater_Batch(t *testing.T) { // Mark one as complete allocs[5].ClientStatus = structs.AllocClientStatusComplete - reconciler := NewAllocReconciler(testLogger(), allocUpdateFnIgnore, true, job.ID, job, nil, allocs, nil) + reconciler := NewAllocReconciler(testLogger(), allocUpdateFnIgnore, true, job.ID, job, nil, allocs, 
nil, "") r := reconciler.Compute() // Two reschedule attempts were already made, one more can be made at a future time @@ -1327,7 +1327,7 @@ func TestReconciler_RescheduleLaterWithBatchedEvals_Batch(t *testing.T) { FinishedAt: now.Add(10 * time.Second)}} } - reconciler := NewAllocReconciler(testLogger(), allocUpdateFnIgnore, true, job.ID, job, nil, allocs, nil) + reconciler := NewAllocReconciler(testLogger(), allocUpdateFnIgnore, true, job.ID, job, nil, allocs, nil, "") r := reconciler.Compute() // Verify that two follow up evals were created @@ -1405,7 +1405,8 @@ func TestReconciler_RescheduleNow_Batch(t *testing.T) { allocs[2].ClientStatus = structs.AllocClientStatusFailed allocs[2].TaskStates = map[string]*structs.TaskState{tgName: {State: "start", StartedAt: now.Add(-1 * time.Hour), - FinishedAt: now.Add(-10 * time.Second)}} + FinishedAt: now.Add(-5 * time.Second)}} + allocs[2].FollowupEvalID = uuid.Generate() allocs[2].RescheduleTracker = &structs.RescheduleTracker{Events: []*structs.RescheduleEvent{ {RescheduleTime: time.Now().Add(-2 * time.Hour).UTC().UnixNano(), PrevAllocID: allocs[0].ID, @@ -1419,7 +1420,8 @@ func TestReconciler_RescheduleNow_Batch(t *testing.T) { // Mark one as complete allocs[5].ClientStatus = structs.AllocClientStatusComplete - reconciler := NewAllocReconciler(testLogger(), allocUpdateFnIgnore, true, job.ID, job, nil, allocs, nil) + reconciler := NewAllocReconciler(testLogger(), allocUpdateFnIgnore, true, job.ID, job, nil, allocs, nil, "") + reconciler.now = now r := reconciler.Compute() // Verify that no follow up evals were created @@ -1492,7 +1494,7 @@ func TestReconciler_RescheduleLater_Service(t *testing.T) { // Mark one as desired state stop allocs[4].DesiredStatus = structs.AllocDesiredStatusStop - reconciler := NewAllocReconciler(testLogger(), allocUpdateFnIgnore, false, job.ID, job, nil, allocs, nil) + reconciler := NewAllocReconciler(testLogger(), allocUpdateFnIgnore, false, job.ID, job, nil, allocs, nil, "") r := 
reconciler.Compute() // Should place a new placement and create a follow up eval for the delayed reschedule @@ -1560,7 +1562,7 @@ func TestReconciler_Service_ClientStatusComplete(t *testing.T) { // Mark one as client status complete allocs[4].ClientStatus = structs.AllocClientStatusComplete - reconciler := NewAllocReconciler(testLogger(), allocUpdateFnIgnore, false, job.ID, job, nil, allocs, nil) + reconciler := NewAllocReconciler(testLogger(), allocUpdateFnIgnore, false, job.ID, job, nil, allocs, nil, "") r := reconciler.Compute() // Should place a new placement for the alloc that was marked complete @@ -1616,7 +1618,7 @@ func TestReconciler_Service_DesiredStop_ClientStatusComplete(t *testing.T) { allocs[4].ClientStatus = structs.AllocClientStatusFailed allocs[4].DesiredStatus = structs.AllocDesiredStatusStop - reconciler := NewAllocReconciler(testLogger(), allocUpdateFnIgnore, false, job.ID, job, nil, allocs, nil) + reconciler := NewAllocReconciler(testLogger(), allocUpdateFnIgnore, false, job.ID, job, nil, allocs, nil, "") r := reconciler.Compute() // Should place a new placement for the alloc that was marked stopped @@ -1693,7 +1695,7 @@ func TestReconciler_RescheduleNow_Service(t *testing.T) { // Mark one as desired state stop allocs[4].DesiredStatus = structs.AllocDesiredStatusStop - reconciler := NewAllocReconciler(testLogger(), allocUpdateFnIgnore, false, job.ID, job, nil, allocs, nil) + reconciler := NewAllocReconciler(testLogger(), allocUpdateFnIgnore, false, job.ID, job, nil, allocs, nil, "") r := reconciler.Compute() // Verify that no follow up evals were created @@ -1721,6 +1723,164 @@ func TestReconciler_RescheduleNow_Service(t *testing.T) { assertPlacementsAreRescheduled(t, 1, r.place) } +// Tests rescheduling failed service allocations when there's clock drift (up to a second) +func TestReconciler_RescheduleNow_WithinAllowedTimeWindow(t *testing.T) { + require := require.New(t) + + // Set desired 5 + job := mock.Job() + job.TaskGroups[0].Count = 5 + 
tgName := job.TaskGroups[0].Name + now := time.Now() + + // Set up reschedule policy and update stanza + job.TaskGroups[0].ReschedulePolicy = &structs.ReschedulePolicy{ + Attempts: 1, + Interval: 24 * time.Hour, + Delay: 5 * time.Second, + DelayFunction: "", + MaxDelay: 1 * time.Hour, + Unlimited: false, + } + job.TaskGroups[0].Update = noCanaryUpdate + + // Create 5 existing allocations + var allocs []*structs.Allocation + for i := 0; i < 5; i++ { + alloc := mock.Alloc() + alloc.Job = job + alloc.JobID = job.ID + alloc.NodeID = uuid.Generate() + alloc.Name = structs.AllocName(job.ID, job.TaskGroups[0].Name, uint(i)) + allocs = append(allocs, alloc) + alloc.ClientStatus = structs.AllocClientStatusRunning + } + + // Mark one as failed + allocs[0].ClientStatus = structs.AllocClientStatusFailed + + // Mark one of them as already rescheduled once + allocs[0].RescheduleTracker = &structs.RescheduleTracker{Events: []*structs.RescheduleEvent{ + {RescheduleTime: time.Now().Add(-1 * time.Hour).UTC().UnixNano(), + PrevAllocID: uuid.Generate(), + PrevNodeID: uuid.Generate(), + }, + }} + // Set fail time to 4 seconds ago which falls within the reschedule window + allocs[1].TaskStates = map[string]*structs.TaskState{tgName: {State: "start", + StartedAt: now.Add(-1 * time.Hour), + FinishedAt: now.Add(-4 * time.Second)}} + allocs[1].ClientStatus = structs.AllocClientStatusFailed + + reconciler := NewAllocReconciler(testLogger(), allocUpdateFnIgnore, false, job.ID, job, nil, allocs, nil, "") + reconciler.now = now + r := reconciler.Compute() + + // Verify that no follow up evals were created + evals := r.desiredFollowupEvals[tgName] + require.Nil(evals) + + // Verify that one rescheduled alloc was placed + assertResults(t, r, &resultExpectation{ + createDeployment: nil, + deploymentUpdates: nil, + place: 1, + inplace: 0, + stop: 0, + desiredTGUpdates: map[string]*structs.DesiredUpdates{ + job.TaskGroups[0].Name: { + Place: 1, + Ignore: 4, + }, + }, + }) + + // Rescheduled allocs 
should have previous allocs + assertNamesHaveIndexes(t, intRange(1, 1), placeResultsToNames(r.place)) + assertPlaceResultsHavePreviousAllocs(t, 1, r.place) + assertPlacementsAreRescheduled(t, 1, r.place) +} + +// Tests rescheduling failed service allocations when the eval ID matches and there's a large clock drift +func TestReconciler_RescheduleNow_EvalIDMatch(t *testing.T) { + require := require.New(t) + + // Set desired 5 + job := mock.Job() + job.TaskGroups[0].Count = 5 + tgName := job.TaskGroups[0].Name + now := time.Now() + + // Set up reschedule policy and update stanza + job.TaskGroups[0].ReschedulePolicy = &structs.ReschedulePolicy{ + Attempts: 1, + Interval: 24 * time.Hour, + Delay: 5 * time.Second, + DelayFunction: "", + MaxDelay: 1 * time.Hour, + Unlimited: false, + } + job.TaskGroups[0].Update = noCanaryUpdate + + // Create 5 existing allocations + var allocs []*structs.Allocation + for i := 0; i < 5; i++ { + alloc := mock.Alloc() + alloc.Job = job + alloc.JobID = job.ID + alloc.NodeID = uuid.Generate() + alloc.Name = structs.AllocName(job.ID, job.TaskGroups[0].Name, uint(i)) + allocs = append(allocs, alloc) + alloc.ClientStatus = structs.AllocClientStatusRunning + } + + // Mark one as failed + allocs[0].ClientStatus = structs.AllocClientStatusFailed + + // Mark one of them as already rescheduled once + allocs[0].RescheduleTracker = &structs.RescheduleTracker{Events: []*structs.RescheduleEvent{ + {RescheduleTime: time.Now().Add(-1 * time.Hour).UTC().UnixNano(), + PrevAllocID: uuid.Generate(), + PrevNodeID: uuid.Generate(), + }, + }} + // Set fail time to 5 seconds ago and eval ID + evalID := uuid.Generate() + allocs[1].TaskStates = map[string]*structs.TaskState{tgName: {State: "start", + StartedAt: now.Add(-1 * time.Hour), + FinishedAt: now.Add(-5 * time.Second)}} + allocs[1].ClientStatus = structs.AllocClientStatusFailed + allocs[1].FollowupEvalID = evalID + + reconciler := NewAllocReconciler(testLogger(), allocUpdateFnIgnore, false, job.ID, job, nil, 
allocs, nil, evalID) + reconciler.now = now.Add(-30 * time.Second) + r := reconciler.Compute() + + // Verify that no follow up evals were created + evals := r.desiredFollowupEvals[tgName] + require.Nil(evals) + + // Verify that one rescheduled alloc was placed + assertResults(t, r, &resultExpectation{ + createDeployment: nil, + deploymentUpdates: nil, + place: 1, + inplace: 0, + stop: 0, + desiredTGUpdates: map[string]*structs.DesiredUpdates{ + job.TaskGroups[0].Name: { + Place: 1, + Ignore: 4, + }, + }, + }) + + // Rescheduled allocs should have previous allocs + assertNamesHaveIndexes(t, intRange(1, 1), placeResultsToNames(r.place)) + assertPlaceResultsHavePreviousAllocs(t, 1, r.place) + assertPlacementsAreRescheduled(t, 1, r.place) +} + // Tests failed service allocations that were already rescheduled won't be rescheduled again func TestReconciler_DontReschedule_PreviouslyRescheduled(t *testing.T) { // Set desired 5 @@ -1756,7 +1916,7 @@ func TestReconciler_DontReschedule_PreviouslyRescheduled(t *testing.T) { // Mark one as desired state stop allocs[4].DesiredStatus = structs.AllocDesiredStatusStop - reconciler := NewAllocReconciler(testLogger(), allocUpdateFnIgnore, false, job.ID, job, nil, allocs, nil) + reconciler := NewAllocReconciler(testLogger(), allocUpdateFnIgnore, false, job.ID, job, nil, allocs, nil, "") r := reconciler.Compute() // Should place 1 - one is a new placement to make up the desired count of 5 @@ -1843,7 +2003,7 @@ func TestReconciler_CancelDeployment_JobStop(t *testing.T) { allocs = append(allocs, alloc) } - reconciler := NewAllocReconciler(testLogger(), allocUpdateFnIgnore, false, c.jobID, c.job, c.deployment, allocs, nil) + reconciler := NewAllocReconciler(testLogger(), allocUpdateFnIgnore, false, c.jobID, c.job, c.deployment, allocs, nil, "") r := reconciler.Compute() var updates []*structs.DeploymentStatusUpdate @@ -1920,7 +2080,7 @@ func TestReconciler_CancelDeployment_JobUpdate(t *testing.T) { allocs = append(allocs, alloc) } - 
reconciler := NewAllocReconciler(testLogger(), allocUpdateFnIgnore, false, job.ID, job, c.deployment, allocs, nil) + reconciler := NewAllocReconciler(testLogger(), allocUpdateFnIgnore, false, job.ID, job, c.deployment, allocs, nil, "") r := reconciler.Compute() var updates []*structs.DeploymentStatusUpdate @@ -1969,7 +2129,7 @@ func TestReconciler_CreateDeployment_RollingUpgrade_Destructive(t *testing.T) { allocs = append(allocs, alloc) } - reconciler := NewAllocReconciler(testLogger(), allocUpdateFnDestructive, false, job.ID, job, nil, allocs, nil) + reconciler := NewAllocReconciler(testLogger(), allocUpdateFnDestructive, false, job.ID, job, nil, allocs, nil, "") r := reconciler.Compute() d := structs.NewDeployment(job) @@ -2012,7 +2172,7 @@ func TestReconciler_CreateDeployment_RollingUpgrade_Inplace(t *testing.T) { allocs = append(allocs, alloc) } - reconciler := NewAllocReconciler(testLogger(), allocUpdateFnInplace, false, job.ID, job, nil, allocs, nil) + reconciler := NewAllocReconciler(testLogger(), allocUpdateFnInplace, false, job.ID, job, nil, allocs, nil, "") r := reconciler.Compute() d := structs.NewDeployment(job) @@ -2052,7 +2212,7 @@ func TestReconciler_DontCreateDeployment_NoChanges(t *testing.T) { allocs = append(allocs, alloc) } - reconciler := NewAllocReconciler(testLogger(), allocUpdateFnIgnore, false, job.ID, job, nil, allocs, nil) + reconciler := NewAllocReconciler(testLogger(), allocUpdateFnIgnore, false, job.ID, job, nil, allocs, nil, "") r := reconciler.Compute() // Assert the correct results @@ -2130,7 +2290,7 @@ func TestReconciler_PausedOrFailedDeployment_NoMoreCanaries(t *testing.T) { d.TaskGroups[canary.TaskGroup].PlacedCanaries = []string{canary.ID} mockUpdateFn := allocUpdateFnMock(map[string]allocUpdateType{canary.ID: allocUpdateFnIgnore}, allocUpdateFnDestructive) - reconciler := NewAllocReconciler(testLogger(), mockUpdateFn, false, job.ID, job, d, allocs, nil) + reconciler := NewAllocReconciler(testLogger(), mockUpdateFn, false, 
job.ID, job, d, allocs, nil, "") r := reconciler.Compute() // Assert the correct results @@ -2195,7 +2355,7 @@ func TestReconciler_PausedOrFailedDeployment_NoMorePlacements(t *testing.T) { allocs = append(allocs, alloc) } - reconciler := NewAllocReconciler(testLogger(), allocUpdateFnIgnore, false, job.ID, job, d, allocs, nil) + reconciler := NewAllocReconciler(testLogger(), allocUpdateFnIgnore, false, job.ID, job, d, allocs, nil, "") r := reconciler.Compute() // Assert the correct results @@ -2269,7 +2429,7 @@ func TestReconciler_PausedOrFailedDeployment_NoMoreDestructiveUpdates(t *testing allocs = append(allocs, newAlloc) mockUpdateFn := allocUpdateFnMock(map[string]allocUpdateType{newAlloc.ID: allocUpdateFnIgnore}, allocUpdateFnDestructive) - reconciler := NewAllocReconciler(testLogger(), mockUpdateFn, false, job.ID, job, d, allocs, nil) + reconciler := NewAllocReconciler(testLogger(), mockUpdateFn, false, job.ID, job, d, allocs, nil, "") r := reconciler.Compute() // Assert the correct results @@ -2357,7 +2517,7 @@ func TestReconciler_PausedOrFailedDeployment_Migrations(t *testing.T) { tainted[n.ID] = n } - reconciler := NewAllocReconciler(testLogger(), allocUpdateFnIgnore, false, job.ID, job, d, allocs, tainted) + reconciler := NewAllocReconciler(testLogger(), allocUpdateFnIgnore, false, job.ID, job, d, allocs, tainted, "") r := reconciler.Compute() // Assert the correct results @@ -2431,7 +2591,7 @@ func TestReconciler_DrainNode_Canary(t *testing.T) { tainted[n.ID] = n mockUpdateFn := allocUpdateFnMock(handled, allocUpdateFnDestructive) - reconciler := NewAllocReconciler(testLogger(), mockUpdateFn, false, job.ID, job, d, allocs, tainted) + reconciler := NewAllocReconciler(testLogger(), mockUpdateFn, false, job.ID, job, d, allocs, tainted, "") r := reconciler.Compute() // Assert the correct results @@ -2503,7 +2663,7 @@ func TestReconciler_LostNode_Canary(t *testing.T) { tainted[n.ID] = n mockUpdateFn := allocUpdateFnMock(handled, allocUpdateFnDestructive) - 
reconciler := NewAllocReconciler(testLogger(), mockUpdateFn, false, job.ID, job, d, allocs, tainted) + reconciler := NewAllocReconciler(testLogger(), mockUpdateFn, false, job.ID, job, d, allocs, tainted, "") r := reconciler.Compute() // Assert the correct results @@ -2569,7 +2729,7 @@ func TestReconciler_StopOldCanaries(t *testing.T) { allocs = append(allocs, canary) } - reconciler := NewAllocReconciler(testLogger(), allocUpdateFnDestructive, false, job.ID, job, d, allocs, nil) + reconciler := NewAllocReconciler(testLogger(), allocUpdateFnDestructive, false, job.ID, job, d, allocs, nil, "") r := reconciler.Compute() newD := structs.NewDeployment(job) @@ -2622,7 +2782,7 @@ func TestReconciler_NewCanaries(t *testing.T) { allocs = append(allocs, alloc) } - reconciler := NewAllocReconciler(testLogger(), allocUpdateFnDestructive, false, job.ID, job, nil, allocs, nil) + reconciler := NewAllocReconciler(testLogger(), allocUpdateFnDestructive, false, job.ID, job, nil, allocs, nil, "") r := reconciler.Compute() newD := structs.NewDeployment(job) @@ -2672,7 +2832,7 @@ func TestReconciler_NewCanaries_MultiTG(t *testing.T) { } } - reconciler := NewAllocReconciler(testLogger(), allocUpdateFnDestructive, false, job.ID, job, nil, allocs, nil) + reconciler := NewAllocReconciler(testLogger(), allocUpdateFnDestructive, false, job.ID, job, nil, allocs, nil, "") r := reconciler.Compute() newD := structs.NewDeployment(job) @@ -2725,7 +2885,7 @@ func TestReconciler_NewCanaries_ScaleUp(t *testing.T) { allocs = append(allocs, alloc) } - reconciler := NewAllocReconciler(testLogger(), allocUpdateFnDestructive, false, job.ID, job, nil, allocs, nil) + reconciler := NewAllocReconciler(testLogger(), allocUpdateFnDestructive, false, job.ID, job, nil, allocs, nil, "") r := reconciler.Compute() newD := structs.NewDeployment(job) @@ -2773,7 +2933,7 @@ func TestReconciler_NewCanaries_ScaleDown(t *testing.T) { allocs = append(allocs, alloc) } - reconciler := NewAllocReconciler(testLogger(), 
allocUpdateFnDestructive, false, job.ID, job, nil, allocs, nil) + reconciler := NewAllocReconciler(testLogger(), allocUpdateFnDestructive, false, job.ID, job, nil, allocs, nil, "") r := reconciler.Compute() newD := structs.NewDeployment(job) @@ -2850,7 +3010,7 @@ func TestReconciler_NewCanaries_FillNames(t *testing.T) { allocs = append(allocs, canary) } - reconciler := NewAllocReconciler(testLogger(), allocUpdateFnDestructive, false, job.ID, job, d, allocs, nil) + reconciler := NewAllocReconciler(testLogger(), allocUpdateFnDestructive, false, job.ID, job, d, allocs, nil, "") r := reconciler.Compute() // Assert the correct results @@ -2919,7 +3079,7 @@ func TestReconciler_PromoteCanaries_Unblock(t *testing.T) { } mockUpdateFn := allocUpdateFnMock(handled, allocUpdateFnDestructive) - reconciler := NewAllocReconciler(testLogger(), mockUpdateFn, false, job.ID, job, d, allocs, nil) + reconciler := NewAllocReconciler(testLogger(), mockUpdateFn, false, job.ID, job, d, allocs, nil, "") r := reconciler.Compute() // Assert the correct results @@ -2992,7 +3152,7 @@ func TestReconciler_PromoteCanaries_CanariesEqualCount(t *testing.T) { } mockUpdateFn := allocUpdateFnMock(handled, allocUpdateFnDestructive) - reconciler := NewAllocReconciler(testLogger(), mockUpdateFn, false, job.ID, job, d, allocs, nil) + reconciler := NewAllocReconciler(testLogger(), mockUpdateFn, false, job.ID, job, d, allocs, nil, "") r := reconciler.Compute() updates := []*structs.DeploymentStatusUpdate{ @@ -3091,7 +3251,7 @@ func TestReconciler_DeploymentLimit_HealthAccounting(t *testing.T) { } mockUpdateFn := allocUpdateFnMock(handled, allocUpdateFnDestructive) - reconciler := NewAllocReconciler(testLogger(), mockUpdateFn, false, job.ID, job, d, allocs, nil) + reconciler := NewAllocReconciler(testLogger(), mockUpdateFn, false, job.ID, job, d, allocs, nil, "") r := reconciler.Compute() // Assert the correct results @@ -3172,7 +3332,7 @@ func TestReconciler_TaintedNode_RollingUpgrade(t *testing.T) { } 
mockUpdateFn := allocUpdateFnMock(handled, allocUpdateFnDestructive) - reconciler := NewAllocReconciler(testLogger(), mockUpdateFn, false, job.ID, job, d, allocs, tainted) + reconciler := NewAllocReconciler(testLogger(), mockUpdateFn, false, job.ID, job, d, allocs, tainted, "") r := reconciler.Compute() // Assert the correct results @@ -3257,7 +3417,7 @@ func TestReconciler_FailedDeployment_PlacementLost(t *testing.T) { } mockUpdateFn := allocUpdateFnMock(handled, allocUpdateFnDestructive) - reconciler := NewAllocReconciler(testLogger(), mockUpdateFn, false, job.ID, job, d, allocs, tainted) + reconciler := NewAllocReconciler(testLogger(), mockUpdateFn, false, job.ID, job, d, allocs, tainted, "") r := reconciler.Compute() // Assert the correct results @@ -3312,7 +3472,7 @@ func TestReconciler_CompleteDeployment(t *testing.T) { allocs = append(allocs, alloc) } - reconciler := NewAllocReconciler(testLogger(), allocUpdateFnIgnore, false, job.ID, job, d, allocs, nil) + reconciler := NewAllocReconciler(testLogger(), allocUpdateFnIgnore, false, job.ID, job, d, allocs, nil, "") r := reconciler.Compute() // Assert the correct results @@ -3399,7 +3559,7 @@ func TestReconciler_FailedDeployment_CancelCanaries(t *testing.T) { } mockUpdateFn := allocUpdateFnMock(handled, allocUpdateFnDestructive) - reconciler := NewAllocReconciler(testLogger(), mockUpdateFn, false, job.ID, job, d, allocs, nil) + reconciler := NewAllocReconciler(testLogger(), mockUpdateFn, false, job.ID, job, d, allocs, nil, "") r := reconciler.Compute() // Assert the correct results @@ -3468,7 +3628,7 @@ func TestReconciler_FailedDeployment_NewJob(t *testing.T) { jobNew := job.Copy() jobNew.Version += 100 - reconciler := NewAllocReconciler(testLogger(), allocUpdateFnDestructive, false, job.ID, jobNew, d, allocs, nil) + reconciler := NewAllocReconciler(testLogger(), allocUpdateFnDestructive, false, job.ID, jobNew, d, allocs, nil, "") r := reconciler.Compute() dnew := structs.NewDeployment(jobNew) @@ -3521,7 
+3681,7 @@ func TestReconciler_MarkDeploymentComplete(t *testing.T) { allocs = append(allocs, alloc) } - reconciler := NewAllocReconciler(testLogger(), allocUpdateFnIgnore, false, job.ID, job, d, allocs, nil) + reconciler := NewAllocReconciler(testLogger(), allocUpdateFnIgnore, false, job.ID, job, d, allocs, nil, "") r := reconciler.Compute() updates := []*structs.DeploymentStatusUpdate{ @@ -3590,7 +3750,7 @@ func TestReconciler_JobChange_ScaleUp_SecondEval(t *testing.T) { } mockUpdateFn := allocUpdateFnMock(handled, allocUpdateFnDestructive) - reconciler := NewAllocReconciler(testLogger(), mockUpdateFn, false, job.ID, job, d, allocs, nil) + reconciler := NewAllocReconciler(testLogger(), mockUpdateFn, false, job.ID, job, d, allocs, nil, "") r := reconciler.Compute() // Assert the correct results @@ -3625,7 +3785,7 @@ func TestReconciler_RollingUpgrade_MissingAllocs(t *testing.T) { allocs = append(allocs, alloc) } - reconciler := NewAllocReconciler(testLogger(), allocUpdateFnDestructive, false, job.ID, job, nil, allocs, nil) + reconciler := NewAllocReconciler(testLogger(), allocUpdateFnDestructive, false, job.ID, job, nil, allocs, nil, "") r := reconciler.Compute() d := structs.NewDeployment(job) @@ -3677,7 +3837,7 @@ func TestReconciler_Batch_Rerun(t *testing.T) { job2 := job.Copy() job2.CreateIndex++ - reconciler := NewAllocReconciler(testLogger(), allocUpdateFnIgnore, true, job2.ID, job2, nil, allocs, nil) + reconciler := NewAllocReconciler(testLogger(), allocUpdateFnIgnore, true, job2.ID, job2, nil, allocs, nil, "") r := reconciler.Compute() // Assert the correct results @@ -3737,7 +3897,7 @@ func TestReconciler_FailedDeployment_DontReschedule(t *testing.T) { StartedAt: now.Add(-1 * time.Hour), FinishedAt: now.Add(-10 * time.Second)}} - reconciler := NewAllocReconciler(testLogger(), allocUpdateFnDestructive, false, job.ID, job, d, allocs, nil) + reconciler := NewAllocReconciler(testLogger(), allocUpdateFnDestructive, false, job.ID, job, d, allocs, nil, "") r := 
reconciler.Compute() // Assert that no rescheduled placements were created @@ -3793,7 +3953,7 @@ func TestReconciler_DeploymentWithFailedAllocs_DontReschedule(t *testing.T) { StartedAt: now.Add(-1 * time.Hour), FinishedAt: now.Add(-10 * time.Second)}} - reconciler := NewAllocReconciler(testLogger(), allocUpdateFnDestructive, false, job.ID, job, d, allocs, nil) + reconciler := NewAllocReconciler(testLogger(), allocUpdateFnDestructive, false, job.ID, job, d, allocs, nil, "") r := reconciler.Compute() // Assert that no rescheduled placements were created @@ -3875,7 +4035,7 @@ func TestReconciler_FailedDeployment_AutoRevert_CancelCanaries(t *testing.T) { allocs = append(allocs, new) } - reconciler := NewAllocReconciler(testLogger(), allocUpdateFnIgnore, false, job.ID, jobv2, d, allocs, nil) + reconciler := NewAllocReconciler(testLogger(), allocUpdateFnIgnore, false, job.ID, jobv2, d, allocs, nil, "") r := reconciler.Compute() updates := []*structs.DeploymentStatusUpdate{ diff --git a/scheduler/reconcile_util.go b/scheduler/reconcile_util.go index a71adea23..434425f21 100644 --- a/scheduler/reconcile_util.go +++ b/scheduler/reconcile_util.go @@ -234,11 +234,10 @@ func (a allocSet) filterByTainted(nodes map[string]*structs.Node) (untainted, mi // untainted or a set of allocations that must be rescheduled now. Allocations that can be rescheduled // at a future time are also returned so that we can create follow up evaluations for them. Allocs are // skipped or considered untainted according to logic defined in shouldFilter method. 
-func (a allocSet) filterByRescheduleable(isBatch bool) (untainted, rescheduleNow allocSet, rescheduleLater []*delayedRescheduleInfo) { +func (a allocSet) filterByRescheduleable(isBatch bool, now time.Time, evalID string) (untainted, rescheduleNow allocSet, rescheduleLater []*delayedRescheduleInfo) { untainted = make(map[string]*structs.Allocation) rescheduleNow = make(map[string]*structs.Allocation) - now := time.Now() for _, alloc := range a { var eligibleNow, eligibleLater bool var rescheduleTime time.Time @@ -258,7 +257,7 @@ func (a allocSet) filterByRescheduleable(isBatch bool) (untainted, rescheduleNow // Only failed allocs with desired state run get to this point // If the failed alloc is not eligible for rescheduling now we add it to the untainted set - eligibleNow, eligibleLater, rescheduleTime = updateByReschedulable(alloc, now) + eligibleNow, eligibleLater, rescheduleTime = updateByReschedulable(alloc, now, evalID) if !eligibleNow { untainted[alloc.ID] = alloc if eligibleLater { @@ -321,9 +320,10 @@ func shouldFilter(alloc *structs.Allocation, isBatch bool) (untainted, ignore bo // updateByReschedulable is a helper method that encapsulates logic for whether a failed allocation // should be rescheduled now, later or left in the untainted set -func updateByReschedulable(alloc *structs.Allocation, now time.Time) (rescheduleNow, rescheduleLater bool, rescheduleTime time.Time) { +func updateByReschedulable(alloc *structs.Allocation, now time.Time, evalID string) (rescheduleNow, rescheduleLater bool, rescheduleTime time.Time) { rescheduleTime, eligible := alloc.NextRescheduleTime() - if eligible && now.After(rescheduleTime) { + // Reschedule if the eval ID matches the alloc's followup evalID or if its close to its reschedule time + if eligible && ((alloc.FollowupEvalID != "" && alloc.FollowupEvalID == evalID) || rescheduleTime.Sub(now) <= rescheduleTimeLapseWindowSize) { rescheduleNow = true return }