Consider dead job status and modify unit test setup for correctness

Preetha Appan 2018-01-30 14:45:59 -06:00
parent 4fd2691323
commit 28d2439810
No known key found for this signature in database
GPG Key ID: 9F7C19990A50EAFC
2 changed files with 38 additions and 23 deletions

View File

@@ -258,16 +258,7 @@ func (c *CoreScheduler) gcEval(eval *structs.Evaluation, thresholdIndex uint64,
 	// Job doesn't exist
 	// Job is Stopped and dead
 	// allowBatch and the job is dead
-	collect := false
-	if job == nil {
-		collect = true
-	} else if job.Status != structs.JobStatusDead {
-		collect = false
-	} else if job.Stop {
-		collect = true
-	} else if allowBatch {
-		collect = true
-	}
+	collect := shouldCollect(job, allowBatch)
 
 	// We don't want to gc anything related to a job which is not dead
 	// If the batch job doesn't exist we can GC it regardless of allowBatch
@@ -288,10 +279,8 @@ func (c *CoreScheduler) gcEval(eval *structs.Evaluation, thresholdIndex uint64,
 	gcEval := true
 	var gcAllocIDs []string
 	for _, alloc := range allocs {
-		if job == nil || job.Stop {
-			// Eligible to be GC'd because the job is not around or stopped
-			// We don't consider jobs with "dead" status here because it may still
-			// have terminal allocs that are reschedulable
+		if job == nil || job.Stop || job.Status == structs.JobStatusDead {
+			// Eligible to be GC'd because the job is not around, stopped or dead
 			gcAllocIDs = append(gcAllocIDs, alloc.ID)
 			continue
 		}
@@ -314,6 +303,21 @@ func (c *CoreScheduler) gcEval(eval *structs.Evaluation, thresholdIndex uint64,
 	return gcEval, gcAllocIDs, nil
 }
 
+// shouldCollect is a helper function that determines whether the job is eligible for GC
+func shouldCollect(job *structs.Job, allowBatch bool) bool {
+	collect := false
+	if job == nil {
+		collect = true
+	} else if job.Status != structs.JobStatusDead {
+		collect = false
+	} else if job.Stop {
+		collect = true
+	} else if allowBatch {
+		collect = true
+	}
+	return collect
+}
+
 // evalReap contacts the leader and issues a reap on the passed evals and
 // allocs.
 func (c *CoreScheduler) evalReap(evals, allocs []string) error {
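Since shouldCollect is unexported, it can be exercised directly from the package's own tests. A minimal table-driven sketch covering the cases enumerated in the comments above (the test name and case layout are illustrative, assuming the existing nomad package and structs import; they are not part of this commit):

// Illustrative sketch only; assumes it lives alongside the code under test.
package nomad

import (
	"testing"

	"github.com/hashicorp/nomad/nomad/structs"
)

func TestShouldCollect_Sketch(t *testing.T) {
	cases := []struct {
		name       string
		job        *structs.Job
		allowBatch bool
		expect     bool
	}{
		// Job doesn't exist: collect.
		{"nil job", nil, false, true},
		// Job is not dead: never collect.
		{"running job", &structs.Job{Status: structs.JobStatusRunning}, true, false},
		// Job is stopped and dead: collect.
		{"stopped dead job", &structs.Job{Status: structs.JobStatusDead, Stop: true}, false, true},
		// Dead job with allowBatch set: collect.
		{"dead job, allowBatch", &structs.Job{Status: structs.JobStatusDead}, true, true},
		// Dead job, not stopped, allowBatch false: do not collect.
		{"dead job, no allowBatch", &structs.Job{Status: structs.JobStatusDead}, false, false},
	}
	for _, tc := range cases {
		if got := shouldCollect(tc.job, tc.allowBatch); got != tc.expect {
			t.Errorf("%s: expected %v, got %v", tc.name, tc.expect, got)
		}
	}
}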

View File

@@ -125,6 +125,13 @@ func TestCoreScheduler_EvalGC_ReshedulingAllocs(t *testing.T) {
 	err := state.UpsertEvals(1000, []*structs.Evaluation{eval})
 	require.Nil(err)
+
+	// Insert "pending" eval for same job
+	eval2 := mock.Eval()
+	eval2.JobID = eval.JobID
+	state.UpsertJobSummary(999, mock.JobSummary(eval2.JobID))
+	err = state.UpsertEvals(1003, []*structs.Evaluation{eval2})
+	require.Nil(err)
 
 	// Insert mock job with default reschedule policy of 2 in 10 minutes
 	job := mock.Job()
 	job.ID = eval.JobID
@@ -179,7 +186,7 @@ func TestCoreScheduler_EvalGC_ReshedulingAllocs(t *testing.T) {
 	}
 	core := NewCoreScheduler(s1, snap)
 
-	// Attempt the GC
+	// Attempt the GC, job has all terminal allocs and one pending eval
 	gc := s1.coreJobEval(structs.CoreJobEvalGC, 2000)
 	err = core.Process(gc)
 	require.Nil(err)
@@ -492,18 +499,13 @@ func TestCoreScheduler_EvalGC_Partial(t *testing.T) {
 		t.Fatalf("err: %v", err)
 	}
 
-	// Insert mock job with rescheduling disabled
+	// Create mock job with id same as eval
 	job := mock.Job()
 	job.ID = eval.JobID
-	job.TaskGroups[0].ReschedulePolicy = &structs.ReschedulePolicy{
-		Attempts: 0,
-		Interval: 0 * time.Second,
-	}
-	err = state.UpsertJob(1001, job)
-	require.Nil(err)
 
 	// Insert "dead" alloc
 	alloc := mock.Alloc()
+	alloc.JobID = job.ID
 	alloc.EvalID = eval.ID
 	alloc.DesiredStatus = structs.AllocDesiredStatusStop
 	alloc.TaskGroup = job.TaskGroups[0].Name
@@ -511,7 +513,7 @@ func TestCoreScheduler_EvalGC_Partial(t *testing.T) {
 
 	// Insert "lost" alloc
 	alloc2 := mock.Alloc()
-	alloc2.JobID = alloc.JobID
+	alloc2.JobID = job.ID
 	alloc2.EvalID = eval.ID
 	alloc2.TaskGroup = job.TaskGroups[0].Name
 	alloc2.DesiredStatus = structs.AllocDesiredStatusRun
@@ -525,12 +527,21 @@ func TestCoreScheduler_EvalGC_Partial(t *testing.T) {
 	// Insert "running" alloc
 	alloc3 := mock.Alloc()
 	alloc3.EvalID = eval.ID
+	alloc3.JobID = job.ID
 	state.UpsertJobSummary(1003, mock.JobSummary(alloc3.JobID))
 	err = state.UpsertAllocs(1004, []*structs.Allocation{alloc3})
 	if err != nil {
 		t.Fatalf("err: %v", err)
 	}
 
+	// Insert mock job with rescheduling disabled
+	job.TaskGroups[0].ReschedulePolicy = &structs.ReschedulePolicy{
+		Attempts: 0,
+		Interval: 0 * time.Second,
+	}
+	err = state.UpsertJob(1001, job)
+	require.Nil(err)
+
 	// Update the time tables to make this work
 	tt := s1.fsm.TimeTable()
 	tt.Witness(2000, time.Now().UTC().Add(-1*s1.config.EvalGCThreshold))