3c941c6bc3
This change adds a new table that will store ACL binding rule objects. The two indexes allow fast lookups by their ID, or by which auth method they are linked to. Snapshot persist and restore functionality ensures this table can be saved and restored from snapshots. In order to write and delete the object to state, new Raft messages have been added. All RPC request and response structs, along with object functions such as diff and canonicalize have been included within this work as it is nicely separated from the other areas of work.
677 lines
15 KiB
Go
677 lines
15 KiB
Go
package state
|
|
|
|
import (
|
|
"reflect"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/hashicorp/go-memdb"
|
|
"github.com/hashicorp/nomad/ci"
|
|
"github.com/hashicorp/nomad/helper/uuid"
|
|
"github.com/hashicorp/nomad/nomad/mock"
|
|
"github.com/hashicorp/nomad/nomad/structs"
|
|
"github.com/shoenig/test/must"
|
|
"github.com/stretchr/testify/assert"
|
|
"github.com/stretchr/testify/require"
|
|
)
|
|
|
|
func TestStateStore_RestoreNode(t *testing.T) {
|
|
ci.Parallel(t)
|
|
|
|
state := testStateStore(t)
|
|
node := mock.Node()
|
|
|
|
restore, err := state.Restore()
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
|
|
err = restore.NodeRestore(node)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
require.NoError(t, restore.Commit())
|
|
|
|
ws := memdb.NewWatchSet()
|
|
out, err := state.NodeByID(ws, node.ID)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
|
|
if !reflect.DeepEqual(out, node) {
|
|
t.Fatalf("Bad: %#v %#v", out, node)
|
|
}
|
|
}
|
|
|
|
func TestStateStore_RestoreJob(t *testing.T) {
|
|
ci.Parallel(t)
|
|
|
|
state := testStateStore(t)
|
|
job := mock.Job()
|
|
|
|
restore, err := state.Restore()
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
|
|
err = restore.JobRestore(job)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
require.NoError(t, restore.Commit())
|
|
|
|
ws := memdb.NewWatchSet()
|
|
out, err := state.JobByID(ws, job.Namespace, job.ID)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
|
|
if !reflect.DeepEqual(out, job) {
|
|
t.Fatalf("Bad: %#v %#v", out, job)
|
|
}
|
|
}
|
|
|
|
func TestStateStore_RestorePeriodicLaunch(t *testing.T) {
|
|
ci.Parallel(t)
|
|
|
|
state := testStateStore(t)
|
|
job := mock.Job()
|
|
launch := &structs.PeriodicLaunch{
|
|
ID: job.ID,
|
|
Namespace: job.Namespace,
|
|
Launch: time.Now(),
|
|
}
|
|
|
|
restore, err := state.Restore()
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
|
|
err = restore.PeriodicLaunchRestore(launch)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
require.NoError(t, restore.Commit())
|
|
|
|
ws := memdb.NewWatchSet()
|
|
out, err := state.PeriodicLaunchByID(ws, job.Namespace, job.ID)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
|
|
if !reflect.DeepEqual(out, launch) {
|
|
t.Fatalf("Bad: %#v %#v", out, job)
|
|
}
|
|
|
|
if watchFired(ws) {
|
|
t.Fatalf("bad")
|
|
}
|
|
}
|
|
|
|
func TestStateStore_RestoreJobVersion(t *testing.T) {
|
|
ci.Parallel(t)
|
|
|
|
state := testStateStore(t)
|
|
job := mock.Job()
|
|
|
|
restore, err := state.Restore()
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
|
|
err = restore.JobVersionRestore(job)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
require.NoError(t, restore.Commit())
|
|
|
|
ws := memdb.NewWatchSet()
|
|
out, err := state.JobByIDAndVersion(ws, job.Namespace, job.ID, job.Version)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
|
|
if !reflect.DeepEqual(out, job) {
|
|
t.Fatalf("Bad: %#v %#v", out, job)
|
|
}
|
|
|
|
if watchFired(ws) {
|
|
t.Fatalf("bad")
|
|
}
|
|
}
|
|
|
|
func TestStateStore_RestoreDeployment(t *testing.T) {
|
|
ci.Parallel(t)
|
|
|
|
state := testStateStore(t)
|
|
d := mock.Deployment()
|
|
|
|
restore, err := state.Restore()
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
|
|
err = restore.DeploymentRestore(d)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
require.NoError(t, restore.Commit())
|
|
|
|
ws := memdb.NewWatchSet()
|
|
out, err := state.DeploymentByID(ws, d.ID)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
|
|
if !reflect.DeepEqual(out, d) {
|
|
t.Fatalf("Bad: %#v %#v", out, d)
|
|
}
|
|
|
|
if watchFired(ws) {
|
|
t.Fatalf("bad")
|
|
}
|
|
}
|
|
|
|
func TestStateStore_RestoreJobSummary(t *testing.T) {
|
|
ci.Parallel(t)
|
|
|
|
state := testStateStore(t)
|
|
job := mock.Job()
|
|
jobSummary := &structs.JobSummary{
|
|
JobID: job.ID,
|
|
Namespace: job.Namespace,
|
|
Summary: map[string]structs.TaskGroupSummary{
|
|
"web": {
|
|
Starting: 10,
|
|
},
|
|
},
|
|
}
|
|
restore, err := state.Restore()
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
|
|
err = restore.JobSummaryRestore(jobSummary)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
require.NoError(t, restore.Commit())
|
|
|
|
ws := memdb.NewWatchSet()
|
|
out, err := state.JobSummaryByID(ws, job.Namespace, job.ID)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
|
|
if !reflect.DeepEqual(out, jobSummary) {
|
|
t.Fatalf("Bad: %#v %#v", out, jobSummary)
|
|
}
|
|
}
|
|
|
|
func TestStateStore_RestoreCSIPlugin(t *testing.T) {
|
|
ci.Parallel(t)
|
|
require := require.New(t)
|
|
|
|
state := testStateStore(t)
|
|
plugin := mock.CSIPlugin()
|
|
|
|
restore, err := state.Restore()
|
|
require.NoError(err)
|
|
|
|
err = restore.CSIPluginRestore(plugin)
|
|
require.NoError(err)
|
|
require.NoError(restore.Commit())
|
|
|
|
ws := memdb.NewWatchSet()
|
|
out, err := state.CSIPluginByID(ws, plugin.ID)
|
|
require.NoError(err)
|
|
require.EqualValues(out, plugin)
|
|
}
|
|
|
|
func TestStateStore_RestoreCSIVolume(t *testing.T) {
|
|
ci.Parallel(t)
|
|
require := require.New(t)
|
|
|
|
state := testStateStore(t)
|
|
plugin := mock.CSIPlugin()
|
|
volume := mock.CSIVolume(plugin)
|
|
|
|
restore, err := state.Restore()
|
|
require.NoError(err)
|
|
|
|
err = restore.CSIVolumeRestore(volume)
|
|
require.NoError(err)
|
|
restore.Commit()
|
|
|
|
ws := memdb.NewWatchSet()
|
|
out, err := state.CSIVolumeByID(ws, "default", volume.ID)
|
|
require.NoError(err)
|
|
require.EqualValues(out, volume)
|
|
}
|
|
|
|
func TestStateStore_RestoreIndex(t *testing.T) {
|
|
ci.Parallel(t)
|
|
|
|
state := testStateStore(t)
|
|
|
|
restore, err := state.Restore()
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
|
|
index := &IndexEntry{"jobs", 1000}
|
|
err = restore.IndexRestore(index)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
|
|
require.NoError(t, restore.Commit())
|
|
|
|
out, err := state.Index("jobs")
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
|
|
if out != 1000 {
|
|
t.Fatalf("Bad: %#v %#v", out, 1000)
|
|
}
|
|
}
|
|
|
|
func TestStateStore_RestoreEval(t *testing.T) {
|
|
ci.Parallel(t)
|
|
|
|
state := testStateStore(t)
|
|
eval := mock.Eval()
|
|
|
|
restore, err := state.Restore()
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
|
|
err = restore.EvalRestore(eval)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
require.NoError(t, restore.Commit())
|
|
|
|
ws := memdb.NewWatchSet()
|
|
out, err := state.EvalByID(ws, eval.ID)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
|
|
if !reflect.DeepEqual(out, eval) {
|
|
t.Fatalf("Bad: %#v %#v", out, eval)
|
|
}
|
|
}
|
|
|
|
func TestStateStore_RestoreAlloc(t *testing.T) {
|
|
ci.Parallel(t)
|
|
|
|
state := testStateStore(t)
|
|
alloc := mock.Alloc()
|
|
|
|
restore, err := state.Restore()
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
|
|
err = restore.AllocRestore(alloc)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
|
|
require.NoError(t, restore.Commit())
|
|
|
|
ws := memdb.NewWatchSet()
|
|
out, err := state.AllocByID(ws, alloc.ID)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
|
|
if !reflect.DeepEqual(out, alloc) {
|
|
t.Fatalf("Bad: %#v %#v", out, alloc)
|
|
}
|
|
|
|
if watchFired(ws) {
|
|
t.Fatalf("bad")
|
|
}
|
|
}
|
|
|
|
func TestStateStore_RestoreVaultAccessor(t *testing.T) {
|
|
ci.Parallel(t)
|
|
|
|
state := testStateStore(t)
|
|
a := mock.VaultAccessor()
|
|
|
|
restore, err := state.Restore()
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
|
|
err = restore.VaultAccessorRestore(a)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
require.NoError(t, restore.Commit())
|
|
|
|
ws := memdb.NewWatchSet()
|
|
out, err := state.VaultAccessor(ws, a.Accessor)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
|
|
if !reflect.DeepEqual(out, a) {
|
|
t.Fatalf("Bad: %#v %#v", out, a)
|
|
}
|
|
|
|
if watchFired(ws) {
|
|
t.Fatalf("bad")
|
|
}
|
|
}
|
|
|
|
func TestStateStore_RestoreSITokenAccessor(t *testing.T) {
|
|
ci.Parallel(t)
|
|
r := require.New(t)
|
|
|
|
state := testStateStore(t)
|
|
a1 := mock.SITokenAccessor()
|
|
|
|
restore, err := state.Restore()
|
|
r.NoError(err)
|
|
|
|
err = restore.SITokenAccessorRestore(a1)
|
|
r.NoError(err)
|
|
|
|
require.NoError(t, restore.Commit())
|
|
|
|
ws := memdb.NewWatchSet()
|
|
result, err := state.SITokenAccessor(ws, a1.AccessorID)
|
|
r.NoError(err)
|
|
r.Equal(a1, result)
|
|
|
|
wsFired := watchFired(ws)
|
|
r.False(wsFired)
|
|
}
|
|
|
|
func TestStateStore_RestoreACLPolicy(t *testing.T) {
|
|
ci.Parallel(t)
|
|
|
|
state := testStateStore(t)
|
|
policy := mock.ACLPolicy()
|
|
|
|
restore, err := state.Restore()
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
|
|
err = restore.ACLPolicyRestore(policy)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
require.NoError(t, restore.Commit())
|
|
|
|
ws := memdb.NewWatchSet()
|
|
out, err := state.ACLPolicyByName(ws, policy.Name)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
assert.Equal(t, policy, out)
|
|
}
|
|
|
|
func TestStateStore_RestoreACLToken(t *testing.T) {
|
|
ci.Parallel(t)
|
|
|
|
state := testStateStore(t)
|
|
token := mock.ACLToken()
|
|
|
|
restore, err := state.Restore()
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
|
|
err = restore.ACLTokenRestore(token)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
require.NoError(t, restore.Commit())
|
|
|
|
ws := memdb.NewWatchSet()
|
|
out, err := state.ACLTokenByAccessorID(ws, token.AccessorID)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
assert.Equal(t, token, out)
|
|
}
|
|
|
|
func TestStateStore_ClusterMetadataRestore(t *testing.T) {
|
|
require := require.New(t)
|
|
|
|
state := testStateStore(t)
|
|
clusterID := "12345678-1234-1234-1234-1234567890"
|
|
now := time.Now().UnixNano()
|
|
meta := &structs.ClusterMetadata{ClusterID: clusterID, CreateTime: now}
|
|
|
|
restore, err := state.Restore()
|
|
require.NoError(err)
|
|
|
|
err = restore.ClusterMetadataRestore(meta)
|
|
require.NoError(err)
|
|
|
|
require.NoError(restore.Commit())
|
|
|
|
out, err := state.ClusterMetadata(nil)
|
|
require.NoError(err)
|
|
require.Equal(clusterID, out.ClusterID)
|
|
require.Equal(now, out.CreateTime)
|
|
}
|
|
|
|
func TestStateStore_RestoreScalingPolicy(t *testing.T) {
|
|
ci.Parallel(t)
|
|
require := require.New(t)
|
|
|
|
state := testStateStore(t)
|
|
scalingPolicy := mock.ScalingPolicy()
|
|
|
|
restore, err := state.Restore()
|
|
require.NoError(err)
|
|
|
|
err = restore.ScalingPolicyRestore(scalingPolicy)
|
|
require.NoError(err)
|
|
require.NoError(restore.Commit())
|
|
|
|
ws := memdb.NewWatchSet()
|
|
out, err := state.ScalingPolicyByID(ws, scalingPolicy.ID)
|
|
require.NoError(err)
|
|
require.EqualValues(out, scalingPolicy)
|
|
}
|
|
|
|
func TestStateStore_RestoreScalingEvents(t *testing.T) {
|
|
ci.Parallel(t)
|
|
require := require.New(t)
|
|
|
|
state := testStateStore(t)
|
|
jobScalingEvents := &structs.JobScalingEvents{
|
|
Namespace: uuid.Generate(),
|
|
JobID: uuid.Generate(),
|
|
ScalingEvents: map[string][]*structs.ScalingEvent{
|
|
uuid.Generate(): {
|
|
structs.NewScalingEvent(uuid.Generate()),
|
|
},
|
|
},
|
|
}
|
|
|
|
restore, err := state.Restore()
|
|
require.NoError(err)
|
|
|
|
err = restore.ScalingEventsRestore(jobScalingEvents)
|
|
require.NoError(err)
|
|
require.NoError(restore.Commit())
|
|
|
|
ws := memdb.NewWatchSet()
|
|
out, _, err := state.ScalingEventsByJob(ws, jobScalingEvents.Namespace,
|
|
jobScalingEvents.JobID)
|
|
require.NoError(err)
|
|
require.NotNil(out)
|
|
require.EqualValues(jobScalingEvents.ScalingEvents, out)
|
|
}
|
|
|
|
func TestStateStore_RestoreSchedulerConfig(t *testing.T) {
|
|
ci.Parallel(t)
|
|
|
|
state := testStateStore(t)
|
|
schedConfig := &structs.SchedulerConfiguration{
|
|
PreemptionConfig: structs.PreemptionConfig{
|
|
SystemSchedulerEnabled: false,
|
|
},
|
|
CreateIndex: 100,
|
|
ModifyIndex: 200,
|
|
}
|
|
|
|
require := require.New(t)
|
|
restore, err := state.Restore()
|
|
require.Nil(err)
|
|
|
|
err = restore.SchedulerConfigRestore(schedConfig)
|
|
require.Nil(err)
|
|
|
|
require.NoError(restore.Commit())
|
|
|
|
modIndex, out, err := state.SchedulerConfig()
|
|
require.Nil(err)
|
|
require.Equal(schedConfig.ModifyIndex, modIndex)
|
|
|
|
require.Equal(schedConfig, out)
|
|
}
|
|
|
|
func TestStateStore_ServiceRegistrationRestore(t *testing.T) {
|
|
ci.Parallel(t)
|
|
testState := testStateStore(t)
|
|
|
|
// Set up our test registrations and index.
|
|
expectedIndex := uint64(13)
|
|
serviceRegs := mock.ServiceRegistrations()
|
|
|
|
restore, err := testState.Restore()
|
|
require.NoError(t, err)
|
|
|
|
// Iterate the service registrations, restore, and commit. Set the indexes
|
|
// on the objects, so we can check these.
|
|
for i := range serviceRegs {
|
|
serviceRegs[i].ModifyIndex = expectedIndex
|
|
serviceRegs[i].CreateIndex = expectedIndex
|
|
require.NoError(t, restore.ServiceRegistrationRestore(serviceRegs[i]))
|
|
}
|
|
require.NoError(t, restore.Commit())
|
|
|
|
// Check the state is now populated as we expect and that we can find the
|
|
// restored registrations.
|
|
ws := memdb.NewWatchSet()
|
|
|
|
for i := range serviceRegs {
|
|
out, err := testState.GetServiceRegistrationByID(ws, serviceRegs[i].Namespace, serviceRegs[i].ID)
|
|
require.NoError(t, err)
|
|
require.Equal(t, serviceRegs[i], out)
|
|
}
|
|
}
|
|
|
|
func TestStateStore_VariablesRestore(t *testing.T) {
|
|
ci.Parallel(t)
|
|
testState := testStateStore(t)
|
|
|
|
// Set up our test variables and index.
|
|
expectedIndex := uint64(13)
|
|
svs := mock.VariablesEncrypted(5, 5)
|
|
|
|
restore, err := testState.Restore()
|
|
require.NoError(t, err)
|
|
|
|
// Iterate the variables, restore, and commit. Set the indexes
|
|
// on the objects, so we can check these.
|
|
for i := range svs {
|
|
svs[i].ModifyIndex = expectedIndex
|
|
svs[i].CreateIndex = expectedIndex
|
|
require.NoError(t, restore.VariablesRestore(svs[i]))
|
|
}
|
|
require.NoError(t, restore.Commit())
|
|
|
|
// Check the state is now populated as we expect and that we can find the
|
|
// restored variables.
|
|
ws := memdb.NewWatchSet()
|
|
|
|
for i := range svs {
|
|
out, err := testState.GetVariable(ws, svs[i].Namespace, svs[i].Path)
|
|
require.NoError(t, err)
|
|
require.Equal(t, svs[i], out)
|
|
}
|
|
}
|
|
|
|
func TestStateStore_ACLRoleRestore(t *testing.T) {
|
|
ci.Parallel(t)
|
|
testState := testStateStore(t)
|
|
|
|
// Set up our test registrations and index.
|
|
expectedIndex := uint64(13)
|
|
aclRole := mock.ACLRole()
|
|
aclRole.CreateIndex = expectedIndex
|
|
aclRole.ModifyIndex = expectedIndex
|
|
|
|
restore, err := testState.Restore()
|
|
require.NoError(t, err)
|
|
require.NoError(t, restore.ACLRoleRestore(aclRole))
|
|
require.NoError(t, restore.Commit())
|
|
|
|
// Check the state is now populated as we expect and that we can find the
|
|
// restored registrations.
|
|
ws := memdb.NewWatchSet()
|
|
out, err := testState.GetACLRoleByName(ws, aclRole.Name)
|
|
require.NoError(t, err)
|
|
require.Equal(t, aclRole, out)
|
|
}
|
|
|
|
func TestStateStore_ACLAuthMethodRestore(t *testing.T) {
|
|
ci.Parallel(t)
|
|
testState := testStateStore(t)
|
|
|
|
// Set up our test registrations and index.
|
|
expectedIndex := uint64(13)
|
|
authMethod := mock.ACLAuthMethod()
|
|
authMethod.CreateIndex = expectedIndex
|
|
authMethod.ModifyIndex = expectedIndex
|
|
|
|
restore, err := testState.Restore()
|
|
require.NoError(t, err)
|
|
require.NoError(t, restore.ACLAuthMethodRestore(authMethod))
|
|
require.NoError(t, restore.Commit())
|
|
|
|
// Check the state is now populated as we expect and that we can find the
|
|
// restored registrations.
|
|
ws := memdb.NewWatchSet()
|
|
out, err := testState.GetACLAuthMethodByName(ws, authMethod.Name)
|
|
require.NoError(t, err)
|
|
require.Equal(t, authMethod, out)
|
|
}
|
|
|
|
func TestStateStore_ACLBindingRuleRestore(t *testing.T) {
|
|
ci.Parallel(t)
|
|
testState := testStateStore(t)
|
|
|
|
// Set up our test ACL binding rule and index.
|
|
expectedIndex := uint64(13)
|
|
aclBindingRule := mock.ACLBindingRule()
|
|
aclBindingRule.CreateIndex = expectedIndex
|
|
aclBindingRule.ModifyIndex = expectedIndex
|
|
|
|
restore, err := testState.Restore()
|
|
must.NoError(t, err)
|
|
must.NoError(t, restore.ACLBindingRuleRestore(aclBindingRule))
|
|
must.NoError(t, restore.Commit())
|
|
|
|
// Check the state is now populated as we expect and that we can find the
|
|
// restored ACL binding rule.
|
|
ws := memdb.NewWatchSet()
|
|
out, err := testState.GetACLBindingRule(ws, aclBindingRule.ID)
|
|
must.NoError(t, err)
|
|
must.Eq(t, aclBindingRule, out)
|
|
}
|