Scheduler and Reconciler changes to support delayed rescheduling

Preetha Appan 2018-03-01 18:23:44 -06:00
parent 342c3fb961
commit 5373ade731
5 changed files with 983 additions and 51 deletions

View File

@ -72,8 +72,10 @@ type GenericScheduler struct {
ctx *EvalContext
stack *GenericStack
// Deprecated, was used in the pre-Nomad 0.7 rolling update stanza
followupEvalWait time.Duration
nextEval *structs.Evaluation
followUpEvals []*structs.Evaluation
deployment *structs.Deployment
@ -261,6 +263,19 @@ func (s *GenericScheduler) process() (bool, error) {
s.logger.Printf("[DEBUG] sched: %#v: rolling migration limit reached, next eval '%s' created", s.eval, s.nextEval.ID)
}
// Create follow up evals for any delayed reschedule eligible allocations
if len(s.followUpEvals) > 0 {
for _, eval := range s.followUpEvals {
eval.PreviousEval = s.eval.ID
// TODO(preetha) this should be batching evals before inserting them
if err := s.planner.CreateEval(eval); err != nil {
s.logger.Printf("[ERR] sched: %#v failed to make next eval for rescheduling: %v", s.eval, err)
return false, err
}
s.logger.Printf("[DEBUG] sched: %#v: found reschedulable allocs, next eval '%s' created", s.eval, eval.ID)
}
}
// Submit the plan and store the results.
result, newState, err := s.planner.SubmitPlan(s.plan)
s.planResult = result
@ -336,6 +351,12 @@ func (s *GenericScheduler) computeJobAllocs() error {
// follow up eval to handle node draining.
s.followupEvalWait = results.followupEvalWait
// Store all the follow up evaluations from rescheduled allocations
if len(results.desiredFollowupEvals) > 0 {
for _, evals := range results.desiredFollowupEvals {
s.followUpEvals = append(s.followUpEvals, evals...)
}
}
// Update the stored deployment
if results.deployment != nil {
s.deployment = results.deployment
@ -467,7 +488,7 @@ func (s *GenericScheduler) computePlacements(destructive, place []placementResul
if prevAllocation != nil {
alloc.PreviousAllocation = prevAllocation.ID
if missing.IsRescheduling() {
updateRescheduleTracker(alloc, prevAllocation)
updateRescheduleTracker(alloc, prevAllocation, tg.ReschedulePolicy, time.Now())
}
}
@ -523,15 +544,42 @@ func getSelectOptions(prevAllocation *structs.Allocation, preferredNode *structs
return selectOptions
}
const maxPastRescheduleEvents = 5
// updateRescheduleTracker carries over previous reschedule attempts and adds the most recent one
func updateRescheduleTracker(alloc *structs.Allocation, prev *structs.Allocation) {
func updateRescheduleTracker(alloc *structs.Allocation, prev *structs.Allocation, reschedPolicy *structs.ReschedulePolicy, now time.Time) {
var rescheduleEvents []*structs.RescheduleEvent
if prev.RescheduleTracker != nil {
for _, reschedEvent := range prev.RescheduleTracker.Events {
rescheduleEvents = append(rescheduleEvents, reschedEvent.Copy())
var interval time.Duration
if reschedPolicy != nil {
interval = reschedPolicy.Interval
}
// If attempts is set, copy all events within the interval range
if reschedPolicy != nil && reschedPolicy.Attempts > 0 {
for _, reschedEvent := range prev.RescheduleTracker.Events {
timeDiff := now.UnixNano() - reschedEvent.RescheduleTime
// Only copy over events that are within the reschedule interval
// This keeps the list of events small in cases where there's a long chain of old reschedule events
if interval > 0 && timeDiff <= interval.Nanoseconds() {
rescheduleEvents = append(rescheduleEvents, reschedEvent.Copy())
}
}
} else {
// For unlimited attempts, only copy the most recent maxPastRescheduleEvents events
copied := 0
for i := len(prev.RescheduleTracker.Events) - 1; i >= 0 && copied < maxPastRescheduleEvents; i-- {
reschedEvent := prev.RescheduleTracker.Events[i]
rescheduleEvents = append(rescheduleEvents, reschedEvent.Copy())
copied++
}
// reverse it to get the correct order
for left, right := 0, len(rescheduleEvents)-1; left < right; left, right = left+1, right-1 {
rescheduleEvents[left], rescheduleEvents[right] = rescheduleEvents[right], rescheduleEvents[left]
}
}
}
rescheduleEvent := structs.NewRescheduleEvent(time.Now().UTC().UnixNano(), prev.ID, prev.NodeID)
nextDelay := prev.NextDelay(reschedPolicy)
rescheduleEvent := structs.NewRescheduleEvent(now.UnixNano(), prev.ID, prev.NodeID, nextDelay)
rescheduleEvents = append(rescheduleEvents, rescheduleEvent)
alloc.RescheduleTracker = &structs.RescheduleTracker{Events: rescheduleEvents}
}
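Taken together, the two branches above bound the tracker's size: with a positive Attempts limit only events inside the policy interval survive, and with unlimited attempts only the most recent maxPastRescheduleEvents survive. A minimal runnable sketch of the same carry-forward rule, using a hypothetical event type rather than Nomad's structs (the tail-slice copy replaces the reverse-then-reverse loop but preserves order):

package main

import (
    "fmt"
    "time"
)

// event is a hypothetical stand-in for structs.RescheduleEvent.
type event struct{ at int64 }

// carryForward mirrors the two pruning branches: with a positive attempts
// limit, keep only events inside the policy interval; with unlimited
// attempts, keep the most recent max events in original order.
func carryForward(events []event, attempts int, interval time.Duration, now time.Time, max int) []event {
    var kept []event
    if attempts > 0 {
        for _, e := range events {
            if interval > 0 && now.UnixNano()-e.at <= interval.Nanoseconds() {
                kept = append(kept, e)
            }
        }
        return kept
    }
    start := len(events) - max
    if start < 0 {
        start = 0
    }
    return append(kept, events[start:]...)
}

func main() {
    now := time.Now()
    evs := []event{
        {now.Add(-2 * time.Hour).UnixNano()},
        {now.Add(-30 * time.Minute).UnixNano()},
        {now.Add(-10 * time.Minute).UnixNano()},
    }
    fmt.Println(len(carryForward(evs, 5, time.Hour, now, 5))) // 2: only events inside the 1h interval
    fmt.Println(len(carryForward(evs, 0, 0, now, 2)))         // 2: the most recent two events
}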

View File

@ -2715,7 +2715,7 @@ func TestServiceSched_RetryLimit(t *testing.T) {
h.AssertEvalStatus(t, structs.EvalStatusFailed)
}
func TestServiceSched_Reschedule_Once(t *testing.T) {
func TestServiceSched_Reschedule_OnceNow(t *testing.T) {
h := NewHarness(t)
// Create some nodes
@ -2730,9 +2730,15 @@ func TestServiceSched_Reschedule_Once(t *testing.T) {
job := mock.Job()
job.TaskGroups[0].Count = 2
job.TaskGroups[0].ReschedulePolicy = &structs.ReschedulePolicy{
Attempts: 1,
Interval: 15 * time.Minute,
Attempts: 1,
Interval: 15 * time.Minute,
Delay: 5 * time.Second,
DelayCeiling: 1 * time.Minute,
DelayFunction: "linear",
}
tgName := job.TaskGroups[0].Name
now := time.Now()
noErr(t, h.State.UpsertJob(h.NextIndex(), job))
var allocs []*structs.Allocation
@ -2746,6 +2752,9 @@ func TestServiceSched_Reschedule_Once(t *testing.T) {
}
// Mark one of the allocations as failed
allocs[1].ClientStatus = structs.AllocClientStatusFailed
allocs[1].TaskStates = map[string]*structs.TaskState{tgName: {State: "dead",
StartedAt: now.Add(-1 * time.Hour),
FinishedAt: now.Add(-10 * time.Second)}}
failedAllocID := allocs[1].ID
successAllocID := allocs[0].ID
@ -2817,7 +2826,96 @@ func TestServiceSched_Reschedule_Once(t *testing.T) {
}
func TestServiceSched_Reschedule_Multiple(t *testing.T) {
// Tests that an alloc that is reschedulable at a future time creates a follow up eval
func TestServiceSched_Reschedule_Later(t *testing.T) {
h := NewHarness(t)
require := require.New(t)
// Create some nodes
var nodes []*structs.Node
for i := 0; i < 10; i++ {
node := mock.Node()
nodes = append(nodes, node)
noErr(t, h.State.UpsertNode(h.NextIndex(), node))
}
// Generate a fake job with allocations and an update policy.
job := mock.Job()
job.TaskGroups[0].Count = 2
delayDuration := 15 * time.Second
job.TaskGroups[0].ReschedulePolicy = &structs.ReschedulePolicy{
Attempts: 1,
Interval: 15 * time.Minute,
Delay: delayDuration,
DelayCeiling: 1 * time.Minute,
DelayFunction: "linear",
}
tgName := job.TaskGroups[0].Name
now := time.Now()
noErr(t, h.State.UpsertJob(h.NextIndex(), job))
var allocs []*structs.Allocation
for i := 0; i < 2; i++ {
alloc := mock.Alloc()
alloc.Job = job
alloc.JobID = job.ID
alloc.NodeID = nodes[i].ID
alloc.Name = fmt.Sprintf("my-job.web[%d]", i)
allocs = append(allocs, alloc)
}
// Mark one of the allocations as failed
allocs[1].ClientStatus = structs.AllocClientStatusFailed
allocs[1].TaskStates = map[string]*structs.TaskState{tgName: {State: "dead",
StartedAt: now.Add(-1 * time.Hour),
FinishedAt: now}}
failedAllocID := allocs[1].ID
noErr(t, h.State.UpsertAllocs(h.NextIndex(), allocs))
// Create a mock evaluation
eval := &structs.Evaluation{
Namespace: structs.DefaultNamespace,
ID: uuid.Generate(),
Priority: 50,
TriggeredBy: structs.EvalTriggerNodeUpdate,
JobID: job.ID,
Status: structs.EvalStatusPending,
}
noErr(t, h.State.UpsertEvals(h.NextIndex(), []*structs.Evaluation{eval}))
// Process the evaluation
err := h.Process(NewServiceScheduler, eval)
if err != nil {
t.Fatalf("err: %v", err)
}
// Ensure at least one plan was created
if len(h.Plans) == 0 {
t.Fatalf("bad: %#v", h.Plans)
}
// Lookup the allocations by JobID
ws := memdb.NewWatchSet()
out, err := h.State.AllocsByJob(ws, job.Namespace, job.ID, false)
noErr(t, err)
// Verify no new allocs were created
require.Equal(2, len(out))
// Verify follow up eval was created for the failed alloc
alloc, err := h.State.AllocByID(ws, failedAllocID)
require.Nil(err)
require.NotEmpty(alloc.FollowupEvalID)
// Ensure there is a follow up eval.
if len(h.CreateEvals) != 1 || h.CreateEvals[0].Status != structs.EvalStatusPending {
t.Fatalf("bad: %#v", h.CreateEvals)
}
followupEval := h.CreateEvals[0]
require.Equal(now.Add(delayDuration), followupEval.WaitUntil)
}
func TestServiceSched_Reschedule_MultipleNow(t *testing.T) {
h := NewHarness(t)
// Create some nodes
@ -2833,9 +2931,14 @@ func TestServiceSched_Reschedule_Multiple(t *testing.T) {
job := mock.Job()
job.TaskGroups[0].Count = 2
job.TaskGroups[0].ReschedulePolicy = &structs.ReschedulePolicy{
Attempts: maxRestartAttempts,
Interval: 30 * time.Minute,
Attempts: maxRestartAttempts,
Interval: 30 * time.Minute,
Delay: 5 * time.Second,
DelayFunction: "linear",
}
tgName := job.TaskGroups[0].Name
now := time.Now()
noErr(t, h.State.UpsertJob(h.NextIndex(), job))
var allocs []*structs.Allocation
@ -2850,6 +2953,9 @@ func TestServiceSched_Reschedule_Multiple(t *testing.T) {
}
// Mark one of the allocations as failed
allocs[1].ClientStatus = structs.AllocClientStatusFailed
allocs[1].TaskStates = map[string]*structs.TaskState{tgName: {State: "dead",
StartedAt: now.Add(-1 * time.Hour),
FinishedAt: now.Add(-10 * time.Second)}}
noErr(t, h.State.UpsertAllocs(h.NextIndex(), allocs))
@ -2915,6 +3021,9 @@ func TestServiceSched_Reschedule_Multiple(t *testing.T) {
// Mark this alloc as failed again
newAlloc.ClientStatus = structs.AllocClientStatusFailed
newAlloc.TaskStates = map[string]*structs.TaskState{tgName: {State: "dead",
StartedAt: now.Add(-12 * time.Second),
FinishedAt: now.Add(-10 * time.Second)}}
failedAllocId = newAlloc.ID
failedNodeID = newAlloc.NodeID
@ -2946,6 +3055,136 @@ func TestServiceSched_Reschedule_Multiple(t *testing.T) {
assert.Equal(5, len(out)) // 2 original, plus 3 reschedule attempts
}
// Tests that old reschedule attempts are pruned
func TestServiceSched_Reschedule_PruneEvents(t *testing.T) {
h := NewHarness(t)
// Create some nodes
var nodes []*structs.Node
for i := 0; i < 10; i++ {
node := mock.Node()
nodes = append(nodes, node)
noErr(t, h.State.UpsertNode(h.NextIndex(), node))
}
// Generate a fake job with allocations and an update policy.
job := mock.Job()
job.TaskGroups[0].Count = 2
job.TaskGroups[0].ReschedulePolicy = &structs.ReschedulePolicy{
DelayFunction: "exponential",
DelayCeiling: 1 * time.Hour,
Delay: 5 * time.Second,
Unlimited: true,
}
noErr(t, h.State.UpsertJob(h.NextIndex(), job))
var allocs []*structs.Allocation
for i := 0; i < 2; i++ {
alloc := mock.Alloc()
alloc.Job = job
alloc.JobID = job.ID
alloc.NodeID = nodes[i].ID
alloc.Name = fmt.Sprintf("my-job.web[%d]", i)
allocs = append(allocs, alloc)
}
now := time.Now()
// Mark allocations as failed with restart info
allocs[1].TaskStates = map[string]*structs.TaskState{job.TaskGroups[0].Name: {State: "dead",
StartedAt: now.Add(-1 * time.Hour),
FinishedAt: now.Add(-15 * time.Minute)}}
allocs[1].ClientStatus = structs.AllocClientStatusFailed
allocs[1].RescheduleTracker = &structs.RescheduleTracker{
Events: []*structs.RescheduleEvent{
{RescheduleTime: now.Add(-1 * time.Hour).UTC().UnixNano(),
PrevAllocID: uuid.Generate(),
PrevNodeID: uuid.Generate(),
Delay: 5 * time.Second,
},
{RescheduleTime: now.Add(-40 * time.Minute).UTC().UnixNano(),
PrevAllocID: allocs[0].ID,
PrevNodeID: uuid.Generate(),
Delay: 10 * time.Second,
},
{RescheduleTime: now.Add(-30 * time.Minute).UTC().UnixNano(),
PrevAllocID: allocs[0].ID,
PrevNodeID: uuid.Generate(),
Delay: 20 * time.Second,
},
{RescheduleTime: now.Add(-20 * time.Minute).UTC().UnixNano(),
PrevAllocID: allocs[0].ID,
PrevNodeID: uuid.Generate(),
Delay: 40 * time.Second,
},
{RescheduleTime: now.Add(-10 * time.Minute).UTC().UnixNano(),
PrevAllocID: allocs[0].ID,
PrevNodeID: uuid.Generate(),
Delay: 80 * time.Second,
},
{RescheduleTime: now.Add(-3 * time.Minute).UTC().UnixNano(),
PrevAllocID: allocs[0].ID,
PrevNodeID: uuid.Generate(),
Delay: 160 * time.Second,
},
},
}
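// With an unlimited policy only the last 5 of these 6 events are carried
// over, so the -40 minute event becomes the first tracked event; the
// exponential delay function doubles the most recent 160s delay to 320s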
expectedFirstRescheduleEvent := allocs[1].RescheduleTracker.Events[1]
expectedDelay := 320 * time.Second
failedAllocID := allocs[1].ID
successAllocID := allocs[0].ID
noErr(t, h.State.UpsertAllocs(h.NextIndex(), allocs))
// Create a mock evaluation
eval := &structs.Evaluation{
Namespace: structs.DefaultNamespace,
ID: uuid.Generate(),
Priority: 50,
TriggeredBy: structs.EvalTriggerNodeUpdate,
JobID: job.ID,
Status: structs.EvalStatusPending,
}
noErr(t, h.State.UpsertEvals(h.NextIndex(), []*structs.Evaluation{eval}))
// Process the evaluation
err := h.Process(NewServiceScheduler, eval)
if err != nil {
t.Fatalf("err: %v", err)
}
// Ensure at least one plan was created
if len(h.Plans) == 0 {
t.Fatalf("bad: %#v", h.Plans)
}
// Lookup the allocations by JobID
ws := memdb.NewWatchSet()
out, err := h.State.AllocsByJob(ws, job.Namespace, job.ID, false)
noErr(t, err)
// Verify that one new allocation got created with its restart tracker info
assert := assert.New(t)
assert.Equal(3, len(out))
var newAlloc *structs.Allocation
for _, alloc := range out {
if alloc.ID != successAllocID && alloc.ID != failedAllocID {
newAlloc = alloc
}
}
assert.Equal(failedAllocID, newAlloc.PreviousAllocation)
// Verify that the new alloc copied the last 5 reschedule attempts and appended the latest one
assert.Equal(6, len(newAlloc.RescheduleTracker.Events))
assert.Equal(expectedFirstRescheduleEvent, newAlloc.RescheduleTracker.Events[0])
mostRecentRescheduleEvent := newAlloc.RescheduleTracker.Events[5]
// Verify that the failed alloc ID is in the most recent reschedule event
assert.Equal(failedAllocID, mostRecentRescheduleEvent.PrevAllocID)
// Verify that the delay value was captured correctly
assert.Equal(expectedDelay, mostRecentRescheduleEvent.Delay)
}
// Tests that deployments with failed allocs don't result in placements
func TestDeployment_FailedAllocs_NoReschedule(t *testing.T) {
h := NewHarness(t)
@ -3079,6 +3318,9 @@ func TestBatchSched_Run_FailedAlloc(t *testing.T) {
job.TaskGroups[0].Count = 1
noErr(t, h.State.UpsertJob(h.NextIndex(), job))
tgName := job.TaskGroups[0].Name
now := time.Now()
// Create a failed alloc
alloc := mock.Alloc()
alloc.Job = job
@ -3086,6 +3328,9 @@ func TestBatchSched_Run_FailedAlloc(t *testing.T) {
alloc.NodeID = node.ID
alloc.Name = "my-job.web[0]"
alloc.ClientStatus = structs.AllocClientStatusFailed
alloc.TaskStates = map[string]*structs.TaskState{tgName: {State: "dead",
StartedAt: now.Add(-1 * time.Hour),
FinishedAt: now.Add(-10 * time.Second)}}
noErr(t, h.State.UpsertAllocs(h.NextIndex(), []*structs.Allocation{alloc}))
// Create a mock evaluation to register the job
@ -3231,6 +3476,9 @@ func TestBatchSched_Run_FailedAllocQueuedAllocations(t *testing.T) {
job.TaskGroups[0].Count = 1
noErr(t, h.State.UpsertJob(h.NextIndex(), job))
tgName := job.TaskGroups[0].Name
now := time.Now()
// Create a failed alloc
alloc := mock.Alloc()
alloc.Job = job
@ -3238,6 +3486,9 @@ func TestBatchSched_Run_FailedAllocQueuedAllocations(t *testing.T) {
alloc.NodeID = node.ID
alloc.Name = "my-job.web[0]"
alloc.ClientStatus = structs.AllocClientStatusFailed
alloc.TaskStates = map[string]*structs.TaskState{tgName: {State: "dead",
StartedAt: now.Add(-1 * time.Hour),
FinishedAt: now.Add(-10 * time.Second)}}
noErr(t, h.State.UpsertAllocs(h.NextIndex(), []*structs.Allocation{alloc}))
// Create a mock evaluation to register the job
@ -3963,3 +4214,232 @@ func TestServiceSched_CancelDeployment_NewerJob(t *testing.T) {
h.AssertEvalStatus(t, structs.EvalStatusComplete)
}
// Various table-driven tests for carry forward of past reschedule events
func Test_updateRescheduleTracker(t *testing.T) {
t1 := time.Now().UTC()
alloc := mock.Alloc()
prevAlloc := mock.Alloc()
type testCase struct {
desc string
prevAllocEvents []*structs.RescheduleEvent
reschedPolicy *structs.ReschedulePolicy
expectedRescheduleEvents []*structs.RescheduleEvent
reschedTime time.Time
}
testCases := []testCase{
{
desc: "No past events",
prevAllocEvents: nil,
reschedPolicy: &structs.ReschedulePolicy{Unlimited: false, Interval: 24 * time.Hour, Attempts: 2, Delay: 5 * time.Second},
reschedTime: t1,
expectedRescheduleEvents: []*structs.RescheduleEvent{{RescheduleTime: t1.UnixNano(), PrevAllocID: prevAlloc.ID, PrevNodeID: prevAlloc.NodeID, Delay: 5 * time.Second}},
},
{
desc: "one past event, linear delay",
prevAllocEvents: []*structs.RescheduleEvent{
{RescheduleTime: t1.Add(-1 * time.Hour).UnixNano(),
PrevAllocID: prevAlloc.ID,
PrevNodeID: prevAlloc.NodeID,
Delay: 5 * time.Second}},
reschedPolicy: &structs.ReschedulePolicy{Unlimited: false, Interval: 24 * time.Hour, Attempts: 2, Delay: 5 * time.Second},
reschedTime: t1,
expectedRescheduleEvents: []*structs.RescheduleEvent{
{
RescheduleTime: t1.Add(-1 * time.Hour).UnixNano(),
PrevAllocID: prevAlloc.ID,
PrevNodeID: prevAlloc.NodeID,
Delay: 5 * time.Second,
},
{
RescheduleTime: t1.UnixNano(),
PrevAllocID: prevAlloc.ID,
PrevNodeID: prevAlloc.NodeID,
Delay: 5 * time.Second,
},
},
},
{
desc: "one past event, fibonacci delay",
prevAllocEvents: []*structs.RescheduleEvent{
{RescheduleTime: t1.Add(-1 * time.Hour).UnixNano(),
PrevAllocID: prevAlloc.ID,
PrevNodeID: prevAlloc.NodeID,
Delay: 5 * time.Second}},
reschedPolicy: &structs.ReschedulePolicy{Unlimited: false, Interval: 24 * time.Hour, Attempts: 2, Delay: 5 * time.Second, DelayFunction: "fibonacci", DelayCeiling: 60 * time.Second},
reschedTime: t1,
expectedRescheduleEvents: []*structs.RescheduleEvent{
{
RescheduleTime: t1.Add(-1 * time.Hour).UnixNano(),
PrevAllocID: prevAlloc.ID,
PrevNodeID: prevAlloc.NodeID,
Delay: 5 * time.Second,
},
{
RescheduleTime: t1.UnixNano(),
PrevAllocID: prevAlloc.ID,
PrevNodeID: prevAlloc.NodeID,
Delay: 5 * time.Second,
},
},
},
{
desc: "eight past events, fibonacci delay, unlimited",
prevAllocEvents: []*structs.RescheduleEvent{
{
RescheduleTime: t1.Add(-1 * time.Hour).UnixNano(),
PrevAllocID: prevAlloc.ID,
PrevNodeID: prevAlloc.NodeID,
Delay: 5 * time.Second,
},
{
RescheduleTime: t1.Add(-1 * time.Hour).UnixNano(),
PrevAllocID: prevAlloc.ID,
PrevNodeID: prevAlloc.NodeID,
Delay: 5 * time.Second,
},
{
RescheduleTime: t1.Add(-1 * time.Hour).UnixNano(),
PrevAllocID: prevAlloc.ID,
PrevNodeID: prevAlloc.NodeID,
Delay: 10 * time.Second,
},
{
RescheduleTime: t1.Add(-1 * time.Hour).UnixNano(),
PrevAllocID: prevAlloc.ID,
PrevNodeID: prevAlloc.NodeID,
Delay: 15 * time.Second,
},
{
RescheduleTime: t1.Add(-1 * time.Hour).UnixNano(),
PrevAllocID: prevAlloc.ID,
PrevNodeID: prevAlloc.NodeID,
Delay: 25 * time.Second,
},
{
RescheduleTime: t1.Add(-1 * time.Hour).UnixNano(),
PrevAllocID: prevAlloc.ID,
PrevNodeID: prevAlloc.NodeID,
Delay: 40 * time.Second,
},
{
RescheduleTime: t1.Add(-1 * time.Hour).UnixNano(),
PrevAllocID: prevAlloc.ID,
PrevNodeID: prevAlloc.NodeID,
Delay: 65 * time.Second,
},
{
RescheduleTime: t1.Add(-1 * time.Hour).UnixNano(),
PrevAllocID: prevAlloc.ID,
PrevNodeID: prevAlloc.NodeID,
Delay: 105 * time.Second,
},
},
reschedPolicy: &structs.ReschedulePolicy{Unlimited: true, Delay: 5 * time.Second, DelayFunction: "fibonacci", DelayCeiling: 240 * time.Second},
reschedTime: t1,
expectedRescheduleEvents: []*structs.RescheduleEvent{
{
RescheduleTime: t1.Add(-1 * time.Hour).UnixNano(),
PrevAllocID: prevAlloc.ID,
PrevNodeID: prevAlloc.NodeID,
Delay: 15 * time.Second,
},
{
RescheduleTime: t1.Add(-1 * time.Hour).UnixNano(),
PrevAllocID: prevAlloc.ID,
PrevNodeID: prevAlloc.NodeID,
Delay: 25 * time.Second,
},
{
RescheduleTime: t1.Add(-1 * time.Hour).UnixNano(),
PrevAllocID: prevAlloc.ID,
PrevNodeID: prevAlloc.NodeID,
Delay: 40 * time.Second,
},
{
RescheduleTime: t1.Add(-1 * time.Hour).UnixNano(),
PrevAllocID: prevAlloc.ID,
PrevNodeID: prevAlloc.NodeID,
Delay: 65 * time.Second,
},
{
RescheduleTime: t1.Add(-1 * time.Hour).UnixNano(),
PrevAllocID: prevAlloc.ID,
PrevNodeID: prevAlloc.NodeID,
Delay: 105 * time.Second,
},
{
RescheduleTime: t1.UnixNano(),
PrevAllocID: prevAlloc.ID,
PrevNodeID: prevAlloc.NodeID,
Delay: 170 * time.Second,
},
},
},
{
desc: " old attempts past interval, exponential delay, limited",
prevAllocEvents: []*structs.RescheduleEvent{
{
RescheduleTime: t1.Add(-2 * time.Hour).UnixNano(),
PrevAllocID: prevAlloc.ID,
PrevNodeID: prevAlloc.NodeID,
Delay: 5 * time.Second,
},
{
RescheduleTime: t1.Add(-1 * time.Hour).UnixNano(),
PrevAllocID: prevAlloc.ID,
PrevNodeID: prevAlloc.NodeID,
Delay: 10 * time.Second,
},
{
RescheduleTime: t1.Add(-30 * time.Minute).UnixNano(),
PrevAllocID: prevAlloc.ID,
PrevNodeID: prevAlloc.NodeID,
Delay: 20 * time.Second,
},
{
RescheduleTime: t1.Add(-10 * time.Minute).UnixNano(),
PrevAllocID: prevAlloc.ID,
PrevNodeID: prevAlloc.NodeID,
Delay: 40 * time.Second,
},
},
reschedPolicy: &structs.ReschedulePolicy{Unlimited: false, Interval: 1 * time.Hour, Attempts: 5, Delay: 5 * time.Second, DelayFunction: "exponential", DelayCeiling: 240 * time.Second},
reschedTime: t1,
expectedRescheduleEvents: []*structs.RescheduleEvent{
{
RescheduleTime: t1.Add(-30 * time.Minute).UnixNano(),
PrevAllocID: prevAlloc.ID,
PrevNodeID: prevAlloc.NodeID,
Delay: 20 * time.Second,
},
{
RescheduleTime: t1.Add(-10 * time.Minute).UnixNano(),
PrevAllocID: prevAlloc.ID,
PrevNodeID: prevAlloc.NodeID,
Delay: 40 * time.Second,
},
{
RescheduleTime: t1.UnixNano(),
PrevAllocID: prevAlloc.ID,
PrevNodeID: prevAlloc.NodeID,
Delay: 80 * time.Second,
},
},
},
}
for _, tc := range testCases {
t.Run(tc.desc, func(t *testing.T) {
require := require.New(t)
prevAlloc.RescheduleTracker = &structs.RescheduleTracker{Events: tc.prevAllocEvents}
updateRescheduleTracker(alloc, prevAlloc, tc.reschedPolicy, tc.reschedTime)
require.Equal(tc.expectedRescheduleEvents, alloc.RescheduleTracker.Events)
})
}
}
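The expected Delay values in these fixtures encode each delay function's progression: linear repeats the base delay, exponential doubles the most recent delay, and fibonacci sums the two most recent, all capped at DelayCeiling. A small sketch that reproduces those sequences; nextDelay here is a hypothetical helper inferred from the fixtures, not the structs NextDelay implementation:

package main

import (
    "fmt"
    "time"
)

// nextDelay sketches the progression implied by the fixtures above:
// "linear" repeats the base delay, "exponential" doubles the previous
// delay, and "fibonacci" adds the previous two delays, all capped at
// the ceiling.
func nextDelay(fn string, base, ceiling time.Duration, history []time.Duration) time.Duration {
    var d time.Duration
    switch fn {
    case "exponential":
        if len(history) == 0 {
            d = base
        } else {
            d = 2 * history[len(history)-1]
        }
    case "fibonacci":
        if len(history) < 2 {
            d = base
        } else {
            d = history[len(history)-1] + history[len(history)-2]
        }
    default: // "linear"
        d = base
    }
    if ceiling > 0 && d > ceiling {
        d = ceiling
    }
    return d
}

func main() {
    // Matches the unlimited fibonacci case above: 65s + 105s = 170s.
    fib := []time.Duration{5, 5, 10, 15, 25, 40, 65, 105}
    for i := range fib {
        fib[i] *= time.Second
    }
    fmt.Println(nextDelay("fibonacci", 5*time.Second, 240*time.Second, fib)) // 2m50s
}

Running it prints 2m50s, matching the 170s Delay expected in the most recent event of the unlimited fibonacci case.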

View File

@ -5,7 +5,10 @@ import (
"log"
"time"
"sort"
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/structs"
)
@ -92,7 +95,23 @@ type reconcileResults struct {
// followupEvalWait is set if there should be a followup eval run after the
// given duration
// Deprecated, the delay strategy that sets this is not available after Nomad 0.7.0
followupEvalWait time.Duration
// desiredFollowupEvals is the map of follow up evaluations to create per task group
// This is used to create delayed evaluations for rescheduling failed allocations.
desiredFollowupEvals map[string][]*structs.Evaluation
}
// delayedRescheduleInfo contains the allocation ID and the time when it is eligible to be rescheduled.
// This is used to create follow up evaluations
type delayedRescheduleInfo struct {
// allocID is the ID of the allocation eligible to be rescheduled
allocID string
// rescheduleTime is the time to use in the delayed evaluation
rescheduleTime time.Time
}
func (r *reconcileResults) GoString() string {
@ -136,7 +155,8 @@ func NewAllocReconciler(logger *log.Logger, allocUpdateFn allocUpdateType, batch
existingAllocs: existingAllocs,
taintedNodes: taintedNodes,
result: &reconcileResults{
desiredTGUpdates: make(map[string]*structs.DesiredUpdates),
desiredTGUpdates: make(map[string]*structs.DesiredUpdates),
desiredFollowupEvals: make(map[string][]*structs.Evaluation),
},
}
}
@ -270,6 +290,8 @@ func (a *allocReconciler) markStop(allocs allocSet, clientStatus, statusDescript
}
}
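// batchedFailedAllocWindowSize is the window within which failed allocations'
// reschedule times are collapsed into a single followup evaluation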
const batchedFailedAllocWindowSize = 5 * time.Second
// computeGroup reconciles state for a particular task group. It returns whether
// the deployment it is for is complete with regards to the task group.
func (a *allocReconciler) computeGroup(group string, all allocSet) bool {
@ -318,11 +340,18 @@ func (a *allocReconciler) computeGroup(group string, all allocSet) bool {
untainted, migrate, lost := all.filterByTainted(a.taintedNodes)
// Determine what set of terminal allocations need to be rescheduled
untainted, reschedule := untainted.filterByReschedulable(a.batch, tg.ReschedulePolicy)
untainted, rescheduleNow, rescheduleLater := untainted.filterByRescheduleable(a.batch, tg.ReschedulePolicy)
// Create batched follow up evaluations for allocations that are reschedulable later
rescheduleLaterAllocs := make(map[string]*structs.Allocation)
if len(rescheduleLater) > 0 {
rescheduleLaterAllocs = a.handleDelayedReschedules(rescheduleLater, all, tg.Name)
}
// Create a structure for choosing names. Seed with the taken names which is
// the union of untainted and migrating nodes (includes canaries)
nameIndex := newAllocNameIndex(a.jobID, group, tg.Count, untainted.union(migrate, reschedule))
nameIndex := newAllocNameIndex(a.jobID, group, tg.Count, untainted.union(migrate, rescheduleNow))
// Stop any unneeded allocations and update the untainted set to not
// included stopped allocations.
@ -341,7 +370,7 @@ func (a *allocReconciler) computeGroup(group string, all allocSet) bool {
// Do inplace upgrades where possible and capture the set of upgrades that
// need to be done destructively.
ignore, inplace, destructive := a.computeUpdates(tg, untainted)
ignore, inplace, destructive := a.computeUpdates(tg, untainted, rescheduleLaterAllocs)
desiredChanges.Ignore += uint64(len(ignore))
desiredChanges.InPlaceUpdate += uint64(len(inplace))
if !existingDeployment {
@ -379,7 +408,7 @@ func (a *allocReconciler) computeGroup(group string, all allocSet) bool {
// * The deployment is not paused or failed
// * Not placing any canaries
// * If there are any canaries that they have been promoted
place := a.computePlacements(tg, nameIndex, untainted, migrate, reschedule)
place := a.computePlacements(tg, nameIndex, untainted, migrate, rescheduleNow)
if !existingDeployment {
dstate.DesiredTotal += len(place)
}
@ -774,7 +803,7 @@ func (a *allocReconciler) computeStop(group *structs.TaskGroup, nameIndex *alloc
// 2. Those that can be upgraded in-place. These are added to the results
// automatically since the function contains the correct state to do so,
// 3. Those that require destructive updates
func (a *allocReconciler) computeUpdates(group *structs.TaskGroup, untainted allocSet) (ignore, inplace, destructive allocSet) {
func (a *allocReconciler) computeUpdates(group *structs.TaskGroup, untainted, rescheduleLaterAllocs allocSet) (ignore, inplace, destructive allocSet) {
// Determine the set of allocations that need to be updated
ignore = make(map[string]*structs.Allocation)
inplace = make(map[string]*structs.Allocation)
@ -782,7 +811,13 @@ func (a *allocReconciler) computeUpdates(group *structs.TaskGroup, untainted all
for _, alloc := range untainted {
ignoreChange, destructiveChange, inplaceAlloc := a.allocUpdateFn(alloc, a.job, group)
if ignoreChange {
// Also check if the alloc is marked for later rescheduling.
// If so, it should be in the inplace list
reschedLaterAlloc, isRescheduleLater := rescheduleLaterAllocs[alloc.ID]
if isRescheduleLater {
inplace[alloc.ID] = alloc
a.result.inplaceUpdate = append(a.result.inplaceUpdate, reschedLaterAlloc)
} else if ignoreChange {
ignore[alloc.ID] = alloc
} else if destructiveChange {
destructive[alloc.ID] = alloc
@ -796,3 +831,71 @@ func (a *allocReconciler) computeUpdates(group *structs.TaskGroup, untainted all
return
}
func (a *allocReconciler) handleDelayedReschedules(rescheduleLater []*delayedRescheduleInfo, all allocSet, tgName string) allocSet {
// Sort by time
sort.Slice(rescheduleLater, func(i, j int) bool {
return rescheduleLater[i].rescheduleTime.Before(rescheduleLater[j].rescheduleTime)
})
var evals []*structs.Evaluation
var allocIDs []string
nextReschedTime := rescheduleLater[0].rescheduleTime
allocIDToFollowupEvalID := make(map[string]string, len(rescheduleLater))
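// Walk the sorted reschedule times, reusing the current batch while each
// alloc's time is within batchedFailedAllocWindowSize of the batch's first
// time, and cutting a new followup eval when it is not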
for _, allocReschedInfo := range rescheduleLater {
if allocReschedInfo.rescheduleTime.UTC().UnixNano()-nextReschedTime.UTC().UnixNano() < batchedFailedAllocWindowSize.Nanoseconds() {
// add to batch
allocIDs = append(allocIDs, allocReschedInfo.allocID)
} else {
// create a new eval for the previous batch
eval := &structs.Evaluation{
ID: uuid.Generate(),
Namespace: a.job.Namespace,
Priority: a.job.Priority,
Type: a.job.Type,
TriggeredBy: structs.EvalTriggerRetryFailedAlloc,
JobID: a.job.ID,
JobModifyIndex: a.job.ModifyIndex,
Status: structs.EvalStatusPending,
WaitUntil: nextReschedTime,
}
evals = append(evals, eval)
for _, allocID := range allocIDs {
allocIDToFollowupEvalID[allocID] = eval.ID
}
nextReschedTime = allocReschedInfo.rescheduleTime
// Start a new batch with this alloc
allocIDs = []string{allocReschedInfo.allocID}
}
}
// Deal with the last batch
if len(allocIDs) > 0 {
eval := &structs.Evaluation{
ID: uuid.Generate(),
Namespace: a.job.Namespace,
Priority: a.job.Priority,
Type: a.job.Type,
TriggeredBy: structs.EvalTriggerRetryFailedAlloc,
JobID: a.job.ID,
JobModifyIndex: a.job.ModifyIndex,
Status: structs.EvalStatusPending,
WaitUntil: nextReschedTime,
}
evals = append(evals, eval)
for _, allocID := range allocIDs {
allocIDToFollowupEvalID[allocID] = eval.ID
}
}
a.result.desiredFollowupEvals[tgName] = evals
// Create inplace updates for every alloc that needs its follow up eval ID set
rescheduleLaterAllocs := make(map[string]*structs.Allocation)
for allocID, evalID := range allocIDToFollowupEvalID {
existingAlloc := all[allocID]
updatedAlloc := existingAlloc.Copy()
updatedAlloc.FollowupEvalID = evalID
rescheduleLaterAllocs[allocID] = updatedAlloc
}
return rescheduleLaterAllocs
}
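handleDelayedReschedules cuts a new evaluation whenever the next reschedule time falls outside batchedFailedAllocWindowSize of the current batch's first time. The windowing in isolation, as a runnable sketch over plain timestamps (batchTimes is a hypothetical helper, not part of the reconciler):

package main

import (
    "fmt"
    "time"
)

// batchTimes groups ascending times into batches: a time joins the current
// batch when it is within window of that batch's first time, otherwise it
// starts a new batch, mirroring the eval batching above.
func batchTimes(times []time.Time, window time.Duration) [][]time.Time {
    if len(times) == 0 {
        return nil
    }
    batches := [][]time.Time{{times[0]}}
    batchStart := times[0]
    for _, t := range times[1:] {
        if t.Sub(batchStart) < window {
            batches[len(batches)-1] = append(batches[len(batches)-1], t)
        } else {
            batches = append(batches, []time.Time{t})
            batchStart = t
        }
    }
    return batches
}

func main() {
    now := time.Now()
    // Five reschedule times within 250ms plus two more 10s later, as in
    // the batched-evals test: two batches, hence two follow up evals.
    var ts []time.Time
    for i := 0; i < 5; i++ {
        ts = append(ts, now.Add(time.Duration(50*i)*time.Millisecond))
    }
    ts = append(ts, now.Add(10*time.Second), now.Add(10*time.Second))
    fmt.Println(len(batchTimes(ts, 5*time.Second))) // 2
}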

View File

@ -15,6 +15,7 @@ import (
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/kr/pretty"
"github.com/stretchr/testify/require"
)
/*
@ -38,9 +39,12 @@ Basic Tests:
Handle task group being removed
Handle job being stopped both as .Stopped and nil
Place more than one group
Handle rescheduling failed allocs for batch jobs
Handle rescheduling failed allocs for service jobs
Handle delayed rescheduling failed allocs for batch jobs
Handle delayed rescheduling failed allocs for service jobs
Handle eligible now rescheduling failed allocs for batch jobs
Handle eligible now rescheduling failed allocs for service jobs
Previously rescheduled allocs should not be rescheduled again
Aggregated evaluations for allocations that fail close together
Update stanza Tests:
Stopped job cancels any active deployment
@ -1203,15 +1207,17 @@ func TestReconciler_MultiTG(t *testing.T) {
assertNamesHaveIndexes(t, intRange(2, 9, 0, 9), placeResultsToNames(r.place))
}
// Tests rescheduling failed batch allocations
func TestReconciler_Reschedule_Batch(t *testing.T) {
// Tests delayed rescheduling of failed batch allocations
func TestReconciler_RescheduleLater_Batch(t *testing.T) {
require := require.New(t)
// Set desired 4
job := mock.Job()
job.TaskGroups[0].Count = 4
now := time.Now()
// Set up reschedule policy
job.TaskGroups[0].ReschedulePolicy = &structs.ReschedulePolicy{Attempts: 3, Interval: 24 * time.Hour}
delayDur := 15 * time.Second
job.TaskGroups[0].ReschedulePolicy = &structs.ReschedulePolicy{Attempts: 3, Interval: 24 * time.Hour, Delay: delayDur, DelayFunction: "linear"}
tgName := job.TaskGroups[0].Name
// Create 6 existing allocations - 2 running, 1 complete and 3 failed
var allocs []*structs.Allocation
for i := 0; i < 6; i++ {
@ -1235,6 +1241,9 @@ func TestReconciler_Reschedule_Batch(t *testing.T) {
}}
allocs[1].NextAllocation = allocs[2].ID
allocs[2].ClientStatus = structs.AllocClientStatusFailed
allocs[2].TaskStates = map[string]*structs.TaskState{tgName: {State: "start",
StartedAt: now.Add(-1 * time.Hour),
FinishedAt: now}}
allocs[2].RescheduleTracker = &structs.RescheduleTracker{Events: []*structs.RescheduleEvent{
{RescheduleTime: time.Now().Add(-2 * time.Hour).UTC().UnixNano(),
PrevAllocID: allocs[0].ID,
@ -1251,7 +1260,171 @@ func TestReconciler_Reschedule_Batch(t *testing.T) {
reconciler := NewAllocReconciler(testLogger(), allocUpdateFnIgnore, true, job.ID, job, nil, allocs, nil)
r := reconciler.Compute()
// Two reschedule attempts were made, one more can be made
// Two reschedule attempts were already made, one more can be made at a future time
// Verify that the follow up eval has the expected waitUntil time
evals := r.desiredFollowupEvals[tgName]
require.NotNil(evals)
require.Equal(1, len(evals))
require.Equal(now.Add(delayDur), evals[0].WaitUntil)
// Alloc 5 should not be replaced because it is terminal
assertResults(t, r, &resultExpectation{
createDeployment: nil,
deploymentUpdates: nil,
place: 0,
inplace: 1,
stop: 0,
desiredTGUpdates: map[string]*structs.DesiredUpdates{
job.TaskGroups[0].Name: {
Place: 0,
InPlaceUpdate: 1,
Ignore: 3,
},
},
})
assertNamesHaveIndexes(t, intRange(2, 2), allocsToNames(r.inplaceUpdate))
// verify that the followup evalID field is set correctly
require.Equal(evals[0].ID, r.inplaceUpdate[0].FollowupEvalID)
}
// Tests delayed rescheduling of failed batch allocations and batching of allocs
// with fail times that are close together
func TestReconciler_RescheduleLaterWithBatchedEvals_Batch(t *testing.T) {
require := require.New(t)
// Set desired 10
job := mock.Job()
job.TaskGroups[0].Count = 10
now := time.Now()
// Set up reschedule policy
delayDur := 15 * time.Second
job.TaskGroups[0].ReschedulePolicy = &structs.ReschedulePolicy{Attempts: 3, Interval: 24 * time.Hour, Delay: delayDur, DelayFunction: "linear"}
tgName := job.TaskGroups[0].Name
// Create 10 existing allocations
var allocs []*structs.Allocation
for i := 0; i < 10; i++ {
alloc := mock.Alloc()
alloc.Job = job
alloc.JobID = job.ID
alloc.NodeID = uuid.Generate()
alloc.Name = structs.AllocName(job.ID, job.TaskGroups[0].Name, uint(i))
allocs = append(allocs, alloc)
alloc.ClientStatus = structs.AllocClientStatusRunning
}
// Mark 5 as failed with fail times very close together
for i := 0; i < 5; i++ {
allocs[i].ClientStatus = structs.AllocClientStatusFailed
allocs[i].TaskStates = map[string]*structs.TaskState{tgName: {State: "start",
StartedAt: now.Add(-1 * time.Hour),
FinishedAt: now.Add(time.Duration(50*i) * time.Millisecond)}}
}
// Mark two more as failed several seconds later
for i := 5; i < 7; i++ {
allocs[i].ClientStatus = structs.AllocClientStatusFailed
allocs[i].TaskStates = map[string]*structs.TaskState{tgName: {State: "start",
StartedAt: now.Add(-1 * time.Hour),
FinishedAt: now.Add(10 * time.Second)}}
}
reconciler := NewAllocReconciler(testLogger(), allocUpdateFnIgnore, true, job.ID, job, nil, allocs, nil)
r := reconciler.Compute()
// Verify that two follow up evals were created
evals := r.desiredFollowupEvals[tgName]
require.NotNil(evals)
require.Equal(2, len(evals))
// Verify expected WaitUntil values for both batched evals
require.Equal(now.Add(delayDur), evals[0].WaitUntil)
secondBatchDuration := delayDur + 10*time.Second
require.Equal(now.Add(secondBatchDuration), evals[1].WaitUntil)
// The 7 failed allocs get in-place updates with followup eval IDs and the 3 running allocs are ignored
assertResults(t, r, &resultExpectation{
createDeployment: nil,
deploymentUpdates: nil,
place: 0,
inplace: 7,
stop: 0,
desiredTGUpdates: map[string]*structs.DesiredUpdates{
job.TaskGroups[0].Name: {
Place: 0,
InPlaceUpdate: 7,
Ignore: 3,
},
},
})
assertNamesHaveIndexes(t, intRange(0, 6), allocsToNames(r.inplaceUpdate))
// verify that the followup evalID field is set correctly
for _, alloc := range r.inplaceUpdate {
if allocNameToIndex(alloc.Name) < 5 {
require.Equal(evals[0].ID, alloc.FollowupEvalID)
} else if allocNameToIndex(alloc.Name) < 7 {
require.Equal(evals[1].ID, alloc.FollowupEvalID)
} else {
t.Fatalf("Unexpected alloc name in Inplace results %v", alloc.Name)
}
}
}
// Tests rescheduling failed batch allocations
func TestReconciler_RescheduleNow_Batch(t *testing.T) {
require := require.New(t)
// Set desired 4
job := mock.Job()
job.TaskGroups[0].Count = 4
now := time.Now()
// Set up reschedule policy
job.TaskGroups[0].ReschedulePolicy = &structs.ReschedulePolicy{Attempts: 3, Interval: 24 * time.Hour, Delay: 5 * time.Second, DelayFunction: "linear"}
tgName := job.TaskGroups[0].Name
// Create 6 existing allocations - 2 running, 1 complete and 3 failed
var allocs []*structs.Allocation
for i := 0; i < 6; i++ {
alloc := mock.Alloc()
alloc.Job = job
alloc.JobID = job.ID
alloc.NodeID = uuid.Generate()
alloc.Name = structs.AllocName(job.ID, job.TaskGroups[0].Name, uint(i))
allocs = append(allocs, alloc)
alloc.ClientStatus = structs.AllocClientStatusRunning
}
// Mark 3 as failed with restart tracking info
allocs[0].ClientStatus = structs.AllocClientStatusFailed
allocs[0].NextAllocation = allocs[1].ID
allocs[1].ClientStatus = structs.AllocClientStatusFailed
allocs[1].RescheduleTracker = &structs.RescheduleTracker{Events: []*structs.RescheduleEvent{
{RescheduleTime: time.Now().Add(-1 * time.Hour).UTC().UnixNano(),
PrevAllocID: allocs[0].ID,
PrevNodeID: uuid.Generate(),
},
}}
allocs[1].NextAllocation = allocs[2].ID
allocs[2].ClientStatus = structs.AllocClientStatusFailed
allocs[2].TaskStates = map[string]*structs.TaskState{tgName: {State: "start",
StartedAt: now.Add(-1 * time.Hour),
FinishedAt: now.Add(-10 * time.Second)}}
allocs[2].RescheduleTracker = &structs.RescheduleTracker{Events: []*structs.RescheduleEvent{
{RescheduleTime: time.Now().Add(-2 * time.Hour).UTC().UnixNano(),
PrevAllocID: allocs[0].ID,
PrevNodeID: uuid.Generate(),
},
{RescheduleTime: time.Now().Add(-1 * time.Hour).UTC().UnixNano(),
PrevAllocID: allocs[1].ID,
PrevNodeID: uuid.Generate(),
},
}}
// Mark one as complete
allocs[5].ClientStatus = structs.AllocClientStatusComplete
reconciler := NewAllocReconciler(testLogger(), allocUpdateFnIgnore, true, job.ID, job, nil, allocs, nil)
r := reconciler.Compute()
// Verify that no follow up evals were created
evals := r.desiredFollowupEvals[tgName]
require.Nil(evals)
// Two reschedule attempts were made, one more can be made now
// Alloc 5 should not be replaced because it is terminal
assertResults(t, r, &resultExpectation{
createDeployment: nil,
@ -1266,19 +1439,24 @@ func TestReconciler_Reschedule_Batch(t *testing.T) {
},
},
})
assertNamesHaveIndexes(t, intRange(2, 2), placeResultsToNames(r.place))
assertPlaceResultsHavePreviousAllocs(t, 1, r.place)
assertPlacementsAreRescheduled(t, 1, r.place)
}
// Tests rescheduling failed service allocations with desired state stop
func TestReconciler_Reschedule_Service(t *testing.T) {
func TestReconciler_RescheduleLater_Service(t *testing.T) {
require := require.New(t)
// Set desired 5
job := mock.Job()
job.TaskGroups[0].Count = 5
tgName := job.TaskGroups[0].Name
now := time.Now()
// Set up reschedule policy
job.TaskGroups[0].ReschedulePolicy = &structs.ReschedulePolicy{Attempts: 1, Interval: 24 * time.Hour}
delayDur := 15 * time.Second
job.TaskGroups[0].ReschedulePolicy = &structs.ReschedulePolicy{Attempts: 1, Interval: 24 * time.Hour, Delay: delayDur, DelayCeiling: 1 * time.Hour}
// Create 5 existing allocations
var allocs []*structs.Allocation
@ -1293,15 +1471,17 @@ func TestReconciler_Reschedule_Service(t *testing.T) {
}
// Mark two as failed
allocs[0].ClientStatus = structs.AllocClientStatusFailed
allocs[1].ClientStatus = structs.AllocClientStatusFailed
// Mark one of them as already rescheduled once
allocs[1].RescheduleTracker = &structs.RescheduleTracker{Events: []*structs.RescheduleEvent{
allocs[0].RescheduleTracker = &structs.RescheduleTracker{Events: []*structs.RescheduleEvent{
{RescheduleTime: time.Now().Add(-1 * time.Hour).UTC().UnixNano(),
PrevAllocID: uuid.Generate(),
PrevNodeID: uuid.Generate(),
},
}}
allocs[1].TaskStates = map[string]*structs.TaskState{tgName: {State: "start",
StartedAt: now.Add(-1 * time.Hour),
FinishedAt: now}}
allocs[1].ClientStatus = structs.AllocClientStatusFailed
// Mark one as desired state stop
allocs[4].DesiredStatus = structs.AllocDesiredStatusStop
@ -1309,7 +1489,81 @@ func TestReconciler_Reschedule_Service(t *testing.T) {
reconciler := NewAllocReconciler(testLogger(), allocUpdateFnIgnore, false, job.ID, job, nil, allocs, nil)
r := reconciler.Compute()
// Should place 2, one is rescheduled, one is past its reschedule limit and one is a new placement
// Should place one new alloc and create a follow up eval for the delayed reschedule
// Verify that the follow up eval has the expected waitUntil time
evals := r.desiredFollowupEvals[tgName]
require.NotNil(evals)
require.Equal(1, len(evals))
require.Equal(now.Add(delayDur), evals[0].WaitUntil)
assertResults(t, r, &resultExpectation{
createDeployment: nil,
deploymentUpdates: nil,
place: 1,
inplace: 1,
stop: 0,
desiredTGUpdates: map[string]*structs.DesiredUpdates{
job.TaskGroups[0].Name: {
Place: 1,
InPlaceUpdate: 1,
Ignore: 3,
},
},
})
assertNamesHaveIndexes(t, intRange(4, 4), placeResultsToNames(r.place))
assertNamesHaveIndexes(t, intRange(1, 1), allocsToNames(r.inplaceUpdate))
// verify that the followup evalID field is set correctly
require.Equal(evals[0].ID, r.inplaceUpdate[0].FollowupEvalID)
}
// Tests rescheduling failed service allocations with desired state stop
func TestReconciler_RescheduleNow_Service(t *testing.T) {
require := require.New(t)
// Set desired 5
job := mock.Job()
job.TaskGroups[0].Count = 5
tgName := job.TaskGroups[0].Name
now := time.Now()
// Set up reschedule policy
job.TaskGroups[0].ReschedulePolicy = &structs.ReschedulePolicy{Attempts: 1, Interval: 24 * time.Hour, Delay: 5 * time.Second, DelayCeiling: 1 * time.Hour}
// Create 5 existing allocations
var allocs []*structs.Allocation
for i := 0; i < 5; i++ {
alloc := mock.Alloc()
alloc.Job = job
alloc.JobID = job.ID
alloc.NodeID = uuid.Generate()
alloc.Name = structs.AllocName(job.ID, job.TaskGroups[0].Name, uint(i))
allocs = append(allocs, alloc)
alloc.ClientStatus = structs.AllocClientStatusRunning
}
// Mark two as failed
allocs[0].ClientStatus = structs.AllocClientStatusFailed
// Mark one of them as already rescheduled once
allocs[0].RescheduleTracker = &structs.RescheduleTracker{Events: []*structs.RescheduleEvent{
{RescheduleTime: time.Now().Add(-1 * time.Hour).UTC().UnixNano(),
PrevAllocID: uuid.Generate(),
PrevNodeID: uuid.Generate(),
},
}}
allocs[1].TaskStates = map[string]*structs.TaskState{tgName: {State: "start",
StartedAt: now.Add(-1 * time.Hour),
FinishedAt: now.Add(-10 * time.Second)}}
allocs[1].ClientStatus = structs.AllocClientStatusFailed
// Mark one as desired state stop
allocs[4].DesiredStatus = structs.AllocDesiredStatusStop
reconciler := NewAllocReconciler(testLogger(), allocUpdateFnIgnore, false, job.ID, job, nil, allocs, nil)
r := reconciler.Compute()
// Verify that no follow up evals were created
evals := r.desiredFollowupEvals[tgName]
require.Nil(evals)
// Verify that one rescheduled alloc and one replacement for terminal alloc were placed
assertResults(t, r, &resultExpectation{
createDeployment: nil,
deploymentUpdates: nil,
@ -1324,8 +1578,8 @@ func TestReconciler_Reschedule_Service(t *testing.T) {
},
})
assertNamesHaveIndexes(t, intRange(0, 0, 4, 4), placeResultsToNames(r.place))
// 2 rescheduled allocs should have previous allocs
assertNamesHaveIndexes(t, intRange(1, 1, 4, 4), placeResultsToNames(r.place))
// Rescheduled allocs should have previous allocs
assertPlaceResultsHavePreviousAllocs(t, 1, r.place)
assertPlacementsAreRescheduled(t, 1, r.place)
}
@ -3374,6 +3628,8 @@ func TestReconciler_FailedDeployment_DontReschedule(t *testing.T) {
job := mock.Job()
job.TaskGroups[0].Update = noCanaryUpdate
tgName := job.TaskGroups[0].Name
now := time.Now()
// Create an existing failed deployment that has some placed allocs
d := structs.NewDeployment(job)
d.Status = structs.DeploymentStatusFailed
@ -3394,8 +3650,17 @@ func TestReconciler_FailedDeployment_DontReschedule(t *testing.T) {
alloc.TaskGroup = job.TaskGroups[0].Name
allocs = append(allocs, alloc)
}
// Create some allocations that are reschedulable now
allocs[2].ClientStatus = structs.AllocClientStatusFailed
allocs[2].TaskStates = map[string]*structs.TaskState{tgName: {State: "start",
StartedAt: now.Add(-1 * time.Hour),
FinishedAt: now.Add(-10 * time.Second)}}
allocs[3].ClientStatus = structs.AllocClientStatusFailed
allocs[3].TaskStates = map[string]*structs.TaskState{tgName: {State: "start",
StartedAt: now.Add(-1 * time.Hour),
FinishedAt: now.Add(-10 * time.Second)}}
reconciler := NewAllocReconciler(testLogger(), allocUpdateFnDestructive, false, job.ID, job, d, allocs, nil)
r := reconciler.Compute()
@ -3417,6 +3682,8 @@ func TestReconciler_FailedDeployment_DontReschedule(t *testing.T) {
func TestReconciler_DeploymentWithFailedAllocs_DontReschedule(t *testing.T) {
job := mock.Job()
job.TaskGroups[0].Update = noCanaryUpdate
tgName := job.TaskGroups[0].Name
now := time.Now()
// Mock deployment with failed allocs, but deployment watcher hasn't marked it as failed yet
d := structs.NewDeployment(job)
@ -3439,8 +3706,17 @@ func TestReconciler_DeploymentWithFailedAllocs_DontReschedule(t *testing.T) {
alloc.DeploymentID = d.ID
allocs = append(allocs, alloc)
}
// Create allocs that are reschedulable now
allocs[2].ClientStatus = structs.AllocClientStatusFailed
allocs[2].TaskStates = map[string]*structs.TaskState{tgName: {State: "start",
StartedAt: now.Add(-1 * time.Hour),
FinishedAt: now.Add(-10 * time.Second)}}
allocs[3].ClientStatus = structs.AllocClientStatusFailed
allocs[3].TaskStates = map[string]*structs.TaskState{tgName: {State: "start",
StartedAt: now.Add(-1 * time.Hour),
FinishedAt: now.Add(-10 * time.Second)}}
reconciler := NewAllocReconciler(testLogger(), allocUpdateFnDestructive, false, job.ID, job, d, allocs, nil)
r := reconciler.Compute()

View File

@ -227,14 +227,17 @@ func (a allocSet) filterByTainted(nodes map[string]*structs.Node) (untainted, mi
return
}
// filterByReschedulable filters the allocation set to return the set of allocations that are either
// terminal or running, and a set of allocations that must be rescheduled
func (a allocSet) filterByReschedulable(isBatch bool, reschedulePolicy *structs.ReschedulePolicy) (untainted, reschedule allocSet) {
// filterByRescheduleable filters the allocation set to return the set of allocations that are either
// terminal or running, and a set of allocations that must be rescheduled now. Allocations that can be rescheduled
// at a future time are also returned so that we can create follow up evaluations for them
func (a allocSet) filterByRescheduleable(isBatch bool, reschedulePolicy *structs.ReschedulePolicy) (untainted, rescheduleNow allocSet, rescheduleLater []*delayedRescheduleInfo) {
untainted = make(map[string]*structs.Allocation)
reschedule = make(map[string]*structs.Allocation)
rescheduleNow = make(map[string]*structs.Allocation)
now := time.Now()
for _, alloc := range a {
var isUntainted, eligibleNow, eligibleLater bool
var rescheduleTime time.Time
if isBatch {
// Allocs from batch jobs should be filtered when the desired status
// is terminal and the client did not finish or when the client
@ -249,26 +252,48 @@ func (a allocSet) filterByReschedulable(isBatch bool, reschedulePolicy *structs.
default:
}
if alloc.NextAllocation == "" {
if alloc.ShouldReschedule(reschedulePolicy, now) {
reschedule[alloc.ID] = alloc
} else {
untainted[alloc.ID] = alloc
}
// Ignore allocs that have already been rescheduled
isUntainted, eligibleNow, eligibleLater, rescheduleTime = updateByReschedulable(alloc, reschedulePolicy, now, true)
}
} else {
// Ignore allocs that have already been rescheduled
if alloc.NextAllocation == "" {
// ignore allocs whose desired state is stop/evict
// everything else is either reschedulable or untainted
if alloc.ShouldReschedule(reschedulePolicy, now) {
reschedule[alloc.ID] = alloc
} else if alloc.DesiredStatus != structs.AllocDesiredStatusStop && alloc.DesiredStatus != structs.AllocDesiredStatusEvict {
untainted[alloc.ID] = alloc
}
isUntainted, eligibleNow, eligibleLater, rescheduleTime = updateByReschedulable(alloc, reschedulePolicy, now, false)
}
}
if isUntainted {
untainted[alloc.ID] = alloc
}
if eligibleNow {
rescheduleNow[alloc.ID] = alloc
} else if eligibleLater {
rescheduleLater = append(rescheduleLater, &delayedRescheduleInfo{alloc.ID, rescheduleTime})
}
}
return
}
// updateByReschedulable is a helper function that encapsulates the logic for whether a failed allocation
// should be rescheduled now, later, or left in the untainted set
func updateByReschedulable(alloc *structs.Allocation, reschedulePolicy *structs.ReschedulePolicy, now time.Time, batch bool) (untainted, rescheduleNow, rescheduleLater bool, rescheduleTime time.Time) {
shouldAllow := true
if !batch {
// for service type jobs we ignore allocs whose desired state is stop/evict
// everything else is either rescheduleable or untainted
shouldAllow = alloc.DesiredStatus != structs.AllocDesiredStatusStop && alloc.DesiredStatus != structs.AllocDesiredStatusEvict
}
rescheduleTime, eligible := alloc.NextRescheduleTime(reschedulePolicy)
timeDiff := rescheduleTime.UTC().UnixNano() - now.UTC().UnixNano()
// We consider a time difference of less than 5 seconds to be eligible now
// because we collapse allocations that failed within 5 seconds into a single evaluation
if eligible && timeDiff < batchedFailedAllocWindowSize.Nanoseconds() {
rescheduleNow = true
} else if shouldAllow {
untainted = true
if eligible && alloc.FollowupEvalID == "" {
rescheduleLater = true
}
}
return
}
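In summary, updateByReschedulable reschedules now when the next reschedule time lands within the 5 second batching window, and otherwise keeps the alloc untainted (unless a service alloc is stopped or evicted), additionally marking it for later rescheduling when it is eligible and has no follow up eval yet. The same decision restated as a runnable sketch over plain values; classify is hypothetical and stands in for the alloc and policy plumbing:

package main

import (
    "fmt"
    "time"
)

// classify restates updateByReschedulable's three-way outcome. eligible and
// reschedTime stand in for the results of alloc.NextRescheduleTime, and
// desiredStop/hasFollowupEval for the corresponding alloc fields.
func classify(eligible bool, reschedTime, now time.Time, desiredStop, hasFollowupEval bool) (untainted, rescheduleNow, rescheduleLater bool) {
    const window = 5 * time.Second
    if eligible && reschedTime.Sub(now) < window {
        return false, true, false
    }
    if !desiredStop {
        untainted = true
        rescheduleLater = eligible && !hasFollowupEval
    }
    return
}

func main() {
    now := time.Now()
    // Eligible 15s from now: kept untainted and marked for later.
    fmt.Println(classify(true, now.Add(15*time.Second), now, false, false)) // true false true
    // Eligible within the window: rescheduled now.
    fmt.Println(classify(true, now.Add(2*time.Second), now, false, false)) // false true false
}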