From f07340e94f4b686d682e9b6519b4c04216323ae4 Mon Sep 17 00:00:00 2001 From: Mitchell Hashimoto Date: Tue, 6 Mar 2018 09:31:21 -0800 Subject: [PATCH] agent/consul/fsm,state: snapshot/restore for intentions --- agent/consul/fsm/snapshot_oss.go | 33 ++++++++ agent/consul/fsm/snapshot_oss_test.go | 23 ++++++ agent/consul/state/intention.go | 28 +++++++ agent/consul/state/intention_test.go | 105 ++++++++++++++++++++++++++ 4 files changed, 189 insertions(+) diff --git a/agent/consul/fsm/snapshot_oss.go b/agent/consul/fsm/snapshot_oss.go index be7bfc5a8..1dde3ab0b 100644 --- a/agent/consul/fsm/snapshot_oss.go +++ b/agent/consul/fsm/snapshot_oss.go @@ -20,6 +20,7 @@ func init() { registerRestorer(structs.CoordinateBatchUpdateType, restoreCoordinates) registerRestorer(structs.PreparedQueryRequestType, restorePreparedQuery) registerRestorer(structs.AutopilotRequestType, restoreAutopilot) + registerRestorer(structs.IntentionRequestType, restoreIntention) } func persistOSS(s *snapshot, sink raft.SnapshotSink, encoder *codec.Encoder) error { @@ -44,6 +45,9 @@ func persistOSS(s *snapshot, sink raft.SnapshotSink, encoder *codec.Encoder) err if err := s.persistAutopilot(sink, encoder); err != nil { return err } + if err := s.persistIntentions(sink, encoder); err != nil { + return err + } return nil } @@ -258,6 +262,24 @@ func (s *snapshot) persistAutopilot(sink raft.SnapshotSink, return nil } +func (s *snapshot) persistIntentions(sink raft.SnapshotSink, + encoder *codec.Encoder) error { + ixns, err := s.state.Intentions() + if err != nil { + return err + } + + for _, ixn := range ixns { + if _, err := sink.Write([]byte{byte(structs.IntentionRequestType)}); err != nil { + return err + } + if err := encoder.Encode(ixn); err != nil { + return err + } + } + return nil +} + func restoreRegistration(header *snapshotHeader, restore *state.Restore, decoder *codec.Decoder) error { var req structs.RegisterRequest if err := decoder.Decode(&req); err != nil { @@ -364,3 +386,14 @@ func restoreAutopilot(header *snapshotHeader, restore *state.Restore, decoder *c } return nil } + +func restoreIntention(header *snapshotHeader, restore *state.Restore, decoder *codec.Decoder) error { + var req structs.Intention + if err := decoder.Decode(&req); err != nil { + return err + } + if err := restore.Intention(&req); err != nil { + return err + } + return nil +} diff --git a/agent/consul/fsm/snapshot_oss_test.go b/agent/consul/fsm/snapshot_oss_test.go index 8b8544420..759f825b1 100644 --- a/agent/consul/fsm/snapshot_oss_test.go +++ b/agent/consul/fsm/snapshot_oss_test.go @@ -98,6 +98,17 @@ func TestFSM_SnapshotRestore_OSS(t *testing.T) { t.Fatalf("err: %s", err) } + // Intentions + ixn := structs.TestIntention(t) + ixn.ID = generateUUID() + ixn.RaftIndex = structs.RaftIndex{ + CreateIndex: 14, + ModifyIndex: 14, + } + if err := fsm.state.IntentionSet(14, ixn); err != nil { + t.Fatalf("err: %s", err) + } + // Snapshot snap, err := fsm.Snapshot() if err != nil { @@ -260,6 +271,18 @@ func TestFSM_SnapshotRestore_OSS(t *testing.T) { t.Fatalf("bad: %#v, %#v", restoredConf, autopilotConf) } + // Verify intentions are restored. + _, ixns, err := fsm2.state.Intentions(nil) + if err != nil { + t.Fatalf("err: %s", err) + } + if len(ixns) != 1 { + t.Fatalf("bad: %#v", ixns) + } + if !reflect.DeepEqual(ixns[0], ixn) { + t.Fatalf("bad: %#v", ixns[0]) + } + // Snapshot snap, err = fsm2.Snapshot() if err != nil { diff --git a/agent/consul/state/intention.go b/agent/consul/state/intention.go index 3e83af4d1..bc8bb0213 100644 --- a/agent/consul/state/intention.go +++ b/agent/consul/state/intention.go @@ -68,6 +68,34 @@ func init() { registerSchema(intentionsTableSchema) } +// Intentions is used to pull all the intentions from the snapshot. +func (s *Snapshot) Intentions() (structs.Intentions, error) { + ixns, err := s.tx.Get(intentionsTableName, "id") + if err != nil { + return nil, err + } + + var ret structs.Intentions + for wrapped := ixns.Next(); wrapped != nil; wrapped = ixns.Next() { + ret = append(ret, wrapped.(*structs.Intention)) + } + + return ret, nil +} + +// Intention is used when restoring from a snapshot. +func (s *Restore) Intention(ixn *structs.Intention) error { + // Insert the intention + if err := s.tx.Insert(intentionsTableName, ixn); err != nil { + return fmt.Errorf("failed restoring intention: %s", err) + } + if err := indexUpdateMaxTxn(s.tx, ixn.ModifyIndex, intentionsTableName); err != nil { + return fmt.Errorf("failed updating index: %s", err) + } + + return nil +} + // Intentions returns the list of all intentions. func (s *Store) Intentions(ws memdb.WatchSet) (uint64, structs.Intentions, error) { tx := s.db.Txn(false) diff --git a/agent/consul/state/intention_test.go b/agent/consul/state/intention_test.go index 7f5376509..eb56ff04b 100644 --- a/agent/consul/state/intention_test.go +++ b/agent/consul/state/intention_test.go @@ -455,3 +455,108 @@ func TestStore_IntentionMatch_table(t *testing.T) { }) } } + +func TestStore_Intention_Snapshot_Restore(t *testing.T) { + s := testStateStore(t) + + // Create some intentions. + ixns := structs.Intentions{ + &structs.Intention{ + DestinationName: "foo", + }, + &structs.Intention{ + DestinationName: "bar", + }, + &structs.Intention{ + DestinationName: "baz", + }, + } + + // Force the sort order of the UUIDs before we create them so the + // order is deterministic. + id := testUUID() + ixns[0].ID = "a" + id[1:] + ixns[1].ID = "b" + id[1:] + ixns[2].ID = "c" + id[1:] + + // Now create + for i, ixn := range ixns { + if err := s.IntentionSet(uint64(4+i), ixn); err != nil { + t.Fatalf("err: %s", err) + } + } + + // Snapshot the queries. + snap := s.Snapshot() + defer snap.Close() + + // Alter the real state store. + if err := s.IntentionDelete(7, ixns[0].ID); err != nil { + t.Fatalf("err: %s", err) + } + + // Verify the snapshot. + if idx := snap.LastIndex(); idx != 6 { + t.Fatalf("bad index: %d", idx) + } + expected := structs.Intentions{ + &structs.Intention{ + ID: ixns[0].ID, + DestinationName: "foo", + Meta: map[string]string{}, + RaftIndex: structs.RaftIndex{ + CreateIndex: 4, + ModifyIndex: 4, + }, + }, + &structs.Intention{ + ID: ixns[1].ID, + DestinationName: "bar", + Meta: map[string]string{}, + RaftIndex: structs.RaftIndex{ + CreateIndex: 5, + ModifyIndex: 5, + }, + }, + &structs.Intention{ + ID: ixns[2].ID, + DestinationName: "baz", + Meta: map[string]string{}, + RaftIndex: structs.RaftIndex{ + CreateIndex: 6, + ModifyIndex: 6, + }, + }, + } + dump, err := snap.Intentions() + if err != nil { + t.Fatalf("err: %s", err) + } + if !reflect.DeepEqual(dump, expected) { + t.Fatalf("bad: %#v", dump[0]) + } + + // Restore the values into a new state store. + func() { + s := testStateStore(t) + restore := s.Restore() + for _, ixn := range dump { + if err := restore.Intention(ixn); err != nil { + t.Fatalf("err: %s", err) + } + } + restore.Commit() + + // Read the restored values back out and verify that they match. + idx, actual, err := s.Intentions(nil) + if err != nil { + t.Fatalf("err: %s", err) + } + if idx != 6 { + t.Fatalf("bad index: %d", idx) + } + if !reflect.DeepEqual(actual, expected) { + t.Fatalf("bad: %v", actual) + } + }() +}