Fixes edge cases around timing and task finish time being set more than once

Preetha Appan 2018-04-03 15:49:18 -05:00
parent a78e29520b
commit 00537c739b
7 changed files with 270 additions and 95 deletions

View File

@ -526,6 +526,23 @@ func copyTaskStates(states map[string]*structs.TaskState) map[string]*structs.Ta
// Alloc returns the associated allocation
func (r *AllocRunner) Alloc() *structs.Allocation {
r.allocLock.Lock()
if r.alloc.TerminalStatus() {
group := r.alloc.Job.LookupTaskGroup(r.alloc.TaskGroup)
if r.alloc.TaskStates == nil {
r.alloc.TaskStates = make(map[string]*structs.TaskState)
}
now := time.Now()
for _, task := range group.Tasks {
ts, ok := r.alloc.TaskStates[task.Name]
if !ok {
ts = &structs.TaskState{}
r.alloc.TaskStates[task.Name] = ts
}
if ts.FinishedAt.IsZero() {
ts.FinishedAt = now
}
}
}
// Don't do a deep copy of the job
alloc := r.alloc.CopySkipJob()
@ -715,8 +732,10 @@ func (r *AllocRunner) setTaskState(taskName, state string, event *structs.TaskEv
}
}
case structs.TaskStateDead:
// Capture the finished time.
taskState.FinishedAt = time.Now().UTC()
// Capture the finished time if not already set
if taskState.FinishedAt.IsZero() {
taskState.FinishedAt = time.Now().UTC()
}
// Find all tasks that are not the one that is dead and check if the one
// that is dead is a leader
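
Both hunks in this file enforce the same write-once invariant on FinishedAt: set it only when it is still zero. A minimal standalone sketch of that guard (illustrative types only, not the actual AllocRunner wiring):

package main

import (
	"fmt"
	"time"
)

// TaskState is a reduced stand-in for structs.TaskState.
type TaskState struct{ FinishedAt time.Time }

// markDead captures the finish time only on the first transition to dead,
// mirroring the guard added in setTaskState above.
func markDead(ts *TaskState) {
	if ts.FinishedAt.IsZero() {
		ts.FinishedAt = time.Now().UTC()
	}
}

func main() {
	ts := &TaskState{}
	markDead(ts)
	first := ts.FinishedAt
	time.Sleep(10 * time.Millisecond)
	markDead(ts) // second call is a no-op
	fmt.Println(ts.FinishedAt.Equal(first)) // true: the finish time was not overwritten
}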

View File

@ -5763,16 +5763,7 @@ func (a *Allocation) LastEventTime() time.Time {
}
}
}
// If no tasks have FinishedAt set, examine task events
if lastEventTime.IsZero() {
for _, s := range a.TaskStates {
for _, e := range s.Events {
if lastEventTime.IsZero() || e.Time > lastEventTime.UnixNano() {
lastEventTime = time.Unix(0, e.Time).UTC()
}
}
}
}
if lastEventTime.IsZero() {
return time.Unix(0, a.ModifyTime).UTC()
}
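
With the event-scanning fallback removed, LastEventTime reduces to the latest task FinishedAt, with ModifyTime as the final fallback. A hedged reconstruction of the simplified method, using reduced stand-in types since the full hunk is not shown here:

package main

import (
	"fmt"
	"time"
)

// Reduced, hypothetical stand-ins for the real structs types.
type TaskState struct{ FinishedAt time.Time }

type Allocation struct {
	TaskStates map[string]*TaskState
	ModifyTime int64 // unix nanoseconds
}

// LastEventTime (reconstructed): latest FinishedAt across tasks, else ModifyTime.
func (a *Allocation) LastEventTime() time.Time {
	var last time.Time
	for _, s := range a.TaskStates {
		if s.FinishedAt.After(last) {
			last = s.FinishedAt
		}
	}
	if last.IsZero() {
		return time.Unix(0, a.ModifyTime).UTC()
	}
	return last
}

func main() {
	a := &Allocation{
		TaskStates: map[string]*TaskState{"web": {}},
		ModifyTime: time.Now().UnixNano(),
	}
	fmt.Println(a.LastEventTime()) // no FinishedAt set, so this falls back to ModifyTime
}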

View File

@ -2692,7 +2692,7 @@ func TestAllocation_LastEventTime(t *testing.T) {
expectedLastEventTime: t1.Add(-40 * time.Minute),
},
{
desc: "No finishedAt set, one task event",
desc: "No finishedAt set, one task event, should use modify time",
taskState: map[string]*TaskState{"foo": {
State: "run",
StartedAt: t1.Add(-2 * time.Hour),
@ -2700,19 +2700,7 @@ func TestAllocation_LastEventTime(t *testing.T) {
{Type: "start", Time: t1.Add(-20 * time.Minute).UnixNano()},
}},
},
expectedLastEventTime: t1.Add(-20 * time.Minute),
},
{
desc: "No finishedAt set, many task events",
taskState: map[string]*TaskState{"foo": {
State: "run",
StartedAt: t1.Add(-2 * time.Hour),
Events: []*TaskEvent{
{Type: "start", Time: t1.Add(-20 * time.Minute).UnixNano()},
{Type: "status change", Time: t1.Add(-10 * time.Minute).UnixNano()},
}},
},
expectedLastEventTime: t1.Add(-10 * time.Minute),
expectedLastEventTime: t1,
},
}
for _, tc := range testCases {

View File

@ -330,7 +330,7 @@ func (s *GenericScheduler) computeJobAllocs() error {
reconciler := NewAllocReconciler(s.ctx.Logger(),
genericAllocUpdateFn(s.ctx, s.stack, s.eval.ID),
s.batch, s.eval.JobID, s.job, s.deployment, allocs, tainted)
s.batch, s.eval.JobID, s.job, s.deployment, allocs, tainted, s.eval.ID)
results := reconciler.Compute()
s.logger.Printf("[DEBUG] sched: %#v: %#v", s.eval, results)

View File

@ -16,6 +16,11 @@ const (
// batchedFailedAllocWindowSize is the window size used
// to batch up failed allocations before creating an eval
batchedFailedAllocWindowSize = 5 * time.Second
// rescheduleTimeLapseWindowSize is the window size relative to the
// current time within which reschedulable allocations are placed
// immediately. This helps protect against small clock drift between servers
rescheduleTimeLapseWindowSize = 1 * time.Second
)
// allocUpdateType takes an existing allocation and a new job definition and
@ -67,6 +72,13 @@ type allocReconciler struct {
// existingAllocs is non-terminal existing allocations
existingAllocs []*structs.Allocation
// evalID is the ID of the evaluation that triggered the reconciler
evalID string
// now is the time used when determining rescheduling eligibility.
// It can be overridden in unit tests.
now time.Time
// result is the results of the reconcile. During computation it can be
// used to store intermediate state
result *reconcileResults
@ -145,7 +157,7 @@ func (r *reconcileResults) Changes() int {
// the changes required to bring the cluster state inline with the declared jobspec
func NewAllocReconciler(logger *log.Logger, allocUpdateFn allocUpdateType, batch bool,
jobID string, job *structs.Job, deployment *structs.Deployment,
existingAllocs []*structs.Allocation, taintedNodes map[string]*structs.Node) *allocReconciler {
existingAllocs []*structs.Allocation, taintedNodes map[string]*structs.Node, evalID string) *allocReconciler {
return &allocReconciler{
logger: logger,
@ -156,6 +168,7 @@ func NewAllocReconciler(logger *log.Logger, allocUpdateFn allocUpdateType, batch
deployment: deployment.Copy(),
existingAllocs: existingAllocs,
taintedNodes: taintedNodes,
evalID: evalID,
result: &reconcileResults{
desiredTGUpdates: make(map[string]*structs.DesiredUpdates),
desiredFollowupEvals: make(map[string][]*structs.Evaluation),
@ -340,8 +353,12 @@ func (a *allocReconciler) computeGroup(group string, all allocSet) bool {
// Determine what set of allocations are on tainted nodes
untainted, migrate, lost := all.filterByTainted(a.taintedNodes)
if a.now.IsZero() {
a.now = time.Now()
}
// Determine what set of terminal allocations need to be rescheduled
untainted, rescheduleNow, rescheduleLater := untainted.filterByRescheduleable(a.batch)
untainted, rescheduleNow, rescheduleLater := untainted.filterByRescheduleable(a.batch, a.now, a.evalID)
// Create batched follow-up evaluations for allocations that are
// reschedulable later and mark the allocations for in-place updating

View File

@ -346,7 +346,7 @@ func assertResults(t *testing.T, r *reconcileResults, exp *resultExpectation) {
// existing allocations
func TestReconciler_Place_NoExisting(t *testing.T) {
job := mock.Job()
reconciler := NewAllocReconciler(testLogger(), allocUpdateFnIgnore, false, job.ID, job, nil, nil, nil)
reconciler := NewAllocReconciler(testLogger(), allocUpdateFnIgnore, false, job.ID, job, nil, nil, nil, "")
r := reconciler.Compute()
// Assert the correct results
@ -382,7 +382,7 @@ func TestReconciler_Place_Existing(t *testing.T) {
allocs = append(allocs, alloc)
}
reconciler := NewAllocReconciler(testLogger(), allocUpdateFnIgnore, false, job.ID, job, nil, allocs, nil)
reconciler := NewAllocReconciler(testLogger(), allocUpdateFnIgnore, false, job.ID, job, nil, allocs, nil, "")
r := reconciler.Compute()
// Assert the correct results
@ -420,7 +420,7 @@ func TestReconciler_ScaleDown_Partial(t *testing.T) {
allocs = append(allocs, alloc)
}
reconciler := NewAllocReconciler(testLogger(), allocUpdateFnIgnore, false, job.ID, job, nil, allocs, nil)
reconciler := NewAllocReconciler(testLogger(), allocUpdateFnIgnore, false, job.ID, job, nil, allocs, nil, "")
r := reconciler.Compute()
// Assert the correct results
@ -459,7 +459,7 @@ func TestReconciler_ScaleDown_Zero(t *testing.T) {
allocs = append(allocs, alloc)
}
reconciler := NewAllocReconciler(testLogger(), allocUpdateFnIgnore, false, job.ID, job, nil, allocs, nil)
reconciler := NewAllocReconciler(testLogger(), allocUpdateFnIgnore, false, job.ID, job, nil, allocs, nil, "")
r := reconciler.Compute()
// Assert the correct results
@ -499,7 +499,7 @@ func TestReconciler_ScaleDown_Zero_DuplicateNames(t *testing.T) {
expectedStopped = append(expectedStopped, i%2)
}
reconciler := NewAllocReconciler(testLogger(), allocUpdateFnIgnore, false, job.ID, job, nil, allocs, nil)
reconciler := NewAllocReconciler(testLogger(), allocUpdateFnIgnore, false, job.ID, job, nil, allocs, nil, "")
r := reconciler.Compute()
// Assert the correct results
@ -534,7 +534,7 @@ func TestReconciler_Inplace(t *testing.T) {
allocs = append(allocs, alloc)
}
reconciler := NewAllocReconciler(testLogger(), allocUpdateFnInplace, false, job.ID, job, nil, allocs, nil)
reconciler := NewAllocReconciler(testLogger(), allocUpdateFnInplace, false, job.ID, job, nil, allocs, nil, "")
r := reconciler.Compute()
// Assert the correct results
@ -572,7 +572,7 @@ func TestReconciler_Inplace_ScaleUp(t *testing.T) {
allocs = append(allocs, alloc)
}
reconciler := NewAllocReconciler(testLogger(), allocUpdateFnInplace, false, job.ID, job, nil, allocs, nil)
reconciler := NewAllocReconciler(testLogger(), allocUpdateFnInplace, false, job.ID, job, nil, allocs, nil, "")
r := reconciler.Compute()
// Assert the correct results
@ -612,7 +612,7 @@ func TestReconciler_Inplace_ScaleDown(t *testing.T) {
allocs = append(allocs, alloc)
}
reconciler := NewAllocReconciler(testLogger(), allocUpdateFnInplace, false, job.ID, job, nil, allocs, nil)
reconciler := NewAllocReconciler(testLogger(), allocUpdateFnInplace, false, job.ID, job, nil, allocs, nil, "")
r := reconciler.Compute()
// Assert the correct results
@ -649,7 +649,7 @@ func TestReconciler_Destructive(t *testing.T) {
allocs = append(allocs, alloc)
}
reconciler := NewAllocReconciler(testLogger(), allocUpdateFnDestructive, false, job.ID, job, nil, allocs, nil)
reconciler := NewAllocReconciler(testLogger(), allocUpdateFnDestructive, false, job.ID, job, nil, allocs, nil, "")
r := reconciler.Compute()
// Assert the correct results
@ -685,7 +685,7 @@ func TestReconciler_Destructive_ScaleUp(t *testing.T) {
allocs = append(allocs, alloc)
}
reconciler := NewAllocReconciler(testLogger(), allocUpdateFnDestructive, false, job.ID, job, nil, allocs, nil)
reconciler := NewAllocReconciler(testLogger(), allocUpdateFnDestructive, false, job.ID, job, nil, allocs, nil, "")
r := reconciler.Compute()
// Assert the correct results
@ -724,7 +724,7 @@ func TestReconciler_Destructive_ScaleDown(t *testing.T) {
allocs = append(allocs, alloc)
}
reconciler := NewAllocReconciler(testLogger(), allocUpdateFnDestructive, false, job.ID, job, nil, allocs, nil)
reconciler := NewAllocReconciler(testLogger(), allocUpdateFnDestructive, false, job.ID, job, nil, allocs, nil, "")
r := reconciler.Compute()
// Assert the correct results
@ -769,7 +769,7 @@ func TestReconciler_LostNode(t *testing.T) {
tainted[n.ID] = n
}
reconciler := NewAllocReconciler(testLogger(), allocUpdateFnIgnore, false, job.ID, job, nil, allocs, tainted)
reconciler := NewAllocReconciler(testLogger(), allocUpdateFnIgnore, false, job.ID, job, nil, allocs, tainted, "")
r := reconciler.Compute()
// Assert the correct results
@ -819,7 +819,7 @@ func TestReconciler_LostNode_ScaleUp(t *testing.T) {
tainted[n.ID] = n
}
reconciler := NewAllocReconciler(testLogger(), allocUpdateFnIgnore, false, job.ID, job, nil, allocs, tainted)
reconciler := NewAllocReconciler(testLogger(), allocUpdateFnIgnore, false, job.ID, job, nil, allocs, tainted, "")
r := reconciler.Compute()
// Assert the correct results
@ -869,7 +869,7 @@ func TestReconciler_LostNode_ScaleDown(t *testing.T) {
tainted[n.ID] = n
}
reconciler := NewAllocReconciler(testLogger(), allocUpdateFnIgnore, false, job.ID, job, nil, allocs, tainted)
reconciler := NewAllocReconciler(testLogger(), allocUpdateFnIgnore, false, job.ID, job, nil, allocs, tainted, "")
r := reconciler.Compute()
// Assert the correct results
@ -915,7 +915,7 @@ func TestReconciler_DrainNode(t *testing.T) {
tainted[n.ID] = n
}
reconciler := NewAllocReconciler(testLogger(), allocUpdateFnIgnore, false, job.ID, job, nil, allocs, tainted)
reconciler := NewAllocReconciler(testLogger(), allocUpdateFnIgnore, false, job.ID, job, nil, allocs, tainted, "")
r := reconciler.Compute()
// Assert the correct results
@ -968,7 +968,7 @@ func TestReconciler_DrainNode_ScaleUp(t *testing.T) {
tainted[n.ID] = n
}
reconciler := NewAllocReconciler(testLogger(), allocUpdateFnIgnore, false, job.ID, job, nil, allocs, tainted)
reconciler := NewAllocReconciler(testLogger(), allocUpdateFnIgnore, false, job.ID, job, nil, allocs, tainted, "")
r := reconciler.Compute()
// Assert the correct results
@ -1022,7 +1022,7 @@ func TestReconciler_DrainNode_ScaleDown(t *testing.T) {
tainted[n.ID] = n
}
reconciler := NewAllocReconciler(testLogger(), allocUpdateFnIgnore, false, job.ID, job, nil, allocs, tainted)
reconciler := NewAllocReconciler(testLogger(), allocUpdateFnIgnore, false, job.ID, job, nil, allocs, tainted, "")
r := reconciler.Compute()
// Assert the correct results
@ -1067,7 +1067,7 @@ func TestReconciler_RemovedTG(t *testing.T) {
newName := "different"
job.TaskGroups[0].Name = newName
reconciler := NewAllocReconciler(testLogger(), allocUpdateFnIgnore, false, job.ID, job, nil, allocs, nil)
reconciler := NewAllocReconciler(testLogger(), allocUpdateFnIgnore, false, job.ID, job, nil, allocs, nil, "")
r := reconciler.Compute()
// Assert the correct results
@ -1129,7 +1129,7 @@ func TestReconciler_JobStopped(t *testing.T) {
allocs = append(allocs, alloc)
}
reconciler := NewAllocReconciler(testLogger(), allocUpdateFnIgnore, false, c.jobID, c.job, nil, allocs, nil)
reconciler := NewAllocReconciler(testLogger(), allocUpdateFnIgnore, false, c.jobID, c.job, nil, allocs, nil, "")
r := reconciler.Compute()
// Assert the correct results
@ -1169,7 +1169,7 @@ func TestReconciler_MultiTG(t *testing.T) {
allocs = append(allocs, alloc)
}
reconciler := NewAllocReconciler(testLogger(), allocUpdateFnIgnore, false, job.ID, job, nil, allocs, nil)
reconciler := NewAllocReconciler(testLogger(), allocUpdateFnIgnore, false, job.ID, job, nil, allocs, nil, "")
r := reconciler.Compute()
// Assert the correct results
@ -1248,7 +1248,7 @@ func TestReconciler_RescheduleLater_Batch(t *testing.T) {
// Mark one as complete
allocs[5].ClientStatus = structs.AllocClientStatusComplete
reconciler := NewAllocReconciler(testLogger(), allocUpdateFnIgnore, true, job.ID, job, nil, allocs, nil)
reconciler := NewAllocReconciler(testLogger(), allocUpdateFnIgnore, true, job.ID, job, nil, allocs, nil, "")
r := reconciler.Compute()
// Two reschedule attempts were already made; one more can be made at a future time
@ -1327,7 +1327,7 @@ func TestReconciler_RescheduleLaterWithBatchedEvals_Batch(t *testing.T) {
FinishedAt: now.Add(10 * time.Second)}}
}
reconciler := NewAllocReconciler(testLogger(), allocUpdateFnIgnore, true, job.ID, job, nil, allocs, nil)
reconciler := NewAllocReconciler(testLogger(), allocUpdateFnIgnore, true, job.ID, job, nil, allocs, nil, "")
r := reconciler.Compute()
// Verify that two follow up evals were created
@ -1405,7 +1405,8 @@ func TestReconciler_RescheduleNow_Batch(t *testing.T) {
allocs[2].ClientStatus = structs.AllocClientStatusFailed
allocs[2].TaskStates = map[string]*structs.TaskState{tgName: {State: "start",
StartedAt: now.Add(-1 * time.Hour),
FinishedAt: now.Add(-10 * time.Second)}}
FinishedAt: now.Add(-5 * time.Second)}}
allocs[2].FollowupEvalID = uuid.Generate()
allocs[2].RescheduleTracker = &structs.RescheduleTracker{Events: []*structs.RescheduleEvent{
{RescheduleTime: time.Now().Add(-2 * time.Hour).UTC().UnixNano(),
PrevAllocID: allocs[0].ID,
@ -1419,7 +1420,8 @@ func TestReconciler_RescheduleNow_Batch(t *testing.T) {
// Mark one as complete
allocs[5].ClientStatus = structs.AllocClientStatusComplete
reconciler := NewAllocReconciler(testLogger(), allocUpdateFnIgnore, true, job.ID, job, nil, allocs, nil)
reconciler := NewAllocReconciler(testLogger(), allocUpdateFnIgnore, true, job.ID, job, nil, allocs, nil, "")
reconciler.now = now
r := reconciler.Compute()
// Verify that no follow up evals were created
@ -1492,7 +1494,7 @@ func TestReconciler_RescheduleLater_Service(t *testing.T) {
// Mark one as desired state stop
allocs[4].DesiredStatus = structs.AllocDesiredStatusStop
reconciler := NewAllocReconciler(testLogger(), allocUpdateFnIgnore, false, job.ID, job, nil, allocs, nil)
reconciler := NewAllocReconciler(testLogger(), allocUpdateFnIgnore, false, job.ID, job, nil, allocs, nil, "")
r := reconciler.Compute()
// Should place a new placement and create a follow up eval for the delayed reschedule
@ -1560,7 +1562,7 @@ func TestReconciler_Service_ClientStatusComplete(t *testing.T) {
// Mark one as client status complete
allocs[4].ClientStatus = structs.AllocClientStatusComplete
reconciler := NewAllocReconciler(testLogger(), allocUpdateFnIgnore, false, job.ID, job, nil, allocs, nil)
reconciler := NewAllocReconciler(testLogger(), allocUpdateFnIgnore, false, job.ID, job, nil, allocs, nil, "")
r := reconciler.Compute()
// Should place a new placement for the alloc that was marked complete
@ -1616,7 +1618,7 @@ func TestReconciler_Service_DesiredStop_ClientStatusComplete(t *testing.T) {
allocs[4].ClientStatus = structs.AllocClientStatusFailed
allocs[4].DesiredStatus = structs.AllocDesiredStatusStop
reconciler := NewAllocReconciler(testLogger(), allocUpdateFnIgnore, false, job.ID, job, nil, allocs, nil)
reconciler := NewAllocReconciler(testLogger(), allocUpdateFnIgnore, false, job.ID, job, nil, allocs, nil, "")
r := reconciler.Compute()
// Should place a new placement for the alloc that was marked stopped
@ -1693,7 +1695,7 @@ func TestReconciler_RescheduleNow_Service(t *testing.T) {
// Mark one as desired state stop
allocs[4].DesiredStatus = structs.AllocDesiredStatusStop
reconciler := NewAllocReconciler(testLogger(), allocUpdateFnIgnore, false, job.ID, job, nil, allocs, nil)
reconciler := NewAllocReconciler(testLogger(), allocUpdateFnIgnore, false, job.ID, job, nil, allocs, nil, "")
r := reconciler.Compute()
// Verify that no follow up evals were created
@ -1721,6 +1723,164 @@ func TestReconciler_RescheduleNow_Service(t *testing.T) {
assertPlacementsAreRescheduled(t, 1, r.place)
}
// Tests rescheduling failed service allocations when there's clock drift (up to a second)
func TestReconciler_RescheduleNow_WithinAllowedTimeWindow(t *testing.T) {
require := require.New(t)
// Set desired 5
job := mock.Job()
job.TaskGroups[0].Count = 5
tgName := job.TaskGroups[0].Name
now := time.Now()
// Set up reschedule policy and update stanza
job.TaskGroups[0].ReschedulePolicy = &structs.ReschedulePolicy{
Attempts: 1,
Interval: 24 * time.Hour,
Delay: 5 * time.Second,
DelayFunction: "",
MaxDelay: 1 * time.Hour,
Unlimited: false,
}
job.TaskGroups[0].Update = noCanaryUpdate
// Create 5 existing allocations
var allocs []*structs.Allocation
for i := 0; i < 5; i++ {
alloc := mock.Alloc()
alloc.Job = job
alloc.JobID = job.ID
alloc.NodeID = uuid.Generate()
alloc.Name = structs.AllocName(job.ID, job.TaskGroups[0].Name, uint(i))
allocs = append(allocs, alloc)
alloc.ClientStatus = structs.AllocClientStatusRunning
}
// Mark one as failed
allocs[0].ClientStatus = structs.AllocClientStatusFailed
// Mark one of them as already rescheduled once
allocs[0].RescheduleTracker = &structs.RescheduleTracker{Events: []*structs.RescheduleEvent{
{RescheduleTime: time.Now().Add(-1 * time.Hour).UTC().UnixNano(),
PrevAllocID: uuid.Generate(),
PrevNodeID: uuid.Generate(),
},
}}
// Set fail time to 4 seconds ago, which falls within the reschedule window
allocs[1].TaskStates = map[string]*structs.TaskState{tgName: {State: "start",
StartedAt: now.Add(-1 * time.Hour),
FinishedAt: now.Add(-4 * time.Second)}}
allocs[1].ClientStatus = structs.AllocClientStatusFailed
reconciler := NewAllocReconciler(testLogger(), allocUpdateFnIgnore, false, job.ID, job, nil, allocs, nil, "")
reconciler.now = now
r := reconciler.Compute()
// Verify that no follow up evals were created
evals := r.desiredFollowupEvals[tgName]
require.Nil(evals)
// Verify that one rescheduled alloc was placed
assertResults(t, r, &resultExpectation{
createDeployment: nil,
deploymentUpdates: nil,
place: 1,
inplace: 0,
stop: 0,
desiredTGUpdates: map[string]*structs.DesiredUpdates{
job.TaskGroups[0].Name: {
Place: 1,
Ignore: 4,
},
},
})
// Rescheduled allocs should have previous allocs
assertNamesHaveIndexes(t, intRange(1, 1), placeResultsToNames(r.place))
assertPlaceResultsHavePreviousAllocs(t, 1, r.place)
assertPlacementsAreRescheduled(t, 1, r.place)
}
// Tests rescheduling failed service allocations when the eval ID matches and there's a large clock drift
func TestReconciler_RescheduleNow_EvalIDMatch(t *testing.T) {
require := require.New(t)
// Set desired 5
job := mock.Job()
job.TaskGroups[0].Count = 5
tgName := job.TaskGroups[0].Name
now := time.Now()
// Set up reschedule policy and update stanza
job.TaskGroups[0].ReschedulePolicy = &structs.ReschedulePolicy{
Attempts: 1,
Interval: 24 * time.Hour,
Delay: 5 * time.Second,
DelayFunction: "",
MaxDelay: 1 * time.Hour,
Unlimited: false,
}
job.TaskGroups[0].Update = noCanaryUpdate
// Create 5 existing allocations
var allocs []*structs.Allocation
for i := 0; i < 5; i++ {
alloc := mock.Alloc()
alloc.Job = job
alloc.JobID = job.ID
alloc.NodeID = uuid.Generate()
alloc.Name = structs.AllocName(job.ID, job.TaskGroups[0].Name, uint(i))
allocs = append(allocs, alloc)
alloc.ClientStatus = structs.AllocClientStatusRunning
}
// Mark one as failed
allocs[0].ClientStatus = structs.AllocClientStatusFailed
// Mark one of them as already rescheduled once
allocs[0].RescheduleTracker = &structs.RescheduleTracker{Events: []*structs.RescheduleEvent{
{RescheduleTime: time.Now().Add(-1 * time.Hour).UTC().UnixNano(),
PrevAllocID: uuid.Generate(),
PrevNodeID: uuid.Generate(),
},
}}
// Set fail time to 5 seconds ago and set the follow-up eval ID
evalID := uuid.Generate()
allocs[1].TaskStates = map[string]*structs.TaskState{tgName: {State: "start",
StartedAt: now.Add(-1 * time.Hour),
FinishedAt: now.Add(-5 * time.Second)}}
allocs[1].ClientStatus = structs.AllocClientStatusFailed
allocs[1].FollowupEvalID = evalID
reconciler := NewAllocReconciler(testLogger(), allocUpdateFnIgnore, false, job.ID, job, nil, allocs, nil, evalID)
reconciler.now = now.Add(-30 * time.Second)
r := reconciler.Compute()
// Verify that no follow up evals were created
evals := r.desiredFollowupEvals[tgName]
require.Nil(evals)
// Verify that one rescheduled alloc was placed
assertResults(t, r, &resultExpectation{
createDeployment: nil,
deploymentUpdates: nil,
place: 1,
inplace: 0,
stop: 0,
desiredTGUpdates: map[string]*structs.DesiredUpdates{
job.TaskGroups[0].Name: {
Place: 1,
Ignore: 4,
},
},
})
// Rescheduled allocs should have previous allocs
assertNamesHaveIndexes(t, intRange(1, 1), placeResultsToNames(r.place))
assertPlaceResultsHavePreviousAllocs(t, 1, r.place)
assertPlacementsAreRescheduled(t, 1, r.place)
}
// Tests failed service allocations that were already rescheduled won't be rescheduled again
func TestReconciler_DontReschedule_PreviouslyRescheduled(t *testing.T) {
// Set desired 5
@ -1756,7 +1916,7 @@ func TestReconciler_DontReschedule_PreviouslyRescheduled(t *testing.T) {
// Mark one as desired state stop
allocs[4].DesiredStatus = structs.AllocDesiredStatusStop
reconciler := NewAllocReconciler(testLogger(), allocUpdateFnIgnore, false, job.ID, job, nil, allocs, nil)
reconciler := NewAllocReconciler(testLogger(), allocUpdateFnIgnore, false, job.ID, job, nil, allocs, nil, "")
r := reconciler.Compute()
// Should place 1 - one is a new placement to make up the desired count of 5
@ -1843,7 +2003,7 @@ func TestReconciler_CancelDeployment_JobStop(t *testing.T) {
allocs = append(allocs, alloc)
}
reconciler := NewAllocReconciler(testLogger(), allocUpdateFnIgnore, false, c.jobID, c.job, c.deployment, allocs, nil)
reconciler := NewAllocReconciler(testLogger(), allocUpdateFnIgnore, false, c.jobID, c.job, c.deployment, allocs, nil, "")
r := reconciler.Compute()
var updates []*structs.DeploymentStatusUpdate
@ -1920,7 +2080,7 @@ func TestReconciler_CancelDeployment_JobUpdate(t *testing.T) {
allocs = append(allocs, alloc)
}
reconciler := NewAllocReconciler(testLogger(), allocUpdateFnIgnore, false, job.ID, job, c.deployment, allocs, nil)
reconciler := NewAllocReconciler(testLogger(), allocUpdateFnIgnore, false, job.ID, job, c.deployment, allocs, nil, "")
r := reconciler.Compute()
var updates []*structs.DeploymentStatusUpdate
@ -1969,7 +2129,7 @@ func TestReconciler_CreateDeployment_RollingUpgrade_Destructive(t *testing.T) {
allocs = append(allocs, alloc)
}
reconciler := NewAllocReconciler(testLogger(), allocUpdateFnDestructive, false, job.ID, job, nil, allocs, nil)
reconciler := NewAllocReconciler(testLogger(), allocUpdateFnDestructive, false, job.ID, job, nil, allocs, nil, "")
r := reconciler.Compute()
d := structs.NewDeployment(job)
@ -2012,7 +2172,7 @@ func TestReconciler_CreateDeployment_RollingUpgrade_Inplace(t *testing.T) {
allocs = append(allocs, alloc)
}
reconciler := NewAllocReconciler(testLogger(), allocUpdateFnInplace, false, job.ID, job, nil, allocs, nil)
reconciler := NewAllocReconciler(testLogger(), allocUpdateFnInplace, false, job.ID, job, nil, allocs, nil, "")
r := reconciler.Compute()
d := structs.NewDeployment(job)
@ -2052,7 +2212,7 @@ func TestReconciler_DontCreateDeployment_NoChanges(t *testing.T) {
allocs = append(allocs, alloc)
}
reconciler := NewAllocReconciler(testLogger(), allocUpdateFnIgnore, false, job.ID, job, nil, allocs, nil)
reconciler := NewAllocReconciler(testLogger(), allocUpdateFnIgnore, false, job.ID, job, nil, allocs, nil, "")
r := reconciler.Compute()
// Assert the correct results
@ -2130,7 +2290,7 @@ func TestReconciler_PausedOrFailedDeployment_NoMoreCanaries(t *testing.T) {
d.TaskGroups[canary.TaskGroup].PlacedCanaries = []string{canary.ID}
mockUpdateFn := allocUpdateFnMock(map[string]allocUpdateType{canary.ID: allocUpdateFnIgnore}, allocUpdateFnDestructive)
reconciler := NewAllocReconciler(testLogger(), mockUpdateFn, false, job.ID, job, d, allocs, nil)
reconciler := NewAllocReconciler(testLogger(), mockUpdateFn, false, job.ID, job, d, allocs, nil, "")
r := reconciler.Compute()
// Assert the correct results
@ -2195,7 +2355,7 @@ func TestReconciler_PausedOrFailedDeployment_NoMorePlacements(t *testing.T) {
allocs = append(allocs, alloc)
}
reconciler := NewAllocReconciler(testLogger(), allocUpdateFnIgnore, false, job.ID, job, d, allocs, nil)
reconciler := NewAllocReconciler(testLogger(), allocUpdateFnIgnore, false, job.ID, job, d, allocs, nil, "")
r := reconciler.Compute()
// Assert the correct results
@ -2269,7 +2429,7 @@ func TestReconciler_PausedOrFailedDeployment_NoMoreDestructiveUpdates(t *testing
allocs = append(allocs, newAlloc)
mockUpdateFn := allocUpdateFnMock(map[string]allocUpdateType{newAlloc.ID: allocUpdateFnIgnore}, allocUpdateFnDestructive)
reconciler := NewAllocReconciler(testLogger(), mockUpdateFn, false, job.ID, job, d, allocs, nil)
reconciler := NewAllocReconciler(testLogger(), mockUpdateFn, false, job.ID, job, d, allocs, nil, "")
r := reconciler.Compute()
// Assert the correct results
@ -2357,7 +2517,7 @@ func TestReconciler_PausedOrFailedDeployment_Migrations(t *testing.T) {
tainted[n.ID] = n
}
reconciler := NewAllocReconciler(testLogger(), allocUpdateFnIgnore, false, job.ID, job, d, allocs, tainted)
reconciler := NewAllocReconciler(testLogger(), allocUpdateFnIgnore, false, job.ID, job, d, allocs, tainted, "")
r := reconciler.Compute()
// Assert the correct results
@ -2431,7 +2591,7 @@ func TestReconciler_DrainNode_Canary(t *testing.T) {
tainted[n.ID] = n
mockUpdateFn := allocUpdateFnMock(handled, allocUpdateFnDestructive)
reconciler := NewAllocReconciler(testLogger(), mockUpdateFn, false, job.ID, job, d, allocs, tainted)
reconciler := NewAllocReconciler(testLogger(), mockUpdateFn, false, job.ID, job, d, allocs, tainted, "")
r := reconciler.Compute()
// Assert the correct results
@ -2503,7 +2663,7 @@ func TestReconciler_LostNode_Canary(t *testing.T) {
tainted[n.ID] = n
mockUpdateFn := allocUpdateFnMock(handled, allocUpdateFnDestructive)
reconciler := NewAllocReconciler(testLogger(), mockUpdateFn, false, job.ID, job, d, allocs, tainted)
reconciler := NewAllocReconciler(testLogger(), mockUpdateFn, false, job.ID, job, d, allocs, tainted, "")
r := reconciler.Compute()
// Assert the correct results
@ -2569,7 +2729,7 @@ func TestReconciler_StopOldCanaries(t *testing.T) {
allocs = append(allocs, canary)
}
reconciler := NewAllocReconciler(testLogger(), allocUpdateFnDestructive, false, job.ID, job, d, allocs, nil)
reconciler := NewAllocReconciler(testLogger(), allocUpdateFnDestructive, false, job.ID, job, d, allocs, nil, "")
r := reconciler.Compute()
newD := structs.NewDeployment(job)
@ -2622,7 +2782,7 @@ func TestReconciler_NewCanaries(t *testing.T) {
allocs = append(allocs, alloc)
}
reconciler := NewAllocReconciler(testLogger(), allocUpdateFnDestructive, false, job.ID, job, nil, allocs, nil)
reconciler := NewAllocReconciler(testLogger(), allocUpdateFnDestructive, false, job.ID, job, nil, allocs, nil, "")
r := reconciler.Compute()
newD := structs.NewDeployment(job)
@ -2672,7 +2832,7 @@ func TestReconciler_NewCanaries_MultiTG(t *testing.T) {
}
}
reconciler := NewAllocReconciler(testLogger(), allocUpdateFnDestructive, false, job.ID, job, nil, allocs, nil)
reconciler := NewAllocReconciler(testLogger(), allocUpdateFnDestructive, false, job.ID, job, nil, allocs, nil, "")
r := reconciler.Compute()
newD := structs.NewDeployment(job)
@ -2725,7 +2885,7 @@ func TestReconciler_NewCanaries_ScaleUp(t *testing.T) {
allocs = append(allocs, alloc)
}
reconciler := NewAllocReconciler(testLogger(), allocUpdateFnDestructive, false, job.ID, job, nil, allocs, nil)
reconciler := NewAllocReconciler(testLogger(), allocUpdateFnDestructive, false, job.ID, job, nil, allocs, nil, "")
r := reconciler.Compute()
newD := structs.NewDeployment(job)
@ -2773,7 +2933,7 @@ func TestReconciler_NewCanaries_ScaleDown(t *testing.T) {
allocs = append(allocs, alloc)
}
reconciler := NewAllocReconciler(testLogger(), allocUpdateFnDestructive, false, job.ID, job, nil, allocs, nil)
reconciler := NewAllocReconciler(testLogger(), allocUpdateFnDestructive, false, job.ID, job, nil, allocs, nil, "")
r := reconciler.Compute()
newD := structs.NewDeployment(job)
@ -2850,7 +3010,7 @@ func TestReconciler_NewCanaries_FillNames(t *testing.T) {
allocs = append(allocs, canary)
}
reconciler := NewAllocReconciler(testLogger(), allocUpdateFnDestructive, false, job.ID, job, d, allocs, nil)
reconciler := NewAllocReconciler(testLogger(), allocUpdateFnDestructive, false, job.ID, job, d, allocs, nil, "")
r := reconciler.Compute()
// Assert the correct results
@ -2919,7 +3079,7 @@ func TestReconciler_PromoteCanaries_Unblock(t *testing.T) {
}
mockUpdateFn := allocUpdateFnMock(handled, allocUpdateFnDestructive)
reconciler := NewAllocReconciler(testLogger(), mockUpdateFn, false, job.ID, job, d, allocs, nil)
reconciler := NewAllocReconciler(testLogger(), mockUpdateFn, false, job.ID, job, d, allocs, nil, "")
r := reconciler.Compute()
// Assert the correct results
@ -2992,7 +3152,7 @@ func TestReconciler_PromoteCanaries_CanariesEqualCount(t *testing.T) {
}
mockUpdateFn := allocUpdateFnMock(handled, allocUpdateFnDestructive)
reconciler := NewAllocReconciler(testLogger(), mockUpdateFn, false, job.ID, job, d, allocs, nil)
reconciler := NewAllocReconciler(testLogger(), mockUpdateFn, false, job.ID, job, d, allocs, nil, "")
r := reconciler.Compute()
updates := []*structs.DeploymentStatusUpdate{
@ -3091,7 +3251,7 @@ func TestReconciler_DeploymentLimit_HealthAccounting(t *testing.T) {
}
mockUpdateFn := allocUpdateFnMock(handled, allocUpdateFnDestructive)
reconciler := NewAllocReconciler(testLogger(), mockUpdateFn, false, job.ID, job, d, allocs, nil)
reconciler := NewAllocReconciler(testLogger(), mockUpdateFn, false, job.ID, job, d, allocs, nil, "")
r := reconciler.Compute()
// Assert the correct results
@ -3172,7 +3332,7 @@ func TestReconciler_TaintedNode_RollingUpgrade(t *testing.T) {
}
mockUpdateFn := allocUpdateFnMock(handled, allocUpdateFnDestructive)
reconciler := NewAllocReconciler(testLogger(), mockUpdateFn, false, job.ID, job, d, allocs, tainted)
reconciler := NewAllocReconciler(testLogger(), mockUpdateFn, false, job.ID, job, d, allocs, tainted, "")
r := reconciler.Compute()
// Assert the correct results
@ -3257,7 +3417,7 @@ func TestReconciler_FailedDeployment_PlacementLost(t *testing.T) {
}
mockUpdateFn := allocUpdateFnMock(handled, allocUpdateFnDestructive)
reconciler := NewAllocReconciler(testLogger(), mockUpdateFn, false, job.ID, job, d, allocs, tainted)
reconciler := NewAllocReconciler(testLogger(), mockUpdateFn, false, job.ID, job, d, allocs, tainted, "")
r := reconciler.Compute()
// Assert the correct results
@ -3312,7 +3472,7 @@ func TestReconciler_CompleteDeployment(t *testing.T) {
allocs = append(allocs, alloc)
}
reconciler := NewAllocReconciler(testLogger(), allocUpdateFnIgnore, false, job.ID, job, d, allocs, nil)
reconciler := NewAllocReconciler(testLogger(), allocUpdateFnIgnore, false, job.ID, job, d, allocs, nil, "")
r := reconciler.Compute()
// Assert the correct results
@ -3399,7 +3559,7 @@ func TestReconciler_FailedDeployment_CancelCanaries(t *testing.T) {
}
mockUpdateFn := allocUpdateFnMock(handled, allocUpdateFnDestructive)
reconciler := NewAllocReconciler(testLogger(), mockUpdateFn, false, job.ID, job, d, allocs, nil)
reconciler := NewAllocReconciler(testLogger(), mockUpdateFn, false, job.ID, job, d, allocs, nil, "")
r := reconciler.Compute()
// Assert the correct results
@ -3468,7 +3628,7 @@ func TestReconciler_FailedDeployment_NewJob(t *testing.T) {
jobNew := job.Copy()
jobNew.Version += 100
reconciler := NewAllocReconciler(testLogger(), allocUpdateFnDestructive, false, job.ID, jobNew, d, allocs, nil)
reconciler := NewAllocReconciler(testLogger(), allocUpdateFnDestructive, false, job.ID, jobNew, d, allocs, nil, "")
r := reconciler.Compute()
dnew := structs.NewDeployment(jobNew)
@ -3521,7 +3681,7 @@ func TestReconciler_MarkDeploymentComplete(t *testing.T) {
allocs = append(allocs, alloc)
}
reconciler := NewAllocReconciler(testLogger(), allocUpdateFnIgnore, false, job.ID, job, d, allocs, nil)
reconciler := NewAllocReconciler(testLogger(), allocUpdateFnIgnore, false, job.ID, job, d, allocs, nil, "")
r := reconciler.Compute()
updates := []*structs.DeploymentStatusUpdate{
@ -3590,7 +3750,7 @@ func TestReconciler_JobChange_ScaleUp_SecondEval(t *testing.T) {
}
mockUpdateFn := allocUpdateFnMock(handled, allocUpdateFnDestructive)
reconciler := NewAllocReconciler(testLogger(), mockUpdateFn, false, job.ID, job, d, allocs, nil)
reconciler := NewAllocReconciler(testLogger(), mockUpdateFn, false, job.ID, job, d, allocs, nil, "")
r := reconciler.Compute()
// Assert the correct results
@ -3625,7 +3785,7 @@ func TestReconciler_RollingUpgrade_MissingAllocs(t *testing.T) {
allocs = append(allocs, alloc)
}
reconciler := NewAllocReconciler(testLogger(), allocUpdateFnDestructive, false, job.ID, job, nil, allocs, nil)
reconciler := NewAllocReconciler(testLogger(), allocUpdateFnDestructive, false, job.ID, job, nil, allocs, nil, "")
r := reconciler.Compute()
d := structs.NewDeployment(job)
@ -3677,7 +3837,7 @@ func TestReconciler_Batch_Rerun(t *testing.T) {
job2 := job.Copy()
job2.CreateIndex++
reconciler := NewAllocReconciler(testLogger(), allocUpdateFnIgnore, true, job2.ID, job2, nil, allocs, nil)
reconciler := NewAllocReconciler(testLogger(), allocUpdateFnIgnore, true, job2.ID, job2, nil, allocs, nil, "")
r := reconciler.Compute()
// Assert the correct results
@ -3737,7 +3897,7 @@ func TestReconciler_FailedDeployment_DontReschedule(t *testing.T) {
StartedAt: now.Add(-1 * time.Hour),
FinishedAt: now.Add(-10 * time.Second)}}
reconciler := NewAllocReconciler(testLogger(), allocUpdateFnDestructive, false, job.ID, job, d, allocs, nil)
reconciler := NewAllocReconciler(testLogger(), allocUpdateFnDestructive, false, job.ID, job, d, allocs, nil, "")
r := reconciler.Compute()
// Assert that no rescheduled placements were created
@ -3793,7 +3953,7 @@ func TestReconciler_DeploymentWithFailedAllocs_DontReschedule(t *testing.T) {
StartedAt: now.Add(-1 * time.Hour),
FinishedAt: now.Add(-10 * time.Second)}}
reconciler := NewAllocReconciler(testLogger(), allocUpdateFnDestructive, false, job.ID, job, d, allocs, nil)
reconciler := NewAllocReconciler(testLogger(), allocUpdateFnDestructive, false, job.ID, job, d, allocs, nil, "")
r := reconciler.Compute()
// Assert that no rescheduled placements were created
@ -3875,7 +4035,7 @@ func TestReconciler_FailedDeployment_AutoRevert_CancelCanaries(t *testing.T) {
allocs = append(allocs, new)
}
reconciler := NewAllocReconciler(testLogger(), allocUpdateFnIgnore, false, job.ID, jobv2, d, allocs, nil)
reconciler := NewAllocReconciler(testLogger(), allocUpdateFnIgnore, false, job.ID, jobv2, d, allocs, nil, "")
r := reconciler.Compute()
updates := []*structs.DeploymentStatusUpdate{

View File

@ -234,11 +234,10 @@ func (a allocSet) filterByTainted(nodes map[string]*structs.Node) (untainted, mi
// untainted or a set of allocations that must be rescheduled now. Allocations that can be rescheduled
// at a future time are also returned so that we can create follow-up evaluations for them. Allocs are
// skipped or considered untainted according to logic defined in the shouldFilter method.
func (a allocSet) filterByRescheduleable(isBatch bool) (untainted, rescheduleNow allocSet, rescheduleLater []*delayedRescheduleInfo) {
func (a allocSet) filterByRescheduleable(isBatch bool, now time.Time, evalID string) (untainted, rescheduleNow allocSet, rescheduleLater []*delayedRescheduleInfo) {
untainted = make(map[string]*structs.Allocation)
rescheduleNow = make(map[string]*structs.Allocation)
now := time.Now()
for _, alloc := range a {
var eligibleNow, eligibleLater bool
var rescheduleTime time.Time
@ -258,7 +257,7 @@ func (a allocSet) filterByRescheduleable(isBatch bool) (untainted, rescheduleNow
// Only failed allocs with desired state run get to this point
// If the failed alloc is not eligible for rescheduling now we add it to the untainted set
eligibleNow, eligibleLater, rescheduleTime = updateByReschedulable(alloc, now)
eligibleNow, eligibleLater, rescheduleTime = updateByReschedulable(alloc, now, evalID)
if !eligibleNow {
untainted[alloc.ID] = alloc
if eligibleLater {
@ -321,9 +320,10 @@ func shouldFilter(alloc *structs.Allocation, isBatch bool) (untainted, ignore bo
// updateByReschedulable is a helper method that encapsulates logic for whether a failed allocation
// should be rescheduled now, later or left in the untainted set
func updateByReschedulable(alloc *structs.Allocation, now time.Time) (rescheduleNow, rescheduleLater bool, rescheduleTime time.Time) {
func updateByReschedulable(alloc *structs.Allocation, now time.Time, evalID string) (rescheduleNow, rescheduleLater bool, rescheduleTime time.Time) {
rescheduleTime, eligible := alloc.NextRescheduleTime()
if eligible && now.After(rescheduleTime) {
// Reschedule if the eval ID matches the alloc's FollowupEvalID, or if it's close to its reschedule time
if eligible && ((alloc.FollowupEvalID != "" && alloc.FollowupEvalID == evalID) || rescheduleTime.Sub(now) <= rescheduleTimeLapseWindowSize) {
rescheduleNow = true
return
}
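
Taken together, updateByReschedulable now fires immediately in two cases: the triggering eval is the alloc's follow-up eval, or the reschedule time falls within the one-second drift window of the current time. A condensed, runnable sketch of that decision (the helper name and standalone constant are illustrative, not the exact Nomad API):

package main

import (
	"fmt"
	"time"
)

const rescheduleTimeLapseWindowSize = 1 * time.Second

// shouldRescheduleNow mirrors the updated condition above: fire now when this
// eval is the alloc's follow-up eval, or when the reschedule time is within
// the clock-drift window of "now".
func shouldRescheduleNow(eligible bool, followupEvalID, evalID string, rescheduleTime, now time.Time) bool {
	evalMatches := followupEvalID != "" && followupEvalID == evalID
	withinWindow := rescheduleTime.Sub(now) <= rescheduleTimeLapseWindowSize
	return eligible && (evalMatches || withinWindow)
}

func main() {
	now := time.Now()
	// A 4s-old failure with a 5s delay: the reschedule time is 1s away, inside the window.
	fmt.Println(shouldRescheduleNow(true, "", "", now.Add(1*time.Second), now)) // true
	// A matching follow-up eval ID reschedules even with large clock drift.
	fmt.Println(shouldRescheduleNow(true, "eval-1", "eval-1", now.Add(30*time.Second), now)) // true
	// No match and outside the window: wait for the follow-up eval.
	fmt.Println(shouldRescheduleNow(true, "eval-1", "eval-2", now.Add(30*time.Second), now)) // false
}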