From 5373ade731c6e0f505e0cb53572f5d4454e50ee5 Mon Sep 17 00:00:00 2001 From: Preetha Appan Date: Thu, 1 Mar 2018 18:23:44 -0600 Subject: [PATCH] Scheduler and Reconciler changes to support delayed rescheduling --- scheduler/generic_sched.go | 58 +++- scheduler/generic_sched_test.go | 492 +++++++++++++++++++++++++++++++- scheduler/reconcile.go | 117 +++++++- scheduler/reconcile_test.go | 310 ++++++++++++++++++-- scheduler/reconcile_util.go | 57 ++-- 5 files changed, 983 insertions(+), 51 deletions(-) diff --git a/scheduler/generic_sched.go b/scheduler/generic_sched.go index 5830c5d11..994c782c6 100644 --- a/scheduler/generic_sched.go +++ b/scheduler/generic_sched.go @@ -72,8 +72,10 @@ type GenericScheduler struct { ctx *EvalContext stack *GenericStack + // Deprecated, was used in pre Nomad 0.7 rolling update stanza followupEvalWait time.Duration nextEval *structs.Evaluation + followUpEvals []*structs.Evaluation deployment *structs.Deployment @@ -261,6 +263,19 @@ func (s *GenericScheduler) process() (bool, error) { s.logger.Printf("[DEBUG] sched: %#v: rolling migration limit reached, next eval '%s' created", s.eval, s.nextEval.ID) } + // Create follow up evals for any delayed reschedule eligible allocations + if len(s.followUpEvals) > 0 { + for _, eval := range s.followUpEvals { + eval.PreviousEval = s.eval.ID + // TODO(preetha) this should be batching evals before inserting them + if err := s.planner.CreateEval(eval); err != nil { + s.logger.Printf("[ERR] sched: %#v failed to make next eval for rescheduling: %v", s.eval, err) + return false, err + } + s.logger.Printf("[DEBUG] sched: %#v: found reschedulable allocs, next eval '%s' created", s.eval, eval.ID) + } + } + // Submit the plan and store the results. result, newState, err := s.planner.SubmitPlan(s.plan) s.planResult = result @@ -336,6 +351,12 @@ func (s *GenericScheduler) computeJobAllocs() error { // follow up eval to handle node draining. s.followupEvalWait = results.followupEvalWait + // Store all the follow up evaluations from rescheduled allocations + if len(results.desiredFollowupEvals) > 0 { + for _, evals := range results.desiredFollowupEvals { + s.followUpEvals = append(s.followUpEvals, evals...) 
+ } + } // Update the stored deployment if results.deployment != nil { s.deployment = results.deployment @@ -467,7 +488,7 @@ func (s *GenericScheduler) computePlacements(destructive, place []placementResul if prevAllocation != nil { alloc.PreviousAllocation = prevAllocation.ID if missing.IsRescheduling() { - updateRescheduleTracker(alloc, prevAllocation) + updateRescheduleTracker(alloc, prevAllocation, tg.ReschedulePolicy, time.Now()) } } @@ -523,15 +544,42 @@ func getSelectOptions(prevAllocation *structs.Allocation, preferredNode *structs return selectOptions } +const max_past_reschedule_events = 5 + // updateRescheduleTracker carries over previous restart attempts and adds the most recent restart -func updateRescheduleTracker(alloc *structs.Allocation, prev *structs.Allocation) { +func updateRescheduleTracker(alloc *structs.Allocation, prev *structs.Allocation, reschedPolicy *structs.ReschedulePolicy, now time.Time) { var rescheduleEvents []*structs.RescheduleEvent if prev.RescheduleTracker != nil { - for _, reschedEvent := range prev.RescheduleTracker.Events { - rescheduleEvents = append(rescheduleEvents, reschedEvent.Copy()) + var interval time.Duration + if reschedPolicy != nil { + interval = reschedPolicy.Interval + } + // if attempts is set copy all events in the interval range + if reschedPolicy.Attempts > 0 { + for _, reschedEvent := range prev.RescheduleTracker.Events { + timeDiff := time.Now().UTC().UnixNano() - reschedEvent.RescheduleTime + // Only copy over events that are within restart interval + // This keeps the list of events small in cases where there's a long chain of old restart events + if interval > 0 && timeDiff <= interval.Nanoseconds() { + rescheduleEvents = append(rescheduleEvents, reschedEvent.Copy()) + } + } + } else { + // for unlimited restarts, only copy the last n + copied := 0 + for i := len(prev.RescheduleTracker.Events) - 1; i >= 0 && copied < max_past_reschedule_events; i-- { + reschedEvent := prev.RescheduleTracker.Events[i] + rescheduleEvents = append(rescheduleEvents, reschedEvent.Copy()) + copied++ + } + // reverse it to get the correct order + for left, right := 0, len(rescheduleEvents)-1; left < right; left, right = left+1, right-1 { + rescheduleEvents[left], rescheduleEvents[right] = rescheduleEvents[right], rescheduleEvents[left] + } } } - rescheduleEvent := structs.NewRescheduleEvent(time.Now().UTC().UnixNano(), prev.ID, prev.NodeID) + nextDelay := prev.NextDelay(reschedPolicy) + rescheduleEvent := structs.NewRescheduleEvent(now.UnixNano(), prev.ID, prev.NodeID, nextDelay) rescheduleEvents = append(rescheduleEvents, rescheduleEvent) alloc.RescheduleTracker = &structs.RescheduleTracker{Events: rescheduleEvents} } diff --git a/scheduler/generic_sched_test.go b/scheduler/generic_sched_test.go index 1443a2314..ee7a775bc 100644 --- a/scheduler/generic_sched_test.go +++ b/scheduler/generic_sched_test.go @@ -2715,7 +2715,7 @@ func TestServiceSched_RetryLimit(t *testing.T) { h.AssertEvalStatus(t, structs.EvalStatusFailed) } -func TestServiceSched_Reschedule_Once(t *testing.T) { +func TestServiceSched_Reschedule_OnceNow(t *testing.T) { h := NewHarness(t) // Create some nodes @@ -2730,9 +2730,15 @@ func TestServiceSched_Reschedule_Once(t *testing.T) { job := mock.Job() job.TaskGroups[0].Count = 2 job.TaskGroups[0].ReschedulePolicy = &structs.ReschedulePolicy{ - Attempts: 1, - Interval: 15 * time.Minute, + Attempts: 1, + Interval: 15 * time.Minute, + Delay: 5 * time.Second, + DelayCeiling: 1 * time.Minute, + DelayFunction: "linear", } + tgName := 
job.TaskGroups[0].Name + now := time.Now() + noErr(t, h.State.UpsertJob(h.NextIndex(), job)) var allocs []*structs.Allocation @@ -2746,6 +2752,9 @@ func TestServiceSched_Reschedule_Once(t *testing.T) { } // Mark one of the allocations as failed allocs[1].ClientStatus = structs.AllocClientStatusFailed + allocs[1].TaskStates = map[string]*structs.TaskState{tgName: {State: "dead", + StartedAt: now.Add(-1 * time.Hour), + FinishedAt: now.Add(-10 * time.Second)}} failedAllocID := allocs[1].ID successAllocID := allocs[0].ID @@ -2817,7 +2826,96 @@ func TestServiceSched_Reschedule_Once(t *testing.T) { } -func TestServiceSched_Reschedule_Multiple(t *testing.T) { +// Tests that alloc reschedulable at a future time creates a follow up eval +func TestServiceSched_Reschedule_Later(t *testing.T) { + h := NewHarness(t) + require := require.New(t) + // Create some nodes + var nodes []*structs.Node + for i := 0; i < 10; i++ { + node := mock.Node() + nodes = append(nodes, node) + noErr(t, h.State.UpsertNode(h.NextIndex(), node)) + } + + // Generate a fake job with allocations and an update policy. + job := mock.Job() + job.TaskGroups[0].Count = 2 + delayDuration := 15 * time.Second + job.TaskGroups[0].ReschedulePolicy = &structs.ReschedulePolicy{ + Attempts: 1, + Interval: 15 * time.Minute, + Delay: delayDuration, + DelayCeiling: 1 * time.Minute, + DelayFunction: "linear", + } + tgName := job.TaskGroups[0].Name + now := time.Now() + + noErr(t, h.State.UpsertJob(h.NextIndex(), job)) + + var allocs []*structs.Allocation + for i := 0; i < 2; i++ { + alloc := mock.Alloc() + alloc.Job = job + alloc.JobID = job.ID + alloc.NodeID = nodes[i].ID + alloc.Name = fmt.Sprintf("my-job.web[%d]", i) + allocs = append(allocs, alloc) + } + // Mark one of the allocations as failed + allocs[1].ClientStatus = structs.AllocClientStatusFailed + allocs[1].TaskStates = map[string]*structs.TaskState{tgName: {State: "dead", + StartedAt: now.Add(-1 * time.Hour), + FinishedAt: now}} + failedAllocID := allocs[1].ID + + noErr(t, h.State.UpsertAllocs(h.NextIndex(), allocs)) + + // Create a mock evaluation + eval := &structs.Evaluation{ + Namespace: structs.DefaultNamespace, + ID: uuid.Generate(), + Priority: 50, + TriggeredBy: structs.EvalTriggerNodeUpdate, + JobID: job.ID, + Status: structs.EvalStatusPending, + } + noErr(t, h.State.UpsertEvals(h.NextIndex(), []*structs.Evaluation{eval})) + + // Process the evaluation + err := h.Process(NewServiceScheduler, eval) + if err != nil { + t.Fatalf("err: %v", err) + } + + // Ensure multiple plans + if len(h.Plans) == 0 { + t.Fatalf("bad: %#v", h.Plans) + } + + // Lookup the allocations by JobID + ws := memdb.NewWatchSet() + out, err := h.State.AllocsByJob(ws, job.Namespace, job.ID, false) + noErr(t, err) + + // Verify no new allocs were created + require.Equal(2, len(out)) + + // Verify follow up eval was created for the failed alloc + alloc, err := h.State.AllocByID(ws, failedAllocID) + require.Nil(err) + require.NotEmpty(alloc.FollowupEvalID) + + // Ensure there is a follow up eval. 
+ if len(h.CreateEvals) != 1 || h.CreateEvals[0].Status != structs.EvalStatusPending { + t.Fatalf("bad: %#v", h.CreateEvals) + } + followupEval := h.CreateEvals[0] + require.Equal(now.Add(delayDuration), followupEval.WaitUntil) +} + +func TestServiceSched_Reschedule_MultipleNow(t *testing.T) { h := NewHarness(t) // Create some nodes @@ -2833,9 +2931,14 @@ func TestServiceSched_Reschedule_Multiple(t *testing.T) { job := mock.Job() job.TaskGroups[0].Count = 2 job.TaskGroups[0].ReschedulePolicy = &structs.ReschedulePolicy{ - Attempts: maxRestartAttempts, - Interval: 30 * time.Minute, + Attempts: maxRestartAttempts, + Interval: 30 * time.Minute, + Delay: 5 * time.Second, + DelayFunction: "linear", } + tgName := job.TaskGroups[0].Name + now := time.Now() + noErr(t, h.State.UpsertJob(h.NextIndex(), job)) var allocs []*structs.Allocation @@ -2850,6 +2953,9 @@ func TestServiceSched_Reschedule_Multiple(t *testing.T) { } // Mark one of the allocations as failed allocs[1].ClientStatus = structs.AllocClientStatusFailed + allocs[1].TaskStates = map[string]*structs.TaskState{tgName: {State: "dead", + StartedAt: now.Add(-1 * time.Hour), + FinishedAt: now.Add(-10 * time.Second)}} noErr(t, h.State.UpsertAllocs(h.NextIndex(), allocs)) @@ -2915,6 +3021,9 @@ func TestServiceSched_Reschedule_Multiple(t *testing.T) { // Mark this alloc as failed again newAlloc.ClientStatus = structs.AllocClientStatusFailed + newAlloc.TaskStates = map[string]*structs.TaskState{tgName: {State: "dead", + StartedAt: now.Add(-12 * time.Second), + FinishedAt: now.Add(-10 * time.Second)}} failedAllocId = newAlloc.ID failedNodeID = newAlloc.NodeID @@ -2946,6 +3055,136 @@ func TestServiceSched_Reschedule_Multiple(t *testing.T) { assert.Equal(5, len(out)) // 2 original, plus 3 reschedule attempts } +// Tests that old reschedule attempts are pruned +func TestServiceSched_Reschedule_PruneEvents(t *testing.T) { + h := NewHarness(t) + + // Create some nodes + var nodes []*structs.Node + for i := 0; i < 10; i++ { + node := mock.Node() + nodes = append(nodes, node) + noErr(t, h.State.UpsertNode(h.NextIndex(), node)) + } + + // Generate a fake job with allocations and an update policy. 
+ job := mock.Job() + job.TaskGroups[0].Count = 2 + job.TaskGroups[0].ReschedulePolicy = &structs.ReschedulePolicy{ + DelayFunction: "exponential", + DelayCeiling: 1 * time.Hour, + Delay: 5 * time.Second, + Unlimited: true, + } + noErr(t, h.State.UpsertJob(h.NextIndex(), job)) + + var allocs []*structs.Allocation + for i := 0; i < 2; i++ { + alloc := mock.Alloc() + alloc.Job = job + alloc.JobID = job.ID + alloc.NodeID = nodes[i].ID + alloc.Name = fmt.Sprintf("my-job.web[%d]", i) + allocs = append(allocs, alloc) + } + now := time.Now() + // Mark allocations as failed with restart info + allocs[1].TaskStates = map[string]*structs.TaskState{job.TaskGroups[0].Name: {State: "dead", + StartedAt: now.Add(-1 * time.Hour), + FinishedAt: now.Add(-15 * time.Minute)}} + allocs[1].ClientStatus = structs.AllocClientStatusFailed + + allocs[1].RescheduleTracker = &structs.RescheduleTracker{ + Events: []*structs.RescheduleEvent{ + {RescheduleTime: now.Add(-1 * time.Hour).UTC().UnixNano(), + PrevAllocID: uuid.Generate(), + PrevNodeID: uuid.Generate(), + Delay: 5 * time.Second, + }, + {RescheduleTime: now.Add(-40 * time.Minute).UTC().UnixNano(), + PrevAllocID: allocs[0].ID, + PrevNodeID: uuid.Generate(), + Delay: 10 * time.Second, + }, + {RescheduleTime: now.Add(-30 * time.Minute).UTC().UnixNano(), + PrevAllocID: allocs[0].ID, + PrevNodeID: uuid.Generate(), + Delay: 20 * time.Second, + }, + {RescheduleTime: now.Add(-20 * time.Minute).UTC().UnixNano(), + PrevAllocID: allocs[0].ID, + PrevNodeID: uuid.Generate(), + Delay: 40 * time.Second, + }, + {RescheduleTime: now.Add(-10 * time.Minute).UTC().UnixNano(), + PrevAllocID: allocs[0].ID, + PrevNodeID: uuid.Generate(), + Delay: 80 * time.Second, + }, + {RescheduleTime: now.Add(-3 * time.Minute).UTC().UnixNano(), + PrevAllocID: allocs[0].ID, + PrevNodeID: uuid.Generate(), + Delay: 160 * time.Second, + }, + }, + } + expectedFirstRescheduleEvent := allocs[1].RescheduleTracker.Events[1] + expectedDelay := 320 * time.Second + failedAllocID := allocs[1].ID + successAllocID := allocs[0].ID + + noErr(t, h.State.UpsertAllocs(h.NextIndex(), allocs)) + + // Create a mock evaluation + eval := &structs.Evaluation{ + Namespace: structs.DefaultNamespace, + ID: uuid.Generate(), + Priority: 50, + TriggeredBy: structs.EvalTriggerNodeUpdate, + JobID: job.ID, + Status: structs.EvalStatusPending, + } + noErr(t, h.State.UpsertEvals(h.NextIndex(), []*structs.Evaluation{eval})) + + // Process the evaluation + err := h.Process(NewServiceScheduler, eval) + if err != nil { + t.Fatalf("err: %v", err) + } + + // Ensure multiple plans + if len(h.Plans) == 0 { + t.Fatalf("bad: %#v", h.Plans) + } + + // Lookup the allocations by JobID + ws := memdb.NewWatchSet() + out, err := h.State.AllocsByJob(ws, job.Namespace, job.ID, false) + noErr(t, err) + + // Verify that one new allocation got created with its restart tracker info + assert := assert.New(t) + assert.Equal(3, len(out)) + var newAlloc *structs.Allocation + for _, alloc := range out { + if alloc.ID != successAllocID && alloc.ID != failedAllocID { + newAlloc = alloc + } + } + + assert.Equal(failedAllocID, newAlloc.PreviousAllocation) + // Verify that the new alloc copied the last 5 reschedule attempts + assert.Equal(6, len(newAlloc.RescheduleTracker.Events)) + assert.Equal(expectedFirstRescheduleEvent, newAlloc.RescheduleTracker.Events[0]) + + mostRecentRescheduleEvent := newAlloc.RescheduleTracker.Events[5] + // Verify that the failed alloc ID is in the most recent reschedule event + assert.Equal(failedAllocID, 
mostRecentRescheduleEvent.PrevAllocID) + // Verify that the delay value was captured correctly + assert.Equal(expectedDelay, mostRecentRescheduleEvent.Delay) + +} + // Tests that deployments with failed allocs don't result in placements func TestDeployment_FailedAllocs_NoReschedule(t *testing.T) { h := NewHarness(t) @@ -3079,6 +3318,9 @@ func TestBatchSched_Run_FailedAlloc(t *testing.T) { job.TaskGroups[0].Count = 1 noErr(t, h.State.UpsertJob(h.NextIndex(), job)) + tgName := job.TaskGroups[0].Name + now := time.Now() + // Create a failed alloc alloc := mock.Alloc() alloc.Job = job @@ -3086,6 +3328,9 @@ func TestBatchSched_Run_FailedAlloc(t *testing.T) { alloc.NodeID = node.ID alloc.Name = "my-job.web[0]" alloc.ClientStatus = structs.AllocClientStatusFailed + alloc.TaskStates = map[string]*structs.TaskState{tgName: {State: "dead", + StartedAt: now.Add(-1 * time.Hour), + FinishedAt: now.Add(-10 * time.Second)}} noErr(t, h.State.UpsertAllocs(h.NextIndex(), []*structs.Allocation{alloc})) // Create a mock evaluation to register the job @@ -3231,6 +3476,9 @@ func TestBatchSched_Run_FailedAllocQueuedAllocations(t *testing.T) { job.TaskGroups[0].Count = 1 noErr(t, h.State.UpsertJob(h.NextIndex(), job)) + tgName := job.TaskGroups[0].Name + now := time.Now() + // Create a failed alloc alloc := mock.Alloc() alloc.Job = job @@ -3238,6 +3486,9 @@ func TestBatchSched_Run_FailedAllocQueuedAllocations(t *testing.T) { alloc.NodeID = node.ID alloc.Name = "my-job.web[0]" alloc.ClientStatus = structs.AllocClientStatusFailed + alloc.TaskStates = map[string]*structs.TaskState{tgName: {State: "dead", + StartedAt: now.Add(-1 * time.Hour), + FinishedAt: now.Add(-10 * time.Second)}} noErr(t, h.State.UpsertAllocs(h.NextIndex(), []*structs.Allocation{alloc})) // Create a mock evaluation to register the job @@ -3963,3 +4214,232 @@ func TestServiceSched_CancelDeployment_NewerJob(t *testing.T) { h.AssertEvalStatus(t, structs.EvalStatusComplete) } + +// Various table driven tests for carry forward +// of past reschedule events +func Test_updateRescheduleTracker(t *testing.T) { + + t1 := time.Now().UTC() + alloc := mock.Alloc() + prevAlloc := mock.Alloc() + + type testCase struct { + desc string + prevAllocEvents []*structs.RescheduleEvent + reschedPolicy *structs.ReschedulePolicy + expectedRescheduleEvents []*structs.RescheduleEvent + reschedTime time.Time + } + + testCases := []testCase{ + { + desc: "No past events", + prevAllocEvents: nil, + reschedPolicy: &structs.ReschedulePolicy{Unlimited: false, Interval: 24 * time.Hour, Attempts: 2, Delay: 5 * time.Second}, + reschedTime: t1, + expectedRescheduleEvents: []*structs.RescheduleEvent{{t1.UnixNano(), prevAlloc.ID, prevAlloc.NodeID, 5 * time.Second}}, + }, + { + desc: "one past event, linear delay", + prevAllocEvents: []*structs.RescheduleEvent{ + {RescheduleTime: t1.Add(-1 * time.Hour).UnixNano(), + PrevAllocID: prevAlloc.ID, + PrevNodeID: prevAlloc.NodeID, + Delay: 5 * time.Second}}, + reschedPolicy: &structs.ReschedulePolicy{Unlimited: false, Interval: 24 * time.Hour, Attempts: 2, Delay: 5 * time.Second}, + reschedTime: t1, + expectedRescheduleEvents: []*structs.RescheduleEvent{ + { + RescheduleTime: t1.Add(-1 * time.Hour).UnixNano(), + PrevAllocID: prevAlloc.ID, + PrevNodeID: prevAlloc.NodeID, + Delay: 5 * time.Second, + }, + { + RescheduleTime: t1.UnixNano(), + PrevAllocID: prevAlloc.ID, + PrevNodeID: prevAlloc.NodeID, + Delay: 5 * time.Second, + }, + }, + }, + { + desc: "one past event, fibonacci delay", + prevAllocEvents: []*structs.RescheduleEvent{ + 
{RescheduleTime: t1.Add(-1 * time.Hour).UnixNano(), + PrevAllocID: prevAlloc.ID, + PrevNodeID: prevAlloc.NodeID, + Delay: 5 * time.Second}}, + reschedPolicy: &structs.ReschedulePolicy{Unlimited: false, Interval: 24 * time.Hour, Attempts: 2, Delay: 5 * time.Second, DelayFunction: "fibonacci", DelayCeiling: 60 * time.Second}, + reschedTime: t1, + expectedRescheduleEvents: []*structs.RescheduleEvent{ + { + RescheduleTime: t1.Add(-1 * time.Hour).UnixNano(), + PrevAllocID: prevAlloc.ID, + PrevNodeID: prevAlloc.NodeID, + Delay: 5 * time.Second, + }, + { + RescheduleTime: t1.UnixNano(), + PrevAllocID: prevAlloc.ID, + PrevNodeID: prevAlloc.NodeID, + Delay: 5 * time.Second, + }, + }, + }, + { + desc: "eight past events, fibonacci delay, unlimited", + prevAllocEvents: []*structs.RescheduleEvent{ + { + RescheduleTime: t1.Add(-1 * time.Hour).UnixNano(), + PrevAllocID: prevAlloc.ID, + PrevNodeID: prevAlloc.NodeID, + Delay: 5 * time.Second, + }, + { + RescheduleTime: t1.Add(-1 * time.Hour).UnixNano(), + PrevAllocID: prevAlloc.ID, + PrevNodeID: prevAlloc.NodeID, + Delay: 5 * time.Second, + }, + { + RescheduleTime: t1.Add(-1 * time.Hour).UnixNano(), + PrevAllocID: prevAlloc.ID, + PrevNodeID: prevAlloc.NodeID, + Delay: 10 * time.Second, + }, + { + RescheduleTime: t1.Add(-1 * time.Hour).UnixNano(), + PrevAllocID: prevAlloc.ID, + PrevNodeID: prevAlloc.NodeID, + Delay: 15 * time.Second, + }, + { + RescheduleTime: t1.Add(-1 * time.Hour).UnixNano(), + PrevAllocID: prevAlloc.ID, + PrevNodeID: prevAlloc.NodeID, + Delay: 25 * time.Second, + }, + { + RescheduleTime: t1.Add(-1 * time.Hour).UnixNano(), + PrevAllocID: prevAlloc.ID, + PrevNodeID: prevAlloc.NodeID, + Delay: 40 * time.Second, + }, + { + RescheduleTime: t1.Add(-1 * time.Hour).UnixNano(), + PrevAllocID: prevAlloc.ID, + PrevNodeID: prevAlloc.NodeID, + Delay: 65 * time.Second, + }, + { + RescheduleTime: t1.Add(-1 * time.Hour).UnixNano(), + PrevAllocID: prevAlloc.ID, + PrevNodeID: prevAlloc.NodeID, + Delay: 105 * time.Second, + }, + }, + reschedPolicy: &structs.ReschedulePolicy{Unlimited: true, Delay: 5 * time.Second, DelayFunction: "fibonacci", DelayCeiling: 240 * time.Second}, + reschedTime: t1, + expectedRescheduleEvents: []*structs.RescheduleEvent{ + { + RescheduleTime: t1.Add(-1 * time.Hour).UnixNano(), + PrevAllocID: prevAlloc.ID, + PrevNodeID: prevAlloc.NodeID, + Delay: 15 * time.Second, + }, + { + RescheduleTime: t1.Add(-1 * time.Hour).UnixNano(), + PrevAllocID: prevAlloc.ID, + PrevNodeID: prevAlloc.NodeID, + Delay: 25 * time.Second, + }, + { + RescheduleTime: t1.Add(-1 * time.Hour).UnixNano(), + PrevAllocID: prevAlloc.ID, + PrevNodeID: prevAlloc.NodeID, + Delay: 40 * time.Second, + }, + { + RescheduleTime: t1.Add(-1 * time.Hour).UnixNano(), + PrevAllocID: prevAlloc.ID, + PrevNodeID: prevAlloc.NodeID, + Delay: 65 * time.Second, + }, + { + RescheduleTime: t1.Add(-1 * time.Hour).UnixNano(), + PrevAllocID: prevAlloc.ID, + PrevNodeID: prevAlloc.NodeID, + Delay: 105 * time.Second, + }, + { + RescheduleTime: t1.UnixNano(), + PrevAllocID: prevAlloc.ID, + PrevNodeID: prevAlloc.NodeID, + Delay: 170 * time.Second, + }, + }, + }, + { + desc: " old attempts past interval, exponential delay, limited", + prevAllocEvents: []*structs.RescheduleEvent{ + { + RescheduleTime: t1.Add(-2 * time.Hour).UnixNano(), + PrevAllocID: prevAlloc.ID, + PrevNodeID: prevAlloc.NodeID, + Delay: 5 * time.Second, + }, + { + RescheduleTime: t1.Add(-1 * time.Hour).UnixNano(), + PrevAllocID: prevAlloc.ID, + PrevNodeID: prevAlloc.NodeID, + Delay: 10 * time.Second, + }, + { + RescheduleTime: 
t1.Add(-30 * time.Minute).UnixNano(), + PrevAllocID: prevAlloc.ID, + PrevNodeID: prevAlloc.NodeID, + Delay: 20 * time.Second, + }, + { + RescheduleTime: t1.Add(-10 * time.Minute).UnixNano(), + PrevAllocID: prevAlloc.ID, + PrevNodeID: prevAlloc.NodeID, + Delay: 40 * time.Second, + }, + }, + reschedPolicy: &structs.ReschedulePolicy{Unlimited: false, Interval: 1 * time.Hour, Attempts: 5, Delay: 5 * time.Second, DelayFunction: "exponential", DelayCeiling: 240 * time.Second}, + reschedTime: t1, + expectedRescheduleEvents: []*structs.RescheduleEvent{ + { + RescheduleTime: t1.Add(-30 * time.Minute).UnixNano(), + PrevAllocID: prevAlloc.ID, + PrevNodeID: prevAlloc.NodeID, + Delay: 20 * time.Second, + }, + { + RescheduleTime: t1.Add(-10 * time.Minute).UnixNano(), + PrevAllocID: prevAlloc.ID, + PrevNodeID: prevAlloc.NodeID, + Delay: 40 * time.Second, + }, + { + RescheduleTime: t1.UnixNano(), + PrevAllocID: prevAlloc.ID, + PrevNodeID: prevAlloc.NodeID, + Delay: 80 * time.Second, + }, + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.desc, func(t *testing.T) { + require := require.New(t) + prevAlloc.RescheduleTracker = &structs.RescheduleTracker{Events: tc.prevAllocEvents} + updateRescheduleTracker(alloc, prevAlloc, tc.reschedPolicy, tc.reschedTime) + require.Equal(tc.expectedRescheduleEvents, alloc.RescheduleTracker.Events) + }) + } + +} diff --git a/scheduler/reconcile.go b/scheduler/reconcile.go index 35eebdcb1..5eedc91be 100644 --- a/scheduler/reconcile.go +++ b/scheduler/reconcile.go @@ -5,7 +5,10 @@ import ( "log" "time" + "sort" + "github.com/hashicorp/nomad/helper" + "github.com/hashicorp/nomad/helper/uuid" "github.com/hashicorp/nomad/nomad/structs" ) @@ -92,7 +95,23 @@ type reconcileResults struct { // followupEvalWait is set if there should be a followup eval run after the // given duration + // Deprecated, the delay strategy that sets this is not available after nomad 0.7.0 followupEvalWait time.Duration + + // desiredFollowupEvals is the map of follow up evaluations to create per task group + // This is used to create a delayed evaluation for rescheduling failed allocations. + desiredFollowupEvals map[string][]*structs.Evaluation +} + +// delayedRescheduleInfo contains the allocation id and a time when its eligible to be rescheduled. +// this is used to create follow up evaluations +type delayedRescheduleInfo struct { + + // allocID is the ID of the allocation eligible to be rescheduled + allocID string + + // rescheduleTime is the time to use in the delayed evaluation + rescheduleTime time.Time } func (r *reconcileResults) GoString() string { @@ -136,7 +155,8 @@ func NewAllocReconciler(logger *log.Logger, allocUpdateFn allocUpdateType, batch existingAllocs: existingAllocs, taintedNodes: taintedNodes, result: &reconcileResults{ - desiredTGUpdates: make(map[string]*structs.DesiredUpdates), + desiredTGUpdates: make(map[string]*structs.DesiredUpdates), + desiredFollowupEvals: make(map[string][]*structs.Evaluation), }, } } @@ -270,6 +290,8 @@ func (a *allocReconciler) markStop(allocs allocSet, clientStatus, statusDescript } } +const batchedFailedAllocWindowSize = 5 * time.Second + // computeGroup reconciles state for a particular task group. It returns whether // the deployment it is for is complete with regards to the task group. 
func (a *allocReconciler) computeGroup(group string, all allocSet) bool { @@ -318,11 +340,18 @@ func (a *allocReconciler) computeGroup(group string, all allocSet) bool { untainted, migrate, lost := all.filterByTainted(a.taintedNodes) // Determine what set of terminal allocations need to be rescheduled - untainted, reschedule := untainted.filterByReschedulable(a.batch, tg.ReschedulePolicy) + untainted, rescheduleNow, rescheduleLater := untainted.filterByRescheduleable(a.batch, tg.ReschedulePolicy) + + // Create batched follow up evaluations for allocations that are reschedulable later + rescheduleLaterAllocs := make(map[string]*structs.Allocation) + if len(rescheduleLater) > 0 { + rescheduleLaterAllocs = a.handleDelayedReschedules(rescheduleLater, all, tg.Name) + } + // Create a structure for choosing names. Seed with the taken names which is // the union of untainted and migrating nodes (includes canaries) - nameIndex := newAllocNameIndex(a.jobID, group, tg.Count, untainted.union(migrate, reschedule)) + nameIndex := newAllocNameIndex(a.jobID, group, tg.Count, untainted.union(migrate, rescheduleNow)) // Stop any unneeded allocations and update the untainted set to not // included stopped allocations. @@ -341,7 +370,7 @@ func (a *allocReconciler) computeGroup(group string, all allocSet) bool { // Do inplace upgrades where possible and capture the set of upgrades that // need to be done destructively. - ignore, inplace, destructive := a.computeUpdates(tg, untainted) + ignore, inplace, destructive := a.computeUpdates(tg, untainted, rescheduleLaterAllocs) desiredChanges.Ignore += uint64(len(ignore)) desiredChanges.InPlaceUpdate += uint64(len(inplace)) if !existingDeployment { @@ -379,7 +408,7 @@ func (a *allocReconciler) computeGroup(group string, all allocSet) bool { // * The deployment is not paused or failed // * Not placing any canaries // * If there are any canaries that they have been promoted - place := a.computePlacements(tg, nameIndex, untainted, migrate, reschedule) + place := a.computePlacements(tg, nameIndex, untainted, migrate, rescheduleNow) if !existingDeployment { dstate.DesiredTotal += len(place) } @@ -774,7 +803,7 @@ func (a *allocReconciler) computeStop(group *structs.TaskGroup, nameIndex *alloc // 2. Those that can be upgraded in-place. These are added to the results // automatically since the function contains the correct state to do so, // 3. Those that require destructive updates -func (a *allocReconciler) computeUpdates(group *structs.TaskGroup, untainted allocSet) (ignore, inplace, destructive allocSet) { +func (a *allocReconciler) computeUpdates(group *structs.TaskGroup, untainted, rescheduleLaterAllocs allocSet) (ignore, inplace, destructive allocSet) { // Determine the set of allocations that need to be updated ignore = make(map[string]*structs.Allocation) inplace = make(map[string]*structs.Allocation) @@ -782,7 +811,13 @@ func (a *allocReconciler) computeUpdates(group *structs.TaskGroup, untainted all for _, alloc := range untainted { ignoreChange, destructiveChange, inplaceAlloc := a.allocUpdateFn(alloc, a.job, group) - if ignoreChange { + // Also check if the alloc is marked for later rescheduling. 
+ // If so it should be in the inplace list + reschedLaterAlloc, isRescheduleLater := rescheduleLaterAllocs[alloc.ID] + if isRescheduleLater { + inplace[alloc.ID] = alloc + a.result.inplaceUpdate = append(a.result.inplaceUpdate, reschedLaterAlloc) + } else if ignoreChange { ignore[alloc.ID] = alloc } else if destructiveChange { destructive[alloc.ID] = alloc @@ -796,3 +831,71 @@ func (a *allocReconciler) computeUpdates(group *structs.TaskGroup, untainted all return } +func (a *allocReconciler) handleDelayedReschedules(rescheduleLater []*delayedRescheduleInfo, all allocSet, tgName string) allocSet { + // Sort by time + sort.Slice(rescheduleLater, func(i, j int) bool { + return rescheduleLater[i].rescheduleTime.Before(rescheduleLater[j].rescheduleTime) + }) + + var evals []*structs.Evaluation + var allocIDs []string + nextReschedTime := rescheduleLater[0].rescheduleTime + allocIDToFollowupEvalID := make(map[string]string, len(rescheduleLater)) + for _, allocReschedInfo := range rescheduleLater { + if allocReschedInfo.rescheduleTime.UTC().UnixNano()-nextReschedTime.UTC().UnixNano() < batchedFailedAllocWindowSize.Nanoseconds() { + // add to batch + allocIDs = append(allocIDs, allocReschedInfo.allocID) + } else { + // create a new eval for the previous batch + eval := &structs.Evaluation{ + ID: uuid.Generate(), + Namespace: a.job.Namespace, + Priority: a.job.Priority, + Type: a.job.Type, + TriggeredBy: structs.EvalTriggerRetryFailedAlloc, + JobID: a.job.ID, + JobModifyIndex: a.job.ModifyIndex, + Status: structs.EvalStatusPending, + WaitUntil: nextReschedTime, + } + evals = append(evals, eval) + for _, allocID := range allocIDs { + allocIDToFollowupEvalID[allocID] = eval.ID + } + nextReschedTime = allocReschedInfo.rescheduleTime + // clear out this batch and start it again + allocIDs = nil + allocIDs = append(allocIDs, allocReschedInfo.allocID) + } + } + // Deal with the last batch + if len(allocIDs) > 0 { + eval := &structs.Evaluation{ + ID: uuid.Generate(), + Namespace: a.job.Namespace, + Priority: a.job.Priority, + Type: a.job.Type, + TriggeredBy: structs.EvalTriggerRetryFailedAlloc, + JobID: a.job.ID, + JobModifyIndex: a.job.ModifyIndex, + Status: structs.EvalStatusPending, + WaitUntil: nextReschedTime, + } + evals = append(evals, eval) + for _, allocID := range allocIDs { + allocIDToFollowupEvalID[allocID] = eval.ID + } + } + + a.result.desiredFollowupEvals[tgName] = evals + + // create inplace updates for every alloc ID that needs to be updated with its follow up eval ID + rescheduleLaterAllocs := make(map[string]*structs.Allocation) + for allocID, evalID := range allocIDToFollowupEvalID { + existingAlloc := all[allocID] + updatedAlloc := existingAlloc.Copy() + updatedAlloc.FollowupEvalID = evalID + rescheduleLaterAllocs[allocID] = updatedAlloc + } + return rescheduleLaterAllocs +} diff --git a/scheduler/reconcile_test.go b/scheduler/reconcile_test.go index 910039d03..7866a2096 100644 --- a/scheduler/reconcile_test.go +++ b/scheduler/reconcile_test.go @@ -15,6 +15,7 @@ import ( "github.com/hashicorp/nomad/nomad/mock" "github.com/hashicorp/nomad/nomad/structs" "github.com/kr/pretty" + "github.com/stretchr/testify/require" ) /* @@ -38,9 +39,12 @@ Basic Tests: √ Handle task group being removed √ Handle job being stopped both as .Stopped and nil √ Place more that one group -√ Handle rescheduling failed allocs for batch jobs -√ Handle rescheduling failed allocs for service jobs +√ Handle delayed rescheduling failed allocs for batch jobs +√ Handle delayed rescheduling failed allocs for service 
jobs +√ Handle eligible now rescheduling failed allocs for batch jobs +√ Handle eligible now rescheduling failed allocs for service jobs √ Previously rescheduled allocs should not be rescheduled again +√ Aggregated evaluations for allocations that fail close together Update stanza Tests: √ Stopped job cancels any active deployment @@ -1203,15 +1207,17 @@ func TestReconciler_MultiTG(t *testing.T) { assertNamesHaveIndexes(t, intRange(2, 9, 0, 9), placeResultsToNames(r.place)) } -// Tests rescheduling failed batch allocations -func TestReconciler_Reschedule_Batch(t *testing.T) { +// Tests delayed rescheduling of failed batch allocations +func TestReconciler_RescheduleLater_Batch(t *testing.T) { + require := require.New(t) // Set desired 4 job := mock.Job() job.TaskGroups[0].Count = 4 - + now := time.Now() // Set up reschedule policy - job.TaskGroups[0].ReschedulePolicy = &structs.ReschedulePolicy{Attempts: 3, Interval: 24 * time.Hour} - + delayDur := 15 * time.Second + job.TaskGroups[0].ReschedulePolicy = &structs.ReschedulePolicy{Attempts: 3, Interval: 24 * time.Hour, Delay: delayDur, DelayFunction: "linear"} + tgName := job.TaskGroups[0].Name // Create 6 existing allocations - 2 running, 1 complete and 3 failed var allocs []*structs.Allocation for i := 0; i < 6; i++ { @@ -1235,6 +1241,9 @@ func TestReconciler_Reschedule_Batch(t *testing.T) { }} allocs[1].NextAllocation = allocs[2].ID allocs[2].ClientStatus = structs.AllocClientStatusFailed + allocs[2].TaskStates = map[string]*structs.TaskState{tgName: {State: "start", + StartedAt: now.Add(-1 * time.Hour), + FinishedAt: now}} allocs[2].RescheduleTracker = &structs.RescheduleTracker{Events: []*structs.RescheduleEvent{ {RescheduleTime: time.Now().Add(-2 * time.Hour).UTC().UnixNano(), PrevAllocID: allocs[0].ID, @@ -1251,7 +1260,171 @@ func TestReconciler_Reschedule_Batch(t *testing.T) { reconciler := NewAllocReconciler(testLogger(), allocUpdateFnIgnore, true, job.ID, job, nil, allocs, nil) r := reconciler.Compute() - // Two reschedule attempts were made, one more can be made + // Two reschedule attempts were already made, one more can be made at a future time + + // Verify that the follow up eval has the expected waitUntil time + evals := r.desiredFollowupEvals[tgName] + require.NotNil(evals) + require.Equal(1, len(evals)) + require.Equal(now.Add(delayDur), evals[0].WaitUntil) + + // Alloc 5 should not be replaced because it is terminal + assertResults(t, r, &resultExpectation{ + createDeployment: nil, + deploymentUpdates: nil, + place: 0, + inplace: 1, + stop: 0, + desiredTGUpdates: map[string]*structs.DesiredUpdates{ + job.TaskGroups[0].Name: { + Place: 0, + InPlaceUpdate: 1, + Ignore: 3, + }, + }, + }) + assertNamesHaveIndexes(t, intRange(2, 2), allocsToNames(r.inplaceUpdate)) + // verify that the followup evalID field is set correctly + r.inplaceUpdate[0].EvalID = evals[0].ID +} + +// Tests delayed rescheduling of failed batch allocations and batching of allocs +// with fail times that are close together +func TestReconciler_RescheduleLaterWithBatchedEvals_Batch(t *testing.T) { + require := require.New(t) + // Set desired 4 + job := mock.Job() + job.TaskGroups[0].Count = 10 + now := time.Now() + // Set up reschedule policy + delayDur := 15 * time.Second + job.TaskGroups[0].ReschedulePolicy = &structs.ReschedulePolicy{Attempts: 3, Interval: 24 * time.Hour, Delay: delayDur, DelayFunction: "linear"} + tgName := job.TaskGroups[0].Name + // Create 10 existing allocations + var allocs []*structs.Allocation + for i := 0; i < 10; i++ { + alloc := 
mock.Alloc() + alloc.Job = job + alloc.JobID = job.ID + alloc.NodeID = uuid.Generate() + alloc.Name = structs.AllocName(job.ID, job.TaskGroups[0].Name, uint(i)) + allocs = append(allocs, alloc) + alloc.ClientStatus = structs.AllocClientStatusRunning + } + // Mark 5 as failed with fail times very close together + for i := 0; i < 5; i++ { + allocs[i].ClientStatus = structs.AllocClientStatusFailed + allocs[i].TaskStates = map[string]*structs.TaskState{tgName: {State: "start", + StartedAt: now.Add(-1 * time.Hour), + FinishedAt: now.Add(time.Duration(50*i) * time.Millisecond)}} + } + + // Mark two more as failed several seconds later + for i := 5; i < 7; i++ { + allocs[i].ClientStatus = structs.AllocClientStatusFailed + allocs[i].TaskStates = map[string]*structs.TaskState{tgName: {State: "start", + StartedAt: now.Add(-1 * time.Hour), + FinishedAt: now.Add(10 * time.Second)}} + } + + reconciler := NewAllocReconciler(testLogger(), allocUpdateFnIgnore, true, job.ID, job, nil, allocs, nil) + r := reconciler.Compute() + + // Verify that two follow up evals were created + evals := r.desiredFollowupEvals[tgName] + require.NotNil(evals) + require.Equal(2, len(evals)) + + // Verify expected WaitUntil values for both batched evals + require.Equal(now.Add(delayDur), evals[0].WaitUntil) + secondBatchDuration := delayDur + 10*time.Second + require.Equal(now.Add(secondBatchDuration), evals[1].WaitUntil) + + // Alloc 5 should not be replaced because it is terminal + assertResults(t, r, &resultExpectation{ + createDeployment: nil, + deploymentUpdates: nil, + place: 0, + inplace: 7, + stop: 0, + desiredTGUpdates: map[string]*structs.DesiredUpdates{ + job.TaskGroups[0].Name: { + Place: 0, + InPlaceUpdate: 7, + Ignore: 3, + }, + }, + }) + assertNamesHaveIndexes(t, intRange(0, 6), allocsToNames(r.inplaceUpdate)) + // verify that the followup evalID field is set correctly + for _, alloc := range r.inplaceUpdate { + if allocNameToIndex(alloc.Name) < 5 { + require.Equal(evals[0].ID, alloc.FollowupEvalID) + } else if allocNameToIndex(alloc.Name) < 7 { + require.Equal(evals[1].ID, alloc.FollowupEvalID) + } else { + t.Fatalf("Unexpected alloc name in Inplace results %v", alloc.Name) + } + } +} + +// Tests rescheduling failed batch allocations +func TestReconciler_RescheduleNow_Batch(t *testing.T) { + require := require.New(t) + // Set desired 4 + job := mock.Job() + job.TaskGroups[0].Count = 4 + now := time.Now() + // Set up reschedule policy + job.TaskGroups[0].ReschedulePolicy = &structs.ReschedulePolicy{Attempts: 3, Interval: 24 * time.Hour, Delay: 5 * time.Second, DelayFunction: "linear"} + tgName := job.TaskGroups[0].Name + // Create 6 existing allocations - 2 running, 1 complete and 3 failed + var allocs []*structs.Allocation + for i := 0; i < 6; i++ { + alloc := mock.Alloc() + alloc.Job = job + alloc.JobID = job.ID + alloc.NodeID = uuid.Generate() + alloc.Name = structs.AllocName(job.ID, job.TaskGroups[0].Name, uint(i)) + allocs = append(allocs, alloc) + alloc.ClientStatus = structs.AllocClientStatusRunning + } + // Mark 3 as failed with restart tracking info + allocs[0].ClientStatus = structs.AllocClientStatusFailed + allocs[0].NextAllocation = allocs[1].ID + allocs[1].ClientStatus = structs.AllocClientStatusFailed + allocs[1].RescheduleTracker = &structs.RescheduleTracker{Events: []*structs.RescheduleEvent{ + {RescheduleTime: time.Now().Add(-1 * time.Hour).UTC().UnixNano(), + PrevAllocID: allocs[0].ID, + PrevNodeID: uuid.Generate(), + }, + }} + allocs[1].NextAllocation = allocs[2].ID + allocs[2].ClientStatus = 
structs.AllocClientStatusFailed + allocs[2].TaskStates = map[string]*structs.TaskState{tgName: {State: "start", + StartedAt: now.Add(-1 * time.Hour), + FinishedAt: now.Add(-10 * time.Second)}} + allocs[2].RescheduleTracker = &structs.RescheduleTracker{Events: []*structs.RescheduleEvent{ + {RescheduleTime: time.Now().Add(-2 * time.Hour).UTC().UnixNano(), + PrevAllocID: allocs[0].ID, + PrevNodeID: uuid.Generate(), + }, + {RescheduleTime: time.Now().Add(-1 * time.Hour).UTC().UnixNano(), + PrevAllocID: allocs[1].ID, + PrevNodeID: uuid.Generate(), + }, + }} + // Mark one as complete + allocs[5].ClientStatus = structs.AllocClientStatusComplete + + reconciler := NewAllocReconciler(testLogger(), allocUpdateFnIgnore, true, job.ID, job, nil, allocs, nil) + r := reconciler.Compute() + + // Verify that no follow up evals were created + evals := r.desiredFollowupEvals[tgName] + require.Nil(evals) + + // Two reschedule attempts were made, one more can be made now // Alloc 5 should not be replaced because it is terminal assertResults(t, r, &resultExpectation{ createDeployment: nil, @@ -1266,19 +1439,24 @@ func TestReconciler_Reschedule_Batch(t *testing.T) { }, }, }) + assertNamesHaveIndexes(t, intRange(2, 2), placeResultsToNames(r.place)) assertPlaceResultsHavePreviousAllocs(t, 1, r.place) assertPlacementsAreRescheduled(t, 1, r.place) + } // Tests rescheduling failed service allocations with desired state stop -func TestReconciler_Reschedule_Service(t *testing.T) { +func TestReconciler_RescheduleLater_Service(t *testing.T) { + require := require.New(t) // Set desired 5 job := mock.Job() job.TaskGroups[0].Count = 5 - + tgName := job.TaskGroups[0].Name + now := time.Now() // Set up reschedule policy - job.TaskGroups[0].ReschedulePolicy = &structs.ReschedulePolicy{Attempts: 1, Interval: 24 * time.Hour} + delayDur := 15 * time.Second + job.TaskGroups[0].ReschedulePolicy = &structs.ReschedulePolicy{Attempts: 1, Interval: 24 * time.Hour, Delay: delayDur, DelayCeiling: 1 * time.Hour} // Create 5 existing allocations var allocs []*structs.Allocation @@ -1293,15 +1471,17 @@ func TestReconciler_Reschedule_Service(t *testing.T) { } // Mark two as failed allocs[0].ClientStatus = structs.AllocClientStatusFailed - allocs[1].ClientStatus = structs.AllocClientStatusFailed - // Mark one of them as already rescheduled once - allocs[1].RescheduleTracker = &structs.RescheduleTracker{Events: []*structs.RescheduleEvent{ + allocs[0].RescheduleTracker = &structs.RescheduleTracker{Events: []*structs.RescheduleEvent{ {RescheduleTime: time.Now().Add(-1 * time.Hour).UTC().UnixNano(), PrevAllocID: uuid.Generate(), PrevNodeID: uuid.Generate(), }, }} + allocs[1].TaskStates = map[string]*structs.TaskState{tgName: {State: "start", + StartedAt: now.Add(-1 * time.Hour), + FinishedAt: now}} + allocs[1].ClientStatus = structs.AllocClientStatusFailed // Mark one as desired state stop allocs[4].DesiredStatus = structs.AllocDesiredStatusStop @@ -1309,7 +1489,81 @@ func TestReconciler_Reschedule_Service(t *testing.T) { reconciler := NewAllocReconciler(testLogger(), allocUpdateFnIgnore, false, job.ID, job, nil, allocs, nil) r := reconciler.Compute() - // Should place 2, one is rescheduled, one is past its reschedule limit and one is a new placement + // Should place a new placement and create a follow up eval for the delayed reschedule + // Verify that the follow up eval has the expected waitUntil time + evals := r.desiredFollowupEvals[tgName] + require.NotNil(evals) + require.Equal(1, len(evals)) + require.Equal(now.Add(delayDur), 
evals[0].WaitUntil) + + assertResults(t, r, &resultExpectation{ + createDeployment: nil, + deploymentUpdates: nil, + place: 1, + inplace: 1, + stop: 0, + desiredTGUpdates: map[string]*structs.DesiredUpdates{ + job.TaskGroups[0].Name: { + Place: 1, + InPlaceUpdate: 1, + Ignore: 3, + }, + }, + }) + + assertNamesHaveIndexes(t, intRange(4, 4), placeResultsToNames(r.place)) + assertNamesHaveIndexes(t, intRange(1, 1), allocsToNames(r.inplaceUpdate)) + // verify that the followup evalID field is set correctly + r.inplaceUpdate[0].EvalID = evals[0].ID +} + +// Tests rescheduling failed service allocations with desired state stop +func TestReconciler_RescheduleNow_Service(t *testing.T) { + require := require.New(t) + // Set desired 5 + job := mock.Job() + job.TaskGroups[0].Count = 5 + tgName := job.TaskGroups[0].Name + now := time.Now() + // Set up reschedule policy + job.TaskGroups[0].ReschedulePolicy = &structs.ReschedulePolicy{Attempts: 1, Interval: 24 * time.Hour, Delay: 5 * time.Second, DelayCeiling: 1 * time.Hour} + + // Create 5 existing allocations + var allocs []*structs.Allocation + for i := 0; i < 5; i++ { + alloc := mock.Alloc() + alloc.Job = job + alloc.JobID = job.ID + alloc.NodeID = uuid.Generate() + alloc.Name = structs.AllocName(job.ID, job.TaskGroups[0].Name, uint(i)) + allocs = append(allocs, alloc) + alloc.ClientStatus = structs.AllocClientStatusRunning + } + // Mark two as failed + allocs[0].ClientStatus = structs.AllocClientStatusFailed + // Mark one of them as already rescheduled once + allocs[0].RescheduleTracker = &structs.RescheduleTracker{Events: []*structs.RescheduleEvent{ + {RescheduleTime: time.Now().Add(-1 * time.Hour).UTC().UnixNano(), + PrevAllocID: uuid.Generate(), + PrevNodeID: uuid.Generate(), + }, + }} + allocs[1].TaskStates = map[string]*structs.TaskState{tgName: {State: "start", + StartedAt: now.Add(-1 * time.Hour), + FinishedAt: now.Add(-10 * time.Second)}} + allocs[1].ClientStatus = structs.AllocClientStatusFailed + + // Mark one as desired state stop + allocs[4].DesiredStatus = structs.AllocDesiredStatusStop + + reconciler := NewAllocReconciler(testLogger(), allocUpdateFnIgnore, false, job.ID, job, nil, allocs, nil) + r := reconciler.Compute() + + // Verify that no follow up evals were created + evals := r.desiredFollowupEvals[tgName] + require.Nil(evals) + + // Verify that one rescheduled alloc and one replacement for terminal alloc were placed assertResults(t, r, &resultExpectation{ createDeployment: nil, deploymentUpdates: nil, @@ -1324,8 +1578,8 @@ func TestReconciler_Reschedule_Service(t *testing.T) { }, }) - assertNamesHaveIndexes(t, intRange(0, 0, 4, 4), placeResultsToNames(r.place)) - // 2 rescheduled allocs should have previous allocs + assertNamesHaveIndexes(t, intRange(1, 1, 4, 4), placeResultsToNames(r.place)) + // Rescheduled allocs should have previous allocs assertPlaceResultsHavePreviousAllocs(t, 1, r.place) assertPlacementsAreRescheduled(t, 1, r.place) } @@ -3374,6 +3628,8 @@ func TestReconciler_FailedDeployment_DontReschedule(t *testing.T) { job := mock.Job() job.TaskGroups[0].Update = noCanaryUpdate + tgName := job.TaskGroups[0].Name + now := time.Now() // Create an existing failed deployment that has some placed allocs d := structs.NewDeployment(job) d.Status = structs.DeploymentStatusFailed @@ -3394,8 +3650,17 @@ func TestReconciler_FailedDeployment_DontReschedule(t *testing.T) { alloc.TaskGroup = job.TaskGroups[0].Name allocs = append(allocs, alloc) } + + //create some allocations that are reschedulable now allocs[2].ClientStatus = 
structs.AllocClientStatusFailed + allocs[2].TaskStates = map[string]*structs.TaskState{tgName: {State: "start", + StartedAt: now.Add(-1 * time.Hour), + FinishedAt: now.Add(-10 * time.Second)}} + allocs[3].ClientStatus = structs.AllocClientStatusFailed + allocs[3].TaskStates = map[string]*structs.TaskState{tgName: {State: "start", + StartedAt: now.Add(-1 * time.Hour), + FinishedAt: now.Add(-10 * time.Second)}} reconciler := NewAllocReconciler(testLogger(), allocUpdateFnDestructive, false, job.ID, job, d, allocs, nil) r := reconciler.Compute() @@ -3417,6 +3682,8 @@ func TestReconciler_FailedDeployment_DontReschedule(t *testing.T) { func TestReconciler_DeploymentWithFailedAllocs_DontReschedule(t *testing.T) { job := mock.Job() job.TaskGroups[0].Update = noCanaryUpdate + tgName := job.TaskGroups[0].Name + now := time.Now() // Mock deployment with failed allocs, but deployment watcher hasn't marked it as failed yet d := structs.NewDeployment(job) @@ -3439,8 +3706,17 @@ func TestReconciler_DeploymentWithFailedAllocs_DontReschedule(t *testing.T) { alloc.DeploymentID = d.ID allocs = append(allocs, alloc) } + + // Create allocs that are reschedulable now allocs[2].ClientStatus = structs.AllocClientStatusFailed + allocs[2].TaskStates = map[string]*structs.TaskState{tgName: {State: "start", + StartedAt: now.Add(-1 * time.Hour), + FinishedAt: now.Add(-10 * time.Second)}} + allocs[3].ClientStatus = structs.AllocClientStatusFailed + allocs[3].TaskStates = map[string]*structs.TaskState{tgName: {State: "start", + StartedAt: now.Add(-1 * time.Hour), + FinishedAt: now.Add(-10 * time.Second)}} reconciler := NewAllocReconciler(testLogger(), allocUpdateFnDestructive, false, job.ID, job, d, allocs, nil) r := reconciler.Compute() diff --git a/scheduler/reconcile_util.go b/scheduler/reconcile_util.go index cff5c2648..99a7a07cc 100644 --- a/scheduler/reconcile_util.go +++ b/scheduler/reconcile_util.go @@ -227,14 +227,17 @@ func (a allocSet) filterByTainted(nodes map[string]*structs.Node) (untainted, mi return } -// filterByReschedulable filters the allocation set to return the set of allocations that are either -// terminal or running, and a set of allocations that must be rescheduled -func (a allocSet) filterByReschedulable(isBatch bool, reschedulePolicy *structs.ReschedulePolicy) (untainted, reschedule allocSet) { +// filterByRescheduleable filters the allocation set to return the set of allocations that are either +// terminal or running, and a set of allocations that must be rescheduled now. Allocations that can be rescheduled +// at a future time are also returned so that we can create follow up evaluations for them +func (a allocSet) filterByRescheduleable(isBatch bool, reschedulePolicy *structs.ReschedulePolicy) (untainted, rescheduleNow allocSet, rescheduleLater []*delayedRescheduleInfo) { untainted = make(map[string]*structs.Allocation) - reschedule = make(map[string]*structs.Allocation) + rescheduleNow = make(map[string]*structs.Allocation) now := time.Now() for _, alloc := range a { + var isUntainted, eligibleNow, eligibleLater bool + var rescheduleTime time.Time if isBatch { // Allocs from batch jobs should be filtered when the desired status // is terminal and the client did not finish or when the client @@ -249,26 +252,48 @@ func (a allocSet) filterByReschedulable(isBatch bool, reschedulePolicy *structs. 
default: } if alloc.NextAllocation == "" { - if alloc.ShouldReschedule(reschedulePolicy, now) { - reschedule[alloc.ID] = alloc - } else { - untainted[alloc.ID] = alloc - } + //ignore allocs that have already been rescheduled + isUntainted, eligibleNow, eligibleLater, rescheduleTime = updateByReschedulable(alloc, reschedulePolicy, now, true) } } else { //ignore allocs that have already been rescheduled if alloc.NextAllocation == "" { - // ignore allocs whose desired state is stop/evict - // everything else is either reschedulable or untainted - if alloc.ShouldReschedule(reschedulePolicy, now) { - reschedule[alloc.ID] = alloc - } else if alloc.DesiredStatus != structs.AllocDesiredStatusStop && alloc.DesiredStatus != structs.AllocDesiredStatusEvict { - untainted[alloc.ID] = alloc - } + isUntainted, eligibleNow, eligibleLater, rescheduleTime = updateByReschedulable(alloc, reschedulePolicy, now, false) } } + if isUntainted { + untainted[alloc.ID] = alloc + } + if eligibleNow { + rescheduleNow[alloc.ID] = alloc + } else if eligibleLater { + rescheduleLater = append(rescheduleLater, &delayedRescheduleInfo{alloc.ID, rescheduleTime}) + } } + return +} +// updateByReschedulable is a helper method that encapsulates logic for whether a failed allocation +// should be rescheduled now, later or left in the untainted set +func updateByReschedulable(alloc *structs.Allocation, reschedulePolicy *structs.ReschedulePolicy, now time.Time, batch bool) (untainted, rescheduleNow, rescheduleLater bool, rescheduleTime time.Time) { + shouldAllow := true + if !batch { + // for service type jobs we ignore allocs whose desired state is stop/evict + // everything else is either rescheduleable or untainted + shouldAllow = alloc.DesiredStatus != structs.AllocDesiredStatusStop && alloc.DesiredStatus != structs.AllocDesiredStatusEvict + } + rescheduleTime, eligible := alloc.NextRescheduleTime(reschedulePolicy) + timeDiff := rescheduleTime.UTC().UnixNano() - now.UTC().UnixNano() + // we consider a time difference of less than 5 seconds to be eligible + // because we collapse allocations that failed within 5 seconds into a single evaluation + if eligible && timeDiff < batchedFailedAllocWindowSize.Nanoseconds() { + rescheduleNow = true + } else if shouldAllow { + untainted = true + if eligible && alloc.FollowupEvalID == "" { + rescheduleLater = true + } + } return }
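
Note on the pruning behaviour added to updateRescheduleTracker: with a bounded policy (Attempts > 0) only prior events that still fall inside the policy Interval are carried forward, while an unlimited policy keeps just the most recent five events (max_past_reschedule_events). The standalone sketch below mirrors that logic with trimmed stand-in types (event, policy) rather than the real structs.RescheduleEvent / structs.ReschedulePolicy; it is illustrative only, not the scheduler code itself.

    // Sketch of the event pruning done before a new reschedule event is appended.
    package main

    import (
    	"fmt"
    	"time"
    )

    const maxPastRescheduleEvents = 5

    type event struct {
    	RescheduleTime int64 // unix nanos of the reschedule decision
    	Delay          time.Duration
    }

    type policy struct {
    	Attempts  int
    	Interval  time.Duration
    	Unlimited bool
    }

    // pruneEvents keeps the prior events that still matter for the next decision:
    // with a bounded policy, only events inside the current interval window;
    // with an unlimited policy, only the most recent maxPastRescheduleEvents.
    func pruneEvents(prev []event, p policy, now time.Time) []event {
    	var kept []event
    	if p.Attempts > 0 {
    		for _, e := range prev {
    			if p.Interval > 0 && now.UnixNano()-e.RescheduleTime <= p.Interval.Nanoseconds() {
    				kept = append(kept, e)
    			}
    		}
    		return kept
    	}
    	// Unlimited: keep only the last N, preserving chronological order.
    	start := len(prev) - maxPastRescheduleEvents
    	if start < 0 {
    		start = 0
    	}
    	return append(kept, prev[start:]...)
    }

    func main() {
    	now := time.Now()
    	var prev []event
    	for i := 0; i < 8; i++ {
    		prev = append(prev, event{
    			RescheduleTime: now.Add(-time.Duration(8-i) * time.Hour).UnixNano(),
    			Delay:          time.Duration(i+1) * 5 * time.Second,
    		})
    	}
    	// 3 of the 8 events fall inside the 3h interval window.
    	fmt.Println(len(pruneEvents(prev, policy{Attempts: 2, Interval: 3 * time.Hour}, now)))
    	// Unlimited keeps only the last five.
    	fmt.Println(len(pruneEvents(prev, policy{Unlimited: true}, now)))
    }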
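
The Delay recorded on the new reschedule event comes from prev.NextDelay(reschedPolicy), which lives in the structs package and is not part of this diff. The sketch below is only a reading of the expectations encoded in the tests above (linear repeats the base delay, exponential doubles the last delay, fibonacci sums the last two, all capped at the delay ceiling); nextDelay here is a hypothetical helper, not the real method.

    // Sketch of the delay progression the Test_updateRescheduleTracker cases expect.
    package main

    import (
    	"fmt"
    	"time"
    )

    // nextDelay derives the next reschedule delay from the delays recorded in
    // earlier reschedule events. prevDelays is ordered oldest to newest.
    func nextDelay(fn string, base, ceiling time.Duration, prevDelays []time.Duration) time.Duration {
    	var d time.Duration
    	switch {
    	case len(prevDelays) == 0:
    		d = base
    	case fn == "exponential":
    		d = 2 * prevDelays[len(prevDelays)-1]
    	case fn == "fibonacci":
    		if len(prevDelays) == 1 {
    			d = prevDelays[0]
    		} else {
    			d = prevDelays[len(prevDelays)-1] + prevDelays[len(prevDelays)-2]
    		}
    	default: // "linear"
    		d = base
    	}
    	if ceiling > 0 && d > ceiling {
    		d = ceiling
    	}
    	return d
    }

    func main() {
    	exp := []time.Duration{5, 10, 20, 40, 80, 160}
    	for i := range exp {
    		exp[i] *= time.Second
    	}
    	// Matches the expectedDelay of 320s in TestServiceSched_Reschedule_PruneEvents.
    	fmt.Println(nextDelay("exponential", 5*time.Second, time.Hour, exp))

    	fib := []time.Duration{5, 5, 10, 15, 25, 40, 65, 105}
    	for i := range fib {
    		fib[i] *= time.Second
    	}
    	// Matches the 170s delay in the "eight past events, fibonacci delay" case.
    	fmt.Println(nextDelay("fibonacci", 5*time.Second, 240*time.Second, fib))
    }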
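
handleDelayedReschedules groups reschedulable-later allocations into follow-up evaluations using a five second window (batchedFailedAllocWindowSize): every allocation whose reschedule time falls within the window of the current batch's earliest member shares one evaluation, and that evaluation's WaitUntil is the batch's earliest reschedule time. A minimal standalone sketch of that windowing, with trimmed stand-in types instead of structs.Evaluation and delayedRescheduleInfo:

    // Sketch of the 5-second batching window used for delayed-reschedule follow-up evals.
    package main

    import (
    	"fmt"
    	"sort"
    	"time"
    )

    const batchWindow = 5 * time.Second

    type delayedAlloc struct {
    	allocID        string
    	rescheduleTime time.Time
    }

    type followupBatch struct {
    	waitUntil time.Time // becomes the follow-up eval's WaitUntil
    	allocIDs  []string  // allocs whose FollowupEvalID points at that eval
    }

    // batchDelayed groups delayed allocations into follow-up eval batches.
    func batchDelayed(infos []delayedAlloc) []followupBatch {
    	if len(infos) == 0 {
    		return nil
    	}
    	sort.Slice(infos, func(i, j int) bool {
    		return infos[i].rescheduleTime.Before(infos[j].rescheduleTime)
    	})

    	var batches []followupBatch
    	current := followupBatch{waitUntil: infos[0].rescheduleTime}
    	for _, info := range infos {
    		if info.rescheduleTime.Sub(current.waitUntil) < batchWindow {
    			current.allocIDs = append(current.allocIDs, info.allocID)
    			continue
    		}
    		// Outside the window: close this batch and start a new one.
    		batches = append(batches, current)
    		current = followupBatch{
    			waitUntil: info.rescheduleTime,
    			allocIDs:  []string{info.allocID},
    		}
    	}
    	return append(batches, current)
    }

    func main() {
    	now := time.Now()
    	infos := []delayedAlloc{
    		{"a1", now},
    		{"a2", now.Add(50 * time.Millisecond)},
    		{"a3", now.Add(200 * time.Millisecond)},
    		{"a4", now.Add(10 * time.Second)}, // falls outside the 5s window
    	}
    	for _, b := range batchDelayed(infos) {
    		fmt.Println(b.waitUntil.Sub(now), b.allocIDs)
    	}
    	// Output resembles:
    	// 0s [a1 a2 a3]
    	// 10s [a4]
    }

This is the same grouping exercised by TestReconciler_RescheduleLaterWithBatchedEvals_Batch, where five allocations failing within 250ms share one evaluation and two failing ten seconds later land in a second one.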