open-nomad/nomad/fsm_test.go
Alex Dadgar 6d8bb3a7bd Duplicate blocked evals cancelling improved
The old logic for cancelling duplicate blocked evaluations by job id had
the issue where the newer evaluation could have additional node classes
that it is (in)eligible for that we would not capture. This could make
it such that cluster state could change such that the job would make
progress but no evaluation was unblocked.
2018-11-07 10:08:23 -08:00

2829 lines
66 KiB
Go

package nomad
import (
"bytes"
"fmt"
"reflect"
"strings"
"testing"
"time"
"github.com/google/go-cmp/cmp"
memdb "github.com/hashicorp/go-memdb"
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/helper/testlog"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/state"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/testutil"
"github.com/hashicorp/raft"
"github.com/kr/pretty"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
type MockSink struct {
*bytes.Buffer
cancel bool
}
func (m *MockSink) ID() string {
return "Mock"
}
func (m *MockSink) Cancel() error {
m.cancel = true
return nil
}
func (m *MockSink) Close() error {
return nil
}
func testStateStore(t *testing.T) *state.StateStore {
return state.TestStateStore(t)
}
func testFSM(t *testing.T) *nomadFSM {
broker := testBroker(t, 0)
dispatcher, _ := testPeriodicDispatcher(t)
logger := testlog.HCLogger(t)
fsmConfig := &FSMConfig{
EvalBroker: broker,
Periodic: dispatcher,
Blocked: NewBlockedEvals(broker, logger),
Logger: logger,
Region: "global",
}
fsm, err := NewFSM(fsmConfig)
if err != nil {
t.Fatalf("err: %v", err)
}
if fsm == nil {
t.Fatalf("missing fsm")
}
state.TestInitState(t, fsm.state)
return fsm
}
func makeLog(buf []byte) *raft.Log {
return &raft.Log{
Index: 1,
Term: 1,
Type: raft.LogCommand,
Data: buf,
}
}
func TestFSM_UpsertNodeEvents(t *testing.T) {
t.Parallel()
require := require.New(t)
fsm := testFSM(t)
state := fsm.State()
node := mock.Node()
err := state.UpsertNode(1000, node)
if err != nil {
t.Fatalf("err: %v", err)
}
nodeEvent := &structs.NodeEvent{
Message: "Heartbeating failed",
Subsystem: "Heartbeat",
Timestamp: time.Now(),
}
nodeEvents := []*structs.NodeEvent{nodeEvent}
allEvents := map[string][]*structs.NodeEvent{node.ID: nodeEvents}
req := structs.EmitNodeEventsRequest{
NodeEvents: allEvents,
WriteRequest: structs.WriteRequest{Region: "global"},
}
buf, err := structs.Encode(structs.UpsertNodeEventsType, req)
require.Nil(err)
// the response in this case will be an error
resp := fsm.Apply(makeLog(buf))
require.Nil(resp)
ws := memdb.NewWatchSet()
out, err := state.NodeByID(ws, node.ID)
require.Nil(err)
require.Equal(2, len(out.Events))
first := out.Events[1]
require.Equal(uint64(1), first.CreateIndex)
require.Equal("Heartbeating failed", first.Message)
}
func TestFSM_UpsertNode(t *testing.T) {
t.Parallel()
fsm := testFSM(t)
fsm.blockedEvals.SetEnabled(true)
node := mock.Node()
// Mark an eval as blocked.
eval := mock.Eval()
eval.ClassEligibility = map[string]bool{node.ComputedClass: true}
fsm.blockedEvals.Block(eval)
req := structs.NodeRegisterRequest{
Node: node,
}
buf, err := structs.Encode(structs.NodeRegisterRequestType, req)
if err != nil {
t.Fatalf("err: %v", err)
}
resp := fsm.Apply(makeLog(buf))
if resp != nil {
t.Fatalf("resp: %v", resp)
}
// Verify we are registered
ws := memdb.NewWatchSet()
n, err := fsm.State().NodeByID(ws, req.Node.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if n == nil {
t.Fatalf("not found!")
}
if n.CreateIndex != 1 {
t.Fatalf("bad index: %d", node.CreateIndex)
}
tt := fsm.TimeTable()
index := tt.NearestIndex(time.Now().UTC())
if index != 1 {
t.Fatalf("bad: %d", index)
}
// Verify the eval was unblocked.
testutil.WaitForResult(func() (bool, error) {
bStats := fsm.blockedEvals.Stats()
if bStats.TotalBlocked != 0 {
return false, fmt.Errorf("bad: %#v", bStats)
}
return true, nil
}, func(err error) {
t.Fatalf("err: %s", err)
})
}
func TestFSM_UpsertNode_Canonicalize(t *testing.T) {
t.Parallel()
require := require.New(t)
fsm := testFSM(t)
fsm.blockedEvals.SetEnabled(true)
// Setup a node without eligiblity
node := mock.Node()
node.SchedulingEligibility = ""
req := structs.NodeRegisterRequest{
Node: node,
}
buf, err := structs.Encode(structs.NodeRegisterRequestType, req)
require.Nil(err)
resp := fsm.Apply(makeLog(buf))
require.Nil(resp)
// Verify we are registered
ws := memdb.NewWatchSet()
n, err := fsm.State().NodeByID(ws, req.Node.ID)
require.Nil(err)
require.NotNil(n)
require.EqualValues(1, n.CreateIndex)
require.Equal(structs.NodeSchedulingEligible, n.SchedulingEligibility)
}
func TestFSM_DeregisterNode(t *testing.T) {
t.Parallel()
fsm := testFSM(t)
node := mock.Node()
req := structs.NodeRegisterRequest{
Node: node,
}
buf, err := structs.Encode(structs.NodeRegisterRequestType, req)
if err != nil {
t.Fatalf("err: %v", err)
}
resp := fsm.Apply(makeLog(buf))
if resp != nil {
t.Fatalf("resp: %v", resp)
}
req2 := structs.NodeDeregisterRequest{
NodeID: node.ID,
}
buf, err = structs.Encode(structs.NodeDeregisterRequestType, req2)
if err != nil {
t.Fatalf("err: %v", err)
}
resp = fsm.Apply(makeLog(buf))
if resp != nil {
t.Fatalf("resp: %v", resp)
}
// Verify we are NOT registered
ws := memdb.NewWatchSet()
node, err = fsm.State().NodeByID(ws, req.Node.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if node != nil {
t.Fatalf("node found!")
}
}
func TestFSM_UpdateNodeStatus(t *testing.T) {
t.Parallel()
require := require.New(t)
fsm := testFSM(t)
fsm.blockedEvals.SetEnabled(true)
node := mock.Node()
req := structs.NodeRegisterRequest{
Node: node,
}
buf, err := structs.Encode(structs.NodeRegisterRequestType, req)
require.NoError(err)
resp := fsm.Apply(makeLog(buf))
require.Nil(resp)
// Mark an eval as blocked.
eval := mock.Eval()
eval.ClassEligibility = map[string]bool{node.ComputedClass: true}
fsm.blockedEvals.Block(eval)
event := &structs.NodeEvent{
Message: "Node ready foo",
Subsystem: structs.NodeEventSubsystemCluster,
Timestamp: time.Now(),
}
req2 := structs.NodeUpdateStatusRequest{
NodeID: node.ID,
Status: structs.NodeStatusReady,
NodeEvent: event,
}
buf, err = structs.Encode(structs.NodeUpdateStatusRequestType, req2)
require.NoError(err)
resp = fsm.Apply(makeLog(buf))
require.Nil(resp)
// Verify the status is ready.
ws := memdb.NewWatchSet()
node, err = fsm.State().NodeByID(ws, req.Node.ID)
require.NoError(err)
require.Equal(structs.NodeStatusReady, node.Status)
require.Len(node.Events, 2)
require.Equal(event.Message, node.Events[1].Message)
// Verify the eval was unblocked.
testutil.WaitForResult(func() (bool, error) {
bStats := fsm.blockedEvals.Stats()
if bStats.TotalBlocked != 0 {
return false, fmt.Errorf("bad: %#v", bStats)
}
return true, nil
}, func(err error) {
t.Fatalf("err: %s", err)
})
}
func TestFSM_BatchUpdateNodeDrain(t *testing.T) {
t.Parallel()
require := require.New(t)
fsm := testFSM(t)
node := mock.Node()
req := structs.NodeRegisterRequest{
Node: node,
}
buf, err := structs.Encode(structs.NodeRegisterRequestType, req)
require.Nil(err)
resp := fsm.Apply(makeLog(buf))
require.Nil(resp)
strategy := &structs.DrainStrategy{
DrainSpec: structs.DrainSpec{
Deadline: 10 * time.Second,
},
}
event := &structs.NodeEvent{
Message: "Drain strategy enabled",
Subsystem: structs.NodeEventSubsystemDrain,
Timestamp: time.Now(),
}
req2 := structs.BatchNodeUpdateDrainRequest{
Updates: map[string]*structs.DrainUpdate{
node.ID: {
DrainStrategy: strategy,
},
},
NodeEvents: map[string]*structs.NodeEvent{
node.ID: event,
},
}
buf, err = structs.Encode(structs.BatchNodeUpdateDrainRequestType, req2)
require.Nil(err)
resp = fsm.Apply(makeLog(buf))
require.Nil(resp)
// Verify drain is set
ws := memdb.NewWatchSet()
node, err = fsm.State().NodeByID(ws, req.Node.ID)
require.Nil(err)
require.True(node.Drain)
require.Equal(node.DrainStrategy, strategy)
require.Len(node.Events, 2)
}
func TestFSM_UpdateNodeDrain(t *testing.T) {
t.Parallel()
require := require.New(t)
fsm := testFSM(t)
node := mock.Node()
req := structs.NodeRegisterRequest{
Node: node,
}
buf, err := structs.Encode(structs.NodeRegisterRequestType, req)
require.Nil(err)
resp := fsm.Apply(makeLog(buf))
require.Nil(resp)
strategy := &structs.DrainStrategy{
DrainSpec: structs.DrainSpec{
Deadline: 10 * time.Second,
},
}
req2 := structs.NodeUpdateDrainRequest{
NodeID: node.ID,
DrainStrategy: strategy,
NodeEvent: &structs.NodeEvent{
Message: "Drain strategy enabled",
Subsystem: structs.NodeEventSubsystemDrain,
Timestamp: time.Now(),
},
}
buf, err = structs.Encode(structs.NodeUpdateDrainRequestType, req2)
require.Nil(err)
resp = fsm.Apply(makeLog(buf))
require.Nil(resp)
// Verify we are NOT registered
ws := memdb.NewWatchSet()
node, err = fsm.State().NodeByID(ws, req.Node.ID)
require.Nil(err)
require.True(node.Drain)
require.Equal(node.DrainStrategy, strategy)
require.Len(node.Events, 2)
}
func TestFSM_UpdateNodeDrain_Pre08_Compatibility(t *testing.T) {
t.Parallel()
require := require.New(t)
fsm := testFSM(t)
// Force a node into the state store without eligiblity
node := mock.Node()
node.SchedulingEligibility = ""
require.Nil(fsm.State().UpsertNode(1, node))
// Do an old style drain
req := structs.NodeUpdateDrainRequest{
NodeID: node.ID,
Drain: true,
}
buf, err := structs.Encode(structs.NodeUpdateDrainRequestType, req)
require.Nil(err)
resp := fsm.Apply(makeLog(buf))
require.Nil(resp)
// Verify we have upgraded to a force drain
ws := memdb.NewWatchSet()
node, err = fsm.State().NodeByID(ws, req.NodeID)
require.Nil(err)
require.True(node.Drain)
expected := &structs.DrainStrategy{
DrainSpec: structs.DrainSpec{
Deadline: -1 * time.Second,
},
}
require.Equal(expected, node.DrainStrategy)
}
func TestFSM_UpdateNodeEligibility(t *testing.T) {
t.Parallel()
require := require.New(t)
fsm := testFSM(t)
node := mock.Node()
req := structs.NodeRegisterRequest{
Node: node,
}
buf, err := structs.Encode(structs.NodeRegisterRequestType, req)
require.Nil(err)
resp := fsm.Apply(makeLog(buf))
require.Nil(resp)
event := &structs.NodeEvent{
Message: "Node marked as ineligible",
Subsystem: structs.NodeEventSubsystemCluster,
Timestamp: time.Now(),
}
// Set the eligibility
req2 := structs.NodeUpdateEligibilityRequest{
NodeID: node.ID,
Eligibility: structs.NodeSchedulingIneligible,
NodeEvent: event,
}
buf, err = structs.Encode(structs.NodeUpdateEligibilityRequestType, req2)
require.Nil(err)
resp = fsm.Apply(makeLog(buf))
require.Nil(resp)
// Lookup the node and check
node, err = fsm.State().NodeByID(nil, req.Node.ID)
require.Nil(err)
require.Equal(node.SchedulingEligibility, structs.NodeSchedulingIneligible)
require.Len(node.Events, 2)
require.Equal(event.Message, node.Events[1].Message)
// Update the drain
strategy := &structs.DrainStrategy{
DrainSpec: structs.DrainSpec{
Deadline: 10 * time.Second,
},
}
req3 := structs.NodeUpdateDrainRequest{
NodeID: node.ID,
DrainStrategy: strategy,
}
buf, err = structs.Encode(structs.NodeUpdateDrainRequestType, req3)
require.Nil(err)
resp = fsm.Apply(makeLog(buf))
require.Nil(resp)
// Try forcing eligibility
req4 := structs.NodeUpdateEligibilityRequest{
NodeID: node.ID,
Eligibility: structs.NodeSchedulingEligible,
}
buf, err = structs.Encode(structs.NodeUpdateEligibilityRequestType, req4)
require.Nil(err)
resp = fsm.Apply(makeLog(buf))
require.NotNil(resp)
err, ok := resp.(error)
require.True(ok)
require.Contains(err.Error(), "draining")
}
func TestFSM_UpdateNodeEligibility_Unblock(t *testing.T) {
t.Parallel()
require := require.New(t)
fsm := testFSM(t)
node := mock.Node()
req := structs.NodeRegisterRequest{
Node: node,
}
buf, err := structs.Encode(structs.NodeRegisterRequestType, req)
require.Nil(err)
resp := fsm.Apply(makeLog(buf))
require.Nil(resp)
// Set the eligibility
req2 := structs.NodeUpdateEligibilityRequest{
NodeID: node.ID,
Eligibility: structs.NodeSchedulingIneligible,
}
buf, err = structs.Encode(structs.NodeUpdateEligibilityRequestType, req2)
require.Nil(err)
resp = fsm.Apply(makeLog(buf))
require.Nil(resp)
// Mark an eval as blocked.
eval := mock.Eval()
eval.ClassEligibility = map[string]bool{node.ComputedClass: true}
fsm.blockedEvals.Block(eval)
// Set eligible
req4 := structs.NodeUpdateEligibilityRequest{
NodeID: node.ID,
Eligibility: structs.NodeSchedulingEligible,
}
buf, err = structs.Encode(structs.NodeUpdateEligibilityRequestType, req4)
require.Nil(err)
resp = fsm.Apply(makeLog(buf))
require.Nil(resp)
// Verify the eval was unblocked.
testutil.WaitForResult(func() (bool, error) {
bStats := fsm.blockedEvals.Stats()
if bStats.TotalBlocked != 0 {
return false, fmt.Errorf("bad: %#v", bStats)
}
return true, nil
}, func(err error) {
t.Fatalf("err: %s", err)
})
}
func TestFSM_RegisterJob(t *testing.T) {
t.Parallel()
fsm := testFSM(t)
job := mock.PeriodicJob()
req := structs.JobRegisterRequest{
Job: job,
WriteRequest: structs.WriteRequest{
Namespace: job.Namespace,
},
}
buf, err := structs.Encode(structs.JobRegisterRequestType, req)
if err != nil {
t.Fatalf("err: %v", err)
}
resp := fsm.Apply(makeLog(buf))
if resp != nil {
t.Fatalf("resp: %v", resp)
}
// Verify we are registered
ws := memdb.NewWatchSet()
jobOut, err := fsm.State().JobByID(ws, req.Namespace, req.Job.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if jobOut == nil {
t.Fatalf("not found!")
}
if jobOut.CreateIndex != 1 {
t.Fatalf("bad index: %d", jobOut.CreateIndex)
}
// Verify it was added to the periodic runner.
tuple := structs.NamespacedID{
ID: job.ID,
Namespace: job.Namespace,
}
if _, ok := fsm.periodicDispatcher.tracked[tuple]; !ok {
t.Fatal("job not added to periodic runner")
}
// Verify the launch time was tracked.
launchOut, err := fsm.State().PeriodicLaunchByID(ws, req.Namespace, req.Job.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if launchOut == nil {
t.Fatalf("not found!")
}
if launchOut.Launch.IsZero() {
t.Fatalf("bad launch time: %v", launchOut.Launch)
}
}
func TestFSM_RegisterPeriodicJob_NonLeader(t *testing.T) {
t.Parallel()
fsm := testFSM(t)
// Disable the dispatcher
fsm.periodicDispatcher.SetEnabled(false)
job := mock.PeriodicJob()
req := structs.JobRegisterRequest{
Job: job,
WriteRequest: structs.WriteRequest{
Namespace: job.Namespace,
},
}
buf, err := structs.Encode(structs.JobRegisterRequestType, req)
if err != nil {
t.Fatalf("err: %v", err)
}
resp := fsm.Apply(makeLog(buf))
if resp != nil {
t.Fatalf("resp: %v", resp)
}
// Verify we are registered
ws := memdb.NewWatchSet()
jobOut, err := fsm.State().JobByID(ws, req.Namespace, req.Job.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if jobOut == nil {
t.Fatalf("not found!")
}
if jobOut.CreateIndex != 1 {
t.Fatalf("bad index: %d", jobOut.CreateIndex)
}
// Verify it wasn't added to the periodic runner.
tuple := structs.NamespacedID{
ID: job.ID,
Namespace: job.Namespace,
}
if _, ok := fsm.periodicDispatcher.tracked[tuple]; ok {
t.Fatal("job added to periodic runner")
}
// Verify the launch time was tracked.
launchOut, err := fsm.State().PeriodicLaunchByID(ws, req.Namespace, req.Job.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if launchOut == nil {
t.Fatalf("not found!")
}
if launchOut.Launch.IsZero() {
t.Fatalf("bad launch time: %v", launchOut.Launch)
}
}
func TestFSM_RegisterJob_BadNamespace(t *testing.T) {
t.Parallel()
fsm := testFSM(t)
job := mock.Job()
job.Namespace = "foo"
req := structs.JobRegisterRequest{
Job: job,
WriteRequest: structs.WriteRequest{
Namespace: job.Namespace,
},
}
buf, err := structs.Encode(structs.JobRegisterRequestType, req)
if err != nil {
t.Fatalf("err: %v", err)
}
resp := fsm.Apply(makeLog(buf))
if resp == nil {
t.Fatalf("no resp: %v", resp)
}
err, ok := resp.(error)
if !ok {
t.Fatalf("resp not of error type: %T %v", resp, resp)
}
if !strings.Contains(err.Error(), "nonexistent namespace") {
t.Fatalf("bad error: %v", err)
}
// Verify we are not registered
ws := memdb.NewWatchSet()
jobOut, err := fsm.State().JobByID(ws, req.Namespace, req.Job.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if jobOut != nil {
t.Fatalf("job found!")
}
}
func TestFSM_DeregisterJob_Purge(t *testing.T) {
t.Parallel()
fsm := testFSM(t)
job := mock.PeriodicJob()
req := structs.JobRegisterRequest{
Job: job,
WriteRequest: structs.WriteRequest{
Namespace: job.Namespace,
},
}
buf, err := structs.Encode(structs.JobRegisterRequestType, req)
if err != nil {
t.Fatalf("err: %v", err)
}
resp := fsm.Apply(makeLog(buf))
if resp != nil {
t.Fatalf("resp: %v", resp)
}
req2 := structs.JobDeregisterRequest{
JobID: job.ID,
Purge: true,
WriteRequest: structs.WriteRequest{
Namespace: job.Namespace,
},
}
buf, err = structs.Encode(structs.JobDeregisterRequestType, req2)
if err != nil {
t.Fatalf("err: %v", err)
}
resp = fsm.Apply(makeLog(buf))
if resp != nil {
t.Fatalf("resp: %v", resp)
}
// Verify we are NOT registered
ws := memdb.NewWatchSet()
jobOut, err := fsm.State().JobByID(ws, req.Namespace, req.Job.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if jobOut != nil {
t.Fatalf("job found!")
}
// Verify it was removed from the periodic runner.
tuple := structs.NamespacedID{
ID: job.ID,
Namespace: job.Namespace,
}
if _, ok := fsm.periodicDispatcher.tracked[tuple]; ok {
t.Fatal("job not removed from periodic runner")
}
// Verify it was removed from the periodic launch table.
launchOut, err := fsm.State().PeriodicLaunchByID(ws, req.Namespace, req.Job.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if launchOut != nil {
t.Fatalf("launch found!")
}
}
func TestFSM_DeregisterJob_NoPurge(t *testing.T) {
t.Parallel()
fsm := testFSM(t)
job := mock.PeriodicJob()
req := structs.JobRegisterRequest{
Job: job,
WriteRequest: structs.WriteRequest{
Namespace: job.Namespace,
},
}
buf, err := structs.Encode(structs.JobRegisterRequestType, req)
if err != nil {
t.Fatalf("err: %v", err)
}
resp := fsm.Apply(makeLog(buf))
if resp != nil {
t.Fatalf("resp: %v", resp)
}
req2 := structs.JobDeregisterRequest{
JobID: job.ID,
Purge: false,
WriteRequest: structs.WriteRequest{
Namespace: job.Namespace,
},
}
buf, err = structs.Encode(structs.JobDeregisterRequestType, req2)
if err != nil {
t.Fatalf("err: %v", err)
}
resp = fsm.Apply(makeLog(buf))
if resp != nil {
t.Fatalf("resp: %v", resp)
}
// Verify we are NOT registered
ws := memdb.NewWatchSet()
jobOut, err := fsm.State().JobByID(ws, req.Namespace, req.Job.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if jobOut == nil {
t.Fatalf("job not found!")
}
if !jobOut.Stop {
t.Fatalf("job not stopped found!")
}
// Verify it was removed from the periodic runner.
tuple := structs.NamespacedID{
ID: job.ID,
Namespace: job.Namespace,
}
if _, ok := fsm.periodicDispatcher.tracked[tuple]; ok {
t.Fatal("job not removed from periodic runner")
}
// Verify it was removed from the periodic launch table.
launchOut, err := fsm.State().PeriodicLaunchByID(ws, req.Namespace, req.Job.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if launchOut == nil {
t.Fatalf("launch not found!")
}
}
func TestFSM_BatchDeregisterJob(t *testing.T) {
t.Parallel()
require := require.New(t)
fsm := testFSM(t)
job := mock.PeriodicJob()
req := structs.JobRegisterRequest{
Job: job,
WriteRequest: structs.WriteRequest{
Namespace: job.Namespace,
},
}
buf, err := structs.Encode(structs.JobRegisterRequestType, req)
require.Nil(err)
resp := fsm.Apply(makeLog(buf))
require.Nil(resp)
job2 := mock.Job()
req2 := structs.JobRegisterRequest{
Job: job2,
WriteRequest: structs.WriteRequest{
Namespace: job2.Namespace,
},
}
buf, err = structs.Encode(structs.JobRegisterRequestType, req2)
require.Nil(err)
resp = fsm.Apply(makeLog(buf))
require.Nil(resp)
req3 := structs.JobBatchDeregisterRequest{
Jobs: map[structs.NamespacedID]*structs.JobDeregisterOptions{
{
ID: job.ID,
Namespace: job.Namespace,
}: {},
{
ID: job2.ID,
Namespace: job2.Namespace,
}: {
Purge: true,
},
},
WriteRequest: structs.WriteRequest{
Namespace: job.Namespace,
},
}
buf, err = structs.Encode(structs.JobBatchDeregisterRequestType, req3)
require.Nil(err)
resp = fsm.Apply(makeLog(buf))
require.Nil(resp)
// Verify we are NOT registered
ws := memdb.NewWatchSet()
jobOut, err := fsm.State().JobByID(ws, req.Namespace, req.Job.ID)
require.Nil(err)
require.NotNil(jobOut)
require.True(jobOut.Stop)
// Verify it was removed from the periodic runner.
tuple := structs.NamespacedID{
ID: job.ID,
Namespace: job.Namespace,
}
require.NotContains(fsm.periodicDispatcher.tracked, tuple)
// Verify it was not removed from the periodic launch table.
launchOut, err := fsm.State().PeriodicLaunchByID(ws, job.Namespace, job.ID)
require.Nil(err)
require.NotNil(launchOut)
// Verify the other jbo was purged
jobOut2, err := fsm.State().JobByID(ws, job2.Namespace, job2.ID)
require.Nil(err)
require.Nil(jobOut2)
}
func TestFSM_UpdateEval(t *testing.T) {
t.Parallel()
fsm := testFSM(t)
fsm.evalBroker.SetEnabled(true)
req := structs.EvalUpdateRequest{
Evals: []*structs.Evaluation{mock.Eval()},
}
buf, err := structs.Encode(structs.EvalUpdateRequestType, req)
if err != nil {
t.Fatalf("err: %v", err)
}
resp := fsm.Apply(makeLog(buf))
if resp != nil {
t.Fatalf("resp: %v", resp)
}
// Verify we are registered
ws := memdb.NewWatchSet()
eval, err := fsm.State().EvalByID(ws, req.Evals[0].ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if eval == nil {
t.Fatalf("not found!")
}
if eval.CreateIndex != 1 {
t.Fatalf("bad index: %d", eval.CreateIndex)
}
// Verify enqueued
stats := fsm.evalBroker.Stats()
if stats.TotalReady != 1 {
t.Fatalf("bad: %#v %#v", stats, eval)
}
}
func TestFSM_UpdateEval_Blocked(t *testing.T) {
t.Parallel()
fsm := testFSM(t)
fsm.evalBroker.SetEnabled(true)
fsm.blockedEvals.SetEnabled(true)
// Create a blocked eval.
eval := mock.Eval()
eval.Status = structs.EvalStatusBlocked
req := structs.EvalUpdateRequest{
Evals: []*structs.Evaluation{eval},
}
buf, err := structs.Encode(structs.EvalUpdateRequestType, req)
if err != nil {
t.Fatalf("err: %v", err)
}
resp := fsm.Apply(makeLog(buf))
if resp != nil {
t.Fatalf("resp: %v", resp)
}
// Verify we are registered
ws := memdb.NewWatchSet()
out, err := fsm.State().EvalByID(ws, eval.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if out == nil {
t.Fatalf("not found!")
}
if out.CreateIndex != 1 {
t.Fatalf("bad index: %d", out.CreateIndex)
}
// Verify the eval wasn't enqueued
stats := fsm.evalBroker.Stats()
if stats.TotalReady != 0 {
t.Fatalf("bad: %#v %#v", stats, out)
}
// Verify the eval was added to the blocked tracker.
bStats := fsm.blockedEvals.Stats()
if bStats.TotalBlocked != 1 {
t.Fatalf("bad: %#v %#v", bStats, out)
}
}
func TestFSM_UpdateEval_Untrack(t *testing.T) {
t.Parallel()
fsm := testFSM(t)
fsm.evalBroker.SetEnabled(true)
fsm.blockedEvals.SetEnabled(true)
// Mark an eval as blocked.
bEval := mock.Eval()
bEval.ClassEligibility = map[string]bool{"v1:123": true}
fsm.blockedEvals.Block(bEval)
// Create a successful eval for the same job
eval := mock.Eval()
eval.JobID = bEval.JobID
eval.Status = structs.EvalStatusComplete
req := structs.EvalUpdateRequest{
Evals: []*structs.Evaluation{eval},
}
buf, err := structs.Encode(structs.EvalUpdateRequestType, req)
if err != nil {
t.Fatalf("err: %v", err)
}
resp := fsm.Apply(makeLog(buf))
if resp != nil {
t.Fatalf("resp: %v", resp)
}
// Verify we are registered
ws := memdb.NewWatchSet()
out, err := fsm.State().EvalByID(ws, eval.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if out == nil {
t.Fatalf("not found!")
}
if out.CreateIndex != 1 {
t.Fatalf("bad index: %d", out.CreateIndex)
}
// Verify the eval wasn't enqueued
stats := fsm.evalBroker.Stats()
if stats.TotalReady != 0 {
t.Fatalf("bad: %#v %#v", stats, out)
}
// Verify the eval was untracked in the blocked tracker.
bStats := fsm.blockedEvals.Stats()
if bStats.TotalBlocked != 0 {
t.Fatalf("bad: %#v %#v", bStats, out)
}
}
func TestFSM_UpdateEval_NoUntrack(t *testing.T) {
t.Parallel()
fsm := testFSM(t)
fsm.evalBroker.SetEnabled(true)
fsm.blockedEvals.SetEnabled(true)
// Mark an eval as blocked.
bEval := mock.Eval()
bEval.ClassEligibility = map[string]bool{"v1:123": true}
fsm.blockedEvals.Block(bEval)
// Create a successful eval for the same job but with placement failures
eval := mock.Eval()
eval.JobID = bEval.JobID
eval.Status = structs.EvalStatusComplete
eval.FailedTGAllocs = make(map[string]*structs.AllocMetric)
eval.FailedTGAllocs["test"] = new(structs.AllocMetric)
req := structs.EvalUpdateRequest{
Evals: []*structs.Evaluation{eval},
}
buf, err := structs.Encode(structs.EvalUpdateRequestType, req)
if err != nil {
t.Fatalf("err: %v", err)
}
resp := fsm.Apply(makeLog(buf))
if resp != nil {
t.Fatalf("resp: %v", resp)
}
// Verify we are registered
ws := memdb.NewWatchSet()
out, err := fsm.State().EvalByID(ws, eval.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if out == nil {
t.Fatalf("not found!")
}
if out.CreateIndex != 1 {
t.Fatalf("bad index: %d", out.CreateIndex)
}
// Verify the eval wasn't enqueued
stats := fsm.evalBroker.Stats()
if stats.TotalReady != 0 {
t.Fatalf("bad: %#v %#v", stats, out)
}
// Verify the eval was not untracked in the blocked tracker.
bStats := fsm.blockedEvals.Stats()
if bStats.TotalBlocked != 1 {
t.Fatalf("bad: %#v %#v", bStats, out)
}
}
func TestFSM_DeleteEval(t *testing.T) {
t.Parallel()
fsm := testFSM(t)
eval := mock.Eval()
req := structs.EvalUpdateRequest{
Evals: []*structs.Evaluation{eval},
}
buf, err := structs.Encode(structs.EvalUpdateRequestType, req)
if err != nil {
t.Fatalf("err: %v", err)
}
resp := fsm.Apply(makeLog(buf))
if resp != nil {
t.Fatalf("resp: %v", resp)
}
req2 := structs.EvalDeleteRequest{
Evals: []string{eval.ID},
}
buf, err = structs.Encode(structs.EvalDeleteRequestType, req2)
if err != nil {
t.Fatalf("err: %v", err)
}
resp = fsm.Apply(makeLog(buf))
if resp != nil {
t.Fatalf("resp: %v", resp)
}
// Verify we are NOT registered
ws := memdb.NewWatchSet()
eval, err = fsm.State().EvalByID(ws, req.Evals[0].ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if eval != nil {
t.Fatalf("eval found!")
}
}
func TestFSM_UpsertAllocs(t *testing.T) {
t.Parallel()
fsm := testFSM(t)
alloc := mock.Alloc()
alloc.Resources = &structs.Resources{} // COMPAT(0.11): Remove in 0.11, used to bypass resource creation in state store
fsm.State().UpsertJobSummary(1, mock.JobSummary(alloc.JobID))
req := structs.AllocUpdateRequest{
Alloc: []*structs.Allocation{alloc},
}
buf, err := structs.Encode(structs.AllocUpdateRequestType, req)
if err != nil {
t.Fatalf("err: %v", err)
}
resp := fsm.Apply(makeLog(buf))
if resp != nil {
t.Fatalf("resp: %v", resp)
}
// Verify we are registered
ws := memdb.NewWatchSet()
out, err := fsm.State().AllocByID(ws, alloc.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
alloc.CreateIndex = out.CreateIndex
alloc.ModifyIndex = out.ModifyIndex
alloc.AllocModifyIndex = out.AllocModifyIndex
if !reflect.DeepEqual(alloc, out) {
t.Fatalf("bad: %#v %#v", alloc, out)
}
evictAlloc := new(structs.Allocation)
*evictAlloc = *alloc
evictAlloc.DesiredStatus = structs.AllocDesiredStatusEvict
req2 := structs.AllocUpdateRequest{
Alloc: []*structs.Allocation{evictAlloc},
}
buf, err = structs.Encode(structs.AllocUpdateRequestType, req2)
if err != nil {
t.Fatalf("err: %v", err)
}
resp = fsm.Apply(makeLog(buf))
if resp != nil {
t.Fatalf("resp: %v", resp)
}
// Verify we are evicted
out, err = fsm.State().AllocByID(ws, alloc.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if out.DesiredStatus != structs.AllocDesiredStatusEvict {
t.Fatalf("alloc found!")
}
}
func TestFSM_UpsertAllocs_SharedJob(t *testing.T) {
t.Parallel()
fsm := testFSM(t)
alloc := mock.Alloc()
alloc.Resources = &structs.Resources{} // COMPAT(0.11): Remove in 0.11, used to bypass resource creation in state store
fsm.State().UpsertJobSummary(1, mock.JobSummary(alloc.JobID))
job := alloc.Job
alloc.Job = nil
req := structs.AllocUpdateRequest{
Job: job,
Alloc: []*structs.Allocation{alloc},
}
buf, err := structs.Encode(structs.AllocUpdateRequestType, req)
if err != nil {
t.Fatalf("err: %v", err)
}
resp := fsm.Apply(makeLog(buf))
if resp != nil {
t.Fatalf("resp: %v", resp)
}
// Verify we are registered
ws := memdb.NewWatchSet()
out, err := fsm.State().AllocByID(ws, alloc.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
alloc.CreateIndex = out.CreateIndex
alloc.ModifyIndex = out.ModifyIndex
alloc.AllocModifyIndex = out.AllocModifyIndex
// Job should be re-attached
alloc.Job = job
require.Equal(t, alloc, out)
// Ensure that the original job is used
evictAlloc := new(structs.Allocation)
*evictAlloc = *alloc
job = mock.Job()
job.Priority = 123
evictAlloc.Job = nil
evictAlloc.DesiredStatus = structs.AllocDesiredStatusEvict
req2 := structs.AllocUpdateRequest{
Job: job,
Alloc: []*structs.Allocation{evictAlloc},
}
buf, err = structs.Encode(structs.AllocUpdateRequestType, req2)
if err != nil {
t.Fatalf("err: %v", err)
}
resp = fsm.Apply(makeLog(buf))
if resp != nil {
t.Fatalf("resp: %v", resp)
}
// Verify we are evicted
out, err = fsm.State().AllocByID(ws, alloc.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if out.DesiredStatus != structs.AllocDesiredStatusEvict {
t.Fatalf("alloc found!")
}
if out.Job == nil || out.Job.Priority == 123 {
t.Fatalf("bad job")
}
}
// COMPAT(0.11): Remove in 0.11
func TestFSM_UpsertAllocs_StrippedResources(t *testing.T) {
t.Parallel()
fsm := testFSM(t)
alloc := mock.Alloc()
alloc.Resources = &structs.Resources{
CPU: 500,
MemoryMB: 256,
DiskMB: 150,
Networks: []*structs.NetworkResource{
{
Device: "eth0",
IP: "192.168.0.100",
ReservedPorts: []structs.Port{{Label: "admin", Value: 5000}},
MBits: 50,
DynamicPorts: []structs.Port{{Label: "http"}},
},
},
}
alloc.TaskResources = map[string]*structs.Resources{
"web": {
CPU: 500,
MemoryMB: 256,
Networks: []*structs.NetworkResource{
{
Device: "eth0",
IP: "192.168.0.100",
ReservedPorts: []structs.Port{{Label: "admin", Value: 5000}},
MBits: 50,
DynamicPorts: []structs.Port{{Label: "http", Value: 9876}},
},
},
},
}
alloc.SharedResources = &structs.Resources{
DiskMB: 150,
}
// Need to remove mock dynamic port from alloc as it won't be computed
// in this test
alloc.TaskResources["web"].Networks[0].DynamicPorts[0].Value = 0
fsm.State().UpsertJobSummary(1, mock.JobSummary(alloc.JobID))
job := alloc.Job
origResources := alloc.Resources
alloc.Resources = nil
req := structs.AllocUpdateRequest{
Job: job,
Alloc: []*structs.Allocation{alloc},
}
buf, err := structs.Encode(structs.AllocUpdateRequestType, req)
if err != nil {
t.Fatalf("err: %v", err)
}
resp := fsm.Apply(makeLog(buf))
if resp != nil {
t.Fatalf("resp: %v", resp)
}
// Verify we are registered
ws := memdb.NewWatchSet()
out, err := fsm.State().AllocByID(ws, alloc.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
alloc.CreateIndex = out.CreateIndex
alloc.ModifyIndex = out.ModifyIndex
alloc.AllocModifyIndex = out.AllocModifyIndex
// Resources should be recomputed
origResources.DiskMB = alloc.Job.TaskGroups[0].EphemeralDisk.SizeMB
alloc.Resources = origResources
if !reflect.DeepEqual(alloc, out) {
t.Fatalf("not equal: % #v", pretty.Diff(alloc, out))
}
}
func TestFSM_UpdateAllocFromClient_Unblock(t *testing.T) {
t.Parallel()
fsm := testFSM(t)
fsm.blockedEvals.SetEnabled(true)
state := fsm.State()
node := mock.Node()
state.UpsertNode(1, node)
// Mark an eval as blocked.
eval := mock.Eval()
eval.ClassEligibility = map[string]bool{node.ComputedClass: true}
fsm.blockedEvals.Block(eval)
bStats := fsm.blockedEvals.Stats()
if bStats.TotalBlocked != 1 {
t.Fatalf("bad: %#v", bStats)
}
// Create a completed eval
alloc := mock.Alloc()
alloc.NodeID = node.ID
alloc2 := mock.Alloc()
alloc2.NodeID = node.ID
state.UpsertJobSummary(8, mock.JobSummary(alloc.JobID))
state.UpsertJobSummary(9, mock.JobSummary(alloc2.JobID))
state.UpsertAllocs(10, []*structs.Allocation{alloc, alloc2})
clientAlloc := new(structs.Allocation)
*clientAlloc = *alloc
clientAlloc.ClientStatus = structs.AllocClientStatusComplete
update2 := &structs.Allocation{
ID: alloc2.ID,
ClientStatus: structs.AllocClientStatusRunning,
}
req := structs.AllocUpdateRequest{
Alloc: []*structs.Allocation{clientAlloc, update2},
}
buf, err := structs.Encode(structs.AllocClientUpdateRequestType, req)
if err != nil {
t.Fatalf("err: %v", err)
}
resp := fsm.Apply(makeLog(buf))
if resp != nil {
t.Fatalf("resp: %v", resp)
}
// Verify we are updated
ws := memdb.NewWatchSet()
out, err := fsm.State().AllocByID(ws, alloc.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
clientAlloc.CreateIndex = out.CreateIndex
clientAlloc.ModifyIndex = out.ModifyIndex
if !reflect.DeepEqual(clientAlloc, out) {
t.Fatalf("bad: %#v %#v", clientAlloc, out)
}
out, err = fsm.State().AllocByID(ws, alloc2.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
alloc2.CreateIndex = out.CreateIndex
alloc2.ModifyIndex = out.ModifyIndex
alloc2.ClientStatus = structs.AllocClientStatusRunning
alloc2.TaskStates = nil
if !reflect.DeepEqual(alloc2, out) {
t.Fatalf("bad: %#v %#v", alloc2, out)
}
// Verify the eval was unblocked.
testutil.WaitForResult(func() (bool, error) {
bStats = fsm.blockedEvals.Stats()
if bStats.TotalBlocked != 0 {
return false, fmt.Errorf("bad: %#v %#v", bStats, out)
}
return true, nil
}, func(err error) {
t.Fatalf("err: %s", err)
})
}
func TestFSM_UpdateAllocFromClient(t *testing.T) {
t.Parallel()
fsm := testFSM(t)
state := fsm.State()
require := require.New(t)
alloc := mock.Alloc()
state.UpsertJobSummary(9, mock.JobSummary(alloc.JobID))
state.UpsertAllocs(10, []*structs.Allocation{alloc})
clientAlloc := new(structs.Allocation)
*clientAlloc = *alloc
clientAlloc.ClientStatus = structs.AllocClientStatusFailed
eval := mock.Eval()
eval.JobID = alloc.JobID
eval.TriggeredBy = structs.EvalTriggerRetryFailedAlloc
eval.Type = alloc.Job.Type
req := structs.AllocUpdateRequest{
Alloc: []*structs.Allocation{clientAlloc},
Evals: []*structs.Evaluation{eval},
}
buf, err := structs.Encode(structs.AllocClientUpdateRequestType, req)
require.Nil(err)
resp := fsm.Apply(makeLog(buf))
require.Nil(resp)
// Verify we are registered
ws := memdb.NewWatchSet()
out, err := fsm.State().AllocByID(ws, alloc.ID)
require.Nil(err)
clientAlloc.CreateIndex = out.CreateIndex
clientAlloc.ModifyIndex = out.ModifyIndex
require.Equal(clientAlloc, out)
// Verify eval was inserted
ws = memdb.NewWatchSet()
evals, err := fsm.State().EvalsByJob(ws, eval.Namespace, eval.JobID)
require.Nil(err)
require.Equal(1, len(evals))
res := evals[0]
eval.CreateIndex = res.CreateIndex
eval.ModifyIndex = res.ModifyIndex
require.Equal(eval, res)
}
func TestFSM_UpdateAllocDesiredTransition(t *testing.T) {
t.Parallel()
fsm := testFSM(t)
state := fsm.State()
require := require.New(t)
alloc := mock.Alloc()
alloc2 := mock.Alloc()
alloc2.Job = alloc.Job
alloc2.JobID = alloc.JobID
state.UpsertJobSummary(9, mock.JobSummary(alloc.JobID))
state.UpsertAllocs(10, []*structs.Allocation{alloc, alloc2})
t1 := &structs.DesiredTransition{
Migrate: helper.BoolToPtr(true),
}
eval := &structs.Evaluation{
ID: uuid.Generate(),
Namespace: alloc.Namespace,
Priority: alloc.Job.Priority,
Type: alloc.Job.Type,
TriggeredBy: structs.EvalTriggerNodeDrain,
JobID: alloc.Job.ID,
JobModifyIndex: alloc.Job.ModifyIndex,
Status: structs.EvalStatusPending,
}
req := structs.AllocUpdateDesiredTransitionRequest{
Allocs: map[string]*structs.DesiredTransition{
alloc.ID: t1,
alloc2.ID: t1,
},
Evals: []*structs.Evaluation{eval},
}
buf, err := structs.Encode(structs.AllocUpdateDesiredTransitionRequestType, req)
require.Nil(err)
resp := fsm.Apply(makeLog(buf))
require.Nil(resp)
// Verify we are registered
ws := memdb.NewWatchSet()
out1, err := fsm.State().AllocByID(ws, alloc.ID)
require.Nil(err)
out2, err := fsm.State().AllocByID(ws, alloc2.ID)
require.Nil(err)
evalOut, err := fsm.State().EvalByID(ws, eval.ID)
require.Nil(err)
require.NotNil(evalOut)
require.Equal(eval.ID, evalOut.ID)
require.NotNil(out1.DesiredTransition.Migrate)
require.NotNil(out2.DesiredTransition.Migrate)
require.True(*out1.DesiredTransition.Migrate)
require.True(*out2.DesiredTransition.Migrate)
}
func TestFSM_UpsertVaultAccessor(t *testing.T) {
t.Parallel()
fsm := testFSM(t)
fsm.blockedEvals.SetEnabled(true)
va := mock.VaultAccessor()
va2 := mock.VaultAccessor()
req := structs.VaultAccessorsRequest{
Accessors: []*structs.VaultAccessor{va, va2},
}
buf, err := structs.Encode(structs.VaultAccessorRegisterRequestType, req)
if err != nil {
t.Fatalf("err: %v", err)
}
resp := fsm.Apply(makeLog(buf))
if resp != nil {
t.Fatalf("resp: %v", resp)
}
// Verify we are registered
ws := memdb.NewWatchSet()
out1, err := fsm.State().VaultAccessor(ws, va.Accessor)
if err != nil {
t.Fatalf("err: %v", err)
}
if out1 == nil {
t.Fatalf("not found!")
}
if out1.CreateIndex != 1 {
t.Fatalf("bad index: %d", out1.CreateIndex)
}
out2, err := fsm.State().VaultAccessor(ws, va2.Accessor)
if err != nil {
t.Fatalf("err: %v", err)
}
if out2 == nil {
t.Fatalf("not found!")
}
if out1.CreateIndex != 1 {
t.Fatalf("bad index: %d", out2.CreateIndex)
}
tt := fsm.TimeTable()
index := tt.NearestIndex(time.Now().UTC())
if index != 1 {
t.Fatalf("bad: %d", index)
}
}
func TestFSM_DeregisterVaultAccessor(t *testing.T) {
t.Parallel()
fsm := testFSM(t)
fsm.blockedEvals.SetEnabled(true)
va := mock.VaultAccessor()
va2 := mock.VaultAccessor()
accessors := []*structs.VaultAccessor{va, va2}
// Insert the accessors
if err := fsm.State().UpsertVaultAccessor(1000, accessors); err != nil {
t.Fatalf("bad: %v", err)
}
req := structs.VaultAccessorsRequest{
Accessors: accessors,
}
buf, err := structs.Encode(structs.VaultAccessorDeregisterRequestType, req)
if err != nil {
t.Fatalf("err: %v", err)
}
resp := fsm.Apply(makeLog(buf))
if resp != nil {
t.Fatalf("resp: %v", resp)
}
ws := memdb.NewWatchSet()
out1, err := fsm.State().VaultAccessor(ws, va.Accessor)
if err != nil {
t.Fatalf("err: %v", err)
}
if out1 != nil {
t.Fatalf("not deleted!")
}
tt := fsm.TimeTable()
index := tt.NearestIndex(time.Now().UTC())
if index != 1 {
t.Fatalf("bad: %d", index)
}
}
func TestFSM_ApplyPlanResults(t *testing.T) {
t.Parallel()
fsm := testFSM(t)
// Create the request and create a deployment
alloc := mock.Alloc()
alloc.Resources = &structs.Resources{} // COMPAT(0.11): Remove in 0.11, used to bypass resource creation in state store
job := alloc.Job
alloc.Job = nil
d := mock.Deployment()
d.JobID = job.ID
d.JobModifyIndex = job.ModifyIndex
d.JobVersion = job.Version
alloc.DeploymentID = d.ID
eval := mock.Eval()
eval.JobID = job.ID
fsm.State().UpsertEvals(1, []*structs.Evaluation{eval})
fsm.State().UpsertJobSummary(1, mock.JobSummary(alloc.JobID))
req := structs.ApplyPlanResultsRequest{
AllocUpdateRequest: structs.AllocUpdateRequest{
Job: job,
Alloc: []*structs.Allocation{alloc},
},
Deployment: d,
EvalID: eval.ID,
}
buf, err := structs.Encode(structs.ApplyPlanResultsRequestType, req)
if err != nil {
t.Fatalf("err: %v", err)
}
resp := fsm.Apply(makeLog(buf))
if resp != nil {
t.Fatalf("resp: %v", resp)
}
// Verify the allocation is registered
ws := memdb.NewWatchSet()
assert := assert.New(t)
out, err := fsm.State().AllocByID(ws, alloc.ID)
assert.Nil(err)
alloc.CreateIndex = out.CreateIndex
alloc.ModifyIndex = out.ModifyIndex
alloc.AllocModifyIndex = out.AllocModifyIndex
// Job should be re-attached
alloc.Job = job
assert.Equal(alloc, out)
dout, err := fsm.State().DeploymentByID(ws, d.ID)
assert.Nil(err)
tg, ok := dout.TaskGroups[alloc.TaskGroup]
assert.True(ok)
assert.NotNil(tg)
assert.Equal(1, tg.PlacedAllocs)
// Ensure that the original job is used
evictAlloc := alloc.Copy()
job = mock.Job()
job.Priority = 123
eval = mock.Eval()
eval.JobID = job.ID
fsm.State().UpsertEvals(2, []*structs.Evaluation{eval})
evictAlloc.Job = nil
evictAlloc.DesiredStatus = structs.AllocDesiredStatusEvict
req2 := structs.ApplyPlanResultsRequest{
AllocUpdateRequest: structs.AllocUpdateRequest{
Job: job,
Alloc: []*structs.Allocation{evictAlloc},
},
EvalID: eval.ID,
}
buf, err = structs.Encode(structs.ApplyPlanResultsRequestType, req2)
assert.Nil(err)
log := makeLog(buf)
//set the index to something other than 1
log.Index = 25
resp = fsm.Apply(log)
assert.Nil(resp)
// Verify we are evicted
out, err = fsm.State().AllocByID(ws, alloc.ID)
assert.Nil(err)
assert.Equal(structs.AllocDesiredStatusEvict, out.DesiredStatus)
assert.NotNil(out.Job)
assert.NotEqual(123, out.Job.Priority)
evalOut, err := fsm.State().EvalByID(ws, eval.ID)
assert.Nil(err)
assert.Equal(log.Index, evalOut.ModifyIndex)
}
func TestFSM_DeploymentStatusUpdate(t *testing.T) {
t.Parallel()
fsm := testFSM(t)
fsm.evalBroker.SetEnabled(true)
state := fsm.State()
// Upsert a deployment
d := mock.Deployment()
if err := state.UpsertDeployment(1, d); err != nil {
t.Fatalf("bad: %v", err)
}
// Create a request to update the deployment, create an eval and job
e := mock.Eval()
j := mock.Job()
status, desc := structs.DeploymentStatusFailed, "foo"
req := &structs.DeploymentStatusUpdateRequest{
DeploymentUpdate: &structs.DeploymentStatusUpdate{
DeploymentID: d.ID,
Status: status,
StatusDescription: desc,
},
Job: j,
Eval: e,
}
buf, err := structs.Encode(structs.DeploymentStatusUpdateRequestType, req)
if err != nil {
t.Fatalf("err: %v", err)
}
resp := fsm.Apply(makeLog(buf))
if resp != nil {
t.Fatalf("resp: %v", resp)
}
// Check that the status was updated properly
ws := memdb.NewWatchSet()
dout, err := state.DeploymentByID(ws, d.ID)
if err != nil {
t.Fatalf("bad: %v", err)
}
if dout.Status != status || dout.StatusDescription != desc {
t.Fatalf("bad: %#v", dout)
}
// Check that the evaluation was created
eout, _ := state.EvalByID(ws, e.ID)
if err != nil {
t.Fatalf("bad: %v", err)
}
if eout == nil {
t.Fatalf("bad: %#v", eout)
}
// Check that the job was created
jout, _ := state.JobByID(ws, j.Namespace, j.ID)
if err != nil {
t.Fatalf("bad: %v", err)
}
if jout == nil {
t.Fatalf("bad: %#v", jout)
}
// Assert the eval was enqueued
stats := fsm.evalBroker.Stats()
if stats.TotalReady != 1 {
t.Fatalf("bad: %#v %#v", stats, e)
}
}
func TestFSM_JobStabilityUpdate(t *testing.T) {
t.Parallel()
fsm := testFSM(t)
fsm.evalBroker.SetEnabled(true)
state := fsm.State()
// Upsert a deployment
job := mock.Job()
if err := state.UpsertJob(1, job); err != nil {
t.Fatalf("bad: %v", err)
}
// Create a request to update the job to stable
req := &structs.JobStabilityRequest{
JobID: job.ID,
JobVersion: job.Version,
Stable: true,
WriteRequest: structs.WriteRequest{
Namespace: job.Namespace,
},
}
buf, err := structs.Encode(structs.JobStabilityRequestType, req)
if err != nil {
t.Fatalf("err: %v", err)
}
resp := fsm.Apply(makeLog(buf))
if resp != nil {
t.Fatalf("resp: %v", resp)
}
// Check that the stability was updated properly
ws := memdb.NewWatchSet()
jout, _ := state.JobByIDAndVersion(ws, job.Namespace, job.ID, job.Version)
if err != nil {
t.Fatalf("bad: %v", err)
}
if jout == nil || !jout.Stable {
t.Fatalf("bad: %#v", jout)
}
}
func TestFSM_DeploymentPromotion(t *testing.T) {
t.Parallel()
fsm := testFSM(t)
fsm.evalBroker.SetEnabled(true)
state := fsm.State()
// Create a job with two task groups
j := mock.Job()
tg1 := j.TaskGroups[0]
tg2 := tg1.Copy()
tg2.Name = "foo"
j.TaskGroups = append(j.TaskGroups, tg2)
if err := state.UpsertJob(1, j); err != nil {
t.Fatalf("bad: %v", err)
}
// Create a deployment
d := mock.Deployment()
d.JobID = j.ID
d.TaskGroups = map[string]*structs.DeploymentState{
"web": {
DesiredTotal: 10,
DesiredCanaries: 1,
},
"foo": {
DesiredTotal: 10,
DesiredCanaries: 1,
},
}
if err := state.UpsertDeployment(2, d); err != nil {
t.Fatalf("bad: %v", err)
}
// Create a set of allocations
c1 := mock.Alloc()
c1.JobID = j.ID
c1.DeploymentID = d.ID
d.TaskGroups[c1.TaskGroup].PlacedCanaries = append(d.TaskGroups[c1.TaskGroup].PlacedCanaries, c1.ID)
c1.DeploymentStatus = &structs.AllocDeploymentStatus{
Healthy: helper.BoolToPtr(true),
}
c2 := mock.Alloc()
c2.JobID = j.ID
c2.DeploymentID = d.ID
d.TaskGroups[c2.TaskGroup].PlacedCanaries = append(d.TaskGroups[c2.TaskGroup].PlacedCanaries, c2.ID)
c2.TaskGroup = tg2.Name
c2.DeploymentStatus = &structs.AllocDeploymentStatus{
Healthy: helper.BoolToPtr(true),
}
if err := state.UpsertAllocs(3, []*structs.Allocation{c1, c2}); err != nil {
t.Fatalf("err: %v", err)
}
// Create an eval
e := mock.Eval()
// Promote the canaries
req := &structs.ApplyDeploymentPromoteRequest{
DeploymentPromoteRequest: structs.DeploymentPromoteRequest{
DeploymentID: d.ID,
All: true,
},
Eval: e,
}
buf, err := structs.Encode(structs.DeploymentPromoteRequestType, req)
if err != nil {
t.Fatalf("err: %v", err)
}
resp := fsm.Apply(makeLog(buf))
if resp != nil {
t.Fatalf("resp: %v", resp)
}
// Check that the status per task group was updated properly
ws := memdb.NewWatchSet()
dout, err := state.DeploymentByID(ws, d.ID)
if err != nil {
t.Fatalf("bad: %v", err)
}
if len(dout.TaskGroups) != 2 {
t.Fatalf("bad: %#v", dout.TaskGroups)
}
for tg, state := range dout.TaskGroups {
if !state.Promoted {
t.Fatalf("bad: group %q not promoted %#v", tg, state)
}
}
// Check that the evaluation was created
eout, _ := state.EvalByID(ws, e.ID)
if err != nil {
t.Fatalf("bad: %v", err)
}
if eout == nil {
t.Fatalf("bad: %#v", eout)
}
// Assert the eval was enqueued
stats := fsm.evalBroker.Stats()
if stats.TotalReady != 1 {
t.Fatalf("bad: %#v %#v", stats, e)
}
}
func TestFSM_DeploymentAllocHealth(t *testing.T) {
t.Parallel()
fsm := testFSM(t)
fsm.evalBroker.SetEnabled(true)
state := fsm.State()
// Insert a deployment
d := mock.Deployment()
if err := state.UpsertDeployment(1, d); err != nil {
t.Fatalf("bad: %v", err)
}
// Insert two allocations
a1 := mock.Alloc()
a1.DeploymentID = d.ID
a2 := mock.Alloc()
a2.DeploymentID = d.ID
if err := state.UpsertAllocs(2, []*structs.Allocation{a1, a2}); err != nil {
t.Fatalf("bad: %v", err)
}
// Create a job to roll back to
j := mock.Job()
// Create an eval that should be upserted
e := mock.Eval()
// Create a status update for the deployment
status, desc := structs.DeploymentStatusFailed, "foo"
u := &structs.DeploymentStatusUpdate{
DeploymentID: d.ID,
Status: status,
StatusDescription: desc,
}
// Set health against the deployment
req := &structs.ApplyDeploymentAllocHealthRequest{
DeploymentAllocHealthRequest: structs.DeploymentAllocHealthRequest{
DeploymentID: d.ID,
HealthyAllocationIDs: []string{a1.ID},
UnhealthyAllocationIDs: []string{a2.ID},
},
Job: j,
Eval: e,
DeploymentUpdate: u,
}
buf, err := structs.Encode(structs.DeploymentAllocHealthRequestType, req)
if err != nil {
t.Fatalf("err: %v", err)
}
resp := fsm.Apply(makeLog(buf))
if resp != nil {
t.Fatalf("resp: %v", resp)
}
// Check that the status was updated properly
ws := memdb.NewWatchSet()
dout, err := state.DeploymentByID(ws, d.ID)
if err != nil {
t.Fatalf("bad: %v", err)
}
if dout.Status != status || dout.StatusDescription != desc {
t.Fatalf("bad: %#v", dout)
}
// Check that the evaluation was created
eout, _ := state.EvalByID(ws, e.ID)
if err != nil {
t.Fatalf("bad: %v", err)
}
if eout == nil {
t.Fatalf("bad: %#v", eout)
}
// Check that the job was created
jout, _ := state.JobByID(ws, j.Namespace, j.ID)
if err != nil {
t.Fatalf("bad: %v", err)
}
if jout == nil {
t.Fatalf("bad: %#v", jout)
}
// Check the status of the allocs
out1, err := state.AllocByID(ws, a1.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
out2, err := state.AllocByID(ws, a2.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if !out1.DeploymentStatus.IsHealthy() {
t.Fatalf("bad: alloc %q not healthy", out1.ID)
}
if !out2.DeploymentStatus.IsUnhealthy() {
t.Fatalf("bad: alloc %q not unhealthy", out2.ID)
}
// Assert the eval was enqueued
stats := fsm.evalBroker.Stats()
if stats.TotalReady != 1 {
t.Fatalf("bad: %#v %#v", stats, e)
}
}
func TestFSM_DeleteDeployment(t *testing.T) {
t.Parallel()
fsm := testFSM(t)
state := fsm.State()
// Upsert a deployments
d := mock.Deployment()
if err := state.UpsertDeployment(1, d); err != nil {
t.Fatalf("bad: %v", err)
}
req := structs.DeploymentDeleteRequest{
Deployments: []string{d.ID},
}
buf, err := structs.Encode(structs.DeploymentDeleteRequestType, req)
if err != nil {
t.Fatalf("err: %v", err)
}
resp := fsm.Apply(makeLog(buf))
if resp != nil {
t.Fatalf("resp: %v", resp)
}
// Verify we are NOT registered
ws := memdb.NewWatchSet()
deployment, err := state.DeploymentByID(ws, d.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if deployment != nil {
t.Fatalf("deployment found!")
}
}
func TestFSM_UpsertACLPolicies(t *testing.T) {
t.Parallel()
fsm := testFSM(t)
policy := mock.ACLPolicy()
req := structs.ACLPolicyUpsertRequest{
Policies: []*structs.ACLPolicy{policy},
}
buf, err := structs.Encode(structs.ACLPolicyUpsertRequestType, req)
if err != nil {
t.Fatalf("err: %v", err)
}
resp := fsm.Apply(makeLog(buf))
if resp != nil {
t.Fatalf("resp: %v", resp)
}
// Verify we are registered
ws := memdb.NewWatchSet()
out, err := fsm.State().ACLPolicyByName(ws, policy.Name)
assert.Nil(t, err)
assert.NotNil(t, out)
}
func TestFSM_DeleteACLPolicies(t *testing.T) {
t.Parallel()
fsm := testFSM(t)
policy := mock.ACLPolicy()
err := fsm.State().UpsertACLPolicies(1000, []*structs.ACLPolicy{policy})
assert.Nil(t, err)
req := structs.ACLPolicyDeleteRequest{
Names: []string{policy.Name},
}
buf, err := structs.Encode(structs.ACLPolicyDeleteRequestType, req)
if err != nil {
t.Fatalf("err: %v", err)
}
resp := fsm.Apply(makeLog(buf))
if resp != nil {
t.Fatalf("resp: %v", resp)
}
// Verify we are NOT registered
ws := memdb.NewWatchSet()
out, err := fsm.State().ACLPolicyByName(ws, policy.Name)
assert.Nil(t, err)
assert.Nil(t, out)
}
func TestFSM_BootstrapACLTokens(t *testing.T) {
t.Parallel()
fsm := testFSM(t)
token := mock.ACLToken()
req := structs.ACLTokenBootstrapRequest{
Token: token,
}
buf, err := structs.Encode(structs.ACLTokenBootstrapRequestType, req)
if err != nil {
t.Fatalf("err: %v", err)
}
resp := fsm.Apply(makeLog(buf))
if resp != nil {
t.Fatalf("resp: %v", resp)
}
// Verify we are registered
out, err := fsm.State().ACLTokenByAccessorID(nil, token.AccessorID)
assert.Nil(t, err)
assert.NotNil(t, out)
// Test with reset
token2 := mock.ACLToken()
req = structs.ACLTokenBootstrapRequest{
Token: token2,
ResetIndex: out.CreateIndex,
}
buf, err = structs.Encode(structs.ACLTokenBootstrapRequestType, req)
if err != nil {
t.Fatalf("err: %v", err)
}
resp = fsm.Apply(makeLog(buf))
if resp != nil {
t.Fatalf("resp: %v", resp)
}
// Verify we are registered
out2, err := fsm.State().ACLTokenByAccessorID(nil, token2.AccessorID)
assert.Nil(t, err)
assert.NotNil(t, out2)
}
func TestFSM_UpsertACLTokens(t *testing.T) {
t.Parallel()
fsm := testFSM(t)
token := mock.ACLToken()
req := structs.ACLTokenUpsertRequest{
Tokens: []*structs.ACLToken{token},
}
buf, err := structs.Encode(structs.ACLTokenUpsertRequestType, req)
if err != nil {
t.Fatalf("err: %v", err)
}
resp := fsm.Apply(makeLog(buf))
if resp != nil {
t.Fatalf("resp: %v", resp)
}
// Verify we are registered
ws := memdb.NewWatchSet()
out, err := fsm.State().ACLTokenByAccessorID(ws, token.AccessorID)
assert.Nil(t, err)
assert.NotNil(t, out)
}
func TestFSM_DeleteACLTokens(t *testing.T) {
t.Parallel()
fsm := testFSM(t)
token := mock.ACLToken()
err := fsm.State().UpsertACLTokens(1000, []*structs.ACLToken{token})
assert.Nil(t, err)
req := structs.ACLTokenDeleteRequest{
AccessorIDs: []string{token.AccessorID},
}
buf, err := structs.Encode(structs.ACLTokenDeleteRequestType, req)
if err != nil {
t.Fatalf("err: %v", err)
}
resp := fsm.Apply(makeLog(buf))
if resp != nil {
t.Fatalf("resp: %v", resp)
}
// Verify we are NOT registered
ws := memdb.NewWatchSet()
out, err := fsm.State().ACLTokenByAccessorID(ws, token.AccessorID)
assert.Nil(t, err)
assert.Nil(t, out)
}
func testSnapshotRestore(t *testing.T, fsm *nomadFSM) *nomadFSM {
// Snapshot
snap, err := fsm.Snapshot()
if err != nil {
t.Fatalf("err: %v", err)
}
defer snap.Release()
// Persist
buf := bytes.NewBuffer(nil)
sink := &MockSink{buf, false}
if err := snap.Persist(sink); err != nil {
t.Fatalf("err: %v", err)
}
// Try to restore on a new FSM
fsm2 := testFSM(t)
snap, err = fsm2.Snapshot()
if err != nil {
t.Fatalf("err: %v", err)
}
defer snap.Release()
abandonCh := fsm2.State().AbandonCh()
// Do a restore
if err := fsm2.Restore(sink); err != nil {
t.Fatalf("err: %v", err)
}
select {
case <-abandonCh:
default:
t.Fatalf("bad")
}
return fsm2
}
func TestFSM_SnapshotRestore_Nodes(t *testing.T) {
t.Parallel()
// Add some state
fsm := testFSM(t)
state := fsm.State()
node1 := mock.Node()
state.UpsertNode(1000, node1)
// Upgrade this node
node2 := mock.Node()
node2.SchedulingEligibility = ""
state.UpsertNode(1001, node2)
// Verify the contents
fsm2 := testSnapshotRestore(t, fsm)
state2 := fsm2.State()
out1, _ := state2.NodeByID(nil, node1.ID)
out2, _ := state2.NodeByID(nil, node2.ID)
node2.SchedulingEligibility = structs.NodeSchedulingEligible
if !reflect.DeepEqual(node1, out1) {
t.Fatalf("bad: \n%#v\n%#v", out1, node1)
}
if !reflect.DeepEqual(node2, out2) {
t.Fatalf("bad: \n%#v\n%#v", out2, node2)
}
}
func TestFSM_SnapshotRestore_Jobs(t *testing.T) {
t.Parallel()
// Add some state
fsm := testFSM(t)
state := fsm.State()
job1 := mock.Job()
state.UpsertJob(1000, job1)
job2 := mock.Job()
state.UpsertJob(1001, job2)
// Verify the contents
ws := memdb.NewWatchSet()
fsm2 := testSnapshotRestore(t, fsm)
state2 := fsm2.State()
out1, _ := state2.JobByID(ws, job1.Namespace, job1.ID)
out2, _ := state2.JobByID(ws, job2.Namespace, job2.ID)
if !reflect.DeepEqual(job1, out1) {
t.Fatalf("bad: \n%#v\n%#v", out1, job1)
}
if !reflect.DeepEqual(job2, out2) {
t.Fatalf("bad: \n%#v\n%#v", out2, job2)
}
}
func TestFSM_SnapshotRestore_Evals(t *testing.T) {
t.Parallel()
// Add some state
fsm := testFSM(t)
state := fsm.State()
eval1 := mock.Eval()
state.UpsertEvals(1000, []*structs.Evaluation{eval1})
eval2 := mock.Eval()
state.UpsertEvals(1001, []*structs.Evaluation{eval2})
// Verify the contents
fsm2 := testSnapshotRestore(t, fsm)
state2 := fsm2.State()
ws := memdb.NewWatchSet()
out1, _ := state2.EvalByID(ws, eval1.ID)
out2, _ := state2.EvalByID(ws, eval2.ID)
if !reflect.DeepEqual(eval1, out1) {
t.Fatalf("bad: \n%#v\n%#v", out1, eval1)
}
if !reflect.DeepEqual(eval2, out2) {
t.Fatalf("bad: \n%#v\n%#v", out2, eval2)
}
}
func TestFSM_SnapshotRestore_Allocs(t *testing.T) {
t.Parallel()
// Add some state
fsm := testFSM(t)
state := fsm.State()
alloc1 := mock.Alloc()
alloc2 := mock.Alloc()
state.UpsertJobSummary(998, mock.JobSummary(alloc1.JobID))
state.UpsertJobSummary(999, mock.JobSummary(alloc2.JobID))
state.UpsertAllocs(1000, []*structs.Allocation{alloc1})
state.UpsertAllocs(1001, []*structs.Allocation{alloc2})
// Verify the contents
fsm2 := testSnapshotRestore(t, fsm)
state2 := fsm2.State()
ws := memdb.NewWatchSet()
out1, _ := state2.AllocByID(ws, alloc1.ID)
out2, _ := state2.AllocByID(ws, alloc2.ID)
if !reflect.DeepEqual(alloc1, out1) {
t.Fatalf("bad: \n%#v\n%#v", out1, alloc1)
}
if !reflect.DeepEqual(alloc2, out2) {
t.Fatalf("bad: \n%#v\n%#v", out2, alloc2)
}
}
func TestFSM_SnapshotRestore_Indexes(t *testing.T) {
t.Parallel()
// Add some state
fsm := testFSM(t)
state := fsm.State()
node1 := mock.Node()
state.UpsertNode(1000, node1)
// Verify the contents
fsm2 := testSnapshotRestore(t, fsm)
state2 := fsm2.State()
index, err := state2.Index("nodes")
if err != nil {
t.Fatalf("err: %v", err)
}
if index != 1000 {
t.Fatalf("bad: %d", index)
}
}
func TestFSM_SnapshotRestore_TimeTable(t *testing.T) {
t.Parallel()
// Add some state
fsm := testFSM(t)
tt := fsm.TimeTable()
start := time.Now().UTC()
tt.Witness(1000, start)
tt.Witness(2000, start.Add(10*time.Minute))
// Verify the contents
fsm2 := testSnapshotRestore(t, fsm)
tt2 := fsm2.TimeTable()
if tt2.NearestTime(1500) != start {
t.Fatalf("bad")
}
if tt2.NearestIndex(start.Add(15*time.Minute)) != 2000 {
t.Fatalf("bad")
}
}
func TestFSM_SnapshotRestore_PeriodicLaunches(t *testing.T) {
t.Parallel()
// Add some state
fsm := testFSM(t)
state := fsm.State()
job1 := mock.Job()
launch1 := &structs.PeriodicLaunch{
ID: job1.ID,
Namespace: job1.Namespace,
Launch: time.Now(),
}
state.UpsertPeriodicLaunch(1000, launch1)
job2 := mock.Job()
launch2 := &structs.PeriodicLaunch{
ID: job2.ID,
Namespace: job2.Namespace,
Launch: time.Now(),
}
state.UpsertPeriodicLaunch(1001, launch2)
// Verify the contents
fsm2 := testSnapshotRestore(t, fsm)
state2 := fsm2.State()
ws := memdb.NewWatchSet()
out1, _ := state2.PeriodicLaunchByID(ws, launch1.Namespace, launch1.ID)
out2, _ := state2.PeriodicLaunchByID(ws, launch2.Namespace, launch2.ID)
if !cmp.Equal(launch1, out1) {
t.Fatalf("bad: %v", cmp.Diff(launch1, out1))
}
if !cmp.Equal(launch2, out2) {
t.Fatalf("bad: %v", cmp.Diff(launch2, out2))
}
}
func TestFSM_SnapshotRestore_JobSummary(t *testing.T) {
t.Parallel()
// Add some state
fsm := testFSM(t)
state := fsm.State()
job1 := mock.Job()
state.UpsertJob(1000, job1)
ws := memdb.NewWatchSet()
js1, _ := state.JobSummaryByID(ws, job1.Namespace, job1.ID)
job2 := mock.Job()
state.UpsertJob(1001, job2)
js2, _ := state.JobSummaryByID(ws, job2.Namespace, job2.ID)
// Verify the contents
fsm2 := testSnapshotRestore(t, fsm)
state2 := fsm2.State()
out1, _ := state2.JobSummaryByID(ws, job1.Namespace, job1.ID)
out2, _ := state2.JobSummaryByID(ws, job2.Namespace, job2.ID)
if !reflect.DeepEqual(js1, out1) {
t.Fatalf("bad: \n%#v\n%#v", js1, out1)
}
if !reflect.DeepEqual(js2, out2) {
t.Fatalf("bad: \n%#v\n%#v", js2, out2)
}
}
func TestFSM_SnapshotRestore_VaultAccessors(t *testing.T) {
t.Parallel()
// Add some state
fsm := testFSM(t)
state := fsm.State()
a1 := mock.VaultAccessor()
a2 := mock.VaultAccessor()
state.UpsertVaultAccessor(1000, []*structs.VaultAccessor{a1, a2})
// Verify the contents
fsm2 := testSnapshotRestore(t, fsm)
state2 := fsm2.State()
ws := memdb.NewWatchSet()
out1, _ := state2.VaultAccessor(ws, a1.Accessor)
out2, _ := state2.VaultAccessor(ws, a2.Accessor)
if !reflect.DeepEqual(a1, out1) {
t.Fatalf("bad: \n%#v\n%#v", out1, a1)
}
if !reflect.DeepEqual(a2, out2) {
t.Fatalf("bad: \n%#v\n%#v", out2, a2)
}
}
func TestFSM_SnapshotRestore_JobVersions(t *testing.T) {
t.Parallel()
// Add some state
fsm := testFSM(t)
state := fsm.State()
job1 := mock.Job()
state.UpsertJob(1000, job1)
job2 := mock.Job()
job2.ID = job1.ID
state.UpsertJob(1001, job2)
// Verify the contents
ws := memdb.NewWatchSet()
fsm2 := testSnapshotRestore(t, fsm)
state2 := fsm2.State()
out1, _ := state2.JobByIDAndVersion(ws, job1.Namespace, job1.ID, job1.Version)
out2, _ := state2.JobByIDAndVersion(ws, job2.Namespace, job2.ID, job2.Version)
if !reflect.DeepEqual(job1, out1) {
t.Fatalf("bad: \n%#v\n%#v", out1, job1)
}
if !reflect.DeepEqual(job2, out2) {
t.Fatalf("bad: \n%#v\n%#v", out2, job2)
}
if job2.Version != 1 {
t.Fatalf("bad: \n%#v\n%#v", 1, job2)
}
}
func TestFSM_SnapshotRestore_Deployments(t *testing.T) {
t.Parallel()
// Add some state
fsm := testFSM(t)
state := fsm.State()
d1 := mock.Deployment()
d2 := mock.Deployment()
j := mock.Job()
d1.JobID = j.ID
d2.JobID = j.ID
state.UpsertJob(999, j)
state.UpsertDeployment(1000, d1)
state.UpsertDeployment(1001, d2)
// Verify the contents
fsm2 := testSnapshotRestore(t, fsm)
state2 := fsm2.State()
ws := memdb.NewWatchSet()
out1, _ := state2.DeploymentByID(ws, d1.ID)
out2, _ := state2.DeploymentByID(ws, d2.ID)
if !reflect.DeepEqual(d1, out1) {
t.Fatalf("bad: \n%#v\n%#v", out1, d1)
}
if !reflect.DeepEqual(d2, out2) {
t.Fatalf("bad: \n%#v\n%#v", out2, d2)
}
}
func TestFSM_SnapshotRestore_ACLPolicy(t *testing.T) {
t.Parallel()
// Add some state
fsm := testFSM(t)
state := fsm.State()
p1 := mock.ACLPolicy()
p2 := mock.ACLPolicy()
state.UpsertACLPolicies(1000, []*structs.ACLPolicy{p1, p2})
// Verify the contents
fsm2 := testSnapshotRestore(t, fsm)
state2 := fsm2.State()
ws := memdb.NewWatchSet()
out1, _ := state2.ACLPolicyByName(ws, p1.Name)
out2, _ := state2.ACLPolicyByName(ws, p2.Name)
assert.Equal(t, p1, out1)
assert.Equal(t, p2, out2)
}
func TestFSM_SnapshotRestore_ACLTokens(t *testing.T) {
t.Parallel()
// Add some state
fsm := testFSM(t)
state := fsm.State()
tk1 := mock.ACLToken()
tk2 := mock.ACLToken()
state.UpsertACLTokens(1000, []*structs.ACLToken{tk1, tk2})
// Verify the contents
fsm2 := testSnapshotRestore(t, fsm)
state2 := fsm2.State()
ws := memdb.NewWatchSet()
out1, _ := state2.ACLTokenByAccessorID(ws, tk1.AccessorID)
out2, _ := state2.ACLTokenByAccessorID(ws, tk2.AccessorID)
assert.Equal(t, tk1, out1)
assert.Equal(t, tk2, out2)
}
func TestFSM_SnapshotRestore_AddMissingSummary(t *testing.T) {
t.Parallel()
// Add some state
fsm := testFSM(t)
state := fsm.State()
// make an allocation
alloc := mock.Alloc()
state.UpsertJob(1010, alloc.Job)
state.UpsertAllocs(1011, []*structs.Allocation{alloc})
// Delete the summary
state.DeleteJobSummary(1040, alloc.Namespace, alloc.Job.ID)
// Delete the index
if err := state.RemoveIndex("job_summary"); err != nil {
t.Fatalf("err: %v", err)
}
fsm2 := testSnapshotRestore(t, fsm)
state2 := fsm2.State()
latestIndex, _ := state.LatestIndex()
ws := memdb.NewWatchSet()
out, _ := state2.JobSummaryByID(ws, alloc.Namespace, alloc.Job.ID)
expected := structs.JobSummary{
JobID: alloc.Job.ID,
Namespace: alloc.Job.Namespace,
Summary: map[string]structs.TaskGroupSummary{
"web": {
Starting: 1,
},
},
CreateIndex: 1010,
ModifyIndex: latestIndex,
}
if !reflect.DeepEqual(&expected, out) {
t.Fatalf("expected: %#v, actual: %#v", &expected, out)
}
}
func TestFSM_ReconcileSummaries(t *testing.T) {
t.Parallel()
// Add some state
fsm := testFSM(t)
state := fsm.State()
// Add a node
node := mock.Node()
state.UpsertNode(800, node)
// Make a job so that none of the tasks can be placed
job1 := mock.Job()
job1.TaskGroups[0].Tasks[0].Resources.CPU = 5000
state.UpsertJob(1000, job1)
// make a job which can make partial progress
alloc := mock.Alloc()
alloc.NodeID = node.ID
state.UpsertJob(1010, alloc.Job)
state.UpsertAllocs(1011, []*structs.Allocation{alloc})
// Delete the summaries
state.DeleteJobSummary(1030, job1.Namespace, job1.ID)
state.DeleteJobSummary(1040, alloc.Namespace, alloc.Job.ID)
req := structs.GenericRequest{}
buf, err := structs.Encode(structs.ReconcileJobSummariesRequestType, req)
if err != nil {
t.Fatalf("err: %v", err)
}
resp := fsm.Apply(makeLog(buf))
if resp != nil {
t.Fatalf("resp: %v", resp)
}
ws := memdb.NewWatchSet()
out1, _ := state.JobSummaryByID(ws, job1.Namespace, job1.ID)
expected := structs.JobSummary{
JobID: job1.ID,
Namespace: job1.Namespace,
Summary: map[string]structs.TaskGroupSummary{
"web": {
Queued: 10,
},
},
CreateIndex: 1000,
ModifyIndex: out1.ModifyIndex,
}
if !reflect.DeepEqual(&expected, out1) {
t.Fatalf("expected: %#v, actual: %#v", &expected, out1)
}
// This exercises the code path which adds the allocations made by the
// planner and the number of unplaced allocations in the reconcile summaries
// codepath
out2, _ := state.JobSummaryByID(ws, alloc.Namespace, alloc.Job.ID)
expected = structs.JobSummary{
JobID: alloc.Job.ID,
Namespace: alloc.Job.Namespace,
Summary: map[string]structs.TaskGroupSummary{
"web": {
Queued: 9,
Starting: 1,
},
},
CreateIndex: 1010,
ModifyIndex: out2.ModifyIndex,
}
if !reflect.DeepEqual(&expected, out2) {
t.Fatalf("Diff % #v", pretty.Diff(&expected, out2))
}
}
func TestFSM_LeakedDeployments(t *testing.T) {
t.Parallel()
require := require.New(t)
// Add some state
fsm := testFSM(t)
state := fsm.State()
d := mock.Deployment()
require.NoError(state.UpsertDeployment(1000, d))
// Verify the contents
fsm2 := testSnapshotRestore(t, fsm)
state2 := fsm2.State()
out, _ := state2.DeploymentByID(nil, d.ID)
require.NotNil(out)
require.Equal(structs.DeploymentStatusCancelled, out.Status)
}
func TestFSM_Autopilot(t *testing.T) {
t.Parallel()
fsm := testFSM(t)
// Set the autopilot config using a request.
req := structs.AutopilotSetConfigRequest{
Datacenter: "dc1",
Config: structs.AutopilotConfig{
CleanupDeadServers: true,
LastContactThreshold: 10 * time.Second,
MaxTrailingLogs: 300,
},
}
buf, err := structs.Encode(structs.AutopilotRequestType, req)
if err != nil {
t.Fatalf("err: %v", err)
}
resp := fsm.Apply(makeLog(buf))
if _, ok := resp.(error); ok {
t.Fatalf("bad: %v", resp)
}
// Verify key is set directly in the state store.
_, config, err := fsm.state.AutopilotConfig()
if err != nil {
t.Fatalf("err: %v", err)
}
if config.CleanupDeadServers != req.Config.CleanupDeadServers {
t.Fatalf("bad: %v", config.CleanupDeadServers)
}
if config.LastContactThreshold != req.Config.LastContactThreshold {
t.Fatalf("bad: %v", config.LastContactThreshold)
}
if config.MaxTrailingLogs != req.Config.MaxTrailingLogs {
t.Fatalf("bad: %v", config.MaxTrailingLogs)
}
// Now use CAS and provide an old index
req.CAS = true
req.Config.CleanupDeadServers = false
req.Config.ModifyIndex = config.ModifyIndex - 1
buf, err = structs.Encode(structs.AutopilotRequestType, req)
if err != nil {
t.Fatalf("err: %v", err)
}
resp = fsm.Apply(makeLog(buf))
if _, ok := resp.(error); ok {
t.Fatalf("bad: %v", resp)
}
_, config, err = fsm.state.AutopilotConfig()
if err != nil {
t.Fatalf("err: %v", err)
}
if !config.CleanupDeadServers {
t.Fatalf("bad: %v", config.CleanupDeadServers)
}
}