903b5baaa4
When replication of a single key fails, the replication loop breaks early and therefore keys that fall later in the sorting order will never get replicated. This is particularly a problem for clusters impacted by the bug that caused #14981 and that were later upgraded; the keys that were never replicated can now never be replicated, and so we need to handle them safely. Included in the replication fix: * Refactor the replication loop so that each key is replicated in a function call that returns an error, to make the workflow more clear and reduce nesting. Log the error and continue. * Improve stability of keyring replication tests. We no longer block leadership on initializing the keyring, so there's a race condition in the keyring tests where we can test for the existence of the root key before the keyring has been initialized. Change this to an "eventually" test. But these fixes aren't enough to fix #14981 because they'll end up seeing an error once a second complaining about the missing key, so we also need to fix keyring GC so the keys can be removed from the state store. Now we'll store the key ID used to sign a workload identity in the Allocation, and we'll index the Allocation table on that so we can track whether any live Allocation was signed with a particular key ID.
2849 lines
77 KiB
Go
2849 lines
77 KiB
Go
package nomad
|
|
|
|
import (
|
|
"fmt"
|
|
"testing"
|
|
"time"
|
|
|
|
memdb "github.com/hashicorp/go-memdb"
|
|
msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc"
|
|
"github.com/hashicorp/nomad/ci"
|
|
"github.com/hashicorp/nomad/helper/pointer"
|
|
"github.com/hashicorp/nomad/helper/uuid"
|
|
"github.com/hashicorp/nomad/nomad/mock"
|
|
"github.com/hashicorp/nomad/nomad/state"
|
|
"github.com/hashicorp/nomad/nomad/structs"
|
|
"github.com/hashicorp/nomad/testutil"
|
|
"github.com/stretchr/testify/assert"
|
|
"github.com/stretchr/testify/require"
|
|
)
|
|
|
|
func TestCoreScheduler_EvalGC(t *testing.T) {
|
|
ci.Parallel(t)
|
|
|
|
s1, cleanupS1 := TestServer(t, nil)
|
|
defer cleanupS1()
|
|
testutil.WaitForLeader(t, s1.RPC)
|
|
|
|
// COMPAT Remove in 0.6: Reset the FSM time table since we reconcile which sets index 0
|
|
s1.fsm.timetable.table = make([]TimeTableEntry, 1, 10)
|
|
|
|
// Insert "dead" eval
|
|
store := s1.fsm.State()
|
|
eval := mock.Eval()
|
|
eval.Status = structs.EvalStatusFailed
|
|
store.UpsertJobSummary(999, mock.JobSummary(eval.JobID))
|
|
err := store.UpsertEvals(structs.MsgTypeTestSetup, 1000, []*structs.Evaluation{eval})
|
|
require.Nil(t, err)
|
|
|
|
// Insert mock job with rescheduling disabled
|
|
job := mock.Job()
|
|
job.ID = eval.JobID
|
|
job.TaskGroups[0].ReschedulePolicy = &structs.ReschedulePolicy{
|
|
Attempts: 0,
|
|
Interval: 0 * time.Second,
|
|
}
|
|
err = store.UpsertJob(structs.MsgTypeTestSetup, 1001, job)
|
|
require.Nil(t, err)
|
|
|
|
// Insert "dead" alloc
|
|
alloc := mock.Alloc()
|
|
alloc.EvalID = eval.ID
|
|
alloc.DesiredStatus = structs.AllocDesiredStatusStop
|
|
alloc.JobID = eval.JobID
|
|
alloc.TaskGroup = job.TaskGroups[0].Name
|
|
|
|
// Insert "lost" alloc
|
|
alloc2 := mock.Alloc()
|
|
alloc2.EvalID = eval.ID
|
|
alloc2.DesiredStatus = structs.AllocDesiredStatusRun
|
|
alloc2.ClientStatus = structs.AllocClientStatusLost
|
|
alloc2.JobID = eval.JobID
|
|
alloc2.TaskGroup = job.TaskGroups[0].Name
|
|
err = store.UpsertAllocs(structs.MsgTypeTestSetup, 1001, []*structs.Allocation{alloc, alloc2})
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
|
|
// Update the time tables to make this work
|
|
tt := s1.fsm.TimeTable()
|
|
tt.Witness(2000, time.Now().UTC().Add(-1*s1.config.EvalGCThreshold))
|
|
|
|
// Create a core scheduler
|
|
snap, err := store.Snapshot()
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
core := NewCoreScheduler(s1, snap)
|
|
|
|
// Attempt the GC
|
|
gc := s1.coreJobEval(structs.CoreJobEvalGC, 2000)
|
|
err = core.Process(gc)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
|
|
// Should be gone
|
|
ws := memdb.NewWatchSet()
|
|
out, err := store.EvalByID(ws, eval.ID)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
if out != nil {
|
|
t.Fatalf("bad: %v", out)
|
|
}
|
|
|
|
outA, err := store.AllocByID(ws, alloc.ID)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
if outA != nil {
|
|
t.Fatalf("bad: %v", outA)
|
|
}
|
|
|
|
outA2, err := store.AllocByID(ws, alloc2.ID)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
if outA2 != nil {
|
|
t.Fatalf("bad: %v", outA2)
|
|
}
|
|
}
|
|
|
|
// Tests GC behavior on allocations being rescheduled
|
|
func TestCoreScheduler_EvalGC_ReschedulingAllocs(t *testing.T) {
|
|
ci.Parallel(t)
|
|
|
|
s1, cleanupS1 := TestServer(t, nil)
|
|
defer cleanupS1()
|
|
testutil.WaitForLeader(t, s1.RPC)
|
|
|
|
// COMPAT Remove in 0.6: Reset the FSM time table since we reconcile which sets index 0
|
|
s1.fsm.timetable.table = make([]TimeTableEntry, 1, 10)
|
|
|
|
// Insert "dead" eval
|
|
store := s1.fsm.State()
|
|
eval := mock.Eval()
|
|
eval.Status = structs.EvalStatusFailed
|
|
store.UpsertJobSummary(999, mock.JobSummary(eval.JobID))
|
|
err := store.UpsertEvals(structs.MsgTypeTestSetup, 1000, []*structs.Evaluation{eval})
|
|
require.Nil(t, err)
|
|
|
|
// Insert "pending" eval for same job
|
|
eval2 := mock.Eval()
|
|
eval2.JobID = eval.JobID
|
|
store.UpsertJobSummary(999, mock.JobSummary(eval2.JobID))
|
|
err = store.UpsertEvals(structs.MsgTypeTestSetup, 1003, []*structs.Evaluation{eval2})
|
|
require.Nil(t, err)
|
|
|
|
// Insert mock job with default reschedule policy of 2 in 10 minutes
|
|
job := mock.Job()
|
|
job.ID = eval.JobID
|
|
|
|
err = store.UpsertJob(structs.MsgTypeTestSetup, 1001, job)
|
|
require.Nil(t, err)
|
|
|
|
// Insert failed alloc with an old reschedule attempt, can be GCed
|
|
alloc := mock.Alloc()
|
|
alloc.Job = job
|
|
alloc.EvalID = eval.ID
|
|
alloc.DesiredStatus = structs.AllocDesiredStatusRun
|
|
alloc.ClientStatus = structs.AllocClientStatusFailed
|
|
alloc.JobID = eval.JobID
|
|
alloc.TaskGroup = job.TaskGroups[0].Name
|
|
alloc.NextAllocation = uuid.Generate()
|
|
alloc.RescheduleTracker = &structs.RescheduleTracker{
|
|
Events: []*structs.RescheduleEvent{
|
|
{
|
|
RescheduleTime: time.Now().Add(-1 * time.Hour).UTC().UnixNano(),
|
|
PrevNodeID: uuid.Generate(),
|
|
PrevAllocID: uuid.Generate(),
|
|
},
|
|
},
|
|
}
|
|
|
|
alloc2 := mock.Alloc()
|
|
alloc2.Job = job
|
|
alloc2.EvalID = eval.ID
|
|
alloc2.DesiredStatus = structs.AllocDesiredStatusRun
|
|
alloc2.ClientStatus = structs.AllocClientStatusFailed
|
|
alloc2.JobID = eval.JobID
|
|
alloc2.TaskGroup = job.TaskGroups[0].Name
|
|
alloc2.RescheduleTracker = &structs.RescheduleTracker{
|
|
Events: []*structs.RescheduleEvent{
|
|
{
|
|
RescheduleTime: time.Now().Add(-3 * time.Minute).UTC().UnixNano(),
|
|
PrevNodeID: uuid.Generate(),
|
|
PrevAllocID: uuid.Generate(),
|
|
},
|
|
},
|
|
}
|
|
err = store.UpsertAllocs(structs.MsgTypeTestSetup, 1001, []*structs.Allocation{alloc, alloc2})
|
|
require.Nil(t, err)
|
|
|
|
// Update the time tables to make this work
|
|
tt := s1.fsm.TimeTable()
|
|
tt.Witness(2000, time.Now().UTC().Add(-1*s1.config.EvalGCThreshold))
|
|
|
|
// Create a core scheduler
|
|
snap, err := store.Snapshot()
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
core := NewCoreScheduler(s1, snap)
|
|
|
|
// Attempt the GC, job has all terminal allocs and one pending eval
|
|
gc := s1.coreJobEval(structs.CoreJobEvalGC, 2000)
|
|
err = core.Process(gc)
|
|
require.Nil(t, err)
|
|
|
|
// Eval should still exist
|
|
ws := memdb.NewWatchSet()
|
|
out, err := store.EvalByID(ws, eval.ID)
|
|
require.Nil(t, err)
|
|
require.NotNil(t, out)
|
|
require.Equal(t, eval.ID, out.ID)
|
|
|
|
outA, err := store.AllocByID(ws, alloc.ID)
|
|
require.Nil(t, err)
|
|
require.Nil(t, outA)
|
|
|
|
outA2, err := store.AllocByID(ws, alloc2.ID)
|
|
require.Nil(t, err)
|
|
require.Equal(t, alloc2.ID, outA2.ID)
|
|
|
|
}
|
|
|
|
// Tests GC behavior on stopped job with reschedulable allocs
|
|
func TestCoreScheduler_EvalGC_StoppedJob_Reschedulable(t *testing.T) {
|
|
ci.Parallel(t)
|
|
|
|
s1, cleanupS1 := TestServer(t, nil)
|
|
defer cleanupS1()
|
|
testutil.WaitForLeader(t, s1.RPC)
|
|
|
|
// COMPAT Remove in 0.6: Reset the FSM time table since we reconcile which sets index 0
|
|
s1.fsm.timetable.table = make([]TimeTableEntry, 1, 10)
|
|
|
|
// Insert "dead" eval
|
|
store := s1.fsm.State()
|
|
eval := mock.Eval()
|
|
eval.Status = structs.EvalStatusFailed
|
|
store.UpsertJobSummary(999, mock.JobSummary(eval.JobID))
|
|
err := store.UpsertEvals(structs.MsgTypeTestSetup, 1000, []*structs.Evaluation{eval})
|
|
require.Nil(t, err)
|
|
|
|
// Insert mock stopped job with default reschedule policy of 2 in 10 minutes
|
|
job := mock.Job()
|
|
job.ID = eval.JobID
|
|
job.Stop = true
|
|
|
|
err = store.UpsertJob(structs.MsgTypeTestSetup, 1001, job)
|
|
require.Nil(t, err)
|
|
|
|
// Insert failed alloc with a recent reschedule attempt
|
|
alloc := mock.Alloc()
|
|
alloc.EvalID = eval.ID
|
|
alloc.DesiredStatus = structs.AllocDesiredStatusRun
|
|
alloc.ClientStatus = structs.AllocClientStatusLost
|
|
alloc.JobID = eval.JobID
|
|
alloc.TaskGroup = job.TaskGroups[0].Name
|
|
alloc.RescheduleTracker = &structs.RescheduleTracker{
|
|
Events: []*structs.RescheduleEvent{
|
|
{
|
|
RescheduleTime: time.Now().Add(-3 * time.Minute).UTC().UnixNano(),
|
|
PrevNodeID: uuid.Generate(),
|
|
PrevAllocID: uuid.Generate(),
|
|
},
|
|
},
|
|
}
|
|
err = store.UpsertAllocs(structs.MsgTypeTestSetup, 1001, []*structs.Allocation{alloc})
|
|
require.Nil(t, err)
|
|
|
|
// Update the time tables to make this work
|
|
tt := s1.fsm.TimeTable()
|
|
tt.Witness(2000, time.Now().UTC().Add(-1*s1.config.EvalGCThreshold))
|
|
|
|
// Create a core scheduler
|
|
snap, err := store.Snapshot()
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
core := NewCoreScheduler(s1, snap)
|
|
|
|
// Attempt the GC
|
|
gc := s1.coreJobEval(structs.CoreJobEvalGC, 2000)
|
|
err = core.Process(gc)
|
|
require.Nil(t, err)
|
|
|
|
// Eval should not exist
|
|
ws := memdb.NewWatchSet()
|
|
out, err := store.EvalByID(ws, eval.ID)
|
|
require.Nil(t, err)
|
|
require.Nil(t, out)
|
|
|
|
// Alloc should not exist
|
|
outA, err := store.AllocByID(ws, alloc.ID)
|
|
require.Nil(t, err)
|
|
require.Nil(t, outA)
|
|
|
|
}
|
|
|
|
// An EvalGC should never reap a batch job that has not been stopped
|
|
func TestCoreScheduler_EvalGC_Batch(t *testing.T) {
|
|
ci.Parallel(t)
|
|
|
|
s1, cleanupS1 := TestServer(t, nil)
|
|
defer cleanupS1()
|
|
testutil.WaitForLeader(t, s1.RPC)
|
|
|
|
// COMPAT Remove in 0.6: Reset the FSM time table since we reconcile which sets index 0
|
|
s1.fsm.timetable.table = make([]TimeTableEntry, 1, 10)
|
|
|
|
// Insert a "dead" job
|
|
store := s1.fsm.State()
|
|
job := mock.Job()
|
|
job.Type = structs.JobTypeBatch
|
|
job.Status = structs.JobStatusDead
|
|
err := store.UpsertJob(structs.MsgTypeTestSetup, 1000, job)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
|
|
// Insert "complete" eval
|
|
eval := mock.Eval()
|
|
eval.Status = structs.EvalStatusComplete
|
|
eval.Type = structs.JobTypeBatch
|
|
eval.JobID = job.ID
|
|
err = store.UpsertEvals(structs.MsgTypeTestSetup, 1001, []*structs.Evaluation{eval})
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
|
|
// Insert "failed" alloc
|
|
alloc := mock.Alloc()
|
|
alloc.Job = job
|
|
alloc.JobID = job.ID
|
|
alloc.EvalID = eval.ID
|
|
alloc.DesiredStatus = structs.AllocDesiredStatusStop
|
|
|
|
// Insert "lost" alloc
|
|
alloc2 := mock.Alloc()
|
|
alloc2.Job = job
|
|
alloc2.JobID = job.ID
|
|
alloc2.EvalID = eval.ID
|
|
alloc2.DesiredStatus = structs.AllocDesiredStatusRun
|
|
alloc2.ClientStatus = structs.AllocClientStatusLost
|
|
|
|
err = store.UpsertAllocs(structs.MsgTypeTestSetup, 1002, []*structs.Allocation{alloc, alloc2})
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
|
|
// Update the time tables to make this work
|
|
tt := s1.fsm.TimeTable()
|
|
tt.Witness(2000, time.Now().UTC().Add(-1*s1.config.EvalGCThreshold))
|
|
|
|
// Create a core scheduler
|
|
snap, err := store.Snapshot()
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
core := NewCoreScheduler(s1, snap)
|
|
|
|
// Attempt the GC
|
|
gc := s1.coreJobEval(structs.CoreJobEvalGC, 2000)
|
|
err = core.Process(gc)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
|
|
// Nothing should be gone
|
|
ws := memdb.NewWatchSet()
|
|
out, err := store.EvalByID(ws, eval.ID)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
if out == nil {
|
|
t.Fatalf("bad: %v", out)
|
|
}
|
|
|
|
outA, err := store.AllocByID(ws, alloc.ID)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
if outA == nil {
|
|
t.Fatalf("bad: %v", outA)
|
|
}
|
|
|
|
outA2, err := store.AllocByID(ws, alloc2.ID)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
if outA2 == nil {
|
|
t.Fatalf("bad: %v", outA2)
|
|
}
|
|
|
|
outB, err := store.JobByID(ws, job.Namespace, job.ID)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
if outB == nil {
|
|
t.Fatalf("bad: %v", outB)
|
|
}
|
|
}
|
|
|
|
// An EvalGC should reap allocations from jobs with an older modify index
|
|
func TestCoreScheduler_EvalGC_Batch_OldVersion(t *testing.T) {
|
|
ci.Parallel(t)
|
|
|
|
s1, cleanupS1 := TestServer(t, nil)
|
|
defer cleanupS1()
|
|
testutil.WaitForLeader(t, s1.RPC)
|
|
|
|
// COMPAT Remove in 0.6: Reset the FSM time table since we reconcile which sets index 0
|
|
s1.fsm.timetable.table = make([]TimeTableEntry, 1, 10)
|
|
|
|
// Insert a "dead" job
|
|
store := s1.fsm.State()
|
|
job := mock.Job()
|
|
job.Type = structs.JobTypeBatch
|
|
job.Status = structs.JobStatusDead
|
|
err := store.UpsertJob(structs.MsgTypeTestSetup, 1000, job)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
|
|
// Insert "complete" eval
|
|
eval := mock.Eval()
|
|
eval.Status = structs.EvalStatusComplete
|
|
eval.Type = structs.JobTypeBatch
|
|
eval.JobID = job.ID
|
|
err = store.UpsertEvals(structs.MsgTypeTestSetup, 1001, []*structs.Evaluation{eval})
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
|
|
// Insert "failed" alloc
|
|
alloc := mock.Alloc()
|
|
alloc.Job = job
|
|
alloc.JobID = job.ID
|
|
alloc.EvalID = eval.ID
|
|
alloc.DesiredStatus = structs.AllocDesiredStatusStop
|
|
|
|
// Insert "lost" alloc
|
|
alloc2 := mock.Alloc()
|
|
alloc2.Job = job
|
|
alloc2.JobID = job.ID
|
|
alloc2.EvalID = eval.ID
|
|
alloc2.DesiredStatus = structs.AllocDesiredStatusRun
|
|
alloc2.ClientStatus = structs.AllocClientStatusLost
|
|
|
|
// Insert alloc with older job modifyindex
|
|
alloc3 := mock.Alloc()
|
|
job2 := job.Copy()
|
|
|
|
alloc3.Job = job2
|
|
alloc3.JobID = job2.ID
|
|
alloc3.EvalID = eval.ID
|
|
job2.CreateIndex = 500
|
|
alloc3.DesiredStatus = structs.AllocDesiredStatusRun
|
|
alloc3.ClientStatus = structs.AllocClientStatusLost
|
|
|
|
err = store.UpsertAllocs(structs.MsgTypeTestSetup, 1002, []*structs.Allocation{alloc, alloc2, alloc3})
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
|
|
// Update the time tables to make this work
|
|
tt := s1.fsm.TimeTable()
|
|
tt.Witness(2000, time.Now().UTC().Add(-1*s1.config.EvalGCThreshold))
|
|
|
|
// Create a core scheduler
|
|
snap, err := store.Snapshot()
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
core := NewCoreScheduler(s1, snap)
|
|
|
|
// Attempt the GC
|
|
gc := s1.coreJobEval(structs.CoreJobEvalGC, 2000)
|
|
err = core.Process(gc)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
|
|
// Alloc1 and 2 should be there, and alloc3 should be gone
|
|
ws := memdb.NewWatchSet()
|
|
out, err := store.EvalByID(ws, eval.ID)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
if out == nil {
|
|
t.Fatalf("bad: %v", out)
|
|
}
|
|
|
|
outA, err := store.AllocByID(ws, alloc.ID)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
if outA == nil {
|
|
t.Fatalf("bad: %v", outA)
|
|
}
|
|
|
|
outA2, err := store.AllocByID(ws, alloc2.ID)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
if outA2 == nil {
|
|
t.Fatalf("bad: %v", outA2)
|
|
}
|
|
|
|
outA3, err := store.AllocByID(ws, alloc3.ID)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
if outA3 != nil {
|
|
t.Fatalf("expected alloc to be nil:%v", outA2)
|
|
}
|
|
|
|
outB, err := store.JobByID(ws, job.Namespace, job.ID)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
if outB == nil {
|
|
t.Fatalf("bad: %v", outB)
|
|
}
|
|
}
|
|
|
|
// An EvalGC should reap a batch job that has been stopped
|
|
func TestCoreScheduler_EvalGC_BatchStopped(t *testing.T) {
|
|
ci.Parallel(t)
|
|
|
|
s1, cleanupS1 := TestServer(t, nil)
|
|
defer cleanupS1()
|
|
testutil.WaitForLeader(t, s1.RPC)
|
|
|
|
// COMPAT Remove in 0.6: Reset the FSM time table since we reconcile which sets index 0
|
|
s1.fsm.timetable.table = make([]TimeTableEntry, 1, 10)
|
|
|
|
// Create a "dead" job
|
|
store := s1.fsm.State()
|
|
job := mock.Job()
|
|
job.Type = structs.JobTypeBatch
|
|
job.Status = structs.JobStatusDead
|
|
job.Stop = true
|
|
job.TaskGroups[0].ReschedulePolicy = &structs.ReschedulePolicy{
|
|
Attempts: 0,
|
|
Interval: 0 * time.Second,
|
|
}
|
|
err := store.UpsertJob(structs.MsgTypeTestSetup, 1001, job)
|
|
require.Nil(t, err)
|
|
|
|
// Insert "complete" eval
|
|
eval := mock.Eval()
|
|
eval.Status = structs.EvalStatusComplete
|
|
eval.Type = structs.JobTypeBatch
|
|
eval.JobID = job.ID
|
|
err = store.UpsertEvals(structs.MsgTypeTestSetup, 1002, []*structs.Evaluation{eval})
|
|
require.Nil(t, err)
|
|
|
|
// Insert "failed" alloc
|
|
alloc := mock.Alloc()
|
|
alloc.JobID = job.ID
|
|
alloc.EvalID = eval.ID
|
|
alloc.TaskGroup = job.TaskGroups[0].Name
|
|
alloc.DesiredStatus = structs.AllocDesiredStatusStop
|
|
|
|
// Insert "lost" alloc
|
|
alloc2 := mock.Alloc()
|
|
alloc2.JobID = job.ID
|
|
alloc2.EvalID = eval.ID
|
|
alloc2.DesiredStatus = structs.AllocDesiredStatusRun
|
|
alloc2.ClientStatus = structs.AllocClientStatusLost
|
|
alloc2.TaskGroup = job.TaskGroups[0].Name
|
|
|
|
err = store.UpsertAllocs(structs.MsgTypeTestSetup, 1003, []*structs.Allocation{alloc, alloc2})
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
|
|
// Update the time tables to make this work
|
|
tt := s1.fsm.TimeTable()
|
|
tt.Witness(2000, time.Now().UTC().Add(-1*s1.config.EvalGCThreshold))
|
|
|
|
// Create a core scheduler
|
|
snap, err := store.Snapshot()
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
core := NewCoreScheduler(s1, snap)
|
|
|
|
// Attempt the GC
|
|
gc := s1.coreJobEval(structs.CoreJobEvalGC, 2000)
|
|
err = core.Process(gc)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
|
|
// Everything should be gone
|
|
ws := memdb.NewWatchSet()
|
|
out, err := store.EvalByID(ws, eval.ID)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
if out != nil {
|
|
t.Fatalf("bad: %v", out)
|
|
}
|
|
|
|
outA, err := store.AllocByID(ws, alloc.ID)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
if outA != nil {
|
|
t.Fatalf("bad: %v", outA)
|
|
}
|
|
|
|
outA2, err := store.AllocByID(ws, alloc2.ID)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
if outA2 != nil {
|
|
t.Fatalf("bad: %v", outA2)
|
|
}
|
|
}
|
|
|
|
func TestCoreScheduler_EvalGC_Partial(t *testing.T) {
|
|
ci.Parallel(t)
|
|
|
|
s1, cleanupS1 := TestServer(t, nil)
|
|
defer cleanupS1()
|
|
testutil.WaitForLeader(t, s1.RPC)
|
|
|
|
// COMPAT Remove in 0.6: Reset the FSM time table since we reconcile which sets index 0
|
|
s1.fsm.timetable.table = make([]TimeTableEntry, 1, 10)
|
|
|
|
// Insert "dead" eval
|
|
store := s1.fsm.State()
|
|
eval := mock.Eval()
|
|
eval.Status = structs.EvalStatusComplete
|
|
store.UpsertJobSummary(999, mock.JobSummary(eval.JobID))
|
|
err := store.UpsertEvals(structs.MsgTypeTestSetup, 1000, []*structs.Evaluation{eval})
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
|
|
// Create mock job with id same as eval
|
|
job := mock.Job()
|
|
job.ID = eval.JobID
|
|
|
|
// Insert "dead" alloc
|
|
alloc := mock.Alloc()
|
|
alloc.JobID = job.ID
|
|
alloc.EvalID = eval.ID
|
|
alloc.DesiredStatus = structs.AllocDesiredStatusStop
|
|
alloc.TaskGroup = job.TaskGroups[0].Name
|
|
store.UpsertJobSummary(1001, mock.JobSummary(alloc.JobID))
|
|
|
|
// Insert "lost" alloc
|
|
alloc2 := mock.Alloc()
|
|
alloc2.JobID = job.ID
|
|
alloc2.EvalID = eval.ID
|
|
alloc2.TaskGroup = job.TaskGroups[0].Name
|
|
alloc2.DesiredStatus = structs.AllocDesiredStatusRun
|
|
alloc2.ClientStatus = structs.AllocClientStatusLost
|
|
|
|
err = store.UpsertAllocs(structs.MsgTypeTestSetup, 1002, []*structs.Allocation{alloc, alloc2})
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
|
|
// Insert "running" alloc
|
|
alloc3 := mock.Alloc()
|
|
alloc3.EvalID = eval.ID
|
|
alloc3.JobID = job.ID
|
|
store.UpsertJobSummary(1003, mock.JobSummary(alloc3.JobID))
|
|
err = store.UpsertAllocs(structs.MsgTypeTestSetup, 1004, []*structs.Allocation{alloc3})
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
|
|
// Insert mock job with rescheduling disabled
|
|
job.TaskGroups[0].ReschedulePolicy = &structs.ReschedulePolicy{
|
|
Attempts: 0,
|
|
Interval: 0 * time.Second,
|
|
}
|
|
err = store.UpsertJob(structs.MsgTypeTestSetup, 1001, job)
|
|
require.Nil(t, err)
|
|
|
|
// Update the time tables to make this work
|
|
tt := s1.fsm.TimeTable()
|
|
tt.Witness(2000, time.Now().UTC().Add(-1*s1.config.EvalGCThreshold))
|
|
|
|
// Create a core scheduler
|
|
snap, err := store.Snapshot()
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
core := NewCoreScheduler(s1, snap)
|
|
|
|
// Attempt the GC
|
|
gc := s1.coreJobEval(structs.CoreJobEvalGC, 2000)
|
|
err = core.Process(gc)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
|
|
// Should not be gone
|
|
ws := memdb.NewWatchSet()
|
|
out, err := store.EvalByID(ws, eval.ID)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
if out == nil {
|
|
t.Fatalf("bad: %v", out)
|
|
}
|
|
|
|
outA, err := store.AllocByID(ws, alloc3.ID)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
if outA == nil {
|
|
t.Fatalf("bad: %v", outA)
|
|
}
|
|
|
|
// Should be gone
|
|
outB, err := store.AllocByID(ws, alloc.ID)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
if outB != nil {
|
|
t.Fatalf("bad: %v", outB)
|
|
}
|
|
|
|
outC, err := store.AllocByID(ws, alloc2.ID)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
if outC != nil {
|
|
t.Fatalf("bad: %v", outC)
|
|
}
|
|
}
|
|
|
|
func TestCoreScheduler_EvalGC_Force(t *testing.T) {
|
|
ci.Parallel(t)
|
|
for _, withAcl := range []bool{false, true} {
|
|
t.Run(fmt.Sprintf("with acl %v", withAcl), func(t *testing.T) {
|
|
var server *Server
|
|
var cleanup func()
|
|
if withAcl {
|
|
server, _, cleanup = TestACLServer(t, nil)
|
|
} else {
|
|
server, cleanup = TestServer(t, nil)
|
|
}
|
|
defer cleanup()
|
|
testutil.WaitForLeader(t, server.RPC)
|
|
|
|
// COMPAT Remove in 0.6: Reset the FSM time table since we reconcile which sets index 0
|
|
server.fsm.timetable.table = make([]TimeTableEntry, 1, 10)
|
|
|
|
// Insert "dead" eval
|
|
store := server.fsm.State()
|
|
eval := mock.Eval()
|
|
eval.Status = structs.EvalStatusFailed
|
|
store.UpsertJobSummary(999, mock.JobSummary(eval.JobID))
|
|
err := store.UpsertEvals(structs.MsgTypeTestSetup, 1000, []*structs.Evaluation{eval})
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
|
|
// Insert mock job with rescheduling disabled
|
|
job := mock.Job()
|
|
job.ID = eval.JobID
|
|
job.TaskGroups[0].ReschedulePolicy = &structs.ReschedulePolicy{
|
|
Attempts: 0,
|
|
Interval: 0 * time.Second,
|
|
}
|
|
err = store.UpsertJob(structs.MsgTypeTestSetup, 1001, job)
|
|
require.Nil(t, err)
|
|
|
|
// Insert "dead" alloc
|
|
alloc := mock.Alloc()
|
|
alloc.EvalID = eval.ID
|
|
alloc.DesiredStatus = structs.AllocDesiredStatusStop
|
|
alloc.TaskGroup = job.TaskGroups[0].Name
|
|
store.UpsertJobSummary(1001, mock.JobSummary(alloc.JobID))
|
|
err = store.UpsertAllocs(structs.MsgTypeTestSetup, 1002, []*structs.Allocation{alloc})
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
|
|
// Create a core scheduler
|
|
snap, err := store.Snapshot()
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
core := NewCoreScheduler(server, snap)
|
|
|
|
// Attempt the GC
|
|
gc := server.coreJobEval(structs.CoreJobForceGC, 1002)
|
|
err = core.Process(gc)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
|
|
// Should be gone
|
|
ws := memdb.NewWatchSet()
|
|
out, err := store.EvalByID(ws, eval.ID)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
if out != nil {
|
|
t.Fatalf("bad: %v", out)
|
|
}
|
|
|
|
outA, err := store.AllocByID(ws, alloc.ID)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
if outA != nil {
|
|
t.Fatalf("bad: %v", outA)
|
|
}
|
|
})
|
|
}
|
|
}
|
|
|
|
func TestCoreScheduler_NodeGC(t *testing.T) {
|
|
ci.Parallel(t)
|
|
for _, withAcl := range []bool{false, true} {
|
|
t.Run(fmt.Sprintf("with acl %v", withAcl), func(t *testing.T) {
|
|
var server *Server
|
|
var cleanup func()
|
|
if withAcl {
|
|
server, _, cleanup = TestACLServer(t, nil)
|
|
} else {
|
|
server, cleanup = TestServer(t, nil)
|
|
}
|
|
defer cleanup()
|
|
testutil.WaitForLeader(t, server.RPC)
|
|
|
|
// COMPAT Remove in 0.6: Reset the FSM time table since we reconcile which sets index 0
|
|
server.fsm.timetable.table = make([]TimeTableEntry, 1, 10)
|
|
|
|
// Insert "dead" node
|
|
store := server.fsm.State()
|
|
node := mock.Node()
|
|
node.Status = structs.NodeStatusDown
|
|
err := store.UpsertNode(structs.MsgTypeTestSetup, 1000, node)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
|
|
// Update the time tables to make this work
|
|
tt := server.fsm.TimeTable()
|
|
tt.Witness(2000, time.Now().UTC().Add(-1*server.config.NodeGCThreshold))
|
|
|
|
// Create a core scheduler
|
|
snap, err := store.Snapshot()
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
core := NewCoreScheduler(server, snap)
|
|
|
|
// Attempt the GC
|
|
gc := server.coreJobEval(structs.CoreJobNodeGC, 2000)
|
|
err = core.Process(gc)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
|
|
// Should be gone
|
|
ws := memdb.NewWatchSet()
|
|
out, err := store.NodeByID(ws, node.ID)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
if out != nil {
|
|
t.Fatalf("bad: %v", out)
|
|
}
|
|
})
|
|
}
|
|
}
|
|
|
|
func TestCoreScheduler_NodeGC_TerminalAllocs(t *testing.T) {
|
|
ci.Parallel(t)
|
|
|
|
s1, cleanupS1 := TestServer(t, nil)
|
|
defer cleanupS1()
|
|
testutil.WaitForLeader(t, s1.RPC)
|
|
|
|
// COMPAT Remove in 0.6: Reset the FSM time table since we reconcile which sets index 0
|
|
s1.fsm.timetable.table = make([]TimeTableEntry, 1, 10)
|
|
|
|
// Insert "dead" node
|
|
store := s1.fsm.State()
|
|
node := mock.Node()
|
|
node.Status = structs.NodeStatusDown
|
|
err := store.UpsertNode(structs.MsgTypeTestSetup, 1000, node)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
|
|
// Insert a terminal alloc on that node
|
|
alloc := mock.Alloc()
|
|
alloc.DesiredStatus = structs.AllocDesiredStatusStop
|
|
store.UpsertJobSummary(1001, mock.JobSummary(alloc.JobID))
|
|
if err := store.UpsertAllocs(structs.MsgTypeTestSetup, 1002, []*structs.Allocation{alloc}); err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
|
|
// Update the time tables to make this work
|
|
tt := s1.fsm.TimeTable()
|
|
tt.Witness(2000, time.Now().UTC().Add(-1*s1.config.NodeGCThreshold))
|
|
|
|
// Create a core scheduler
|
|
snap, err := store.Snapshot()
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
core := NewCoreScheduler(s1, snap)
|
|
|
|
// Attempt the GC
|
|
gc := s1.coreJobEval(structs.CoreJobNodeGC, 2000)
|
|
err = core.Process(gc)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
|
|
// Should be gone
|
|
ws := memdb.NewWatchSet()
|
|
out, err := store.NodeByID(ws, node.ID)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
if out != nil {
|
|
t.Fatalf("bad: %v", out)
|
|
}
|
|
}
|
|
|
|
func TestCoreScheduler_NodeGC_RunningAllocs(t *testing.T) {
|
|
ci.Parallel(t)
|
|
|
|
s1, cleanupS1 := TestServer(t, nil)
|
|
defer cleanupS1()
|
|
testutil.WaitForLeader(t, s1.RPC)
|
|
|
|
// COMPAT Remove in 0.6: Reset the FSM time table since we reconcile which sets index 0
|
|
s1.fsm.timetable.table = make([]TimeTableEntry, 1, 10)
|
|
|
|
// Insert "dead" node
|
|
store := s1.fsm.State()
|
|
node := mock.Node()
|
|
node.Status = structs.NodeStatusDown
|
|
err := store.UpsertNode(structs.MsgTypeTestSetup, 1000, node)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
|
|
// Insert a running alloc on that node
|
|
alloc := mock.Alloc()
|
|
alloc.NodeID = node.ID
|
|
alloc.DesiredStatus = structs.AllocDesiredStatusRun
|
|
alloc.ClientStatus = structs.AllocClientStatusRunning
|
|
store.UpsertJobSummary(1001, mock.JobSummary(alloc.JobID))
|
|
if err := store.UpsertAllocs(structs.MsgTypeTestSetup, 1002, []*structs.Allocation{alloc}); err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
|
|
// Update the time tables to make this work
|
|
tt := s1.fsm.TimeTable()
|
|
tt.Witness(2000, time.Now().UTC().Add(-1*s1.config.NodeGCThreshold))
|
|
|
|
// Create a core scheduler
|
|
snap, err := store.Snapshot()
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
core := NewCoreScheduler(s1, snap)
|
|
|
|
// Attempt the GC
|
|
gc := s1.coreJobEval(structs.CoreJobNodeGC, 2000)
|
|
err = core.Process(gc)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
|
|
// Should still be here
|
|
ws := memdb.NewWatchSet()
|
|
out, err := store.NodeByID(ws, node.ID)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
if out == nil {
|
|
t.Fatalf("bad: %v", out)
|
|
}
|
|
}
|
|
|
|
func TestCoreScheduler_NodeGC_Force(t *testing.T) {
|
|
ci.Parallel(t)
|
|
|
|
s1, cleanupS1 := TestServer(t, nil)
|
|
defer cleanupS1()
|
|
testutil.WaitForLeader(t, s1.RPC)
|
|
|
|
// COMPAT Remove in 0.6: Reset the FSM time table since we reconcile which sets index 0
|
|
s1.fsm.timetable.table = make([]TimeTableEntry, 1, 10)
|
|
|
|
// Insert "dead" node
|
|
store := s1.fsm.State()
|
|
node := mock.Node()
|
|
node.Status = structs.NodeStatusDown
|
|
err := store.UpsertNode(structs.MsgTypeTestSetup, 1000, node)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
|
|
// Create a core scheduler
|
|
snap, err := store.Snapshot()
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
core := NewCoreScheduler(s1, snap)
|
|
|
|
// Attempt the GC
|
|
gc := s1.coreJobEval(structs.CoreJobForceGC, 1000)
|
|
err = core.Process(gc)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
|
|
// Should be gone
|
|
ws := memdb.NewWatchSet()
|
|
out, err := store.NodeByID(ws, node.ID)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
if out != nil {
|
|
t.Fatalf("bad: %v", out)
|
|
}
|
|
}
|
|
|
|
func TestCoreScheduler_JobGC_OutstandingEvals(t *testing.T) {
|
|
ci.Parallel(t)
|
|
|
|
s1, cleanupS1 := TestServer(t, nil)
|
|
defer cleanupS1()
|
|
testutil.WaitForLeader(t, s1.RPC)
|
|
|
|
// COMPAT Remove in 0.6: Reset the FSM time table since we reconcile which sets index 0
|
|
s1.fsm.timetable.table = make([]TimeTableEntry, 1, 10)
|
|
|
|
// Insert job.
|
|
store := s1.fsm.State()
|
|
job := mock.Job()
|
|
job.Type = structs.JobTypeBatch
|
|
job.Status = structs.JobStatusDead
|
|
err := store.UpsertJob(structs.MsgTypeTestSetup, 1000, job)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
|
|
// Insert two evals, one terminal and one not
|
|
eval := mock.Eval()
|
|
eval.JobID = job.ID
|
|
eval.Status = structs.EvalStatusComplete
|
|
|
|
eval2 := mock.Eval()
|
|
eval2.JobID = job.ID
|
|
eval2.Status = structs.EvalStatusPending
|
|
err = store.UpsertEvals(structs.MsgTypeTestSetup, 1001, []*structs.Evaluation{eval, eval2})
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
|
|
// Update the time tables to make this work
|
|
tt := s1.fsm.TimeTable()
|
|
tt.Witness(2000, time.Now().UTC().Add(-1*s1.config.JobGCThreshold))
|
|
|
|
// Create a core scheduler
|
|
snap, err := store.Snapshot()
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
core := NewCoreScheduler(s1, snap)
|
|
|
|
// Attempt the GC
|
|
gc := s1.coreJobEval(structs.CoreJobJobGC, 2000)
|
|
err = core.Process(gc)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
|
|
// Should still exist
|
|
ws := memdb.NewWatchSet()
|
|
out, err := store.JobByID(ws, job.Namespace, job.ID)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
if out == nil {
|
|
t.Fatalf("bad: %v", out)
|
|
}
|
|
|
|
outE, err := store.EvalByID(ws, eval.ID)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
if outE == nil {
|
|
t.Fatalf("bad: %v", outE)
|
|
}
|
|
|
|
outE2, err := store.EvalByID(ws, eval2.ID)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
if outE2 == nil {
|
|
t.Fatalf("bad: %v", outE2)
|
|
}
|
|
|
|
// Update the second eval to be terminal
|
|
eval2.Status = structs.EvalStatusComplete
|
|
err = store.UpsertEvals(structs.MsgTypeTestSetup, 1003, []*structs.Evaluation{eval2})
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
|
|
// Create a core scheduler
|
|
snap, err = store.Snapshot()
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
core = NewCoreScheduler(s1, snap)
|
|
|
|
// Attempt the GC
|
|
gc = s1.coreJobEval(structs.CoreJobJobGC, 2000)
|
|
err = core.Process(gc)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
|
|
// Should not still exist
|
|
out, err = store.JobByID(ws, job.Namespace, job.ID)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
if out != nil {
|
|
t.Fatalf("bad: %v", out)
|
|
}
|
|
|
|
outE, err = store.EvalByID(ws, eval.ID)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
if outE != nil {
|
|
t.Fatalf("bad: %v", outE)
|
|
}
|
|
|
|
outE2, err = store.EvalByID(ws, eval2.ID)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
if outE2 != nil {
|
|
t.Fatalf("bad: %v", outE2)
|
|
}
|
|
}
|
|
|
|
func TestCoreScheduler_JobGC_OutstandingAllocs(t *testing.T) {
|
|
ci.Parallel(t)
|
|
|
|
s1, cleanupS1 := TestServer(t, nil)
|
|
defer cleanupS1()
|
|
testutil.WaitForLeader(t, s1.RPC)
|
|
|
|
// COMPAT Remove in 0.6: Reset the FSM time table since we reconcile which sets index 0
|
|
s1.fsm.timetable.table = make([]TimeTableEntry, 1, 10)
|
|
|
|
// Insert job.
|
|
store := s1.fsm.State()
|
|
job := mock.Job()
|
|
job.Type = structs.JobTypeBatch
|
|
job.Status = structs.JobStatusDead
|
|
job.TaskGroups[0].ReschedulePolicy = &structs.ReschedulePolicy{
|
|
Attempts: 0,
|
|
Interval: 0 * time.Second,
|
|
}
|
|
err := store.UpsertJob(structs.MsgTypeTestSetup, 1000, job)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
|
|
// Insert an eval
|
|
eval := mock.Eval()
|
|
eval.JobID = job.ID
|
|
eval.Status = structs.EvalStatusComplete
|
|
err = store.UpsertEvals(structs.MsgTypeTestSetup, 1001, []*structs.Evaluation{eval})
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
|
|
// Insert two allocs, one terminal and one not
|
|
alloc := mock.Alloc()
|
|
alloc.JobID = job.ID
|
|
alloc.EvalID = eval.ID
|
|
alloc.DesiredStatus = structs.AllocDesiredStatusRun
|
|
alloc.ClientStatus = structs.AllocClientStatusComplete
|
|
alloc.TaskGroup = job.TaskGroups[0].Name
|
|
|
|
alloc2 := mock.Alloc()
|
|
alloc2.JobID = job.ID
|
|
alloc2.EvalID = eval.ID
|
|
alloc2.DesiredStatus = structs.AllocDesiredStatusRun
|
|
alloc2.ClientStatus = structs.AllocClientStatusRunning
|
|
alloc2.TaskGroup = job.TaskGroups[0].Name
|
|
|
|
err = store.UpsertAllocs(structs.MsgTypeTestSetup, 1002, []*structs.Allocation{alloc, alloc2})
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
|
|
// Update the time tables to make this work
|
|
tt := s1.fsm.TimeTable()
|
|
tt.Witness(2000, time.Now().UTC().Add(-1*s1.config.JobGCThreshold))
|
|
|
|
// Create a core scheduler
|
|
snap, err := store.Snapshot()
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
core := NewCoreScheduler(s1, snap)
|
|
|
|
// Attempt the GC
|
|
gc := s1.coreJobEval(structs.CoreJobJobGC, 2000)
|
|
err = core.Process(gc)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
|
|
// Should still exist
|
|
ws := memdb.NewWatchSet()
|
|
out, err := store.JobByID(ws, job.Namespace, job.ID)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
if out == nil {
|
|
t.Fatalf("bad: %v", out)
|
|
}
|
|
|
|
outA, err := store.AllocByID(ws, alloc.ID)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
if outA == nil {
|
|
t.Fatalf("bad: %v", outA)
|
|
}
|
|
|
|
outA2, err := store.AllocByID(ws, alloc2.ID)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
if outA2 == nil {
|
|
t.Fatalf("bad: %v", outA2)
|
|
}
|
|
|
|
// Update the second alloc to be terminal
|
|
alloc2.ClientStatus = structs.AllocClientStatusComplete
|
|
err = store.UpsertAllocs(structs.MsgTypeTestSetup, 1003, []*structs.Allocation{alloc2})
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
|
|
// Create a core scheduler
|
|
snap, err = store.Snapshot()
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
core = NewCoreScheduler(s1, snap)
|
|
|
|
// Attempt the GC
|
|
gc = s1.coreJobEval(structs.CoreJobJobGC, 2000)
|
|
err = core.Process(gc)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
|
|
// Should not still exist
|
|
out, err = store.JobByID(ws, job.Namespace, job.ID)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
if out != nil {
|
|
t.Fatalf("bad: %v", out)
|
|
}
|
|
|
|
outA, err = store.AllocByID(ws, alloc.ID)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
if outA != nil {
|
|
t.Fatalf("bad: %v", outA)
|
|
}
|
|
|
|
outA2, err = store.AllocByID(ws, alloc2.ID)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
if outA2 != nil {
|
|
t.Fatalf("bad: %v", outA2)
|
|
}
|
|
}
|
|
|
|
// This test ensures that batch jobs are GC'd in one shot, meaning it all
|
|
// allocs/evals and job or nothing
|
|
func TestCoreScheduler_JobGC_OneShot(t *testing.T) {
|
|
ci.Parallel(t)
|
|
|
|
s1, cleanupS1 := TestServer(t, nil)
|
|
defer cleanupS1()
|
|
testutil.WaitForLeader(t, s1.RPC)
|
|
|
|
// COMPAT Remove in 0.6: Reset the FSM time table since we reconcile which sets index 0
|
|
s1.fsm.timetable.table = make([]TimeTableEntry, 1, 10)
|
|
|
|
// Insert job.
|
|
store := s1.fsm.State()
|
|
job := mock.Job()
|
|
job.Type = structs.JobTypeBatch
|
|
err := store.UpsertJob(structs.MsgTypeTestSetup, 1000, job)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
|
|
// Insert two complete evals
|
|
eval := mock.Eval()
|
|
eval.JobID = job.ID
|
|
eval.Status = structs.EvalStatusComplete
|
|
|
|
eval2 := mock.Eval()
|
|
eval2.JobID = job.ID
|
|
eval2.Status = structs.EvalStatusComplete
|
|
|
|
err = store.UpsertEvals(structs.MsgTypeTestSetup, 1001, []*structs.Evaluation{eval, eval2})
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
|
|
// Insert one complete alloc and one running on distinct evals
|
|
alloc := mock.Alloc()
|
|
alloc.JobID = job.ID
|
|
alloc.EvalID = eval.ID
|
|
alloc.DesiredStatus = structs.AllocDesiredStatusStop
|
|
|
|
alloc2 := mock.Alloc()
|
|
alloc2.JobID = job.ID
|
|
alloc2.EvalID = eval2.ID
|
|
alloc2.DesiredStatus = structs.AllocDesiredStatusRun
|
|
|
|
err = store.UpsertAllocs(structs.MsgTypeTestSetup, 1002, []*structs.Allocation{alloc, alloc2})
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
|
|
// Force the jobs state to dead
|
|
job.Status = structs.JobStatusDead
|
|
|
|
// Update the time tables to make this work
|
|
tt := s1.fsm.TimeTable()
|
|
tt.Witness(2000, time.Now().UTC().Add(-1*s1.config.JobGCThreshold))
|
|
|
|
// Create a core scheduler
|
|
snap, err := store.Snapshot()
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
core := NewCoreScheduler(s1, snap)
|
|
|
|
// Attempt the GC
|
|
gc := s1.coreJobEval(structs.CoreJobJobGC, 2000)
|
|
err = core.Process(gc)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
|
|
// Should still exist
|
|
ws := memdb.NewWatchSet()
|
|
out, err := store.JobByID(ws, job.Namespace, job.ID)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
if out == nil {
|
|
t.Fatalf("bad: %v", out)
|
|
}
|
|
|
|
outE, err := store.EvalByID(ws, eval.ID)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
if outE == nil {
|
|
t.Fatalf("bad: %v", outE)
|
|
}
|
|
|
|
outE2, err := store.EvalByID(ws, eval2.ID)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
if outE2 == nil {
|
|
t.Fatalf("bad: %v", outE2)
|
|
}
|
|
|
|
outA, err := store.AllocByID(ws, alloc.ID)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
if outA == nil {
|
|
t.Fatalf("bad: %v", outA)
|
|
}
|
|
outA2, err := store.AllocByID(ws, alloc2.ID)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
if outA2 == nil {
|
|
t.Fatalf("bad: %v", outA2)
|
|
}
|
|
}
|
|
|
|
// This test ensures that stopped jobs are GCd
|
|
func TestCoreScheduler_JobGC_Stopped(t *testing.T) {
|
|
ci.Parallel(t)
|
|
|
|
s1, cleanupS1 := TestServer(t, nil)
|
|
defer cleanupS1()
|
|
testutil.WaitForLeader(t, s1.RPC)
|
|
|
|
// COMPAT Remove in 0.6: Reset the FSM time table since we reconcile which sets index 0
|
|
s1.fsm.timetable.table = make([]TimeTableEntry, 1, 10)
|
|
|
|
// Insert job.
|
|
store := s1.fsm.State()
|
|
job := mock.Job()
|
|
job.Stop = true
|
|
job.TaskGroups[0].ReschedulePolicy = &structs.ReschedulePolicy{
|
|
Attempts: 0,
|
|
Interval: 0 * time.Second,
|
|
}
|
|
err := store.UpsertJob(structs.MsgTypeTestSetup, 1000, job)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
|
|
// Insert two complete evals
|
|
eval := mock.Eval()
|
|
eval.JobID = job.ID
|
|
eval.Status = structs.EvalStatusComplete
|
|
|
|
eval2 := mock.Eval()
|
|
eval2.JobID = job.ID
|
|
eval2.Status = structs.EvalStatusComplete
|
|
|
|
err = store.UpsertEvals(structs.MsgTypeTestSetup, 1001, []*structs.Evaluation{eval, eval2})
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
|
|
// Insert one complete alloc
|
|
alloc := mock.Alloc()
|
|
alloc.JobID = job.ID
|
|
alloc.EvalID = eval.ID
|
|
alloc.DesiredStatus = structs.AllocDesiredStatusStop
|
|
alloc.TaskGroup = job.TaskGroups[0].Name
|
|
err = store.UpsertAllocs(structs.MsgTypeTestSetup, 1002, []*structs.Allocation{alloc})
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
|
|
// Update the time tables to make this work
|
|
tt := s1.fsm.TimeTable()
|
|
tt.Witness(2000, time.Now().UTC().Add(-1*s1.config.JobGCThreshold))
|
|
|
|
// Create a core scheduler
|
|
snap, err := store.Snapshot()
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
core := NewCoreScheduler(s1, snap)
|
|
|
|
// Attempt the GC
|
|
gc := s1.coreJobEval(structs.CoreJobJobGC, 2000)
|
|
err = core.Process(gc)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
|
|
// Shouldn't still exist
|
|
ws := memdb.NewWatchSet()
|
|
out, err := store.JobByID(ws, job.Namespace, job.ID)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
if out != nil {
|
|
t.Fatalf("bad: %v", out)
|
|
}
|
|
|
|
outE, err := store.EvalByID(ws, eval.ID)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
if outE != nil {
|
|
t.Fatalf("bad: %v", outE)
|
|
}
|
|
|
|
outE2, err := store.EvalByID(ws, eval2.ID)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
if outE2 != nil {
|
|
t.Fatalf("bad: %v", outE2)
|
|
}
|
|
|
|
outA, err := store.AllocByID(ws, alloc.ID)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
if outA != nil {
|
|
t.Fatalf("bad: %v", outA)
|
|
}
|
|
}
|
|
|
|
func TestCoreScheduler_JobGC_Force(t *testing.T) {
|
|
ci.Parallel(t)
|
|
for _, withAcl := range []bool{false, true} {
|
|
t.Run(fmt.Sprintf("with acl %v", withAcl), func(t *testing.T) {
|
|
var server *Server
|
|
var cleanup func()
|
|
if withAcl {
|
|
server, _, cleanup = TestACLServer(t, nil)
|
|
} else {
|
|
server, cleanup = TestServer(t, nil)
|
|
}
|
|
defer cleanup()
|
|
testutil.WaitForLeader(t, server.RPC)
|
|
|
|
// COMPAT Remove in 0.6: Reset the FSM time table since we reconcile which sets index 0
|
|
server.fsm.timetable.table = make([]TimeTableEntry, 1, 10)
|
|
|
|
// Insert job.
|
|
store := server.fsm.State()
|
|
job := mock.Job()
|
|
job.Type = structs.JobTypeBatch
|
|
job.Status = structs.JobStatusDead
|
|
err := store.UpsertJob(structs.MsgTypeTestSetup, 1000, job)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
|
|
// Insert a terminal eval
|
|
eval := mock.Eval()
|
|
eval.JobID = job.ID
|
|
eval.Status = structs.EvalStatusComplete
|
|
err = store.UpsertEvals(structs.MsgTypeTestSetup, 1001, []*structs.Evaluation{eval})
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
|
|
// Create a core scheduler
|
|
snap, err := store.Snapshot()
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
core := NewCoreScheduler(server, snap)
|
|
|
|
// Attempt the GC
|
|
gc := server.coreJobEval(structs.CoreJobForceGC, 1002)
|
|
err = core.Process(gc)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
|
|
// Shouldn't still exist
|
|
ws := memdb.NewWatchSet()
|
|
out, err := store.JobByID(ws, job.Namespace, job.ID)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
if out != nil {
|
|
t.Fatalf("bad: %v", out)
|
|
}
|
|
|
|
outE, err := store.EvalByID(ws, eval.ID)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
if outE != nil {
|
|
t.Fatalf("bad: %v", outE)
|
|
}
|
|
})
|
|
}
|
|
}
|
|
|
|
// This test ensures parameterized jobs only get gc'd when stopped
|
|
func TestCoreScheduler_JobGC_Parameterized(t *testing.T) {
|
|
ci.Parallel(t)
|
|
|
|
s1, cleanupS1 := TestServer(t, nil)
|
|
defer cleanupS1()
|
|
testutil.WaitForLeader(t, s1.RPC)
|
|
|
|
// COMPAT Remove in 0.6: Reset the FSM time table since we reconcile which sets index 0
|
|
s1.fsm.timetable.table = make([]TimeTableEntry, 1, 10)
|
|
|
|
// Insert a parameterized job.
|
|
store := s1.fsm.State()
|
|
job := mock.Job()
|
|
job.Type = structs.JobTypeBatch
|
|
job.Status = structs.JobStatusRunning
|
|
job.ParameterizedJob = &structs.ParameterizedJobConfig{
|
|
Payload: structs.DispatchPayloadRequired,
|
|
}
|
|
err := store.UpsertJob(structs.MsgTypeTestSetup, 1000, job)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
|
|
// Create a core scheduler
|
|
snap, err := store.Snapshot()
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
core := NewCoreScheduler(s1, snap)
|
|
|
|
// Attempt the GC
|
|
gc := s1.coreJobEval(structs.CoreJobForceGC, 1002)
|
|
err = core.Process(gc)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
|
|
// Should still exist
|
|
ws := memdb.NewWatchSet()
|
|
out, err := store.JobByID(ws, job.Namespace, job.ID)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
if out == nil {
|
|
t.Fatalf("bad: %v", out)
|
|
}
|
|
|
|
// Mark the job as stopped and try again
|
|
job2 := job.Copy()
|
|
job2.Stop = true
|
|
err = store.UpsertJob(structs.MsgTypeTestSetup, 2000, job2)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
|
|
// Create a core scheduler
|
|
snap, err = store.Snapshot()
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
core = NewCoreScheduler(s1, snap)
|
|
|
|
// Attempt the GC
|
|
gc = s1.coreJobEval(structs.CoreJobForceGC, 2002)
|
|
err = core.Process(gc)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
|
|
// Should not exist
|
|
out, err = store.JobByID(ws, job.Namespace, job.ID)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
if out != nil {
|
|
t.Fatalf("bad: %+v", out)
|
|
}
|
|
}
|
|
|
|
// This test ensures periodic jobs don't get GCd until they are stopped
|
|
func TestCoreScheduler_JobGC_Periodic(t *testing.T) {
|
|
ci.Parallel(t)
|
|
|
|
s1, cleanupS1 := TestServer(t, nil)
|
|
defer cleanupS1()
|
|
testutil.WaitForLeader(t, s1.RPC)
|
|
|
|
// COMPAT Remove in 0.6: Reset the FSM time table since we reconcile which sets index 0
|
|
s1.fsm.timetable.table = make([]TimeTableEntry, 1, 10)
|
|
|
|
// Insert a parameterized job.
|
|
store := s1.fsm.State()
|
|
job := mock.PeriodicJob()
|
|
err := store.UpsertJob(structs.MsgTypeTestSetup, 1000, job)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
|
|
// Create a core scheduler
|
|
snap, err := store.Snapshot()
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
core := NewCoreScheduler(s1, snap)
|
|
|
|
// Attempt the GC
|
|
gc := s1.coreJobEval(structs.CoreJobForceGC, 1002)
|
|
err = core.Process(gc)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
|
|
// Should still exist
|
|
ws := memdb.NewWatchSet()
|
|
out, err := store.JobByID(ws, job.Namespace, job.ID)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
if out == nil {
|
|
t.Fatalf("bad: %v", out)
|
|
}
|
|
|
|
// Mark the job as stopped and try again
|
|
job2 := job.Copy()
|
|
job2.Stop = true
|
|
err = store.UpsertJob(structs.MsgTypeTestSetup, 2000, job2)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
|
|
// Create a core scheduler
|
|
snap, err = store.Snapshot()
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
core = NewCoreScheduler(s1, snap)
|
|
|
|
// Attempt the GC
|
|
gc = s1.coreJobEval(structs.CoreJobForceGC, 2002)
|
|
err = core.Process(gc)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
|
|
// Should not exist
|
|
out, err = store.JobByID(ws, job.Namespace, job.ID)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
if out != nil {
|
|
t.Fatalf("bad: %+v", out)
|
|
}
|
|
}
|
|
|
|
func TestCoreScheduler_DeploymentGC(t *testing.T) {
|
|
ci.Parallel(t)
|
|
|
|
s1, cleanupS1 := TestServer(t, nil)
|
|
defer cleanupS1()
|
|
testutil.WaitForLeader(t, s1.RPC)
|
|
assert := assert.New(t)
|
|
|
|
// COMPAT Remove in 0.6: Reset the FSM time table since we reconcile which sets index 0
|
|
s1.fsm.timetable.table = make([]TimeTableEntry, 1, 10)
|
|
|
|
// Insert an active, terminal, and terminal with allocations deployment
|
|
store := s1.fsm.State()
|
|
d1, d2, d3 := mock.Deployment(), mock.Deployment(), mock.Deployment()
|
|
d1.Status = structs.DeploymentStatusFailed
|
|
d3.Status = structs.DeploymentStatusSuccessful
|
|
assert.Nil(store.UpsertDeployment(1000, d1), "UpsertDeployment")
|
|
assert.Nil(store.UpsertDeployment(1001, d2), "UpsertDeployment")
|
|
assert.Nil(store.UpsertDeployment(1002, d3), "UpsertDeployment")
|
|
|
|
a := mock.Alloc()
|
|
a.JobID = d3.JobID
|
|
a.DeploymentID = d3.ID
|
|
assert.Nil(store.UpsertAllocs(structs.MsgTypeTestSetup, 1003, []*structs.Allocation{a}), "UpsertAllocs")
|
|
|
|
// Update the time tables to make this work
|
|
tt := s1.fsm.TimeTable()
|
|
tt.Witness(2000, time.Now().UTC().Add(-1*s1.config.DeploymentGCThreshold))
|
|
|
|
// Create a core scheduler
|
|
snap, err := store.Snapshot()
|
|
assert.Nil(err, "Snapshot")
|
|
core := NewCoreScheduler(s1, snap)
|
|
|
|
// Attempt the GC
|
|
gc := s1.coreJobEval(structs.CoreJobDeploymentGC, 2000)
|
|
assert.Nil(core.Process(gc), "Process GC")
|
|
|
|
// Should be gone
|
|
ws := memdb.NewWatchSet()
|
|
out, err := store.DeploymentByID(ws, d1.ID)
|
|
assert.Nil(err, "DeploymentByID")
|
|
assert.Nil(out, "Terminal Deployment")
|
|
out2, err := store.DeploymentByID(ws, d2.ID)
|
|
assert.Nil(err, "DeploymentByID")
|
|
assert.NotNil(out2, "Active Deployment")
|
|
out3, err := store.DeploymentByID(ws, d3.ID)
|
|
assert.Nil(err, "DeploymentByID")
|
|
assert.NotNil(out3, "Terminal Deployment With Allocs")
|
|
}
|
|
|
|
func TestCoreScheduler_DeploymentGC_Force(t *testing.T) {
|
|
ci.Parallel(t)
|
|
for _, withAcl := range []bool{false, true} {
|
|
t.Run(fmt.Sprintf("with acl %v", withAcl), func(t *testing.T) {
|
|
var server *Server
|
|
var cleanup func()
|
|
if withAcl {
|
|
server, _, cleanup = TestACLServer(t, nil)
|
|
} else {
|
|
server, cleanup = TestServer(t, nil)
|
|
}
|
|
defer cleanup()
|
|
testutil.WaitForLeader(t, server.RPC)
|
|
assert := assert.New(t)
|
|
|
|
// COMPAT Remove in 0.6: Reset the FSM time table since we reconcile which sets index 0
|
|
server.fsm.timetable.table = make([]TimeTableEntry, 1, 10)
|
|
|
|
// Insert terminal and active deployment
|
|
store := server.fsm.State()
|
|
d1, d2 := mock.Deployment(), mock.Deployment()
|
|
d1.Status = structs.DeploymentStatusFailed
|
|
assert.Nil(store.UpsertDeployment(1000, d1), "UpsertDeployment")
|
|
assert.Nil(store.UpsertDeployment(1001, d2), "UpsertDeployment")
|
|
|
|
// Create a core scheduler
|
|
snap, err := store.Snapshot()
|
|
assert.Nil(err, "Snapshot")
|
|
core := NewCoreScheduler(server, snap)
|
|
|
|
// Attempt the GC
|
|
gc := server.coreJobEval(structs.CoreJobForceGC, 1000)
|
|
assert.Nil(core.Process(gc), "Process Force GC")
|
|
|
|
// Should be gone
|
|
ws := memdb.NewWatchSet()
|
|
out, err := store.DeploymentByID(ws, d1.ID)
|
|
assert.Nil(err, "DeploymentByID")
|
|
assert.Nil(out, "Terminal Deployment")
|
|
out2, err := store.DeploymentByID(ws, d2.ID)
|
|
assert.Nil(err, "DeploymentByID")
|
|
assert.NotNil(out2, "Active Deployment")
|
|
})
|
|
}
|
|
}
|
|
|
|
func TestCoreScheduler_PartitionEvalReap(t *testing.T) {
|
|
ci.Parallel(t)
|
|
|
|
s1, cleanupS1 := TestServer(t, nil)
|
|
defer cleanupS1()
|
|
testutil.WaitForLeader(t, s1.RPC)
|
|
|
|
// COMPAT Remove in 0.6: Reset the FSM time table since we reconcile which sets index 0
|
|
s1.fsm.timetable.table = make([]TimeTableEntry, 1, 10)
|
|
|
|
// Create a core scheduler
|
|
snap, err := s1.fsm.State().Snapshot()
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
core := NewCoreScheduler(s1, snap)
|
|
|
|
// Set the max ids per reap to something lower.
|
|
structs.MaxUUIDsPerWriteRequest = 2
|
|
|
|
evals := []string{"a", "b", "c"}
|
|
allocs := []string{"1", "2", "3"}
|
|
requests := core.(*CoreScheduler).partitionEvalReap(evals, allocs)
|
|
if len(requests) != 3 {
|
|
t.Fatalf("Expected 3 requests got: %v", requests)
|
|
}
|
|
|
|
first := requests[0]
|
|
if len(first.Allocs) != 2 && len(first.Evals) != 0 {
|
|
t.Fatalf("Unexpected first request: %v", first)
|
|
}
|
|
|
|
second := requests[1]
|
|
if len(second.Allocs) != 1 && len(second.Evals) != 1 {
|
|
t.Fatalf("Unexpected second request: %v", second)
|
|
}
|
|
|
|
third := requests[2]
|
|
if len(third.Allocs) != 0 && len(third.Evals) != 2 {
|
|
t.Fatalf("Unexpected third request: %v", third)
|
|
}
|
|
}
|
|
|
|
func TestCoreScheduler_PartitionDeploymentReap(t *testing.T) {
|
|
ci.Parallel(t)
|
|
|
|
s1, cleanupS1 := TestServer(t, nil)
|
|
defer cleanupS1()
|
|
testutil.WaitForLeader(t, s1.RPC)
|
|
|
|
// COMPAT Remove in 0.6: Reset the FSM time table since we reconcile which sets index 0
|
|
s1.fsm.timetable.table = make([]TimeTableEntry, 1, 10)
|
|
|
|
// Create a core scheduler
|
|
snap, err := s1.fsm.State().Snapshot()
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
core := NewCoreScheduler(s1, snap)
|
|
|
|
// Set the max ids per reap to something lower.
|
|
structs.MaxUUIDsPerWriteRequest = 2
|
|
|
|
deployments := []string{"a", "b", "c"}
|
|
requests := core.(*CoreScheduler).partitionDeploymentReap(deployments)
|
|
if len(requests) != 2 {
|
|
t.Fatalf("Expected 2 requests got: %v", requests)
|
|
}
|
|
|
|
first := requests[0]
|
|
if len(first.Deployments) != 2 {
|
|
t.Fatalf("Unexpected first request: %v", first)
|
|
}
|
|
|
|
second := requests[1]
|
|
if len(second.Deployments) != 1 {
|
|
t.Fatalf("Unexpected second request: %v", second)
|
|
}
|
|
}
|
|
|
|
func TestCoreScheduler_PartitionJobReap(t *testing.T) {
|
|
|
|
s1, cleanupS1 := TestServer(t, nil)
|
|
defer cleanupS1()
|
|
testutil.WaitForLeader(t, s1.RPC)
|
|
|
|
// Create a core scheduler
|
|
snap, err := s1.fsm.State().Snapshot()
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
core := NewCoreScheduler(s1, snap)
|
|
|
|
// Set the max ids per reap to something lower.
|
|
originalMaxUUIDsPerWriteRequest := structs.MaxUUIDsPerWriteRequest
|
|
structs.MaxUUIDsPerWriteRequest = 2
|
|
defer func() {
|
|
structs.MaxUUIDsPerWriteRequest = originalMaxUUIDsPerWriteRequest
|
|
}()
|
|
|
|
jobs := []*structs.Job{mock.Job(), mock.Job(), mock.Job()}
|
|
requests := core.(*CoreScheduler).partitionJobReap(jobs, "")
|
|
require.Len(t, requests, 2)
|
|
|
|
first := requests[0]
|
|
second := requests[1]
|
|
require.Len(t, first.Jobs, 2)
|
|
require.Len(t, second.Jobs, 1)
|
|
}
|
|
|
|
// Tests various scenarios when allocations are eligible to be GCed
|
|
func TestAllocation_GCEligible(t *testing.T) {
|
|
type testCase struct {
|
|
Desc string
|
|
GCTime time.Time
|
|
ClientStatus string
|
|
DesiredStatus string
|
|
JobStatus string
|
|
JobStop bool
|
|
AllocJobModifyIndex uint64
|
|
JobModifyIndex uint64
|
|
ModifyIndex uint64
|
|
NextAllocID string
|
|
ReschedulePolicy *structs.ReschedulePolicy
|
|
RescheduleTrackers []*structs.RescheduleEvent
|
|
ThresholdIndex uint64
|
|
ShouldGC bool
|
|
}
|
|
|
|
fail := time.Now()
|
|
|
|
harness := []testCase{
|
|
{
|
|
Desc: "Don't GC when non terminal",
|
|
ClientStatus: structs.AllocClientStatusPending,
|
|
DesiredStatus: structs.AllocDesiredStatusRun,
|
|
GCTime: fail,
|
|
ModifyIndex: 90,
|
|
ThresholdIndex: 90,
|
|
ShouldGC: false,
|
|
},
|
|
{
|
|
Desc: "Don't GC when non terminal and job stopped",
|
|
ClientStatus: structs.AllocClientStatusPending,
|
|
DesiredStatus: structs.AllocDesiredStatusRun,
|
|
JobStop: true,
|
|
GCTime: fail,
|
|
ModifyIndex: 90,
|
|
ThresholdIndex: 90,
|
|
ShouldGC: false,
|
|
},
|
|
{
|
|
Desc: "Don't GC when non terminal and job dead",
|
|
ClientStatus: structs.AllocClientStatusPending,
|
|
DesiredStatus: structs.AllocDesiredStatusRun,
|
|
JobStatus: structs.JobStatusDead,
|
|
GCTime: fail,
|
|
ModifyIndex: 90,
|
|
ThresholdIndex: 90,
|
|
ShouldGC: false,
|
|
},
|
|
{
|
|
Desc: "Don't GC when non terminal on client and job dead",
|
|
ClientStatus: structs.AllocClientStatusRunning,
|
|
DesiredStatus: structs.AllocDesiredStatusStop,
|
|
JobStatus: structs.JobStatusDead,
|
|
GCTime: fail,
|
|
ModifyIndex: 90,
|
|
ThresholdIndex: 90,
|
|
ShouldGC: false,
|
|
},
|
|
{
|
|
Desc: "GC when terminal but not failed ",
|
|
ClientStatus: structs.AllocClientStatusComplete,
|
|
DesiredStatus: structs.AllocDesiredStatusRun,
|
|
GCTime: fail,
|
|
ModifyIndex: 90,
|
|
ThresholdIndex: 90,
|
|
ReschedulePolicy: nil,
|
|
ShouldGC: true,
|
|
},
|
|
{
|
|
Desc: "Don't GC when threshold not met",
|
|
ClientStatus: structs.AllocClientStatusComplete,
|
|
DesiredStatus: structs.AllocDesiredStatusStop,
|
|
GCTime: fail,
|
|
ModifyIndex: 100,
|
|
ThresholdIndex: 90,
|
|
ReschedulePolicy: nil,
|
|
ShouldGC: false,
|
|
},
|
|
{
|
|
Desc: "GC when no reschedule policy",
|
|
ClientStatus: structs.AllocClientStatusFailed,
|
|
DesiredStatus: structs.AllocDesiredStatusRun,
|
|
GCTime: fail,
|
|
ReschedulePolicy: nil,
|
|
ModifyIndex: 90,
|
|
ThresholdIndex: 90,
|
|
ShouldGC: true,
|
|
},
|
|
{
|
|
Desc: "GC when empty policy",
|
|
ClientStatus: structs.AllocClientStatusFailed,
|
|
DesiredStatus: structs.AllocDesiredStatusRun,
|
|
GCTime: fail,
|
|
ReschedulePolicy: &structs.ReschedulePolicy{Attempts: 0, Interval: 0 * time.Minute},
|
|
ModifyIndex: 90,
|
|
ThresholdIndex: 90,
|
|
ShouldGC: true,
|
|
},
|
|
{
|
|
Desc: "Don't GC when no previous reschedule attempts",
|
|
ClientStatus: structs.AllocClientStatusFailed,
|
|
DesiredStatus: structs.AllocDesiredStatusRun,
|
|
GCTime: fail,
|
|
ModifyIndex: 90,
|
|
ThresholdIndex: 90,
|
|
ReschedulePolicy: &structs.ReschedulePolicy{Attempts: 1, Interval: 1 * time.Minute},
|
|
ShouldGC: false,
|
|
},
|
|
{
|
|
Desc: "Don't GC when prev reschedule attempt within interval",
|
|
ClientStatus: structs.AllocClientStatusFailed,
|
|
DesiredStatus: structs.AllocDesiredStatusRun,
|
|
ReschedulePolicy: &structs.ReschedulePolicy{Attempts: 2, Interval: 30 * time.Minute},
|
|
GCTime: fail,
|
|
ModifyIndex: 90,
|
|
ThresholdIndex: 90,
|
|
RescheduleTrackers: []*structs.RescheduleEvent{
|
|
{
|
|
RescheduleTime: fail.Add(-5 * time.Minute).UTC().UnixNano(),
|
|
},
|
|
},
|
|
ShouldGC: false,
|
|
},
|
|
{
|
|
Desc: "GC with prev reschedule attempt outside interval",
|
|
ClientStatus: structs.AllocClientStatusFailed,
|
|
DesiredStatus: structs.AllocDesiredStatusRun,
|
|
GCTime: fail,
|
|
ReschedulePolicy: &structs.ReschedulePolicy{Attempts: 5, Interval: 30 * time.Minute},
|
|
RescheduleTrackers: []*structs.RescheduleEvent{
|
|
{
|
|
RescheduleTime: fail.Add(-45 * time.Minute).UTC().UnixNano(),
|
|
},
|
|
{
|
|
RescheduleTime: fail.Add(-60 * time.Minute).UTC().UnixNano(),
|
|
},
|
|
},
|
|
ShouldGC: true,
|
|
},
|
|
{
|
|
Desc: "GC when next alloc id is set",
|
|
ClientStatus: structs.AllocClientStatusFailed,
|
|
DesiredStatus: structs.AllocDesiredStatusRun,
|
|
GCTime: fail,
|
|
ReschedulePolicy: &structs.ReschedulePolicy{Attempts: 5, Interval: 30 * time.Minute},
|
|
RescheduleTrackers: []*structs.RescheduleEvent{
|
|
{
|
|
RescheduleTime: fail.Add(-3 * time.Minute).UTC().UnixNano(),
|
|
},
|
|
},
|
|
NextAllocID: uuid.Generate(),
|
|
ShouldGC: true,
|
|
},
|
|
{
|
|
Desc: "Don't GC when next alloc id is not set and unlimited restarts",
|
|
ClientStatus: structs.AllocClientStatusFailed,
|
|
DesiredStatus: structs.AllocDesiredStatusRun,
|
|
GCTime: fail,
|
|
ReschedulePolicy: &structs.ReschedulePolicy{Unlimited: true, Delay: 5 * time.Second, DelayFunction: "constant"},
|
|
RescheduleTrackers: []*structs.RescheduleEvent{
|
|
{
|
|
RescheduleTime: fail.Add(-3 * time.Minute).UTC().UnixNano(),
|
|
},
|
|
},
|
|
ShouldGC: false,
|
|
},
|
|
{
|
|
Desc: "GC when job is stopped",
|
|
ClientStatus: structs.AllocClientStatusFailed,
|
|
DesiredStatus: structs.AllocDesiredStatusRun,
|
|
GCTime: fail,
|
|
ReschedulePolicy: &structs.ReschedulePolicy{Attempts: 5, Interval: 30 * time.Minute},
|
|
RescheduleTrackers: []*structs.RescheduleEvent{
|
|
{
|
|
RescheduleTime: fail.Add(-3 * time.Minute).UTC().UnixNano(),
|
|
},
|
|
},
|
|
JobStop: true,
|
|
ShouldGC: true,
|
|
},
|
|
{
|
|
Desc: "GC when job status is dead",
|
|
ClientStatus: structs.AllocClientStatusFailed,
|
|
DesiredStatus: structs.AllocDesiredStatusRun,
|
|
GCTime: fail,
|
|
ReschedulePolicy: &structs.ReschedulePolicy{Attempts: 5, Interval: 30 * time.Minute},
|
|
RescheduleTrackers: []*structs.RescheduleEvent{
|
|
{
|
|
RescheduleTime: fail.Add(-3 * time.Minute).UTC().UnixNano(),
|
|
},
|
|
},
|
|
JobStatus: structs.JobStatusDead,
|
|
ShouldGC: true,
|
|
},
|
|
{
|
|
Desc: "GC when desired status is stop, unlimited reschedule policy, no previous reschedule events",
|
|
ClientStatus: structs.AllocClientStatusFailed,
|
|
DesiredStatus: structs.AllocDesiredStatusStop,
|
|
GCTime: fail,
|
|
ReschedulePolicy: &structs.ReschedulePolicy{Unlimited: true, Delay: 5 * time.Second, DelayFunction: "constant"},
|
|
ShouldGC: true,
|
|
},
|
|
{
|
|
Desc: "GC when desired status is stop, limited reschedule policy, some previous reschedule events",
|
|
ClientStatus: structs.AllocClientStatusFailed,
|
|
DesiredStatus: structs.AllocDesiredStatusStop,
|
|
GCTime: fail,
|
|
ReschedulePolicy: &structs.ReschedulePolicy{Attempts: 5, Interval: 30 * time.Minute},
|
|
RescheduleTrackers: []*structs.RescheduleEvent{
|
|
{
|
|
RescheduleTime: fail.Add(-3 * time.Minute).UTC().UnixNano(),
|
|
},
|
|
},
|
|
ShouldGC: true,
|
|
},
|
|
}
|
|
|
|
for _, tc := range harness {
|
|
alloc := &structs.Allocation{}
|
|
alloc.ModifyIndex = tc.ModifyIndex
|
|
alloc.DesiredStatus = tc.DesiredStatus
|
|
alloc.ClientStatus = tc.ClientStatus
|
|
alloc.RescheduleTracker = &structs.RescheduleTracker{Events: tc.RescheduleTrackers}
|
|
alloc.NextAllocation = tc.NextAllocID
|
|
job := mock.Job()
|
|
alloc.TaskGroup = job.TaskGroups[0].Name
|
|
job.TaskGroups[0].ReschedulePolicy = tc.ReschedulePolicy
|
|
if tc.JobStatus != "" {
|
|
job.Status = tc.JobStatus
|
|
}
|
|
job.Stop = tc.JobStop
|
|
|
|
t.Run(tc.Desc, func(t *testing.T) {
|
|
if got := allocGCEligible(alloc, job, tc.GCTime, tc.ThresholdIndex); got != tc.ShouldGC {
|
|
t.Fatalf("expected %v but got %v", tc.ShouldGC, got)
|
|
}
|
|
})
|
|
|
|
}
|
|
|
|
// Verify nil job
|
|
alloc := mock.Alloc()
|
|
alloc.ClientStatus = structs.AllocClientStatusComplete
|
|
require.True(t, allocGCEligible(alloc, nil, time.Now(), 1000))
|
|
}
|
|
|
|
func TestCoreScheduler_CSIPluginGC(t *testing.T) {
|
|
ci.Parallel(t)
|
|
|
|
srv, cleanupSRV := TestServer(t, nil)
|
|
defer cleanupSRV()
|
|
testutil.WaitForLeader(t, srv.RPC)
|
|
|
|
srv.fsm.timetable.table = make([]TimeTableEntry, 1, 10)
|
|
|
|
deleteNodes := state.CreateTestCSIPlugin(srv.fsm.State(), "foo")
|
|
defer deleteNodes()
|
|
store := srv.fsm.State()
|
|
|
|
// Update the time tables to make this work
|
|
tt := srv.fsm.TimeTable()
|
|
index := uint64(2000)
|
|
tt.Witness(index, time.Now().UTC().Add(-1*srv.config.CSIPluginGCThreshold))
|
|
|
|
// Create a core scheduler
|
|
snap, err := store.Snapshot()
|
|
require.NoError(t, err)
|
|
core := NewCoreScheduler(srv, snap)
|
|
|
|
// Attempt the GC
|
|
index++
|
|
gc := srv.coreJobEval(structs.CoreJobCSIPluginGC, index)
|
|
require.NoError(t, core.Process(gc))
|
|
|
|
// Should not be gone (plugin in use)
|
|
ws := memdb.NewWatchSet()
|
|
plug, err := store.CSIPluginByID(ws, "foo")
|
|
require.NotNil(t, plug)
|
|
require.NoError(t, err)
|
|
|
|
// Empty the plugin
|
|
plug.Controllers = map[string]*structs.CSIInfo{}
|
|
plug.Nodes = map[string]*structs.CSIInfo{}
|
|
|
|
index++
|
|
err = store.UpsertCSIPlugin(index, plug)
|
|
require.NoError(t, err)
|
|
|
|
// Retry
|
|
index++
|
|
gc = srv.coreJobEval(structs.CoreJobCSIPluginGC, index)
|
|
require.NoError(t, core.Process(gc))
|
|
|
|
// Should be gone
|
|
plug, err = store.CSIPluginByID(ws, "foo")
|
|
require.Nil(t, plug)
|
|
require.NoError(t, err)
|
|
}
|
|
|
|
func TestCoreScheduler_CSIVolumeClaimGC(t *testing.T) {
|
|
srv, shutdown := TestServer(t, func(c *Config) {
|
|
c.NumSchedulers = 0 // Prevent automatic dequeue
|
|
})
|
|
|
|
defer shutdown()
|
|
testutil.WaitForLeader(t, srv.RPC)
|
|
codec := rpcClient(t, srv)
|
|
|
|
index := uint64(1)
|
|
volID := uuid.Generate()
|
|
ns := structs.DefaultNamespace
|
|
pluginID := "foo"
|
|
|
|
store := srv.fsm.State()
|
|
ws := memdb.NewWatchSet()
|
|
|
|
index, _ = store.LatestIndex()
|
|
|
|
// Create client node and plugin
|
|
node := mock.Node()
|
|
node.Attributes["nomad.version"] = "0.11.0" // needs client RPCs
|
|
node.CSINodePlugins = map[string]*structs.CSIInfo{
|
|
pluginID: {
|
|
PluginID: pluginID,
|
|
Healthy: true,
|
|
NodeInfo: &structs.CSINodeInfo{},
|
|
},
|
|
}
|
|
index++
|
|
err := store.UpsertNode(structs.MsgTypeTestSetup, index, node)
|
|
require.NoError(t, err)
|
|
|
|
// *Important*: for volume writes in this test we must use RPCs
|
|
// rather than StateStore methods directly, or the blocking query
|
|
// in volumewatcher won't get the final update for GC because it's
|
|
// watching on a different store at that point
|
|
|
|
// Register a volume
|
|
vols := []*structs.CSIVolume{{
|
|
ID: volID,
|
|
Namespace: ns,
|
|
PluginID: pluginID,
|
|
Topologies: []*structs.CSITopology{},
|
|
RequestedCapabilities: []*structs.CSIVolumeCapability{{
|
|
AccessMode: structs.CSIVolumeAccessModeMultiNodeSingleWriter,
|
|
AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem,
|
|
}},
|
|
}}
|
|
volReq := &structs.CSIVolumeRegisterRequest{Volumes: vols}
|
|
volReq.Namespace = ns
|
|
volReq.Region = srv.config.Region
|
|
|
|
err = msgpackrpc.CallWithCodec(codec, "CSIVolume.Register",
|
|
volReq, &structs.CSIVolumeRegisterResponse{})
|
|
require.NoError(t, err)
|
|
|
|
// Create a job with two allocs that claim the volume.
|
|
// We use two allocs here, one of which is not running, so
|
|
// that we can assert the volumewatcher has made one
|
|
// complete pass (and removed the 2nd alloc) before we
|
|
// run the GC
|
|
eval := mock.Eval()
|
|
eval.Status = structs.EvalStatusFailed
|
|
index++
|
|
store.UpsertJobSummary(index, mock.JobSummary(eval.JobID))
|
|
index++
|
|
err = store.UpsertEvals(structs.MsgTypeTestSetup, index, []*structs.Evaluation{eval})
|
|
require.Nil(t, err)
|
|
|
|
job := mock.Job()
|
|
job.ID = eval.JobID
|
|
job.Status = structs.JobStatusRunning
|
|
index++
|
|
err = store.UpsertJob(structs.MsgTypeTestSetup, index, job)
|
|
require.NoError(t, err)
|
|
|
|
alloc1, alloc2 := mock.Alloc(), mock.Alloc()
|
|
alloc1.NodeID = node.ID
|
|
alloc1.ClientStatus = structs.AllocClientStatusRunning
|
|
alloc1.Job = job
|
|
alloc1.JobID = job.ID
|
|
alloc1.EvalID = eval.ID
|
|
|
|
alloc2.NodeID = node.ID
|
|
alloc2.ClientStatus = structs.AllocClientStatusComplete
|
|
alloc2.DesiredStatus = structs.AllocDesiredStatusStop
|
|
alloc2.Job = job
|
|
alloc2.JobID = job.ID
|
|
alloc2.EvalID = eval.ID
|
|
|
|
summary := mock.JobSummary(alloc1.JobID)
|
|
index++
|
|
require.NoError(t, store.UpsertJobSummary(index, summary))
|
|
summary = mock.JobSummary(alloc2.JobID)
|
|
index++
|
|
require.NoError(t, store.UpsertJobSummary(index, summary))
|
|
index++
|
|
require.NoError(t, store.UpsertAllocs(structs.MsgTypeTestSetup, index, []*structs.Allocation{alloc1, alloc2}))
|
|
|
|
req := &structs.CSIVolumeClaimRequest{
|
|
VolumeID: volID,
|
|
AllocationID: alloc1.ID,
|
|
NodeID: uuid.Generate(), // doesn't exist so we don't get errors trying to unmount volumes from it
|
|
Claim: structs.CSIVolumeClaimWrite,
|
|
AccessMode: structs.CSIVolumeAccessModeMultiNodeMultiWriter,
|
|
AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem,
|
|
State: structs.CSIVolumeClaimStateTaken,
|
|
WriteRequest: structs.WriteRequest{
|
|
Namespace: ns,
|
|
Region: srv.config.Region,
|
|
},
|
|
}
|
|
err = msgpackrpc.CallWithCodec(codec, "CSIVolume.Claim",
|
|
req, &structs.CSIVolumeClaimResponse{})
|
|
require.NoError(t, err, "write claim should succeed")
|
|
|
|
req.AllocationID = alloc2.ID
|
|
req.State = structs.CSIVolumeClaimStateUnpublishing
|
|
|
|
err = msgpackrpc.CallWithCodec(codec, "CSIVolume.Claim",
|
|
req, &structs.CSIVolumeClaimResponse{})
|
|
require.NoError(t, err, "unpublishing claim should succeed")
|
|
|
|
require.Eventually(t, func() bool {
|
|
vol, err := store.CSIVolumeByID(ws, ns, volID)
|
|
require.NoError(t, err)
|
|
return len(vol.WriteClaims) == 1 &&
|
|
len(vol.WriteAllocs) == 1 &&
|
|
len(vol.PastClaims) == 0
|
|
}, time.Second*1, 100*time.Millisecond,
|
|
"volumewatcher should have released unpublishing claim without GC")
|
|
|
|
// At this point we can guarantee that volumewatcher is waiting
|
|
// for new work. Delete allocation and job so that the next pass
|
|
// thru volumewatcher has more work to do
|
|
index, _ = store.LatestIndex()
|
|
index++
|
|
err = store.DeleteJob(index, ns, job.ID)
|
|
require.NoError(t, err)
|
|
index, _ = store.LatestIndex()
|
|
index++
|
|
err = store.DeleteEval(index, []string{eval.ID}, []string{alloc1.ID}, false)
|
|
require.NoError(t, err)
|
|
|
|
// Create a core scheduler and attempt the volume claim GC
|
|
snap, err := store.Snapshot()
|
|
require.NoError(t, err)
|
|
|
|
core := NewCoreScheduler(srv, snap)
|
|
|
|
index, _ = snap.LatestIndex()
|
|
index++
|
|
gc := srv.coreJobEval(structs.CoreJobForceGC, index)
|
|
c := core.(*CoreScheduler)
|
|
require.NoError(t, c.csiVolumeClaimGC(gc))
|
|
|
|
// the only remaining claim is for a deleted alloc with no path to
|
|
// the non-existent node, so volumewatcher will release the
|
|
// remaining claim
|
|
require.Eventually(t, func() bool {
|
|
vol, _ := store.CSIVolumeByID(ws, ns, volID)
|
|
return len(vol.WriteClaims) == 0 &&
|
|
len(vol.WriteAllocs) == 0 &&
|
|
len(vol.PastClaims) == 0
|
|
}, time.Second*2, 10*time.Millisecond, "claims were not released")
|
|
|
|
}
|
|
|
|
func TestCoreScheduler_CSIBadState_ClaimGC(t *testing.T) {
|
|
ci.Parallel(t)
|
|
|
|
srv, shutdown := TestServer(t, func(c *Config) {
|
|
c.NumSchedulers = 0 // Prevent automatic dequeue
|
|
})
|
|
|
|
defer shutdown()
|
|
testutil.WaitForLeader(t, srv.RPC)
|
|
|
|
err := state.TestBadCSIState(t, srv.State())
|
|
require.NoError(t, err)
|
|
|
|
snap, err := srv.State().Snapshot()
|
|
require.NoError(t, err)
|
|
core := NewCoreScheduler(srv, snap)
|
|
|
|
index, _ := srv.State().LatestIndex()
|
|
index++
|
|
gc := srv.coreJobEval(structs.CoreJobForceGC, index)
|
|
c := core.(*CoreScheduler)
|
|
require.NoError(t, c.csiVolumeClaimGC(gc))
|
|
|
|
require.Eventually(t, func() bool {
|
|
vol, _ := srv.State().CSIVolumeByID(nil,
|
|
structs.DefaultNamespace, "csi-volume-nfs0")
|
|
if len(vol.PastClaims) != 2 {
|
|
return false
|
|
}
|
|
for _, claim := range vol.PastClaims {
|
|
if claim.State != structs.CSIVolumeClaimStateUnpublishing {
|
|
return false
|
|
}
|
|
}
|
|
return true
|
|
}, time.Second*5, 10*time.Millisecond, "invalid claims should be marked for GC")
|
|
|
|
}
|
|
|
|
// TestCoreScheduler_RootKeyGC exercises root key GC
|
|
func TestCoreScheduler_RootKeyGC(t *testing.T) {
|
|
ci.Parallel(t)
|
|
|
|
srv, cleanup := TestServer(t, nil)
|
|
defer cleanup()
|
|
testutil.WaitForLeader(t, srv.RPC)
|
|
|
|
// reset the time table
|
|
srv.fsm.timetable.table = make([]TimeTableEntry, 1, 10)
|
|
|
|
// active key, will never be GC'd
|
|
store := srv.fsm.State()
|
|
key0, err := store.GetActiveRootKeyMeta(nil)
|
|
require.NotNil(t, key0, "expected keyring to be bootstapped")
|
|
require.NoError(t, err)
|
|
|
|
// insert an "old" inactive key
|
|
key1 := structs.NewRootKeyMeta()
|
|
key1.SetInactive()
|
|
require.NoError(t, store.UpsertRootKeyMeta(600, key1, false))
|
|
|
|
// insert an "old" and inactive key with a variable that's using it
|
|
key2 := structs.NewRootKeyMeta()
|
|
key2.SetInactive()
|
|
require.NoError(t, store.UpsertRootKeyMeta(700, key2, false))
|
|
|
|
variable := mock.VariableEncrypted()
|
|
variable.KeyID = key2.KeyID
|
|
|
|
setResp := store.VarSet(601, &structs.VarApplyStateRequest{
|
|
Op: structs.VarOpSet,
|
|
Var: variable,
|
|
})
|
|
require.NoError(t, setResp.Error)
|
|
|
|
// insert an "old" key that's inactive but being used by an alloc
|
|
key3 := structs.NewRootKeyMeta()
|
|
key3.SetInactive()
|
|
require.NoError(t, store.UpsertRootKeyMeta(800, key3, false))
|
|
|
|
// insert the allocation using key3
|
|
alloc := mock.Alloc()
|
|
alloc.ClientStatus = structs.AllocClientStatusRunning
|
|
alloc.SigningKeyID = key3.KeyID
|
|
require.NoError(t, store.UpsertAllocs(
|
|
structs.MsgTypeTestSetup, 850, []*structs.Allocation{alloc}))
|
|
|
|
// insert an "old" key that's inactive but being used by an alloc
|
|
key4 := structs.NewRootKeyMeta()
|
|
key4.SetInactive()
|
|
require.NoError(t, store.UpsertRootKeyMeta(900, key4, false))
|
|
|
|
// insert the dead allocation using key4
|
|
alloc2 := mock.Alloc()
|
|
alloc2.ClientStatus = structs.AllocClientStatusFailed
|
|
alloc2.DesiredStatus = structs.AllocDesiredStatusStop
|
|
alloc2.SigningKeyID = key4.KeyID
|
|
require.NoError(t, store.UpsertAllocs(
|
|
structs.MsgTypeTestSetup, 950, []*structs.Allocation{alloc2}))
|
|
|
|
// insert a time table index before the last key
|
|
tt := srv.fsm.TimeTable()
|
|
tt.Witness(1000, time.Now().UTC().Add(-1*srv.config.RootKeyGCThreshold))
|
|
|
|
// insert a "new" but inactive key
|
|
key5 := structs.NewRootKeyMeta()
|
|
key5.SetInactive()
|
|
require.NoError(t, store.UpsertRootKeyMeta(1500, key5, false))
|
|
|
|
// run the core job
|
|
snap, err := store.Snapshot()
|
|
require.NoError(t, err)
|
|
core := NewCoreScheduler(srv, snap)
|
|
eval := srv.coreJobEval(structs.CoreJobRootKeyRotateOrGC, 2000)
|
|
c := core.(*CoreScheduler)
|
|
require.NoError(t, c.rootKeyRotateOrGC(eval))
|
|
|
|
ws := memdb.NewWatchSet()
|
|
key, err := store.RootKeyMetaByID(ws, key0.KeyID)
|
|
require.NoError(t, err)
|
|
require.NotNil(t, key, "active key should not have been GCd")
|
|
|
|
key, err = store.RootKeyMetaByID(ws, key1.KeyID)
|
|
require.NoError(t, err)
|
|
require.Nil(t, key, "old and unused inactive key should have been GCd")
|
|
|
|
key, err = store.RootKeyMetaByID(ws, key2.KeyID)
|
|
require.NoError(t, err)
|
|
require.NotNil(t, key, "old key should not have been GCd if still in use")
|
|
|
|
key, err = store.RootKeyMetaByID(ws, key3.KeyID)
|
|
require.NoError(t, err)
|
|
require.NotNil(t, key, "old key used to sign a live alloc should not have been GCd")
|
|
|
|
key, err = store.RootKeyMetaByID(ws, key4.KeyID)
|
|
require.NoError(t, err)
|
|
require.Nil(t, key, "old key used to sign a terminal alloc should have been GCd")
|
|
|
|
key, err = store.RootKeyMetaByID(ws, key5.KeyID)
|
|
require.NoError(t, err)
|
|
require.NotNil(t, key, "new key should not have been GCd")
|
|
|
|
}
|
|
|
|
// TestCoreScheduler_VariablesRekey exercises variables rekeying
|
|
func TestCoreScheduler_VariablesRekey(t *testing.T) {
|
|
ci.Parallel(t)
|
|
|
|
srv, cleanup := TestServer(t, nil)
|
|
defer cleanup()
|
|
testutil.WaitForLeader(t, srv.RPC)
|
|
|
|
store := srv.fsm.State()
|
|
key0, err := store.GetActiveRootKeyMeta(nil)
|
|
require.NotNil(t, key0, "expected keyring to be bootstapped")
|
|
require.NoError(t, err)
|
|
|
|
for i := 0; i < 3; i++ {
|
|
req := &structs.VariablesApplyRequest{
|
|
Op: structs.VarOpSet,
|
|
Var: mock.Variable(),
|
|
WriteRequest: structs.WriteRequest{Region: srv.config.Region},
|
|
}
|
|
resp := &structs.VariablesApplyResponse{}
|
|
require.NoError(t, srv.RPC("Variables.Apply", req, resp))
|
|
}
|
|
|
|
rotateReq := &structs.KeyringRotateRootKeyRequest{
|
|
WriteRequest: structs.WriteRequest{
|
|
Region: srv.config.Region,
|
|
},
|
|
}
|
|
var rotateResp structs.KeyringRotateRootKeyResponse
|
|
require.NoError(t, srv.RPC("Keyring.Rotate", rotateReq, &rotateResp))
|
|
|
|
for i := 0; i < 3; i++ {
|
|
req := &structs.VariablesApplyRequest{
|
|
Op: structs.VarOpSet,
|
|
Var: mock.Variable(),
|
|
WriteRequest: structs.WriteRequest{Region: srv.config.Region},
|
|
}
|
|
resp := &structs.VariablesApplyResponse{}
|
|
require.NoError(t, srv.RPC("Variables.Apply", req, resp))
|
|
}
|
|
|
|
rotateReq.Full = true
|
|
require.NoError(t, srv.RPC("Keyring.Rotate", rotateReq, &rotateResp))
|
|
newKeyID := rotateResp.Key.KeyID
|
|
|
|
require.Eventually(t, func() bool {
|
|
ws := memdb.NewWatchSet()
|
|
iter, err := store.Variables(ws)
|
|
require.NoError(t, err)
|
|
for {
|
|
raw := iter.Next()
|
|
if raw == nil {
|
|
break
|
|
}
|
|
variable := raw.(*structs.VariableEncrypted)
|
|
if variable.KeyID != newKeyID {
|
|
return false
|
|
}
|
|
}
|
|
return true
|
|
}, time.Second*5, 100*time.Millisecond,
|
|
"variable rekey should be complete")
|
|
|
|
}
|
|
|
|
func TestCoreScheduler_FailLoop(t *testing.T) {
|
|
ci.Parallel(t)
|
|
|
|
srv, cleanupSrv := TestServer(t, func(c *Config) {
|
|
c.NumSchedulers = 0 // Prevent automatic dequeue
|
|
c.EvalDeliveryLimit = 2
|
|
c.EvalFailedFollowupBaselineDelay = time.Duration(50 * time.Millisecond)
|
|
c.EvalFailedFollowupDelayRange = time.Duration(1 * time.Millisecond)
|
|
})
|
|
defer cleanupSrv()
|
|
codec := rpcClient(t, srv)
|
|
sched := []string{structs.JobTypeCore}
|
|
|
|
testutil.WaitForResult(func() (bool, error) {
|
|
return srv.evalBroker.Enabled(), nil
|
|
}, func(err error) {
|
|
t.Fatalf("should enable eval broker")
|
|
})
|
|
|
|
// Enqueue a core job eval that can never succeed because it was enqueued
|
|
// by another leader that's now gone
|
|
expected := srv.coreJobEval(structs.CoreJobCSIPluginGC, 100)
|
|
expected.LeaderACL = "nonsense"
|
|
srv.evalBroker.Enqueue(expected)
|
|
|
|
nack := func(evalID, token string) error {
|
|
req := &structs.EvalAckRequest{
|
|
EvalID: evalID,
|
|
Token: token,
|
|
WriteRequest: structs.WriteRequest{Region: "global"},
|
|
}
|
|
var resp structs.GenericResponse
|
|
return msgpackrpc.CallWithCodec(codec, "Eval.Nack", req, &resp)
|
|
}
|
|
|
|
out, token, err := srv.evalBroker.Dequeue(sched, time.Second*5)
|
|
require.NoError(t, err)
|
|
require.NotNil(t, out)
|
|
require.Equal(t, expected, out)
|
|
|
|
// first fail
|
|
require.NoError(t, nack(out.ID, token))
|
|
|
|
out, token, err = srv.evalBroker.Dequeue(sched, time.Second*5)
|
|
require.NoError(t, err)
|
|
require.NotNil(t, out)
|
|
require.Equal(t, expected, out)
|
|
|
|
// second fail, should not result in failed-follow-up
|
|
require.NoError(t, nack(out.ID, token))
|
|
|
|
out, token, err = srv.evalBroker.Dequeue(sched, time.Second*5)
|
|
require.NoError(t, err)
|
|
if out != nil {
|
|
t.Fatalf(
|
|
"failed core jobs should not result in follow-up. TriggeredBy: %v",
|
|
out.TriggeredBy)
|
|
}
|
|
}
|
|
|
|
func TestCoreScheduler_ExpiredACLTokenGC(t *testing.T) {
|
|
ci.Parallel(t)
|
|
|
|
testServer, rootACLToken, testServerShutdown := TestACLServer(t, func(c *Config) {
|
|
c.NumSchedulers = 0
|
|
})
|
|
defer testServerShutdown()
|
|
testutil.WaitForLeader(t, testServer.RPC)
|
|
|
|
now := time.Now().UTC()
|
|
|
|
// Craft some specific local and global tokens. For each type, one is
|
|
// expired, one is not.
|
|
expiredGlobal := mock.ACLToken()
|
|
expiredGlobal.Global = true
|
|
expiredGlobal.ExpirationTime = pointer.Of(now.Add(-2 * time.Hour))
|
|
|
|
unexpiredGlobal := mock.ACLToken()
|
|
unexpiredGlobal.Global = true
|
|
unexpiredGlobal.ExpirationTime = pointer.Of(now.Add(2 * time.Hour))
|
|
|
|
expiredLocal := mock.ACLToken()
|
|
expiredLocal.ExpirationTime = pointer.Of(now.Add(-2 * time.Hour))
|
|
|
|
unexpiredLocal := mock.ACLToken()
|
|
unexpiredLocal.ExpirationTime = pointer.Of(now.Add(2 * time.Hour))
|
|
|
|
// Upsert these into state.
|
|
err := testServer.State().UpsertACLTokens(structs.MsgTypeTestSetup, 10, []*structs.ACLToken{
|
|
expiredGlobal, unexpiredGlobal, expiredLocal, unexpiredLocal,
|
|
})
|
|
require.NoError(t, err)
|
|
|
|
// Overwrite the timetable. The existing timetable has an entry due to the
|
|
// ACL bootstrapping which makes witnessing a new index at a timestamp in
|
|
// the past impossible.
|
|
tt := NewTimeTable(timeTableGranularity, timeTableLimit)
|
|
tt.Witness(20, time.Now().UTC().Add(-1*testServer.config.ACLTokenExpirationGCThreshold))
|
|
testServer.fsm.timetable = tt
|
|
|
|
// Generate the core scheduler.
|
|
snap, err := testServer.State().Snapshot()
|
|
require.NoError(t, err)
|
|
coreScheduler := NewCoreScheduler(testServer, snap)
|
|
|
|
// Trigger global and local periodic garbage collection runs.
|
|
index, err := testServer.State().LatestIndex()
|
|
require.NoError(t, err)
|
|
index++
|
|
|
|
globalGCEval := testServer.coreJobEval(structs.CoreJobGlobalTokenExpiredGC, index)
|
|
require.NoError(t, coreScheduler.Process(globalGCEval))
|
|
|
|
localGCEval := testServer.coreJobEval(structs.CoreJobLocalTokenExpiredGC, index)
|
|
require.NoError(t, coreScheduler.Process(localGCEval))
|
|
|
|
// Ensure the ACL tokens stored within state are as expected.
|
|
iter, err := testServer.State().ACLTokens(nil, state.SortDefault)
|
|
require.NoError(t, err)
|
|
|
|
var tokens []*structs.ACLToken
|
|
for raw := iter.Next(); raw != nil; raw = iter.Next() {
|
|
tokens = append(tokens, raw.(*structs.ACLToken))
|
|
}
|
|
require.ElementsMatch(t, []*structs.ACLToken{rootACLToken, unexpiredGlobal, unexpiredLocal}, tokens)
|
|
}
|
|
|
|
func TestCoreScheduler_ExpiredACLTokenGC_Force(t *testing.T) {
|
|
ci.Parallel(t)
|
|
|
|
testServer, rootACLToken, testServerShutdown := TestACLServer(t, func(c *Config) {
|
|
c.NumSchedulers = 0
|
|
})
|
|
defer testServerShutdown()
|
|
testutil.WaitForLeader(t, testServer.RPC)
|
|
|
|
// This time is the threshold for all expiry calls to be based on. All
|
|
// tokens with expiry can use this as their base and use Add().
|
|
expiryTimeThreshold := time.Now().UTC()
|
|
|
|
// Track expired and non-expired tokens for local and global tokens in
|
|
// separate arrays, so we have a clear way to test state.
|
|
var expiredGlobalTokens, nonExpiredGlobalTokens, expiredLocalTokens, nonExpiredLocalTokens []*structs.ACLToken
|
|
|
|
// Add the root ACL token to the appropriate array. This will be returned
|
|
// from state so must be accounted for and tested.
|
|
nonExpiredGlobalTokens = append(nonExpiredGlobalTokens, rootACLToken)
|
|
|
|
// Generate and upsert a number of mixed expired, non-expired global
|
|
// tokens.
|
|
for i := 0; i < 20; i++ {
|
|
mockedToken := mock.ACLToken()
|
|
mockedToken.Global = true
|
|
if i%2 == 0 {
|
|
expiredGlobalTokens = append(expiredGlobalTokens, mockedToken)
|
|
mockedToken.ExpirationTime = pointer.Of(expiryTimeThreshold.Add(-24 * time.Hour))
|
|
} else {
|
|
nonExpiredGlobalTokens = append(nonExpiredGlobalTokens, mockedToken)
|
|
mockedToken.ExpirationTime = pointer.Of(expiryTimeThreshold.Add(24 * time.Hour))
|
|
}
|
|
}
|
|
|
|
// Generate and upsert a number of mixed expired, non-expired local
|
|
// tokens.
|
|
for i := 0; i < 20; i++ {
|
|
mockedToken := mock.ACLToken()
|
|
mockedToken.Global = false
|
|
if i%2 == 0 {
|
|
expiredLocalTokens = append(expiredLocalTokens, mockedToken)
|
|
mockedToken.ExpirationTime = pointer.Of(expiryTimeThreshold.Add(-24 * time.Hour))
|
|
} else {
|
|
nonExpiredLocalTokens = append(nonExpiredLocalTokens, mockedToken)
|
|
mockedToken.ExpirationTime = pointer.Of(expiryTimeThreshold.Add(24 * time.Hour))
|
|
}
|
|
}
|
|
|
|
allTokens := append(expiredGlobalTokens, nonExpiredGlobalTokens...)
|
|
allTokens = append(allTokens, expiredLocalTokens...)
|
|
allTokens = append(allTokens, nonExpiredLocalTokens...)
|
|
|
|
// Upsert them all.
|
|
err := testServer.State().UpsertACLTokens(structs.MsgTypeTestSetup, 10, allTokens)
|
|
require.NoError(t, err)
|
|
|
|
// This function provides an easy way to get all tokens out of the
|
|
// iterator.
|
|
fromIteratorFunc := func(iter memdb.ResultIterator) []*structs.ACLToken {
|
|
var tokens []*structs.ACLToken
|
|
for raw := iter.Next(); raw != nil; raw = iter.Next() {
|
|
tokens = append(tokens, raw.(*structs.ACLToken))
|
|
}
|
|
return tokens
|
|
}
|
|
|
|
// Check all the tokens are correctly stored within state.
|
|
iter, err := testServer.State().ACLTokens(nil, state.SortDefault)
|
|
require.NoError(t, err)
|
|
|
|
tokens := fromIteratorFunc(iter)
|
|
require.ElementsMatch(t, allTokens, tokens)
|
|
|
|
// Generate the core scheduler and trigger a forced garbage collection
|
|
// which should delete all expired tokens.
|
|
snap, err := testServer.State().Snapshot()
|
|
require.NoError(t, err)
|
|
coreScheduler := NewCoreScheduler(testServer, snap)
|
|
|
|
index, err := testServer.State().LatestIndex()
|
|
require.NoError(t, err)
|
|
index++
|
|
|
|
forceGCEval := testServer.coreJobEval(structs.CoreJobForceGC, index)
|
|
require.NoError(t, coreScheduler.Process(forceGCEval))
|
|
|
|
// List all the remaining ACL tokens to be sure they are as expected.
|
|
iter, err = testServer.State().ACLTokens(nil, state.SortDefault)
|
|
require.NoError(t, err)
|
|
|
|
tokens = fromIteratorFunc(iter)
|
|
require.ElementsMatch(t, append(nonExpiredGlobalTokens, nonExpiredLocalTokens...), tokens)
|
|
}
|