Make garbage collection aware of rescheduling info in allocations
This commit is contained in:
parent
9d15e0c05b
commit
dd91a2f5be
|
@ -241,16 +241,18 @@ func (c *CoreScheduler) gcEval(eval *structs.Evaluation, thresholdIndex uint64,
|
|||
// Create a watchset
|
||||
ws := memdb.NewWatchSet()
|
||||
|
||||
// Look up the job
|
||||
job, err := c.snap.JobByID(ws, eval.Namespace, eval.JobID)
|
||||
if err != nil {
|
||||
return false, nil, err
|
||||
}
|
||||
|
||||
// If the eval is from a running "batch" job we don't want to garbage
|
||||
// collect its allocations. If there is a long running batch job and its
|
||||
// terminal allocations get GC'd the scheduler would re-run the
|
||||
// allocations.
|
||||
if eval.Type == structs.JobTypeBatch {
|
||||
// Check if the job is running
|
||||
job, err := c.snap.JobByID(ws, eval.Namespace, eval.JobID)
|
||||
if err != nil {
|
||||
return false, nil, err
|
||||
}
|
||||
|
||||
// Can collect if:
|
||||
// Job doesn't exist
|
||||
|
@ -286,7 +288,13 @@ func (c *CoreScheduler) gcEval(eval *structs.Evaluation, thresholdIndex uint64,
|
|||
gcEval := true
|
||||
var gcAllocIDs []string
|
||||
for _, alloc := range allocs {
|
||||
if !alloc.TerminalStatus() || alloc.ModifyIndex > thresholdIndex {
|
||||
var reschedulePolicy *structs.ReschedulePolicy
|
||||
tg := job.LookupTaskGroup(alloc.TaskGroup)
|
||||
|
||||
if tg != nil {
|
||||
reschedulePolicy = tg.ReschedulePolicy
|
||||
}
|
||||
if !alloc.GCEligible(reschedulePolicy, time.Now(), thresholdIndex) {
|
||||
// Can't GC the evaluation since not all of the allocations are
|
||||
// terminal
|
||||
gcEval = false
|
||||
|
|
|
@ -6,10 +6,12 @@ import (
|
|||
"time"
|
||||
|
||||
memdb "github.com/hashicorp/go-memdb"
|
||||
"github.com/hashicorp/nomad/helper/uuid"
|
||||
"github.com/hashicorp/nomad/nomad/mock"
|
||||
"github.com/hashicorp/nomad/nomad/structs"
|
||||
"github.com/hashicorp/nomad/testutil"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestCoreScheduler_EvalGC(t *testing.T) {
|
||||
|
@ -17,6 +19,7 @@ func TestCoreScheduler_EvalGC(t *testing.T) {
|
|||
s1 := testServer(t, nil)
|
||||
defer s1.Shutdown()
|
||||
testutil.WaitForLeader(t, s1.RPC)
|
||||
require := require.New(t)
|
||||
|
||||
// COMPAT Remove in 0.6: Reset the FSM time table since we reconcile which sets index 0
|
||||
s1.fsm.timetable.table = make([]TimeTableEntry, 1, 10)
|
||||
|
@ -27,15 +30,24 @@ func TestCoreScheduler_EvalGC(t *testing.T) {
|
|||
eval.Status = structs.EvalStatusFailed
|
||||
state.UpsertJobSummary(999, mock.JobSummary(eval.JobID))
|
||||
err := state.UpsertEvals(1000, []*structs.Evaluation{eval})
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
require.Nil(err)
|
||||
|
||||
// Insert mock job with rescheduling disabled
|
||||
job := mock.Job()
|
||||
job.ID = eval.JobID
|
||||
job.TaskGroups[0].ReschedulePolicy = &structs.ReschedulePolicy{
|
||||
Attempts: 0,
|
||||
Interval: 0 * time.Second,
|
||||
}
|
||||
err = state.UpsertJob(1001, job)
|
||||
require.Nil(err)
|
||||
|
||||
// Insert "dead" alloc
|
||||
alloc := mock.Alloc()
|
||||
alloc.EvalID = eval.ID
|
||||
alloc.DesiredStatus = structs.AllocDesiredStatusStop
|
||||
alloc.JobID = eval.JobID
|
||||
alloc.TaskGroup = job.TaskGroups[0].Name
|
||||
|
||||
// Insert "lost" alloc
|
||||
alloc2 := mock.Alloc()
|
||||
|
@ -43,6 +55,7 @@ func TestCoreScheduler_EvalGC(t *testing.T) {
|
|||
alloc2.DesiredStatus = structs.AllocDesiredStatusRun
|
||||
alloc2.ClientStatus = structs.AllocClientStatusLost
|
||||
alloc2.JobID = eval.JobID
|
||||
alloc2.TaskGroup = job.TaskGroups[0].Name
|
||||
err = state.UpsertAllocs(1001, []*structs.Allocation{alloc, alloc2})
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
|
@ -93,6 +106,100 @@ func TestCoreScheduler_EvalGC(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
// Tests GC behavior on allocations being rescheduled
|
||||
func TestCoreScheduler_EvalGC_ReshedulingAllocs(t *testing.T) {
|
||||
t.Parallel()
|
||||
s1 := testServer(t, nil)
|
||||
defer s1.Shutdown()
|
||||
testutil.WaitForLeader(t, s1.RPC)
|
||||
require := require.New(t)
|
||||
|
||||
// COMPAT Remove in 0.6: Reset the FSM time table since we reconcile which sets index 0
|
||||
s1.fsm.timetable.table = make([]TimeTableEntry, 1, 10)
|
||||
|
||||
// Insert "dead" eval
|
||||
state := s1.fsm.State()
|
||||
eval := mock.Eval()
|
||||
eval.Status = structs.EvalStatusFailed
|
||||
state.UpsertJobSummary(999, mock.JobSummary(eval.JobID))
|
||||
err := state.UpsertEvals(1000, []*structs.Evaluation{eval})
|
||||
require.Nil(err)
|
||||
|
||||
// Insert mock job with default reschedule policy of 2 in 10 minutes
|
||||
job := mock.Job()
|
||||
job.ID = eval.JobID
|
||||
|
||||
err = state.UpsertJob(1001, job)
|
||||
require.Nil(err)
|
||||
|
||||
// Insert failed alloc with an old reschedule attempt, can be GCed
|
||||
alloc := mock.Alloc()
|
||||
alloc.EvalID = eval.ID
|
||||
alloc.DesiredStatus = structs.AllocDesiredStatusRun
|
||||
alloc.ClientStatus = structs.AllocClientStatusFailed
|
||||
alloc.JobID = eval.JobID
|
||||
alloc.TaskGroup = job.TaskGroups[0].Name
|
||||
alloc.RescheduleTracker = &structs.RescheduleTracker{
|
||||
Events: []*structs.RescheduleEvent{
|
||||
{
|
||||
RescheduleTime: time.Now().Add(-1 * time.Hour).UTC().UnixNano(),
|
||||
PrevNodeID: uuid.Generate(),
|
||||
PrevAllocID: uuid.Generate(),
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
// Insert another failed alloc with a recent reschedule attempt, can't be GCed
|
||||
alloc2 := mock.Alloc()
|
||||
alloc2.EvalID = eval.ID
|
||||
alloc2.DesiredStatus = structs.AllocDesiredStatusRun
|
||||
alloc2.ClientStatus = structs.AllocClientStatusLost
|
||||
alloc2.JobID = eval.JobID
|
||||
alloc2.TaskGroup = job.TaskGroups[0].Name
|
||||
alloc2.RescheduleTracker = &structs.RescheduleTracker{
|
||||
Events: []*structs.RescheduleEvent{
|
||||
{
|
||||
RescheduleTime: time.Now().Add(-3 * time.Minute).UTC().UnixNano(),
|
||||
PrevNodeID: uuid.Generate(),
|
||||
PrevAllocID: uuid.Generate(),
|
||||
},
|
||||
},
|
||||
}
|
||||
err = state.UpsertAllocs(1001, []*structs.Allocation{alloc, alloc2})
|
||||
require.Nil(err)
|
||||
|
||||
// Update the time tables to make this work
|
||||
tt := s1.fsm.TimeTable()
|
||||
tt.Witness(2000, time.Now().UTC().Add(-1*s1.config.EvalGCThreshold))
|
||||
|
||||
// Create a core scheduler
|
||||
snap, err := state.Snapshot()
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
core := NewCoreScheduler(s1, snap)
|
||||
|
||||
// Attempt the GC
|
||||
gc := s1.coreJobEval(structs.CoreJobEvalGC, 2000)
|
||||
err = core.Process(gc)
|
||||
require.Nil(err)
|
||||
|
||||
// Eval should still exist
|
||||
ws := memdb.NewWatchSet()
|
||||
out, err := state.EvalByID(ws, eval.ID)
|
||||
require.Nil(err)
|
||||
require.Equal(eval.ID, out.ID)
|
||||
|
||||
outA, err := state.AllocByID(ws, alloc.ID)
|
||||
require.Nil(err)
|
||||
require.Nil(outA)
|
||||
|
||||
outA2, err := state.AllocByID(ws, alloc2.ID)
|
||||
require.Nil(err)
|
||||
require.Equal(alloc2.ID, outA2.ID)
|
||||
|
||||
}
|
||||
|
||||
// An EvalGC should never reap a batch job that has not been stopped
|
||||
func TestCoreScheduler_EvalGC_Batch(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
@ -201,6 +308,7 @@ func TestCoreScheduler_EvalGC_BatchStopped(t *testing.T) {
|
|||
defer s1.Shutdown()
|
||||
testutil.WaitForLeader(t, s1.RPC)
|
||||
|
||||
require := require.New(t)
|
||||
// COMPAT Remove in 0.6: Reset the FSM time table since we reconcile which sets index 0
|
||||
s1.fsm.timetable.table = make([]TimeTableEntry, 1, 10)
|
||||
|
||||
|
@ -209,21 +317,27 @@ func TestCoreScheduler_EvalGC_BatchStopped(t *testing.T) {
|
|||
job := mock.Job()
|
||||
job.Type = structs.JobTypeBatch
|
||||
job.Status = structs.JobStatusDead
|
||||
job.Stop = true
|
||||
job.TaskGroups[0].ReschedulePolicy = &structs.ReschedulePolicy{
|
||||
Attempts: 0,
|
||||
Interval: 0 * time.Second,
|
||||
}
|
||||
err := state.UpsertJob(1001, job)
|
||||
require.Nil(err)
|
||||
|
||||
// Insert "complete" eval
|
||||
eval := mock.Eval()
|
||||
eval.Status = structs.EvalStatusComplete
|
||||
eval.Type = structs.JobTypeBatch
|
||||
eval.JobID = job.ID
|
||||
err := state.UpsertEvals(1001, []*structs.Evaluation{eval})
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
err = state.UpsertEvals(1002, []*structs.Evaluation{eval})
|
||||
require.Nil(err)
|
||||
|
||||
// Insert "failed" alloc
|
||||
alloc := mock.Alloc()
|
||||
alloc.JobID = job.ID
|
||||
alloc.EvalID = eval.ID
|
||||
alloc.TaskGroup = job.TaskGroups[0].Name
|
||||
alloc.DesiredStatus = structs.AllocDesiredStatusStop
|
||||
|
||||
// Insert "lost" alloc
|
||||
|
@ -232,8 +346,9 @@ func TestCoreScheduler_EvalGC_BatchStopped(t *testing.T) {
|
|||
alloc2.EvalID = eval.ID
|
||||
alloc2.DesiredStatus = structs.AllocDesiredStatusRun
|
||||
alloc2.ClientStatus = structs.AllocClientStatusLost
|
||||
alloc2.TaskGroup = job.TaskGroups[0].Name
|
||||
|
||||
err = state.UpsertAllocs(1002, []*structs.Allocation{alloc, alloc2})
|
||||
err = state.UpsertAllocs(1003, []*structs.Allocation{alloc, alloc2})
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
@ -288,7 +403,7 @@ func TestCoreScheduler_EvalGC_Partial(t *testing.T) {
|
|||
s1 := testServer(t, nil)
|
||||
defer s1.Shutdown()
|
||||
testutil.WaitForLeader(t, s1.RPC)
|
||||
|
||||
require := require.New(t)
|
||||
// COMPAT Remove in 0.6: Reset the FSM time table since we reconcile which sets index 0
|
||||
s1.fsm.timetable.table = make([]TimeTableEntry, 1, 10)
|
||||
|
||||
|
@ -302,16 +417,28 @@ func TestCoreScheduler_EvalGC_Partial(t *testing.T) {
|
|||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
// Insert mock job with rescheduling disabled
|
||||
job := mock.Job()
|
||||
job.ID = eval.JobID
|
||||
job.TaskGroups[0].ReschedulePolicy = &structs.ReschedulePolicy{
|
||||
Attempts: 0,
|
||||
Interval: 0 * time.Second,
|
||||
}
|
||||
err = state.UpsertJob(1001, job)
|
||||
require.Nil(err)
|
||||
|
||||
// Insert "dead" alloc
|
||||
alloc := mock.Alloc()
|
||||
alloc.EvalID = eval.ID
|
||||
alloc.DesiredStatus = structs.AllocDesiredStatusStop
|
||||
alloc.TaskGroup = job.TaskGroups[0].Name
|
||||
state.UpsertJobSummary(1001, mock.JobSummary(alloc.JobID))
|
||||
|
||||
// Insert "lost" alloc
|
||||
alloc2 := mock.Alloc()
|
||||
alloc2.JobID = alloc.JobID
|
||||
alloc2.EvalID = eval.ID
|
||||
alloc2.TaskGroup = job.TaskGroups[0].Name
|
||||
alloc2.DesiredStatus = structs.AllocDesiredStatusRun
|
||||
alloc2.ClientStatus = structs.AllocClientStatusLost
|
||||
|
||||
|
@ -387,6 +514,7 @@ func TestCoreScheduler_EvalGC_Force(t *testing.T) {
|
|||
t.Parallel()
|
||||
for _, withAcl := range []bool{false, true} {
|
||||
t.Run(fmt.Sprintf("with acl %v", withAcl), func(t *testing.T) {
|
||||
require := require.New(t)
|
||||
var server *Server
|
||||
if withAcl {
|
||||
server, _ = testACLServer(t, nil)
|
||||
|
@ -409,10 +537,21 @@ func TestCoreScheduler_EvalGC_Force(t *testing.T) {
|
|||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
// Insert mock job with rescheduling disabled
|
||||
job := mock.Job()
|
||||
job.ID = eval.JobID
|
||||
job.TaskGroups[0].ReschedulePolicy = &structs.ReschedulePolicy{
|
||||
Attempts: 0,
|
||||
Interval: 0 * time.Second,
|
||||
}
|
||||
err = state.UpsertJob(1001, job)
|
||||
require.Nil(err)
|
||||
|
||||
// Insert "dead" alloc
|
||||
alloc := mock.Alloc()
|
||||
alloc.EvalID = eval.ID
|
||||
alloc.DesiredStatus = structs.AllocDesiredStatusStop
|
||||
alloc.TaskGroup = job.TaskGroups[0].Name
|
||||
state.UpsertJobSummary(1001, mock.JobSummary(alloc.JobID))
|
||||
err = state.UpsertAllocs(1002, []*structs.Allocation{alloc})
|
||||
if err != nil {
|
||||
|
@ -802,6 +941,10 @@ func TestCoreScheduler_JobGC_OutstandingAllocs(t *testing.T) {
|
|||
job := mock.Job()
|
||||
job.Type = structs.JobTypeBatch
|
||||
job.Status = structs.JobStatusDead
|
||||
job.TaskGroups[0].ReschedulePolicy = &structs.ReschedulePolicy{
|
||||
Attempts: 0,
|
||||
Interval: 0 * time.Second,
|
||||
}
|
||||
err := state.UpsertJob(1000, job)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
|
@ -822,12 +965,14 @@ func TestCoreScheduler_JobGC_OutstandingAllocs(t *testing.T) {
|
|||
alloc.EvalID = eval.ID
|
||||
alloc.DesiredStatus = structs.AllocDesiredStatusRun
|
||||
alloc.ClientStatus = structs.AllocClientStatusComplete
|
||||
alloc.TaskGroup = job.TaskGroups[0].Name
|
||||
|
||||
alloc2 := mock.Alloc()
|
||||
alloc2.JobID = job.ID
|
||||
alloc2.EvalID = eval.ID
|
||||
alloc2.DesiredStatus = structs.AllocDesiredStatusRun
|
||||
alloc2.ClientStatus = structs.AllocClientStatusRunning
|
||||
alloc2.TaskGroup = job.TaskGroups[0].Name
|
||||
|
||||
err = state.UpsertAllocs(1002, []*structs.Allocation{alloc, alloc2})
|
||||
if err != nil {
|
||||
|
@ -1051,8 +1196,11 @@ func TestCoreScheduler_JobGC_Stopped(t *testing.T) {
|
|||
// Insert job.
|
||||
state := s1.fsm.State()
|
||||
job := mock.Job()
|
||||
//job.Status = structs.JobStatusDead
|
||||
job.Stop = true
|
||||
job.TaskGroups[0].ReschedulePolicy = &structs.ReschedulePolicy{
|
||||
Attempts: 0,
|
||||
Interval: 0 * time.Second,
|
||||
}
|
||||
err := state.UpsertJob(1000, job)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
|
@ -1077,7 +1225,7 @@ func TestCoreScheduler_JobGC_Stopped(t *testing.T) {
|
|||
alloc.JobID = job.ID
|
||||
alloc.EvalID = eval.ID
|
||||
alloc.DesiredStatus = structs.AllocDesiredStatusStop
|
||||
|
||||
alloc.TaskGroup = job.TaskGroups[0].Name
|
||||
err = state.UpsertAllocs(1002, []*structs.Allocation{alloc})
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
|
|
|
@ -5192,6 +5192,33 @@ func (a *Allocation) RescheduleEligible(reschedulePolicy *ReschedulePolicy, fail
|
|||
return attempted < attempts
|
||||
}
|
||||
|
||||
// GCEligible returns if the allocation is eligible to be garbage collected
|
||||
// according to its terminal status and its reschedule trackers
|
||||
func (a *Allocation) GCEligible(reschedulePolicy *ReschedulePolicy, gcTime time.Time, thresholdIndex uint64) bool {
|
||||
|
||||
// Not in a terminal status and old enough
|
||||
if !a.TerminalStatus() || a.ModifyIndex > thresholdIndex {
|
||||
return false
|
||||
}
|
||||
// No reschedule policy or restarts are disabled
|
||||
if reschedulePolicy == nil || reschedulePolicy.Attempts == 0 || reschedulePolicy.Interval == 0 {
|
||||
return true
|
||||
}
|
||||
// Eligible for restarts but none have been attempted yet
|
||||
if a.RescheduleTracker == nil || len(a.RescheduleTracker.Events) == 0 {
|
||||
return false
|
||||
}
|
||||
|
||||
// Most recent reschedule attempt is within time interval
|
||||
interval := reschedulePolicy.Interval
|
||||
lastIndex := len(a.RescheduleTracker.Events)
|
||||
lastRescheduleEvent := a.RescheduleTracker.Events[lastIndex-1]
|
||||
timeDiff := gcTime.UTC().UnixNano() - lastRescheduleEvent.RescheduleTime
|
||||
|
||||
return timeDiff > interval.Nanoseconds()
|
||||
|
||||
}
|
||||
|
||||
// Terminated returns if the allocation is in a terminal state on a client.
|
||||
func (a *Allocation) Terminated() bool {
|
||||
if a.ClientStatus == AllocClientStatusFailed ||
|
||||
|
|
Loading…
Reference in New Issue