Disallow EvalGC from reaping batch jobs' evals/allocs, and make JobGC collect a batch job's evals, allocs, and the job itself in one shot or not at all

This commit is contained in:
Alex Dadgar 2016-06-27 15:47:49 -07:00
parent 3664d465dc
commit 3f0a47f9e4
2 changed files with 205 additions and 248 deletions

View File

@ -106,24 +106,27 @@ OUTER:
}
allEvalsGC := true
var jobAlloc, jobEval []string
for _, eval := range evals {
gc, allocs, err := c.gcEval(eval, oldThreshold)
gc, allocs, err := c.gcEval(eval, oldThreshold, true)
if err != nil {
continue OUTER
}
// Update whether all evals GC'd so we know whether to GC the job.
allEvalsGC = allEvalsGC && gc
if gc {
gcEval = append(gcEval, eval.ID)
jobEval = append(jobEval, eval.ID)
jobAlloc = append(jobAlloc, allocs...)
} else {
allEvalsGC = false
break
}
gcAlloc = append(gcAlloc, allocs...)
}
// Job is eligible for garbage collection
if allEvalsGC {
gcJob = append(gcJob, job.ID)
gcAlloc = append(gcAlloc, jobAlloc...)
gcEval = append(gcEval, jobEval...)
}
}
@ -187,7 +190,9 @@ func (c *CoreScheduler) evalGC(eval *structs.Evaluation) error {
for raw := iter.Next(); raw != nil; raw = iter.Next() {
eval := raw.(*structs.Evaluation)
gc, allocs, err := c.gcEval(eval, oldThreshold)
// The Evaluation GC should not handle batch jobs since those need to be
// garbage collected in one shot
gc, allocs, err := c.gcEval(eval, oldThreshold, false)
if err != nil {
return err
}
@ -213,7 +218,7 @@ func (c *CoreScheduler) evalGC(eval *structs.Evaluation) error {
// allocs are not older than the threshold. If the eval should be garbage
// collected, the associated alloc ids that should also be removed are also
// returned
func (c *CoreScheduler) gcEval(eval *structs.Evaluation, thresholdIndex uint64) (
func (c *CoreScheduler) gcEval(eval *structs.Evaluation, thresholdIndex uint64, allowBatch bool) (
bool, []string, error) {
// Ignore non-terminal and new evaluations
if !eval.TerminalStatus() || eval.ModifyIndex > thresholdIndex {
@ -225,6 +230,10 @@ func (c *CoreScheduler) gcEval(eval *structs.Evaluation, thresholdIndex uint64)
// terminal allocations get GC'd the scheduler would re-run the
// allocations.
if eval.Type == structs.JobTypeBatch {
if !allowBatch {
return false, nil, nil
}
// Check if the job is running
job, err := c.snap.JobByID(eval.JobID)
if err != nil {

View File

@ -68,6 +68,86 @@ func TestCoreScheduler_EvalGC(t *testing.T) {
}
}
// TestCoreScheduler_EvalGC_Batch checks that the eval GC core job never reaps
// a batch job: after processing, the job, its eval and its alloc must all
// still be present in the state store.
func TestCoreScheduler_EvalGC_Batch(t *testing.T) {
	srv := testServer(t, nil)
	defer srv.Shutdown()
	testutil.WaitForLeader(t, srv.RPC)

	store := srv.fsm.State()

	// Register a batch job whose status is already dead.
	batchJob := mock.Job()
	batchJob.Type = structs.JobTypeBatch
	batchJob.Status = structs.JobStatusDead
	if err := store.UpsertJob(1000, batchJob); err != nil {
		t.Fatalf("err: %v", err)
	}

	// Attach a completed batch eval to that job.
	doneEval := mock.Eval()
	doneEval.Status = structs.EvalStatusComplete
	doneEval.Type = structs.JobTypeBatch
	doneEval.JobID = batchJob.ID
	if err := store.UpsertEvals(1001, []*structs.Evaluation{doneEval}); err != nil {
		t.Fatalf("err: %v", err)
	}

	// Attach an alloc whose desired status is failed.
	deadAlloc := mock.Alloc()
	deadAlloc.JobID = batchJob.ID
	deadAlloc.EvalID = doneEval.ID
	deadAlloc.DesiredStatus = structs.AllocDesiredStatusFailed
	if err := store.UpsertAllocs(1002, []*structs.Allocation{deadAlloc}); err != nil {
		t.Fatalf("err: %v", err)
	}

	// Witness index 2000 at a time EvalGCThreshold in the past, which is above
	// every index written by this test.
	timeTable := srv.fsm.TimeTable()
	timeTable.Witness(2000, time.Now().UTC().Add(-1*srv.config.EvalGCThreshold))

	// Build a core scheduler over a snapshot of the state.
	snapshot, err := store.Snapshot()
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	sched := NewCoreScheduler(srv, snapshot)

	// Run the eval GC.
	gcRequest := srv.coreJobEval(structs.CoreJobEvalGC, 2000)
	if err := sched.Process(gcRequest); err != nil {
		t.Fatalf("err: %v", err)
	}

	// The eval, the alloc and the job must all have survived.
	gotEval, err := store.EvalByID(doneEval.ID)
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	if gotEval == nil {
		t.Fatalf("bad: %v", gotEval)
	}

	gotAlloc, err := store.AllocByID(deadAlloc.ID)
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	if gotAlloc == nil {
		t.Fatalf("bad: %v", gotAlloc)
	}

	gotJob, err := store.JobByID(batchJob.ID)
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	if gotJob == nil {
		t.Fatalf("bad: %v", gotJob)
	}
}
func TestCoreScheduler_EvalGC_Partial(t *testing.T) {
s1 := testServer(t, nil)
defer s1.Shutdown()
@ -144,246 +224,6 @@ func TestCoreScheduler_EvalGC_Partial(t *testing.T) {
}
}
// TestCoreScheduler_EvalGC_Batch_NoAllocs inserts a failed batch eval with no
// allocs and expects the eval GC core job to remove it from the state store.
func TestCoreScheduler_EvalGC_Batch_NoAllocs(t *testing.T) {
	srv := testServer(t, nil)
	defer srv.Shutdown()
	testutil.WaitForLeader(t, srv.RPC)

	store := srv.fsm.State()

	// Insert a failed batch eval; no allocs are created for it.
	failedEval := mock.Eval()
	failedEval.Type = structs.JobTypeBatch
	failedEval.Status = structs.EvalStatusFailed
	if err := store.UpsertEvals(1000, []*structs.Evaluation{failedEval}); err != nil {
		t.Fatalf("err: %v", err)
	}

	// Witness index 2000 at a time EvalGCThreshold in the past.
	timeTable := srv.fsm.TimeTable()
	timeTable.Witness(2000, time.Now().UTC().Add(-1*srv.config.EvalGCThreshold))

	// Build a core scheduler over a snapshot of the state.
	snapshot, err := store.Snapshot()
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	sched := NewCoreScheduler(srv, snapshot)

	// Run the eval GC.
	gcRequest := srv.coreJobEval(structs.CoreJobEvalGC, 2000)
	if err := sched.Process(gcRequest); err != nil {
		t.Fatalf("err: %v", err)
	}

	// With no alloc referencing it, the eval must have been collected.
	gotEval, err := store.EvalByID(failedEval.ID)
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	if gotEval != nil {
		t.Fatalf("bad: %v", gotEval)
	}
}
// TestCoreScheduler_EvalGC_Batch_Allocs_WithRunningJob registers a batch job
// with a completed eval and two allocs (one desired-failed, one desired-run)
// and expects the eval GC core job to leave the eval and the failed alloc in
// place.
func TestCoreScheduler_EvalGC_Batch_Allocs_WithRunningJob(t *testing.T) {
	srv := testServer(t, nil)
	defer srv.Shutdown()
	testutil.WaitForLeader(t, srv.RPC)

	store := srv.fsm.State()

	// Register the batch job.
	batchJob := mock.Job()
	batchJob.Type = structs.JobTypeBatch
	if err := store.UpsertJob(1000, batchJob); err != nil {
		t.Fatalf("err: %v", err)
	}

	// Attach a completed batch eval to the job.
	doneEval := mock.Eval()
	doneEval.Type = structs.JobTypeBatch
	doneEval.Status = structs.EvalStatusComplete
	doneEval.JobID = batchJob.ID
	if err := store.UpsertEvals(1001, []*structs.Evaluation{doneEval}); err != nil {
		t.Fatalf("err: %v", err)
	}

	// First alloc: desired status failed.
	failedAlloc := mock.Alloc()
	failedAlloc.EvalID = doneEval.ID
	failedAlloc.JobID = batchJob.ID
	failedAlloc.DesiredStatus = structs.AllocDesiredStatusFailed

	// Second alloc: desired status run.
	runningAlloc := mock.Alloc()
	runningAlloc.EvalID = doneEval.ID
	runningAlloc.JobID = batchJob.ID
	runningAlloc.DesiredStatus = structs.AllocDesiredStatusRun

	if err := store.UpsertAllocs(1002, []*structs.Allocation{failedAlloc, runningAlloc}); err != nil {
		t.Fatalf("err: %v", err)
	}

	// Witness index 2000 at a time EvalGCThreshold in the past.
	timeTable := srv.fsm.TimeTable()
	timeTable.Witness(2000, time.Now().UTC().Add(-1*srv.config.EvalGCThreshold))

	// Build a core scheduler over a snapshot of the state.
	snapshot, err := store.Snapshot()
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	sched := NewCoreScheduler(srv, snapshot)

	// Run the eval GC.
	gcRequest := srv.coreJobEval(structs.CoreJobEvalGC, 2000)
	if err := sched.Process(gcRequest); err != nil {
		t.Fatalf("err: %v", err)
	}

	// The eval still has allocs attached, so it and the failed alloc must
	// remain.
	gotEval, err := store.EvalByID(doneEval.ID)
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	if gotEval == nil {
		t.Fatalf("bad: %v", gotEval)
	}

	gotAlloc, err := store.AllocByID(failedAlloc.ID)
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	if gotAlloc == nil {
		t.Fatalf("bad: %v", gotAlloc)
	}
}
// TestCoreScheduler_EvalGC_Batch_Allocs_WithTerminalJob registers a batch job
// with a completed eval and a single alloc whose client status is complete,
// and expects the eval GC core job to remove both the eval and the alloc.
func TestCoreScheduler_EvalGC_Batch_Allocs_WithTerminalJob(t *testing.T) {
	srv := testServer(t, nil)
	defer srv.Shutdown()
	testutil.WaitForLeader(t, srv.RPC)

	store := srv.fsm.State()

	// Register the batch job.
	batchJob := mock.Job()
	batchJob.Type = structs.JobTypeBatch
	if err := store.UpsertJob(1000, batchJob); err != nil {
		t.Fatalf("err: %v", err)
	}

	// Attach a completed batch eval to the job.
	doneEval := mock.Eval()
	doneEval.Type = structs.JobTypeBatch
	doneEval.Status = structs.EvalStatusComplete
	doneEval.JobID = batchJob.ID
	if err := store.UpsertEvals(1001, []*structs.Evaluation{doneEval}); err != nil {
		t.Fatalf("err: %v", err)
	}

	// Attach an alloc that is desired-run but reported complete by the client.
	doneAlloc := mock.Alloc()
	doneAlloc.EvalID = doneEval.ID
	doneAlloc.JobID = batchJob.ID
	doneAlloc.DesiredStatus = structs.AllocDesiredStatusRun
	doneAlloc.ClientStatus = structs.AllocClientStatusComplete
	if err := store.UpsertAllocs(1002, []*structs.Allocation{doneAlloc}); err != nil {
		t.Fatalf("err: %v", err)
	}

	// Witness index 2000 at a time EvalGCThreshold in the past.
	timeTable := srv.fsm.TimeTable()
	timeTable.Witness(2000, time.Now().UTC().Add(-1*srv.config.EvalGCThreshold))

	// Build a core scheduler over a snapshot of the state.
	snapshot, err := store.Snapshot()
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	sched := NewCoreScheduler(srv, snapshot)

	// Run the eval GC.
	gcRequest := srv.coreJobEval(structs.CoreJobEvalGC, 2000)
	if err := sched.Process(gcRequest); err != nil {
		t.Fatalf("err: %v", err)
	}

	// The eval and its alloc should both have been collected.
	gotEval, err := store.EvalByID(doneEval.ID)
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	if gotEval != nil {
		t.Fatalf("expected eval: %v to be GC-ed", gotEval)
	}

	gotAlloc, err := store.AllocByID(doneAlloc.ID)
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	if gotAlloc != nil {
		t.Fatalf("expected alloc: %v to be GC-ed", gotAlloc)
	}
}
// TestCoreScheduler_EvalGC_Batch_Allocs_NoJob inserts a failed batch eval and
// a desired-failed alloc without upserting any job, and expects the eval GC
// core job to remove the eval.
func TestCoreScheduler_EvalGC_Batch_Allocs_NoJob(t *testing.T) {
	srv := testServer(t, nil)
	defer srv.Shutdown()
	testutil.WaitForLeader(t, srv.RPC)

	store := srv.fsm.State()

	// Insert a failed batch eval; no job is upserted in this test.
	failedEval := mock.Eval()
	failedEval.Type = structs.JobTypeBatch
	failedEval.Status = structs.EvalStatusFailed
	if err := store.UpsertEvals(1000, []*structs.Evaluation{failedEval}); err != nil {
		t.Fatalf("err: %v", err)
	}

	// Attach an alloc whose desired status is failed.
	deadAlloc := mock.Alloc()
	deadAlloc.EvalID = failedEval.ID
	deadAlloc.DesiredStatus = structs.AllocDesiredStatusFailed
	if err := store.UpsertAllocs(1001, []*structs.Allocation{deadAlloc}); err != nil {
		t.Fatalf("err: %v", err)
	}

	// Witness index 2000 at a time EvalGCThreshold in the past.
	timeTable := srv.fsm.TimeTable()
	timeTable.Witness(2000, time.Now().UTC().Add(-1*srv.config.EvalGCThreshold))

	// Build a core scheduler over a snapshot of the state.
	snapshot, err := store.Snapshot()
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	sched := NewCoreScheduler(srv, snapshot)

	// Run the eval GC.
	gcRequest := srv.coreJobEval(structs.CoreJobEvalGC, 2000)
	if err := sched.Process(gcRequest); err != nil {
		t.Fatalf("err: %v", err)
	}

	// The eval's job does not exist in the state store, so the eval must have
	// been collected.
	gotEval, err := store.EvalByID(failedEval.ID)
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	if gotEval != nil {
		t.Fatalf("bad: %v", gotEval)
	}
}
func TestCoreScheduler_EvalGC_Force(t *testing.T) {
s1 := testServer(t, nil)
defer s1.Shutdown()
@ -722,6 +562,114 @@ func TestCoreScheduler_JobGC(t *testing.T) {
}
}
// TestCoreScheduler_JobGC_OneShot ensures that a batch job is garbage
// collected in one shot: either the job and all of its evals and allocs are
// removed together, or nothing is. Here one alloc is still desired-run, so
// the job GC must leave the job, both evals and both allocs untouched.
func TestCoreScheduler_JobGC_OneShot(t *testing.T) {
	srv := testServer(t, nil)
	defer srv.Shutdown()
	testutil.WaitForLeader(t, srv.RPC)

	store := srv.fsm.State()

	// Register the batch job.
	batchJob := mock.Job()
	batchJob.Type = structs.JobTypeBatch
	if err := store.UpsertJob(1000, batchJob); err != nil {
		t.Fatalf("err: %v", err)
	}

	// Attach two completed evals to the job.
	firstEval := mock.Eval()
	firstEval.JobID = batchJob.ID
	firstEval.Status = structs.EvalStatusComplete

	secondEval := mock.Eval()
	secondEval.JobID = batchJob.ID
	secondEval.Status = structs.EvalStatusComplete

	if err := store.UpsertEvals(1001, []*structs.Evaluation{firstEval, secondEval}); err != nil {
		t.Fatalf("err: %v", err)
	}

	// One alloc per eval: the first is desired-stop, the second desired-run.
	stoppedAlloc := mock.Alloc()
	stoppedAlloc.JobID = batchJob.ID
	stoppedAlloc.EvalID = firstEval.ID
	stoppedAlloc.DesiredStatus = structs.AllocDesiredStatusStop

	runningAlloc := mock.Alloc()
	runningAlloc.JobID = batchJob.ID
	runningAlloc.EvalID = secondEval.ID
	runningAlloc.DesiredStatus = structs.AllocDesiredStatusRun

	if err := store.UpsertAllocs(1002, []*structs.Allocation{stoppedAlloc, runningAlloc}); err != nil {
		t.Fatalf("err: %v", err)
	}

	// Force the job's status to dead. This mutates the local object after it
	// was upserted.
	// NOTE(review): presumably the state store holds this same pointer, so the
	// change is visible to the GC — confirm.
	batchJob.Status = structs.JobStatusDead

	// Witness index 2000 at a time JobGCThreshold in the past.
	timeTable := srv.fsm.TimeTable()
	timeTable.Witness(2000, time.Now().UTC().Add(-1*srv.config.JobGCThreshold))

	// Build a core scheduler over a snapshot of the state.
	snapshot, err := store.Snapshot()
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	sched := NewCoreScheduler(srv, snapshot)

	// Run the job GC.
	gcRequest := srv.coreJobEval(structs.CoreJobJobGC, 2000)
	if err := sched.Process(gcRequest); err != nil {
		t.Fatalf("err: %v", err)
	}

	// Everything must still exist: the job, both evals and both allocs.
	gotJob, err := store.JobByID(batchJob.ID)
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	if gotJob == nil {
		t.Fatalf("bad: %v", gotJob)
	}

	gotFirstEval, err := store.EvalByID(firstEval.ID)
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	if gotFirstEval == nil {
		t.Fatalf("bad: %v", gotFirstEval)
	}

	gotSecondEval, err := store.EvalByID(secondEval.ID)
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	if gotSecondEval == nil {
		t.Fatalf("bad: %v", gotSecondEval)
	}

	gotStoppedAlloc, err := store.AllocByID(stoppedAlloc.ID)
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	if gotStoppedAlloc == nil {
		t.Fatalf("bad: %v", gotStoppedAlloc)
	}

	gotRunningAlloc, err := store.AllocByID(runningAlloc.ID)
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	if gotRunningAlloc == nil {
		t.Fatalf("bad: %v", gotRunningAlloc)
	}
}
func TestCoreScheduler_JobGC_Force(t *testing.T) {
tests := []struct {
test, evalStatus, allocStatus string