// open-consul/agent/consul/fsm/commands_ce_test.go
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: MPL-2.0
package fsm
import (
"bytes"
"context"
"errors"
"fmt"
"math/rand"
"reflect"
"testing"
"time"
"github.com/mitchellh/mapstructure"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"
"google.golang.org/protobuf/proto"
"github.com/hashicorp/consul/agent/connect"
"github.com/hashicorp/consul/agent/consul/state"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/internal/storage"
raftstorage "github.com/hashicorp/consul/internal/storage/raft"
"github.com/hashicorp/consul/proto-public/pbresource"
"github.com/hashicorp/consul/proto/private/prototest"
"github.com/hashicorp/consul/sdk/testutil"
"github.com/hashicorp/consul/types"
"github.com/hashicorp/go-raftchunking"
raftchunkingtypes "github.com/hashicorp/go-raftchunking/types"
"github.com/hashicorp/go-uuid"
"github.com/hashicorp/raft"
"github.com/hashicorp/serf/coordinate"
)
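// generateUUID returns a random UUID, panicking on failure; it is only used
// to seed test fixtures.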
func generateUUID() (ret string) {
var err error
if ret, err = uuid.GenerateUUID(); err != nil {
panic(fmt.Sprintf("Unable to generate a UUID, %v", err))
}
return ret
}
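// generateRandomCoordinate builds a network coordinate with normally
// distributed components, useful for exercising coordinate updates in tests.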
func generateRandomCoordinate() *coordinate.Coordinate {
config := coordinate.DefaultConfig()
coord := coordinate.NewCoordinate(config)
for i := range coord.Vec {
coord.Vec[i] = rand.NormFloat64()
}
coord.Error = rand.NormFloat64()
coord.Adjustment = rand.NormFloat64()
return coord
}
func TestFSM_RegisterNode(t *testing.T) {
t.Parallel()
logger := testutil.Logger(t)
fsm, err := New(nil, logger)
if err != nil {
t.Fatalf("err: %v", err)
}
req := structs.RegisterRequest{
Datacenter: "dc1",
Node: "foo",
Address: "127.0.0.1",
EnterpriseMeta: *structs.NodeEnterpriseMetaInDefaultPartition(),
}
buf, err := structs.Encode(structs.RegisterRequestType, req)
if err != nil {
t.Fatalf("err: %v", err)
}
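// makeLog is a shared helper in this package's tests that wraps the encoded
// request in a *raft.Log so it can be fed to fsm.Apply.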
resp := fsm.Apply(makeLog(buf))
if resp != nil {
t.Fatalf("resp: %v", resp)
}
// Verify we are registered
_, node, err := fsm.state.GetNode("foo", nil, "")
if err != nil {
t.Fatalf("err: %s", err)
}
if node == nil {
t.Fatalf("not found!")
}
if node.ModifyIndex != 1 {
t.Fatalf("bad index: %d", node.ModifyIndex)
}
// Verify no services are registered
_, services, err := fsm.state.NodeServices(nil, "foo", structs.DefaultEnterpriseMetaInDefaultPartition(), "")
if err != nil {
t.Fatalf("err: %s", err)
}
if len(services.Services) != 0 {
t.Fatalf("Services: %v", services)
}
}
func TestFSM_RegisterNode_Service(t *testing.T) {
t.Parallel()
logger := testutil.Logger(t)
fsm, err := New(nil, logger)
if err != nil {
t.Fatalf("err: %v", err)
}
req := structs.RegisterRequest{
Datacenter: "dc1",
Node: "foo",
Address: "127.0.0.1",
Service: &structs.NodeService{
ID: "db",
Service: "db",
Tags: []string{"primary"},
Port: 8000,
},
Check: &structs.HealthCheck{
Node: "foo",
CheckID: "db",
Name: "db connectivity",
Status: api.HealthPassing,
ServiceID: "db",
},
EnterpriseMeta: *structs.NodeEnterpriseMetaInDefaultPartition(),
}
buf, err := structs.Encode(structs.RegisterRequestType, req)
if err != nil {
t.Fatalf("err: %v", err)
}
resp := fsm.Apply(makeLog(buf))
if resp != nil {
t.Fatalf("resp: %v", resp)
}
// Verify we are registered
_, node, err := fsm.state.GetNode("foo", nil, "")
if err != nil {
t.Fatalf("err: %s", err)
}
if node == nil {
t.Fatalf("not found!")
}
// Verify service registered
_, services, err := fsm.state.NodeServices(nil, "foo", structs.DefaultEnterpriseMetaInDefaultPartition(), "")
if err != nil {
t.Fatalf("err: %s", err)
}
if _, ok := services.Services["db"]; !ok {
t.Fatalf("not registered!")
}
// Verify check
_, checks, err := fsm.state.NodeChecks(nil, "foo", structs.DefaultEnterpriseMetaInDefaultPartition(), "")
if err != nil {
t.Fatalf("err: %s", err)
}
if checks[0].CheckID != "db" {
t.Fatalf("not registered!")
}
}
func TestFSM_DeregisterService(t *testing.T) {
t.Parallel()
logger := testutil.Logger(t)
fsm, err := New(nil, logger)
if err != nil {
t.Fatalf("err: %v", err)
}
req := structs.RegisterRequest{
Datacenter: "dc1",
Node: "foo",
Address: "127.0.0.1",
Service: &structs.NodeService{
ID: "db",
Service: "db",
Tags: []string{"primary"},
Port: 8000,
},
}
buf, err := structs.Encode(structs.RegisterRequestType, req)
if err != nil {
t.Fatalf("err: %v", err)
}
resp := fsm.Apply(makeLog(buf))
if resp != nil {
t.Fatalf("resp: %v", resp)
}
dereg := structs.DeregisterRequest{
Datacenter: "dc1",
Node: "foo",
ServiceID: "db",
}
buf, err = structs.Encode(structs.DeregisterRequestType, dereg)
if err != nil {
t.Fatalf("err: %v", err)
}
resp = fsm.Apply(makeLog(buf))
if resp != nil {
t.Fatalf("resp: %v", resp)
}
// Verify the node is still registered
_, node, err := fsm.state.GetNode("foo", nil, "")
if err != nil {
t.Fatalf("err: %s", err)
}
if node == nil {
t.Fatalf("not found!")
}
// Verify service not registered
_, services, err := fsm.state.NodeServices(nil, "foo", structs.DefaultEnterpriseMetaInDefaultPartition(), "")
if err != nil {
t.Fatalf("err: %s", err)
}
if _, ok := services.Services["db"]; ok {
t.Fatalf("db registered!")
}
}
func TestFSM_DeregisterCheck(t *testing.T) {
t.Parallel()
logger := testutil.Logger(t)
fsm, err := New(nil, logger)
if err != nil {
t.Fatalf("err: %v", err)
}
req := structs.RegisterRequest{
Datacenter: "dc1",
Node: "foo",
Address: "127.0.0.1",
Check: &structs.HealthCheck{
Node: "foo",
CheckID: "mem",
Name: "memory util",
Status: api.HealthPassing,
},
}
buf, err := structs.Encode(structs.RegisterRequestType, req)
if err != nil {
t.Fatalf("err: %v", err)
}
resp := fsm.Apply(makeLog(buf))
if resp != nil {
t.Fatalf("resp: %v", resp)
}
dereg := structs.DeregisterRequest{
Datacenter: "dc1",
Node: "foo",
CheckID: "mem",
}
buf, err = structs.Encode(structs.DeregisterRequestType, dereg)
if err != nil {
t.Fatalf("err: %v", err)
}
resp = fsm.Apply(makeLog(buf))
if resp != nil {
t.Fatalf("resp: %v", resp)
}
// Verify the node is still registered
_, node, err := fsm.state.GetNode("foo", nil, "")
if err != nil {
t.Fatalf("err: %s", err)
}
if node == nil {
t.Fatalf("not found!")
}
// Verify check not registered
_, checks, err := fsm.state.NodeChecks(nil, "foo", structs.DefaultEnterpriseMetaInDefaultPartition(), "")
if err != nil {
t.Fatalf("err: %s", err)
}
if len(checks) != 0 {
t.Fatalf("check registered!")
}
}
func TestFSM_DeregisterNode(t *testing.T) {
t.Parallel()
logger := testutil.Logger(t)
fsm, err := New(nil, logger)
if err != nil {
t.Fatalf("err: %v", err)
}
req := structs.RegisterRequest{
Datacenter: "dc1",
Node: "foo",
Address: "127.0.0.1",
Service: &structs.NodeService{
ID: "db",
Service: "db",
Tags: []string{"primary"},
Port: 8000,
},
Check: &structs.HealthCheck{
Node: "foo",
CheckID: "db",
Name: "db connectivity",
Status: api.HealthPassing,
ServiceID: "db",
},
}
buf, err := structs.Encode(structs.RegisterRequestType, req)
if err != nil {
t.Fatalf("err: %v", err)
}
resp := fsm.Apply(makeLog(buf))
if resp != nil {
t.Fatalf("resp: %v", resp)
}
dereg := structs.DeregisterRequest{
Datacenter: "dc1",
Node: "foo",
}
buf, err = structs.Encode(structs.DeregisterRequestType, dereg)
if err != nil {
t.Fatalf("err: %v", err)
}
resp = fsm.Apply(makeLog(buf))
if resp != nil {
t.Fatalf("resp: %v", resp)
}
// Verify we are not registered
_, node, err := fsm.state.GetNode("foo", nil, "")
if err != nil {
t.Fatalf("err: %s", err)
}
if node != nil {
t.Fatalf("found!")
}
// Verify service not registered
_, services, err := fsm.state.NodeServices(nil, "foo", structs.DefaultEnterpriseMetaInDefaultPartition(), "")
if err != nil {
t.Fatalf("err: %s", err)
}
if services != nil {
t.Fatalf("Services: %v", services)
}
// Verify checks not registered
_, checks, err := fsm.state.NodeChecks(nil, "foo", structs.DefaultEnterpriseMetaInDefaultPartition(), "")
if err != nil {
t.Fatalf("err: %s", err)
}
if len(checks) != 0 {
t.Fatalf("Services: %v", services)
}
}
func TestFSM_KVSDelete(t *testing.T) {
t.Parallel()
logger := testutil.Logger(t)
fsm, err := New(nil, logger)
if err != nil {
t.Fatalf("err: %v", err)
}
req := structs.KVSRequest{
Datacenter: "dc1",
Op: api.KVSet,
DirEnt: structs.DirEntry{
Key: "/test/path",
Flags: 0,
Value: []byte("test"),
},
}
buf, err := structs.Encode(structs.KVSRequestType, req)
if err != nil {
t.Fatalf("err: %v", err)
}
resp := fsm.Apply(makeLog(buf))
if resp != nil {
t.Fatalf("resp: %v", resp)
}
// Run the delete
req.Op = api.KVDelete
buf, err = structs.Encode(structs.KVSRequestType, req)
if err != nil {
t.Fatalf("err: %v", err)
}
resp = fsm.Apply(makeLog(buf))
if resp != nil {
t.Fatalf("resp: %v", resp)
}
// Verify key is not set
_, d, err := fsm.state.KVSGet(nil, "/test/path", nil)
if err != nil {
t.Fatalf("err: %v", err)
}
if d != nil {
t.Fatalf("key present")
}
}
func TestFSM_KVSDeleteTree(t *testing.T) {
t.Parallel()
logger := testutil.Logger(t)
fsm, err := New(nil, logger)
if err != nil {
t.Fatalf("err: %v", err)
}
req := structs.KVSRequest{
Datacenter: "dc1",
Op: api.KVSet,
DirEnt: structs.DirEntry{
Key: "/test/path",
Flags: 0,
Value: []byte("test"),
},
}
buf, err := structs.Encode(structs.KVSRequestType, req)
if err != nil {
t.Fatalf("err: %v", err)
}
resp := fsm.Apply(makeLog(buf))
if resp != nil {
t.Fatalf("resp: %v", resp)
}
// Run the delete tree
req.Op = api.KVDeleteTree
req.DirEnt.Key = "/test"
buf, err = structs.Encode(structs.KVSRequestType, req)
if err != nil {
t.Fatalf("err: %v", err)
}
resp = fsm.Apply(makeLog(buf))
if resp != nil {
t.Fatalf("resp: %v", resp)
}
// Verify key is not set
_, d, err := fsm.state.KVSGet(nil, "/test/path", nil)
if err != nil {
t.Fatalf("err: %v", err)
}
if d != nil {
t.Fatalf("key present")
}
}
func TestFSM_KVSDeleteCheckAndSet(t *testing.T) {
t.Parallel()
logger := testutil.Logger(t)
fsm, err := New(nil, logger)
if err != nil {
t.Fatalf("err: %v", err)
}
req := structs.KVSRequest{
Datacenter: "dc1",
Op: api.KVSet,
DirEnt: structs.DirEntry{
Key: "/test/path",
Flags: 0,
Value: []byte("test"),
},
}
buf, err := structs.Encode(structs.KVSRequestType, req)
if err != nil {
t.Fatalf("err: %v", err)
}
resp := fsm.Apply(makeLog(buf))
if resp != nil {
t.Fatalf("resp: %v", resp)
}
// Verify key is set
_, d, err := fsm.state.KVSGet(nil, "/test/path", nil)
if err != nil {
t.Fatalf("err: %v", err)
}
if d == nil {
t.Fatalf("key missing")
}
// Run the check-and-set delete
req.Op = api.KVDeleteCAS
req.DirEnt.ModifyIndex = d.ModifyIndex
buf, err = structs.Encode(structs.KVSRequestType, req)
if err != nil {
t.Fatalf("err: %v", err)
}
resp = fsm.Apply(makeLog(buf))
if !resp.(bool) {
t.Fatalf("resp: %v", resp)
}
// Verify key is gone
_, d, err = fsm.state.KVSGet(nil, "/test/path", nil)
if err != nil {
t.Fatalf("err: %v", err)
}
if d != nil {
t.Fatalf("bad: %v", d)
}
}
func TestFSM_KVSCheckAndSet(t *testing.T) {
t.Parallel()
logger := testutil.Logger(t)
fsm, err := New(nil, logger)
if err != nil {
t.Fatalf("err: %v", err)
}
req := structs.KVSRequest{
Datacenter: "dc1",
Op: api.KVSet,
DirEnt: structs.DirEntry{
Key: "/test/path",
Flags: 0,
Value: []byte("test"),
},
}
buf, err := structs.Encode(structs.KVSRequestType, req)
if err != nil {
t.Fatalf("err: %v", err)
}
resp := fsm.Apply(makeLog(buf))
if resp != nil {
t.Fatalf("resp: %v", resp)
}
// Verify key is set
_, d, err := fsm.state.KVSGet(nil, "/test/path", nil)
if err != nil {
t.Fatalf("err: %v", err)
}
if d == nil {
t.Fatalf("key missing")
}
// Run the check-and-set
req.Op = api.KVCAS
req.DirEnt.ModifyIndex = d.ModifyIndex
req.DirEnt.Value = []byte("zip")
buf, err = structs.Encode(structs.KVSRequestType, req)
if err != nil {
t.Fatalf("err: %v", err)
}
resp = fsm.Apply(makeLog(buf))
if !resp.(bool) {
t.Fatalf("resp: %v", resp)
}
// Verify key is updated
_, d, err = fsm.state.KVSGet(nil, "/test/path", nil)
if err != nil {
t.Fatalf("err: %v", err)
}
if string(d.Value) != "zip" {
t.Fatalf("bad: %v", d)
}
}
func TestFSM_KVSLock(t *testing.T) {
t.Parallel()
logger := testutil.Logger(t)
fsm, err := New(nil, logger)
if err != nil {
t.Fatalf("err: %v", err)
}
err = fsm.state.EnsureNode(1, &structs.Node{Node: "foo", Address: "127.0.0.1"})
if err != nil {
t.Fatalf("err: %v", err)
}
session := &structs.Session{ID: generateUUID(), Node: "foo"}
err = fsm.state.SessionCreate(2, session)
if err != nil {
t.Fatalf("err: %v", err)
}
req := structs.KVSRequest{
Datacenter: "dc1",
Op: api.KVLock,
DirEnt: structs.DirEntry{
Key: "/test/path",
Value: []byte("test"),
Session: session.ID,
},
}
buf, err := structs.Encode(structs.KVSRequestType, req)
if err != nil {
t.Fatalf("err: %v", err)
}
resp := fsm.Apply(makeLog(buf))
if resp != true {
t.Fatalf("resp: %v", resp)
}
// Verify key is locked
_, d, err := fsm.state.KVSGet(nil, "/test/path", nil)
if err != nil {
t.Fatalf("err: %v", err)
}
if d == nil {
t.Fatalf("missing")
}
if d.LockIndex != 1 {
t.Fatalf("bad: %v", *d)
}
if d.Session != session.ID {
t.Fatalf("bad: %v", *d)
}
}
func TestFSM_KVSUnlock(t *testing.T) {
t.Parallel()
logger := testutil.Logger(t)
fsm, err := New(nil, logger)
if err != nil {
t.Fatalf("err: %v", err)
}
err = fsm.state.EnsureNode(1, &structs.Node{Node: "foo", Address: "127.0.0.1"})
if err != nil {
t.Fatalf("err: %v", err)
}
session := &structs.Session{ID: generateUUID(), Node: "foo"}
err = fsm.state.SessionCreate(2, session)
if err != nil {
t.Fatalf("err: %v", err)
}
req := structs.KVSRequest{
Datacenter: "dc1",
Op: api.KVLock,
DirEnt: structs.DirEntry{
Key: "/test/path",
Value: []byte("test"),
Session: session.ID,
},
}
buf, err := structs.Encode(structs.KVSRequestType, req)
if err != nil {
t.Fatalf("err: %v", err)
}
resp := fsm.Apply(makeLog(buf))
if resp != true {
t.Fatalf("resp: %v", resp)
}
req = structs.KVSRequest{
Datacenter: "dc1",
Op: api.KVUnlock,
DirEnt: structs.DirEntry{
Key: "/test/path",
Value: []byte("test"),
Session: session.ID,
},
}
buf, err = structs.Encode(structs.KVSRequestType, req)
if err != nil {
t.Fatalf("err: %v", err)
}
resp = fsm.Apply(makeLog(buf))
if resp != true {
t.Fatalf("resp: %v", resp)
}
// Verify key is unlocked
_, d, err := fsm.state.KVSGet(nil, "/test/path", nil)
if err != nil {
t.Fatalf("err: %v", err)
}
if d == nil {
t.Fatalf("missing")
}
if d.LockIndex != 1 {
t.Fatalf("bad: %v", *d)
}
if d.Session != "" {
t.Fatalf("bad: %v", *d)
}
}
func TestFSM_CoordinateUpdate(t *testing.T) {
t.Parallel()
logger := testutil.Logger(t)
fsm, err := New(nil, logger)
if err != nil {
t.Fatalf("err: %v", err)
}
// Register some nodes.
err = fsm.state.EnsureNode(1, &structs.Node{Node: "node1", Address: "127.0.0.1"})
if err != nil {
t.Fatalf("err: %v", err)
}
err = fsm.state.EnsureNode(2, &structs.Node{Node: "node2", Address: "127.0.0.1"})
if err != nil {
t.Fatalf("err: %v", err)
}
// Write a batch of two coordinates.
updates := structs.Coordinates{
&structs.Coordinate{
Node: "node1",
Partition: structs.NodeEnterpriseMetaInDefaultPartition().PartitionOrEmpty(),
Coord: generateRandomCoordinate(),
},
&structs.Coordinate{
Node: "node2",
Partition: structs.NodeEnterpriseMetaInDefaultPartition().PartitionOrEmpty(),
Coord: generateRandomCoordinate(),
},
}
buf, err := structs.Encode(structs.CoordinateBatchUpdateType, updates)
if err != nil {
t.Fatalf("err: %v", err)
}
resp := fsm.Apply(makeLog(buf))
if resp != nil {
t.Fatalf("resp: %v", resp)
}
// Read back the two coordinates to make sure they got updated.
_, coords, err := fsm.state.Coordinates(nil, nil)
if err != nil {
t.Fatalf("err: %s", err)
}
require.Equal(t, updates, coords)
}
func TestFSM_SessionCreate_Destroy(t *testing.T) {
t.Parallel()
logger := testutil.Logger(t)
fsm, err := New(nil, logger)
if err != nil {
t.Fatalf("err: %v", err)
}
err = fsm.state.EnsureNode(1, &structs.Node{Node: "foo", Address: "127.0.0.1"})
if err != nil {
t.Fatalf("err: %v", err)
}
err = fsm.state.EnsureCheck(2, &structs.HealthCheck{
Node: "foo",
CheckID: "web",
Status: api.HealthPassing,
})
if err != nil {
t.Fatalf("err: %v", err)
}
// Create a new session
req := structs.SessionRequest{
Datacenter: "dc1",
Op: structs.SessionCreate,
Session: structs.Session{
ID: generateUUID(),
Node: "foo",
Checks: []types.CheckID{"web"},
},
}
buf, err := structs.Encode(structs.SessionRequestType, req)
if err != nil {
t.Fatalf("err: %v", err)
}
resp := fsm.Apply(makeLog(buf))
if err, ok := resp.(error); ok {
t.Fatalf("resp: %v", err)
}
// Get the session
id := resp.(string)
_, session, err := fsm.state.SessionGet(nil, id, nil)
if err != nil {
t.Fatalf("err: %v", err)
}
if session == nil {
t.Fatalf("missing")
}
// Verify the session
if session.ID != id {
t.Fatalf("bad: %v", *session)
}
if session.Node != "foo" {
t.Fatalf("bad: %v", *session)
}
if session.Checks[0] != "web" {
t.Fatalf("bad: %v", *session)
}
// Try to destroy
destroy := structs.SessionRequest{
Datacenter: "dc1",
Op: structs.SessionDestroy,
Session: structs.Session{
ID: id,
},
}
buf, err = structs.Encode(structs.SessionRequestType, destroy)
if err != nil {
t.Fatalf("err: %v", err)
}
resp = fsm.Apply(makeLog(buf))
if resp != nil {
t.Fatalf("resp: %v", resp)
}
_, session, err = fsm.state.SessionGet(nil, id, nil)
if err != nil {
t.Fatalf("err: %v", err)
}
if session != nil {
t.Fatalf("should be destroyed")
}
}
func TestFSM_PreparedQuery_CRUD(t *testing.T) {
t.Parallel()
logger := testutil.Logger(t)
fsm, err := New(nil, logger)
if err != nil {
t.Fatalf("err: %v", err)
}
// Register a service to query on.
err = fsm.state.EnsureNode(1, &structs.Node{Node: "foo", Address: "127.0.0.1"})
if err != nil {
t.Fatalf("err: %v", err)
}
err = fsm.state.EnsureService(2, "foo", &structs.NodeService{ID: "web", Service: "web", Tags: nil, Address: "127.0.0.1", Port: 80})
if err != nil {
t.Fatalf("err: %v", err)
}
// Create a new query.
query := structs.PreparedQueryRequest{
Op: structs.PreparedQueryCreate,
Query: &structs.PreparedQuery{
ID: generateUUID(),
Service: structs.ServiceQuery{
Service: "web",
},
},
}
{
buf, err := structs.Encode(structs.PreparedQueryRequestType, query)
if err != nil {
t.Fatalf("err: %v", err)
}
resp := fsm.Apply(makeLog(buf))
if resp != nil {
t.Fatalf("resp: %v", resp)
}
}
// Verify it's in the state store.
{
_, actual, err := fsm.state.PreparedQueryGet(nil, query.Query.ID)
if err != nil {
t.Fatalf("err: %s", err)
}
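// Zero out the Raft indexes so the comparison covers only the query body.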
actual.CreateIndex, actual.ModifyIndex = 0, 0
if !reflect.DeepEqual(actual, query.Query) {
t.Fatalf("bad: %v", actual)
}
}
// Make an update to the query.
query.Op = structs.PreparedQueryUpdate
query.Query.Name = "my-query"
{
buf, err := structs.Encode(structs.PreparedQueryRequestType, query)
if err != nil {
t.Fatalf("err: %v", err)
}
resp := fsm.Apply(makeLog(buf))
if resp != nil {
t.Fatalf("resp: %v", resp)
}
}
// Verify the update.
{
_, actual, err := fsm.state.PreparedQueryGet(nil, query.Query.ID)
if err != nil {
t.Fatalf("err: %s", err)
}
actual.CreateIndex, actual.ModifyIndex = 0, 0
if !reflect.DeepEqual(actual, query.Query) {
t.Fatalf("bad: %v", actual)
}
}
// Delete the query.
query.Op = structs.PreparedQueryDelete
{
buf, err := structs.Encode(structs.PreparedQueryRequestType, query)
if err != nil {
t.Fatalf("err: %v", err)
}
resp := fsm.Apply(makeLog(buf))
if resp != nil {
t.Fatalf("resp: %v", resp)
}
}
// Make sure it's gone.
{
_, actual, err := fsm.state.PreparedQueryGet(nil, query.Query.ID)
if err != nil {
t.Fatalf("err: %s", err)
}
if actual != nil {
t.Fatalf("bad: %v", actual)
}
}
}
func TestFSM_TombstoneReap(t *testing.T) {
t.Parallel()
logger := testutil.Logger(t)
fsm, err := New(nil, logger)
if err != nil {
t.Fatalf("err: %v", err)
}
// Create some tombstones
err = fsm.state.KVSSet(11, &structs.DirEntry{
Key: "/remove",
Value: []byte("foo"),
})
if err != nil {
t.Fatalf("err: %v", err)
}
err = fsm.state.KVSDelete(12, "/remove", nil)
if err != nil {
t.Fatalf("err: %v", err)
}
idx, _, err := fsm.state.KVSList(nil, "/remove", nil)
if err != nil {
t.Fatalf("err: %s", err)
}
if idx != 12 {
t.Fatalf("bad index: %d", idx)
}
// Create a new reap request
req := structs.TombstoneRequest{
Datacenter: "dc1",
Op: structs.TombstoneReap,
ReapIndex: 12,
}
buf, err := structs.Encode(structs.TombstoneRequestType, req)
if err != nil {
t.Fatalf("err: %v", err)
}
resp := fsm.Apply(makeLog(buf))
if err, ok := resp.(error); ok {
t.Fatalf("resp: %v", err)
}
// Verify the tombstones are gone
snap := fsm.state.Snapshot()
defer snap.Close()
stones, err := snap.Tombstones()
if err != nil {
t.Fatalf("err: %s", err)
}
if stones.Next() != nil {
t.Fatalf("unexpected extra tombstones")
}
}
func TestFSM_Txn(t *testing.T) {
t.Parallel()
logger := testutil.Logger(t)
fsm, err := New(nil, logger)
if err != nil {
t.Fatalf("err: %v", err)
}
// Set a key using a transaction.
req := structs.TxnRequest{
Datacenter: "dc1",
Ops: structs.TxnOps{
&structs.TxnOp{
KV: &structs.TxnKVOp{
Verb: api.KVSet,
DirEnt: structs.DirEntry{
Key: "/test/path",
Flags: 0,
Value: []byte("test"),
},
},
},
},
}
buf, err := structs.Encode(structs.TxnRequestType, req)
if err != nil {
t.Fatalf("err: %v", err)
}
resp := fsm.Apply(makeLog(buf))
if _, ok := resp.(structs.TxnResponse); !ok {
t.Fatalf("bad response type: %T", resp)
}
// Verify key is set directly in the state store.
_, d, err := fsm.state.KVSGet(nil, "/test/path", nil)
if err != nil {
t.Fatalf("err: %v", err)
}
if d == nil {
t.Fatalf("missing")
}
}
func TestFSM_Autopilot(t *testing.T) {
t.Parallel()
logger := testutil.Logger(t)
fsm, err := New(nil, logger)
if err != nil {
t.Fatalf("err: %v", err)
}
// Set the autopilot config using a request.
req := structs.AutopilotSetConfigRequest{
Datacenter: "dc1",
Config: structs.AutopilotConfig{
CleanupDeadServers: true,
LastContactThreshold: 10 * time.Second,
MaxTrailingLogs: 300,
},
}
buf, err := structs.Encode(structs.AutopilotRequestType, req)
if err != nil {
t.Fatalf("err: %v", err)
}
resp := fsm.Apply(makeLog(buf))
if _, ok := resp.(error); ok {
t.Fatalf("bad: %v", resp)
}
// Verify the autopilot config was set directly in the state store.
_, config, err := fsm.state.AutopilotConfig()
if err != nil {
t.Fatalf("err: %v", err)
}
if config.CleanupDeadServers != req.Config.CleanupDeadServers {
t.Fatalf("bad: %v", config.CleanupDeadServers)
}
if config.LastContactThreshold != req.Config.LastContactThreshold {
t.Fatalf("bad: %v", config.LastContactThreshold)
}
if config.MaxTrailingLogs != req.Config.MaxTrailingLogs {
t.Fatalf("bad: %v", config.MaxTrailingLogs)
}
// Now use CAS with a stale index; the update should be rejected and the
// config left unchanged.
req.CAS = true
req.Config.CleanupDeadServers = false
req.Config.ModifyIndex = config.ModifyIndex - 1
buf, err = structs.Encode(structs.AutopilotRequestType, req)
if err != nil {
t.Fatalf("err: %v", err)
}
resp = fsm.Apply(makeLog(buf))
if _, ok := resp.(error); ok {
t.Fatalf("bad: %v", resp)
}
_, config, err = fsm.state.AutopilotConfig()
if err != nil {
t.Fatalf("err: %v", err)
}
if !config.CleanupDeadServers {
t.Fatalf("bad: %v", config.CleanupDeadServers)
}
}
func TestFSM_Intention_CRUD(t *testing.T) {
t.Parallel()
logger := testutil.Logger(t)
fsm, err := New(nil, logger)
assert.Nil(t, err)
// Create a new intention.
ixn := structs.IntentionRequest{
Datacenter: "dc1",
Op: structs.IntentionOpCreate,
Intention: structs.TestIntention(t),
}
ixn.Intention.ID = generateUUID()
//nolint:staticcheck
ixn.Intention.UpdatePrecedence()
{
buf, err := structs.Encode(structs.IntentionRequestType, ixn)
assert.Nil(t, err)
assert.Nil(t, fsm.Apply(makeLog(buf)))
}
// Verify it's in the state store.
{
_, _, actual, err := fsm.state.IntentionGet(nil, ixn.Intention.ID)
assert.Nil(t, err)
actual.CreateIndex, actual.ModifyIndex = 0, 0
actual.CreatedAt = ixn.Intention.CreatedAt
actual.UpdatedAt = ixn.Intention.UpdatedAt
assert.Equal(t, ixn.Intention, actual)
}
// Make an update
ixn.Op = structs.IntentionOpUpdate
ixn.Intention.SourceName = "api"
{
buf, err := structs.Encode(structs.IntentionRequestType, ixn)
assert.Nil(t, err)
assert.Nil(t, fsm.Apply(makeLog(buf)))
}
// Verify the update.
{
_, _, actual, err := fsm.state.IntentionGet(nil, ixn.Intention.ID)
assert.Nil(t, err)
actual.CreateIndex, actual.ModifyIndex = 0, 0
actual.CreatedAt = ixn.Intention.CreatedAt
actual.UpdatedAt = ixn.Intention.UpdatedAt
assert.Equal(t, ixn.Intention, actual)
}
// Delete
ixn.Op = structs.IntentionOpDelete
{
buf, err := structs.Encode(structs.IntentionRequestType, ixn)
assert.Nil(t, err)
assert.Nil(t, fsm.Apply(makeLog(buf)))
}
// Make sure it's gone.
{
_, _, actual, err := fsm.state.IntentionGet(nil, ixn.Intention.ID)
assert.Nil(t, err)
assert.Nil(t, actual)
}
}
func TestFSM_CAConfig(t *testing.T) {
t.Parallel()
logger := testutil.Logger(t)
fsm, err := New(nil, logger)
assert.Nil(t, err)
// Set the CA config using a request.
req := structs.CARequest{
Op: structs.CAOpSetConfig,
Config: &structs.CAConfiguration{
Provider: "consul",
Config: map[string]interface{}{
"PrivateKey": "asdf",
"RootCert": "qwer",
"IntermediateCertTTL": 365 * 24 * time.Hour,
},
},
}
buf, err := structs.Encode(structs.ConnectCARequestType, req)
assert.Nil(t, err)
resp := fsm.Apply(makeLog(buf))
if _, ok := resp.(error); ok {
t.Fatalf("bad: %v", resp)
}
// Verify the CA config was set directly in the state store.
_, config, err := fsm.state.CAConfig(nil)
if err != nil {
t.Fatalf("err: %v", err)
}
var conf *structs.ConsulCAProviderConfig
if err := mapstructure.WeakDecode(config.Config, &conf); err != nil {
t.Fatalf("error decoding config: %s, %v", err, config.Config)
}
if got, want := config.Provider, req.Config.Provider; got != want {
t.Fatalf("got %v, want %v", got, want)
}
if got, want := conf.PrivateKey, "asdf"; got != want {
t.Fatalf("got %v, want %v", got, want)
}
if got, want := conf.RootCert, "qwer"; got != want {
t.Fatalf("got %v, want %v", got, want)
}
if got, want := conf.IntermediateCertTTL, 365*24*time.Hour; got != want {
t.Fatalf("got %v, want %v", got, want)
}
// Now provide an old index. The test log was applied at index 1, so
// ModifyIndex-1 works out to zero, which disables the CAS check entirely
// and the update is expected to go through (the provider changes to
// "static" below).
req.Config.Provider = "static"
req.Config.ModifyIndex = config.ModifyIndex - 1
buf, err = structs.Encode(structs.ConnectCARequestType, req)
if err != nil {
t.Fatalf("err: %v", err)
}
resp = fsm.Apply(makeLog(buf))
if _, ok := resp.(error); ok {
t.Fatalf("bad: %v", resp)
}
_, config, err = fsm.state.CAConfig(nil)
assert.Nil(t, err)
if config.Provider != "static" {
t.Fatalf("bad: %v", config.Provider)
}
}
func TestFSM_CARoots(t *testing.T) {
t.Parallel()
logger := testutil.Logger(t)
fsm, err := New(nil, logger)
assert.Nil(t, err)
// Create two CA roots, only the first of which is active.
ca1 := connect.TestCA(t, nil)
ca2 := connect.TestCA(t, nil)
ca2.Active = false
// Create a new request.
req := structs.CARequest{
Op: structs.CAOpSetRoots,
Roots: []*structs.CARoot{ca1, ca2},
}
{
buf, err := structs.Encode(structs.ConnectCARequestType, req)
assert.Nil(t, err)
assert.True(t, fsm.Apply(makeLog(buf)).(bool))
}
// Verify it's in the state store.
{
_, roots, err := fsm.state.CARoots(nil)
assert.Nil(t, err)
assert.Len(t, roots, 2)
}
}
func TestFSM_CABuiltinProvider(t *testing.T) {
t.Parallel()
logger := testutil.Logger(t)
fsm, err := New(nil, logger)
assert.Nil(t, err)
// Provider state.
expected := &structs.CAConsulProviderState{
ID: "foo",
PrivateKey: "a",
RootCert: "b",
RaftIndex: structs.RaftIndex{
CreateIndex: 1,
ModifyIndex: 1,
},
}
// Create a new request.
req := structs.CARequest{
Op: structs.CAOpSetProviderState,
ProviderState: expected,
}
{
buf, err := structs.Encode(structs.ConnectCARequestType, req)
assert.Nil(t, err)
assert.True(t, fsm.Apply(makeLog(buf)).(bool))
}
// Verify it's in the state store.
{
_, state, err := fsm.state.CAProviderState("foo")
assert.Nil(t, err)
assert.Equal(t, expected, state)
}
}
func TestFSM_ConfigEntry(t *testing.T) {
t.Parallel()
logger := testutil.Logger(t)
fsm, err := New(nil, logger)
require.NoError(t, err)
// Create a simple config entry
entry := &structs.ProxyConfigEntry{
Kind: structs.ProxyDefaults,
Name: "global",
Config: map[string]interface{}{
"foo": "bar",
},
EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition(),
}
// Create a new request.
req := &structs.ConfigEntryRequest{
Op: structs.ConfigEntryUpsert,
Entry: entry,
}
{
buf, err := structs.Encode(structs.ConfigEntryRequestType, req)
require.NoError(t, err)
resp := fsm.Apply(makeLog(buf))
if _, ok := resp.(error); ok {
t.Fatalf("bad: %v", resp)
}
}
// Verify it's in the state store.
{
_, config, err := fsm.state.ConfigEntry(nil, structs.ProxyDefaults, "global", nil)
require.NoError(t, err)
entry.RaftIndex.CreateIndex = 1
entry.RaftIndex.ModifyIndex = 1
require.Equal(t, entry, config)
}
}
func TestFSM_ConfigEntry_StatusCAS(t *testing.T) {
t.Parallel()
logger := testutil.Logger(t)
fsm, err := New(nil, logger)
require.NoError(t, err)
// Create a simple config entry
entry := &structs.APIGatewayConfigEntry{
Kind: structs.APIGateway,
Name: "global",
EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition(),
Status: structs.Status{
Conditions: []structs.Condition{{
Status: string(api.ConditionStatusTrue),
}},
},
}
// Create a new request.
req := &structs.ConfigEntryRequest{
Op: structs.ConfigEntryUpsertCAS,
Entry: entry,
}
{
buf, err := structs.Encode(structs.ConfigEntryRequestType, req)
require.NoError(t, err)
resp := fsm.Apply(makeLog(buf))
if _, ok := resp.(error); ok {
t.Fatalf("bad: %v", resp)
}
}
// Verify it's in the state store, and that the plain CAS upsert did not
// persist the status (it should be the default).
{
_, config, err := fsm.state.ConfigEntry(nil, structs.APIGateway, "global", nil)
require.NoError(t, err)
entry.RaftIndex.CreateIndex = 1
entry.RaftIndex.ModifyIndex = 1
require.Equal(t, entry.DefaultStatus(), config.(*structs.APIGatewayConfigEntry).GetStatus())
}
// Do a status update using the status CAS operation.
entry.Status = structs.Status{
Conditions: []structs.Condition{{
Status: string(api.ConditionStatusTrue),
}},
}
req = &structs.ConfigEntryRequest{
Op: structs.ConfigEntryUpsertWithStatusCAS,
Entry: entry,
}
{
buf, err := structs.Encode(structs.ConfigEntryRequestType, req)
require.NoError(t, err)
resp := fsm.Apply(makeLog(buf))
if _, ok := resp.(error); ok {
t.Fatalf("bad: %v", resp)
}
}
// Verify the status update was persisted.
{
_, config, err := fsm.state.ConfigEntry(nil, structs.APIGateway, "global", nil)
require.NoError(t, err)
entry.RaftIndex.ModifyIndex = 2
conditions := config.(*structs.APIGatewayConfigEntry).Status.Conditions
require.Len(t, conditions, 1)
require.Equal(t, string(api.ConditionStatusTrue), conditions[0].Status)
}
// Attempt to change the status with a regular update and make sure it's ignored.
entry.Status = structs.Status{
Conditions: []structs.Condition{{
Status: "Bar",
}},
}
req = &structs.ConfigEntryRequest{
Op: structs.ConfigEntryUpsertCAS,
Entry: entry,
}
{
buf, err := structs.Encode(structs.ConfigEntryRequestType, req)
require.NoError(t, err)
resp := fsm.Apply(makeLog(buf))
if _, ok := resp.(error); ok {
t.Fatalf("bad: %v", resp)
}
}
// Verify the status was left unchanged.
{
_, config, err := fsm.state.ConfigEntry(nil, structs.APIGateway, "global", nil)
require.NoError(t, err)
conditions := config.(*structs.APIGatewayConfigEntry).Status.Conditions
require.Len(t, conditions, 1)
require.Equal(t, string(api.ConditionStatusTrue), conditions[0].Status)
}
}
func TestFSM_ConfigEntry_DeleteCAS(t *testing.T) {
t.Parallel()
logger := testutil.Logger(t)
fsm, err := New(nil, logger)
require.NoError(t, err)
// Create a simple config entry and write it to the state store.
entry := &structs.ServiceConfigEntry{
Kind: structs.ServiceDefaults,
Name: "global",
}
require.NoError(t, fsm.state.EnsureConfigEntry(1, entry))
// The Raft index is populated by EnsureConfigEntry; hold on to it so we can
// use it for a valid CAS delete later.
raftIndex := entry.RaftIndex
require.NotZero(t, raftIndex.ModifyIndex)
// Attempt a CAS delete with an invalid index.
entry = entry.Clone()
entry.RaftIndex = structs.RaftIndex{
ModifyIndex: 99,
}
req := &structs.ConfigEntryRequest{
Op: structs.ConfigEntryDeleteCAS,
Entry: entry,
}
buf, err := structs.Encode(structs.ConfigEntryRequestType, req)
require.NoError(t, err)
// Expect to get boolean false back.
rsp := fsm.Apply(makeLog(buf))
didDelete, isBool := rsp.(bool)
require.True(t, isBool)
require.False(t, didDelete)
// Attempt a CAS delete with a valid index.
entry.RaftIndex = raftIndex
buf, err = structs.Encode(structs.ConfigEntryRequestType, req)
require.NoError(t, err)
// Expect to get boolean true back.
rsp = fsm.Apply(makeLog(buf))
didDelete, isBool = rsp.(bool)
require.True(t, isBool)
require.True(t, didDelete)
}
func TestFSM_Resources(t *testing.T) {
t.Parallel()
logger := testutil.Logger(t)
handle := &testRaftHandle{}
storageBackend := newStorageBackend(t, handle)
fsm := NewFromDeps(Deps{
Logger: logger,
NewStateStore: func() *state.Store { return state.NewStateStore(nil) },
StorageBackend: storageBackend,
})
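// Wire the handle so backend writes round-trip through the FSM the way a
// real Raft apply would: prepend the ResourceOperationType byte so
// fsm.Apply dispatches the message to the storage backend.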
handle.apply = func(msg []byte) (any, error) {
buf := append(
[]byte{uint8(structs.ResourceOperationType)},
msg...,
)
return fsm.Apply(makeLog(buf)), nil
}
resource, err := storageBackend.WriteCAS(context.Background(), &pbresource.Resource{
Id: &pbresource.ID{
Type: &pbresource.Type{
Group: "test",
GroupVersion: "v1",
Kind: "foo",
},
Tenancy: &pbresource.Tenancy{
Partition: "default",
PeerName: "local",
Namespace: "default",
},
Name: "bar",
Uid: "a",
},
})
require.NoError(t, err)
storedResource, err := storageBackend.Read(context.Background(), storage.EventualConsistency, resource.Id)
require.NoError(t, err)
prototest.AssertDeepEqual(t, resource, storedResource)
}
// This adapts another test by chunking the encoded data and then performing
// out-of-order applies of half the logs. It then snapshots, restores to a new
// FSM, and applies the rest. The goal is to verify that in-flight chunk state
// survives snapshot and restore.
func TestFSM_Chunking_Lifecycle(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")
}
t.Parallel()
logger := testutil.Logger(t)
fsm := NewFromDeps(Deps{
Logger: logger,
NewStateStore: func() *state.Store {
return state.NewStateStore(nil)
},
StorageBackend: newStorageBackend(t, nil),
})
var logOfLogs [][]*raft.Log
for i := 0; i < 10; i++ {
req := structs.RegisterRequest{
Datacenter: "dc1",
Node: fmt.Sprintf("foo%d", i),
Address: "127.0.0.1",
Service: &structs.NodeService{
ID: "db",
Service: "db",
Tags: []string{"primary"},
Port: 8000,
},
Check: &structs.HealthCheck{
Node: fmt.Sprintf("foo%d", i),
CheckID: "db",
Name: "db connectivity",
Status: api.HealthPassing,
ServiceID: "db",
},
}
buf, err := structs.Encode(structs.RegisterRequestType, req)
require.NoError(t, err)
var logs []*raft.Log
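// Chunk the encoded request one byte per raft.Log. OpNum ties the chunks of
// a single operation together (32+i is just a unique, arbitrary base),
// SequenceNum orders them, and NumChunks tells the chunker when a set is
// complete.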
for j, b := range buf {
chunkInfo := &raftchunkingtypes.ChunkInfo{
OpNum: uint64(32 + i),
SequenceNum: uint32(j),
NumChunks: uint32(len(buf)),
}
chunkBytes, err := proto.Marshal(chunkInfo)
require.NoError(t, err)
logs = append(logs, &raft.Log{
Data: []byte{b},
Extensions: chunkBytes,
})
}
logOfLogs = append(logOfLogs, logs)
}
// The reason for the skipping is to test out-of-order applies, which are
// theoretically possible. Apply some logs from each set of chunks, but not
// the full set, and out of order.
for _, logs := range logOfLogs {
resp := fsm.chunker.Apply(logs[8])
assert.Nil(t, resp)
resp = fsm.chunker.Apply(logs[0])
assert.Nil(t, resp)
resp = fsm.chunker.Apply(logs[3])
assert.Nil(t, resp)
}
// Verify we are not registered
for i := 0; i < 10; i++ {
_, node, err := fsm.state.GetNode(fmt.Sprintf("foo%d", i), nil, "")
require.NoError(t, err)
assert.Nil(t, node)
}
// Snapshot, restore elsewhere, apply the rest of the logs, make sure it
// looks right
snap, err := fsm.Snapshot()
require.NoError(t, err)
defer snap.Release()
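// Persist the snapshot into an in-memory sink; MockSink is this package's
// test double for raft.SnapshotSink, backed by the buffer.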
sinkBuf := bytes.NewBuffer(nil)
sink := &MockSink{sinkBuf, false}
err = snap.Persist(sink)
require.NoError(t, err)
fsm2 := NewFromDeps(Deps{
Logger: logger,
NewStateStore: func() *state.Store {
return state.NewStateStore(nil)
},
StorageBackend: newStorageBackend(t, nil),
})
err = fsm2.Restore(sink)
require.NoError(t, err)
// Verify we are still not registered
for i := 0; i < 10; i++ {
_, node, err := fsm2.state.GetNode(fmt.Sprintf("foo%d", i), nil, "")
require.NoError(t, err)
assert.Nil(t, node)
}
// Apply the rest of the logs
for _, logs := range logOfLogs {
var resp interface{}
for i, log := range logs {
switch i {
case 0, 3, 8:
default:
resp = fsm2.chunker.Apply(log)
if i != len(logs)-1 {
assert.Nil(t, resp)
}
}
}
_, ok := resp.(raftchunking.ChunkingSuccess)
assert.True(t, ok)
}
// Verify we are registered
for i := 0; i < 10; i++ {
_, node, err := fsm2.state.GetNode(fmt.Sprintf("foo%d", i), nil, "")
require.NoError(t, err)
assert.NotNil(t, node)
// Verify service registered
_, services, err := fsm2.state.NodeServices(nil, fmt.Sprintf("foo%d", i), structs.DefaultEnterpriseMetaInDefaultPartition(), "")
require.NoError(t, err)
require.NotNil(t, services)
_, ok := services.Services["db"]
assert.True(t, ok)
// Verify check
_, checks, err := fsm2.state.NodeChecks(nil, fmt.Sprintf("foo%d", i), nil, "")
require.NoError(t, err)
require.NotNil(t, checks)
assert.Equal(t, "db", string(checks[0].CheckID))
}
}
func TestFSM_Chunking_TermChange(t *testing.T) {
t.Parallel()
logger := testutil.Logger(t)
fsm, err := New(nil, logger)
require.NoError(t, err)
req := structs.RegisterRequest{
Datacenter: "dc1",
Node: "foo",
Address: "127.0.0.1",
Service: &structs.NodeService{
ID: "db",
Service: "db",
Tags: []string{"primary"},
Port: 8000,
},
Check: &structs.HealthCheck{
Node: "foo",
CheckID: "db",
Name: "db connectivity",
Status: api.HealthPassing,
ServiceID: "db",
},
}
buf, err := structs.Encode(structs.RegisterRequestType, req)
require.NoError(t, err)
// Only need two chunks to test this
chunks := [][]byte{
buf[0:2],
buf[2:],
}
var logs []*raft.Log
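// Build one log per chunk and give each a different Raft term, so the
// chunker has to notice the term change mid-sequence and discard the
// partial operation.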
for i, b := range chunks {
chunkInfo := &raftchunkingtypes.ChunkInfo{
OpNum: uint64(32),
SequenceNum: uint32(i),
NumChunks: uint32(len(chunks)),
}
chunkBytes, err := proto.Marshal(chunkInfo)
if err != nil {
t.Fatal(err)
}
logs = append(logs, &raft.Log{
Term: uint64(i),
Data: b,
Extensions: chunkBytes,
})
}
// We should see nil for both
for _, log := range logs {
resp := fsm.chunker.Apply(log)
assert.Nil(t, resp)
}
// Now verify the other baseline: when the term doesn't change, we should see
// a non-nil response. First reset the chunker to a clean state, then set the
// terms to be the same.
err = fsm.chunker.RestoreState(nil)
if err != nil {
t.Fatalf("err: %v", err)
}
logs[1].Term = uint64(0)
// We should see nil only for the first one
for i, log := range logs {
resp := fsm.chunker.Apply(log)
if i == 0 {
assert.Nil(t, resp)
}
if i == 1 {
assert.NotNil(t, resp)
}
}
}
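// newStorageBackend starts a raftstorage.Backend and ties its run loop to
// the test lifecycle.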
func newStorageBackend(t *testing.T, handle raftstorage.Handle) *raftstorage.Backend {
t.Helper()
backend, err := raftstorage.NewBackend(handle, testutil.Logger(t))
require.NoError(t, err)
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)
go backend.Run(ctx)
return backend
}
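// testRaftHandle is a minimal raftstorage.Handle for tests: Apply is
// configurable, leadership and consistency checks are stubbed out, and
// DialLeader is unsupported.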
type testRaftHandle struct {
apply func(msg []byte) (any, error)
}
func (h *testRaftHandle) Apply(msg []byte) (any, error) { return h.apply(msg) }
func (testRaftHandle) IsLeader() bool { return true }
func (testRaftHandle) EnsureStrongConsistency(context.Context) error { return nil }
func (testRaftHandle) DialLeader() (*grpc.ClientConn, error) {
return nil, errors.New("DialLeader not implemented")
}