Creates a registration mechanism for snapshot and restore.
This commit is contained in:
parent
8571555703
commit
c4bc89a187
|
@ -107,7 +107,7 @@ func (c *FSM) Apply(log *raft.Log) interface{} {
|
|||
}
|
||||
|
||||
// Apply based on the dispatch table, if possible.
|
||||
if fn, ok := c.apply[msgType]; ok {
|
||||
if fn := c.apply[msgType]; fn != nil {
|
||||
return fn(buf[1:], log.Index)
|
||||
}
|
||||
|
||||
|
@ -164,102 +164,15 @@ func (c *FSM) Restore(old io.ReadCloser) error {
|
|||
}
|
||||
|
||||
// Decode
|
||||
switch structs.MessageType(msgType[0]) {
|
||||
case structs.RegisterRequestType:
|
||||
var req structs.RegisterRequest
|
||||
if err := dec.Decode(&req); err != nil {
|
||||
msg := structs.MessageType(msgType[0])
|
||||
if fn := restorers[msg]; fn != nil {
|
||||
if err := fn(&header, restore, dec); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := restore.Registration(header.LastIndex, &req); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
case structs.KVSRequestType:
|
||||
var req structs.DirEntry
|
||||
if err := dec.Decode(&req); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := restore.KVS(&req); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
case structs.TombstoneRequestType:
|
||||
var req structs.DirEntry
|
||||
if err := dec.Decode(&req); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// For historical reasons, these are serialized in the
|
||||
// snapshots as KV entries. We want to keep the snapshot
|
||||
// format compatible with pre-0.6 versions for now.
|
||||
stone := &state.Tombstone{
|
||||
Key: req.Key,
|
||||
Index: req.ModifyIndex,
|
||||
}
|
||||
if err := restore.Tombstone(stone); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
case structs.SessionRequestType:
|
||||
var req structs.Session
|
||||
if err := dec.Decode(&req); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := restore.Session(&req); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
case structs.ACLRequestType:
|
||||
var req structs.ACL
|
||||
if err := dec.Decode(&req); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := restore.ACL(&req); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
case structs.ACLBootstrapRequestType:
|
||||
var req structs.ACLBootstrap
|
||||
if err := dec.Decode(&req); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := restore.ACLBootstrap(&req); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
case structs.CoordinateBatchUpdateType:
|
||||
var req structs.Coordinates
|
||||
if err := dec.Decode(&req); err != nil {
|
||||
return err
|
||||
|
||||
}
|
||||
if err := restore.Coordinates(header.LastIndex, req); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
case structs.PreparedQueryRequestType:
|
||||
var req structs.PreparedQuery
|
||||
if err := dec.Decode(&req); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := restore.PreparedQuery(&req); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
case structs.AutopilotRequestType:
|
||||
var req structs.AutopilotConfig
|
||||
if err := dec.Decode(&req); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := restore.Autopilot(&req); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
default:
|
||||
return fmt.Errorf("Unrecognized msg type: %v", msgType)
|
||||
} else {
|
||||
return fmt.Errorf("Unrecognized msg type %d", msg)
|
||||
}
|
||||
}
|
||||
|
||||
restore.Commit()
|
||||
|
||||
// External code might be calling State(), so we need to synchronize
|
||||
|
|
|
@ -3,16 +3,10 @@ package fsm
|
|||
import (
|
||||
"bytes"
|
||||
"os"
|
||||
"reflect"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/consul/agent/consul/state"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
"github.com/hashicorp/consul/api"
|
||||
"github.com/hashicorp/consul/lib"
|
||||
"github.com/hashicorp/raft"
|
||||
"github.com/pascaldekloe/goe/verify"
|
||||
)
|
||||
|
||||
type MockSink struct {
|
||||
|
@ -42,317 +36,6 @@ func makeLog(buf []byte) *raft.Log {
|
|||
}
|
||||
}
|
||||
|
||||
func TestFSM_SnapshotRestore(t *testing.T) {
|
||||
t.Parallel()
|
||||
fsm, err := New(nil, os.Stderr)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
// Add some state
|
||||
fsm.state.EnsureNode(1, &structs.Node{Node: "foo", Address: "127.0.0.1"})
|
||||
fsm.state.EnsureNode(2, &structs.Node{Node: "baz", Address: "127.0.0.2", TaggedAddresses: map[string]string{"hello": "1.2.3.4"}})
|
||||
fsm.state.EnsureService(3, "foo", &structs.NodeService{ID: "web", Service: "web", Tags: nil, Address: "127.0.0.1", Port: 80})
|
||||
fsm.state.EnsureService(4, "foo", &structs.NodeService{ID: "db", Service: "db", Tags: []string{"primary"}, Address: "127.0.0.1", Port: 5000})
|
||||
fsm.state.EnsureService(5, "baz", &structs.NodeService{ID: "web", Service: "web", Tags: nil, Address: "127.0.0.2", Port: 80})
|
||||
fsm.state.EnsureService(6, "baz", &structs.NodeService{ID: "db", Service: "db", Tags: []string{"secondary"}, Address: "127.0.0.2", Port: 5000})
|
||||
fsm.state.EnsureCheck(7, &structs.HealthCheck{
|
||||
Node: "foo",
|
||||
CheckID: "web",
|
||||
Name: "web connectivity",
|
||||
Status: api.HealthPassing,
|
||||
ServiceID: "web",
|
||||
})
|
||||
fsm.state.KVSSet(8, &structs.DirEntry{
|
||||
Key: "/test",
|
||||
Value: []byte("foo"),
|
||||
})
|
||||
session := &structs.Session{ID: generateUUID(), Node: "foo"}
|
||||
fsm.state.SessionCreate(9, session)
|
||||
acl := &structs.ACL{ID: generateUUID(), Name: "User Token"}
|
||||
fsm.state.ACLSet(10, acl)
|
||||
if _, err := fsm.state.ACLBootstrapInit(10); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
fsm.state.KVSSet(11, &structs.DirEntry{
|
||||
Key: "/remove",
|
||||
Value: []byte("foo"),
|
||||
})
|
||||
fsm.state.KVSDelete(12, "/remove")
|
||||
idx, _, err := fsm.state.KVSList(nil, "/remove")
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
if idx != 12 {
|
||||
t.Fatalf("bad index: %d", idx)
|
||||
}
|
||||
|
||||
updates := structs.Coordinates{
|
||||
&structs.Coordinate{
|
||||
Node: "baz",
|
||||
Coord: generateRandomCoordinate(),
|
||||
},
|
||||
&structs.Coordinate{
|
||||
Node: "foo",
|
||||
Coord: generateRandomCoordinate(),
|
||||
},
|
||||
}
|
||||
if err := fsm.state.CoordinateBatchUpdate(13, updates); err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
|
||||
query := structs.PreparedQuery{
|
||||
ID: generateUUID(),
|
||||
Service: structs.ServiceQuery{
|
||||
Service: "web",
|
||||
},
|
||||
RaftIndex: structs.RaftIndex{
|
||||
CreateIndex: 14,
|
||||
ModifyIndex: 14,
|
||||
},
|
||||
}
|
||||
if err := fsm.state.PreparedQuerySet(14, &query); err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
|
||||
autopilotConf := &structs.AutopilotConfig{
|
||||
CleanupDeadServers: true,
|
||||
LastContactThreshold: 100 * time.Millisecond,
|
||||
MaxTrailingLogs: 222,
|
||||
}
|
||||
if err := fsm.state.AutopilotSetConfig(15, autopilotConf); err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
|
||||
// Snapshot
|
||||
snap, err := fsm.Snapshot()
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
defer snap.Release()
|
||||
|
||||
// Persist
|
||||
buf := bytes.NewBuffer(nil)
|
||||
sink := &MockSink{buf, false}
|
||||
if err := snap.Persist(sink); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
// Try to restore on a new FSM
|
||||
fsm2, err := New(nil, os.Stderr)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
// Do a restore
|
||||
if err := fsm2.Restore(sink); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
// Verify the contents
|
||||
_, nodes, err := fsm2.state.Nodes(nil)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
if len(nodes) != 2 {
|
||||
t.Fatalf("bad: %v", nodes)
|
||||
}
|
||||
if nodes[0].Node != "baz" ||
|
||||
nodes[0].Address != "127.0.0.2" ||
|
||||
len(nodes[0].TaggedAddresses) != 1 ||
|
||||
nodes[0].TaggedAddresses["hello"] != "1.2.3.4" {
|
||||
t.Fatalf("bad: %v", nodes[0])
|
||||
}
|
||||
if nodes[1].Node != "foo" ||
|
||||
nodes[1].Address != "127.0.0.1" ||
|
||||
len(nodes[1].TaggedAddresses) != 0 {
|
||||
t.Fatalf("bad: %v", nodes[1])
|
||||
}
|
||||
|
||||
_, fooSrv, err := fsm2.state.NodeServices(nil, "foo")
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
if len(fooSrv.Services) != 2 {
|
||||
t.Fatalf("Bad: %v", fooSrv)
|
||||
}
|
||||
if !lib.StrContains(fooSrv.Services["db"].Tags, "primary") {
|
||||
t.Fatalf("Bad: %v", fooSrv)
|
||||
}
|
||||
if fooSrv.Services["db"].Port != 5000 {
|
||||
t.Fatalf("Bad: %v", fooSrv)
|
||||
}
|
||||
|
||||
_, checks, err := fsm2.state.NodeChecks(nil, "foo")
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
if len(checks) != 1 {
|
||||
t.Fatalf("Bad: %v", checks)
|
||||
}
|
||||
|
||||
// Verify key is set
|
||||
_, d, err := fsm2.state.KVSGet(nil, "/test")
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if string(d.Value) != "foo" {
|
||||
t.Fatalf("bad: %v", d)
|
||||
}
|
||||
|
||||
// Verify session is restored
|
||||
idx, s, err := fsm2.state.SessionGet(nil, session.ID)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if s.Node != "foo" {
|
||||
t.Fatalf("bad: %v", s)
|
||||
}
|
||||
if idx <= 1 {
|
||||
t.Fatalf("bad index: %d", idx)
|
||||
}
|
||||
|
||||
// Verify ACL is restored
|
||||
_, a, err := fsm2.state.ACLGet(nil, acl.ID)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if a.Name != "User Token" {
|
||||
t.Fatalf("bad: %v", a)
|
||||
}
|
||||
if a.ModifyIndex <= 1 {
|
||||
t.Fatalf("bad index: %d", idx)
|
||||
}
|
||||
gotB, err := fsm2.state.ACLGetBootstrap()
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
wantB := &structs.ACLBootstrap{
|
||||
AllowBootstrap: true,
|
||||
RaftIndex: structs.RaftIndex{
|
||||
CreateIndex: 10,
|
||||
ModifyIndex: 10,
|
||||
},
|
||||
}
|
||||
verify.Values(t, "", gotB, wantB)
|
||||
|
||||
// Verify tombstones are restored
|
||||
func() {
|
||||
snap := fsm2.state.Snapshot()
|
||||
defer snap.Close()
|
||||
stones, err := snap.Tombstones()
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
stone := stones.Next().(*state.Tombstone)
|
||||
if stone == nil {
|
||||
t.Fatalf("missing tombstone")
|
||||
}
|
||||
if stone.Key != "/remove" || stone.Index != 12 {
|
||||
t.Fatalf("bad: %v", stone)
|
||||
}
|
||||
if stones.Next() != nil {
|
||||
t.Fatalf("unexpected extra tombstones")
|
||||
}
|
||||
}()
|
||||
|
||||
// Verify coordinates are restored
|
||||
_, coords, err := fsm2.state.Coordinates(nil)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
if !reflect.DeepEqual(coords, updates) {
|
||||
t.Fatalf("bad: %#v", coords)
|
||||
}
|
||||
|
||||
// Verify queries are restored.
|
||||
_, queries, err := fsm2.state.PreparedQueryList(nil)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
if len(queries) != 1 {
|
||||
t.Fatalf("bad: %#v", queries)
|
||||
}
|
||||
if !reflect.DeepEqual(queries[0], &query) {
|
||||
t.Fatalf("bad: %#v", queries[0])
|
||||
}
|
||||
|
||||
// Verify autopilot config is restored.
|
||||
_, restoredConf, err := fsm2.state.AutopilotConfig()
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
if !reflect.DeepEqual(restoredConf, autopilotConf) {
|
||||
t.Fatalf("bad: %#v, %#v", restoredConf, autopilotConf)
|
||||
}
|
||||
|
||||
// Snapshot
|
||||
snap, err = fsm2.Snapshot()
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
defer snap.Release()
|
||||
|
||||
// Persist
|
||||
buf = bytes.NewBuffer(nil)
|
||||
sink = &MockSink{buf, false}
|
||||
if err := snap.Persist(sink); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
// Try to restore on the old FSM and make sure it abandons the old state
|
||||
// store.
|
||||
abandonCh := fsm.state.AbandonCh()
|
||||
if err := fsm.Restore(sink); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
select {
|
||||
case <-abandonCh:
|
||||
default:
|
||||
t.Fatalf("bad")
|
||||
}
|
||||
}
|
||||
|
||||
func TestFSM_BadRestore(t *testing.T) {
|
||||
t.Parallel()
|
||||
// Create an FSM with some state.
|
||||
fsm, err := New(nil, os.Stderr)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
fsm.state.EnsureNode(1, &structs.Node{Node: "foo", Address: "127.0.0.1"})
|
||||
abandonCh := fsm.state.AbandonCh()
|
||||
|
||||
// Do a bad restore.
|
||||
buf := bytes.NewBuffer([]byte("bad snapshot"))
|
||||
sink := &MockSink{buf, false}
|
||||
if err := fsm.Restore(sink); err == nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
// Verify the contents didn't get corrupted.
|
||||
_, nodes, err := fsm.state.Nodes(nil)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
if len(nodes) != 1 {
|
||||
t.Fatalf("bad: %v", nodes)
|
||||
}
|
||||
if nodes[0].Node != "foo" ||
|
||||
nodes[0].Address != "127.0.0.1" ||
|
||||
len(nodes[0].TaggedAddresses) != 0 {
|
||||
t.Fatalf("bad: %v", nodes[0])
|
||||
}
|
||||
|
||||
// Verify the old state store didn't get abandoned.
|
||||
select {
|
||||
case <-abandonCh:
|
||||
t.Fatalf("bad")
|
||||
default:
|
||||
}
|
||||
}
|
||||
|
||||
func TestFSM_IgnoreUnknown(t *testing.T) {
|
||||
t.Parallel()
|
||||
fsm, err := New(nil, os.Stderr)
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
package fsm
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/armon/go-metrics"
|
||||
|
@ -24,272 +25,61 @@ type snapshotHeader struct {
|
|||
LastIndex uint64
|
||||
}
|
||||
|
||||
// persister is a function used to help snapshot the FSM state.
|
||||
type persister func(s *snapshot, sink raft.SnapshotSink, encoder *codec.Encoder) error
|
||||
|
||||
// persisters is a list of snapshot functions.
|
||||
var persisters []persister
|
||||
|
||||
// registerPersister adds a new helper. This should be called at package
|
||||
// init() time.
|
||||
func registerPersister(fn persister) {
|
||||
persisters = append(persisters, fn)
|
||||
}
|
||||
|
||||
// restorer is a function used to load back a snapshot of the FSM state.
|
||||
type restorer func(header *snapshotHeader, restore *state.Restore, decoder *codec.Decoder) error
|
||||
|
||||
// restorers is a map of restore functions by message type.
|
||||
var restorers map[structs.MessageType]restorer
|
||||
|
||||
// registerRestorer adds a new helper. This should be called at package
|
||||
// init() time.
|
||||
func registerRestorer(msg structs.MessageType, fn restorer) {
|
||||
if restorers == nil {
|
||||
restorers = make(map[structs.MessageType]restorer)
|
||||
}
|
||||
if restorers[msg] != nil {
|
||||
panic(fmt.Errorf("Message %d is already registered", msg))
|
||||
}
|
||||
restorers[msg] = fn
|
||||
}
|
||||
|
||||
// Persist saves the FSM snapshot out to the given sink.
|
||||
func (s *snapshot) Persist(sink raft.SnapshotSink) error {
|
||||
defer metrics.MeasureSince([]string{"consul", "fsm", "persist"}, time.Now())
|
||||
defer metrics.MeasureSince([]string{"fsm", "persist"}, time.Now())
|
||||
|
||||
// Register the nodes
|
||||
encoder := codec.NewEncoder(sink, msgpackHandle)
|
||||
|
||||
// Write the header
|
||||
header := snapshotHeader{
|
||||
LastIndex: s.state.LastIndex(),
|
||||
}
|
||||
encoder := codec.NewEncoder(sink, msgpackHandle)
|
||||
if err := encoder.Encode(&header); err != nil {
|
||||
sink.Cancel()
|
||||
return err
|
||||
}
|
||||
|
||||
if err := s.persistNodes(sink, encoder); err != nil {
|
||||
sink.Cancel()
|
||||
return err
|
||||
}
|
||||
|
||||
if err := s.persistSessions(sink, encoder); err != nil {
|
||||
sink.Cancel()
|
||||
return err
|
||||
}
|
||||
|
||||
if err := s.persistACLs(sink, encoder); err != nil {
|
||||
sink.Cancel()
|
||||
return err
|
||||
}
|
||||
|
||||
if err := s.persistKVs(sink, encoder); err != nil {
|
||||
sink.Cancel()
|
||||
return err
|
||||
}
|
||||
|
||||
if err := s.persistTombstones(sink, encoder); err != nil {
|
||||
sink.Cancel()
|
||||
return err
|
||||
}
|
||||
|
||||
if err := s.persistPreparedQueries(sink, encoder); err != nil {
|
||||
sink.Cancel()
|
||||
return err
|
||||
}
|
||||
|
||||
if err := s.persistAutopilot(sink, encoder); err != nil {
|
||||
sink.Cancel()
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *snapshot) persistNodes(sink raft.SnapshotSink,
|
||||
encoder *codec.Encoder) error {
|
||||
|
||||
// Get all the nodes
|
||||
nodes, err := s.state.Nodes()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Register each node
|
||||
for node := nodes.Next(); node != nil; node = nodes.Next() {
|
||||
n := node.(*structs.Node)
|
||||
req := structs.RegisterRequest{
|
||||
Node: n.Node,
|
||||
Address: n.Address,
|
||||
TaggedAddresses: n.TaggedAddresses,
|
||||
}
|
||||
|
||||
// Register the node itself
|
||||
if _, err := sink.Write([]byte{byte(structs.RegisterRequestType)}); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := encoder.Encode(&req); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Register each service this node has
|
||||
services, err := s.state.Services(n.Node)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for service := services.Next(); service != nil; service = services.Next() {
|
||||
if _, err := sink.Write([]byte{byte(structs.RegisterRequestType)}); err != nil {
|
||||
return err
|
||||
}
|
||||
req.Service = service.(*structs.ServiceNode).ToNodeService()
|
||||
if err := encoder.Encode(&req); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// Register each check this node has
|
||||
req.Service = nil
|
||||
checks, err := s.state.Checks(n.Node)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for check := checks.Next(); check != nil; check = checks.Next() {
|
||||
if _, err := sink.Write([]byte{byte(structs.RegisterRequestType)}); err != nil {
|
||||
return err
|
||||
}
|
||||
req.Check = check.(*structs.HealthCheck)
|
||||
if err := encoder.Encode(&req); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Save the coordinates separately since they are not part of the
|
||||
// register request interface. To avoid copying them out, we turn
|
||||
// them into batches with a single coordinate each.
|
||||
coords, err := s.state.Coordinates()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for coord := coords.Next(); coord != nil; coord = coords.Next() {
|
||||
if _, err := sink.Write([]byte{byte(structs.CoordinateBatchUpdateType)}); err != nil {
|
||||
return err
|
||||
}
|
||||
updates := structs.Coordinates{coord.(*structs.Coordinate)}
|
||||
if err := encoder.Encode(&updates); err != nil {
|
||||
// Run all the persisters to write the FSM state.
|
||||
for _, fn := range persisters {
|
||||
if err := fn(s, sink, encoder); err != nil {
|
||||
sink.Cancel()
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *snapshot) persistSessions(sink raft.SnapshotSink,
|
||||
encoder *codec.Encoder) error {
|
||||
sessions, err := s.state.Sessions()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for session := sessions.Next(); session != nil; session = sessions.Next() {
|
||||
if _, err := sink.Write([]byte{byte(structs.SessionRequestType)}); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := encoder.Encode(session.(*structs.Session)); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *snapshot) persistACLs(sink raft.SnapshotSink,
|
||||
encoder *codec.Encoder) error {
|
||||
acls, err := s.state.ACLs()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for acl := acls.Next(); acl != nil; acl = acls.Next() {
|
||||
if _, err := sink.Write([]byte{byte(structs.ACLRequestType)}); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := encoder.Encode(acl.(*structs.ACL)); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
bs, err := s.state.ACLBootstrap()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if bs != nil {
|
||||
if _, err := sink.Write([]byte{byte(structs.ACLBootstrapRequestType)}); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := encoder.Encode(bs); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *snapshot) persistKVs(sink raft.SnapshotSink,
|
||||
encoder *codec.Encoder) error {
|
||||
entries, err := s.state.KVs()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for entry := entries.Next(); entry != nil; entry = entries.Next() {
|
||||
if _, err := sink.Write([]byte{byte(structs.KVSRequestType)}); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := encoder.Encode(entry.(*structs.DirEntry)); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *snapshot) persistTombstones(sink raft.SnapshotSink,
|
||||
encoder *codec.Encoder) error {
|
||||
stones, err := s.state.Tombstones()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for stone := stones.Next(); stone != nil; stone = stones.Next() {
|
||||
if _, err := sink.Write([]byte{byte(structs.TombstoneRequestType)}); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// For historical reasons, these are serialized in the snapshots
|
||||
// as KV entries. We want to keep the snapshot format compatible
|
||||
// with pre-0.6 versions for now.
|
||||
s := stone.(*state.Tombstone)
|
||||
fake := &structs.DirEntry{
|
||||
Key: s.Key,
|
||||
RaftIndex: structs.RaftIndex{
|
||||
ModifyIndex: s.Index,
|
||||
},
|
||||
}
|
||||
if err := encoder.Encode(fake); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *snapshot) persistPreparedQueries(sink raft.SnapshotSink,
|
||||
encoder *codec.Encoder) error {
|
||||
queries, err := s.state.PreparedQueries()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for _, query := range queries {
|
||||
if _, err := sink.Write([]byte{byte(structs.PreparedQueryRequestType)}); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := encoder.Encode(query); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *snapshot) persistAutopilot(sink raft.SnapshotSink,
|
||||
encoder *codec.Encoder) error {
|
||||
autopilot, err := s.state.Autopilot()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if autopilot == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
if _, err := sink.Write([]byte{byte(structs.AutopilotRequestType)}); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := encoder.Encode(autopilot); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *snapshot) Release() {
|
||||
s.state.Close()
|
||||
}
|
||||
|
|
|
@ -0,0 +1,365 @@
|
|||
package fsm
|
||||
|
||||
import (
|
||||
"github.com/hashicorp/consul/agent/consul/state"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
"github.com/hashicorp/go-msgpack/codec"
|
||||
"github.com/hashicorp/raft"
|
||||
)
|
||||
|
||||
func init() {
|
||||
registerPersister(persistOSS)
|
||||
|
||||
registerRestorer(structs.RegisterRequestType, restoreRegistration)
|
||||
registerRestorer(structs.KVSRequestType, restoreKV)
|
||||
registerRestorer(structs.TombstoneRequestType, restoreTombstone)
|
||||
registerRestorer(structs.SessionRequestType, restoreSession)
|
||||
registerRestorer(structs.ACLRequestType, restoreACL)
|
||||
registerRestorer(structs.ACLBootstrapRequestType, restoreACLBootstrap)
|
||||
registerRestorer(structs.CoordinateBatchUpdateType, restoreCoordinates)
|
||||
registerRestorer(structs.PreparedQueryRequestType, restorePreparedQuery)
|
||||
registerRestorer(structs.AutopilotRequestType, restoreAutopilot)
|
||||
}
|
||||
|
||||
func persistOSS(s *snapshot, sink raft.SnapshotSink, encoder *codec.Encoder) error {
|
||||
if err := s.persistNodes(sink, encoder); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := s.persistSessions(sink, encoder); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := s.persistACLs(sink, encoder); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := s.persistKVs(sink, encoder); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := s.persistTombstones(sink, encoder); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := s.persistPreparedQueries(sink, encoder); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := s.persistAutopilot(sink, encoder); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *snapshot) persistNodes(sink raft.SnapshotSink,
|
||||
encoder *codec.Encoder) error {
|
||||
|
||||
// Get all the nodes
|
||||
nodes, err := s.state.Nodes()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Register each node
|
||||
for node := nodes.Next(); node != nil; node = nodes.Next() {
|
||||
n := node.(*structs.Node)
|
||||
req := structs.RegisterRequest{
|
||||
Node: n.Node,
|
||||
Address: n.Address,
|
||||
TaggedAddresses: n.TaggedAddresses,
|
||||
}
|
||||
|
||||
// Register the node itself
|
||||
if _, err := sink.Write([]byte{byte(structs.RegisterRequestType)}); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := encoder.Encode(&req); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Register each service this node has
|
||||
services, err := s.state.Services(n.Node)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for service := services.Next(); service != nil; service = services.Next() {
|
||||
if _, err := sink.Write([]byte{byte(structs.RegisterRequestType)}); err != nil {
|
||||
return err
|
||||
}
|
||||
req.Service = service.(*structs.ServiceNode).ToNodeService()
|
||||
if err := encoder.Encode(&req); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// Register each check this node has
|
||||
req.Service = nil
|
||||
checks, err := s.state.Checks(n.Node)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for check := checks.Next(); check != nil; check = checks.Next() {
|
||||
if _, err := sink.Write([]byte{byte(structs.RegisterRequestType)}); err != nil {
|
||||
return err
|
||||
}
|
||||
req.Check = check.(*structs.HealthCheck)
|
||||
if err := encoder.Encode(&req); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Save the coordinates separately since they are not part of the
|
||||
// register request interface. To avoid copying them out, we turn
|
||||
// them into batches with a single coordinate each.
|
||||
coords, err := s.state.Coordinates()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for coord := coords.Next(); coord != nil; coord = coords.Next() {
|
||||
if _, err := sink.Write([]byte{byte(structs.CoordinateBatchUpdateType)}); err != nil {
|
||||
return err
|
||||
}
|
||||
updates := structs.Coordinates{coord.(*structs.Coordinate)}
|
||||
if err := encoder.Encode(&updates); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *snapshot) persistSessions(sink raft.SnapshotSink,
|
||||
encoder *codec.Encoder) error {
|
||||
sessions, err := s.state.Sessions()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for session := sessions.Next(); session != nil; session = sessions.Next() {
|
||||
if _, err := sink.Write([]byte{byte(structs.SessionRequestType)}); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := encoder.Encode(session.(*structs.Session)); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *snapshot) persistACLs(sink raft.SnapshotSink,
|
||||
encoder *codec.Encoder) error {
|
||||
acls, err := s.state.ACLs()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for acl := acls.Next(); acl != nil; acl = acls.Next() {
|
||||
if _, err := sink.Write([]byte{byte(structs.ACLRequestType)}); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := encoder.Encode(acl.(*structs.ACL)); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
bs, err := s.state.ACLBootstrap()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if bs != nil {
|
||||
if _, err := sink.Write([]byte{byte(structs.ACLBootstrapRequestType)}); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := encoder.Encode(bs); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *snapshot) persistKVs(sink raft.SnapshotSink,
|
||||
encoder *codec.Encoder) error {
|
||||
entries, err := s.state.KVs()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for entry := entries.Next(); entry != nil; entry = entries.Next() {
|
||||
if _, err := sink.Write([]byte{byte(structs.KVSRequestType)}); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := encoder.Encode(entry.(*structs.DirEntry)); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *snapshot) persistTombstones(sink raft.SnapshotSink,
|
||||
encoder *codec.Encoder) error {
|
||||
stones, err := s.state.Tombstones()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for stone := stones.Next(); stone != nil; stone = stones.Next() {
|
||||
if _, err := sink.Write([]byte{byte(structs.TombstoneRequestType)}); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// For historical reasons, these are serialized in the snapshots
|
||||
// as KV entries. We want to keep the snapshot format compatible
|
||||
// with pre-0.6 versions for now.
|
||||
s := stone.(*state.Tombstone)
|
||||
fake := &structs.DirEntry{
|
||||
Key: s.Key,
|
||||
RaftIndex: structs.RaftIndex{
|
||||
ModifyIndex: s.Index,
|
||||
},
|
||||
}
|
||||
if err := encoder.Encode(fake); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *snapshot) persistPreparedQueries(sink raft.SnapshotSink,
|
||||
encoder *codec.Encoder) error {
|
||||
queries, err := s.state.PreparedQueries()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for _, query := range queries {
|
||||
if _, err := sink.Write([]byte{byte(structs.PreparedQueryRequestType)}); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := encoder.Encode(query); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *snapshot) persistAutopilot(sink raft.SnapshotSink,
|
||||
encoder *codec.Encoder) error {
|
||||
autopilot, err := s.state.Autopilot()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if autopilot == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
if _, err := sink.Write([]byte{byte(structs.AutopilotRequestType)}); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := encoder.Encode(autopilot); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func restoreRegistration(header *snapshotHeader, restore *state.Restore, decoder *codec.Decoder) error {
|
||||
var req structs.RegisterRequest
|
||||
if err := decoder.Decode(&req); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := restore.Registration(header.LastIndex, &req); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func restoreKV(header *snapshotHeader, restore *state.Restore, decoder *codec.Decoder) error {
|
||||
var req structs.DirEntry
|
||||
if err := decoder.Decode(&req); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := restore.KVS(&req); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func restoreTombstone(header *snapshotHeader, restore *state.Restore, decoder *codec.Decoder) error {
|
||||
var req structs.DirEntry
|
||||
if err := decoder.Decode(&req); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// For historical reasons, these are serialized in the
|
||||
// snapshots as KV entries. We want to keep the snapshot
|
||||
// format compatible with pre-0.6 versions for now.
|
||||
stone := &state.Tombstone{
|
||||
Key: req.Key,
|
||||
Index: req.ModifyIndex,
|
||||
}
|
||||
if err := restore.Tombstone(stone); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func restoreSession(header *snapshotHeader, restore *state.Restore, decoder *codec.Decoder) error {
|
||||
var req structs.Session
|
||||
if err := decoder.Decode(&req); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := restore.Session(&req); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func restoreACL(header *snapshotHeader, restore *state.Restore, decoder *codec.Decoder) error {
|
||||
var req structs.ACL
|
||||
if err := decoder.Decode(&req); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := restore.ACL(&req); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func restoreACLBootstrap(header *snapshotHeader, restore *state.Restore, decoder *codec.Decoder) error {
|
||||
var req structs.ACLBootstrap
|
||||
if err := decoder.Decode(&req); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := restore.ACLBootstrap(&req); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func restoreCoordinates(header *snapshotHeader, restore *state.Restore, decoder *codec.Decoder) error {
|
||||
var req structs.Coordinates
|
||||
if err := decoder.Decode(&req); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := restore.Coordinates(header.LastIndex, req); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func restorePreparedQuery(header *snapshotHeader, restore *state.Restore, decoder *codec.Decoder) error {
|
||||
var req structs.PreparedQuery
|
||||
if err := decoder.Decode(&req); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := restore.PreparedQuery(&req); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func restoreAutopilot(header *snapshotHeader, restore *state.Restore, decoder *codec.Decoder) error {
|
||||
var req structs.AutopilotConfig
|
||||
if err := decoder.Decode(&req); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := restore.Autopilot(&req); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
|
@ -0,0 +1,326 @@
|
|||
package fsm
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"os"
|
||||
"reflect"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/consul/agent/consul/state"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
"github.com/hashicorp/consul/api"
|
||||
"github.com/hashicorp/consul/lib"
|
||||
"github.com/pascaldekloe/goe/verify"
|
||||
)
|
||||
|
||||
func TestFSM_SnapshotRestore_OSS(t *testing.T) {
|
||||
t.Parallel()
|
||||
fsm, err := New(nil, os.Stderr)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
// Add some state
|
||||
fsm.state.EnsureNode(1, &structs.Node{Node: "foo", Address: "127.0.0.1"})
|
||||
fsm.state.EnsureNode(2, &structs.Node{Node: "baz", Address: "127.0.0.2", TaggedAddresses: map[string]string{"hello": "1.2.3.4"}})
|
||||
fsm.state.EnsureService(3, "foo", &structs.NodeService{ID: "web", Service: "web", Tags: nil, Address: "127.0.0.1", Port: 80})
|
||||
fsm.state.EnsureService(4, "foo", &structs.NodeService{ID: "db", Service: "db", Tags: []string{"primary"}, Address: "127.0.0.1", Port: 5000})
|
||||
fsm.state.EnsureService(5, "baz", &structs.NodeService{ID: "web", Service: "web", Tags: nil, Address: "127.0.0.2", Port: 80})
|
||||
fsm.state.EnsureService(6, "baz", &structs.NodeService{ID: "db", Service: "db", Tags: []string{"secondary"}, Address: "127.0.0.2", Port: 5000})
|
||||
fsm.state.EnsureCheck(7, &structs.HealthCheck{
|
||||
Node: "foo",
|
||||
CheckID: "web",
|
||||
Name: "web connectivity",
|
||||
Status: api.HealthPassing,
|
||||
ServiceID: "web",
|
||||
})
|
||||
fsm.state.KVSSet(8, &structs.DirEntry{
|
||||
Key: "/test",
|
||||
Value: []byte("foo"),
|
||||
})
|
||||
session := &structs.Session{ID: generateUUID(), Node: "foo"}
|
||||
fsm.state.SessionCreate(9, session)
|
||||
acl := &structs.ACL{ID: generateUUID(), Name: "User Token"}
|
||||
fsm.state.ACLSet(10, acl)
|
||||
if _, err := fsm.state.ACLBootstrapInit(10); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
fsm.state.KVSSet(11, &structs.DirEntry{
|
||||
Key: "/remove",
|
||||
Value: []byte("foo"),
|
||||
})
|
||||
fsm.state.KVSDelete(12, "/remove")
|
||||
idx, _, err := fsm.state.KVSList(nil, "/remove")
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
if idx != 12 {
|
||||
t.Fatalf("bad index: %d", idx)
|
||||
}
|
||||
|
||||
updates := structs.Coordinates{
|
||||
&structs.Coordinate{
|
||||
Node: "baz",
|
||||
Coord: generateRandomCoordinate(),
|
||||
},
|
||||
&structs.Coordinate{
|
||||
Node: "foo",
|
||||
Coord: generateRandomCoordinate(),
|
||||
},
|
||||
}
|
||||
if err := fsm.state.CoordinateBatchUpdate(13, updates); err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
|
||||
query := structs.PreparedQuery{
|
||||
ID: generateUUID(),
|
||||
Service: structs.ServiceQuery{
|
||||
Service: "web",
|
||||
},
|
||||
RaftIndex: structs.RaftIndex{
|
||||
CreateIndex: 14,
|
||||
ModifyIndex: 14,
|
||||
},
|
||||
}
|
||||
if err := fsm.state.PreparedQuerySet(14, &query); err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
|
||||
autopilotConf := &structs.AutopilotConfig{
|
||||
CleanupDeadServers: true,
|
||||
LastContactThreshold: 100 * time.Millisecond,
|
||||
MaxTrailingLogs: 222,
|
||||
}
|
||||
if err := fsm.state.AutopilotSetConfig(15, autopilotConf); err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
|
||||
// Snapshot
|
||||
snap, err := fsm.Snapshot()
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
defer snap.Release()
|
||||
|
||||
// Persist
|
||||
buf := bytes.NewBuffer(nil)
|
||||
sink := &MockSink{buf, false}
|
||||
if err := snap.Persist(sink); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
// Try to restore on a new FSM
|
||||
fsm2, err := New(nil, os.Stderr)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
// Do a restore
|
||||
if err := fsm2.Restore(sink); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
// Verify the contents
|
||||
_, nodes, err := fsm2.state.Nodes(nil)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
if len(nodes) != 2 {
|
||||
t.Fatalf("bad: %v", nodes)
|
||||
}
|
||||
if nodes[0].Node != "baz" ||
|
||||
nodes[0].Address != "127.0.0.2" ||
|
||||
len(nodes[0].TaggedAddresses) != 1 ||
|
||||
nodes[0].TaggedAddresses["hello"] != "1.2.3.4" {
|
||||
t.Fatalf("bad: %v", nodes[0])
|
||||
}
|
||||
if nodes[1].Node != "foo" ||
|
||||
nodes[1].Address != "127.0.0.1" ||
|
||||
len(nodes[1].TaggedAddresses) != 0 {
|
||||
t.Fatalf("bad: %v", nodes[1])
|
||||
}
|
||||
|
||||
_, fooSrv, err := fsm2.state.NodeServices(nil, "foo")
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
if len(fooSrv.Services) != 2 {
|
||||
t.Fatalf("Bad: %v", fooSrv)
|
||||
}
|
||||
if !lib.StrContains(fooSrv.Services["db"].Tags, "primary") {
|
||||
t.Fatalf("Bad: %v", fooSrv)
|
||||
}
|
||||
if fooSrv.Services["db"].Port != 5000 {
|
||||
t.Fatalf("Bad: %v", fooSrv)
|
||||
}
|
||||
|
||||
_, checks, err := fsm2.state.NodeChecks(nil, "foo")
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
if len(checks) != 1 {
|
||||
t.Fatalf("Bad: %v", checks)
|
||||
}
|
||||
|
||||
// Verify key is set
|
||||
_, d, err := fsm2.state.KVSGet(nil, "/test")
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if string(d.Value) != "foo" {
|
||||
t.Fatalf("bad: %v", d)
|
||||
}
|
||||
|
||||
// Verify session is restored
|
||||
idx, s, err := fsm2.state.SessionGet(nil, session.ID)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if s.Node != "foo" {
|
||||
t.Fatalf("bad: %v", s)
|
||||
}
|
||||
if idx <= 1 {
|
||||
t.Fatalf("bad index: %d", idx)
|
||||
}
|
||||
|
||||
// Verify ACL is restored
|
||||
_, a, err := fsm2.state.ACLGet(nil, acl.ID)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if a.Name != "User Token" {
|
||||
t.Fatalf("bad: %v", a)
|
||||
}
|
||||
if a.ModifyIndex <= 1 {
|
||||
t.Fatalf("bad index: %d", idx)
|
||||
}
|
||||
gotB, err := fsm2.state.ACLGetBootstrap()
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
wantB := &structs.ACLBootstrap{
|
||||
AllowBootstrap: true,
|
||||
RaftIndex: structs.RaftIndex{
|
||||
CreateIndex: 10,
|
||||
ModifyIndex: 10,
|
||||
},
|
||||
}
|
||||
verify.Values(t, "", gotB, wantB)
|
||||
|
||||
// Verify tombstones are restored
|
||||
func() {
|
||||
snap := fsm2.state.Snapshot()
|
||||
defer snap.Close()
|
||||
stones, err := snap.Tombstones()
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
stone := stones.Next().(*state.Tombstone)
|
||||
if stone == nil {
|
||||
t.Fatalf("missing tombstone")
|
||||
}
|
||||
if stone.Key != "/remove" || stone.Index != 12 {
|
||||
t.Fatalf("bad: %v", stone)
|
||||
}
|
||||
if stones.Next() != nil {
|
||||
t.Fatalf("unexpected extra tombstones")
|
||||
}
|
||||
}()
|
||||
|
||||
// Verify coordinates are restored
|
||||
_, coords, err := fsm2.state.Coordinates(nil)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
if !reflect.DeepEqual(coords, updates) {
|
||||
t.Fatalf("bad: %#v", coords)
|
||||
}
|
||||
|
||||
// Verify queries are restored.
|
||||
_, queries, err := fsm2.state.PreparedQueryList(nil)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
if len(queries) != 1 {
|
||||
t.Fatalf("bad: %#v", queries)
|
||||
}
|
||||
if !reflect.DeepEqual(queries[0], &query) {
|
||||
t.Fatalf("bad: %#v", queries[0])
|
||||
}
|
||||
|
||||
// Verify autopilot config is restored.
|
||||
_, restoredConf, err := fsm2.state.AutopilotConfig()
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
if !reflect.DeepEqual(restoredConf, autopilotConf) {
|
||||
t.Fatalf("bad: %#v, %#v", restoredConf, autopilotConf)
|
||||
}
|
||||
|
||||
// Snapshot
|
||||
snap, err = fsm2.Snapshot()
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
defer snap.Release()
|
||||
|
||||
// Persist
|
||||
buf = bytes.NewBuffer(nil)
|
||||
sink = &MockSink{buf, false}
|
||||
if err := snap.Persist(sink); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
// Try to restore on the old FSM and make sure it abandons the old state
|
||||
// store.
|
||||
abandonCh := fsm.state.AbandonCh()
|
||||
if err := fsm.Restore(sink); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
select {
|
||||
case <-abandonCh:
|
||||
default:
|
||||
t.Fatalf("bad")
|
||||
}
|
||||
}
|
||||
|
||||
func TestFSM_BadRestore_OSS(t *testing.T) {
|
||||
t.Parallel()
|
||||
// Create an FSM with some state.
|
||||
fsm, err := New(nil, os.Stderr)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
fsm.state.EnsureNode(1, &structs.Node{Node: "foo", Address: "127.0.0.1"})
|
||||
abandonCh := fsm.state.AbandonCh()
|
||||
|
||||
// Do a bad restore.
|
||||
buf := bytes.NewBuffer([]byte("bad snapshot"))
|
||||
sink := &MockSink{buf, false}
|
||||
if err := fsm.Restore(sink); err == nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
// Verify the contents didn't get corrupted.
|
||||
_, nodes, err := fsm.state.Nodes(nil)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
if len(nodes) != 1 {
|
||||
t.Fatalf("bad: %v", nodes)
|
||||
}
|
||||
if nodes[0].Node != "foo" ||
|
||||
nodes[0].Address != "127.0.0.1" ||
|
||||
len(nodes[0].TaggedAddresses) != 0 {
|
||||
t.Fatalf("bad: %v", nodes[0])
|
||||
}
|
||||
|
||||
// Verify the old state store didn't get abandoned.
|
||||
select {
|
||||
case <-abandonCh:
|
||||
t.Fatalf("bad")
|
||||
default:
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue