1504 lines
32 KiB
Go
1504 lines
32 KiB
Go
package consul
|
|
|
|
import (
|
|
"bytes"
|
|
"fmt"
|
|
"os"
|
|
"reflect"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/hashicorp/consul/agent/consul/state"
|
|
"github.com/hashicorp/consul/agent/structs"
|
|
"github.com/hashicorp/consul/api"
|
|
"github.com/hashicorp/consul/lib"
|
|
"github.com/hashicorp/consul/types"
|
|
"github.com/hashicorp/go-uuid"
|
|
"github.com/hashicorp/raft"
|
|
"github.com/pascaldekloe/goe/verify"
|
|
)
|
|
|
|
// MockSink is an in-memory sink used by the snapshot tests in this file. The
// embedded buffer supplies the read and write methods, so whatever is
// persisted into the sink can be read straight back out of it.
type MockSink struct {
	*bytes.Buffer
	cancel bool // set once Cancel has been called
}

// ID returns the fixed identifier "Mock".
func (m *MockSink) ID() string { return "Mock" }

// Cancel records that the sink was cancelled. It never fails.
func (m *MockSink) Cancel() error {
	m.cancel = true
	return nil
}

// Close is a no-op that always reports success.
func (m *MockSink) Close() error { return nil }
|
|
|
|
func makeLog(buf []byte) *raft.Log {
|
|
return &raft.Log{
|
|
Index: 1,
|
|
Term: 1,
|
|
Type: raft.LogCommand,
|
|
Data: buf,
|
|
}
|
|
}
|
|
|
|
func generateUUID() (ret string) {
|
|
var err error
|
|
if ret, err = uuid.GenerateUUID(); err != nil {
|
|
panic(fmt.Sprintf("Unable to generate a UUID, %v", err))
|
|
}
|
|
return ret
|
|
}
|
|
|
|
func TestFSM_RegisterNode(t *testing.T) {
|
|
t.Parallel()
|
|
fsm, err := NewFSM(nil, os.Stderr)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
|
|
req := structs.RegisterRequest{
|
|
Datacenter: "dc1",
|
|
Node: "foo",
|
|
Address: "127.0.0.1",
|
|
}
|
|
buf, err := structs.Encode(structs.RegisterRequestType, req)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
|
|
resp := fsm.Apply(makeLog(buf))
|
|
if resp != nil {
|
|
t.Fatalf("resp: %v", resp)
|
|
}
|
|
|
|
// Verify we are registered
|
|
_, node, err := fsm.state.GetNode("foo")
|
|
if err != nil {
|
|
t.Fatalf("err: %s", err)
|
|
}
|
|
if node == nil {
|
|
t.Fatalf("not found!")
|
|
}
|
|
if node.ModifyIndex != 1 {
|
|
t.Fatalf("bad index: %d", node.ModifyIndex)
|
|
}
|
|
|
|
// Verify service registered
|
|
_, services, err := fsm.state.NodeServices(nil, "foo")
|
|
if err != nil {
|
|
t.Fatalf("err: %s", err)
|
|
}
|
|
if len(services.Services) != 0 {
|
|
t.Fatalf("Services: %v", services)
|
|
}
|
|
}
|
|
|
|
func TestFSM_RegisterNode_Service(t *testing.T) {
|
|
t.Parallel()
|
|
fsm, err := NewFSM(nil, os.Stderr)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
|
|
req := structs.RegisterRequest{
|
|
Datacenter: "dc1",
|
|
Node: "foo",
|
|
Address: "127.0.0.1",
|
|
Service: &structs.NodeService{
|
|
ID: "db",
|
|
Service: "db",
|
|
Tags: []string{"master"},
|
|
Port: 8000,
|
|
},
|
|
Check: &structs.HealthCheck{
|
|
Node: "foo",
|
|
CheckID: "db",
|
|
Name: "db connectivity",
|
|
Status: api.HealthPassing,
|
|
ServiceID: "db",
|
|
},
|
|
}
|
|
buf, err := structs.Encode(structs.RegisterRequestType, req)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
|
|
resp := fsm.Apply(makeLog(buf))
|
|
if resp != nil {
|
|
t.Fatalf("resp: %v", resp)
|
|
}
|
|
|
|
// Verify we are registered
|
|
_, node, err := fsm.state.GetNode("foo")
|
|
if err != nil {
|
|
t.Fatalf("err: %s", err)
|
|
}
|
|
if node == nil {
|
|
t.Fatalf("not found!")
|
|
}
|
|
|
|
// Verify service registered
|
|
_, services, err := fsm.state.NodeServices(nil, "foo")
|
|
if err != nil {
|
|
t.Fatalf("err: %s", err)
|
|
}
|
|
if _, ok := services.Services["db"]; !ok {
|
|
t.Fatalf("not registered!")
|
|
}
|
|
|
|
// Verify check
|
|
_, checks, err := fsm.state.NodeChecks(nil, "foo")
|
|
if err != nil {
|
|
t.Fatalf("err: %s", err)
|
|
}
|
|
if checks[0].CheckID != "db" {
|
|
t.Fatalf("not registered!")
|
|
}
|
|
}
|
|
|
|
func TestFSM_DeregisterService(t *testing.T) {
|
|
t.Parallel()
|
|
fsm, err := NewFSM(nil, os.Stderr)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
|
|
req := structs.RegisterRequest{
|
|
Datacenter: "dc1",
|
|
Node: "foo",
|
|
Address: "127.0.0.1",
|
|
Service: &structs.NodeService{
|
|
ID: "db",
|
|
Service: "db",
|
|
Tags: []string{"master"},
|
|
Port: 8000,
|
|
},
|
|
}
|
|
buf, err := structs.Encode(structs.RegisterRequestType, req)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
|
|
resp := fsm.Apply(makeLog(buf))
|
|
if resp != nil {
|
|
t.Fatalf("resp: %v", resp)
|
|
}
|
|
|
|
dereg := structs.DeregisterRequest{
|
|
Datacenter: "dc1",
|
|
Node: "foo",
|
|
ServiceID: "db",
|
|
}
|
|
buf, err = structs.Encode(structs.DeregisterRequestType, dereg)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
|
|
resp = fsm.Apply(makeLog(buf))
|
|
if resp != nil {
|
|
t.Fatalf("resp: %v", resp)
|
|
}
|
|
|
|
// Verify we are registered
|
|
_, node, err := fsm.state.GetNode("foo")
|
|
if err != nil {
|
|
t.Fatalf("err: %s", err)
|
|
}
|
|
if node == nil {
|
|
t.Fatalf("not found!")
|
|
}
|
|
|
|
// Verify service not registered
|
|
_, services, err := fsm.state.NodeServices(nil, "foo")
|
|
if err != nil {
|
|
t.Fatalf("err: %s", err)
|
|
}
|
|
if _, ok := services.Services["db"]; ok {
|
|
t.Fatalf("db registered!")
|
|
}
|
|
}
|
|
|
|
func TestFSM_DeregisterCheck(t *testing.T) {
|
|
t.Parallel()
|
|
fsm, err := NewFSM(nil, os.Stderr)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
|
|
req := structs.RegisterRequest{
|
|
Datacenter: "dc1",
|
|
Node: "foo",
|
|
Address: "127.0.0.1",
|
|
Check: &structs.HealthCheck{
|
|
Node: "foo",
|
|
CheckID: "mem",
|
|
Name: "memory util",
|
|
Status: api.HealthPassing,
|
|
},
|
|
}
|
|
buf, err := structs.Encode(structs.RegisterRequestType, req)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
|
|
resp := fsm.Apply(makeLog(buf))
|
|
if resp != nil {
|
|
t.Fatalf("resp: %v", resp)
|
|
}
|
|
|
|
dereg := structs.DeregisterRequest{
|
|
Datacenter: "dc1",
|
|
Node: "foo",
|
|
CheckID: "mem",
|
|
}
|
|
buf, err = structs.Encode(structs.DeregisterRequestType, dereg)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
|
|
resp = fsm.Apply(makeLog(buf))
|
|
if resp != nil {
|
|
t.Fatalf("resp: %v", resp)
|
|
}
|
|
|
|
// Verify we are registered
|
|
_, node, err := fsm.state.GetNode("foo")
|
|
if err != nil {
|
|
t.Fatalf("err: %s", err)
|
|
}
|
|
if node == nil {
|
|
t.Fatalf("not found!")
|
|
}
|
|
|
|
// Verify check not registered
|
|
_, checks, err := fsm.state.NodeChecks(nil, "foo")
|
|
if err != nil {
|
|
t.Fatalf("err: %s", err)
|
|
}
|
|
if len(checks) != 0 {
|
|
t.Fatalf("check registered!")
|
|
}
|
|
}
|
|
|
|
func TestFSM_DeregisterNode(t *testing.T) {
|
|
t.Parallel()
|
|
fsm, err := NewFSM(nil, os.Stderr)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
|
|
req := structs.RegisterRequest{
|
|
Datacenter: "dc1",
|
|
Node: "foo",
|
|
Address: "127.0.0.1",
|
|
Service: &structs.NodeService{
|
|
ID: "db",
|
|
Service: "db",
|
|
Tags: []string{"master"},
|
|
Port: 8000,
|
|
},
|
|
Check: &structs.HealthCheck{
|
|
Node: "foo",
|
|
CheckID: "db",
|
|
Name: "db connectivity",
|
|
Status: api.HealthPassing,
|
|
ServiceID: "db",
|
|
},
|
|
}
|
|
buf, err := structs.Encode(structs.RegisterRequestType, req)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
|
|
resp := fsm.Apply(makeLog(buf))
|
|
if resp != nil {
|
|
t.Fatalf("resp: %v", resp)
|
|
}
|
|
|
|
dereg := structs.DeregisterRequest{
|
|
Datacenter: "dc1",
|
|
Node: "foo",
|
|
}
|
|
buf, err = structs.Encode(structs.DeregisterRequestType, dereg)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
|
|
resp = fsm.Apply(makeLog(buf))
|
|
if resp != nil {
|
|
t.Fatalf("resp: %v", resp)
|
|
}
|
|
|
|
// Verify we are not registered
|
|
_, node, err := fsm.state.GetNode("foo")
|
|
if err != nil {
|
|
t.Fatalf("err: %s", err)
|
|
}
|
|
if node != nil {
|
|
t.Fatalf("found!")
|
|
}
|
|
|
|
// Verify service not registered
|
|
_, services, err := fsm.state.NodeServices(nil, "foo")
|
|
if err != nil {
|
|
t.Fatalf("err: %s", err)
|
|
}
|
|
if services != nil {
|
|
t.Fatalf("Services: %v", services)
|
|
}
|
|
|
|
// Verify checks not registered
|
|
_, checks, err := fsm.state.NodeChecks(nil, "foo")
|
|
if err != nil {
|
|
t.Fatalf("err: %s", err)
|
|
}
|
|
if len(checks) != 0 {
|
|
t.Fatalf("Services: %v", services)
|
|
}
|
|
}
|
|
|
|
func TestFSM_SnapshotRestore(t *testing.T) {
|
|
t.Parallel()
|
|
fsm, err := NewFSM(nil, os.Stderr)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
|
|
// Add some state
|
|
fsm.state.EnsureNode(1, &structs.Node{Node: "foo", Address: "127.0.0.1"})
|
|
fsm.state.EnsureNode(2, &structs.Node{Node: "baz", Address: "127.0.0.2", TaggedAddresses: map[string]string{"hello": "1.2.3.4"}})
|
|
fsm.state.EnsureService(3, "foo", &structs.NodeService{ID: "web", Service: "web", Tags: nil, Address: "127.0.0.1", Port: 80})
|
|
fsm.state.EnsureService(4, "foo", &structs.NodeService{ID: "db", Service: "db", Tags: []string{"primary"}, Address: "127.0.0.1", Port: 5000})
|
|
fsm.state.EnsureService(5, "baz", &structs.NodeService{ID: "web", Service: "web", Tags: nil, Address: "127.0.0.2", Port: 80})
|
|
fsm.state.EnsureService(6, "baz", &structs.NodeService{ID: "db", Service: "db", Tags: []string{"secondary"}, Address: "127.0.0.2", Port: 5000})
|
|
fsm.state.EnsureCheck(7, &structs.HealthCheck{
|
|
Node: "foo",
|
|
CheckID: "web",
|
|
Name: "web connectivity",
|
|
Status: api.HealthPassing,
|
|
ServiceID: "web",
|
|
})
|
|
fsm.state.KVSSet(8, &structs.DirEntry{
|
|
Key: "/test",
|
|
Value: []byte("foo"),
|
|
})
|
|
session := &structs.Session{ID: generateUUID(), Node: "foo"}
|
|
fsm.state.SessionCreate(9, session)
|
|
acl := &structs.ACL{ID: generateUUID(), Name: "User Token"}
|
|
fsm.state.ACLSet(10, acl)
|
|
if _, err := fsm.state.ACLBootstrapInit(10); err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
|
|
fsm.state.KVSSet(11, &structs.DirEntry{
|
|
Key: "/remove",
|
|
Value: []byte("foo"),
|
|
})
|
|
fsm.state.KVSDelete(12, "/remove")
|
|
idx, _, err := fsm.state.KVSList(nil, "/remove")
|
|
if err != nil {
|
|
t.Fatalf("err: %s", err)
|
|
}
|
|
if idx != 12 {
|
|
t.Fatalf("bad index: %d", idx)
|
|
}
|
|
|
|
updates := structs.Coordinates{
|
|
&structs.Coordinate{
|
|
Node: "baz",
|
|
Coord: generateRandomCoordinate(),
|
|
},
|
|
&structs.Coordinate{
|
|
Node: "foo",
|
|
Coord: generateRandomCoordinate(),
|
|
},
|
|
}
|
|
if err := fsm.state.CoordinateBatchUpdate(13, updates); err != nil {
|
|
t.Fatalf("err: %s", err)
|
|
}
|
|
|
|
query := structs.PreparedQuery{
|
|
ID: generateUUID(),
|
|
Service: structs.ServiceQuery{
|
|
Service: "web",
|
|
},
|
|
RaftIndex: structs.RaftIndex{
|
|
CreateIndex: 14,
|
|
ModifyIndex: 14,
|
|
},
|
|
}
|
|
if err := fsm.state.PreparedQuerySet(14, &query); err != nil {
|
|
t.Fatalf("err: %s", err)
|
|
}
|
|
|
|
autopilotConf := &structs.AutopilotConfig{
|
|
CleanupDeadServers: true,
|
|
LastContactThreshold: 100 * time.Millisecond,
|
|
MaxTrailingLogs: 222,
|
|
}
|
|
if err := fsm.state.AutopilotSetConfig(15, autopilotConf); err != nil {
|
|
t.Fatalf("err: %s", err)
|
|
}
|
|
|
|
// Snapshot
|
|
snap, err := fsm.Snapshot()
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
defer snap.Release()
|
|
|
|
// Persist
|
|
buf := bytes.NewBuffer(nil)
|
|
sink := &MockSink{buf, false}
|
|
if err := snap.Persist(sink); err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
|
|
// Try to restore on a new FSM
|
|
fsm2, err := NewFSM(nil, os.Stderr)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
|
|
// Do a restore
|
|
if err := fsm2.Restore(sink); err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
|
|
// Verify the contents
|
|
_, nodes, err := fsm2.state.Nodes(nil)
|
|
if err != nil {
|
|
t.Fatalf("err: %s", err)
|
|
}
|
|
if len(nodes) != 2 {
|
|
t.Fatalf("bad: %v", nodes)
|
|
}
|
|
if nodes[0].Node != "baz" ||
|
|
nodes[0].Address != "127.0.0.2" ||
|
|
len(nodes[0].TaggedAddresses) != 1 ||
|
|
nodes[0].TaggedAddresses["hello"] != "1.2.3.4" {
|
|
t.Fatalf("bad: %v", nodes[0])
|
|
}
|
|
if nodes[1].Node != "foo" ||
|
|
nodes[1].Address != "127.0.0.1" ||
|
|
len(nodes[1].TaggedAddresses) != 0 {
|
|
t.Fatalf("bad: %v", nodes[1])
|
|
}
|
|
|
|
_, fooSrv, err := fsm2.state.NodeServices(nil, "foo")
|
|
if err != nil {
|
|
t.Fatalf("err: %s", err)
|
|
}
|
|
if len(fooSrv.Services) != 2 {
|
|
t.Fatalf("Bad: %v", fooSrv)
|
|
}
|
|
if !lib.StrContains(fooSrv.Services["db"].Tags, "primary") {
|
|
t.Fatalf("Bad: %v", fooSrv)
|
|
}
|
|
if fooSrv.Services["db"].Port != 5000 {
|
|
t.Fatalf("Bad: %v", fooSrv)
|
|
}
|
|
|
|
_, checks, err := fsm2.state.NodeChecks(nil, "foo")
|
|
if err != nil {
|
|
t.Fatalf("err: %s", err)
|
|
}
|
|
if len(checks) != 1 {
|
|
t.Fatalf("Bad: %v", checks)
|
|
}
|
|
|
|
// Verify key is set
|
|
_, d, err := fsm2.state.KVSGet(nil, "/test")
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
if string(d.Value) != "foo" {
|
|
t.Fatalf("bad: %v", d)
|
|
}
|
|
|
|
// Verify session is restored
|
|
idx, s, err := fsm2.state.SessionGet(nil, session.ID)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
if s.Node != "foo" {
|
|
t.Fatalf("bad: %v", s)
|
|
}
|
|
if idx <= 1 {
|
|
t.Fatalf("bad index: %d", idx)
|
|
}
|
|
|
|
// Verify ACL is restored
|
|
_, a, err := fsm2.state.ACLGet(nil, acl.ID)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
if a.Name != "User Token" {
|
|
t.Fatalf("bad: %v", a)
|
|
}
|
|
if a.ModifyIndex <= 1 {
|
|
t.Fatalf("bad index: %d", idx)
|
|
}
|
|
gotB, err := fsm2.state.ACLGetBootstrap()
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
wantB := &structs.ACLBootstrap{
|
|
AllowBootstrap: true,
|
|
RaftIndex: structs.RaftIndex{
|
|
CreateIndex: 10,
|
|
ModifyIndex: 10,
|
|
},
|
|
}
|
|
verify.Values(t, "", gotB, wantB)
|
|
|
|
// Verify tombstones are restored
|
|
func() {
|
|
snap := fsm2.state.Snapshot()
|
|
defer snap.Close()
|
|
stones, err := snap.Tombstones()
|
|
if err != nil {
|
|
t.Fatalf("err: %s", err)
|
|
}
|
|
stone := stones.Next().(*state.Tombstone)
|
|
if stone == nil {
|
|
t.Fatalf("missing tombstone")
|
|
}
|
|
if stone.Key != "/remove" || stone.Index != 12 {
|
|
t.Fatalf("bad: %v", stone)
|
|
}
|
|
if stones.Next() != nil {
|
|
t.Fatalf("unexpected extra tombstones")
|
|
}
|
|
}()
|
|
|
|
// Verify coordinates are restored
|
|
_, coords, err := fsm2.state.Coordinates(nil)
|
|
if err != nil {
|
|
t.Fatalf("err: %s", err)
|
|
}
|
|
if !reflect.DeepEqual(coords, updates) {
|
|
t.Fatalf("bad: %#v", coords)
|
|
}
|
|
|
|
// Verify queries are restored.
|
|
_, queries, err := fsm2.state.PreparedQueryList(nil)
|
|
if err != nil {
|
|
t.Fatalf("err: %s", err)
|
|
}
|
|
if len(queries) != 1 {
|
|
t.Fatalf("bad: %#v", queries)
|
|
}
|
|
if !reflect.DeepEqual(queries[0], &query) {
|
|
t.Fatalf("bad: %#v", queries[0])
|
|
}
|
|
|
|
// Verify autopilot config is restored.
|
|
_, restoredConf, err := fsm2.state.AutopilotConfig()
|
|
if err != nil {
|
|
t.Fatalf("err: %s", err)
|
|
}
|
|
if !reflect.DeepEqual(restoredConf, autopilotConf) {
|
|
t.Fatalf("bad: %#v, %#v", restoredConf, autopilotConf)
|
|
}
|
|
|
|
// Snapshot
|
|
snap, err = fsm2.Snapshot()
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
defer snap.Release()
|
|
|
|
// Persist
|
|
buf = bytes.NewBuffer(nil)
|
|
sink = &MockSink{buf, false}
|
|
if err := snap.Persist(sink); err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
|
|
// Try to restore on the old FSM and make sure it abandons the old state
|
|
// store.
|
|
abandonCh := fsm.state.AbandonCh()
|
|
if err := fsm.Restore(sink); err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
select {
|
|
case <-abandonCh:
|
|
default:
|
|
t.Fatalf("bad")
|
|
}
|
|
}
|
|
|
|
func TestFSM_BadRestore(t *testing.T) {
|
|
t.Parallel()
|
|
// Create an FSM with some state.
|
|
fsm, err := NewFSM(nil, os.Stderr)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
fsm.state.EnsureNode(1, &structs.Node{Node: "foo", Address: "127.0.0.1"})
|
|
abandonCh := fsm.state.AbandonCh()
|
|
|
|
// Do a bad restore.
|
|
buf := bytes.NewBuffer([]byte("bad snapshot"))
|
|
sink := &MockSink{buf, false}
|
|
if err := fsm.Restore(sink); err == nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
|
|
// Verify the contents didn't get corrupted.
|
|
_, nodes, err := fsm.state.Nodes(nil)
|
|
if err != nil {
|
|
t.Fatalf("err: %s", err)
|
|
}
|
|
if len(nodes) != 1 {
|
|
t.Fatalf("bad: %v", nodes)
|
|
}
|
|
if nodes[0].Node != "foo" ||
|
|
nodes[0].Address != "127.0.0.1" ||
|
|
len(nodes[0].TaggedAddresses) != 0 {
|
|
t.Fatalf("bad: %v", nodes[0])
|
|
}
|
|
|
|
// Verify the old state store didn't get abandoned.
|
|
select {
|
|
case <-abandonCh:
|
|
t.Fatalf("bad")
|
|
default:
|
|
}
|
|
}
|
|
|
|
func TestFSM_KVSDelete(t *testing.T) {
|
|
t.Parallel()
|
|
fsm, err := NewFSM(nil, os.Stderr)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
|
|
req := structs.KVSRequest{
|
|
Datacenter: "dc1",
|
|
Op: api.KVSet,
|
|
DirEnt: structs.DirEntry{
|
|
Key: "/test/path",
|
|
Flags: 0,
|
|
Value: []byte("test"),
|
|
},
|
|
}
|
|
buf, err := structs.Encode(structs.KVSRequestType, req)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
resp := fsm.Apply(makeLog(buf))
|
|
if resp != nil {
|
|
t.Fatalf("resp: %v", resp)
|
|
}
|
|
|
|
// Run the delete
|
|
req.Op = api.KVDelete
|
|
buf, err = structs.Encode(structs.KVSRequestType, req)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
resp = fsm.Apply(makeLog(buf))
|
|
if resp != nil {
|
|
t.Fatalf("resp: %v", resp)
|
|
}
|
|
|
|
// Verify key is not set
|
|
_, d, err := fsm.state.KVSGet(nil, "/test/path")
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
if d != nil {
|
|
t.Fatalf("key present")
|
|
}
|
|
}
|
|
|
|
func TestFSM_KVSDeleteTree(t *testing.T) {
|
|
t.Parallel()
|
|
fsm, err := NewFSM(nil, os.Stderr)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
|
|
req := structs.KVSRequest{
|
|
Datacenter: "dc1",
|
|
Op: api.KVSet,
|
|
DirEnt: structs.DirEntry{
|
|
Key: "/test/path",
|
|
Flags: 0,
|
|
Value: []byte("test"),
|
|
},
|
|
}
|
|
buf, err := structs.Encode(structs.KVSRequestType, req)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
resp := fsm.Apply(makeLog(buf))
|
|
if resp != nil {
|
|
t.Fatalf("resp: %v", resp)
|
|
}
|
|
|
|
// Run the delete tree
|
|
req.Op = api.KVDeleteTree
|
|
req.DirEnt.Key = "/test"
|
|
buf, err = structs.Encode(structs.KVSRequestType, req)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
resp = fsm.Apply(makeLog(buf))
|
|
if resp != nil {
|
|
t.Fatalf("resp: %v", resp)
|
|
}
|
|
|
|
// Verify key is not set
|
|
_, d, err := fsm.state.KVSGet(nil, "/test/path")
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
if d != nil {
|
|
t.Fatalf("key present")
|
|
}
|
|
}
|
|
|
|
func TestFSM_KVSDeleteCheckAndSet(t *testing.T) {
|
|
t.Parallel()
|
|
fsm, err := NewFSM(nil, os.Stderr)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
|
|
req := structs.KVSRequest{
|
|
Datacenter: "dc1",
|
|
Op: api.KVSet,
|
|
DirEnt: structs.DirEntry{
|
|
Key: "/test/path",
|
|
Flags: 0,
|
|
Value: []byte("test"),
|
|
},
|
|
}
|
|
buf, err := structs.Encode(structs.KVSRequestType, req)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
resp := fsm.Apply(makeLog(buf))
|
|
if resp != nil {
|
|
t.Fatalf("resp: %v", resp)
|
|
}
|
|
|
|
// Verify key is set
|
|
_, d, err := fsm.state.KVSGet(nil, "/test/path")
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
if d == nil {
|
|
t.Fatalf("key missing")
|
|
}
|
|
|
|
// Run the check-and-set
|
|
req.Op = api.KVDeleteCAS
|
|
req.DirEnt.ModifyIndex = d.ModifyIndex
|
|
buf, err = structs.Encode(structs.KVSRequestType, req)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
resp = fsm.Apply(makeLog(buf))
|
|
if resp.(bool) != true {
|
|
t.Fatalf("resp: %v", resp)
|
|
}
|
|
|
|
// Verify key is gone
|
|
_, d, err = fsm.state.KVSGet(nil, "/test/path")
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
if d != nil {
|
|
t.Fatalf("bad: %v", d)
|
|
}
|
|
}
|
|
|
|
func TestFSM_KVSCheckAndSet(t *testing.T) {
|
|
t.Parallel()
|
|
fsm, err := NewFSM(nil, os.Stderr)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
|
|
req := structs.KVSRequest{
|
|
Datacenter: "dc1",
|
|
Op: api.KVSet,
|
|
DirEnt: structs.DirEntry{
|
|
Key: "/test/path",
|
|
Flags: 0,
|
|
Value: []byte("test"),
|
|
},
|
|
}
|
|
buf, err := structs.Encode(structs.KVSRequestType, req)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
resp := fsm.Apply(makeLog(buf))
|
|
if resp != nil {
|
|
t.Fatalf("resp: %v", resp)
|
|
}
|
|
|
|
// Verify key is set
|
|
_, d, err := fsm.state.KVSGet(nil, "/test/path")
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
if d == nil {
|
|
t.Fatalf("key missing")
|
|
}
|
|
|
|
// Run the check-and-set
|
|
req.Op = api.KVCAS
|
|
req.DirEnt.ModifyIndex = d.ModifyIndex
|
|
req.DirEnt.Value = []byte("zip")
|
|
buf, err = structs.Encode(structs.KVSRequestType, req)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
resp = fsm.Apply(makeLog(buf))
|
|
if resp.(bool) != true {
|
|
t.Fatalf("resp: %v", resp)
|
|
}
|
|
|
|
// Verify key is updated
|
|
_, d, err = fsm.state.KVSGet(nil, "/test/path")
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
if string(d.Value) != "zip" {
|
|
t.Fatalf("bad: %v", d)
|
|
}
|
|
}
|
|
|
|
func TestFSM_CoordinateUpdate(t *testing.T) {
|
|
t.Parallel()
|
|
fsm, err := NewFSM(nil, os.Stderr)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
|
|
// Register some nodes.
|
|
fsm.state.EnsureNode(1, &structs.Node{Node: "node1", Address: "127.0.0.1"})
|
|
fsm.state.EnsureNode(2, &structs.Node{Node: "node2", Address: "127.0.0.1"})
|
|
|
|
// Write a batch of two coordinates.
|
|
updates := structs.Coordinates{
|
|
&structs.Coordinate{
|
|
Node: "node1",
|
|
Coord: generateRandomCoordinate(),
|
|
},
|
|
&structs.Coordinate{
|
|
Node: "node2",
|
|
Coord: generateRandomCoordinate(),
|
|
},
|
|
}
|
|
buf, err := structs.Encode(structs.CoordinateBatchUpdateType, updates)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
resp := fsm.Apply(makeLog(buf))
|
|
if resp != nil {
|
|
t.Fatalf("resp: %v", resp)
|
|
}
|
|
|
|
// Read back the two coordinates to make sure they got updated.
|
|
_, coords, err := fsm.state.Coordinates(nil)
|
|
if err != nil {
|
|
t.Fatalf("err: %s", err)
|
|
}
|
|
if !reflect.DeepEqual(coords, updates) {
|
|
t.Fatalf("bad: %#v", coords)
|
|
}
|
|
}
|
|
|
|
func TestFSM_SessionCreate_Destroy(t *testing.T) {
|
|
t.Parallel()
|
|
fsm, err := NewFSM(nil, os.Stderr)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
|
|
fsm.state.EnsureNode(1, &structs.Node{Node: "foo", Address: "127.0.0.1"})
|
|
fsm.state.EnsureCheck(2, &structs.HealthCheck{
|
|
Node: "foo",
|
|
CheckID: "web",
|
|
Status: api.HealthPassing,
|
|
})
|
|
|
|
// Create a new session
|
|
req := structs.SessionRequest{
|
|
Datacenter: "dc1",
|
|
Op: structs.SessionCreate,
|
|
Session: structs.Session{
|
|
ID: generateUUID(),
|
|
Node: "foo",
|
|
Checks: []types.CheckID{"web"},
|
|
},
|
|
}
|
|
buf, err := structs.Encode(structs.SessionRequestType, req)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
resp := fsm.Apply(makeLog(buf))
|
|
if err, ok := resp.(error); ok {
|
|
t.Fatalf("resp: %v", err)
|
|
}
|
|
|
|
// Get the session
|
|
id := resp.(string)
|
|
_, session, err := fsm.state.SessionGet(nil, id)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
if session == nil {
|
|
t.Fatalf("missing")
|
|
}
|
|
|
|
// Verify the session
|
|
if session.ID != id {
|
|
t.Fatalf("bad: %v", *session)
|
|
}
|
|
if session.Node != "foo" {
|
|
t.Fatalf("bad: %v", *session)
|
|
}
|
|
if session.Checks[0] != "web" {
|
|
t.Fatalf("bad: %v", *session)
|
|
}
|
|
|
|
// Try to destroy
|
|
destroy := structs.SessionRequest{
|
|
Datacenter: "dc1",
|
|
Op: structs.SessionDestroy,
|
|
Session: structs.Session{
|
|
ID: id,
|
|
},
|
|
}
|
|
buf, err = structs.Encode(structs.SessionRequestType, destroy)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
resp = fsm.Apply(makeLog(buf))
|
|
if resp != nil {
|
|
t.Fatalf("resp: %v", resp)
|
|
}
|
|
|
|
_, session, err = fsm.state.SessionGet(nil, id)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
if session != nil {
|
|
t.Fatalf("should be destroyed")
|
|
}
|
|
}
|
|
|
|
func TestFSM_KVSLock(t *testing.T) {
|
|
t.Parallel()
|
|
fsm, err := NewFSM(nil, os.Stderr)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
|
|
fsm.state.EnsureNode(1, &structs.Node{Node: "foo", Address: "127.0.0.1"})
|
|
session := &structs.Session{ID: generateUUID(), Node: "foo"}
|
|
fsm.state.SessionCreate(2, session)
|
|
|
|
req := structs.KVSRequest{
|
|
Datacenter: "dc1",
|
|
Op: api.KVLock,
|
|
DirEnt: structs.DirEntry{
|
|
Key: "/test/path",
|
|
Value: []byte("test"),
|
|
Session: session.ID,
|
|
},
|
|
}
|
|
buf, err := structs.Encode(structs.KVSRequestType, req)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
resp := fsm.Apply(makeLog(buf))
|
|
if resp != true {
|
|
t.Fatalf("resp: %v", resp)
|
|
}
|
|
|
|
// Verify key is locked
|
|
_, d, err := fsm.state.KVSGet(nil, "/test/path")
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
if d == nil {
|
|
t.Fatalf("missing")
|
|
}
|
|
if d.LockIndex != 1 {
|
|
t.Fatalf("bad: %v", *d)
|
|
}
|
|
if d.Session != session.ID {
|
|
t.Fatalf("bad: %v", *d)
|
|
}
|
|
}
|
|
|
|
func TestFSM_KVSUnlock(t *testing.T) {
|
|
t.Parallel()
|
|
fsm, err := NewFSM(nil, os.Stderr)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
|
|
fsm.state.EnsureNode(1, &structs.Node{Node: "foo", Address: "127.0.0.1"})
|
|
session := &structs.Session{ID: generateUUID(), Node: "foo"}
|
|
fsm.state.SessionCreate(2, session)
|
|
|
|
req := structs.KVSRequest{
|
|
Datacenter: "dc1",
|
|
Op: api.KVLock,
|
|
DirEnt: structs.DirEntry{
|
|
Key: "/test/path",
|
|
Value: []byte("test"),
|
|
Session: session.ID,
|
|
},
|
|
}
|
|
buf, err := structs.Encode(structs.KVSRequestType, req)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
resp := fsm.Apply(makeLog(buf))
|
|
if resp != true {
|
|
t.Fatalf("resp: %v", resp)
|
|
}
|
|
|
|
req = structs.KVSRequest{
|
|
Datacenter: "dc1",
|
|
Op: api.KVUnlock,
|
|
DirEnt: structs.DirEntry{
|
|
Key: "/test/path",
|
|
Value: []byte("test"),
|
|
Session: session.ID,
|
|
},
|
|
}
|
|
buf, err = structs.Encode(structs.KVSRequestType, req)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
resp = fsm.Apply(makeLog(buf))
|
|
if resp != true {
|
|
t.Fatalf("resp: %v", resp)
|
|
}
|
|
|
|
// Verify key is unlocked
|
|
_, d, err := fsm.state.KVSGet(nil, "/test/path")
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
if d == nil {
|
|
t.Fatalf("missing")
|
|
}
|
|
if d.LockIndex != 1 {
|
|
t.Fatalf("bad: %v", *d)
|
|
}
|
|
if d.Session != "" {
|
|
t.Fatalf("bad: %v", *d)
|
|
}
|
|
}
|
|
|
|
func TestFSM_ACL_CRUD(t *testing.T) {
|
|
t.Parallel()
|
|
fsm, err := NewFSM(nil, os.Stderr)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
|
|
// Create a new ACL.
|
|
req := structs.ACLRequest{
|
|
Datacenter: "dc1",
|
|
Op: structs.ACLSet,
|
|
ACL: structs.ACL{
|
|
ID: generateUUID(),
|
|
Name: "User token",
|
|
Type: structs.ACLTypeClient,
|
|
},
|
|
}
|
|
buf, err := structs.Encode(structs.ACLRequestType, req)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
resp := fsm.Apply(makeLog(buf))
|
|
if err, ok := resp.(error); ok {
|
|
t.Fatalf("resp: %v", err)
|
|
}
|
|
|
|
// Get the ACL.
|
|
id := resp.(string)
|
|
_, acl, err := fsm.state.ACLGet(nil, id)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
if acl == nil {
|
|
t.Fatalf("missing")
|
|
}
|
|
|
|
// Verify the ACL.
|
|
if acl.ID != id {
|
|
t.Fatalf("bad: %v", *acl)
|
|
}
|
|
if acl.Name != "User token" {
|
|
t.Fatalf("bad: %v", *acl)
|
|
}
|
|
if acl.Type != structs.ACLTypeClient {
|
|
t.Fatalf("bad: %v", *acl)
|
|
}
|
|
|
|
// Try to destroy.
|
|
destroy := structs.ACLRequest{
|
|
Datacenter: "dc1",
|
|
Op: structs.ACLDelete,
|
|
ACL: structs.ACL{
|
|
ID: id,
|
|
},
|
|
}
|
|
buf, err = structs.Encode(structs.ACLRequestType, destroy)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
resp = fsm.Apply(makeLog(buf))
|
|
if resp != nil {
|
|
t.Fatalf("resp: %v", resp)
|
|
}
|
|
|
|
_, acl, err = fsm.state.ACLGet(nil, id)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
if acl != nil {
|
|
t.Fatalf("should be destroyed")
|
|
}
|
|
|
|
// Initialize bootstrap (should work since we haven't made a management
|
|
// token).
|
|
init := structs.ACLRequest{
|
|
Datacenter: "dc1",
|
|
Op: structs.ACLBootstrapInit,
|
|
}
|
|
buf, err = structs.Encode(structs.ACLRequestType, init)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
resp = fsm.Apply(makeLog(buf))
|
|
if enabled, ok := resp.(bool); !ok || !enabled {
|
|
t.Fatalf("resp: %v", resp)
|
|
}
|
|
gotB, err := fsm.state.ACLGetBootstrap()
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
wantB := &structs.ACLBootstrap{
|
|
AllowBootstrap: true,
|
|
RaftIndex: gotB.RaftIndex,
|
|
}
|
|
verify.Values(t, "", gotB, wantB)
|
|
|
|
// Do a bootstrap.
|
|
bootstrap := structs.ACLRequest{
|
|
Datacenter: "dc1",
|
|
Op: structs.ACLBootstrapNow,
|
|
ACL: structs.ACL{
|
|
ID: generateUUID(),
|
|
Name: "Bootstrap Token",
|
|
Type: structs.ACLTypeManagement,
|
|
},
|
|
}
|
|
buf, err = structs.Encode(structs.ACLRequestType, bootstrap)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
resp = fsm.Apply(makeLog(buf))
|
|
respACL, ok := resp.(*structs.ACL)
|
|
if !ok {
|
|
t.Fatalf("resp: %v", resp)
|
|
}
|
|
bootstrap.ACL.CreateIndex = respACL.CreateIndex
|
|
bootstrap.ACL.ModifyIndex = respACL.ModifyIndex
|
|
verify.Values(t, "", respACL, &bootstrap.ACL)
|
|
}
|
|
|
|
func TestFSM_PreparedQuery_CRUD(t *testing.T) {
|
|
t.Parallel()
|
|
fsm, err := NewFSM(nil, os.Stderr)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
|
|
// Register a service to query on.
|
|
fsm.state.EnsureNode(1, &structs.Node{Node: "foo", Address: "127.0.0.1"})
|
|
fsm.state.EnsureService(2, "foo", &structs.NodeService{ID: "web", Service: "web", Tags: nil, Address: "127.0.0.1", Port: 80})
|
|
|
|
// Create a new query.
|
|
query := structs.PreparedQueryRequest{
|
|
Op: structs.PreparedQueryCreate,
|
|
Query: &structs.PreparedQuery{
|
|
ID: generateUUID(),
|
|
Service: structs.ServiceQuery{
|
|
Service: "web",
|
|
},
|
|
},
|
|
}
|
|
{
|
|
buf, err := structs.Encode(structs.PreparedQueryRequestType, query)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
resp := fsm.Apply(makeLog(buf))
|
|
if resp != nil {
|
|
t.Fatalf("resp: %v", resp)
|
|
}
|
|
}
|
|
|
|
// Verify it's in the state store.
|
|
{
|
|
_, actual, err := fsm.state.PreparedQueryGet(nil, query.Query.ID)
|
|
if err != nil {
|
|
t.Fatalf("err: %s", err)
|
|
}
|
|
|
|
actual.CreateIndex, actual.ModifyIndex = 0, 0
|
|
if !reflect.DeepEqual(actual, query.Query) {
|
|
t.Fatalf("bad: %v", actual)
|
|
}
|
|
}
|
|
|
|
// Make an update to the query.
|
|
query.Op = structs.PreparedQueryUpdate
|
|
query.Query.Name = "my-query"
|
|
{
|
|
buf, err := structs.Encode(structs.PreparedQueryRequestType, query)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
resp := fsm.Apply(makeLog(buf))
|
|
if resp != nil {
|
|
t.Fatalf("resp: %v", resp)
|
|
}
|
|
}
|
|
|
|
// Verify the update.
|
|
{
|
|
_, actual, err := fsm.state.PreparedQueryGet(nil, query.Query.ID)
|
|
if err != nil {
|
|
t.Fatalf("err: %s", err)
|
|
}
|
|
|
|
actual.CreateIndex, actual.ModifyIndex = 0, 0
|
|
if !reflect.DeepEqual(actual, query.Query) {
|
|
t.Fatalf("bad: %v", actual)
|
|
}
|
|
}
|
|
|
|
// Delete the query.
|
|
query.Op = structs.PreparedQueryDelete
|
|
{
|
|
buf, err := structs.Encode(structs.PreparedQueryRequestType, query)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
resp := fsm.Apply(makeLog(buf))
|
|
if resp != nil {
|
|
t.Fatalf("resp: %v", resp)
|
|
}
|
|
}
|
|
|
|
// Make sure it's gone.
|
|
{
|
|
_, actual, err := fsm.state.PreparedQueryGet(nil, query.Query.ID)
|
|
if err != nil {
|
|
t.Fatalf("err: %s", err)
|
|
}
|
|
|
|
if actual != nil {
|
|
t.Fatalf("bad: %v", actual)
|
|
}
|
|
}
|
|
}
|
|
|
|
func TestFSM_TombstoneReap(t *testing.T) {
|
|
t.Parallel()
|
|
fsm, err := NewFSM(nil, os.Stderr)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
|
|
// Create some tombstones
|
|
fsm.state.KVSSet(11, &structs.DirEntry{
|
|
Key: "/remove",
|
|
Value: []byte("foo"),
|
|
})
|
|
fsm.state.KVSDelete(12, "/remove")
|
|
idx, _, err := fsm.state.KVSList(nil, "/remove")
|
|
if err != nil {
|
|
t.Fatalf("err: %s", err)
|
|
}
|
|
if idx != 12 {
|
|
t.Fatalf("bad index: %d", idx)
|
|
}
|
|
|
|
// Create a new reap request
|
|
req := structs.TombstoneRequest{
|
|
Datacenter: "dc1",
|
|
Op: structs.TombstoneReap,
|
|
ReapIndex: 12,
|
|
}
|
|
buf, err := structs.Encode(structs.TombstoneRequestType, req)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
resp := fsm.Apply(makeLog(buf))
|
|
if err, ok := resp.(error); ok {
|
|
t.Fatalf("resp: %v", err)
|
|
}
|
|
|
|
// Verify the tombstones are gone
|
|
snap := fsm.state.Snapshot()
|
|
defer snap.Close()
|
|
stones, err := snap.Tombstones()
|
|
if err != nil {
|
|
t.Fatalf("err: %s", err)
|
|
}
|
|
if stones.Next() != nil {
|
|
t.Fatalf("unexpected extra tombstones")
|
|
}
|
|
}
|
|
|
|
func TestFSM_Txn(t *testing.T) {
|
|
t.Parallel()
|
|
fsm, err := NewFSM(nil, os.Stderr)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
|
|
// Set a key using a transaction.
|
|
req := structs.TxnRequest{
|
|
Datacenter: "dc1",
|
|
Ops: structs.TxnOps{
|
|
&structs.TxnOp{
|
|
KV: &structs.TxnKVOp{
|
|
Verb: api.KVSet,
|
|
DirEnt: structs.DirEntry{
|
|
Key: "/test/path",
|
|
Flags: 0,
|
|
Value: []byte("test"),
|
|
},
|
|
},
|
|
},
|
|
},
|
|
}
|
|
buf, err := structs.Encode(structs.TxnRequestType, req)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
resp := fsm.Apply(makeLog(buf))
|
|
if _, ok := resp.(structs.TxnResponse); !ok {
|
|
t.Fatalf("bad response type: %T", resp)
|
|
}
|
|
|
|
// Verify key is set directly in the state store.
|
|
_, d, err := fsm.state.KVSGet(nil, "/test/path")
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
if d == nil {
|
|
t.Fatalf("missing")
|
|
}
|
|
}
|
|
|
|
func TestFSM_Autopilot(t *testing.T) {
|
|
t.Parallel()
|
|
fsm, err := NewFSM(nil, os.Stderr)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
|
|
// Set the autopilot config using a request.
|
|
req := structs.AutopilotSetConfigRequest{
|
|
Datacenter: "dc1",
|
|
Config: structs.AutopilotConfig{
|
|
CleanupDeadServers: true,
|
|
LastContactThreshold: 10 * time.Second,
|
|
MaxTrailingLogs: 300,
|
|
},
|
|
}
|
|
buf, err := structs.Encode(structs.AutopilotRequestType, req)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
resp := fsm.Apply(makeLog(buf))
|
|
if _, ok := resp.(error); ok {
|
|
t.Fatalf("bad: %v", resp)
|
|
}
|
|
|
|
// Verify key is set directly in the state store.
|
|
_, config, err := fsm.state.AutopilotConfig()
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
if config.CleanupDeadServers != req.Config.CleanupDeadServers {
|
|
t.Fatalf("bad: %v", config.CleanupDeadServers)
|
|
}
|
|
if config.LastContactThreshold != req.Config.LastContactThreshold {
|
|
t.Fatalf("bad: %v", config.LastContactThreshold)
|
|
}
|
|
if config.MaxTrailingLogs != req.Config.MaxTrailingLogs {
|
|
t.Fatalf("bad: %v", config.MaxTrailingLogs)
|
|
}
|
|
|
|
// Now use CAS and provide an old index
|
|
req.CAS = true
|
|
req.Config.CleanupDeadServers = false
|
|
req.Config.ModifyIndex = config.ModifyIndex - 1
|
|
buf, err = structs.Encode(structs.AutopilotRequestType, req)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
resp = fsm.Apply(makeLog(buf))
|
|
if _, ok := resp.(error); ok {
|
|
t.Fatalf("bad: %v", resp)
|
|
}
|
|
|
|
_, config, err = fsm.state.AutopilotConfig()
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
if !config.CleanupDeadServers {
|
|
t.Fatalf("bad: %v", config.CleanupDeadServers)
|
|
}
|
|
}
|
|
|
|
func TestFSM_IgnoreUnknown(t *testing.T) {
|
|
t.Parallel()
|
|
fsm, err := NewFSM(nil, os.Stderr)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
|
|
// Create a new reap request
|
|
type UnknownRequest struct {
|
|
Foo string
|
|
}
|
|
req := UnknownRequest{Foo: "bar"}
|
|
msgType := structs.IgnoreUnknownTypeFlag | 64
|
|
buf, err := structs.Encode(msgType, req)
|
|
if err != nil {
|
|
t.Fatalf("err: %v", err)
|
|
}
|
|
|
|
// Apply should work, even though not supported
|
|
resp := fsm.Apply(makeLog(buf))
|
|
if err, ok := resp.(error); ok {
|
|
t.Fatalf("resp: %v", err)
|
|
}
|
|
}
|