open-nomad/nomad/core_sched_test.go
2022-04-05 15:21:28 -05:00

2513 lines
66 KiB
Go

package nomad
import (
"fmt"
"testing"
"time"
memdb "github.com/hashicorp/go-memdb"
msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc"
"github.com/hashicorp/nomad/ci"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/state"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/testutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestCoreScheduler_EvalGC(t *testing.T) {
ci.Parallel(t)
s1, cleanupS1 := TestServer(t, nil)
defer cleanupS1()
testutil.WaitForLeader(t, s1.RPC)
// COMPAT Remove in 0.6: Reset the FSM time table since we reconcile which sets index 0
s1.fsm.timetable.table = make([]TimeTableEntry, 1, 10)
// Insert "dead" eval
store := s1.fsm.State()
eval := mock.Eval()
eval.Status = structs.EvalStatusFailed
store.UpsertJobSummary(999, mock.JobSummary(eval.JobID))
err := store.UpsertEvals(structs.MsgTypeTestSetup, 1000, []*structs.Evaluation{eval})
require.Nil(t, err)
// Insert mock job with rescheduling disabled
job := mock.Job()
job.ID = eval.JobID
job.TaskGroups[0].ReschedulePolicy = &structs.ReschedulePolicy{
Attempts: 0,
Interval: 0 * time.Second,
}
err = store.UpsertJob(structs.MsgTypeTestSetup, 1001, job)
require.Nil(t, err)
// Insert "dead" alloc
alloc := mock.Alloc()
alloc.EvalID = eval.ID
alloc.DesiredStatus = structs.AllocDesiredStatusStop
alloc.JobID = eval.JobID
alloc.TaskGroup = job.TaskGroups[0].Name
// Insert "lost" alloc
alloc2 := mock.Alloc()
alloc2.EvalID = eval.ID
alloc2.DesiredStatus = structs.AllocDesiredStatusRun
alloc2.ClientStatus = structs.AllocClientStatusLost
alloc2.JobID = eval.JobID
alloc2.TaskGroup = job.TaskGroups[0].Name
err = store.UpsertAllocs(structs.MsgTypeTestSetup, 1001, []*structs.Allocation{alloc, alloc2})
if err != nil {
t.Fatalf("err: %v", err)
}
// Update the time tables to make this work
tt := s1.fsm.TimeTable()
tt.Witness(2000, time.Now().UTC().Add(-1*s1.config.EvalGCThreshold))
// Create a core scheduler
snap, err := store.Snapshot()
if err != nil {
t.Fatalf("err: %v", err)
}
core := NewCoreScheduler(s1, snap)
// Attempt the GC
gc := s1.coreJobEval(structs.CoreJobEvalGC, 2000)
err = core.Process(gc)
if err != nil {
t.Fatalf("err: %v", err)
}
// Should be gone
ws := memdb.NewWatchSet()
out, err := store.EvalByID(ws, eval.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if out != nil {
t.Fatalf("bad: %v", out)
}
outA, err := store.AllocByID(ws, alloc.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if outA != nil {
t.Fatalf("bad: %v", outA)
}
outA2, err := store.AllocByID(ws, alloc2.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if outA2 != nil {
t.Fatalf("bad: %v", outA2)
}
}
// Tests GC behavior on allocations being rescheduled
func TestCoreScheduler_EvalGC_ReschedulingAllocs(t *testing.T) {
ci.Parallel(t)
s1, cleanupS1 := TestServer(t, nil)
defer cleanupS1()
testutil.WaitForLeader(t, s1.RPC)
// COMPAT Remove in 0.6: Reset the FSM time table since we reconcile which sets index 0
s1.fsm.timetable.table = make([]TimeTableEntry, 1, 10)
// Insert "dead" eval
store := s1.fsm.State()
eval := mock.Eval()
eval.Status = structs.EvalStatusFailed
store.UpsertJobSummary(999, mock.JobSummary(eval.JobID))
err := store.UpsertEvals(structs.MsgTypeTestSetup, 1000, []*structs.Evaluation{eval})
require.Nil(t, err)
// Insert "pending" eval for same job
eval2 := mock.Eval()
eval2.JobID = eval.JobID
store.UpsertJobSummary(999, mock.JobSummary(eval2.JobID))
err = store.UpsertEvals(structs.MsgTypeTestSetup, 1003, []*structs.Evaluation{eval2})
require.Nil(t, err)
// Insert mock job with default reschedule policy of 2 in 10 minutes
job := mock.Job()
job.ID = eval.JobID
err = store.UpsertJob(structs.MsgTypeTestSetup, 1001, job)
require.Nil(t, err)
// Insert failed alloc with an old reschedule attempt, can be GCed
alloc := mock.Alloc()
alloc.Job = job
alloc.EvalID = eval.ID
alloc.DesiredStatus = structs.AllocDesiredStatusRun
alloc.ClientStatus = structs.AllocClientStatusFailed
alloc.JobID = eval.JobID
alloc.TaskGroup = job.TaskGroups[0].Name
alloc.NextAllocation = uuid.Generate()
alloc.RescheduleTracker = &structs.RescheduleTracker{
Events: []*structs.RescheduleEvent{
{
RescheduleTime: time.Now().Add(-1 * time.Hour).UTC().UnixNano(),
PrevNodeID: uuid.Generate(),
PrevAllocID: uuid.Generate(),
},
},
}
alloc2 := mock.Alloc()
alloc2.Job = job
alloc2.EvalID = eval.ID
alloc2.DesiredStatus = structs.AllocDesiredStatusRun
alloc2.ClientStatus = structs.AllocClientStatusFailed
alloc2.JobID = eval.JobID
alloc2.TaskGroup = job.TaskGroups[0].Name
alloc2.RescheduleTracker = &structs.RescheduleTracker{
Events: []*structs.RescheduleEvent{
{
RescheduleTime: time.Now().Add(-3 * time.Minute).UTC().UnixNano(),
PrevNodeID: uuid.Generate(),
PrevAllocID: uuid.Generate(),
},
},
}
err = store.UpsertAllocs(structs.MsgTypeTestSetup, 1001, []*structs.Allocation{alloc, alloc2})
require.Nil(t, err)
// Update the time tables to make this work
tt := s1.fsm.TimeTable()
tt.Witness(2000, time.Now().UTC().Add(-1*s1.config.EvalGCThreshold))
// Create a core scheduler
snap, err := store.Snapshot()
if err != nil {
t.Fatalf("err: %v", err)
}
core := NewCoreScheduler(s1, snap)
// Attempt the GC, job has all terminal allocs and one pending eval
gc := s1.coreJobEval(structs.CoreJobEvalGC, 2000)
err = core.Process(gc)
require.Nil(t, err)
// Eval should still exist
ws := memdb.NewWatchSet()
out, err := store.EvalByID(ws, eval.ID)
require.Nil(t, err)
require.NotNil(t, out)
require.Equal(t, eval.ID, out.ID)
outA, err := store.AllocByID(ws, alloc.ID)
require.Nil(t, err)
require.Nil(t, outA)
outA2, err := store.AllocByID(ws, alloc2.ID)
require.Nil(t, err)
require.Equal(t, alloc2.ID, outA2.ID)
}
// Tests GC behavior on stopped job with reschedulable allocs
func TestCoreScheduler_EvalGC_StoppedJob_Reschedulable(t *testing.T) {
ci.Parallel(t)
s1, cleanupS1 := TestServer(t, nil)
defer cleanupS1()
testutil.WaitForLeader(t, s1.RPC)
// COMPAT Remove in 0.6: Reset the FSM time table since we reconcile which sets index 0
s1.fsm.timetable.table = make([]TimeTableEntry, 1, 10)
// Insert "dead" eval
store := s1.fsm.State()
eval := mock.Eval()
eval.Status = structs.EvalStatusFailed
store.UpsertJobSummary(999, mock.JobSummary(eval.JobID))
err := store.UpsertEvals(structs.MsgTypeTestSetup, 1000, []*structs.Evaluation{eval})
require.Nil(t, err)
// Insert mock stopped job with default reschedule policy of 2 in 10 minutes
job := mock.Job()
job.ID = eval.JobID
job.Stop = true
err = store.UpsertJob(structs.MsgTypeTestSetup, 1001, job)
require.Nil(t, err)
// Insert failed alloc with a recent reschedule attempt
alloc := mock.Alloc()
alloc.EvalID = eval.ID
alloc.DesiredStatus = structs.AllocDesiredStatusRun
alloc.ClientStatus = structs.AllocClientStatusLost
alloc.JobID = eval.JobID
alloc.TaskGroup = job.TaskGroups[0].Name
alloc.RescheduleTracker = &structs.RescheduleTracker{
Events: []*structs.RescheduleEvent{
{
RescheduleTime: time.Now().Add(-3 * time.Minute).UTC().UnixNano(),
PrevNodeID: uuid.Generate(),
PrevAllocID: uuid.Generate(),
},
},
}
err = store.UpsertAllocs(structs.MsgTypeTestSetup, 1001, []*structs.Allocation{alloc})
require.Nil(t, err)
// Update the time tables to make this work
tt := s1.fsm.TimeTable()
tt.Witness(2000, time.Now().UTC().Add(-1*s1.config.EvalGCThreshold))
// Create a core scheduler
snap, err := store.Snapshot()
if err != nil {
t.Fatalf("err: %v", err)
}
core := NewCoreScheduler(s1, snap)
// Attempt the GC
gc := s1.coreJobEval(structs.CoreJobEvalGC, 2000)
err = core.Process(gc)
require.Nil(t, err)
// Eval should not exist
ws := memdb.NewWatchSet()
out, err := store.EvalByID(ws, eval.ID)
require.Nil(t, err)
require.Nil(t, out)
// Alloc should not exist
outA, err := store.AllocByID(ws, alloc.ID)
require.Nil(t, err)
require.Nil(t, outA)
}
// An EvalGC should never reap a batch job that has not been stopped
func TestCoreScheduler_EvalGC_Batch(t *testing.T) {
ci.Parallel(t)
s1, cleanupS1 := TestServer(t, nil)
defer cleanupS1()
testutil.WaitForLeader(t, s1.RPC)
// COMPAT Remove in 0.6: Reset the FSM time table since we reconcile which sets index 0
s1.fsm.timetable.table = make([]TimeTableEntry, 1, 10)
// Insert a "dead" job
store := s1.fsm.State()
job := mock.Job()
job.Type = structs.JobTypeBatch
job.Status = structs.JobStatusDead
err := store.UpsertJob(structs.MsgTypeTestSetup, 1000, job)
if err != nil {
t.Fatalf("err: %v", err)
}
// Insert "complete" eval
eval := mock.Eval()
eval.Status = structs.EvalStatusComplete
eval.Type = structs.JobTypeBatch
eval.JobID = job.ID
err = store.UpsertEvals(structs.MsgTypeTestSetup, 1001, []*structs.Evaluation{eval})
if err != nil {
t.Fatalf("err: %v", err)
}
// Insert "failed" alloc
alloc := mock.Alloc()
alloc.Job = job
alloc.JobID = job.ID
alloc.EvalID = eval.ID
alloc.DesiredStatus = structs.AllocDesiredStatusStop
// Insert "lost" alloc
alloc2 := mock.Alloc()
alloc2.Job = job
alloc2.JobID = job.ID
alloc2.EvalID = eval.ID
alloc2.DesiredStatus = structs.AllocDesiredStatusRun
alloc2.ClientStatus = structs.AllocClientStatusLost
err = store.UpsertAllocs(structs.MsgTypeTestSetup, 1002, []*structs.Allocation{alloc, alloc2})
if err != nil {
t.Fatalf("err: %v", err)
}
// Update the time tables to make this work
tt := s1.fsm.TimeTable()
tt.Witness(2000, time.Now().UTC().Add(-1*s1.config.EvalGCThreshold))
// Create a core scheduler
snap, err := store.Snapshot()
if err != nil {
t.Fatalf("err: %v", err)
}
core := NewCoreScheduler(s1, snap)
// Attempt the GC
gc := s1.coreJobEval(structs.CoreJobEvalGC, 2000)
err = core.Process(gc)
if err != nil {
t.Fatalf("err: %v", err)
}
// Nothing should be gone
ws := memdb.NewWatchSet()
out, err := store.EvalByID(ws, eval.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if out == nil {
t.Fatalf("bad: %v", out)
}
outA, err := store.AllocByID(ws, alloc.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if outA == nil {
t.Fatalf("bad: %v", outA)
}
outA2, err := store.AllocByID(ws, alloc2.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if outA2 == nil {
t.Fatalf("bad: %v", outA2)
}
outB, err := store.JobByID(ws, job.Namespace, job.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if outB == nil {
t.Fatalf("bad: %v", outB)
}
}
// An EvalGC should reap allocations from jobs with an older modify index
func TestCoreScheduler_EvalGC_Batch_OldVersion(t *testing.T) {
ci.Parallel(t)
s1, cleanupS1 := TestServer(t, nil)
defer cleanupS1()
testutil.WaitForLeader(t, s1.RPC)
// COMPAT Remove in 0.6: Reset the FSM time table since we reconcile which sets index 0
s1.fsm.timetable.table = make([]TimeTableEntry, 1, 10)
// Insert a "dead" job
store := s1.fsm.State()
job := mock.Job()
job.Type = structs.JobTypeBatch
job.Status = structs.JobStatusDead
err := store.UpsertJob(structs.MsgTypeTestSetup, 1000, job)
if err != nil {
t.Fatalf("err: %v", err)
}
// Insert "complete" eval
eval := mock.Eval()
eval.Status = structs.EvalStatusComplete
eval.Type = structs.JobTypeBatch
eval.JobID = job.ID
err = store.UpsertEvals(structs.MsgTypeTestSetup, 1001, []*structs.Evaluation{eval})
if err != nil {
t.Fatalf("err: %v", err)
}
// Insert "failed" alloc
alloc := mock.Alloc()
alloc.Job = job
alloc.JobID = job.ID
alloc.EvalID = eval.ID
alloc.DesiredStatus = structs.AllocDesiredStatusStop
// Insert "lost" alloc
alloc2 := mock.Alloc()
alloc2.Job = job
alloc2.JobID = job.ID
alloc2.EvalID = eval.ID
alloc2.DesiredStatus = structs.AllocDesiredStatusRun
alloc2.ClientStatus = structs.AllocClientStatusLost
// Insert alloc with older job modifyindex
alloc3 := mock.Alloc()
job2 := job.Copy()
alloc3.Job = job2
alloc3.JobID = job2.ID
alloc3.EvalID = eval.ID
job2.CreateIndex = 500
alloc3.DesiredStatus = structs.AllocDesiredStatusRun
alloc3.ClientStatus = structs.AllocClientStatusLost
err = store.UpsertAllocs(structs.MsgTypeTestSetup, 1002, []*structs.Allocation{alloc, alloc2, alloc3})
if err != nil {
t.Fatalf("err: %v", err)
}
// Update the time tables to make this work
tt := s1.fsm.TimeTable()
tt.Witness(2000, time.Now().UTC().Add(-1*s1.config.EvalGCThreshold))
// Create a core scheduler
snap, err := store.Snapshot()
if err != nil {
t.Fatalf("err: %v", err)
}
core := NewCoreScheduler(s1, snap)
// Attempt the GC
gc := s1.coreJobEval(structs.CoreJobEvalGC, 2000)
err = core.Process(gc)
if err != nil {
t.Fatalf("err: %v", err)
}
// Alloc1 and 2 should be there, and alloc3 should be gone
ws := memdb.NewWatchSet()
out, err := store.EvalByID(ws, eval.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if out == nil {
t.Fatalf("bad: %v", out)
}
outA, err := store.AllocByID(ws, alloc.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if outA == nil {
t.Fatalf("bad: %v", outA)
}
outA2, err := store.AllocByID(ws, alloc2.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if outA2 == nil {
t.Fatalf("bad: %v", outA2)
}
outA3, err := store.AllocByID(ws, alloc3.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if outA3 != nil {
t.Fatalf("expected alloc to be nil:%v", outA2)
}
outB, err := store.JobByID(ws, job.Namespace, job.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if outB == nil {
t.Fatalf("bad: %v", outB)
}
}
// An EvalGC should reap a batch job that has been stopped
func TestCoreScheduler_EvalGC_BatchStopped(t *testing.T) {
ci.Parallel(t)
s1, cleanupS1 := TestServer(t, nil)
defer cleanupS1()
testutil.WaitForLeader(t, s1.RPC)
// COMPAT Remove in 0.6: Reset the FSM time table since we reconcile which sets index 0
s1.fsm.timetable.table = make([]TimeTableEntry, 1, 10)
// Create a "dead" job
store := s1.fsm.State()
job := mock.Job()
job.Type = structs.JobTypeBatch
job.Status = structs.JobStatusDead
job.Stop = true
job.TaskGroups[0].ReschedulePolicy = &structs.ReschedulePolicy{
Attempts: 0,
Interval: 0 * time.Second,
}
err := store.UpsertJob(structs.MsgTypeTestSetup, 1001, job)
require.Nil(t, err)
// Insert "complete" eval
eval := mock.Eval()
eval.Status = structs.EvalStatusComplete
eval.Type = structs.JobTypeBatch
eval.JobID = job.ID
err = store.UpsertEvals(structs.MsgTypeTestSetup, 1002, []*structs.Evaluation{eval})
require.Nil(t, err)
// Insert "failed" alloc
alloc := mock.Alloc()
alloc.JobID = job.ID
alloc.EvalID = eval.ID
alloc.TaskGroup = job.TaskGroups[0].Name
alloc.DesiredStatus = structs.AllocDesiredStatusStop
// Insert "lost" alloc
alloc2 := mock.Alloc()
alloc2.JobID = job.ID
alloc2.EvalID = eval.ID
alloc2.DesiredStatus = structs.AllocDesiredStatusRun
alloc2.ClientStatus = structs.AllocClientStatusLost
alloc2.TaskGroup = job.TaskGroups[0].Name
err = store.UpsertAllocs(structs.MsgTypeTestSetup, 1003, []*structs.Allocation{alloc, alloc2})
if err != nil {
t.Fatalf("err: %v", err)
}
// Update the time tables to make this work
tt := s1.fsm.TimeTable()
tt.Witness(2000, time.Now().UTC().Add(-1*s1.config.EvalGCThreshold))
// Create a core scheduler
snap, err := store.Snapshot()
if err != nil {
t.Fatalf("err: %v", err)
}
core := NewCoreScheduler(s1, snap)
// Attempt the GC
gc := s1.coreJobEval(structs.CoreJobEvalGC, 2000)
err = core.Process(gc)
if err != nil {
t.Fatalf("err: %v", err)
}
// Everything should be gone
ws := memdb.NewWatchSet()
out, err := store.EvalByID(ws, eval.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if out != nil {
t.Fatalf("bad: %v", out)
}
outA, err := store.AllocByID(ws, alloc.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if outA != nil {
t.Fatalf("bad: %v", outA)
}
outA2, err := store.AllocByID(ws, alloc2.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if outA2 != nil {
t.Fatalf("bad: %v", outA2)
}
}
func TestCoreScheduler_EvalGC_Partial(t *testing.T) {
ci.Parallel(t)
s1, cleanupS1 := TestServer(t, nil)
defer cleanupS1()
testutil.WaitForLeader(t, s1.RPC)
// COMPAT Remove in 0.6: Reset the FSM time table since we reconcile which sets index 0
s1.fsm.timetable.table = make([]TimeTableEntry, 1, 10)
// Insert "dead" eval
store := s1.fsm.State()
eval := mock.Eval()
eval.Status = structs.EvalStatusComplete
store.UpsertJobSummary(999, mock.JobSummary(eval.JobID))
err := store.UpsertEvals(structs.MsgTypeTestSetup, 1000, []*structs.Evaluation{eval})
if err != nil {
t.Fatalf("err: %v", err)
}
// Create mock job with id same as eval
job := mock.Job()
job.ID = eval.JobID
// Insert "dead" alloc
alloc := mock.Alloc()
alloc.JobID = job.ID
alloc.EvalID = eval.ID
alloc.DesiredStatus = structs.AllocDesiredStatusStop
alloc.TaskGroup = job.TaskGroups[0].Name
store.UpsertJobSummary(1001, mock.JobSummary(alloc.JobID))
// Insert "lost" alloc
alloc2 := mock.Alloc()
alloc2.JobID = job.ID
alloc2.EvalID = eval.ID
alloc2.TaskGroup = job.TaskGroups[0].Name
alloc2.DesiredStatus = structs.AllocDesiredStatusRun
alloc2.ClientStatus = structs.AllocClientStatusLost
err = store.UpsertAllocs(structs.MsgTypeTestSetup, 1002, []*structs.Allocation{alloc, alloc2})
if err != nil {
t.Fatalf("err: %v", err)
}
// Insert "running" alloc
alloc3 := mock.Alloc()
alloc3.EvalID = eval.ID
alloc3.JobID = job.ID
store.UpsertJobSummary(1003, mock.JobSummary(alloc3.JobID))
err = store.UpsertAllocs(structs.MsgTypeTestSetup, 1004, []*structs.Allocation{alloc3})
if err != nil {
t.Fatalf("err: %v", err)
}
// Insert mock job with rescheduling disabled
job.TaskGroups[0].ReschedulePolicy = &structs.ReschedulePolicy{
Attempts: 0,
Interval: 0 * time.Second,
}
err = store.UpsertJob(structs.MsgTypeTestSetup, 1001, job)
require.Nil(t, err)
// Update the time tables to make this work
tt := s1.fsm.TimeTable()
tt.Witness(2000, time.Now().UTC().Add(-1*s1.config.EvalGCThreshold))
// Create a core scheduler
snap, err := store.Snapshot()
if err != nil {
t.Fatalf("err: %v", err)
}
core := NewCoreScheduler(s1, snap)
// Attempt the GC
gc := s1.coreJobEval(structs.CoreJobEvalGC, 2000)
err = core.Process(gc)
if err != nil {
t.Fatalf("err: %v", err)
}
// Should not be gone
ws := memdb.NewWatchSet()
out, err := store.EvalByID(ws, eval.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if out == nil {
t.Fatalf("bad: %v", out)
}
outA, err := store.AllocByID(ws, alloc3.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if outA == nil {
t.Fatalf("bad: %v", outA)
}
// Should be gone
outB, err := store.AllocByID(ws, alloc.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if outB != nil {
t.Fatalf("bad: %v", outB)
}
outC, err := store.AllocByID(ws, alloc2.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if outC != nil {
t.Fatalf("bad: %v", outC)
}
}
func TestCoreScheduler_EvalGC_Force(t *testing.T) {
ci.Parallel(t)
for _, withAcl := range []bool{false, true} {
t.Run(fmt.Sprintf("with acl %v", withAcl), func(t *testing.T) {
var server *Server
var cleanup func()
if withAcl {
server, _, cleanup = TestACLServer(t, nil)
} else {
server, cleanup = TestServer(t, nil)
}
defer cleanup()
testutil.WaitForLeader(t, server.RPC)
// COMPAT Remove in 0.6: Reset the FSM time table since we reconcile which sets index 0
server.fsm.timetable.table = make([]TimeTableEntry, 1, 10)
// Insert "dead" eval
store := server.fsm.State()
eval := mock.Eval()
eval.Status = structs.EvalStatusFailed
store.UpsertJobSummary(999, mock.JobSummary(eval.JobID))
err := store.UpsertEvals(structs.MsgTypeTestSetup, 1000, []*structs.Evaluation{eval})
if err != nil {
t.Fatalf("err: %v", err)
}
// Insert mock job with rescheduling disabled
job := mock.Job()
job.ID = eval.JobID
job.TaskGroups[0].ReschedulePolicy = &structs.ReschedulePolicy{
Attempts: 0,
Interval: 0 * time.Second,
}
err = store.UpsertJob(structs.MsgTypeTestSetup, 1001, job)
require.Nil(t, err)
// Insert "dead" alloc
alloc := mock.Alloc()
alloc.EvalID = eval.ID
alloc.DesiredStatus = structs.AllocDesiredStatusStop
alloc.TaskGroup = job.TaskGroups[0].Name
store.UpsertJobSummary(1001, mock.JobSummary(alloc.JobID))
err = store.UpsertAllocs(structs.MsgTypeTestSetup, 1002, []*structs.Allocation{alloc})
if err != nil {
t.Fatalf("err: %v", err)
}
// Create a core scheduler
snap, err := store.Snapshot()
if err != nil {
t.Fatalf("err: %v", err)
}
core := NewCoreScheduler(server, snap)
// Attempt the GC
gc := server.coreJobEval(structs.CoreJobForceGC, 1002)
err = core.Process(gc)
if err != nil {
t.Fatalf("err: %v", err)
}
// Should be gone
ws := memdb.NewWatchSet()
out, err := store.EvalByID(ws, eval.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if out != nil {
t.Fatalf("bad: %v", out)
}
outA, err := store.AllocByID(ws, alloc.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if outA != nil {
t.Fatalf("bad: %v", outA)
}
})
}
}
func TestCoreScheduler_NodeGC(t *testing.T) {
ci.Parallel(t)
for _, withAcl := range []bool{false, true} {
t.Run(fmt.Sprintf("with acl %v", withAcl), func(t *testing.T) {
var server *Server
var cleanup func()
if withAcl {
server, _, cleanup = TestACLServer(t, nil)
} else {
server, cleanup = TestServer(t, nil)
}
defer cleanup()
testutil.WaitForLeader(t, server.RPC)
// COMPAT Remove in 0.6: Reset the FSM time table since we reconcile which sets index 0
server.fsm.timetable.table = make([]TimeTableEntry, 1, 10)
// Insert "dead" node
store := server.fsm.State()
node := mock.Node()
node.Status = structs.NodeStatusDown
err := store.UpsertNode(structs.MsgTypeTestSetup, 1000, node)
if err != nil {
t.Fatalf("err: %v", err)
}
// Update the time tables to make this work
tt := server.fsm.TimeTable()
tt.Witness(2000, time.Now().UTC().Add(-1*server.config.NodeGCThreshold))
// Create a core scheduler
snap, err := store.Snapshot()
if err != nil {
t.Fatalf("err: %v", err)
}
core := NewCoreScheduler(server, snap)
// Attempt the GC
gc := server.coreJobEval(structs.CoreJobNodeGC, 2000)
err = core.Process(gc)
if err != nil {
t.Fatalf("err: %v", err)
}
// Should be gone
ws := memdb.NewWatchSet()
out, err := store.NodeByID(ws, node.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if out != nil {
t.Fatalf("bad: %v", out)
}
})
}
}
func TestCoreScheduler_NodeGC_TerminalAllocs(t *testing.T) {
ci.Parallel(t)
s1, cleanupS1 := TestServer(t, nil)
defer cleanupS1()
testutil.WaitForLeader(t, s1.RPC)
// COMPAT Remove in 0.6: Reset the FSM time table since we reconcile which sets index 0
s1.fsm.timetable.table = make([]TimeTableEntry, 1, 10)
// Insert "dead" node
store := s1.fsm.State()
node := mock.Node()
node.Status = structs.NodeStatusDown
err := store.UpsertNode(structs.MsgTypeTestSetup, 1000, node)
if err != nil {
t.Fatalf("err: %v", err)
}
// Insert a terminal alloc on that node
alloc := mock.Alloc()
alloc.DesiredStatus = structs.AllocDesiredStatusStop
store.UpsertJobSummary(1001, mock.JobSummary(alloc.JobID))
if err := store.UpsertAllocs(structs.MsgTypeTestSetup, 1002, []*structs.Allocation{alloc}); err != nil {
t.Fatalf("err: %v", err)
}
// Update the time tables to make this work
tt := s1.fsm.TimeTable()
tt.Witness(2000, time.Now().UTC().Add(-1*s1.config.NodeGCThreshold))
// Create a core scheduler
snap, err := store.Snapshot()
if err != nil {
t.Fatalf("err: %v", err)
}
core := NewCoreScheduler(s1, snap)
// Attempt the GC
gc := s1.coreJobEval(structs.CoreJobNodeGC, 2000)
err = core.Process(gc)
if err != nil {
t.Fatalf("err: %v", err)
}
// Should be gone
ws := memdb.NewWatchSet()
out, err := store.NodeByID(ws, node.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if out != nil {
t.Fatalf("bad: %v", out)
}
}
func TestCoreScheduler_NodeGC_RunningAllocs(t *testing.T) {
ci.Parallel(t)
s1, cleanupS1 := TestServer(t, nil)
defer cleanupS1()
testutil.WaitForLeader(t, s1.RPC)
// COMPAT Remove in 0.6: Reset the FSM time table since we reconcile which sets index 0
s1.fsm.timetable.table = make([]TimeTableEntry, 1, 10)
// Insert "dead" node
store := s1.fsm.State()
node := mock.Node()
node.Status = structs.NodeStatusDown
err := store.UpsertNode(structs.MsgTypeTestSetup, 1000, node)
if err != nil {
t.Fatalf("err: %v", err)
}
// Insert a running alloc on that node
alloc := mock.Alloc()
alloc.NodeID = node.ID
alloc.DesiredStatus = structs.AllocDesiredStatusRun
alloc.ClientStatus = structs.AllocClientStatusRunning
store.UpsertJobSummary(1001, mock.JobSummary(alloc.JobID))
if err := store.UpsertAllocs(structs.MsgTypeTestSetup, 1002, []*structs.Allocation{alloc}); err != nil {
t.Fatalf("err: %v", err)
}
// Update the time tables to make this work
tt := s1.fsm.TimeTable()
tt.Witness(2000, time.Now().UTC().Add(-1*s1.config.NodeGCThreshold))
// Create a core scheduler
snap, err := store.Snapshot()
if err != nil {
t.Fatalf("err: %v", err)
}
core := NewCoreScheduler(s1, snap)
// Attempt the GC
gc := s1.coreJobEval(structs.CoreJobNodeGC, 2000)
err = core.Process(gc)
if err != nil {
t.Fatalf("err: %v", err)
}
// Should still be here
ws := memdb.NewWatchSet()
out, err := store.NodeByID(ws, node.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if out == nil {
t.Fatalf("bad: %v", out)
}
}
func TestCoreScheduler_NodeGC_Force(t *testing.T) {
ci.Parallel(t)
s1, cleanupS1 := TestServer(t, nil)
defer cleanupS1()
testutil.WaitForLeader(t, s1.RPC)
// COMPAT Remove in 0.6: Reset the FSM time table since we reconcile which sets index 0
s1.fsm.timetable.table = make([]TimeTableEntry, 1, 10)
// Insert "dead" node
store := s1.fsm.State()
node := mock.Node()
node.Status = structs.NodeStatusDown
err := store.UpsertNode(structs.MsgTypeTestSetup, 1000, node)
if err != nil {
t.Fatalf("err: %v", err)
}
// Create a core scheduler
snap, err := store.Snapshot()
if err != nil {
t.Fatalf("err: %v", err)
}
core := NewCoreScheduler(s1, snap)
// Attempt the GC
gc := s1.coreJobEval(structs.CoreJobForceGC, 1000)
err = core.Process(gc)
if err != nil {
t.Fatalf("err: %v", err)
}
// Should be gone
ws := memdb.NewWatchSet()
out, err := store.NodeByID(ws, node.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if out != nil {
t.Fatalf("bad: %v", out)
}
}
func TestCoreScheduler_JobGC_OutstandingEvals(t *testing.T) {
ci.Parallel(t)
s1, cleanupS1 := TestServer(t, nil)
defer cleanupS1()
testutil.WaitForLeader(t, s1.RPC)
// COMPAT Remove in 0.6: Reset the FSM time table since we reconcile which sets index 0
s1.fsm.timetable.table = make([]TimeTableEntry, 1, 10)
// Insert job.
store := s1.fsm.State()
job := mock.Job()
job.Type = structs.JobTypeBatch
job.Status = structs.JobStatusDead
err := store.UpsertJob(structs.MsgTypeTestSetup, 1000, job)
if err != nil {
t.Fatalf("err: %v", err)
}
// Insert two evals, one terminal and one not
eval := mock.Eval()
eval.JobID = job.ID
eval.Status = structs.EvalStatusComplete
eval2 := mock.Eval()
eval2.JobID = job.ID
eval2.Status = structs.EvalStatusPending
err = store.UpsertEvals(structs.MsgTypeTestSetup, 1001, []*structs.Evaluation{eval, eval2})
if err != nil {
t.Fatalf("err: %v", err)
}
// Update the time tables to make this work
tt := s1.fsm.TimeTable()
tt.Witness(2000, time.Now().UTC().Add(-1*s1.config.JobGCThreshold))
// Create a core scheduler
snap, err := store.Snapshot()
if err != nil {
t.Fatalf("err: %v", err)
}
core := NewCoreScheduler(s1, snap)
// Attempt the GC
gc := s1.coreJobEval(structs.CoreJobJobGC, 2000)
err = core.Process(gc)
if err != nil {
t.Fatalf("err: %v", err)
}
// Should still exist
ws := memdb.NewWatchSet()
out, err := store.JobByID(ws, job.Namespace, job.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if out == nil {
t.Fatalf("bad: %v", out)
}
outE, err := store.EvalByID(ws, eval.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if outE == nil {
t.Fatalf("bad: %v", outE)
}
outE2, err := store.EvalByID(ws, eval2.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if outE2 == nil {
t.Fatalf("bad: %v", outE2)
}
// Update the second eval to be terminal
eval2.Status = structs.EvalStatusComplete
err = store.UpsertEvals(structs.MsgTypeTestSetup, 1003, []*structs.Evaluation{eval2})
if err != nil {
t.Fatalf("err: %v", err)
}
// Create a core scheduler
snap, err = store.Snapshot()
if err != nil {
t.Fatalf("err: %v", err)
}
core = NewCoreScheduler(s1, snap)
// Attempt the GC
gc = s1.coreJobEval(structs.CoreJobJobGC, 2000)
err = core.Process(gc)
if err != nil {
t.Fatalf("err: %v", err)
}
// Should not still exist
out, err = store.JobByID(ws, job.Namespace, job.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if out != nil {
t.Fatalf("bad: %v", out)
}
outE, err = store.EvalByID(ws, eval.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if outE != nil {
t.Fatalf("bad: %v", outE)
}
outE2, err = store.EvalByID(ws, eval2.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if outE2 != nil {
t.Fatalf("bad: %v", outE2)
}
}
func TestCoreScheduler_JobGC_OutstandingAllocs(t *testing.T) {
ci.Parallel(t)
s1, cleanupS1 := TestServer(t, nil)
defer cleanupS1()
testutil.WaitForLeader(t, s1.RPC)
// COMPAT Remove in 0.6: Reset the FSM time table since we reconcile which sets index 0
s1.fsm.timetable.table = make([]TimeTableEntry, 1, 10)
// Insert job.
store := s1.fsm.State()
job := mock.Job()
job.Type = structs.JobTypeBatch
job.Status = structs.JobStatusDead
job.TaskGroups[0].ReschedulePolicy = &structs.ReschedulePolicy{
Attempts: 0,
Interval: 0 * time.Second,
}
err := store.UpsertJob(structs.MsgTypeTestSetup, 1000, job)
if err != nil {
t.Fatalf("err: %v", err)
}
// Insert an eval
eval := mock.Eval()
eval.JobID = job.ID
eval.Status = structs.EvalStatusComplete
err = store.UpsertEvals(structs.MsgTypeTestSetup, 1001, []*structs.Evaluation{eval})
if err != nil {
t.Fatalf("err: %v", err)
}
// Insert two allocs, one terminal and one not
alloc := mock.Alloc()
alloc.JobID = job.ID
alloc.EvalID = eval.ID
alloc.DesiredStatus = structs.AllocDesiredStatusRun
alloc.ClientStatus = structs.AllocClientStatusComplete
alloc.TaskGroup = job.TaskGroups[0].Name
alloc2 := mock.Alloc()
alloc2.JobID = job.ID
alloc2.EvalID = eval.ID
alloc2.DesiredStatus = structs.AllocDesiredStatusRun
alloc2.ClientStatus = structs.AllocClientStatusRunning
alloc2.TaskGroup = job.TaskGroups[0].Name
err = store.UpsertAllocs(structs.MsgTypeTestSetup, 1002, []*structs.Allocation{alloc, alloc2})
if err != nil {
t.Fatalf("err: %v", err)
}
// Update the time tables to make this work
tt := s1.fsm.TimeTable()
tt.Witness(2000, time.Now().UTC().Add(-1*s1.config.JobGCThreshold))
// Create a core scheduler
snap, err := store.Snapshot()
if err != nil {
t.Fatalf("err: %v", err)
}
core := NewCoreScheduler(s1, snap)
// Attempt the GC
gc := s1.coreJobEval(structs.CoreJobJobGC, 2000)
err = core.Process(gc)
if err != nil {
t.Fatalf("err: %v", err)
}
// Should still exist
ws := memdb.NewWatchSet()
out, err := store.JobByID(ws, job.Namespace, job.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if out == nil {
t.Fatalf("bad: %v", out)
}
outA, err := store.AllocByID(ws, alloc.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if outA == nil {
t.Fatalf("bad: %v", outA)
}
outA2, err := store.AllocByID(ws, alloc2.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if outA2 == nil {
t.Fatalf("bad: %v", outA2)
}
// Update the second alloc to be terminal
alloc2.ClientStatus = structs.AllocClientStatusComplete
err = store.UpsertAllocs(structs.MsgTypeTestSetup, 1003, []*structs.Allocation{alloc2})
if err != nil {
t.Fatalf("err: %v", err)
}
// Create a core scheduler
snap, err = store.Snapshot()
if err != nil {
t.Fatalf("err: %v", err)
}
core = NewCoreScheduler(s1, snap)
// Attempt the GC
gc = s1.coreJobEval(structs.CoreJobJobGC, 2000)
err = core.Process(gc)
if err != nil {
t.Fatalf("err: %v", err)
}
// Should not still exist
out, err = store.JobByID(ws, job.Namespace, job.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if out != nil {
t.Fatalf("bad: %v", out)
}
outA, err = store.AllocByID(ws, alloc.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if outA != nil {
t.Fatalf("bad: %v", outA)
}
outA2, err = store.AllocByID(ws, alloc2.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if outA2 != nil {
t.Fatalf("bad: %v", outA2)
}
}
// This test ensures that batch jobs are GC'd in one shot, meaning it all
// allocs/evals and job or nothing
func TestCoreScheduler_JobGC_OneShot(t *testing.T) {
ci.Parallel(t)
s1, cleanupS1 := TestServer(t, nil)
defer cleanupS1()
testutil.WaitForLeader(t, s1.RPC)
// COMPAT Remove in 0.6: Reset the FSM time table since we reconcile which sets index 0
s1.fsm.timetable.table = make([]TimeTableEntry, 1, 10)
// Insert job.
store := s1.fsm.State()
job := mock.Job()
job.Type = structs.JobTypeBatch
err := store.UpsertJob(structs.MsgTypeTestSetup, 1000, job)
if err != nil {
t.Fatalf("err: %v", err)
}
// Insert two complete evals
eval := mock.Eval()
eval.JobID = job.ID
eval.Status = structs.EvalStatusComplete
eval2 := mock.Eval()
eval2.JobID = job.ID
eval2.Status = structs.EvalStatusComplete
err = store.UpsertEvals(structs.MsgTypeTestSetup, 1001, []*structs.Evaluation{eval, eval2})
if err != nil {
t.Fatalf("err: %v", err)
}
// Insert one complete alloc and one running on distinct evals
alloc := mock.Alloc()
alloc.JobID = job.ID
alloc.EvalID = eval.ID
alloc.DesiredStatus = structs.AllocDesiredStatusStop
alloc2 := mock.Alloc()
alloc2.JobID = job.ID
alloc2.EvalID = eval2.ID
alloc2.DesiredStatus = structs.AllocDesiredStatusRun
err = store.UpsertAllocs(structs.MsgTypeTestSetup, 1002, []*structs.Allocation{alloc, alloc2})
if err != nil {
t.Fatalf("err: %v", err)
}
// Force the jobs state to dead
job.Status = structs.JobStatusDead
// Update the time tables to make this work
tt := s1.fsm.TimeTable()
tt.Witness(2000, time.Now().UTC().Add(-1*s1.config.JobGCThreshold))
// Create a core scheduler
snap, err := store.Snapshot()
if err != nil {
t.Fatalf("err: %v", err)
}
core := NewCoreScheduler(s1, snap)
// Attempt the GC
gc := s1.coreJobEval(structs.CoreJobJobGC, 2000)
err = core.Process(gc)
if err != nil {
t.Fatalf("err: %v", err)
}
// Should still exist
ws := memdb.NewWatchSet()
out, err := store.JobByID(ws, job.Namespace, job.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if out == nil {
t.Fatalf("bad: %v", out)
}
outE, err := store.EvalByID(ws, eval.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if outE == nil {
t.Fatalf("bad: %v", outE)
}
outE2, err := store.EvalByID(ws, eval2.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if outE2 == nil {
t.Fatalf("bad: %v", outE2)
}
outA, err := store.AllocByID(ws, alloc.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if outA == nil {
t.Fatalf("bad: %v", outA)
}
outA2, err := store.AllocByID(ws, alloc2.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if outA2 == nil {
t.Fatalf("bad: %v", outA2)
}
}
// This test ensures that stopped jobs are GCd
func TestCoreScheduler_JobGC_Stopped(t *testing.T) {
ci.Parallel(t)
s1, cleanupS1 := TestServer(t, nil)
defer cleanupS1()
testutil.WaitForLeader(t, s1.RPC)
// COMPAT Remove in 0.6: Reset the FSM time table since we reconcile which sets index 0
s1.fsm.timetable.table = make([]TimeTableEntry, 1, 10)
// Insert job.
store := s1.fsm.State()
job := mock.Job()
job.Stop = true
job.TaskGroups[0].ReschedulePolicy = &structs.ReschedulePolicy{
Attempts: 0,
Interval: 0 * time.Second,
}
err := store.UpsertJob(structs.MsgTypeTestSetup, 1000, job)
if err != nil {
t.Fatalf("err: %v", err)
}
// Insert two complete evals
eval := mock.Eval()
eval.JobID = job.ID
eval.Status = structs.EvalStatusComplete
eval2 := mock.Eval()
eval2.JobID = job.ID
eval2.Status = structs.EvalStatusComplete
err = store.UpsertEvals(structs.MsgTypeTestSetup, 1001, []*structs.Evaluation{eval, eval2})
if err != nil {
t.Fatalf("err: %v", err)
}
// Insert one complete alloc
alloc := mock.Alloc()
alloc.JobID = job.ID
alloc.EvalID = eval.ID
alloc.DesiredStatus = structs.AllocDesiredStatusStop
alloc.TaskGroup = job.TaskGroups[0].Name
err = store.UpsertAllocs(structs.MsgTypeTestSetup, 1002, []*structs.Allocation{alloc})
if err != nil {
t.Fatalf("err: %v", err)
}
// Update the time tables to make this work
tt := s1.fsm.TimeTable()
tt.Witness(2000, time.Now().UTC().Add(-1*s1.config.JobGCThreshold))
// Create a core scheduler
snap, err := store.Snapshot()
if err != nil {
t.Fatalf("err: %v", err)
}
core := NewCoreScheduler(s1, snap)
// Attempt the GC
gc := s1.coreJobEval(structs.CoreJobJobGC, 2000)
err = core.Process(gc)
if err != nil {
t.Fatalf("err: %v", err)
}
// Shouldn't still exist
ws := memdb.NewWatchSet()
out, err := store.JobByID(ws, job.Namespace, job.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if out != nil {
t.Fatalf("bad: %v", out)
}
outE, err := store.EvalByID(ws, eval.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if outE != nil {
t.Fatalf("bad: %v", outE)
}
outE2, err := store.EvalByID(ws, eval2.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if outE2 != nil {
t.Fatalf("bad: %v", outE2)
}
outA, err := store.AllocByID(ws, alloc.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if outA != nil {
t.Fatalf("bad: %v", outA)
}
}
func TestCoreScheduler_JobGC_Force(t *testing.T) {
ci.Parallel(t)
for _, withAcl := range []bool{false, true} {
t.Run(fmt.Sprintf("with acl %v", withAcl), func(t *testing.T) {
var server *Server
var cleanup func()
if withAcl {
server, _, cleanup = TestACLServer(t, nil)
} else {
server, cleanup = TestServer(t, nil)
}
defer cleanup()
testutil.WaitForLeader(t, server.RPC)
// COMPAT Remove in 0.6: Reset the FSM time table since we reconcile which sets index 0
server.fsm.timetable.table = make([]TimeTableEntry, 1, 10)
// Insert job.
store := server.fsm.State()
job := mock.Job()
job.Type = structs.JobTypeBatch
job.Status = structs.JobStatusDead
err := store.UpsertJob(structs.MsgTypeTestSetup, 1000, job)
if err != nil {
t.Fatalf("err: %v", err)
}
// Insert a terminal eval
eval := mock.Eval()
eval.JobID = job.ID
eval.Status = structs.EvalStatusComplete
err = store.UpsertEvals(structs.MsgTypeTestSetup, 1001, []*structs.Evaluation{eval})
if err != nil {
t.Fatalf("err: %v", err)
}
// Create a core scheduler
snap, err := store.Snapshot()
if err != nil {
t.Fatalf("err: %v", err)
}
core := NewCoreScheduler(server, snap)
// Attempt the GC
gc := server.coreJobEval(structs.CoreJobForceGC, 1002)
err = core.Process(gc)
if err != nil {
t.Fatalf("err: %v", err)
}
// Shouldn't still exist
ws := memdb.NewWatchSet()
out, err := store.JobByID(ws, job.Namespace, job.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if out != nil {
t.Fatalf("bad: %v", out)
}
outE, err := store.EvalByID(ws, eval.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if outE != nil {
t.Fatalf("bad: %v", outE)
}
})
}
}
// This test ensures parameterized jobs only get gc'd when stopped
func TestCoreScheduler_JobGC_Parameterized(t *testing.T) {
ci.Parallel(t)
s1, cleanupS1 := TestServer(t, nil)
defer cleanupS1()
testutil.WaitForLeader(t, s1.RPC)
// COMPAT Remove in 0.6: Reset the FSM time table since we reconcile which sets index 0
s1.fsm.timetable.table = make([]TimeTableEntry, 1, 10)
// Insert a parameterized job.
store := s1.fsm.State()
job := mock.Job()
job.Type = structs.JobTypeBatch
job.Status = structs.JobStatusRunning
job.ParameterizedJob = &structs.ParameterizedJobConfig{
Payload: structs.DispatchPayloadRequired,
}
err := store.UpsertJob(structs.MsgTypeTestSetup, 1000, job)
if err != nil {
t.Fatalf("err: %v", err)
}
// Create a core scheduler
snap, err := store.Snapshot()
if err != nil {
t.Fatalf("err: %v", err)
}
core := NewCoreScheduler(s1, snap)
// Attempt the GC
gc := s1.coreJobEval(structs.CoreJobForceGC, 1002)
err = core.Process(gc)
if err != nil {
t.Fatalf("err: %v", err)
}
// Should still exist
ws := memdb.NewWatchSet()
out, err := store.JobByID(ws, job.Namespace, job.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if out == nil {
t.Fatalf("bad: %v", out)
}
// Mark the job as stopped and try again
job2 := job.Copy()
job2.Stop = true
err = store.UpsertJob(structs.MsgTypeTestSetup, 2000, job2)
if err != nil {
t.Fatalf("err: %v", err)
}
// Create a core scheduler
snap, err = store.Snapshot()
if err != nil {
t.Fatalf("err: %v", err)
}
core = NewCoreScheduler(s1, snap)
// Attempt the GC
gc = s1.coreJobEval(structs.CoreJobForceGC, 2002)
err = core.Process(gc)
if err != nil {
t.Fatalf("err: %v", err)
}
// Should not exist
out, err = store.JobByID(ws, job.Namespace, job.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if out != nil {
t.Fatalf("bad: %+v", out)
}
}
// This test ensures periodic jobs don't get GCd until they are stopped
func TestCoreScheduler_JobGC_Periodic(t *testing.T) {
ci.Parallel(t)
s1, cleanupS1 := TestServer(t, nil)
defer cleanupS1()
testutil.WaitForLeader(t, s1.RPC)
// COMPAT Remove in 0.6: Reset the FSM time table since we reconcile which sets index 0
s1.fsm.timetable.table = make([]TimeTableEntry, 1, 10)
// Insert a parameterized job.
store := s1.fsm.State()
job := mock.PeriodicJob()
err := store.UpsertJob(structs.MsgTypeTestSetup, 1000, job)
if err != nil {
t.Fatalf("err: %v", err)
}
// Create a core scheduler
snap, err := store.Snapshot()
if err != nil {
t.Fatalf("err: %v", err)
}
core := NewCoreScheduler(s1, snap)
// Attempt the GC
gc := s1.coreJobEval(structs.CoreJobForceGC, 1002)
err = core.Process(gc)
if err != nil {
t.Fatalf("err: %v", err)
}
// Should still exist
ws := memdb.NewWatchSet()
out, err := store.JobByID(ws, job.Namespace, job.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if out == nil {
t.Fatalf("bad: %v", out)
}
// Mark the job as stopped and try again
job2 := job.Copy()
job2.Stop = true
err = store.UpsertJob(structs.MsgTypeTestSetup, 2000, job2)
if err != nil {
t.Fatalf("err: %v", err)
}
// Create a core scheduler
snap, err = store.Snapshot()
if err != nil {
t.Fatalf("err: %v", err)
}
core = NewCoreScheduler(s1, snap)
// Attempt the GC
gc = s1.coreJobEval(structs.CoreJobForceGC, 2002)
err = core.Process(gc)
if err != nil {
t.Fatalf("err: %v", err)
}
// Should not exist
out, err = store.JobByID(ws, job.Namespace, job.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if out != nil {
t.Fatalf("bad: %+v", out)
}
}
func TestCoreScheduler_DeploymentGC(t *testing.T) {
ci.Parallel(t)
s1, cleanupS1 := TestServer(t, nil)
defer cleanupS1()
testutil.WaitForLeader(t, s1.RPC)
assert := assert.New(t)
// COMPAT Remove in 0.6: Reset the FSM time table since we reconcile which sets index 0
s1.fsm.timetable.table = make([]TimeTableEntry, 1, 10)
// Insert an active, terminal, and terminal with allocations deployment
store := s1.fsm.State()
d1, d2, d3 := mock.Deployment(), mock.Deployment(), mock.Deployment()
d1.Status = structs.DeploymentStatusFailed
d3.Status = structs.DeploymentStatusSuccessful
assert.Nil(store.UpsertDeployment(1000, d1), "UpsertDeployment")
assert.Nil(store.UpsertDeployment(1001, d2), "UpsertDeployment")
assert.Nil(store.UpsertDeployment(1002, d3), "UpsertDeployment")
a := mock.Alloc()
a.JobID = d3.JobID
a.DeploymentID = d3.ID
assert.Nil(store.UpsertAllocs(structs.MsgTypeTestSetup, 1003, []*structs.Allocation{a}), "UpsertAllocs")
// Update the time tables to make this work
tt := s1.fsm.TimeTable()
tt.Witness(2000, time.Now().UTC().Add(-1*s1.config.DeploymentGCThreshold))
// Create a core scheduler
snap, err := store.Snapshot()
assert.Nil(err, "Snapshot")
core := NewCoreScheduler(s1, snap)
// Attempt the GC
gc := s1.coreJobEval(structs.CoreJobDeploymentGC, 2000)
assert.Nil(core.Process(gc), "Process GC")
// Should be gone
ws := memdb.NewWatchSet()
out, err := store.DeploymentByID(ws, d1.ID)
assert.Nil(err, "DeploymentByID")
assert.Nil(out, "Terminal Deployment")
out2, err := store.DeploymentByID(ws, d2.ID)
assert.Nil(err, "DeploymentByID")
assert.NotNil(out2, "Active Deployment")
out3, err := store.DeploymentByID(ws, d3.ID)
assert.Nil(err, "DeploymentByID")
assert.NotNil(out3, "Terminal Deployment With Allocs")
}
func TestCoreScheduler_DeploymentGC_Force(t *testing.T) {
ci.Parallel(t)
for _, withAcl := range []bool{false, true} {
t.Run(fmt.Sprintf("with acl %v", withAcl), func(t *testing.T) {
var server *Server
var cleanup func()
if withAcl {
server, _, cleanup = TestACLServer(t, nil)
} else {
server, cleanup = TestServer(t, nil)
}
defer cleanup()
testutil.WaitForLeader(t, server.RPC)
assert := assert.New(t)
// COMPAT Remove in 0.6: Reset the FSM time table since we reconcile which sets index 0
server.fsm.timetable.table = make([]TimeTableEntry, 1, 10)
// Insert terminal and active deployment
store := server.fsm.State()
d1, d2 := mock.Deployment(), mock.Deployment()
d1.Status = structs.DeploymentStatusFailed
assert.Nil(store.UpsertDeployment(1000, d1), "UpsertDeployment")
assert.Nil(store.UpsertDeployment(1001, d2), "UpsertDeployment")
// Create a core scheduler
snap, err := store.Snapshot()
assert.Nil(err, "Snapshot")
core := NewCoreScheduler(server, snap)
// Attempt the GC
gc := server.coreJobEval(structs.CoreJobForceGC, 1000)
assert.Nil(core.Process(gc), "Process Force GC")
// Should be gone
ws := memdb.NewWatchSet()
out, err := store.DeploymentByID(ws, d1.ID)
assert.Nil(err, "DeploymentByID")
assert.Nil(out, "Terminal Deployment")
out2, err := store.DeploymentByID(ws, d2.ID)
assert.Nil(err, "DeploymentByID")
assert.NotNil(out2, "Active Deployment")
})
}
}
func TestCoreScheduler_PartitionEvalReap(t *testing.T) {
ci.Parallel(t)
s1, cleanupS1 := TestServer(t, nil)
defer cleanupS1()
testutil.WaitForLeader(t, s1.RPC)
// COMPAT Remove in 0.6: Reset the FSM time table since we reconcile which sets index 0
s1.fsm.timetable.table = make([]TimeTableEntry, 1, 10)
// Create a core scheduler
snap, err := s1.fsm.State().Snapshot()
if err != nil {
t.Fatalf("err: %v", err)
}
core := NewCoreScheduler(s1, snap)
// Set the max ids per reap to something lower.
maxIdsPerReap = 2
evals := []string{"a", "b", "c"}
allocs := []string{"1", "2", "3"}
requests := core.(*CoreScheduler).partitionEvalReap(evals, allocs)
if len(requests) != 3 {
t.Fatalf("Expected 3 requests got: %v", requests)
}
first := requests[0]
if len(first.Allocs) != 2 && len(first.Evals) != 0 {
t.Fatalf("Unexpected first request: %v", first)
}
second := requests[1]
if len(second.Allocs) != 1 && len(second.Evals) != 1 {
t.Fatalf("Unexpected second request: %v", second)
}
third := requests[2]
if len(third.Allocs) != 0 && len(third.Evals) != 2 {
t.Fatalf("Unexpected third request: %v", third)
}
}
func TestCoreScheduler_PartitionDeploymentReap(t *testing.T) {
ci.Parallel(t)
s1, cleanupS1 := TestServer(t, nil)
defer cleanupS1()
testutil.WaitForLeader(t, s1.RPC)
// COMPAT Remove in 0.6: Reset the FSM time table since we reconcile which sets index 0
s1.fsm.timetable.table = make([]TimeTableEntry, 1, 10)
// Create a core scheduler
snap, err := s1.fsm.State().Snapshot()
if err != nil {
t.Fatalf("err: %v", err)
}
core := NewCoreScheduler(s1, snap)
// Set the max ids per reap to something lower.
maxIdsPerReap = 2
deployments := []string{"a", "b", "c"}
requests := core.(*CoreScheduler).partitionDeploymentReap(deployments)
if len(requests) != 2 {
t.Fatalf("Expected 2 requests got: %v", requests)
}
first := requests[0]
if len(first.Deployments) != 2 {
t.Fatalf("Unexpected first request: %v", first)
}
second := requests[1]
if len(second.Deployments) != 1 {
t.Fatalf("Unexpected second request: %v", second)
}
}
func TestCoreScheduler_PartitionJobReap(t *testing.T) {
ci.Parallel(t)
s1, cleanupS1 := TestServer(t, nil)
defer cleanupS1()
testutil.WaitForLeader(t, s1.RPC)
// Create a core scheduler
snap, err := s1.fsm.State().Snapshot()
if err != nil {
t.Fatalf("err: %v", err)
}
core := NewCoreScheduler(s1, snap)
// Set the max ids per reap to something lower.
maxIdsPerReap = 2
jobs := []*structs.Job{mock.Job(), mock.Job(), mock.Job()}
requests := core.(*CoreScheduler).partitionJobReap(jobs, "")
require.Len(t, requests, 2)
first := requests[0]
second := requests[1]
require.Len(t, first.Jobs, 2)
require.Len(t, second.Jobs, 1)
}
// Tests various scenarios when allocations are eligible to be GCed
func TestAllocation_GCEligible(t *testing.T) {
type testCase struct {
Desc string
GCTime time.Time
ClientStatus string
DesiredStatus string
JobStatus string
JobStop bool
AllocJobModifyIndex uint64
JobModifyIndex uint64
ModifyIndex uint64
NextAllocID string
ReschedulePolicy *structs.ReschedulePolicy
RescheduleTrackers []*structs.RescheduleEvent
ThresholdIndex uint64
ShouldGC bool
}
fail := time.Now()
harness := []testCase{
{
Desc: "Don't GC when non terminal",
ClientStatus: structs.AllocClientStatusPending,
DesiredStatus: structs.AllocDesiredStatusRun,
GCTime: fail,
ModifyIndex: 90,
ThresholdIndex: 90,
ShouldGC: false,
},
{
Desc: "Don't GC when non terminal and job stopped",
ClientStatus: structs.AllocClientStatusPending,
DesiredStatus: structs.AllocDesiredStatusRun,
JobStop: true,
GCTime: fail,
ModifyIndex: 90,
ThresholdIndex: 90,
ShouldGC: false,
},
{
Desc: "Don't GC when non terminal and job dead",
ClientStatus: structs.AllocClientStatusPending,
DesiredStatus: structs.AllocDesiredStatusRun,
JobStatus: structs.JobStatusDead,
GCTime: fail,
ModifyIndex: 90,
ThresholdIndex: 90,
ShouldGC: false,
},
{
Desc: "Don't GC when non terminal on client and job dead",
ClientStatus: structs.AllocClientStatusRunning,
DesiredStatus: structs.AllocDesiredStatusStop,
JobStatus: structs.JobStatusDead,
GCTime: fail,
ModifyIndex: 90,
ThresholdIndex: 90,
ShouldGC: false,
},
{
Desc: "GC when terminal but not failed ",
ClientStatus: structs.AllocClientStatusComplete,
DesiredStatus: structs.AllocDesiredStatusRun,
GCTime: fail,
ModifyIndex: 90,
ThresholdIndex: 90,
ReschedulePolicy: nil,
ShouldGC: true,
},
{
Desc: "Don't GC when threshold not met",
ClientStatus: structs.AllocClientStatusComplete,
DesiredStatus: structs.AllocDesiredStatusStop,
GCTime: fail,
ModifyIndex: 100,
ThresholdIndex: 90,
ReschedulePolicy: nil,
ShouldGC: false,
},
{
Desc: "GC when no reschedule policy",
ClientStatus: structs.AllocClientStatusFailed,
DesiredStatus: structs.AllocDesiredStatusRun,
GCTime: fail,
ReschedulePolicy: nil,
ModifyIndex: 90,
ThresholdIndex: 90,
ShouldGC: true,
},
{
Desc: "GC when empty policy",
ClientStatus: structs.AllocClientStatusFailed,
DesiredStatus: structs.AllocDesiredStatusRun,
GCTime: fail,
ReschedulePolicy: &structs.ReschedulePolicy{Attempts: 0, Interval: 0 * time.Minute},
ModifyIndex: 90,
ThresholdIndex: 90,
ShouldGC: true,
},
{
Desc: "Don't GC when no previous reschedule attempts",
ClientStatus: structs.AllocClientStatusFailed,
DesiredStatus: structs.AllocDesiredStatusRun,
GCTime: fail,
ModifyIndex: 90,
ThresholdIndex: 90,
ReschedulePolicy: &structs.ReschedulePolicy{Attempts: 1, Interval: 1 * time.Minute},
ShouldGC: false,
},
{
Desc: "Don't GC when prev reschedule attempt within interval",
ClientStatus: structs.AllocClientStatusFailed,
DesiredStatus: structs.AllocDesiredStatusRun,
ReschedulePolicy: &structs.ReschedulePolicy{Attempts: 2, Interval: 30 * time.Minute},
GCTime: fail,
ModifyIndex: 90,
ThresholdIndex: 90,
RescheduleTrackers: []*structs.RescheduleEvent{
{
RescheduleTime: fail.Add(-5 * time.Minute).UTC().UnixNano(),
},
},
ShouldGC: false,
},
{
Desc: "GC with prev reschedule attempt outside interval",
ClientStatus: structs.AllocClientStatusFailed,
DesiredStatus: structs.AllocDesiredStatusRun,
GCTime: fail,
ReschedulePolicy: &structs.ReschedulePolicy{Attempts: 5, Interval: 30 * time.Minute},
RescheduleTrackers: []*structs.RescheduleEvent{
{
RescheduleTime: fail.Add(-45 * time.Minute).UTC().UnixNano(),
},
{
RescheduleTime: fail.Add(-60 * time.Minute).UTC().UnixNano(),
},
},
ShouldGC: true,
},
{
Desc: "GC when next alloc id is set",
ClientStatus: structs.AllocClientStatusFailed,
DesiredStatus: structs.AllocDesiredStatusRun,
GCTime: fail,
ReschedulePolicy: &structs.ReschedulePolicy{Attempts: 5, Interval: 30 * time.Minute},
RescheduleTrackers: []*structs.RescheduleEvent{
{
RescheduleTime: fail.Add(-3 * time.Minute).UTC().UnixNano(),
},
},
NextAllocID: uuid.Generate(),
ShouldGC: true,
},
{
Desc: "Don't GC when next alloc id is not set and unlimited restarts",
ClientStatus: structs.AllocClientStatusFailed,
DesiredStatus: structs.AllocDesiredStatusRun,
GCTime: fail,
ReschedulePolicy: &structs.ReschedulePolicy{Unlimited: true, Delay: 5 * time.Second, DelayFunction: "constant"},
RescheduleTrackers: []*structs.RescheduleEvent{
{
RescheduleTime: fail.Add(-3 * time.Minute).UTC().UnixNano(),
},
},
ShouldGC: false,
},
{
Desc: "GC when job is stopped",
ClientStatus: structs.AllocClientStatusFailed,
DesiredStatus: structs.AllocDesiredStatusRun,
GCTime: fail,
ReschedulePolicy: &structs.ReschedulePolicy{Attempts: 5, Interval: 30 * time.Minute},
RescheduleTrackers: []*structs.RescheduleEvent{
{
RescheduleTime: fail.Add(-3 * time.Minute).UTC().UnixNano(),
},
},
JobStop: true,
ShouldGC: true,
},
{
Desc: "GC when job status is dead",
ClientStatus: structs.AllocClientStatusFailed,
DesiredStatus: structs.AllocDesiredStatusRun,
GCTime: fail,
ReschedulePolicy: &structs.ReschedulePolicy{Attempts: 5, Interval: 30 * time.Minute},
RescheduleTrackers: []*structs.RescheduleEvent{
{
RescheduleTime: fail.Add(-3 * time.Minute).UTC().UnixNano(),
},
},
JobStatus: structs.JobStatusDead,
ShouldGC: true,
},
{
Desc: "GC when desired status is stop, unlimited reschedule policy, no previous reschedule events",
ClientStatus: structs.AllocClientStatusFailed,
DesiredStatus: structs.AllocDesiredStatusStop,
GCTime: fail,
ReschedulePolicy: &structs.ReschedulePolicy{Unlimited: true, Delay: 5 * time.Second, DelayFunction: "constant"},
ShouldGC: true,
},
{
Desc: "GC when desired status is stop, limited reschedule policy, some previous reschedule events",
ClientStatus: structs.AllocClientStatusFailed,
DesiredStatus: structs.AllocDesiredStatusStop,
GCTime: fail,
ReschedulePolicy: &structs.ReschedulePolicy{Attempts: 5, Interval: 30 * time.Minute},
RescheduleTrackers: []*structs.RescheduleEvent{
{
RescheduleTime: fail.Add(-3 * time.Minute).UTC().UnixNano(),
},
},
ShouldGC: true,
},
}
for _, tc := range harness {
alloc := &structs.Allocation{}
alloc.ModifyIndex = tc.ModifyIndex
alloc.DesiredStatus = tc.DesiredStatus
alloc.ClientStatus = tc.ClientStatus
alloc.RescheduleTracker = &structs.RescheduleTracker{Events: tc.RescheduleTrackers}
alloc.NextAllocation = tc.NextAllocID
job := mock.Job()
alloc.TaskGroup = job.TaskGroups[0].Name
job.TaskGroups[0].ReschedulePolicy = tc.ReschedulePolicy
if tc.JobStatus != "" {
job.Status = tc.JobStatus
}
job.Stop = tc.JobStop
t.Run(tc.Desc, func(t *testing.T) {
if got := allocGCEligible(alloc, job, tc.GCTime, tc.ThresholdIndex); got != tc.ShouldGC {
t.Fatalf("expected %v but got %v", tc.ShouldGC, got)
}
})
}
// Verify nil job
alloc := mock.Alloc()
alloc.ClientStatus = structs.AllocClientStatusComplete
require.True(t, allocGCEligible(alloc, nil, time.Now(), 1000))
}
func TestCoreScheduler_CSIPluginGC(t *testing.T) {
ci.Parallel(t)
srv, cleanupSRV := TestServer(t, nil)
defer cleanupSRV()
testutil.WaitForLeader(t, srv.RPC)
srv.fsm.timetable.table = make([]TimeTableEntry, 1, 10)
deleteNodes := state.CreateTestCSIPlugin(srv.fsm.State(), "foo")
defer deleteNodes()
store := srv.fsm.State()
// Update the time tables to make this work
tt := srv.fsm.TimeTable()
index := uint64(2000)
tt.Witness(index, time.Now().UTC().Add(-1*srv.config.CSIPluginGCThreshold))
// Create a core scheduler
snap, err := store.Snapshot()
require.NoError(t, err)
core := NewCoreScheduler(srv, snap)
// Attempt the GC
index++
gc := srv.coreJobEval(structs.CoreJobCSIPluginGC, index)
require.NoError(t, core.Process(gc))
// Should not be gone (plugin in use)
ws := memdb.NewWatchSet()
plug, err := store.CSIPluginByID(ws, "foo")
require.NotNil(t, plug)
require.NoError(t, err)
// Empty the plugin
plug.Controllers = map[string]*structs.CSIInfo{}
plug.Nodes = map[string]*structs.CSIInfo{}
index++
err = store.UpsertCSIPlugin(index, plug)
require.NoError(t, err)
// Retry
index++
gc = srv.coreJobEval(structs.CoreJobCSIPluginGC, index)
require.NoError(t, core.Process(gc))
// Should be gone
plug, err = store.CSIPluginByID(ws, "foo")
require.Nil(t, plug)
require.NoError(t, err)
}
func TestCoreScheduler_CSIVolumeClaimGC(t *testing.T) {
srv, shutdown := TestServer(t, func(c *Config) {
c.NumSchedulers = 0 // Prevent automatic dequeue
})
defer shutdown()
testutil.WaitForLeader(t, srv.RPC)
codec := rpcClient(t, srv)
index := uint64(1)
volID := uuid.Generate()
ns := structs.DefaultNamespace
pluginID := "foo"
store := srv.fsm.State()
ws := memdb.NewWatchSet()
index, _ = store.LatestIndex()
// Create client node and plugin
node := mock.Node()
node.Attributes["nomad.version"] = "0.11.0" // needs client RPCs
node.CSINodePlugins = map[string]*structs.CSIInfo{
pluginID: {
PluginID: pluginID,
Healthy: true,
NodeInfo: &structs.CSINodeInfo{},
},
}
index++
err := store.UpsertNode(structs.MsgTypeTestSetup, index, node)
require.NoError(t, err)
// *Important*: for volume writes in this test we must use RPCs
// rather than StateStore methods directly, or the blocking query
// in volumewatcher won't get the final update for GC because it's
// watching on a different store at that point
// Register a volume
vols := []*structs.CSIVolume{{
ID: volID,
Namespace: ns,
PluginID: pluginID,
Topologies: []*structs.CSITopology{},
RequestedCapabilities: []*structs.CSIVolumeCapability{{
AccessMode: structs.CSIVolumeAccessModeMultiNodeSingleWriter,
AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem,
}},
}}
volReq := &structs.CSIVolumeRegisterRequest{Volumes: vols}
volReq.Namespace = ns
volReq.Region = srv.config.Region
err = msgpackrpc.CallWithCodec(codec, "CSIVolume.Register",
volReq, &structs.CSIVolumeRegisterResponse{})
require.NoError(t, err)
// Create a job with two allocs that claim the volume.
// We use two allocs here, one of which is not running, so
// that we can assert the volumewatcher has made one
// complete pass (and removed the 2nd alloc) before we
// run the GC
eval := mock.Eval()
eval.Status = structs.EvalStatusFailed
index++
store.UpsertJobSummary(index, mock.JobSummary(eval.JobID))
index++
err = store.UpsertEvals(structs.MsgTypeTestSetup, index, []*structs.Evaluation{eval})
require.Nil(t, err)
job := mock.Job()
job.ID = eval.JobID
job.Status = structs.JobStatusRunning
index++
err = store.UpsertJob(structs.MsgTypeTestSetup, index, job)
require.NoError(t, err)
alloc1, alloc2 := mock.Alloc(), mock.Alloc()
alloc1.NodeID = node.ID
alloc1.ClientStatus = structs.AllocClientStatusRunning
alloc1.Job = job
alloc1.JobID = job.ID
alloc1.EvalID = eval.ID
alloc2.NodeID = node.ID
alloc2.ClientStatus = structs.AllocClientStatusComplete
alloc2.DesiredStatus = structs.AllocDesiredStatusStop
alloc2.Job = job
alloc2.JobID = job.ID
alloc2.EvalID = eval.ID
summary := mock.JobSummary(alloc1.JobID)
index++
require.NoError(t, store.UpsertJobSummary(index, summary))
summary = mock.JobSummary(alloc2.JobID)
index++
require.NoError(t, store.UpsertJobSummary(index, summary))
index++
require.NoError(t, store.UpsertAllocs(structs.MsgTypeTestSetup, index, []*structs.Allocation{alloc1, alloc2}))
req := &structs.CSIVolumeClaimRequest{
VolumeID: volID,
AllocationID: alloc1.ID,
NodeID: uuid.Generate(), // doesn't exist so we don't get errors trying to unmount volumes from it
Claim: structs.CSIVolumeClaimWrite,
AccessMode: structs.CSIVolumeAccessModeMultiNodeMultiWriter,
AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem,
State: structs.CSIVolumeClaimStateTaken,
WriteRequest: structs.WriteRequest{
Namespace: ns,
Region: srv.config.Region,
},
}
err = msgpackrpc.CallWithCodec(codec, "CSIVolume.Claim",
req, &structs.CSIVolumeClaimResponse{})
require.NoError(t, err, "write claim should succeed")
req.AllocationID = alloc2.ID
req.State = structs.CSIVolumeClaimStateUnpublishing
err = msgpackrpc.CallWithCodec(codec, "CSIVolume.Claim",
req, &structs.CSIVolumeClaimResponse{})
require.NoError(t, err, "unpublishing claim should succeed")
require.Eventually(t, func() bool {
vol, err := store.CSIVolumeByID(ws, ns, volID)
require.NoError(t, err)
return len(vol.WriteClaims) == 1 &&
len(vol.WriteAllocs) == 1 &&
len(vol.PastClaims) == 0
}, time.Second*1, 100*time.Millisecond,
"volumewatcher should have released unpublishing claim without GC")
// At this point we can guarantee that volumewatcher is waiting
// for new work. Delete allocation and job so that the next pass
// thru volumewatcher has more work to do
index, _ = store.LatestIndex()
index++
err = store.DeleteJob(index, ns, job.ID)
require.NoError(t, err)
index, _ = store.LatestIndex()
index++
err = store.DeleteEval(index, []string{eval.ID}, []string{alloc1.ID})
require.NoError(t, err)
// Create a core scheduler and attempt the volume claim GC
snap, err := store.Snapshot()
require.NoError(t, err)
core := NewCoreScheduler(srv, snap)
index, _ = snap.LatestIndex()
index++
gc := srv.coreJobEval(structs.CoreJobForceGC, index)
c := core.(*CoreScheduler)
require.NoError(t, c.csiVolumeClaimGC(gc))
// the only remaining claim is for a deleted alloc with no path to
// the non-existent node, so volumewatcher will release the
// remaining claim
require.Eventually(t, func() bool {
vol, _ := store.CSIVolumeByID(ws, ns, volID)
return len(vol.WriteClaims) == 0 &&
len(vol.WriteAllocs) == 0 &&
len(vol.PastClaims) == 0
}, time.Second*2, 10*time.Millisecond, "claims were not released")
}
func TestCoreScheduler_CSIBadState_ClaimGC(t *testing.T) {
ci.Parallel(t)
srv, shutdown := TestServer(t, func(c *Config) {
c.NumSchedulers = 0 // Prevent automatic dequeue
})
defer shutdown()
testutil.WaitForLeader(t, srv.RPC)
err := state.TestBadCSIState(t, srv.State())
require.NoError(t, err)
snap, err := srv.State().Snapshot()
require.NoError(t, err)
core := NewCoreScheduler(srv, snap)
index, _ := srv.State().LatestIndex()
index++
gc := srv.coreJobEval(structs.CoreJobForceGC, index)
c := core.(*CoreScheduler)
require.NoError(t, c.csiVolumeClaimGC(gc))
require.Eventually(t, func() bool {
vol, _ := srv.State().CSIVolumeByID(nil,
structs.DefaultNamespace, "csi-volume-nfs0")
if len(vol.PastClaims) != 2 {
return false
}
for _, claim := range vol.PastClaims {
if claim.State != structs.CSIVolumeClaimStateUnpublishing {
return false
}
}
return true
}, time.Second*1, 10*time.Millisecond, "invalid claims should be marked for GC")
}
func TestCoreScheduler_FailLoop(t *testing.T) {
ci.Parallel(t)
srv, cleanupSrv := TestServer(t, func(c *Config) {
c.NumSchedulers = 0 // Prevent automatic dequeue
c.EvalDeliveryLimit = 2
c.EvalFailedFollowupBaselineDelay = time.Duration(50 * time.Millisecond)
c.EvalFailedFollowupDelayRange = time.Duration(1 * time.Millisecond)
})
defer cleanupSrv()
codec := rpcClient(t, srv)
sched := []string{structs.JobTypeCore}
testutil.WaitForResult(func() (bool, error) {
return srv.evalBroker.Enabled(), nil
}, func(err error) {
t.Fatalf("should enable eval broker")
})
// Enqueue a core job eval that can never succeed because it was enqueued
// by another leader that's now gone
expected := srv.coreJobEval(structs.CoreJobCSIPluginGC, 100)
expected.LeaderACL = "nonsense"
srv.evalBroker.Enqueue(expected)
nack := func(evalID, token string) error {
req := &structs.EvalAckRequest{
EvalID: evalID,
Token: token,
WriteRequest: structs.WriteRequest{Region: "global"},
}
var resp structs.GenericResponse
return msgpackrpc.CallWithCodec(codec, "Eval.Nack", req, &resp)
}
out, token, err := srv.evalBroker.Dequeue(sched, time.Second*5)
require.NoError(t, err)
require.NotNil(t, out)
require.Equal(t, expected, out)
// first fail
require.NoError(t, nack(out.ID, token))
out, token, err = srv.evalBroker.Dequeue(sched, time.Second*5)
require.NoError(t, err)
require.NotNil(t, out)
require.Equal(t, expected, out)
// second fail, should not result in failed-follow-up
require.NoError(t, nack(out.ID, token))
out, token, err = srv.evalBroker.Dequeue(sched, time.Second*5)
require.NoError(t, err)
if out != nil {
t.Fatalf(
"failed core jobs should not result in follow-up. TriggeredBy: %v",
out.TriggeredBy)
}
}