open-consul/consul/fsm_test.go
Frank Schroeder 9f8f258d4d Remove duplicate constants
This patch removes duplicate internal copies of constants in the structs
package which are also defined in the api package. The api.KVOp type
with all its values for the TXN endpoint and the api.HealthXXX constants
are now used throughout the codebase.

This resulted in some circular dependencies in the testutil package
which have been resolved by copying code and constants and moving the
WaitForLeader function into a separate testrpc package.
2017-04-20 09:54:49 -07:00

1422 lines
30 KiB
Go

package consul
import (
"bytes"
"fmt"
"os"
"reflect"
"testing"
"time"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/consul/state"
"github.com/hashicorp/consul/consul/structs"
"github.com/hashicorp/consul/lib"
"github.com/hashicorp/consul/types"
"github.com/hashicorp/go-uuid"
"github.com/hashicorp/raft"
)
type MockSink struct {
*bytes.Buffer
cancel bool
}
func (m *MockSink) ID() string {
return "Mock"
}
func (m *MockSink) Cancel() error {
m.cancel = true
return nil
}
func (m *MockSink) Close() error {
return nil
}
func makeLog(buf []byte) *raft.Log {
return &raft.Log{
Index: 1,
Term: 1,
Type: raft.LogCommand,
Data: buf,
}
}
func generateUUID() (ret string) {
var err error
if ret, err = uuid.GenerateUUID(); err != nil {
panic(fmt.Sprintf("Unable to generate a UUID, %v", err))
}
return ret
}
func TestFSM_RegisterNode(t *testing.T) {
fsm, err := NewFSM(nil, os.Stderr)
if err != nil {
t.Fatalf("err: %v", err)
}
req := structs.RegisterRequest{
Datacenter: "dc1",
Node: "foo",
Address: "127.0.0.1",
}
buf, err := structs.Encode(structs.RegisterRequestType, req)
if err != nil {
t.Fatalf("err: %v", err)
}
resp := fsm.Apply(makeLog(buf))
if resp != nil {
t.Fatalf("resp: %v", resp)
}
// Verify we are registered
_, node, err := fsm.state.GetNode("foo")
if err != nil {
t.Fatalf("err: %s", err)
}
if node == nil {
t.Fatalf("not found!")
}
if node.ModifyIndex != 1 {
t.Fatalf("bad index: %d", node.ModifyIndex)
}
// Verify service registered
_, services, err := fsm.state.NodeServices(nil, "foo")
if err != nil {
t.Fatalf("err: %s", err)
}
if len(services.Services) != 0 {
t.Fatalf("Services: %v", services)
}
}
func TestFSM_RegisterNode_Service(t *testing.T) {
fsm, err := NewFSM(nil, os.Stderr)
if err != nil {
t.Fatalf("err: %v", err)
}
req := structs.RegisterRequest{
Datacenter: "dc1",
Node: "foo",
Address: "127.0.0.1",
Service: &structs.NodeService{
ID: "db",
Service: "db",
Tags: []string{"master"},
Port: 8000,
},
Check: &structs.HealthCheck{
Node: "foo",
CheckID: "db",
Name: "db connectivity",
Status: api.HealthPassing,
ServiceID: "db",
},
}
buf, err := structs.Encode(structs.RegisterRequestType, req)
if err != nil {
t.Fatalf("err: %v", err)
}
resp := fsm.Apply(makeLog(buf))
if resp != nil {
t.Fatalf("resp: %v", resp)
}
// Verify we are registered
_, node, err := fsm.state.GetNode("foo")
if err != nil {
t.Fatalf("err: %s", err)
}
if node == nil {
t.Fatalf("not found!")
}
// Verify service registered
_, services, err := fsm.state.NodeServices(nil, "foo")
if err != nil {
t.Fatalf("err: %s", err)
}
if _, ok := services.Services["db"]; !ok {
t.Fatalf("not registered!")
}
// Verify check
_, checks, err := fsm.state.NodeChecks(nil, "foo")
if err != nil {
t.Fatalf("err: %s", err)
}
if checks[0].CheckID != "db" {
t.Fatalf("not registered!")
}
}
func TestFSM_DeregisterService(t *testing.T) {
fsm, err := NewFSM(nil, os.Stderr)
if err != nil {
t.Fatalf("err: %v", err)
}
req := structs.RegisterRequest{
Datacenter: "dc1",
Node: "foo",
Address: "127.0.0.1",
Service: &structs.NodeService{
ID: "db",
Service: "db",
Tags: []string{"master"},
Port: 8000,
},
}
buf, err := structs.Encode(structs.RegisterRequestType, req)
if err != nil {
t.Fatalf("err: %v", err)
}
resp := fsm.Apply(makeLog(buf))
if resp != nil {
t.Fatalf("resp: %v", resp)
}
dereg := structs.DeregisterRequest{
Datacenter: "dc1",
Node: "foo",
ServiceID: "db",
}
buf, err = structs.Encode(structs.DeregisterRequestType, dereg)
if err != nil {
t.Fatalf("err: %v", err)
}
resp = fsm.Apply(makeLog(buf))
if resp != nil {
t.Fatalf("resp: %v", resp)
}
// Verify we are registered
_, node, err := fsm.state.GetNode("foo")
if err != nil {
t.Fatalf("err: %s", err)
}
if node == nil {
t.Fatalf("not found!")
}
// Verify service not registered
_, services, err := fsm.state.NodeServices(nil, "foo")
if err != nil {
t.Fatalf("err: %s", err)
}
if _, ok := services.Services["db"]; ok {
t.Fatalf("db registered!")
}
}
func TestFSM_DeregisterCheck(t *testing.T) {
fsm, err := NewFSM(nil, os.Stderr)
if err != nil {
t.Fatalf("err: %v", err)
}
req := structs.RegisterRequest{
Datacenter: "dc1",
Node: "foo",
Address: "127.0.0.1",
Check: &structs.HealthCheck{
Node: "foo",
CheckID: "mem",
Name: "memory util",
Status: api.HealthPassing,
},
}
buf, err := structs.Encode(structs.RegisterRequestType, req)
if err != nil {
t.Fatalf("err: %v", err)
}
resp := fsm.Apply(makeLog(buf))
if resp != nil {
t.Fatalf("resp: %v", resp)
}
dereg := structs.DeregisterRequest{
Datacenter: "dc1",
Node: "foo",
CheckID: "mem",
}
buf, err = structs.Encode(structs.DeregisterRequestType, dereg)
if err != nil {
t.Fatalf("err: %v", err)
}
resp = fsm.Apply(makeLog(buf))
if resp != nil {
t.Fatalf("resp: %v", resp)
}
// Verify we are registered
_, node, err := fsm.state.GetNode("foo")
if err != nil {
t.Fatalf("err: %s", err)
}
if node == nil {
t.Fatalf("not found!")
}
// Verify check not registered
_, checks, err := fsm.state.NodeChecks(nil, "foo")
if err != nil {
t.Fatalf("err: %s", err)
}
if len(checks) != 0 {
t.Fatalf("check registered!")
}
}
func TestFSM_DeregisterNode(t *testing.T) {
fsm, err := NewFSM(nil, os.Stderr)
if err != nil {
t.Fatalf("err: %v", err)
}
req := structs.RegisterRequest{
Datacenter: "dc1",
Node: "foo",
Address: "127.0.0.1",
Service: &structs.NodeService{
ID: "db",
Service: "db",
Tags: []string{"master"},
Port: 8000,
},
Check: &structs.HealthCheck{
Node: "foo",
CheckID: "db",
Name: "db connectivity",
Status: api.HealthPassing,
ServiceID: "db",
},
}
buf, err := structs.Encode(structs.RegisterRequestType, req)
if err != nil {
t.Fatalf("err: %v", err)
}
resp := fsm.Apply(makeLog(buf))
if resp != nil {
t.Fatalf("resp: %v", resp)
}
dereg := structs.DeregisterRequest{
Datacenter: "dc1",
Node: "foo",
}
buf, err = structs.Encode(structs.DeregisterRequestType, dereg)
if err != nil {
t.Fatalf("err: %v", err)
}
resp = fsm.Apply(makeLog(buf))
if resp != nil {
t.Fatalf("resp: %v", resp)
}
// Verify we are not registered
_, node, err := fsm.state.GetNode("foo")
if err != nil {
t.Fatalf("err: %s", err)
}
if node != nil {
t.Fatalf("found!")
}
// Verify service not registered
_, services, err := fsm.state.NodeServices(nil, "foo")
if err != nil {
t.Fatalf("err: %s", err)
}
if services != nil {
t.Fatalf("Services: %v", services)
}
// Verify checks not registered
_, checks, err := fsm.state.NodeChecks(nil, "foo")
if err != nil {
t.Fatalf("err: %s", err)
}
if len(checks) != 0 {
t.Fatalf("Services: %v", services)
}
}
func TestFSM_SnapshotRestore(t *testing.T) {
fsm, err := NewFSM(nil, os.Stderr)
if err != nil {
t.Fatalf("err: %v", err)
}
// Add some state
fsm.state.EnsureNode(1, &structs.Node{Node: "foo", Address: "127.0.0.1"})
fsm.state.EnsureNode(2, &structs.Node{Node: "baz", Address: "127.0.0.2", TaggedAddresses: map[string]string{"hello": "1.2.3.4"}})
fsm.state.EnsureService(3, "foo", &structs.NodeService{ID: "web", Service: "web", Tags: nil, Address: "127.0.0.1", Port: 80})
fsm.state.EnsureService(4, "foo", &structs.NodeService{ID: "db", Service: "db", Tags: []string{"primary"}, Address: "127.0.0.1", Port: 5000})
fsm.state.EnsureService(5, "baz", &structs.NodeService{ID: "web", Service: "web", Tags: nil, Address: "127.0.0.2", Port: 80})
fsm.state.EnsureService(6, "baz", &structs.NodeService{ID: "db", Service: "db", Tags: []string{"secondary"}, Address: "127.0.0.2", Port: 5000})
fsm.state.EnsureCheck(7, &structs.HealthCheck{
Node: "foo",
CheckID: "web",
Name: "web connectivity",
Status: api.HealthPassing,
ServiceID: "web",
})
fsm.state.KVSSet(8, &structs.DirEntry{
Key: "/test",
Value: []byte("foo"),
})
session := &structs.Session{ID: generateUUID(), Node: "foo"}
fsm.state.SessionCreate(9, session)
acl := &structs.ACL{ID: generateUUID(), Name: "User Token"}
fsm.state.ACLSet(10, acl)
fsm.state.KVSSet(11, &structs.DirEntry{
Key: "/remove",
Value: []byte("foo"),
})
fsm.state.KVSDelete(12, "/remove")
idx, _, err := fsm.state.KVSList(nil, "/remove")
if err != nil {
t.Fatalf("err: %s", err)
}
if idx != 12 {
t.Fatalf("bad index: %d", idx)
}
updates := structs.Coordinates{
&structs.Coordinate{
Node: "baz",
Coord: generateRandomCoordinate(),
},
&structs.Coordinate{
Node: "foo",
Coord: generateRandomCoordinate(),
},
}
if err := fsm.state.CoordinateBatchUpdate(13, updates); err != nil {
t.Fatalf("err: %s", err)
}
query := structs.PreparedQuery{
ID: generateUUID(),
Service: structs.ServiceQuery{
Service: "web",
},
RaftIndex: structs.RaftIndex{
CreateIndex: 14,
ModifyIndex: 14,
},
}
if err := fsm.state.PreparedQuerySet(14, &query); err != nil {
t.Fatalf("err: %s", err)
}
autopilotConf := &structs.AutopilotConfig{
CleanupDeadServers: true,
LastContactThreshold: 100 * time.Millisecond,
MaxTrailingLogs: 222,
}
if err := fsm.state.AutopilotSetConfig(15, autopilotConf); err != nil {
t.Fatalf("err: %s", err)
}
// Snapshot
snap, err := fsm.Snapshot()
if err != nil {
t.Fatalf("err: %v", err)
}
defer snap.Release()
// Persist
buf := bytes.NewBuffer(nil)
sink := &MockSink{buf, false}
if err := snap.Persist(sink); err != nil {
t.Fatalf("err: %v", err)
}
// Try to restore on a new FSM
fsm2, err := NewFSM(nil, os.Stderr)
if err != nil {
t.Fatalf("err: %v", err)
}
// Do a restore
if err := fsm2.Restore(sink); err != nil {
t.Fatalf("err: %v", err)
}
// Verify the contents
_, nodes, err := fsm2.state.Nodes(nil)
if err != nil {
t.Fatalf("err: %s", err)
}
if len(nodes) != 2 {
t.Fatalf("bad: %v", nodes)
}
if nodes[0].Node != "baz" ||
nodes[0].Address != "127.0.0.2" ||
len(nodes[0].TaggedAddresses) != 1 ||
nodes[0].TaggedAddresses["hello"] != "1.2.3.4" {
t.Fatalf("bad: %v", nodes[0])
}
if nodes[1].Node != "foo" ||
nodes[1].Address != "127.0.0.1" ||
len(nodes[1].TaggedAddresses) != 0 {
t.Fatalf("bad: %v", nodes[1])
}
_, fooSrv, err := fsm2.state.NodeServices(nil, "foo")
if err != nil {
t.Fatalf("err: %s", err)
}
if len(fooSrv.Services) != 2 {
t.Fatalf("Bad: %v", fooSrv)
}
if !lib.StrContains(fooSrv.Services["db"].Tags, "primary") {
t.Fatalf("Bad: %v", fooSrv)
}
if fooSrv.Services["db"].Port != 5000 {
t.Fatalf("Bad: %v", fooSrv)
}
_, checks, err := fsm2.state.NodeChecks(nil, "foo")
if err != nil {
t.Fatalf("err: %s", err)
}
if len(checks) != 1 {
t.Fatalf("Bad: %v", checks)
}
// Verify key is set
_, d, err := fsm2.state.KVSGet(nil, "/test")
if err != nil {
t.Fatalf("err: %v", err)
}
if string(d.Value) != "foo" {
t.Fatalf("bad: %v", d)
}
// Verify session is restored
idx, s, err := fsm2.state.SessionGet(nil, session.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if s.Node != "foo" {
t.Fatalf("bad: %v", s)
}
if idx <= 1 {
t.Fatalf("bad index: %d", idx)
}
// Verify ACL is restored
_, a, err := fsm2.state.ACLGet(nil, acl.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if a.Name != "User Token" {
t.Fatalf("bad: %v", a)
}
if a.ModifyIndex <= 1 {
t.Fatalf("bad index: %d", idx)
}
// Verify tombstones are restored
func() {
snap := fsm2.state.Snapshot()
defer snap.Close()
stones, err := snap.Tombstones()
if err != nil {
t.Fatalf("err: %s", err)
}
stone := stones.Next().(*state.Tombstone)
if stone == nil {
t.Fatalf("missing tombstone")
}
if stone.Key != "/remove" || stone.Index != 12 {
t.Fatalf("bad: %v", stone)
}
if stones.Next() != nil {
t.Fatalf("unexpected extra tombstones")
}
}()
// Verify coordinates are restored
_, coords, err := fsm2.state.Coordinates(nil)
if err != nil {
t.Fatalf("err: %s", err)
}
if !reflect.DeepEqual(coords, updates) {
t.Fatalf("bad: %#v", coords)
}
// Verify queries are restored.
_, queries, err := fsm2.state.PreparedQueryList(nil)
if err != nil {
t.Fatalf("err: %s", err)
}
if len(queries) != 1 {
t.Fatalf("bad: %#v", queries)
}
if !reflect.DeepEqual(queries[0], &query) {
t.Fatalf("bad: %#v", queries[0])
}
// Verify autopilot config is restored.
_, restoredConf, err := fsm2.state.AutopilotConfig()
if err != nil {
t.Fatalf("err: %s", err)
}
if !reflect.DeepEqual(restoredConf, autopilotConf) {
t.Fatalf("bad: %#v, %#v", restoredConf, autopilotConf)
}
// Snapshot
snap, err = fsm2.Snapshot()
if err != nil {
t.Fatalf("err: %v", err)
}
defer snap.Release()
// Persist
buf = bytes.NewBuffer(nil)
sink = &MockSink{buf, false}
if err := snap.Persist(sink); err != nil {
t.Fatalf("err: %v", err)
}
// Try to restore on the old FSM and make sure it abandons the old state
// store.
abandonCh := fsm.state.AbandonCh()
if err := fsm.Restore(sink); err != nil {
t.Fatalf("err: %v", err)
}
select {
case <-abandonCh:
default:
t.Fatalf("bad")
}
}
func TestFSM_BadRestore(t *testing.T) {
// Create an FSM with some state.
fsm, err := NewFSM(nil, os.Stderr)
if err != nil {
t.Fatalf("err: %v", err)
}
fsm.state.EnsureNode(1, &structs.Node{Node: "foo", Address: "127.0.0.1"})
abandonCh := fsm.state.AbandonCh()
// Do a bad restore.
buf := bytes.NewBuffer([]byte("bad snapshot"))
sink := &MockSink{buf, false}
if err := fsm.Restore(sink); err == nil {
t.Fatalf("err: %v", err)
}
// Verify the contents didn't get corrupted.
_, nodes, err := fsm.state.Nodes(nil)
if err != nil {
t.Fatalf("err: %s", err)
}
if len(nodes) != 1 {
t.Fatalf("bad: %v", nodes)
}
if nodes[0].Node != "foo" ||
nodes[0].Address != "127.0.0.1" ||
len(nodes[0].TaggedAddresses) != 0 {
t.Fatalf("bad: %v", nodes[0])
}
// Verify the old state store didn't get abandoned.
select {
case <-abandonCh:
t.Fatalf("bad")
default:
}
}
func TestFSM_KVSDelete(t *testing.T) {
fsm, err := NewFSM(nil, os.Stderr)
if err != nil {
t.Fatalf("err: %v", err)
}
req := structs.KVSRequest{
Datacenter: "dc1",
Op: api.KVSet,
DirEnt: structs.DirEntry{
Key: "/test/path",
Flags: 0,
Value: []byte("test"),
},
}
buf, err := structs.Encode(structs.KVSRequestType, req)
if err != nil {
t.Fatalf("err: %v", err)
}
resp := fsm.Apply(makeLog(buf))
if resp != nil {
t.Fatalf("resp: %v", resp)
}
// Run the delete
req.Op = api.KVDelete
buf, err = structs.Encode(structs.KVSRequestType, req)
if err != nil {
t.Fatalf("err: %v", err)
}
resp = fsm.Apply(makeLog(buf))
if resp != nil {
t.Fatalf("resp: %v", resp)
}
// Verify key is not set
_, d, err := fsm.state.KVSGet(nil, "/test/path")
if err != nil {
t.Fatalf("err: %v", err)
}
if d != nil {
t.Fatalf("key present")
}
}
func TestFSM_KVSDeleteTree(t *testing.T) {
fsm, err := NewFSM(nil, os.Stderr)
if err != nil {
t.Fatalf("err: %v", err)
}
req := structs.KVSRequest{
Datacenter: "dc1",
Op: api.KVSet,
DirEnt: structs.DirEntry{
Key: "/test/path",
Flags: 0,
Value: []byte("test"),
},
}
buf, err := structs.Encode(structs.KVSRequestType, req)
if err != nil {
t.Fatalf("err: %v", err)
}
resp := fsm.Apply(makeLog(buf))
if resp != nil {
t.Fatalf("resp: %v", resp)
}
// Run the delete tree
req.Op = api.KVDeleteTree
req.DirEnt.Key = "/test"
buf, err = structs.Encode(structs.KVSRequestType, req)
if err != nil {
t.Fatalf("err: %v", err)
}
resp = fsm.Apply(makeLog(buf))
if resp != nil {
t.Fatalf("resp: %v", resp)
}
// Verify key is not set
_, d, err := fsm.state.KVSGet(nil, "/test/path")
if err != nil {
t.Fatalf("err: %v", err)
}
if d != nil {
t.Fatalf("key present")
}
}
func TestFSM_KVSDeleteCheckAndSet(t *testing.T) {
fsm, err := NewFSM(nil, os.Stderr)
if err != nil {
t.Fatalf("err: %v", err)
}
req := structs.KVSRequest{
Datacenter: "dc1",
Op: api.KVSet,
DirEnt: structs.DirEntry{
Key: "/test/path",
Flags: 0,
Value: []byte("test"),
},
}
buf, err := structs.Encode(structs.KVSRequestType, req)
if err != nil {
t.Fatalf("err: %v", err)
}
resp := fsm.Apply(makeLog(buf))
if resp != nil {
t.Fatalf("resp: %v", resp)
}
// Verify key is set
_, d, err := fsm.state.KVSGet(nil, "/test/path")
if err != nil {
t.Fatalf("err: %v", err)
}
if d == nil {
t.Fatalf("key missing")
}
// Run the check-and-set
req.Op = api.KVDeleteCAS
req.DirEnt.ModifyIndex = d.ModifyIndex
buf, err = structs.Encode(structs.KVSRequestType, req)
if err != nil {
t.Fatalf("err: %v", err)
}
resp = fsm.Apply(makeLog(buf))
if resp.(bool) != true {
t.Fatalf("resp: %v", resp)
}
// Verify key is gone
_, d, err = fsm.state.KVSGet(nil, "/test/path")
if err != nil {
t.Fatalf("err: %v", err)
}
if d != nil {
t.Fatalf("bad: %v", d)
}
}
func TestFSM_KVSCheckAndSet(t *testing.T) {
fsm, err := NewFSM(nil, os.Stderr)
if err != nil {
t.Fatalf("err: %v", err)
}
req := structs.KVSRequest{
Datacenter: "dc1",
Op: api.KVSet,
DirEnt: structs.DirEntry{
Key: "/test/path",
Flags: 0,
Value: []byte("test"),
},
}
buf, err := structs.Encode(structs.KVSRequestType, req)
if err != nil {
t.Fatalf("err: %v", err)
}
resp := fsm.Apply(makeLog(buf))
if resp != nil {
t.Fatalf("resp: %v", resp)
}
// Verify key is set
_, d, err := fsm.state.KVSGet(nil, "/test/path")
if err != nil {
t.Fatalf("err: %v", err)
}
if d == nil {
t.Fatalf("key missing")
}
// Run the check-and-set
req.Op = api.KVCAS
req.DirEnt.ModifyIndex = d.ModifyIndex
req.DirEnt.Value = []byte("zip")
buf, err = structs.Encode(structs.KVSRequestType, req)
if err != nil {
t.Fatalf("err: %v", err)
}
resp = fsm.Apply(makeLog(buf))
if resp.(bool) != true {
t.Fatalf("resp: %v", resp)
}
// Verify key is updated
_, d, err = fsm.state.KVSGet(nil, "/test/path")
if err != nil {
t.Fatalf("err: %v", err)
}
if string(d.Value) != "zip" {
t.Fatalf("bad: %v", d)
}
}
func TestFSM_CoordinateUpdate(t *testing.T) {
fsm, err := NewFSM(nil, os.Stderr)
if err != nil {
t.Fatalf("err: %v", err)
}
// Register some nodes.
fsm.state.EnsureNode(1, &structs.Node{Node: "node1", Address: "127.0.0.1"})
fsm.state.EnsureNode(2, &structs.Node{Node: "node2", Address: "127.0.0.1"})
// Write a batch of two coordinates.
updates := structs.Coordinates{
&structs.Coordinate{
Node: "node1",
Coord: generateRandomCoordinate(),
},
&structs.Coordinate{
Node: "node2",
Coord: generateRandomCoordinate(),
},
}
buf, err := structs.Encode(structs.CoordinateBatchUpdateType, updates)
if err != nil {
t.Fatalf("err: %v", err)
}
resp := fsm.Apply(makeLog(buf))
if resp != nil {
t.Fatalf("resp: %v", resp)
}
// Read back the two coordinates to make sure they got updated.
_, coords, err := fsm.state.Coordinates(nil)
if err != nil {
t.Fatalf("err: %s", err)
}
if !reflect.DeepEqual(coords, updates) {
t.Fatalf("bad: %#v", coords)
}
}
func TestFSM_SessionCreate_Destroy(t *testing.T) {
fsm, err := NewFSM(nil, os.Stderr)
if err != nil {
t.Fatalf("err: %v", err)
}
fsm.state.EnsureNode(1, &structs.Node{Node: "foo", Address: "127.0.0.1"})
fsm.state.EnsureCheck(2, &structs.HealthCheck{
Node: "foo",
CheckID: "web",
Status: api.HealthPassing,
})
// Create a new session
req := structs.SessionRequest{
Datacenter: "dc1",
Op: structs.SessionCreate,
Session: structs.Session{
ID: generateUUID(),
Node: "foo",
Checks: []types.CheckID{"web"},
},
}
buf, err := structs.Encode(structs.SessionRequestType, req)
if err != nil {
t.Fatalf("err: %v", err)
}
resp := fsm.Apply(makeLog(buf))
if err, ok := resp.(error); ok {
t.Fatalf("resp: %v", err)
}
// Get the session
id := resp.(string)
_, session, err := fsm.state.SessionGet(nil, id)
if err != nil {
t.Fatalf("err: %v", err)
}
if session == nil {
t.Fatalf("missing")
}
// Verify the session
if session.ID != id {
t.Fatalf("bad: %v", *session)
}
if session.Node != "foo" {
t.Fatalf("bad: %v", *session)
}
if session.Checks[0] != "web" {
t.Fatalf("bad: %v", *session)
}
// Try to destroy
destroy := structs.SessionRequest{
Datacenter: "dc1",
Op: structs.SessionDestroy,
Session: structs.Session{
ID: id,
},
}
buf, err = structs.Encode(structs.SessionRequestType, destroy)
if err != nil {
t.Fatalf("err: %v", err)
}
resp = fsm.Apply(makeLog(buf))
if resp != nil {
t.Fatalf("resp: %v", resp)
}
_, session, err = fsm.state.SessionGet(nil, id)
if err != nil {
t.Fatalf("err: %v", err)
}
if session != nil {
t.Fatalf("should be destroyed")
}
}
func TestFSM_KVSLock(t *testing.T) {
fsm, err := NewFSM(nil, os.Stderr)
if err != nil {
t.Fatalf("err: %v", err)
}
fsm.state.EnsureNode(1, &structs.Node{Node: "foo", Address: "127.0.0.1"})
session := &structs.Session{ID: generateUUID(), Node: "foo"}
fsm.state.SessionCreate(2, session)
req := structs.KVSRequest{
Datacenter: "dc1",
Op: api.KVLock,
DirEnt: structs.DirEntry{
Key: "/test/path",
Value: []byte("test"),
Session: session.ID,
},
}
buf, err := structs.Encode(structs.KVSRequestType, req)
if err != nil {
t.Fatalf("err: %v", err)
}
resp := fsm.Apply(makeLog(buf))
if resp != true {
t.Fatalf("resp: %v", resp)
}
// Verify key is locked
_, d, err := fsm.state.KVSGet(nil, "/test/path")
if err != nil {
t.Fatalf("err: %v", err)
}
if d == nil {
t.Fatalf("missing")
}
if d.LockIndex != 1 {
t.Fatalf("bad: %v", *d)
}
if d.Session != session.ID {
t.Fatalf("bad: %v", *d)
}
}
func TestFSM_KVSUnlock(t *testing.T) {
fsm, err := NewFSM(nil, os.Stderr)
if err != nil {
t.Fatalf("err: %v", err)
}
fsm.state.EnsureNode(1, &structs.Node{Node: "foo", Address: "127.0.0.1"})
session := &structs.Session{ID: generateUUID(), Node: "foo"}
fsm.state.SessionCreate(2, session)
req := structs.KVSRequest{
Datacenter: "dc1",
Op: api.KVLock,
DirEnt: structs.DirEntry{
Key: "/test/path",
Value: []byte("test"),
Session: session.ID,
},
}
buf, err := structs.Encode(structs.KVSRequestType, req)
if err != nil {
t.Fatalf("err: %v", err)
}
resp := fsm.Apply(makeLog(buf))
if resp != true {
t.Fatalf("resp: %v", resp)
}
req = structs.KVSRequest{
Datacenter: "dc1",
Op: api.KVUnlock,
DirEnt: structs.DirEntry{
Key: "/test/path",
Value: []byte("test"),
Session: session.ID,
},
}
buf, err = structs.Encode(structs.KVSRequestType, req)
if err != nil {
t.Fatalf("err: %v", err)
}
resp = fsm.Apply(makeLog(buf))
if resp != true {
t.Fatalf("resp: %v", resp)
}
// Verify key is unlocked
_, d, err := fsm.state.KVSGet(nil, "/test/path")
if err != nil {
t.Fatalf("err: %v", err)
}
if d == nil {
t.Fatalf("missing")
}
if d.LockIndex != 1 {
t.Fatalf("bad: %v", *d)
}
if d.Session != "" {
t.Fatalf("bad: %v", *d)
}
}
func TestFSM_ACL_Set_Delete(t *testing.T) {
fsm, err := NewFSM(nil, os.Stderr)
if err != nil {
t.Fatalf("err: %v", err)
}
// Create a new ACL
req := structs.ACLRequest{
Datacenter: "dc1",
Op: structs.ACLSet,
ACL: structs.ACL{
ID: generateUUID(),
Name: "User token",
Type: structs.ACLTypeClient,
},
}
buf, err := structs.Encode(structs.ACLRequestType, req)
if err != nil {
t.Fatalf("err: %v", err)
}
resp := fsm.Apply(makeLog(buf))
if err, ok := resp.(error); ok {
t.Fatalf("resp: %v", err)
}
// Get the ACL
id := resp.(string)
_, acl, err := fsm.state.ACLGet(nil, id)
if err != nil {
t.Fatalf("err: %v", err)
}
if acl == nil {
t.Fatalf("missing")
}
// Verify the ACL
if acl.ID != id {
t.Fatalf("bad: %v", *acl)
}
if acl.Name != "User token" {
t.Fatalf("bad: %v", *acl)
}
if acl.Type != structs.ACLTypeClient {
t.Fatalf("bad: %v", *acl)
}
// Try to destroy
destroy := structs.ACLRequest{
Datacenter: "dc1",
Op: structs.ACLDelete,
ACL: structs.ACL{
ID: id,
},
}
buf, err = structs.Encode(structs.ACLRequestType, destroy)
if err != nil {
t.Fatalf("err: %v", err)
}
resp = fsm.Apply(makeLog(buf))
if resp != nil {
t.Fatalf("resp: %v", resp)
}
_, acl, err = fsm.state.ACLGet(nil, id)
if err != nil {
t.Fatalf("err: %v", err)
}
if acl != nil {
t.Fatalf("should be destroyed")
}
}
func TestFSM_PreparedQuery_CRUD(t *testing.T) {
fsm, err := NewFSM(nil, os.Stderr)
if err != nil {
t.Fatalf("err: %v", err)
}
// Register a service to query on.
fsm.state.EnsureNode(1, &structs.Node{Node: "foo", Address: "127.0.0.1"})
fsm.state.EnsureService(2, "foo", &structs.NodeService{ID: "web", Service: "web", Tags: nil, Address: "127.0.0.1", Port: 80})
// Create a new query.
query := structs.PreparedQueryRequest{
Op: structs.PreparedQueryCreate,
Query: &structs.PreparedQuery{
ID: generateUUID(),
Service: structs.ServiceQuery{
Service: "web",
},
},
}
{
buf, err := structs.Encode(structs.PreparedQueryRequestType, query)
if err != nil {
t.Fatalf("err: %v", err)
}
resp := fsm.Apply(makeLog(buf))
if resp != nil {
t.Fatalf("resp: %v", resp)
}
}
// Verify it's in the state store.
{
_, actual, err := fsm.state.PreparedQueryGet(nil, query.Query.ID)
if err != nil {
t.Fatalf("err: %s", err)
}
actual.CreateIndex, actual.ModifyIndex = 0, 0
if !reflect.DeepEqual(actual, query.Query) {
t.Fatalf("bad: %v", actual)
}
}
// Make an update to the query.
query.Op = structs.PreparedQueryUpdate
query.Query.Name = "my-query"
{
buf, err := structs.Encode(structs.PreparedQueryRequestType, query)
if err != nil {
t.Fatalf("err: %v", err)
}
resp := fsm.Apply(makeLog(buf))
if resp != nil {
t.Fatalf("resp: %v", resp)
}
}
// Verify the update.
{
_, actual, err := fsm.state.PreparedQueryGet(nil, query.Query.ID)
if err != nil {
t.Fatalf("err: %s", err)
}
actual.CreateIndex, actual.ModifyIndex = 0, 0
if !reflect.DeepEqual(actual, query.Query) {
t.Fatalf("bad: %v", actual)
}
}
// Delete the query.
query.Op = structs.PreparedQueryDelete
{
buf, err := structs.Encode(structs.PreparedQueryRequestType, query)
if err != nil {
t.Fatalf("err: %v", err)
}
resp := fsm.Apply(makeLog(buf))
if resp != nil {
t.Fatalf("resp: %v", resp)
}
}
// Make sure it's gone.
{
_, actual, err := fsm.state.PreparedQueryGet(nil, query.Query.ID)
if err != nil {
t.Fatalf("err: %s", err)
}
if actual != nil {
t.Fatalf("bad: %v", actual)
}
}
}
func TestFSM_TombstoneReap(t *testing.T) {
fsm, err := NewFSM(nil, os.Stderr)
if err != nil {
t.Fatalf("err: %v", err)
}
// Create some tombstones
fsm.state.KVSSet(11, &structs.DirEntry{
Key: "/remove",
Value: []byte("foo"),
})
fsm.state.KVSDelete(12, "/remove")
idx, _, err := fsm.state.KVSList(nil, "/remove")
if err != nil {
t.Fatalf("err: %s", err)
}
if idx != 12 {
t.Fatalf("bad index: %d", idx)
}
// Create a new reap request
req := structs.TombstoneRequest{
Datacenter: "dc1",
Op: structs.TombstoneReap,
ReapIndex: 12,
}
buf, err := structs.Encode(structs.TombstoneRequestType, req)
if err != nil {
t.Fatalf("err: %v", err)
}
resp := fsm.Apply(makeLog(buf))
if err, ok := resp.(error); ok {
t.Fatalf("resp: %v", err)
}
// Verify the tombstones are gone
snap := fsm.state.Snapshot()
defer snap.Close()
stones, err := snap.Tombstones()
if err != nil {
t.Fatalf("err: %s", err)
}
if stones.Next() != nil {
t.Fatalf("unexpected extra tombstones")
}
}
func TestFSM_Txn(t *testing.T) {
fsm, err := NewFSM(nil, os.Stderr)
if err != nil {
t.Fatalf("err: %v", err)
}
// Set a key using a transaction.
req := structs.TxnRequest{
Datacenter: "dc1",
Ops: structs.TxnOps{
&structs.TxnOp{
KV: &structs.TxnKVOp{
Verb: api.KVSet,
DirEnt: structs.DirEntry{
Key: "/test/path",
Flags: 0,
Value: []byte("test"),
},
},
},
},
}
buf, err := structs.Encode(structs.TxnRequestType, req)
if err != nil {
t.Fatalf("err: %v", err)
}
resp := fsm.Apply(makeLog(buf))
if _, ok := resp.(structs.TxnResponse); !ok {
t.Fatalf("bad response type: %T", resp)
}
// Verify key is set directly in the state store.
_, d, err := fsm.state.KVSGet(nil, "/test/path")
if err != nil {
t.Fatalf("err: %v", err)
}
if d == nil {
t.Fatalf("missing")
}
}
func TestFSM_Autopilot(t *testing.T) {
fsm, err := NewFSM(nil, os.Stderr)
if err != nil {
t.Fatalf("err: %v", err)
}
// Set the autopilot config using a request.
req := structs.AutopilotSetConfigRequest{
Datacenter: "dc1",
Config: structs.AutopilotConfig{
CleanupDeadServers: true,
LastContactThreshold: 10 * time.Second,
MaxTrailingLogs: 300,
},
}
buf, err := structs.Encode(structs.AutopilotRequestType, req)
if err != nil {
t.Fatalf("err: %v", err)
}
resp := fsm.Apply(makeLog(buf))
if _, ok := resp.(error); ok {
t.Fatalf("bad: %v", resp)
}
// Verify key is set directly in the state store.
_, config, err := fsm.state.AutopilotConfig()
if err != nil {
t.Fatalf("err: %v", err)
}
if config.CleanupDeadServers != req.Config.CleanupDeadServers {
t.Fatalf("bad: %v", config.CleanupDeadServers)
}
if config.LastContactThreshold != req.Config.LastContactThreshold {
t.Fatalf("bad: %v", config.LastContactThreshold)
}
if config.MaxTrailingLogs != req.Config.MaxTrailingLogs {
t.Fatalf("bad: %v", config.MaxTrailingLogs)
}
// Now use CAS and provide an old index
req.CAS = true
req.Config.CleanupDeadServers = false
req.Config.ModifyIndex = config.ModifyIndex - 1
buf, err = structs.Encode(structs.AutopilotRequestType, req)
if err != nil {
t.Fatalf("err: %v", err)
}
resp = fsm.Apply(makeLog(buf))
if _, ok := resp.(error); ok {
t.Fatalf("bad: %v", resp)
}
_, config, err = fsm.state.AutopilotConfig()
if err != nil {
t.Fatalf("err: %v", err)
}
if !config.CleanupDeadServers {
t.Fatalf("bad: %v", config.CleanupDeadServers)
}
}
func TestFSM_IgnoreUnknown(t *testing.T) {
fsm, err := NewFSM(nil, os.Stderr)
if err != nil {
t.Fatalf("err: %v", err)
}
// Create a new reap request
type UnknownRequest struct {
Foo string
}
req := UnknownRequest{Foo: "bar"}
msgType := structs.IgnoreUnknownTypeFlag | 64
buf, err := structs.Encode(msgType, req)
if err != nil {
t.Fatalf("err: %v", err)
}
// Apply should work, even though not supported
resp := fsm.Apply(makeLog(buf))
if err, ok := resp.(error); ok {
t.Fatalf("resp: %v", err)
}
}