From c4bc89a187aa61229127c47afaa5fb182acd46d9 Mon Sep 17 00:00:00 2001 From: James Phillips Date: Wed, 29 Nov 2017 17:33:57 -0800 Subject: [PATCH] Creates a registration mechanism for snapshot and restore. --- agent/consul/fsm/fsm.go | 99 +------ agent/consul/fsm/fsm_test.go | 317 ---------------------- agent/consul/fsm/snapshot.go | 284 +++----------------- agent/consul/fsm/snapshot_oss.go | 365 ++++++++++++++++++++++++++ agent/consul/fsm/snapshot_oss_test.go | 326 +++++++++++++++++++++++ 5 files changed, 734 insertions(+), 657 deletions(-) create mode 100644 agent/consul/fsm/snapshot_oss.go create mode 100644 agent/consul/fsm/snapshot_oss_test.go diff --git a/agent/consul/fsm/fsm.go b/agent/consul/fsm/fsm.go index bc72744df..87824b872 100644 --- a/agent/consul/fsm/fsm.go +++ b/agent/consul/fsm/fsm.go @@ -107,7 +107,7 @@ func (c *FSM) Apply(log *raft.Log) interface{} { } // Apply based on the dispatch table, if possible. - if fn, ok := c.apply[msgType]; ok { + if fn := c.apply[msgType]; fn != nil { return fn(buf[1:], log.Index) } @@ -164,102 +164,15 @@ func (c *FSM) Restore(old io.ReadCloser) error { } // Decode - switch structs.MessageType(msgType[0]) { - case structs.RegisterRequestType: - var req structs.RegisterRequest - if err := dec.Decode(&req); err != nil { + msg := structs.MessageType(msgType[0]) + if fn := restorers[msg]; fn != nil { + if err := fn(&header, restore, dec); err != nil { return err } - if err := restore.Registration(header.LastIndex, &req); err != nil { - return err - } - - case structs.KVSRequestType: - var req structs.DirEntry - if err := dec.Decode(&req); err != nil { - return err - } - if err := restore.KVS(&req); err != nil { - return err - } - - case structs.TombstoneRequestType: - var req structs.DirEntry - if err := dec.Decode(&req); err != nil { - return err - } - - // For historical reasons, these are serialized in the - // snapshots as KV entries. We want to keep the snapshot - // format compatible with pre-0.6 versions for now. 
- stone := &state.Tombstone{ - Key: req.Key, - Index: req.ModifyIndex, - } - if err := restore.Tombstone(stone); err != nil { - return err - } - - case structs.SessionRequestType: - var req structs.Session - if err := dec.Decode(&req); err != nil { - return err - } - if err := restore.Session(&req); err != nil { - return err - } - - case structs.ACLRequestType: - var req structs.ACL - if err := dec.Decode(&req); err != nil { - return err - } - if err := restore.ACL(&req); err != nil { - return err - } - - case structs.ACLBootstrapRequestType: - var req structs.ACLBootstrap - if err := dec.Decode(&req); err != nil { - return err - } - if err := restore.ACLBootstrap(&req); err != nil { - return err - } - - case structs.CoordinateBatchUpdateType: - var req structs.Coordinates - if err := dec.Decode(&req); err != nil { - return err - - } - if err := restore.Coordinates(header.LastIndex, req); err != nil { - return err - } - - case structs.PreparedQueryRequestType: - var req structs.PreparedQuery - if err := dec.Decode(&req); err != nil { - return err - } - if err := restore.PreparedQuery(&req); err != nil { - return err - } - - case structs.AutopilotRequestType: - var req structs.AutopilotConfig - if err := dec.Decode(&req); err != nil { - return err - } - if err := restore.Autopilot(&req); err != nil { - return err - } - - default: - return fmt.Errorf("Unrecognized msg type: %v", msgType) + } else { + return fmt.Errorf("Unrecognized msg type %d", msg) } } - restore.Commit() // External code might be calling State(), so we need to synchronize diff --git a/agent/consul/fsm/fsm_test.go b/agent/consul/fsm/fsm_test.go index 70a91d679..de763abfb 100644 --- a/agent/consul/fsm/fsm_test.go +++ b/agent/consul/fsm/fsm_test.go @@ -3,16 +3,10 @@ package fsm import ( "bytes" "os" - "reflect" "testing" - "time" - "github.com/hashicorp/consul/agent/consul/state" "github.com/hashicorp/consul/agent/structs" - "github.com/hashicorp/consul/api" - "github.com/hashicorp/consul/lib" 
"github.com/hashicorp/raft" - "github.com/pascaldekloe/goe/verify" ) type MockSink struct { @@ -42,317 +36,6 @@ func makeLog(buf []byte) *raft.Log { } } -func TestFSM_SnapshotRestore(t *testing.T) { - t.Parallel() - fsm, err := New(nil, os.Stderr) - if err != nil { - t.Fatalf("err: %v", err) - } - - // Add some state - fsm.state.EnsureNode(1, &structs.Node{Node: "foo", Address: "127.0.0.1"}) - fsm.state.EnsureNode(2, &structs.Node{Node: "baz", Address: "127.0.0.2", TaggedAddresses: map[string]string{"hello": "1.2.3.4"}}) - fsm.state.EnsureService(3, "foo", &structs.NodeService{ID: "web", Service: "web", Tags: nil, Address: "127.0.0.1", Port: 80}) - fsm.state.EnsureService(4, "foo", &structs.NodeService{ID: "db", Service: "db", Tags: []string{"primary"}, Address: "127.0.0.1", Port: 5000}) - fsm.state.EnsureService(5, "baz", &structs.NodeService{ID: "web", Service: "web", Tags: nil, Address: "127.0.0.2", Port: 80}) - fsm.state.EnsureService(6, "baz", &structs.NodeService{ID: "db", Service: "db", Tags: []string{"secondary"}, Address: "127.0.0.2", Port: 5000}) - fsm.state.EnsureCheck(7, &structs.HealthCheck{ - Node: "foo", - CheckID: "web", - Name: "web connectivity", - Status: api.HealthPassing, - ServiceID: "web", - }) - fsm.state.KVSSet(8, &structs.DirEntry{ - Key: "/test", - Value: []byte("foo"), - }) - session := &structs.Session{ID: generateUUID(), Node: "foo"} - fsm.state.SessionCreate(9, session) - acl := &structs.ACL{ID: generateUUID(), Name: "User Token"} - fsm.state.ACLSet(10, acl) - if _, err := fsm.state.ACLBootstrapInit(10); err != nil { - t.Fatalf("err: %v", err) - } - - fsm.state.KVSSet(11, &structs.DirEntry{ - Key: "/remove", - Value: []byte("foo"), - }) - fsm.state.KVSDelete(12, "/remove") - idx, _, err := fsm.state.KVSList(nil, "/remove") - if err != nil { - t.Fatalf("err: %s", err) - } - if idx != 12 { - t.Fatalf("bad index: %d", idx) - } - - updates := structs.Coordinates{ - &structs.Coordinate{ - Node: "baz", - Coord: generateRandomCoordinate(), - 
}, - &structs.Coordinate{ - Node: "foo", - Coord: generateRandomCoordinate(), - }, - } - if err := fsm.state.CoordinateBatchUpdate(13, updates); err != nil { - t.Fatalf("err: %s", err) - } - - query := structs.PreparedQuery{ - ID: generateUUID(), - Service: structs.ServiceQuery{ - Service: "web", - }, - RaftIndex: structs.RaftIndex{ - CreateIndex: 14, - ModifyIndex: 14, - }, - } - if err := fsm.state.PreparedQuerySet(14, &query); err != nil { - t.Fatalf("err: %s", err) - } - - autopilotConf := &structs.AutopilotConfig{ - CleanupDeadServers: true, - LastContactThreshold: 100 * time.Millisecond, - MaxTrailingLogs: 222, - } - if err := fsm.state.AutopilotSetConfig(15, autopilotConf); err != nil { - t.Fatalf("err: %s", err) - } - - // Snapshot - snap, err := fsm.Snapshot() - if err != nil { - t.Fatalf("err: %v", err) - } - defer snap.Release() - - // Persist - buf := bytes.NewBuffer(nil) - sink := &MockSink{buf, false} - if err := snap.Persist(sink); err != nil { - t.Fatalf("err: %v", err) - } - - // Try to restore on a new FSM - fsm2, err := New(nil, os.Stderr) - if err != nil { - t.Fatalf("err: %v", err) - } - - // Do a restore - if err := fsm2.Restore(sink); err != nil { - t.Fatalf("err: %v", err) - } - - // Verify the contents - _, nodes, err := fsm2.state.Nodes(nil) - if err != nil { - t.Fatalf("err: %s", err) - } - if len(nodes) != 2 { - t.Fatalf("bad: %v", nodes) - } - if nodes[0].Node != "baz" || - nodes[0].Address != "127.0.0.2" || - len(nodes[0].TaggedAddresses) != 1 || - nodes[0].TaggedAddresses["hello"] != "1.2.3.4" { - t.Fatalf("bad: %v", nodes[0]) - } - if nodes[1].Node != "foo" || - nodes[1].Address != "127.0.0.1" || - len(nodes[1].TaggedAddresses) != 0 { - t.Fatalf("bad: %v", nodes[1]) - } - - _, fooSrv, err := fsm2.state.NodeServices(nil, "foo") - if err != nil { - t.Fatalf("err: %s", err) - } - if len(fooSrv.Services) != 2 { - t.Fatalf("Bad: %v", fooSrv) - } - if !lib.StrContains(fooSrv.Services["db"].Tags, "primary") { - t.Fatalf("Bad: %v", fooSrv) - 
} - if fooSrv.Services["db"].Port != 5000 { - t.Fatalf("Bad: %v", fooSrv) - } - - _, checks, err := fsm2.state.NodeChecks(nil, "foo") - if err != nil { - t.Fatalf("err: %s", err) - } - if len(checks) != 1 { - t.Fatalf("Bad: %v", checks) - } - - // Verify key is set - _, d, err := fsm2.state.KVSGet(nil, "/test") - if err != nil { - t.Fatalf("err: %v", err) - } - if string(d.Value) != "foo" { - t.Fatalf("bad: %v", d) - } - - // Verify session is restored - idx, s, err := fsm2.state.SessionGet(nil, session.ID) - if err != nil { - t.Fatalf("err: %v", err) - } - if s.Node != "foo" { - t.Fatalf("bad: %v", s) - } - if idx <= 1 { - t.Fatalf("bad index: %d", idx) - } - - // Verify ACL is restored - _, a, err := fsm2.state.ACLGet(nil, acl.ID) - if err != nil { - t.Fatalf("err: %v", err) - } - if a.Name != "User Token" { - t.Fatalf("bad: %v", a) - } - if a.ModifyIndex <= 1 { - t.Fatalf("bad index: %d", idx) - } - gotB, err := fsm2.state.ACLGetBootstrap() - if err != nil { - t.Fatalf("err: %v", err) - } - wantB := &structs.ACLBootstrap{ - AllowBootstrap: true, - RaftIndex: structs.RaftIndex{ - CreateIndex: 10, - ModifyIndex: 10, - }, - } - verify.Values(t, "", gotB, wantB) - - // Verify tombstones are restored - func() { - snap := fsm2.state.Snapshot() - defer snap.Close() - stones, err := snap.Tombstones() - if err != nil { - t.Fatalf("err: %s", err) - } - stone := stones.Next().(*state.Tombstone) - if stone == nil { - t.Fatalf("missing tombstone") - } - if stone.Key != "/remove" || stone.Index != 12 { - t.Fatalf("bad: %v", stone) - } - if stones.Next() != nil { - t.Fatalf("unexpected extra tombstones") - } - }() - - // Verify coordinates are restored - _, coords, err := fsm2.state.Coordinates(nil) - if err != nil { - t.Fatalf("err: %s", err) - } - if !reflect.DeepEqual(coords, updates) { - t.Fatalf("bad: %#v", coords) - } - - // Verify queries are restored. 
- _, queries, err := fsm2.state.PreparedQueryList(nil) - if err != nil { - t.Fatalf("err: %s", err) - } - if len(queries) != 1 { - t.Fatalf("bad: %#v", queries) - } - if !reflect.DeepEqual(queries[0], &query) { - t.Fatalf("bad: %#v", queries[0]) - } - - // Verify autopilot config is restored. - _, restoredConf, err := fsm2.state.AutopilotConfig() - if err != nil { - t.Fatalf("err: %s", err) - } - if !reflect.DeepEqual(restoredConf, autopilotConf) { - t.Fatalf("bad: %#v, %#v", restoredConf, autopilotConf) - } - - // Snapshot - snap, err = fsm2.Snapshot() - if err != nil { - t.Fatalf("err: %v", err) - } - defer snap.Release() - - // Persist - buf = bytes.NewBuffer(nil) - sink = &MockSink{buf, false} - if err := snap.Persist(sink); err != nil { - t.Fatalf("err: %v", err) - } - - // Try to restore on the old FSM and make sure it abandons the old state - // store. - abandonCh := fsm.state.AbandonCh() - if err := fsm.Restore(sink); err != nil { - t.Fatalf("err: %v", err) - } - select { - case <-abandonCh: - default: - t.Fatalf("bad") - } -} - -func TestFSM_BadRestore(t *testing.T) { - t.Parallel() - // Create an FSM with some state. - fsm, err := New(nil, os.Stderr) - if err != nil { - t.Fatalf("err: %v", err) - } - fsm.state.EnsureNode(1, &structs.Node{Node: "foo", Address: "127.0.0.1"}) - abandonCh := fsm.state.AbandonCh() - - // Do a bad restore. - buf := bytes.NewBuffer([]byte("bad snapshot")) - sink := &MockSink{buf, false} - if err := fsm.Restore(sink); err == nil { - t.Fatalf("err: %v", err) - } - - // Verify the contents didn't get corrupted. - _, nodes, err := fsm.state.Nodes(nil) - if err != nil { - t.Fatalf("err: %s", err) - } - if len(nodes) != 1 { - t.Fatalf("bad: %v", nodes) - } - if nodes[0].Node != "foo" || - nodes[0].Address != "127.0.0.1" || - len(nodes[0].TaggedAddresses) != 0 { - t.Fatalf("bad: %v", nodes[0]) - } - - // Verify the old state store didn't get abandoned. 
- select { - case <-abandonCh: - t.Fatalf("bad") - default: - } -} - func TestFSM_IgnoreUnknown(t *testing.T) { t.Parallel() fsm, err := New(nil, os.Stderr) diff --git a/agent/consul/fsm/snapshot.go b/agent/consul/fsm/snapshot.go index 3735fea89..3721f0756 100644 --- a/agent/consul/fsm/snapshot.go +++ b/agent/consul/fsm/snapshot.go @@ -1,6 +1,7 @@ package fsm import ( + "fmt" "time" "github.com/armon/go-metrics" @@ -24,272 +25,61 @@ type snapshotHeader struct { LastIndex uint64 } +// persister is a function used to help snapshot the FSM state. +type persister func(s *snapshot, sink raft.SnapshotSink, encoder *codec.Encoder) error + +// persisters is a list of snapshot functions. +var persisters []persister + +// registerPersister adds a new helper. This should be called at package +// init() time. +func registerPersister(fn persister) { + persisters = append(persisters, fn) +} + +// restorer is a function used to load back a snapshot of the FSM state. +type restorer func(header *snapshotHeader, restore *state.Restore, decoder *codec.Decoder) error + +// restorers is a map of restore functions by message type. +var restorers map[structs.MessageType]restorer + +// registerRestorer adds a new helper. This should be called at package +// init() time. +func registerRestorer(msg structs.MessageType, fn restorer) { + if restorers == nil { + restorers = make(map[structs.MessageType]restorer) + } + if restorers[msg] != nil { + panic(fmt.Errorf("Message %d is already registered", msg)) + } + restorers[msg] = fn +} + +// Persist saves the FSM snapshot out to the given sink. 
func (s *snapshot) Persist(sink raft.SnapshotSink) error { defer metrics.MeasureSince([]string{"consul", "fsm", "persist"}, time.Now()) defer metrics.MeasureSince([]string{"fsm", "persist"}, time.Now()) - // Register the nodes - encoder := codec.NewEncoder(sink, msgpackHandle) - // Write the header header := snapshotHeader{ LastIndex: s.state.LastIndex(), } + encoder := codec.NewEncoder(sink, msgpackHandle) if err := encoder.Encode(&header); err != nil { sink.Cancel() return err } - if err := s.persistNodes(sink, encoder); err != nil { - sink.Cancel() - return err - } - - if err := s.persistSessions(sink, encoder); err != nil { - sink.Cancel() - return err - } - - if err := s.persistACLs(sink, encoder); err != nil { - sink.Cancel() - return err - } - - if err := s.persistKVs(sink, encoder); err != nil { - sink.Cancel() - return err - } - - if err := s.persistTombstones(sink, encoder); err != nil { - sink.Cancel() - return err - } - - if err := s.persistPreparedQueries(sink, encoder); err != nil { - sink.Cancel() - return err - } - - if err := s.persistAutopilot(sink, encoder); err != nil { - sink.Cancel() - return err - } - - return nil -} - -func (s *snapshot) persistNodes(sink raft.SnapshotSink, - encoder *codec.Encoder) error { - - // Get all the nodes - nodes, err := s.state.Nodes() - if err != nil { - return err - } - - // Register each node - for node := nodes.Next(); node != nil; node = nodes.Next() { - n := node.(*structs.Node) - req := structs.RegisterRequest{ - Node: n.Node, - Address: n.Address, - TaggedAddresses: n.TaggedAddresses, - } - - // Register the node itself - if _, err := sink.Write([]byte{byte(structs.RegisterRequestType)}); err != nil { - return err - } - if err := encoder.Encode(&req); err != nil { - return err - } - - // Register each service this node has - services, err := s.state.Services(n.Node) - if err != nil { - return err - } - for service := services.Next(); service != nil; service = services.Next() { - if _, err := 
sink.Write([]byte{byte(structs.RegisterRequestType)}); err != nil { - return err - } - req.Service = service.(*structs.ServiceNode).ToNodeService() - if err := encoder.Encode(&req); err != nil { - return err - } - } - - // Register each check this node has - req.Service = nil - checks, err := s.state.Checks(n.Node) - if err != nil { - return err - } - for check := checks.Next(); check != nil; check = checks.Next() { - if _, err := sink.Write([]byte{byte(structs.RegisterRequestType)}); err != nil { - return err - } - req.Check = check.(*structs.HealthCheck) - if err := encoder.Encode(&req); err != nil { - return err - } - } - } - - // Save the coordinates separately since they are not part of the - // register request interface. To avoid copying them out, we turn - // them into batches with a single coordinate each. - coords, err := s.state.Coordinates() - if err != nil { - return err - } - for coord := coords.Next(); coord != nil; coord = coords.Next() { - if _, err := sink.Write([]byte{byte(structs.CoordinateBatchUpdateType)}); err != nil { - return err - } - updates := structs.Coordinates{coord.(*structs.Coordinate)} - if err := encoder.Encode(&updates); err != nil { + // Run all the persisters to write the FSM state. 
+ for _, fn := range persisters { + if err := fn(s, sink, encoder); err != nil { + sink.Cancel() return err } } return nil } -func (s *snapshot) persistSessions(sink raft.SnapshotSink, - encoder *codec.Encoder) error { - sessions, err := s.state.Sessions() - if err != nil { - return err - } - - for session := sessions.Next(); session != nil; session = sessions.Next() { - if _, err := sink.Write([]byte{byte(structs.SessionRequestType)}); err != nil { - return err - } - if err := encoder.Encode(session.(*structs.Session)); err != nil { - return err - } - } - return nil -} - -func (s *snapshot) persistACLs(sink raft.SnapshotSink, - encoder *codec.Encoder) error { - acls, err := s.state.ACLs() - if err != nil { - return err - } - - for acl := acls.Next(); acl != nil; acl = acls.Next() { - if _, err := sink.Write([]byte{byte(structs.ACLRequestType)}); err != nil { - return err - } - if err := encoder.Encode(acl.(*structs.ACL)); err != nil { - return err - } - } - - bs, err := s.state.ACLBootstrap() - if err != nil { - return err - } - if bs != nil { - if _, err := sink.Write([]byte{byte(structs.ACLBootstrapRequestType)}); err != nil { - return err - } - if err := encoder.Encode(bs); err != nil { - return err - } - } - - return nil -} - -func (s *snapshot) persistKVs(sink raft.SnapshotSink, - encoder *codec.Encoder) error { - entries, err := s.state.KVs() - if err != nil { - return err - } - - for entry := entries.Next(); entry != nil; entry = entries.Next() { - if _, err := sink.Write([]byte{byte(structs.KVSRequestType)}); err != nil { - return err - } - if err := encoder.Encode(entry.(*structs.DirEntry)); err != nil { - return err - } - } - return nil -} - -func (s *snapshot) persistTombstones(sink raft.SnapshotSink, - encoder *codec.Encoder) error { - stones, err := s.state.Tombstones() - if err != nil { - return err - } - - for stone := stones.Next(); stone != nil; stone = stones.Next() { - if _, err := sink.Write([]byte{byte(structs.TombstoneRequestType)}); err != 
nil { - return err - } - - // For historical reasons, these are serialized in the snapshots - // as KV entries. We want to keep the snapshot format compatible - // with pre-0.6 versions for now. - s := stone.(*state.Tombstone) - fake := &structs.DirEntry{ - Key: s.Key, - RaftIndex: structs.RaftIndex{ - ModifyIndex: s.Index, - }, - } - if err := encoder.Encode(fake); err != nil { - return err - } - } - return nil -} - -func (s *snapshot) persistPreparedQueries(sink raft.SnapshotSink, - encoder *codec.Encoder) error { - queries, err := s.state.PreparedQueries() - if err != nil { - return err - } - - for _, query := range queries { - if _, err := sink.Write([]byte{byte(structs.PreparedQueryRequestType)}); err != nil { - return err - } - if err := encoder.Encode(query); err != nil { - return err - } - } - return nil -} - -func (s *snapshot) persistAutopilot(sink raft.SnapshotSink, - encoder *codec.Encoder) error { - autopilot, err := s.state.Autopilot() - if err != nil { - return err - } - if autopilot == nil { - return nil - } - - if _, err := sink.Write([]byte{byte(structs.AutopilotRequestType)}); err != nil { - return err - } - if err := encoder.Encode(autopilot); err != nil { - return err - } - - return nil -} - func (s *snapshot) Release() { s.state.Close() } diff --git a/agent/consul/fsm/snapshot_oss.go b/agent/consul/fsm/snapshot_oss.go new file mode 100644 index 000000000..4bf753722 --- /dev/null +++ b/agent/consul/fsm/snapshot_oss.go @@ -0,0 +1,365 @@ +package fsm + +import ( + "github.com/hashicorp/consul/agent/consul/state" + "github.com/hashicorp/consul/agent/structs" + "github.com/hashicorp/go-msgpack/codec" + "github.com/hashicorp/raft" +) + +func init() { + registerPersister(persistOSS) + + registerRestorer(structs.RegisterRequestType, restoreRegistration) + registerRestorer(structs.KVSRequestType, restoreKV) + registerRestorer(structs.TombstoneRequestType, restoreTombstone) + registerRestorer(structs.SessionRequestType, restoreSession) + 
registerRestorer(structs.ACLRequestType, restoreACL) + registerRestorer(structs.ACLBootstrapRequestType, restoreACLBootstrap) + registerRestorer(structs.CoordinateBatchUpdateType, restoreCoordinates) + registerRestorer(structs.PreparedQueryRequestType, restorePreparedQuery) + registerRestorer(structs.AutopilotRequestType, restoreAutopilot) +} + +func persistOSS(s *snapshot, sink raft.SnapshotSink, encoder *codec.Encoder) error { + if err := s.persistNodes(sink, encoder); err != nil { + return err + } + if err := s.persistSessions(sink, encoder); err != nil { + return err + } + if err := s.persistACLs(sink, encoder); err != nil { + return err + } + if err := s.persistKVs(sink, encoder); err != nil { + return err + } + if err := s.persistTombstones(sink, encoder); err != nil { + return err + } + if err := s.persistPreparedQueries(sink, encoder); err != nil { + return err + } + if err := s.persistAutopilot(sink, encoder); err != nil { + return err + } + return nil +} + +func (s *snapshot) persistNodes(sink raft.SnapshotSink, + encoder *codec.Encoder) error { + + // Get all the nodes + nodes, err := s.state.Nodes() + if err != nil { + return err + } + + // Register each node + for node := nodes.Next(); node != nil; node = nodes.Next() { + n := node.(*structs.Node) + req := structs.RegisterRequest{ + Node: n.Node, + Address: n.Address, + TaggedAddresses: n.TaggedAddresses, + } + + // Register the node itself + if _, err := sink.Write([]byte{byte(structs.RegisterRequestType)}); err != nil { + return err + } + if err := encoder.Encode(&req); err != nil { + return err + } + + // Register each service this node has + services, err := s.state.Services(n.Node) + if err != nil { + return err + } + for service := services.Next(); service != nil; service = services.Next() { + if _, err := sink.Write([]byte{byte(structs.RegisterRequestType)}); err != nil { + return err + } + req.Service = service.(*structs.ServiceNode).ToNodeService() + if err := encoder.Encode(&req); err != nil 
{ + return err + } + } + + // Register each check this node has + req.Service = nil + checks, err := s.state.Checks(n.Node) + if err != nil { + return err + } + for check := checks.Next(); check != nil; check = checks.Next() { + if _, err := sink.Write([]byte{byte(structs.RegisterRequestType)}); err != nil { + return err + } + req.Check = check.(*structs.HealthCheck) + if err := encoder.Encode(&req); err != nil { + return err + } + } + } + + // Save the coordinates separately since they are not part of the + // register request interface. To avoid copying them out, we turn + // them into batches with a single coordinate each. + coords, err := s.state.Coordinates() + if err != nil { + return err + } + for coord := coords.Next(); coord != nil; coord = coords.Next() { + if _, err := sink.Write([]byte{byte(structs.CoordinateBatchUpdateType)}); err != nil { + return err + } + updates := structs.Coordinates{coord.(*structs.Coordinate)} + if err := encoder.Encode(&updates); err != nil { + return err + } + } + return nil +} + +func (s *snapshot) persistSessions(sink raft.SnapshotSink, + encoder *codec.Encoder) error { + sessions, err := s.state.Sessions() + if err != nil { + return err + } + + for session := sessions.Next(); session != nil; session = sessions.Next() { + if _, err := sink.Write([]byte{byte(structs.SessionRequestType)}); err != nil { + return err + } + if err := encoder.Encode(session.(*structs.Session)); err != nil { + return err + } + } + return nil +} + +func (s *snapshot) persistACLs(sink raft.SnapshotSink, + encoder *codec.Encoder) error { + acls, err := s.state.ACLs() + if err != nil { + return err + } + + for acl := acls.Next(); acl != nil; acl = acls.Next() { + if _, err := sink.Write([]byte{byte(structs.ACLRequestType)}); err != nil { + return err + } + if err := encoder.Encode(acl.(*structs.ACL)); err != nil { + return err + } + } + + bs, err := s.state.ACLBootstrap() + if err != nil { + return err + } + if bs != nil { + if _, err := 
sink.Write([]byte{byte(structs.ACLBootstrapRequestType)}); err != nil { + return err + } + if err := encoder.Encode(bs); err != nil { + return err + } + } + + return nil +} + +func (s *snapshot) persistKVs(sink raft.SnapshotSink, + encoder *codec.Encoder) error { + entries, err := s.state.KVs() + if err != nil { + return err + } + + for entry := entries.Next(); entry != nil; entry = entries.Next() { + if _, err := sink.Write([]byte{byte(structs.KVSRequestType)}); err != nil { + return err + } + if err := encoder.Encode(entry.(*structs.DirEntry)); err != nil { + return err + } + } + return nil +} + +func (s *snapshot) persistTombstones(sink raft.SnapshotSink, + encoder *codec.Encoder) error { + stones, err := s.state.Tombstones() + if err != nil { + return err + } + + for stone := stones.Next(); stone != nil; stone = stones.Next() { + if _, err := sink.Write([]byte{byte(structs.TombstoneRequestType)}); err != nil { + return err + } + + // For historical reasons, these are serialized in the snapshots + // as KV entries. We want to keep the snapshot format compatible + // with pre-0.6 versions for now. 
+ s := stone.(*state.Tombstone) + fake := &structs.DirEntry{ + Key: s.Key, + RaftIndex: structs.RaftIndex{ + ModifyIndex: s.Index, + }, + } + if err := encoder.Encode(fake); err != nil { + return err + } + } + return nil +} + +func (s *snapshot) persistPreparedQueries(sink raft.SnapshotSink, + encoder *codec.Encoder) error { + queries, err := s.state.PreparedQueries() + if err != nil { + return err + } + + for _, query := range queries { + if _, err := sink.Write([]byte{byte(structs.PreparedQueryRequestType)}); err != nil { + return err + } + if err := encoder.Encode(query); err != nil { + return err + } + } + return nil +} + +func (s *snapshot) persistAutopilot(sink raft.SnapshotSink, + encoder *codec.Encoder) error { + autopilot, err := s.state.Autopilot() + if err != nil { + return err + } + if autopilot == nil { + return nil + } + + if _, err := sink.Write([]byte{byte(structs.AutopilotRequestType)}); err != nil { + return err + } + if err := encoder.Encode(autopilot); err != nil { + return err + } + return nil +} + +func restoreRegistration(header *snapshotHeader, restore *state.Restore, decoder *codec.Decoder) error { + var req structs.RegisterRequest + if err := decoder.Decode(&req); err != nil { + return err + } + if err := restore.Registration(header.LastIndex, &req); err != nil { + return err + } + return nil +} + +func restoreKV(header *snapshotHeader, restore *state.Restore, decoder *codec.Decoder) error { + var req structs.DirEntry + if err := decoder.Decode(&req); err != nil { + return err + } + if err := restore.KVS(&req); err != nil { + return err + } + return nil +} + +func restoreTombstone(header *snapshotHeader, restore *state.Restore, decoder *codec.Decoder) error { + var req structs.DirEntry + if err := decoder.Decode(&req); err != nil { + return err + } + + // For historical reasons, these are serialized in the + // snapshots as KV entries. We want to keep the snapshot + // format compatible with pre-0.6 versions for now. 
+ stone := &state.Tombstone{ + Key: req.Key, + Index: req.ModifyIndex, + } + if err := restore.Tombstone(stone); err != nil { + return err + } + return nil +} + +func restoreSession(header *snapshotHeader, restore *state.Restore, decoder *codec.Decoder) error { + var req structs.Session + if err := decoder.Decode(&req); err != nil { + return err + } + if err := restore.Session(&req); err != nil { + return err + } + return nil +} + +func restoreACL(header *snapshotHeader, restore *state.Restore, decoder *codec.Decoder) error { + var req structs.ACL + if err := decoder.Decode(&req); err != nil { + return err + } + if err := restore.ACL(&req); err != nil { + return err + } + return nil +} + +func restoreACLBootstrap(header *snapshotHeader, restore *state.Restore, decoder *codec.Decoder) error { + var req structs.ACLBootstrap + if err := decoder.Decode(&req); err != nil { + return err + } + if err := restore.ACLBootstrap(&req); err != nil { + return err + } + return nil +} + +func restoreCoordinates(header *snapshotHeader, restore *state.Restore, decoder *codec.Decoder) error { + var req structs.Coordinates + if err := decoder.Decode(&req); err != nil { + return err + } + if err := restore.Coordinates(header.LastIndex, req); err != nil { + return err + } + return nil +} + +func restorePreparedQuery(header *snapshotHeader, restore *state.Restore, decoder *codec.Decoder) error { + var req structs.PreparedQuery + if err := decoder.Decode(&req); err != nil { + return err + } + if err := restore.PreparedQuery(&req); err != nil { + return err + } + return nil +} + +func restoreAutopilot(header *snapshotHeader, restore *state.Restore, decoder *codec.Decoder) error { + var req structs.AutopilotConfig + if err := decoder.Decode(&req); err != nil { + return err + } + if err := restore.Autopilot(&req); err != nil { + return err + } + return nil +} diff --git a/agent/consul/fsm/snapshot_oss_test.go b/agent/consul/fsm/snapshot_oss_test.go new file mode 100644 index 
000000000..ff2419f69 --- /dev/null +++ b/agent/consul/fsm/snapshot_oss_test.go @@ -0,0 +1,326 @@ +package fsm + +import ( + "bytes" + "os" + "reflect" + "testing" + "time" + + "github.com/hashicorp/consul/agent/consul/state" + "github.com/hashicorp/consul/agent/structs" + "github.com/hashicorp/consul/api" + "github.com/hashicorp/consul/lib" + "github.com/pascaldekloe/goe/verify" +) + +func TestFSM_SnapshotRestore_OSS(t *testing.T) { + t.Parallel() + fsm, err := New(nil, os.Stderr) + if err != nil { + t.Fatalf("err: %v", err) + } + + // Add some state + fsm.state.EnsureNode(1, &structs.Node{Node: "foo", Address: "127.0.0.1"}) + fsm.state.EnsureNode(2, &structs.Node{Node: "baz", Address: "127.0.0.2", TaggedAddresses: map[string]string{"hello": "1.2.3.4"}}) + fsm.state.EnsureService(3, "foo", &structs.NodeService{ID: "web", Service: "web", Tags: nil, Address: "127.0.0.1", Port: 80}) + fsm.state.EnsureService(4, "foo", &structs.NodeService{ID: "db", Service: "db", Tags: []string{"primary"}, Address: "127.0.0.1", Port: 5000}) + fsm.state.EnsureService(5, "baz", &structs.NodeService{ID: "web", Service: "web", Tags: nil, Address: "127.0.0.2", Port: 80}) + fsm.state.EnsureService(6, "baz", &structs.NodeService{ID: "db", Service: "db", Tags: []string{"secondary"}, Address: "127.0.0.2", Port: 5000}) + fsm.state.EnsureCheck(7, &structs.HealthCheck{ + Node: "foo", + CheckID: "web", + Name: "web connectivity", + Status: api.HealthPassing, + ServiceID: "web", + }) + fsm.state.KVSSet(8, &structs.DirEntry{ + Key: "/test", + Value: []byte("foo"), + }) + session := &structs.Session{ID: generateUUID(), Node: "foo"} + fsm.state.SessionCreate(9, session) + acl := &structs.ACL{ID: generateUUID(), Name: "User Token"} + fsm.state.ACLSet(10, acl) + if _, err := fsm.state.ACLBootstrapInit(10); err != nil { + t.Fatalf("err: %v", err) + } + + fsm.state.KVSSet(11, &structs.DirEntry{ + Key: "/remove", + Value: []byte("foo"), + }) + fsm.state.KVSDelete(12, "/remove") + idx, _, err := 
fsm.state.KVSList(nil, "/remove") + if err != nil { + t.Fatalf("err: %s", err) + } + if idx != 12 { + t.Fatalf("bad index: %d", idx) + } + + updates := structs.Coordinates{ + &structs.Coordinate{ + Node: "baz", + Coord: generateRandomCoordinate(), + }, + &structs.Coordinate{ + Node: "foo", + Coord: generateRandomCoordinate(), + }, + } + if err := fsm.state.CoordinateBatchUpdate(13, updates); err != nil { + t.Fatalf("err: %s", err) + } + + query := structs.PreparedQuery{ + ID: generateUUID(), + Service: structs.ServiceQuery{ + Service: "web", + }, + RaftIndex: structs.RaftIndex{ + CreateIndex: 14, + ModifyIndex: 14, + }, + } + if err := fsm.state.PreparedQuerySet(14, &query); err != nil { + t.Fatalf("err: %s", err) + } + + autopilotConf := &structs.AutopilotConfig{ + CleanupDeadServers: true, + LastContactThreshold: 100 * time.Millisecond, + MaxTrailingLogs: 222, + } + if err := fsm.state.AutopilotSetConfig(15, autopilotConf); err != nil { + t.Fatalf("err: %s", err) + } + + // Snapshot + snap, err := fsm.Snapshot() + if err != nil { + t.Fatalf("err: %v", err) + } + defer snap.Release() + + // Persist + buf := bytes.NewBuffer(nil) + sink := &MockSink{buf, false} + if err := snap.Persist(sink); err != nil { + t.Fatalf("err: %v", err) + } + + // Try to restore on a new FSM + fsm2, err := New(nil, os.Stderr) + if err != nil { + t.Fatalf("err: %v", err) + } + + // Do a restore + if err := fsm2.Restore(sink); err != nil { + t.Fatalf("err: %v", err) + } + + // Verify the contents + _, nodes, err := fsm2.state.Nodes(nil) + if err != nil { + t.Fatalf("err: %s", err) + } + if len(nodes) != 2 { + t.Fatalf("bad: %v", nodes) + } + if nodes[0].Node != "baz" || + nodes[0].Address != "127.0.0.2" || + len(nodes[0].TaggedAddresses) != 1 || + nodes[0].TaggedAddresses["hello"] != "1.2.3.4" { + t.Fatalf("bad: %v", nodes[0]) + } + if nodes[1].Node != "foo" || + nodes[1].Address != "127.0.0.1" || + len(nodes[1].TaggedAddresses) != 0 { + t.Fatalf("bad: %v", nodes[1]) + } + + _, fooSrv, err 
:= fsm2.state.NodeServices(nil, "foo") + if err != nil { + t.Fatalf("err: %s", err) + } + if len(fooSrv.Services) != 2 { + t.Fatalf("Bad: %v", fooSrv) + } + if !lib.StrContains(fooSrv.Services["db"].Tags, "primary") { + t.Fatalf("Bad: %v", fooSrv) + } + if fooSrv.Services["db"].Port != 5000 { + t.Fatalf("Bad: %v", fooSrv) + } + + _, checks, err := fsm2.state.NodeChecks(nil, "foo") + if err != nil { + t.Fatalf("err: %s", err) + } + if len(checks) != 1 { + t.Fatalf("Bad: %v", checks) + } + + // Verify key is set + _, d, err := fsm2.state.KVSGet(nil, "/test") + if err != nil { + t.Fatalf("err: %v", err) + } + if string(d.Value) != "foo" { + t.Fatalf("bad: %v", d) + } + + // Verify session is restored + idx, s, err := fsm2.state.SessionGet(nil, session.ID) + if err != nil { + t.Fatalf("err: %v", err) + } + if s.Node != "foo" { + t.Fatalf("bad: %v", s) + } + if idx <= 1 { + t.Fatalf("bad index: %d", idx) + } + + // Verify ACL is restored + _, a, err := fsm2.state.ACLGet(nil, acl.ID) + if err != nil { + t.Fatalf("err: %v", err) + } + if a.Name != "User Token" { + t.Fatalf("bad: %v", a) + } + if a.ModifyIndex <= 1 { + t.Fatalf("bad index: %d", idx) + } + gotB, err := fsm2.state.ACLGetBootstrap() + if err != nil { + t.Fatalf("err: %v", err) + } + wantB := &structs.ACLBootstrap{ + AllowBootstrap: true, + RaftIndex: structs.RaftIndex{ + CreateIndex: 10, + ModifyIndex: 10, + }, + } + verify.Values(t, "", gotB, wantB) + + // Verify tombstones are restored + func() { + snap := fsm2.state.Snapshot() + defer snap.Close() + stones, err := snap.Tombstones() + if err != nil { + t.Fatalf("err: %s", err) + } + stone := stones.Next().(*state.Tombstone) + if stone == nil { + t.Fatalf("missing tombstone") + } + if stone.Key != "/remove" || stone.Index != 12 { + t.Fatalf("bad: %v", stone) + } + if stones.Next() != nil { + t.Fatalf("unexpected extra tombstones") + } + }() + + // Verify coordinates are restored + _, coords, err := fsm2.state.Coordinates(nil) + if err != nil { + 
t.Fatalf("err: %s", err) + } + if !reflect.DeepEqual(coords, updates) { + t.Fatalf("bad: %#v", coords) + } + + // Verify queries are restored. + _, queries, err := fsm2.state.PreparedQueryList(nil) + if err != nil { + t.Fatalf("err: %s", err) + } + if len(queries) != 1 { + t.Fatalf("bad: %#v", queries) + } + if !reflect.DeepEqual(queries[0], &query) { + t.Fatalf("bad: %#v", queries[0]) + } + + // Verify autopilot config is restored. + _, restoredConf, err := fsm2.state.AutopilotConfig() + if err != nil { + t.Fatalf("err: %s", err) + } + if !reflect.DeepEqual(restoredConf, autopilotConf) { + t.Fatalf("bad: %#v, %#v", restoredConf, autopilotConf) + } + + // Snapshot + snap, err = fsm2.Snapshot() + if err != nil { + t.Fatalf("err: %v", err) + } + defer snap.Release() + + // Persist + buf = bytes.NewBuffer(nil) + sink = &MockSink{buf, false} + if err := snap.Persist(sink); err != nil { + t.Fatalf("err: %v", err) + } + + // Try to restore on the old FSM and make sure it abandons the old state + // store. + abandonCh := fsm.state.AbandonCh() + if err := fsm.Restore(sink); err != nil { + t.Fatalf("err: %v", err) + } + select { + case <-abandonCh: + default: + t.Fatalf("bad") + } +}
+
+// TestFSM_BadRestore_OSS checks that a failed Restore leaves the FSM's current
+// state store in place: the node registered before the restore attempt is
+// still readable and the store's abandon channel has not fired.
+func TestFSM_BadRestore_OSS(t *testing.T) {
+	t.Parallel()
+
+	// Create an FSM with some state.
+	fsm, err := New(nil, os.Stderr)
+	if err != nil {
+		t.Fatalf("err: %v", err)
+	}
+	// Fail fast if the fixture cannot be written; otherwise the problem would
+	// only show up further down as an unexplained empty node list.
+	node := &structs.Node{Node: "foo", Address: "127.0.0.1"}
+	if err := fsm.state.EnsureNode(1, node); err != nil {
+		t.Fatalf("err: %v", err)
+	}
+	// Grab the channel before restoring so we observe the original store.
+	abandonCh := fsm.state.AbandonCh()
+
+	// Do a bad restore. The payload is not a valid snapshot, so Restore is
+	// expected to return an error.
+	buf := bytes.NewBuffer([]byte("bad snapshot"))
+	sink := &MockSink{buf, false}
+	if err := fsm.Restore(sink); err == nil {
+		t.Fatal("expected an error when restoring a bad snapshot")
+	}
+
+	// Verify the contents didn't get corrupted.
+	_, nodes, err := fsm.state.Nodes(nil)
+	if err != nil {
+		t.Fatalf("err: %s", err)
+	}
+	if len(nodes) != 1 {
+		t.Fatalf("bad: %v", nodes)
+	}
+	if nodes[0].Node != "foo" ||
+		nodes[0].Address != "127.0.0.1" ||
+		len(nodes[0].TaggedAddresses) != 0 {
+		t.Fatalf("bad: %v", nodes[0])
+	}
+
+	// Verify the old state store didn't get abandoned.
+	select {
+	case <-abandonCh:
+		t.Fatalf("bad")
+	default:
+	}
+}