Merge pull request #1060 from hashicorp/b-gc-fix

Force GC checks nodes last and fixes eval GC to clean up deregistered batch jobs
Alex Dadgar 2016-04-08 14:13:54 -07:00
commit d9908ee981
5 changed files with 123 additions and 43 deletions
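
In short: forced GC is now modeled as its own core job, CoreJobForceGC, instead of a special TriggeredBy value on each per-type GC eval. The handler added in the first file below runs the three collectors in a fixed order; a condensed view of that flow (taken from the diff itself, not new code):

// forceGC garbage collects all eligible objects. Job and eval GC run
// first so terminal allocations are removed before node GC looks for
// nodes with no remaining allocations.
func (c *CoreScheduler) forceGC(eval *structs.Evaluation) error {
    if err := c.jobGC(eval); err != nil {
        return err
    }
    if err := c.evalGC(eval); err != nil {
        return err
    }
    // Node GC must occur after the others to ensure the allocations are
    // cleared.
    return c.nodeGC(eval)
}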

nomad/core_sched.go

@@ -28,19 +28,35 @@ func NewCoreScheduler(srv *Server, snap *state.StateSnapshot) scheduler.Scheduler {
 }
 
 // Process is used to implement the scheduler.Scheduler interface
-func (s *CoreScheduler) Process(eval *structs.Evaluation) error {
+func (c *CoreScheduler) Process(eval *structs.Evaluation) error {
     switch eval.JobID {
     case structs.CoreJobEvalGC:
-        return s.evalGC(eval)
+        return c.evalGC(eval)
     case structs.CoreJobNodeGC:
-        return s.nodeGC(eval)
+        return c.nodeGC(eval)
     case structs.CoreJobJobGC:
-        return s.jobGC(eval)
+        return c.jobGC(eval)
+    case structs.CoreJobForceGC:
+        return c.forceGC(eval)
     default:
         return fmt.Errorf("core scheduler cannot handle job '%s'", eval.JobID)
     }
 }
 
+// forceGC is used to garbage collect all eligible objects.
+func (c *CoreScheduler) forceGC(eval *structs.Evaluation) error {
+    if err := c.jobGC(eval); err != nil {
+        return err
+    }
+    if err := c.evalGC(eval); err != nil {
+        return err
+    }
+
+    // Node GC must occur after the others to ensure the allocations are
+    // cleared.
+    return c.nodeGC(eval)
+}
+
 // jobGC is used to garbage collect eligible jobs.
 func (c *CoreScheduler) jobGC(eval *structs.Evaluation) error {
     // Get all the jobs eligible for garbage collection.
@@ -50,7 +66,7 @@ func (c *CoreScheduler) jobGC(eval *structs.Evaluation) error {
     }
 
     var oldThreshold uint64
-    if eval.TriggeredBy == structs.EvalTriggerForceGC {
+    if eval.JobID == structs.CoreJobForceGC {
         // The GC was forced, so set the threshold to its maximum so everything
         // will GC.
         oldThreshold = math.MaxUint64
@@ -60,9 +76,9 @@ func (c *CoreScheduler) jobGC(eval *structs.Evaluation) error {
         tt := c.srv.fsm.TimeTable()
         cutoff := time.Now().UTC().Add(-1 * c.srv.config.JobGCThreshold)
         oldThreshold = tt.NearestIndex(cutoff)
-        c.srv.logger.Printf("[DEBUG] sched.core: job GC: scanning before index %d (%v)",
-            oldThreshold, c.srv.config.JobGCThreshold)
     }
+    c.srv.logger.Printf("[DEBUG] sched.core: job GC: scanning before index %d (%v)",
+        oldThreshold, c.srv.config.JobGCThreshold)
 
     // Collect the allocations, evaluations and jobs to GC
     var gcAlloc, gcEval, gcJob []string
@@ -137,7 +153,7 @@ func (c *CoreScheduler) evalGC(eval *structs.Evaluation) error {
     }
 
     var oldThreshold uint64
-    if eval.TriggeredBy == structs.EvalTriggerForceGC {
+    if eval.JobID == structs.CoreJobForceGC {
         // The GC was forced, so set the threshold to its maximum so everything
         // will GC.
         oldThreshold = math.MaxUint64
@@ -149,9 +165,9 @@ func (c *CoreScheduler) evalGC(eval *structs.Evaluation) error {
         tt := c.srv.fsm.TimeTable()
         cutoff := time.Now().UTC().Add(-1 * c.srv.config.EvalGCThreshold)
         oldThreshold = tt.NearestIndex(cutoff)
-        c.srv.logger.Printf("[DEBUG] sched.core: eval GC: scanning before index %d (%v)",
-            oldThreshold, c.srv.config.EvalGCThreshold)
     }
+    c.srv.logger.Printf("[DEBUG] sched.core: eval GC: scanning before index %d (%v)",
+        oldThreshold, c.srv.config.EvalGCThreshold)
 
     // Collect the allocations and evaluations to GC
     var gcAlloc, gcEval []string
@@ -163,12 +179,22 @@ func (c *CoreScheduler) evalGC(eval *structs.Evaluation) error {
             return err
         }
 
-        // If the eval is from a "batch" job we don't want to garbage collect
-        // its allocations. If there is a long running batch job and its
+        // If the eval is from a running "batch" job we don't want to garbage
+        // collect its allocations. If there is a long running batch job and its
         // terminal allocations get GC'd the scheduler would re-run the
         // allocations.
-        if len(allocs) != 0 && eval.Type == structs.JobTypeBatch {
-            continue
+        if eval.Type == structs.JobTypeBatch {
+            // Check if the job is running
+            job, err := c.snap.JobByID(eval.JobID)
+            if err != nil {
+                return err
+            }
+
+            // If the job has been deregistered, we want to garbage collect the
+            // allocations and evaluations.
+            if job != nil && len(allocs) != 0 {
+                continue
+            }
         }
 
         if gc {
@@ -257,7 +283,7 @@ func (c *CoreScheduler) nodeGC(eval *structs.Evaluation) error {
     }
 
     var oldThreshold uint64
-    if eval.TriggeredBy == structs.EvalTriggerForceGC {
+    if eval.JobID == structs.CoreJobForceGC {
         // The GC was forced, so set the threshold to its maximum so everything
         // will GC.
         oldThreshold = math.MaxUint64
@@ -269,9 +295,9 @@ func (c *CoreScheduler) nodeGC(eval *structs.Evaluation) error {
         tt := c.srv.fsm.TimeTable()
         cutoff := time.Now().UTC().Add(-1 * c.srv.config.NodeGCThreshold)
         oldThreshold = tt.NearestIndex(cutoff)
-        c.srv.logger.Printf("[DEBUG] sched.core: node GC: scanning before index %d (%v)",
-            oldThreshold, c.srv.config.NodeGCThreshold)
     }
+    c.srv.logger.Printf("[DEBUG] sched.core: node GC: scanning before index %d (%v)",
+        oldThreshold, c.srv.config.NodeGCThreshold)
 
     // Collect the nodes to GC
     var gcNode []string

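The batch-job fix above can be read as a small predicate: a batch eval's terminal allocations are protected from GC only while the job is still registered. A minimal restatement of that guard (gcBatchEval is a hypothetical helper name for illustration, not part of the diff):

// gcBatchEval reports whether a batch eval and its allocations may be
// garbage collected. It restates the guard from the hunk above; job is
// the result of JobByID on the eval's JobID.
func gcBatchEval(job *structs.Job, allocIDs []string) bool {
    // Job still registered and has allocations: keep them, since GC'ing
    // the terminal allocs of a running batch job would cause the
    // scheduler to re-run them.
    if job != nil && len(allocIDs) != 0 {
        return false
    }
    // Job deregistered (JobByID returned nil): safe to collect the eval
    // and its allocations normally.
    return true
}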
nomad/core_sched_test.go

@@ -113,7 +113,77 @@ func TestCoreScheduler_EvalGC_Batch_NoAllocs(t *testing.T) {
     }
 }
 
-func TestCoreScheduler_EvalGC_Batch_Allocs(t *testing.T) {
+func TestCoreScheduler_EvalGC_Batch_Allocs_WithJob(t *testing.T) {
+    s1 := testServer(t, nil)
+    defer s1.Shutdown()
+    testutil.WaitForLeader(t, s1.RPC)
+
+    // Insert job.
+    state := s1.fsm.State()
+    job := mock.Job()
+    job.Type = structs.JobTypeBatch
+    err := state.UpsertJob(1000, job)
+    if err != nil {
+        t.Fatalf("err: %v", err)
+    }
+
+    // Insert "dead" eval
+    eval := mock.Eval()
+    eval.Type = structs.JobTypeBatch
+    eval.Status = structs.EvalStatusFailed
+    eval.JobID = job.ID
+    if err := state.UpsertEvals(1001, []*structs.Evaluation{eval}); err != nil {
+        t.Fatalf("err: %v", err)
+    }
+
+    // Insert "dead" alloc
+    alloc := mock.Alloc()
+    alloc.EvalID = eval.ID
+    alloc.JobID = job.ID
+    alloc.DesiredStatus = structs.AllocDesiredStatusFailed
+    err = state.UpsertAllocs(1002, []*structs.Allocation{alloc})
+    if err != nil {
+        t.Fatalf("err: %v", err)
+    }
+
+    // Update the time tables to make this work
+    tt := s1.fsm.TimeTable()
+    tt.Witness(2000, time.Now().UTC().Add(-1*s1.config.EvalGCThreshold))
+
+    // Create a core scheduler
+    snap, err := state.Snapshot()
+    if err != nil {
+        t.Fatalf("err: %v", err)
+    }
+    core := NewCoreScheduler(s1, snap)
+
+    // Attempt the GC
+    gc := s1.coreJobEval(structs.CoreJobEvalGC)
+    gc.ModifyIndex = 2000
+    err = core.Process(gc)
+    if err != nil {
+        t.Fatalf("err: %v", err)
+    }
+
+    // Shouldn't be gone because there are associated allocs.
+    out, err := state.EvalByID(eval.ID)
+    if err != nil {
+        t.Fatalf("err: %v", err)
+    }
+    if out == nil {
+        t.Fatalf("bad: %v", out)
+    }
+
+    outA, err := state.AllocByID(alloc.ID)
+    if err != nil {
+        t.Fatalf("err: %v", err)
+    }
+    if outA == nil {
+        t.Fatalf("bad: %v", outA)
+    }
+}
+
+func TestCoreScheduler_EvalGC_Batch_Allocs_NoJob(t *testing.T) {
     s1 := testServer(t, nil)
     defer s1.Shutdown()
     testutil.WaitForLeader(t, s1.RPC)
@@ -156,22 +226,14 @@ func TestCoreScheduler_EvalGC_Batch_Allocs(t *testing.T) {
         t.Fatalf("err: %v", err)
     }
 
-    // Shouldn't be gone because there are associated allocs.
+    // Should be gone because the job is deregistered.
     out, err := state.EvalByID(eval.ID)
     if err != nil {
         t.Fatalf("err: %v", err)
     }
-    if out == nil {
+    if out != nil {
         t.Fatalf("bad: %v", out)
     }
-
-    outA, err := state.AllocByID(alloc.ID)
-    if err != nil {
-        t.Fatalf("err: %v", err)
-    }
-    if outA == nil {
-        t.Fatalf("bad: %v", outA)
-    }
 }
 
 func TestCoreScheduler_EvalGC_Force(t *testing.T) {
@@ -205,7 +267,7 @@ func TestCoreScheduler_EvalGC_Force(t *testing.T) {
     core := NewCoreScheduler(s1, snap)
 
     // Attempt the GC
-    gc := s1.forceCoreJobEval(structs.CoreJobEvalGC)
+    gc := s1.coreJobEval(structs.CoreJobForceGC)
     err = core.Process(gc)
     if err != nil {
         t.Fatalf("err: %v", err)
@@ -294,7 +356,7 @@ func TestCoreScheduler_NodeGC_Force(t *testing.T) {
     core := NewCoreScheduler(s1, snap)
 
     // Attempt the GC
-    gc := s1.forceCoreJobEval(structs.CoreJobNodeGC)
+    gc := s1.coreJobEval(structs.CoreJobForceGC)
     err = core.Process(gc)
     if err != nil {
         t.Fatalf("err: %v", err)
@@ -502,7 +564,7 @@ func TestCoreScheduler_JobGC_Force(t *testing.T) {
     core := NewCoreScheduler(s1, snap)
 
     // Attempt the GC
-    gc := s1.forceCoreJobEval(structs.CoreJobJobGC)
+    gc := s1.coreJobEval(structs.CoreJobForceGC)
     err = core.Process(gc)
     if err != nil {
         t.Fatalf("test(%s) err: %v", test.test, err)

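Both batch tests rely on the same trick to age objects past the GC threshold: witness an old timestamp at a high Raft index so the cutoff maps back to that index. A sketch of the mechanism, assuming the TimeTable behavior implied by the diffs above (Witness records an index/time pair, NearestIndex maps a time back to an index witnessed at or before it):

// The server's time table tracks (index, time) pairs; GC asks for the
// newest index witnessed at or before the cutoff time.
tt := s1.fsm.TimeTable()
tt.Witness(2000, time.Now().UTC().Add(-1*s1.config.EvalGCThreshold))

cutoff := time.Now().UTC().Add(-1 * s1.config.EvalGCThreshold)
oldThreshold := tt.NearestIndex(cutoff) // 2000, given the Witness call above
// Evals and allocs whose modify index does not exceed oldThreshold are
// considered old enough to collect.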
nomad/leader.go

@@ -270,14 +270,6 @@ func (s *Server) coreJobEval(job string) *structs.Evaluation {
     }
 }
 
-// forceCoreJobEval returns an evaluation for a core job that will ignore GC
-// cutoffs.
-func (s *Server) forceCoreJobEval(job string) *structs.Evaluation {
-    eval := s.coreJobEval(job)
-    eval.TriggeredBy = structs.EvalTriggerForceGC
-    return eval
-}
-
 // reapFailedEvaluations is used to reap evaluations that
 // have reached their delivery limit and should be failed
 func (s *Server) reapFailedEvaluations(stopCh chan struct{}) {

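With forceCoreJobEval gone, coreJobEval (whose tail is visible above) is the single constructor for core-job evaluations. For orientation, its approximate shape around this commit; the field values below are an assumption for illustration, reconstructed from memory rather than taken from this diff:

// Assumed reconstruction of coreJobEval; treat the field choices as
// illustrative, not authoritative.
func (s *Server) coreJobEval(job string) *structs.Evaluation {
    return &structs.Evaluation{
        ID:          structs.GenerateUUID(),
        Priority:    structs.CoreJobPriority,
        Type:        structs.JobTypeCore,
        TriggeredBy: structs.EvalTriggerScheduled,
        JobID:       job,
        Status:      structs.EvalStatusPending,
        ModifyIndex: s.raft.AppliedIndex(),
    }
}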
nomad/structs/structs.go

@@ -2435,7 +2435,6 @@ const (
     EvalTriggerPeriodicJob   = "periodic-job"
     EvalTriggerNodeUpdate    = "node-update"
     EvalTriggerScheduled     = "scheduled"
-    EvalTriggerForceGC       = "force-gc"
     EvalTriggerRollingUpdate = "rolling-update"
 )
 
@@ -2456,6 +2455,9 @@ const (
     // evaluations and allocations are terminal. If so, we delete these out of
     // the system.
     CoreJobJobGC = "job-gc"
+
+    // CoreJobForceGC is used to force garbage collection of all GCable objects.
+    CoreJobForceGC = "force-gc"
 )
 
 // Evaluation is used anytime we need to apply business logic as a result

nomad/system_endpoint.go

@@ -16,8 +16,6 @@ func (s *System) GarbageCollect(args *structs.GenericRequest, reply *structs.GenericResponse) error {
         return err
     }
 
-    s.srv.evalBroker.Enqueue(s.srv.forceCoreJobEval(structs.CoreJobEvalGC))
-    s.srv.evalBroker.Enqueue(s.srv.forceCoreJobEval(structs.CoreJobNodeGC))
-    s.srv.evalBroker.Enqueue(s.srv.forceCoreJobEval(structs.CoreJobJobGC))
+    s.srv.evalBroker.Enqueue(s.srv.coreJobEval(structs.CoreJobForceGC))
     return nil
 }
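
The System.GarbageCollect RPC therefore enqueues exactly one force-GC evaluation, and the core scheduler handles the ordering. A test-style sketch of invoking it; the codec setup and imports follow Nomad's usual RPC test pattern and are assumed, not shown in this diff:

// msgpackrpc is github.com/hashicorp/net-rpc-msgpackrpc; codec is a
// client codec connected to the server under test.
req := &structs.GenericRequest{
    QueryOptions: structs.QueryOptions{Region: "global"},
}
var resp structs.GenericResponse
if err := msgpackrpc.CallWithCodec(codec, "System.GarbageCollect", req, &resp); err != nil {
    t.Fatalf("err: %v", err)
}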