batch jobs GC removes terminal allocs if job modifyindex is older than running job

This commit is contained in:
Preetha Appan 2018-11-01 00:02:26 -05:00
parent b0ddc03409
commit e586817ce7
No known key found for this signature in database
GPG Key ID: 9F7C19990A50EAFC
2 changed files with 163 additions and 21 deletions

View File

@ -286,6 +286,14 @@ func (c *CoreScheduler) gcEval(eval *structs.Evaluation, thresholdIndex uint64,
return false, nil, err return false, nil, err
} }
// Get the allocations by eval
allocs, err := c.snap.AllocsByEval(ws, eval.ID)
if err != nil {
c.logger.Error("failed to get allocs for eval",
"eval_id", eval.ID, "error", err)
return false, nil, err
}
// If the eval is from a running "batch" job we don't want to garbage // If the eval is from a running "batch" job we don't want to garbage
// collect its allocations. If there is a long running batch job and its // collect its allocations. If there is a long running batch job and its
// terminal allocations get GC'd the scheduler would re-run the // terminal allocations get GC'd the scheduler would re-run the
@ -311,18 +319,12 @@ func (c *CoreScheduler) gcEval(eval *structs.Evaluation, thresholdIndex uint64,
// We don't want to gc anything related to a job which is not dead // We don't want to gc anything related to a job which is not dead
// If the batch job doesn't exist we can GC it regardless of allowBatch // If the batch job doesn't exist we can GC it regardless of allowBatch
if !collect { if !collect {
return false, nil, nil // Find allocs associated with older (based on modifyindex) and GC them if terminal
oldAllocs := olderVersionTerminalAllocs(allocs, job)
return false, oldAllocs, nil
} }
} }
// Get the allocations by eval
allocs, err := c.snap.AllocsByEval(ws, eval.ID)
if err != nil {
c.logger.Error("failed to get allocs for eval",
"eval_id", eval.ID, "error", err)
return false, nil, err
}
// Scan the allocations to ensure they are terminal and old // Scan the allocations to ensure they are terminal and old
gcEval := true gcEval := true
var gcAllocIDs []string var gcAllocIDs []string
@ -340,6 +342,18 @@ func (c *CoreScheduler) gcEval(eval *structs.Evaluation, thresholdIndex uint64,
return gcEval, gcAllocIDs, nil return gcEval, gcAllocIDs, nil
} }
// olderVersionTerminalAllocs returns terminal allocations whose job modify index
// is older than the job's modify index
func olderVersionTerminalAllocs(allocs []*structs.Allocation, job *structs.Job) []string {
var ret []string
for _, alloc := range allocs {
if alloc.Job != nil && alloc.Job.JobModifyIndex < job.JobModifyIndex && alloc.TerminalStatus() {
ret = append(ret, alloc.ID)
}
}
return ret
}
// evalReap contacts the leader and issues a reap on the passed evals and // evalReap contacts the leader and issues a reap on the passed evals and
// allocs. // allocs.
func (c *CoreScheduler) evalReap(evals, allocs []string) error { func (c *CoreScheduler) evalReap(evals, allocs []string) error {

View File

@ -141,6 +141,7 @@ func TestCoreScheduler_EvalGC_ReschedulingAllocs(t *testing.T) {
// Insert failed alloc with an old reschedule attempt, can be GCed // Insert failed alloc with an old reschedule attempt, can be GCed
alloc := mock.Alloc() alloc := mock.Alloc()
alloc.Job = job
alloc.EvalID = eval.ID alloc.EvalID = eval.ID
alloc.DesiredStatus = structs.AllocDesiredStatusRun alloc.DesiredStatus = structs.AllocDesiredStatusRun
alloc.ClientStatus = structs.AllocClientStatusFailed alloc.ClientStatus = structs.AllocClientStatusFailed
@ -158,6 +159,7 @@ func TestCoreScheduler_EvalGC_ReschedulingAllocs(t *testing.T) {
} }
alloc2 := mock.Alloc() alloc2 := mock.Alloc()
alloc2.Job = job
alloc2.EvalID = eval.ID alloc2.EvalID = eval.ID
alloc2.DesiredStatus = structs.AllocDesiredStatusRun alloc2.DesiredStatus = structs.AllocDesiredStatusRun
alloc2.ClientStatus = structs.AllocClientStatusFailed alloc2.ClientStatus = structs.AllocClientStatusFailed
@ -315,12 +317,14 @@ func TestCoreScheduler_EvalGC_Batch(t *testing.T) {
// Insert "failed" alloc // Insert "failed" alloc
alloc := mock.Alloc() alloc := mock.Alloc()
alloc.Job = job
alloc.JobID = job.ID alloc.JobID = job.ID
alloc.EvalID = eval.ID alloc.EvalID = eval.ID
alloc.DesiredStatus = structs.AllocDesiredStatusStop alloc.DesiredStatus = structs.AllocDesiredStatusStop
// Insert "lost" alloc // Insert "lost" alloc
alloc2 := mock.Alloc() alloc2 := mock.Alloc()
alloc2.Job = job
alloc2.JobID = job.ID alloc2.JobID = job.ID
alloc2.EvalID = eval.ID alloc2.EvalID = eval.ID
alloc2.DesiredStatus = structs.AllocDesiredStatusRun alloc2.DesiredStatus = structs.AllocDesiredStatusRun
@ -384,6 +388,128 @@ func TestCoreScheduler_EvalGC_Batch(t *testing.T) {
} }
} }
// An EvalGC should reap allocations from jobs with an older modify index
func TestCoreScheduler_EvalGC_Batch_OldVersion(t *testing.T) {
t.Parallel()
s1 := TestServer(t, nil)
defer s1.Shutdown()
testutil.WaitForLeader(t, s1.RPC)
// COMPAT Remove in 0.6: Reset the FSM time table since we reconcile which sets index 0
s1.fsm.timetable.table = make([]TimeTableEntry, 1, 10)
// Insert a "dead" job
state := s1.fsm.State()
job := mock.Job()
job.Type = structs.JobTypeBatch
job.Status = structs.JobStatusDead
err := state.UpsertJob(1000, job)
if err != nil {
t.Fatalf("err: %v", err)
}
// Insert "complete" eval
eval := mock.Eval()
eval.Status = structs.EvalStatusComplete
eval.Type = structs.JobTypeBatch
eval.JobID = job.ID
err = state.UpsertEvals(1001, []*structs.Evaluation{eval})
if err != nil {
t.Fatalf("err: %v", err)
}
// Insert "failed" alloc
alloc := mock.Alloc()
alloc.Job = job
alloc.JobID = job.ID
alloc.EvalID = eval.ID
alloc.DesiredStatus = structs.AllocDesiredStatusStop
// Insert "lost" alloc
alloc2 := mock.Alloc()
alloc2.Job = job
alloc2.JobID = job.ID
alloc2.EvalID = eval.ID
alloc2.DesiredStatus = structs.AllocDesiredStatusRun
alloc2.ClientStatus = structs.AllocClientStatusLost
// Insert alloc with older job modifyindex
alloc3 := mock.Alloc()
job2 := job.Copy()
alloc3.Job = job2
alloc3.JobID = job2.ID
alloc3.EvalID = eval.ID
job2.JobModifyIndex = 500
alloc3.DesiredStatus = structs.AllocDesiredStatusRun
alloc3.ClientStatus = structs.AllocClientStatusLost
err = state.UpsertAllocs(1002, []*structs.Allocation{alloc, alloc2, alloc3})
if err != nil {
t.Fatalf("err: %v", err)
}
// Update the time tables to make this work
tt := s1.fsm.TimeTable()
tt.Witness(2000, time.Now().UTC().Add(-1*s1.config.EvalGCThreshold))
// Create a core scheduler
snap, err := state.Snapshot()
if err != nil {
t.Fatalf("err: %v", err)
}
core := NewCoreScheduler(s1, snap)
// Attempt the GC
gc := s1.coreJobEval(structs.CoreJobEvalGC, 2000)
err = core.Process(gc)
if err != nil {
t.Fatalf("err: %v", err)
}
// Alloc1 and 2 should be there, and alloc3 should be gone
ws := memdb.NewWatchSet()
out, err := state.EvalByID(ws, eval.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if out == nil {
t.Fatalf("bad: %v", out)
}
outA, err := state.AllocByID(ws, alloc.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if outA == nil {
t.Fatalf("bad: %v", outA)
}
outA2, err := state.AllocByID(ws, alloc2.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if outA2 == nil {
t.Fatalf("bad: %v", outA2)
}
outA3, err := state.AllocByID(ws, alloc3.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if outA3 != nil {
t.Fatalf("expected alloc to be nil:%v", outA2)
}
outB, err := state.JobByID(ws, job.Namespace, job.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if outB == nil {
t.Fatalf("bad: %v", outB)
}
}
// An EvalGC should reap a batch job that has been stopped // An EvalGC should reap a batch job that has been stopped
func TestCoreScheduler_EvalGC_BatchStopped(t *testing.T) { func TestCoreScheduler_EvalGC_BatchStopped(t *testing.T) {
t.Parallel() t.Parallel()
@ -1798,18 +1924,20 @@ func TestCoreScheduler_PartitionJobReap(t *testing.T) {
// Tests various scenarios when allocations are eligible to be GCed // Tests various scenarios when allocations are eligible to be GCed
func TestAllocation_GCEligible(t *testing.T) { func TestAllocation_GCEligible(t *testing.T) {
type testCase struct { type testCase struct {
Desc string Desc string
GCTime time.Time GCTime time.Time
ClientStatus string ClientStatus string
DesiredStatus string DesiredStatus string
JobStatus string JobStatus string
JobStop bool JobStop bool
ModifyIndex uint64 AllocJobModifyIndex uint64
NextAllocID string JobModifyIndex uint64
ReschedulePolicy *structs.ReschedulePolicy ModifyIndex uint64
RescheduleTrackers []*structs.RescheduleEvent NextAllocID string
ThresholdIndex uint64 ReschedulePolicy *structs.ReschedulePolicy
ShouldGC bool RescheduleTrackers []*structs.RescheduleEvent
ThresholdIndex uint64
ShouldGC bool
} }
fail := time.Now() fail := time.Now()