diff --git a/agent/consul/fsm/fsm.go b/agent/consul/fsm/fsm.go index ea71abb60..bc72744df 100644 --- a/agent/consul/fsm/fsm.go +++ b/agent/consul/fsm/fsm.go @@ -7,7 +7,6 @@ import ( "sync" "time" - "github.com/armon/go-metrics" "github.com/hashicorp/consul/agent/consul/state" "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/go-msgpack/codec" @@ -61,20 +60,6 @@ type FSM struct { gc *state.TombstoneGC } -// consulSnapshot is used to provide a snapshot of the current -// state in a way that can be accessed concurrently with operations -// that may modify the live state. -type consulSnapshot struct { - state *state.Snapshot -} - -// snapshotHeader is the first entry in our snapshot -type snapshotHeader struct { - // LastIndex is the last index that affects the data. - // This is used when we do the restore for watchers. - LastIndex uint64 -} - // New is used to construct a new FSM with a blank state. func New(gc *state.TombstoneGC, logOutput io.Writer) (*FSM, error) { stateNew, err := state.NewStateStore(gc) @@ -140,7 +125,7 @@ func (c *FSM) Snapshot() (raft.FSMSnapshot, error) { c.logger.Printf("[INFO] consul.fsm: snapshot created in %v", time.Since(start)) }(time.Now()) - return &consulSnapshot{c.state.Snapshot()}, nil + return &snapshot{c.state.Snapshot()}, nil } // Restore streams in the snapshot and replaces the current state store with a @@ -290,273 +275,3 @@ func (c *FSM) Restore(old io.ReadCloser) error { stateOld.Abandon() return nil } - -func (s *consulSnapshot) Persist(sink raft.SnapshotSink) error { - defer metrics.MeasureSince([]string{"consul", "fsm", "persist"}, time.Now()) - defer metrics.MeasureSince([]string{"fsm", "persist"}, time.Now()) - - // Register the nodes - encoder := codec.NewEncoder(sink, msgpackHandle) - - // Write the header - header := snapshotHeader{ - LastIndex: s.state.LastIndex(), - } - if err := encoder.Encode(&header); err != nil { - sink.Cancel() - return err - } - - if err := s.persistNodes(sink, encoder); err != nil { - sink.Cancel() - return err - } - - if err := s.persistSessions(sink, encoder); err != nil { - sink.Cancel() - return err - } - - if err := s.persistACLs(sink, encoder); err != nil { - sink.Cancel() - return err - } - - if err := s.persistKVs(sink, encoder); err != nil { - sink.Cancel() - return err - } - - if err := s.persistTombstones(sink, encoder); err != nil { - sink.Cancel() - return err - } - - if err := s.persistPreparedQueries(sink, encoder); err != nil { - sink.Cancel() - return err - } - - if err := s.persistAutopilot(sink, encoder); err != nil { - sink.Cancel() - return err - } - - return nil -} - -func (s *consulSnapshot) persistNodes(sink raft.SnapshotSink, - encoder *codec.Encoder) error { - - // Get all the nodes - nodes, err := s.state.Nodes() - if err != nil { - return err - } - - // Register each node - for node := nodes.Next(); node != nil; node = nodes.Next() { - n := node.(*structs.Node) - req := structs.RegisterRequest{ - Node: n.Node, - Address: n.Address, - TaggedAddresses: n.TaggedAddresses, - } - - // Register the node itself - if _, err := sink.Write([]byte{byte(structs.RegisterRequestType)}); err != nil { - return err - } - if err := encoder.Encode(&req); err != nil { - return err - } - - // Register each service this node has - services, err := s.state.Services(n.Node) - if err != nil { - return err - } - for service := services.Next(); service != nil; service = services.Next() { - if _, err := sink.Write([]byte{byte(structs.RegisterRequestType)}); err != nil { - return err - } - req.Service = service.(*structs.ServiceNode).ToNodeService() - if err := encoder.Encode(&req); err != nil { - return err - } - } - - // Register each check this node has - req.Service = nil - checks, err := s.state.Checks(n.Node) - if err != nil { - return err - } - for check := checks.Next(); check != nil; check = checks.Next() { - if _, err := sink.Write([]byte{byte(structs.RegisterRequestType)}); err != nil { - return err - } - req.Check = check.(*structs.HealthCheck) - if err := encoder.Encode(&req); err != nil { - return err - } - } - } - - // Save the coordinates separately since they are not part of the - // register request interface. To avoid copying them out, we turn - // them into batches with a single coordinate each. - coords, err := s.state.Coordinates() - if err != nil { - return err - } - for coord := coords.Next(); coord != nil; coord = coords.Next() { - if _, err := sink.Write([]byte{byte(structs.CoordinateBatchUpdateType)}); err != nil { - return err - } - updates := structs.Coordinates{coord.(*structs.Coordinate)} - if err := encoder.Encode(&updates); err != nil { - return err - } - } - return nil -} - -func (s *consulSnapshot) persistSessions(sink raft.SnapshotSink, - encoder *codec.Encoder) error { - sessions, err := s.state.Sessions() - if err != nil { - return err - } - - for session := sessions.Next(); session != nil; session = sessions.Next() { - if _, err := sink.Write([]byte{byte(structs.SessionRequestType)}); err != nil { - return err - } - if err := encoder.Encode(session.(*structs.Session)); err != nil { - return err - } - } - return nil -} - -func (s *consulSnapshot) persistACLs(sink raft.SnapshotSink, - encoder *codec.Encoder) error { - acls, err := s.state.ACLs() - if err != nil { - return err - } - - for acl := acls.Next(); acl != nil; acl = acls.Next() { - if _, err := sink.Write([]byte{byte(structs.ACLRequestType)}); err != nil { - return err - } - if err := encoder.Encode(acl.(*structs.ACL)); err != nil { - return err - } - } - - bs, err := s.state.ACLBootstrap() - if err != nil { - return err - } - if bs != nil { - if _, err := sink.Write([]byte{byte(structs.ACLBootstrapRequestType)}); err != nil { - return err - } - if err := encoder.Encode(bs); err != nil { - return err - } - } - - return nil -} - -func (s *consulSnapshot) persistKVs(sink raft.SnapshotSink, - encoder *codec.Encoder) error { - entries, err := s.state.KVs() - if err != nil { - return err - } - - for entry := entries.Next(); entry != nil; entry = entries.Next() { - if _, err := sink.Write([]byte{byte(structs.KVSRequestType)}); err != nil { - return err - } - if err := encoder.Encode(entry.(*structs.DirEntry)); err != nil { - return err - } - } - return nil -} - -func (s *consulSnapshot) persistTombstones(sink raft.SnapshotSink, - encoder *codec.Encoder) error { - stones, err := s.state.Tombstones() - if err != nil { - return err - } - - for stone := stones.Next(); stone != nil; stone = stones.Next() { - if _, err := sink.Write([]byte{byte(structs.TombstoneRequestType)}); err != nil { - return err - } - - // For historical reasons, these are serialized in the snapshots - // as KV entries. We want to keep the snapshot format compatible - // with pre-0.6 versions for now. - s := stone.(*state.Tombstone) - fake := &structs.DirEntry{ - Key: s.Key, - RaftIndex: structs.RaftIndex{ - ModifyIndex: s.Index, - }, - } - if err := encoder.Encode(fake); err != nil { - return err - } - } - return nil -} - -func (s *consulSnapshot) persistPreparedQueries(sink raft.SnapshotSink, - encoder *codec.Encoder) error { - queries, err := s.state.PreparedQueries() - if err != nil { - return err - } - - for _, query := range queries { - if _, err := sink.Write([]byte{byte(structs.PreparedQueryRequestType)}); err != nil { - return err - } - if err := encoder.Encode(query); err != nil { - return err - } - } - return nil -} - -func (s *consulSnapshot) persistAutopilot(sink raft.SnapshotSink, - encoder *codec.Encoder) error { - autopilot, err := s.state.Autopilot() - if err != nil { - return err - } - if autopilot == nil { - return nil - } - - if _, err := sink.Write([]byte{byte(structs.AutopilotRequestType)}); err != nil { - return err - } - if err := encoder.Encode(autopilot); err != nil { - return err - } - - return nil -} - -func (s *consulSnapshot) Release() { - s.state.Close() -} diff --git a/agent/consul/fsm/snapshot.go b/agent/consul/fsm/snapshot.go new file mode 100644 index 000000000..3735fea89 --- /dev/null +++ b/agent/consul/fsm/snapshot.go @@ -0,0 +1,295 @@ +package fsm + +import ( + "time" + + "github.com/armon/go-metrics" + "github.com/hashicorp/consul/agent/consul/state" + "github.com/hashicorp/consul/agent/structs" + "github.com/hashicorp/go-msgpack/codec" + "github.com/hashicorp/raft" +) + +// snapshot is used to provide a snapshot of the current +// state in a way that can be accessed concurrently with operations +// that may modify the live state. +type snapshot struct { + state *state.Snapshot +} + +// snapshotHeader is the first entry in our snapshot +type snapshotHeader struct { + // LastIndex is the last index that affects the data. + // This is used when we do the restore for watchers. + LastIndex uint64 +} + +func (s *snapshot) Persist(sink raft.SnapshotSink) error { + defer metrics.MeasureSince([]string{"consul", "fsm", "persist"}, time.Now()) + defer metrics.MeasureSince([]string{"fsm", "persist"}, time.Now()) + + // Register the nodes + encoder := codec.NewEncoder(sink, msgpackHandle) + + // Write the header + header := snapshotHeader{ + LastIndex: s.state.LastIndex(), + } + if err := encoder.Encode(&header); err != nil { + sink.Cancel() + return err + } + + if err := s.persistNodes(sink, encoder); err != nil { + sink.Cancel() + return err + } + + if err := s.persistSessions(sink, encoder); err != nil { + sink.Cancel() + return err + } + + if err := s.persistACLs(sink, encoder); err != nil { + sink.Cancel() + return err + } + + if err := s.persistKVs(sink, encoder); err != nil { + sink.Cancel() + return err + } + + if err := s.persistTombstones(sink, encoder); err != nil { + sink.Cancel() + return err + } + + if err := s.persistPreparedQueries(sink, encoder); err != nil { + sink.Cancel() + return err + } + + if err := s.persistAutopilot(sink, encoder); err != nil { + sink.Cancel() + return err + } + + return nil +} + +func (s *snapshot) persistNodes(sink raft.SnapshotSink, + encoder *codec.Encoder) error { + + // Get all the nodes + nodes, err := s.state.Nodes() + if err != nil { + return err + } + + // Register each node + for node := nodes.Next(); node != nil; node = nodes.Next() { + n := node.(*structs.Node) + req := structs.RegisterRequest{ + Node: n.Node, + Address: n.Address, + TaggedAddresses: n.TaggedAddresses, + } + + // Register the node itself + if _, err := sink.Write([]byte{byte(structs.RegisterRequestType)}); err != nil { + return err + } + if err := encoder.Encode(&req); err != nil { + return err + } + + // Register each service this node has + services, err := s.state.Services(n.Node) + if err != nil { + return err + } + for service := services.Next(); service != nil; service = services.Next() { + if _, err := sink.Write([]byte{byte(structs.RegisterRequestType)}); err != nil { + return err + } + req.Service = service.(*structs.ServiceNode).ToNodeService() + if err := encoder.Encode(&req); err != nil { + return err + } + } + + // Register each check this node has + req.Service = nil + checks, err := s.state.Checks(n.Node) + if err != nil { + return err + } + for check := checks.Next(); check != nil; check = checks.Next() { + if _, err := sink.Write([]byte{byte(structs.RegisterRequestType)}); err != nil { + return err + } + req.Check = check.(*structs.HealthCheck) + if err := encoder.Encode(&req); err != nil { + return err + } + } + } + + // Save the coordinates separately since they are not part of the + // register request interface. To avoid copying them out, we turn + // them into batches with a single coordinate each. + coords, err := s.state.Coordinates() + if err != nil { + return err + } + for coord := coords.Next(); coord != nil; coord = coords.Next() { + if _, err := sink.Write([]byte{byte(structs.CoordinateBatchUpdateType)}); err != nil { + return err + } + updates := structs.Coordinates{coord.(*structs.Coordinate)} + if err := encoder.Encode(&updates); err != nil { + return err + } + } + return nil +} + +func (s *snapshot) persistSessions(sink raft.SnapshotSink, + encoder *codec.Encoder) error { + sessions, err := s.state.Sessions() + if err != nil { + return err + } + + for session := sessions.Next(); session != nil; session = sessions.Next() { + if _, err := sink.Write([]byte{byte(structs.SessionRequestType)}); err != nil { + return err + } + if err := encoder.Encode(session.(*structs.Session)); err != nil { + return err + } + } + return nil +} + +func (s *snapshot) persistACLs(sink raft.SnapshotSink, + encoder *codec.Encoder) error { + acls, err := s.state.ACLs() + if err != nil { + return err + } + + for acl := acls.Next(); acl != nil; acl = acls.Next() { + if _, err := sink.Write([]byte{byte(structs.ACLRequestType)}); err != nil { + return err + } + if err := encoder.Encode(acl.(*structs.ACL)); err != nil { + return err + } + } + + bs, err := s.state.ACLBootstrap() + if err != nil { + return err + } + if bs != nil { + if _, err := sink.Write([]byte{byte(structs.ACLBootstrapRequestType)}); err != nil { + return err + } + if err := encoder.Encode(bs); err != nil { + return err + } + } + + return nil +} + +func (s *snapshot) persistKVs(sink raft.SnapshotSink, + encoder *codec.Encoder) error { + entries, err := s.state.KVs() + if err != nil { + return err + } + + for entry := entries.Next(); entry != nil; entry = entries.Next() { + if _, err := sink.Write([]byte{byte(structs.KVSRequestType)}); err != nil { + return err + } + if err := encoder.Encode(entry.(*structs.DirEntry)); err != nil { + return err + } + } + return nil +} + +func (s *snapshot) persistTombstones(sink raft.SnapshotSink, + encoder *codec.Encoder) error { + stones, err := s.state.Tombstones() + if err != nil { + return err + } + + for stone := stones.Next(); stone != nil; stone = stones.Next() { + if _, err := sink.Write([]byte{byte(structs.TombstoneRequestType)}); err != nil { + return err + } + + // For historical reasons, these are serialized in the snapshots + // as KV entries. We want to keep the snapshot format compatible + // with pre-0.6 versions for now. + s := stone.(*state.Tombstone) + fake := &structs.DirEntry{ + Key: s.Key, + RaftIndex: structs.RaftIndex{ + ModifyIndex: s.Index, + }, + } + if err := encoder.Encode(fake); err != nil { + return err + } + } + return nil +} + +func (s *snapshot) persistPreparedQueries(sink raft.SnapshotSink, + encoder *codec.Encoder) error { + queries, err := s.state.PreparedQueries() + if err != nil { + return err + } + + for _, query := range queries { + if _, err := sink.Write([]byte{byte(structs.PreparedQueryRequestType)}); err != nil { + return err + } + if err := encoder.Encode(query); err != nil { + return err + } + } + return nil +} + +func (s *snapshot) persistAutopilot(sink raft.SnapshotSink, + encoder *codec.Encoder) error { + autopilot, err := s.state.Autopilot() + if err != nil { + return err + } + if autopilot == nil { + return nil + } + + if _, err := sink.Write([]byte{byte(structs.AutopilotRequestType)}); err != nil { + return err + } + if err := encoder.Encode(autopilot); err != nil { + return err + } + + return nil +} + +func (s *snapshot) Release() { + s.state.Close() +}