GC: ensure no leakage of evaluations for batch jobs. (#15097)

Prior to 2409f72 the code compared the modification index of a job to itself; after that change it compared the creation index of the job to itself. Either way, because allocs are never re-parented to a different job, the comparison trivially always evaluated to false, which left the associated evaluations and allocations as unreclaimable memory.

Prior to this change, allocations and evaluations for batch jobs were never garbage collected until the batch job was explicitly stopped. The new `batch_eval_gc_threshold` server configuration option controls how old they must be before they become eligible for collection. The default threshold is `24h`.
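By way of illustration, an operator would set the new option alongside the existing one in the agent's `server` block; the values below are examples, not the defaults:

```hcl
server {
  enabled = true

  # Non-batch evaluations keep using the existing knob.
  eval_gc_threshold = "1h"

  # Terminal evaluations (and their allocations) belonging to batch jobs
  # become eligible for collection once they are older than this.
  batch_eval_gc_threshold = "6h"
}
```

As with `eval_gc_threshold`, the value is parsed with `time.ParseDuration`, so any Go duration string is accepted.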
Author: stswidwinski
Date: 2023-01-31 13:32:14 -05:00 (committed by GitHub)
Parent: 7838f16e2b
Commit: 16eefbbf4d
8 changed files with 358 additions and 327 deletions

.changelog/15097.txt (new file)

@@ -0,0 +1,3 @@
```release-note:breaking-change
core: Ensure no leakage of evaluations for batch jobs. Prior to this change allocations and evaluations for batch jobs were never garbage collected until the batch job was explicitly stopped. The new `batch_eval_gc_threshold` server configuration controls how often they are collected. The default threshold is `24h`.
```


@@ -390,6 +390,13 @@ func convertServerConfig(agentConfig *Config) (*nomad.Config, error) {
        }
        conf.EvalGCThreshold = dur
    }
    if gcThreshold := agentConfig.Server.BatchEvalGCThreshold; gcThreshold != "" {
        dur, err := time.ParseDuration(gcThreshold)
        if err != nil {
            return nil, err
        }
        conf.BatchEvalGCThreshold = dur
    }
    if gcThreshold := agentConfig.Server.DeploymentGCThreshold; gcThreshold != "" {
        dur, err := time.ParseDuration(gcThreshold)
        if err != nil {


@@ -473,9 +473,14 @@ type ServerConfig struct {
    // EvalGCThreshold controls how "old" an eval must be to be collected by GC.
    // Age is not the only requirement for a eval to be GCed but the threshold
    // can be used to filter by age. Please note that batch job evaluations are
    // controlled by 'BatchEvalGCThreshold' instead.
    EvalGCThreshold string `hcl:"eval_gc_threshold"`

    // BatchEvalGCThreshold controls how "old" an evaluation must be to be eligible
    // for GC if the eval belongs to a batch job.
    BatchEvalGCThreshold string `hcl:"batch_eval_gc_threshold"`

    // DeploymentGCThreshold controls how "old" a deployment must be to be
    // collected by GC. Age is not the only requirement for a deployment to be
    // GCed but the threshold can be used to filter by age.
@@ -1861,6 +1866,9 @@ func (s *ServerConfig) Merge(b *ServerConfig) *ServerConfig {
    if b.EvalGCThreshold != "" {
        result.EvalGCThreshold = b.EvalGCThreshold
    }
    if b.BatchEvalGCThreshold != "" {
        result.BatchEvalGCThreshold = b.BatchEvalGCThreshold
    }
    if b.DeploymentGCThreshold != "" {
        result.DeploymentGCThreshold = b.DeploymentGCThreshold
    }


@@ -140,6 +140,7 @@ func TestConfig_Merge(t *testing.T) {
        RaftMultiplier:         pointer.Of(5),
        NumSchedulers:          pointer.Of(1),
        NodeGCThreshold:        "1h",
        BatchEvalGCThreshold:   "4h",
        HeartbeatGrace:         30 * time.Second,
        MinHeartbeatTTL:        30 * time.Second,
        MaxHeartbeatsPerSecond: 30.0,
@@ -339,6 +340,7 @@
        NumSchedulers:          pointer.Of(2),
        EnabledSchedulers:      []string{structs.JobTypeBatch},
        NodeGCThreshold:        "12h",
        BatchEvalGCThreshold:   "4h",
        HeartbeatGrace:         2 * time.Minute,
        MinHeartbeatTTL:        2 * time.Minute,
        MaxHeartbeatsPerSecond: 200.0,


@@ -153,8 +153,15 @@ type Config struct {
    // EvalGCThreshold is how "old" an evaluation must be to be eligible
    // for GC. This gives users some time to debug a failed evaluation.
    //
    // Please note that the rules for GC of evaluations which belong to a batch
    // job are separate and controlled by `BatchEvalGCThreshold`
    EvalGCThreshold time.Duration

    // BatchEvalGCThreshold is how "old" an evaluation must be to be eligible
    // for GC if the eval belongs to a batch job.
    BatchEvalGCThreshold time.Duration

    // JobGCInterval is how often we dispatch a job to GC jobs that are
    // available for garbage collection.
    JobGCInterval time.Duration
@@ -460,6 +467,7 @@ func DefaultConfig() *Config {
        ReconcileInterval:    60 * time.Second,
        EvalGCInterval:       5 * time.Minute,
        EvalGCThreshold:      1 * time.Hour,
        BatchEvalGCThreshold: 24 * time.Hour,
        JobGCInterval:        5 * time.Minute,
        JobGCThreshold:       4 * time.Hour,
        NodeGCInterval:       5 * time.Minute,


@@ -240,15 +240,20 @@ func (c *CoreScheduler) evalGC(eval *structs.Evaluation) error {
    oldThreshold := c.getThreshold(eval, "eval",
        "eval_gc_threshold", c.srv.config.EvalGCThreshold)
    batchOldThreshold := c.getThreshold(eval, "eval",
        "batch_eval_gc_threshold", c.srv.config.BatchEvalGCThreshold)

    // Collect the allocations and evaluations to GC
    var gcAlloc, gcEval []string
    for raw := iter.Next(); raw != nil; raw = iter.Next() {
        eval := raw.(*structs.Evaluation)

        // The Evaluation GC should not handle batch jobs since those need to be
        // garbage collected in one shot
        gc, allocs, err := c.gcEval(eval, oldThreshold, false)

        gcThreshold := oldThreshold
        if eval.Type == structs.JobTypeBatch {
            gcThreshold = batchOldThreshold
        }

        gc, allocs, err := c.gcEval(eval, gcThreshold, false)
        if err != nil {
            return err
        }
@@ -299,33 +304,26 @@ func (c *CoreScheduler) gcEval(eval *structs.Evaluation, thresholdIndex uint64,
    }

    // If the eval is from a running "batch" job we don't want to garbage
    // collect its allocations. If there is a long running batch job and its
    // terminal allocations get GC'd the scheduler would re-run the
    // allocations.

    // If the eval is from a running "batch" job we don't want to garbage
    // collect its most current allocations. If there is a long running batch job and its
    // terminal allocations get GC'd the scheduler would re-run the allocations. However,
    // we do want to GC old Evals and Allocs if there are newer ones due to update.
    //
    // The age of the evaluation must also reach the threshold configured to be GCed so that
    // one may debug old evaluations and referenced allocations.
    if eval.Type == structs.JobTypeBatch {
        // Check if the job is running

        // Can collect if:
        // Job doesn't exist
        // Job is Stopped and dead
        // allowBatch and the job is dead
        collect := false
        if job == nil {
            collect = true
        } else if job.Status != structs.JobStatusDead {
            collect = false
        } else if job.Stop {
            collect = true
        } else if allowBatch {
            collect = true
        }

        // We don't want to gc anything related to a job which is not dead
        // If the batch job doesn't exist we can GC it regardless of allowBatch

        // Can collect if either holds:
        // - Job doesn't exist
        // - Job is Stopped and dead
        // - allowBatch and the job is dead
        //
        // If we cannot collect outright, check if a partial GC may occur
        collect := job == nil || job.Status == structs.JobStatusDead && (job.Stop || allowBatch)

        if !collect {
            // Find allocs associated with older (based on createindex) and GC them if terminal
            oldAllocs := olderVersionTerminalAllocs(allocs, job)
            return false, oldAllocs, nil

            oldAllocs := olderVersionTerminalAllocs(allocs, job, thresholdIndex)
            gcEval := (len(oldAllocs) == len(allocs))
            return gcEval, oldAllocs, nil
        }
    }
@@ -346,12 +344,12 @@
    return gcEval, gcAllocIDs, nil
}

// olderVersionTerminalAllocs returns terminal allocations whose job create index
// is older than the job's create index
func olderVersionTerminalAllocs(allocs []*structs.Allocation, job *structs.Job) []string {

// olderVersionTerminalAllocs returns a list of terminal allocations that belong to the evaluation and may be
// GCed.
func olderVersionTerminalAllocs(allocs []*structs.Allocation, job *structs.Job, thresholdIndex uint64) []string {
    var ret []string
    for _, alloc := range allocs {
        if alloc.Job != nil && alloc.Job.CreateIndex < job.CreateIndex && alloc.TerminalStatus() {
        if alloc.CreateIndex < job.JobModifyIndex && alloc.ModifyIndex < thresholdIndex && alloc.TerminalStatus() {
            ret = append(ret, alloc.ID)
        }
    }
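In essence, the per-allocation decision introduced here reduces to a small predicate. The sketch below is illustrative only: the struct types and the `gcEligible` helper are simplified stand-ins for the Nomad structures used in the hunk above, with only the fields that matter for the check.

```go
package main

import "fmt"

// Illustrative stand-ins for the Nomad structs referenced above; only the
// fields that matter for the GC predicate are modeled here.
type alloc struct {
	createIndex uint64
	modifyIndex uint64
	terminal    bool
}

type job struct {
	jobModifyIndex uint64
}

// gcEligible mirrors the condition used by the new olderVersionTerminalAllocs:
// an allocation of a batch job may be collected once it belongs to an older
// version of the job (its create index predates the job's last modification),
// it is terminal, and it is older than the threshold index derived from
// batch_eval_gc_threshold.
func gcEligible(a alloc, j job, thresholdIndex uint64) bool {
	return a.createIndex < j.jobModifyIndex &&
		a.modifyIndex < thresholdIndex &&
		a.terminal
}

func main() {
	j := job{jobModifyIndex: 2000}
	superseded := alloc{createIndex: 1500, modifyIndex: 1600, terminal: true}
	current := alloc{createIndex: 2000, modifyIndex: 2100, terminal: true}

	fmt.Println(gcEligible(superseded, j, 1900)) // true: older version, terminal, past the threshold
	fmt.Println(gcEligible(current, j, 1900))    // false: belongs to the current job version
}
```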


@@ -14,6 +14,7 @@ import (
    "github.com/hashicorp/nomad/nomad/state"
    "github.com/hashicorp/nomad/nomad/structs"
    "github.com/hashicorp/nomad/testutil"
    "github.com/shoenig/test/must"
    "github.com/stretchr/testify/assert"
    "github.com/stretchr/testify/require"
)
@@ -293,324 +294,320 @@ func TestCoreScheduler_EvalGC_StoppedJob_Reschedulable(t *testing.T) {
func TestCoreScheduler_EvalGC_Batch(t *testing.T) {
    ci.Parallel(t)

    s1, cleanupS1 := TestServer(t, func(c *Config) {
        // Set EvalGCThreshold past BatchEvalThreshold to make sure that only
        // BatchEvalThreshold affects the results.
        c.BatchEvalGCThreshold = time.Hour
        c.EvalGCThreshold = 2 * time.Hour
    })
    defer cleanupS1()
    testutil.WaitForLeader(t, s1.RPC)

    // COMPAT Remove in 0.6: Reset the FSM time table since we reconcile which sets index 0
    s1.fsm.timetable.table = make([]TimeTableEntry, 2, 10)

    var jobModifyIdx uint64 = 1000

    // A "stopped" job containing one "complete" eval with one terminal allocation.
    store := s1.fsm.State()
    stoppedJob := mock.Job()
    stoppedJob.Type = structs.JobTypeBatch
    stoppedJob.Status = structs.JobStatusDead
    stoppedJob.Stop = true
    stoppedJob.TaskGroups[0].ReschedulePolicy = &structs.ReschedulePolicy{

    s1, cleanupS1 := TestServer(t, nil)
    s1.fsm.timetable.table = make([]TimeTableEntry, 1, 10)
    // Insert a "dead" job
    job := mock.Job()
    job.Type = structs.JobTypeBatch
    job.Status = structs.JobStatusDead
    err := store.UpsertJob(structs.MsgTypeTestSetup, 1000, job)
    if err != nil {
t.Fatalf("err: %v", err)
}
// Insert "complete" eval
eval := mock.Eval()
eval.Status = structs.EvalStatusComplete
eval.Type = structs.JobTypeBatch
eval.JobID = job.ID
err = store.UpsertEvals(structs.MsgTypeTestSetup, 1001, []*structs.Evaluation{eval})
if err != nil {
t.Fatalf("err: %v", err)
}
// Insert "failed" alloc
alloc := mock.Alloc()
alloc.Job = job
alloc.JobID = job.ID
alloc.EvalID = eval.ID
alloc.DesiredStatus = structs.AllocDesiredStatusStop
// Insert "lost" alloc
alloc2 := mock.Alloc()
alloc2.Job = job
alloc2.JobID = job.ID
alloc2.EvalID = eval.ID
alloc2.DesiredStatus = structs.AllocDesiredStatusRun
alloc2.ClientStatus = structs.AllocClientStatusLost
err = store.UpsertAllocs(structs.MsgTypeTestSetup, 1002, []*structs.Allocation{alloc, alloc2})
if err != nil {
t.Fatalf("err: %v", err)
}
// Update the time tables to make this work
tt := s1.fsm.TimeTable()
tt.Witness(2000, time.Now().UTC().Add(-1*s1.config.EvalGCThreshold))
// Create a core scheduler
snap, err := store.Snapshot()
if err != nil {
t.Fatalf("err: %v", err)
}
core := NewCoreScheduler(s1, snap)
// Attempt the GC
gc := s1.coreJobEval(structs.CoreJobEvalGC, 2000)
err = core.Process(gc)
if err != nil {
t.Fatalf("err: %v", err)
}
// Nothing should be gone
ws := memdb.NewWatchSet()
out, err := store.EvalByID(ws, eval.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if out == nil {
t.Fatalf("bad: %v", out)
}
outA, err := store.AllocByID(ws, alloc.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if outA == nil {
t.Fatalf("bad: %v", outA)
}
outA2, err := store.AllocByID(ws, alloc2.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if outA2 == nil {
t.Fatalf("bad: %v", outA2)
}
outB, err := store.JobByID(ws, job.Namespace, job.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if outB == nil {
t.Fatalf("bad: %v", outB)
}
}
// An EvalGC should reap allocations from jobs with an older modify index
func TestCoreScheduler_EvalGC_Batch_OldVersion(t *testing.T) {
ci.Parallel(t)
s1, cleanupS1 := TestServer(t, nil)
defer cleanupS1()
testutil.WaitForLeader(t, s1.RPC)
// COMPAT Remove in 0.6: Reset the FSM time table since we reconcile which sets index 0
s1.fsm.timetable.table = make([]TimeTableEntry, 1, 10)
// Insert a "dead" job
store := s1.fsm.State()
job := mock.Job()
job.Type = structs.JobTypeBatch
job.Status = structs.JobStatusDead
err := store.UpsertJob(structs.MsgTypeTestSetup, 1000, job)
if err != nil {
t.Fatalf("err: %v", err)
}
// Insert "complete" eval
eval := mock.Eval()
eval.Status = structs.EvalStatusComplete
eval.Type = structs.JobTypeBatch
eval.JobID = job.ID
err = store.UpsertEvals(structs.MsgTypeTestSetup, 1001, []*structs.Evaluation{eval})
if err != nil {
t.Fatalf("err: %v", err)
}
// Insert "failed" alloc
alloc := mock.Alloc()
alloc.Job = job
alloc.JobID = job.ID
alloc.EvalID = eval.ID
alloc.DesiredStatus = structs.AllocDesiredStatusStop
// Insert "lost" alloc
alloc2 := mock.Alloc()
alloc2.Job = job
alloc2.JobID = job.ID
alloc2.EvalID = eval.ID
alloc2.DesiredStatus = structs.AllocDesiredStatusRun
alloc2.ClientStatus = structs.AllocClientStatusLost
// Insert alloc with older job modifyindex
alloc3 := mock.Alloc()
job2 := job.Copy()
alloc3.Job = job2
alloc3.JobID = job2.ID
alloc3.EvalID = eval.ID
job2.CreateIndex = 500
alloc3.DesiredStatus = structs.AllocDesiredStatusRun
alloc3.ClientStatus = structs.AllocClientStatusLost
err = store.UpsertAllocs(structs.MsgTypeTestSetup, 1002, []*structs.Allocation{alloc, alloc2, alloc3})
if err != nil {
t.Fatalf("err: %v", err)
}
// Update the time tables to make this work
tt := s1.fsm.TimeTable()
tt.Witness(2000, time.Now().UTC().Add(-1*s1.config.EvalGCThreshold))
// Create a core scheduler
snap, err := store.Snapshot()
if err != nil {
t.Fatalf("err: %v", err)
}
core := NewCoreScheduler(s1, snap)
// Attempt the GC
gc := s1.coreJobEval(structs.CoreJobEvalGC, 2000)
err = core.Process(gc)
if err != nil {
t.Fatalf("err: %v", err)
}
// Alloc1 and 2 should be there, and alloc3 should be gone
ws := memdb.NewWatchSet()
out, err := store.EvalByID(ws, eval.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if out == nil {
t.Fatalf("bad: %v", out)
}
outA, err := store.AllocByID(ws, alloc.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if outA == nil {
t.Fatalf("bad: %v", outA)
}
outA2, err := store.AllocByID(ws, alloc2.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if outA2 == nil {
t.Fatalf("bad: %v", outA2)
}
outA3, err := store.AllocByID(ws, alloc3.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if outA3 != nil {
t.Fatalf("expected alloc to be nil:%v", outA2)
}
outB, err := store.JobByID(ws, job.Namespace, job.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if outB == nil {
t.Fatalf("bad: %v", outB)
}
}
// An EvalGC should reap a batch job that has been stopped
func TestCoreScheduler_EvalGC_BatchStopped(t *testing.T) {
ci.Parallel(t)
s1, cleanupS1 := TestServer(t, nil)
defer cleanupS1()
testutil.WaitForLeader(t, s1.RPC)
// COMPAT Remove in 0.6: Reset the FSM time table since we reconcile which sets index 0
s1.fsm.timetable.table = make([]TimeTableEntry, 1, 10)
// Create a "dead" job
store := s1.fsm.State()
job := mock.Job()
job.Type = structs.JobTypeBatch
job.Status = structs.JobStatusDead
job.Stop = true
job.TaskGroups[0].ReschedulePolicy = &structs.ReschedulePolicy{
        Attempts: 0,
        Interval: 0 * time.Second,
    }
    err := store.UpsertJob(structs.MsgTypeTestSetup, 1001, job)
    require.Nil(t, err)
    // Insert "complete" eval
    eval := mock.Eval()
    eval.Status = structs.EvalStatusComplete
    eval.Type = structs.JobTypeBatch
    eval.JobID = job.ID
    err = store.UpsertEvals(structs.MsgTypeTestSetup, 1002, []*structs.Evaluation{eval})
    require.Nil(t, err)
    // Insert "failed" alloc
    alloc := mock.Alloc()
    alloc.JobID = job.ID
    alloc.EvalID = eval.ID
    alloc.TaskGroup = job.TaskGroups[0].Name
    alloc.DesiredStatus = structs.AllocDesiredStatusStop
    // Insert "lost" alloc
    alloc2 := mock.Alloc()
    alloc2.JobID = job.ID
    alloc2.EvalID = eval.ID
    alloc2.DesiredStatus = structs.AllocDesiredStatusRun
    alloc2.ClientStatus = structs.AllocClientStatusLost
    alloc2.TaskGroup = job.TaskGroups[0].Name
    err = store.UpsertAllocs(structs.MsgTypeTestSetup, 1003, []*structs.Allocation{alloc, alloc2})
    if err != nil {
        t.Fatalf("err: %v", err)
    }

    err := store.UpsertJob(structs.MsgTypeTestSetup, jobModifyIdx+1, stoppedJob)
    must.NoError(t, err)

    stoppedJobEval := mock.Eval()
    stoppedJobEval.Status = structs.EvalStatusComplete
    stoppedJobEval.Type = structs.JobTypeBatch
    stoppedJobEval.JobID = stoppedJob.ID
    err = store.UpsertEvals(structs.MsgTypeTestSetup, jobModifyIdx+2, []*structs.Evaluation{stoppedJobEval})
    must.NoError(t, err)

    stoppedJobStoppedAlloc := mock.Alloc()
    stoppedJobStoppedAlloc.Job = stoppedJob
    stoppedJobStoppedAlloc.JobID = stoppedJob.ID
    stoppedJobStoppedAlloc.EvalID = stoppedJobEval.ID
    stoppedJobStoppedAlloc.DesiredStatus = structs.AllocDesiredStatusStop
    stoppedJobStoppedAlloc.ClientStatus = structs.AllocClientStatusFailed

    stoppedJobLostAlloc := mock.Alloc()
    stoppedJobLostAlloc.Job = stoppedJob
    stoppedJobLostAlloc.JobID = stoppedJob.ID
    stoppedJobLostAlloc.EvalID = stoppedJobEval.ID
    stoppedJobLostAlloc.DesiredStatus = structs.AllocDesiredStatusRun
    stoppedJobLostAlloc.ClientStatus = structs.AllocClientStatusLost

    err = store.UpsertAllocs(structs.MsgTypeTestSetup, jobModifyIdx+3, []*structs.Allocation{stoppedJobStoppedAlloc, stoppedJobLostAlloc})
    must.NoError(t, err)
// A "dead" job containing one "complete" eval with:
// 1. A "stopped" alloc
// 2. A "lost" alloc
// Both allocs upserted at 1002.
deadJob := mock.Job()
deadJob.Type = structs.JobTypeBatch
deadJob.Status = structs.JobStatusDead
err = store.UpsertJob(structs.MsgTypeTestSetup, jobModifyIdx, deadJob)
must.NoError(t, err)
deadJobEval := mock.Eval()
deadJobEval.Status = structs.EvalStatusComplete
deadJobEval.Type = structs.JobTypeBatch
deadJobEval.JobID = deadJob.ID
err = store.UpsertEvals(structs.MsgTypeTestSetup, jobModifyIdx+1, []*structs.Evaluation{deadJobEval})
must.NoError(t, err)
stoppedAlloc := mock.Alloc()
stoppedAlloc.Job = deadJob
stoppedAlloc.JobID = deadJob.ID
stoppedAlloc.EvalID = deadJobEval.ID
stoppedAlloc.DesiredStatus = structs.AllocDesiredStatusStop
stoppedAlloc.ClientStatus = structs.AllocClientStatusFailed
lostAlloc := mock.Alloc()
lostAlloc.Job = deadJob
lostAlloc.JobID = deadJob.ID
lostAlloc.EvalID = deadJobEval.ID
lostAlloc.DesiredStatus = structs.AllocDesiredStatusRun
lostAlloc.ClientStatus = structs.AllocClientStatusLost
err = store.UpsertAllocs(structs.MsgTypeTestSetup, jobModifyIdx+2, []*structs.Allocation{stoppedAlloc, lostAlloc})
must.NoError(t, err)
// An "alive" job #2 containing two complete evals. The first with:
// 1. A "lost" alloc
// 2. A "running" alloc
// Both allocs upserted at 999
//
// The second with just terminal allocs:
// 1. A "completed" alloc
// All allocs upserted at 999. The eval upserted at 999 as well.
activeJob := mock.Job()
activeJob.Type = structs.JobTypeBatch
activeJob.Status = structs.JobStatusDead
err = store.UpsertJob(structs.MsgTypeTestSetup, jobModifyIdx, activeJob)
must.NoError(t, err)
activeJobEval := mock.Eval()
activeJobEval.Status = structs.EvalStatusComplete
activeJobEval.Type = structs.JobTypeBatch
activeJobEval.JobID = activeJob.ID
err = store.UpsertEvals(structs.MsgTypeTestSetup, jobModifyIdx+1, []*structs.Evaluation{activeJobEval})
must.NoError(t, err)
activeJobRunningAlloc := mock.Alloc()
activeJobRunningAlloc.Job = activeJob
activeJobRunningAlloc.JobID = activeJob.ID
activeJobRunningAlloc.EvalID = activeJobEval.ID
activeJobRunningAlloc.DesiredStatus = structs.AllocDesiredStatusRun
activeJobRunningAlloc.ClientStatus = structs.AllocClientStatusRunning
activeJobLostAlloc := mock.Alloc()
activeJobLostAlloc.Job = activeJob
activeJobLostAlloc.JobID = activeJob.ID
activeJobLostAlloc.EvalID = activeJobEval.ID
activeJobLostAlloc.DesiredStatus = structs.AllocDesiredStatusRun
activeJobLostAlloc.ClientStatus = structs.AllocClientStatusLost
err = store.UpsertAllocs(structs.MsgTypeTestSetup, jobModifyIdx-1, []*structs.Allocation{activeJobRunningAlloc, activeJobLostAlloc})
must.NoError(t, err)
activeJobCompleteEval := mock.Eval()
activeJobCompleteEval.Status = structs.EvalStatusComplete
activeJobCompleteEval.Type = structs.JobTypeBatch
activeJobCompleteEval.JobID = activeJob.ID
err = store.UpsertEvals(structs.MsgTypeTestSetup, jobModifyIdx-1, []*structs.Evaluation{activeJobCompleteEval})
must.NoError(t, err)
activeJobCompletedEvalCompletedAlloc := mock.Alloc()
activeJobCompletedEvalCompletedAlloc.Job = activeJob
activeJobCompletedEvalCompletedAlloc.JobID = activeJob.ID
activeJobCompletedEvalCompletedAlloc.EvalID = activeJobCompleteEval.ID
activeJobCompletedEvalCompletedAlloc.DesiredStatus = structs.AllocDesiredStatusStop
activeJobCompletedEvalCompletedAlloc.ClientStatus = structs.AllocClientStatusComplete
err = store.UpsertAllocs(structs.MsgTypeTestSetup, jobModifyIdx-1, []*structs.Allocation{activeJobCompletedEvalCompletedAlloc})
must.NoError(t, err)
// A job that ran once and was then purged.
purgedJob := mock.Job()
purgedJob.Type = structs.JobTypeBatch
purgedJob.Status = structs.JobStatusDead
err = store.UpsertJob(structs.MsgTypeTestSetup, jobModifyIdx, purgedJob)
must.NoError(t, err)
purgedJobEval := mock.Eval()
purgedJobEval.Status = structs.EvalStatusComplete
purgedJobEval.Type = structs.JobTypeBatch
purgedJobEval.JobID = purgedJob.ID
err = store.UpsertEvals(structs.MsgTypeTestSetup, jobModifyIdx+1, []*structs.Evaluation{purgedJobEval})
must.NoError(t, err)
purgedJobCompleteAlloc := mock.Alloc()
purgedJobCompleteAlloc.Job = purgedJob
purgedJobCompleteAlloc.JobID = purgedJob.ID
purgedJobCompleteAlloc.EvalID = purgedJobEval.ID
purgedJobCompleteAlloc.DesiredStatus = structs.AllocDesiredStatusRun
purgedJobCompleteAlloc.ClientStatus = structs.AllocClientStatusLost
err = store.UpsertAllocs(structs.MsgTypeTestSetup, jobModifyIdx-1, []*structs.Allocation{purgedJobCompleteAlloc})
must.NoError(t, err)
purgedJobCompleteEval := mock.Eval()
purgedJobCompleteEval.Status = structs.EvalStatusComplete
purgedJobCompleteEval.Type = structs.JobTypeBatch
purgedJobCompleteEval.JobID = purgedJob.ID
err = store.UpsertEvals(structs.MsgTypeTestSetup, jobModifyIdx-1, []*structs.Evaluation{purgedJobCompleteEval})
must.NoError(t, err)
// Purge job.
err = store.DeleteJob(jobModifyIdx, purgedJob.Namespace, purgedJob.ID)
must.NoError(t, err)
// A little helper for assertions
assertCorrectJobEvalAlloc := func(
ws memdb.WatchSet,
jobsShouldExist []*structs.Job,
jobsShouldNotExist []*structs.Job,
evalsShouldExist []*structs.Evaluation,
evalsShouldNotExist []*structs.Evaluation,
allocsShouldExist []*structs.Allocation,
allocsShouldNotExist []*structs.Allocation,
) {
t.Helper()
for _, job := range jobsShouldExist {
out, err := store.JobByID(ws, job.Namespace, job.ID)
must.NoError(t, err)
must.NotNil(t, out)
}
for _, job := range jobsShouldNotExist {
out, err := store.JobByID(ws, job.Namespace, job.ID)
must.NoError(t, err)
must.Nil(t, out)
}
for _, eval := range evalsShouldExist {
out, err := store.EvalByID(ws, eval.ID)
must.NoError(t, err)
must.NotNil(t, out)
}
for _, eval := range evalsShouldNotExist {
out, err := store.EvalByID(ws, eval.ID)
must.NoError(t, err)
must.Nil(t, out)
}
for _, alloc := range allocsShouldExist {
outA, err := store.AllocByID(ws, alloc.ID)
must.NoError(t, err)
must.NotNil(t, outA)
}
for _, alloc := range allocsShouldNotExist {
outA, err := store.AllocByID(ws, alloc.ID)
must.NoError(t, err)
must.Nil(t, outA)
}
    }

    // Update the time tables to make this work
    tt := s1.fsm.TimeTable()
    tt.Witness(2000, time.Now().UTC().Add(-1*s1.config.EvalGCThreshold))

    // Create a core scheduler
    snap, err := store.Snapshot()
    if err != nil {
        t.Fatalf("err: %v", err)
    }
    core := NewCoreScheduler(s1, snap)

    // Attempt the GC
    gc := s1.coreJobEval(structs.CoreJobEvalGC, 2000)
    err = core.Process(gc)
    if err != nil {
        t.Fatalf("err: %v", err)
    }

    // Everything should be gone
    ws := memdb.NewWatchSet()
    out, err := store.EvalByID(ws, eval.ID)
    if err != nil {
        t.Fatalf("err: %v", err)
    }
    if out != nil {
        t.Fatalf("bad: %v", out)
    }
    outA, err := store.AllocByID(ws, alloc.ID)
    if err != nil {
        t.Fatalf("err: %v", err)
    }
    if outA != nil {
        t.Fatalf("bad: %v", outA)
    }
    outA2, err := store.AllocByID(ws, alloc2.ID)
    if err != nil {
        t.Fatalf("err: %v", err)
    }
    if outA2 != nil {
        t.Fatalf("bad: %v", outA2)
    }

    // Create a core scheduler
    snap, err := store.Snapshot()
    must.NoError(t, err)
    core := NewCoreScheduler(s1, snap)

    // Attempt the GC without moving the time at all
    gc := s1.coreJobEval(structs.CoreJobEvalGC, jobModifyIdx)
    err = core.Process(gc)
    must.NoError(t, err)

    // Nothing is gone
    assertCorrectJobEvalAlloc(
        memdb.NewWatchSet(),
        []*structs.Job{deadJob, activeJob, stoppedJob},
        []*structs.Job{},
        []*structs.Evaluation{
            deadJobEval,
            activeJobEval, activeJobCompleteEval,
            stoppedJobEval,
            purgedJobEval,
        },
        []*structs.Evaluation{},
        []*structs.Allocation{
            stoppedAlloc, lostAlloc,
            activeJobRunningAlloc, activeJobLostAlloc, activeJobCompletedEvalCompletedAlloc,
            stoppedJobStoppedAlloc, stoppedJobLostAlloc,
            purgedJobCompleteAlloc,
        },
        []*structs.Allocation{},
    )

    // Update the time tables by half of the BatchEvalGCThreshold which is too
    // small to GC anything.
    tt := s1.fsm.TimeTable()
    tt.Witness(2*jobModifyIdx, time.Now().UTC().Add((-1)*s1.config.BatchEvalGCThreshold/2))

    gc = s1.coreJobEval(structs.CoreJobEvalGC, jobModifyIdx*2)
    err = core.Process(gc)
    must.NoError(t, err)

    // Nothing is gone.
    assertCorrectJobEvalAlloc(
        memdb.NewWatchSet(),
[]*structs.Job{deadJob, activeJob, stoppedJob},
[]*structs.Job{},
[]*structs.Evaluation{
deadJobEval,
activeJobEval, activeJobCompleteEval,
stoppedJobEval,
purgedJobEval,
},
[]*structs.Evaluation{},
[]*structs.Allocation{
stoppedAlloc, lostAlloc,
activeJobRunningAlloc, activeJobLostAlloc, activeJobCompletedEvalCompletedAlloc,
stoppedJobStoppedAlloc, stoppedJobLostAlloc,
purgedJobCompleteAlloc,
},
[]*structs.Allocation{},
)
// Update the time tables so that BatchEvalGCThreshold has elapsed.
s1.fsm.timetable.table = make([]TimeTableEntry, 2, 10)
tt = s1.fsm.TimeTable()
tt.Witness(2*jobModifyIdx, time.Now().UTC().Add(-1*s1.config.BatchEvalGCThreshold))
gc = s1.coreJobEval(structs.CoreJobEvalGC, jobModifyIdx*2)
err = core.Process(gc)
must.NoError(t, err)
// We expect the following:
//
// 1. The stopped job remains, but its evaluation and allocations are both removed.
// 2. The dead job remains with its evaluation and allocations intact. This is because
// for them the BatchEvalGCThreshold has not yet elapsed (their modification idx are larger
// than that of the job).
// 3. The active job remains since it is active, even though the allocations are otherwise
// eligible for GC. However, the inactive allocation is GCed for it.
// 4. The eval and allocation for the purged job are GCed.
assertCorrectJobEvalAlloc(
memdb.NewWatchSet(),
[]*structs.Job{deadJob, activeJob, stoppedJob},
[]*structs.Job{},
[]*structs.Evaluation{deadJobEval, activeJobEval},
[]*structs.Evaluation{activeJobCompleteEval, stoppedJobEval, purgedJobEval},
[]*structs.Allocation{stoppedAlloc, lostAlloc, activeJobRunningAlloc},
[]*structs.Allocation{
activeJobLostAlloc, activeJobCompletedEvalCompletedAlloc,
stoppedJobLostAlloc, stoppedJobLostAlloc,
purgedJobCompleteAlloc,
})
}
func TestCoreScheduler_EvalGC_Partial(t *testing.T) {


@@ -93,7 +93,15 @@ server {
- `eval_gc_threshold` `(string: "1h")` - Specifies the minimum time an
  evaluation must be in the terminal state before it is eligible for garbage
  collection. This is specified using a label suffix like "30s" or "1h". Note
  that batch job evaluations are controlled via `batch_eval_gc_threshold`.

- `batch_eval_gc_threshold` `(string: "24h")` - Specifies the minimum time an
  evaluation stemming from a batch job must be in the terminal state before it is
  eligible for garbage collection. This is specified using a label suffix like
  "30s" or "1h". Note that the threshold is a necessary but insufficient condition
  for collection, and the most recent evaluation won't be garbage collected even if
  it breaches the threshold.

- `deployment_gc_threshold` `(string: "1h")` - Specifies the minimum time a
  deployment must be in the terminal state before it is eligible for garbage