From f50d6871f97d4b19c3e0407dcf9511cd653ab5cd Mon Sep 17 00:00:00 2001 From: Frank Schroeder Date: Thu, 20 Apr 2017 17:46:29 -0700 Subject: [PATCH] golint: No stutter --- command/agent/agent_endpoint.go | 4 +- command/agent/agent_endpoint_test.go | 2 +- command/agent/command.go | 4 +- command/agent/config.go | 2 +- consul/acl_endpoint.go | 4 +- consul/catalog_endpoint.go | 8 +-- consul/coordinate_endpoint.go | 2 +- consul/fsm.go | 6 +- consul/health_endpoint.go | 8 +-- consul/internal_endpoint.go | 4 +- consul/kvs_endpoint.go | 6 +- consul/prepared_query_endpoint.go | 4 +- consul/rpc.go | 2 +- consul/rpc_test.go | 6 +- consul/session_endpoint.go | 6 +- consul/state/acl.go | 18 +++--- consul/state/autopilot.go | 12 ++-- consul/state/catalog.go | 84 ++++++++++++++-------------- consul/state/coordinate.go | 10 ++-- consul/state/kvs.go | 54 +++++++++--------- consul/state/prepared_query.go | 18 +++--- consul/state/session.go | 18 +++--- consul/state/state_store.go | 42 +++++++------- consul/state/state_store_test.go | 12 ++-- consul/state/txn.go | 8 +-- watch/funcs.go | 30 +++++----- watch/plan.go | 8 +-- watch/plan_test.go | 6 +- watch/watch.go | 18 +++--- 29 files changed, 203 insertions(+), 203 deletions(-) diff --git a/command/agent/agent_endpoint.go b/command/agent/agent_endpoint.go index bded940c3..63be9cc16 100644 --- a/command/agent/agent_endpoint.go +++ b/command/agent/agent_endpoint.go @@ -16,7 +16,7 @@ import ( "github.com/hashicorp/serf/serf" ) -type AgentSelf struct { +type Self struct { Config *Config Coord *coordinate.Coordinate Member serf.Member @@ -44,7 +44,7 @@ func (s *HTTPServer) AgentSelf(resp http.ResponseWriter, req *http.Request) (int return nil, errPermissionDenied } - return AgentSelf{ + return Self{ Config: s.agent.config, Coord: c, Member: s.agent.LocalMember(), diff --git a/command/agent/agent_endpoint_test.go b/command/agent/agent_endpoint_test.go index c35820933..bb442d983 100644 --- a/command/agent/agent_endpoint_test.go +++ b/command/agent/agent_endpoint_test.go @@ -223,7 +223,7 @@ func TestAgent_Self(t *testing.T) { t.Fatalf("err: %v", err) } - val := obj.(AgentSelf) + val := obj.(Self) if int(val.Member.Port) != srv.agent.config.Ports.SerfLan { t.Fatalf("incorrect port: %v", obj) } diff --git a/command/agent/command.go b/command/agent/command.go index 8aa96e3c5..cbac0af33 100644 --- a/command/agent/command.go +++ b/command/agent/command.go @@ -1090,7 +1090,7 @@ func (c *Command) Run(args []string) int { // Register the watches for _, wp := range config.WatchPlans { - go func(wp *watch.WatchPlan) { + go func(wp *watch.Plan) { wp.Handler = makeWatchHandler(logOutput, wp.Exempt["handler"]) wp.LogOutput = c.logOutput addr := httpAddr.String() @@ -1307,7 +1307,7 @@ func (c *Command) handleReload(config *Config) (*Config, error) { // Register the new watches for _, wp := range newConf.WatchPlans { - go func(wp *watch.WatchPlan) { + go func(wp *watch.Plan) { wp.Handler = makeWatchHandler(c.logOutput, wp.Exempt["handler"]) wp.LogOutput = c.logOutput if err := wp.Run(httpAddr.String()); err != nil { diff --git a/command/agent/config.go b/command/agent/config.go index 67fd118da..e2df8eb1b 100644 --- a/command/agent/config.go +++ b/command/agent/config.go @@ -718,7 +718,7 @@ type Config struct { VersionPrerelease string `mapstructure:"-"` // WatchPlans contains the compiled watches - WatchPlans []*watch.WatchPlan `mapstructure:"-" json:"-"` + WatchPlans []*watch.Plan `mapstructure:"-" json:"-"` // UnixSockets is a map of socket configuration data UnixSockets UnixSocketConfig `mapstructure:"unix_sockets"` diff --git a/consul/acl_endpoint.go b/consul/acl_endpoint.go index 9c04e0881..49042b842 100644 --- a/consul/acl_endpoint.go +++ b/consul/acl_endpoint.go @@ -148,7 +148,7 @@ func (a *ACL) Get(args *structs.ACLSpecificRequest, return a.srv.blockingQuery(&args.QueryOptions, &reply.QueryMeta, - func(ws memdb.WatchSet, state *state.StateStore) error { + func(ws memdb.WatchSet, state *state.Store) error { index, acl, err := state.ACLGet(ws, args.ACL) if err != nil { return err @@ -225,7 +225,7 @@ func (a *ACL) List(args *structs.DCSpecificRequest, return a.srv.blockingQuery(&args.QueryOptions, &reply.QueryMeta, - func(ws memdb.WatchSet, state *state.StateStore) error { + func(ws memdb.WatchSet, state *state.Store) error { index, acls, err := state.ACLList(ws) if err != nil { return err diff --git a/consul/catalog_endpoint.go b/consul/catalog_endpoint.go index abce8a472..b329060ce 100644 --- a/consul/catalog_endpoint.go +++ b/consul/catalog_endpoint.go @@ -169,7 +169,7 @@ func (c *Catalog) ListNodes(args *structs.DCSpecificRequest, reply *structs.Inde return c.srv.blockingQuery( &args.QueryOptions, &reply.QueryMeta, - func(ws memdb.WatchSet, state *state.StateStore) error { + func(ws memdb.WatchSet, state *state.Store) error { var index uint64 var nodes structs.Nodes var err error @@ -199,7 +199,7 @@ func (c *Catalog) ListServices(args *structs.DCSpecificRequest, reply *structs.I return c.srv.blockingQuery( &args.QueryOptions, &reply.QueryMeta, - func(ws memdb.WatchSet, state *state.StateStore) error { + func(ws memdb.WatchSet, state *state.Store) error { var index uint64 var services structs.Services var err error @@ -231,7 +231,7 @@ func (c *Catalog) ServiceNodes(args *structs.ServiceSpecificRequest, reply *stru err := c.srv.blockingQuery( &args.QueryOptions, &reply.QueryMeta, - func(ws memdb.WatchSet, state *state.StateStore) error { + func(ws memdb.WatchSet, state *state.Store) error { var index uint64 var services structs.ServiceNodes var err error @@ -286,7 +286,7 @@ func (c *Catalog) NodeServices(args *structs.NodeSpecificRequest, reply *structs return c.srv.blockingQuery( &args.QueryOptions, &reply.QueryMeta, - func(ws memdb.WatchSet, state *state.StateStore) error { + func(ws memdb.WatchSet, state *state.Store) error { index, services, err := state.NodeServices(ws, args.Node) if err != nil { return err diff --git a/consul/coordinate_endpoint.go b/consul/coordinate_endpoint.go index 33eb23e9d..c297c1088 100644 --- a/consul/coordinate_endpoint.go +++ b/consul/coordinate_endpoint.go @@ -169,7 +169,7 @@ func (c *Coordinate) ListNodes(args *structs.DCSpecificRequest, reply *structs.I return c.srv.blockingQuery(&args.QueryOptions, &reply.QueryMeta, - func(ws memdb.WatchSet, state *state.StateStore) error { + func(ws memdb.WatchSet, state *state.Store) error { index, coords, err := state.Coordinates(ws) if err != nil { return err diff --git a/consul/fsm.go b/consul/fsm.go index 214ab71d8..96cea58ca 100644 --- a/consul/fsm.go +++ b/consul/fsm.go @@ -31,7 +31,7 @@ type consulFSM struct { // new state store). Everything internal here is synchronized by the // Raft side, so doesn't need to lock this. stateLock sync.RWMutex - state *state.StateStore + state *state.Store gc *state.TombstoneGC } @@ -40,7 +40,7 @@ type consulFSM struct { // state in a way that can be accessed concurrently with operations // that may modify the live state. type consulSnapshot struct { - state *state.StateSnapshot + state *state.Snapshot } // snapshotHeader is the first entry in our snapshot @@ -67,7 +67,7 @@ func NewFSM(gc *state.TombstoneGC, logOutput io.Writer) (*consulFSM, error) { } // State is used to return a handle to the current state -func (c *consulFSM) State() *state.StateStore { +func (c *consulFSM) State() *state.Store { c.stateLock.RLock() defer c.stateLock.RUnlock() return c.state diff --git a/consul/health_endpoint.go b/consul/health_endpoint.go index aa225fb83..2562e07f3 100644 --- a/consul/health_endpoint.go +++ b/consul/health_endpoint.go @@ -23,7 +23,7 @@ func (h *Health) ChecksInState(args *structs.ChecksInStateRequest, return h.srv.blockingQuery( &args.QueryOptions, &reply.QueryMeta, - func(ws memdb.WatchSet, state *state.StateStore) error { + func(ws memdb.WatchSet, state *state.Store) error { var index uint64 var checks structs.HealthChecks var err error @@ -53,7 +53,7 @@ func (h *Health) NodeChecks(args *structs.NodeSpecificRequest, return h.srv.blockingQuery( &args.QueryOptions, &reply.QueryMeta, - func(ws memdb.WatchSet, state *state.StateStore) error { + func(ws memdb.WatchSet, state *state.Store) error { index, checks, err := state.NodeChecks(ws, args.Node) if err != nil { return err @@ -79,7 +79,7 @@ func (h *Health) ServiceChecks(args *structs.ServiceSpecificRequest, return h.srv.blockingQuery( &args.QueryOptions, &reply.QueryMeta, - func(ws memdb.WatchSet, state *state.StateStore) error { + func(ws memdb.WatchSet, state *state.Store) error { var index uint64 var checks structs.HealthChecks var err error @@ -113,7 +113,7 @@ func (h *Health) ServiceNodes(args *structs.ServiceSpecificRequest, reply *struc err := h.srv.blockingQuery( &args.QueryOptions, &reply.QueryMeta, - func(ws memdb.WatchSet, state *state.StateStore) error { + func(ws memdb.WatchSet, state *state.Store) error { var index uint64 var nodes structs.CheckServiceNodes var err error diff --git a/consul/internal_endpoint.go b/consul/internal_endpoint.go index e3f44dc55..fe45ccd46 100644 --- a/consul/internal_endpoint.go +++ b/consul/internal_endpoint.go @@ -26,7 +26,7 @@ func (m *Internal) NodeInfo(args *structs.NodeSpecificRequest, return m.srv.blockingQuery( &args.QueryOptions, &reply.QueryMeta, - func(ws memdb.WatchSet, state *state.StateStore) error { + func(ws memdb.WatchSet, state *state.Store) error { index, dump, err := state.NodeInfo(ws, args.Node) if err != nil { return err @@ -47,7 +47,7 @@ func (m *Internal) NodeDump(args *structs.DCSpecificRequest, return m.srv.blockingQuery( &args.QueryOptions, &reply.QueryMeta, - func(ws memdb.WatchSet, state *state.StateStore) error { + func(ws memdb.WatchSet, state *state.Store) error { index, dump, err := state.NodeDump(ws) if err != nil { return err diff --git a/consul/kvs_endpoint.go b/consul/kvs_endpoint.go index 03b378450..e47832e5c 100644 --- a/consul/kvs_endpoint.go +++ b/consul/kvs_endpoint.go @@ -123,7 +123,7 @@ func (k *KVS) Get(args *structs.KeyRequest, reply *structs.IndexedDirEntries) er return k.srv.blockingQuery( &args.QueryOptions, &reply.QueryMeta, - func(ws memdb.WatchSet, state *state.StateStore) error { + func(ws memdb.WatchSet, state *state.Store) error { index, ent, err := state.KVSGet(ws, args.Key) if err != nil { return err @@ -162,7 +162,7 @@ func (k *KVS) List(args *structs.KeyRequest, reply *structs.IndexedDirEntries) e return k.srv.blockingQuery( &args.QueryOptions, &reply.QueryMeta, - func(ws memdb.WatchSet, state *state.StateStore) error { + func(ws memdb.WatchSet, state *state.Store) error { index, ent, err := state.KVSList(ws, args.Key) if err != nil { return err @@ -202,7 +202,7 @@ func (k *KVS) ListKeys(args *structs.KeyListRequest, reply *structs.IndexedKeyLi return k.srv.blockingQuery( &args.QueryOptions, &reply.QueryMeta, - func(ws memdb.WatchSet, state *state.StateStore) error { + func(ws memdb.WatchSet, state *state.Store) error { index, keys, err := state.KVSListKeys(ws, args.Prefix, args.Seperator) if err != nil { return err diff --git a/consul/prepared_query_endpoint.go b/consul/prepared_query_endpoint.go index b24179963..8b329ebe6 100644 --- a/consul/prepared_query_endpoint.go +++ b/consul/prepared_query_endpoint.go @@ -221,7 +221,7 @@ func (p *PreparedQuery) Get(args *structs.PreparedQuerySpecificRequest, return p.srv.blockingQuery( &args.QueryOptions, &reply.QueryMeta, - func(ws memdb.WatchSet, state *state.StateStore) error { + func(ws memdb.WatchSet, state *state.Store) error { index, query, err := state.PreparedQueryGet(ws, args.QueryID) if err != nil { return err @@ -265,7 +265,7 @@ func (p *PreparedQuery) List(args *structs.DCSpecificRequest, reply *structs.Ind return p.srv.blockingQuery( &args.QueryOptions, &reply.QueryMeta, - func(ws memdb.WatchSet, state *state.StateStore) error { + func(ws memdb.WatchSet, state *state.Store) error { index, queries, err := state.PreparedQueryList(ws) if err != nil { return err diff --git a/consul/rpc.go b/consul/rpc.go index 4531f2c46..3b9849685 100644 --- a/consul/rpc.go +++ b/consul/rpc.go @@ -344,7 +344,7 @@ func (s *Server) raftApply(t structs.MessageType, msg interface{}) (interface{}, // store should be used (vs. calling fsm.State()) since the given state store // will be correctly watched for changes if the state store is restored from // a snapshot. -type queryFn func(memdb.WatchSet, *state.StateStore) error +type queryFn func(memdb.WatchSet, *state.Store) error // blockingQuery is used to process a potentially blocking query operation. func (s *Server) blockingQuery(queryOpts *structs.QueryOptions, queryMeta *structs.QueryMeta, diff --git a/consul/rpc_test.go b/consul/rpc_test.go index 85ef47ae2..16a8c1cdb 100644 --- a/consul/rpc_test.go +++ b/consul/rpc_test.go @@ -84,7 +84,7 @@ func TestRPC_blockingQuery(t *testing.T) { var opts structs.QueryOptions var meta structs.QueryMeta var calls int - fn := func(ws memdb.WatchSet, state *state.StateStore) error { + fn := func(ws memdb.WatchSet, state *state.Store) error { calls++ return nil } @@ -103,7 +103,7 @@ func TestRPC_blockingQuery(t *testing.T) { } var meta structs.QueryMeta var calls int - fn := func(ws memdb.WatchSet, state *state.StateStore) error { + fn := func(ws memdb.WatchSet, state *state.Store) error { if calls == 0 { meta.Index = 3 @@ -132,7 +132,7 @@ func TestRPC_blockingQuery(t *testing.T) { } var meta structs.QueryMeta var calls int - fn := func(ws memdb.WatchSet, state *state.StateStore) error { + fn := func(ws memdb.WatchSet, state *state.Store) error { if calls == 0 { meta.Index = 3 diff --git a/consul/session_endpoint.go b/consul/session_endpoint.go index c076c3c84..6d5bc8ddd 100644 --- a/consul/session_endpoint.go +++ b/consul/session_endpoint.go @@ -147,7 +147,7 @@ func (s *Session) Get(args *structs.SessionSpecificRequest, return s.srv.blockingQuery( &args.QueryOptions, &reply.QueryMeta, - func(ws memdb.WatchSet, state *state.StateStore) error { + func(ws memdb.WatchSet, state *state.Store) error { index, session, err := state.SessionGet(ws, args.Session) if err != nil { return err @@ -176,7 +176,7 @@ func (s *Session) List(args *structs.DCSpecificRequest, return s.srv.blockingQuery( &args.QueryOptions, &reply.QueryMeta, - func(ws memdb.WatchSet, state *state.StateStore) error { + func(ws memdb.WatchSet, state *state.Store) error { index, sessions, err := state.SessionList(ws) if err != nil { return err @@ -200,7 +200,7 @@ func (s *Session) NodeSessions(args *structs.NodeSpecificRequest, return s.srv.blockingQuery( &args.QueryOptions, &reply.QueryMeta, - func(ws memdb.WatchSet, state *state.StateStore) error { + func(ws memdb.WatchSet, state *state.Store) error { index, sessions, err := state.NodeSessions(ws, args.Node) if err != nil { return err diff --git a/consul/state/acl.go b/consul/state/acl.go index c99600fe8..d6d9d9ccf 100644 --- a/consul/state/acl.go +++ b/consul/state/acl.go @@ -8,7 +8,7 @@ import ( ) // ACLs is used to pull all the ACLs from the snapshot. -func (s *StateSnapshot) ACLs() (memdb.ResultIterator, error) { +func (s *Snapshot) ACLs() (memdb.ResultIterator, error) { iter, err := s.tx.Get("acls", "id") if err != nil { return nil, err @@ -17,7 +17,7 @@ func (s *StateSnapshot) ACLs() (memdb.ResultIterator, error) { } // ACL is used when restoring from a snapshot. For general inserts, use ACLSet. -func (s *StateRestore) ACL(acl *structs.ACL) error { +func (s *Restore) ACL(acl *structs.ACL) error { if err := s.tx.Insert("acls", acl); err != nil { return fmt.Errorf("failed restoring acl: %s", err) } @@ -30,7 +30,7 @@ func (s *StateRestore) ACL(acl *structs.ACL) error { } // ACLSet is used to insert an ACL rule into the state store. -func (s *StateStore) ACLSet(idx uint64, acl *structs.ACL) error { +func (s *Store) ACLSet(idx uint64, acl *structs.ACL) error { tx := s.db.Txn(true) defer tx.Abort() @@ -45,7 +45,7 @@ func (s *StateStore) ACLSet(idx uint64, acl *structs.ACL) error { // aclSetTxn is the inner method used to insert an ACL rule with the // proper indexes into the state store. -func (s *StateStore) aclSetTxn(tx *memdb.Txn, idx uint64, acl *structs.ACL) error { +func (s *Store) aclSetTxn(tx *memdb.Txn, idx uint64, acl *structs.ACL) error { // Check that the ID is set if acl.ID == "" { return ErrMissingACLID @@ -78,7 +78,7 @@ func (s *StateStore) aclSetTxn(tx *memdb.Txn, idx uint64, acl *structs.ACL) erro } // ACLGet is used to look up an existing ACL by ID. -func (s *StateStore) ACLGet(ws memdb.WatchSet, aclID string) (uint64, *structs.ACL, error) { +func (s *Store) ACLGet(ws memdb.WatchSet, aclID string) (uint64, *structs.ACL, error) { tx := s.db.Txn(false) defer tx.Abort() @@ -99,7 +99,7 @@ func (s *StateStore) ACLGet(ws memdb.WatchSet, aclID string) (uint64, *structs.A } // ACLList is used to list out all of the ACLs in the state store. -func (s *StateStore) ACLList(ws memdb.WatchSet) (uint64, structs.ACLs, error) { +func (s *Store) ACLList(ws memdb.WatchSet) (uint64, structs.ACLs, error) { tx := s.db.Txn(false) defer tx.Abort() @@ -116,7 +116,7 @@ func (s *StateStore) ACLList(ws memdb.WatchSet) (uint64, structs.ACLs, error) { // aclListTxn is used to list out all of the ACLs in the state store. This is a // function vs. a method so it can be called from the snapshotter. -func (s *StateStore) aclListTxn(tx *memdb.Txn, ws memdb.WatchSet) (structs.ACLs, error) { +func (s *Store) aclListTxn(tx *memdb.Txn, ws memdb.WatchSet) (structs.ACLs, error) { // Query all of the ACLs in the state store iter, err := tx.Get("acls", "id") if err != nil { @@ -135,7 +135,7 @@ func (s *StateStore) aclListTxn(tx *memdb.Txn, ws memdb.WatchSet) (structs.ACLs, // ACLDelete is used to remove an existing ACL from the state store. If // the ACL does not exist this is a no-op and no error is returned. -func (s *StateStore) ACLDelete(idx uint64, aclID string) error { +func (s *Store) ACLDelete(idx uint64, aclID string) error { tx := s.db.Txn(true) defer tx.Abort() @@ -150,7 +150,7 @@ func (s *StateStore) ACLDelete(idx uint64, aclID string) error { // aclDeleteTxn is used to delete an ACL from the state store within // an existing transaction. -func (s *StateStore) aclDeleteTxn(tx *memdb.Txn, idx uint64, aclID string) error { +func (s *Store) aclDeleteTxn(tx *memdb.Txn, idx uint64, aclID string) error { // Look up the existing ACL acl, err := tx.First("acls", "id", aclID) if err != nil { diff --git a/consul/state/autopilot.go b/consul/state/autopilot.go index d69e5da9c..fc31eb46d 100644 --- a/consul/state/autopilot.go +++ b/consul/state/autopilot.go @@ -8,7 +8,7 @@ import ( ) // Autopilot is used to pull the autopilot config from the snapshot. -func (s *StateSnapshot) Autopilot() (*structs.AutopilotConfig, error) { +func (s *Snapshot) Autopilot() (*structs.AutopilotConfig, error) { c, err := s.tx.First("autopilot-config", "id") if err != nil { return nil, err @@ -23,7 +23,7 @@ func (s *StateSnapshot) Autopilot() (*structs.AutopilotConfig, error) { } // Autopilot is used when restoring from a snapshot. -func (s *StateRestore) Autopilot(config *structs.AutopilotConfig) error { +func (s *Restore) Autopilot(config *structs.AutopilotConfig) error { if err := s.tx.Insert("autopilot-config", config); err != nil { return fmt.Errorf("failed restoring autopilot config: %s", err) } @@ -32,7 +32,7 @@ func (s *StateRestore) Autopilot(config *structs.AutopilotConfig) error { } // AutopilotConfig is used to get the current Autopilot configuration. -func (s *StateStore) AutopilotConfig() (uint64, *structs.AutopilotConfig, error) { +func (s *Store) AutopilotConfig() (uint64, *structs.AutopilotConfig, error) { tx := s.db.Txn(false) defer tx.Abort() @@ -51,7 +51,7 @@ func (s *StateStore) AutopilotConfig() (uint64, *structs.AutopilotConfig, error) } // AutopilotSetConfig is used to set the current Autopilot configuration. -func (s *StateStore) AutopilotSetConfig(idx uint64, config *structs.AutopilotConfig) error { +func (s *Store) AutopilotSetConfig(idx uint64, config *structs.AutopilotConfig) error { tx := s.db.Txn(true) defer tx.Abort() @@ -64,7 +64,7 @@ func (s *StateStore) AutopilotSetConfig(idx uint64, config *structs.AutopilotCon // AutopilotCASConfig is used to try updating the Autopilot configuration with a // given Raft index. If the CAS index specified is not equal to the last observed index // for the config, then the call is a noop, -func (s *StateStore) AutopilotCASConfig(idx, cidx uint64, config *structs.AutopilotConfig) (bool, error) { +func (s *Store) AutopilotCASConfig(idx, cidx uint64, config *structs.AutopilotConfig) (bool, error) { tx := s.db.Txn(true) defer tx.Abort() @@ -88,7 +88,7 @@ func (s *StateStore) AutopilotCASConfig(idx, cidx uint64, config *structs.Autopi return true, nil } -func (s *StateStore) autopilotSetConfigTxn(idx uint64, tx *memdb.Txn, config *structs.AutopilotConfig) error { +func (s *Store) autopilotSetConfigTxn(idx uint64, tx *memdb.Txn, config *structs.AutopilotConfig) error { // Check for an existing config existing, err := tx.First("autopilot-config", "id") if err != nil { diff --git a/consul/state/catalog.go b/consul/state/catalog.go index f25333429..25c7e4cf4 100644 --- a/consul/state/catalog.go +++ b/consul/state/catalog.go @@ -28,7 +28,7 @@ func resizeNodeLookupKey(s string) string { } // Nodes is used to pull the full list of nodes for use during snapshots. -func (s *StateSnapshot) Nodes() (memdb.ResultIterator, error) { +func (s *Snapshot) Nodes() (memdb.ResultIterator, error) { iter, err := s.tx.Get("nodes", "id") if err != nil { return nil, err @@ -38,7 +38,7 @@ func (s *StateSnapshot) Nodes() (memdb.ResultIterator, error) { // Services is used to pull the full list of services for a given node for use // during snapshots. -func (s *StateSnapshot) Services(node string) (memdb.ResultIterator, error) { +func (s *Snapshot) Services(node string) (memdb.ResultIterator, error) { iter, err := s.tx.Get("services", "node", node) if err != nil { return nil, err @@ -48,7 +48,7 @@ func (s *StateSnapshot) Services(node string) (memdb.ResultIterator, error) { // Checks is used to pull the full list of checks for a given node for use // during snapshots. -func (s *StateSnapshot) Checks(node string) (memdb.ResultIterator, error) { +func (s *Snapshot) Checks(node string) (memdb.ResultIterator, error) { iter, err := s.tx.Get("checks", "node", node) if err != nil { return nil, err @@ -59,7 +59,7 @@ func (s *StateSnapshot) Checks(node string) (memdb.ResultIterator, error) { // Registration is used to make sure a node, service, and check registration is // performed within a single transaction to avoid race conditions on state // updates. -func (s *StateRestore) Registration(idx uint64, req *structs.RegisterRequest) error { +func (s *Restore) Registration(idx uint64, req *structs.RegisterRequest) error { if err := s.store.ensureRegistrationTxn(s.tx, idx, req); err != nil { return err } @@ -69,7 +69,7 @@ func (s *StateRestore) Registration(idx uint64, req *structs.RegisterRequest) er // EnsureRegistration is used to make sure a node, service, and check // registration is performed within a single transaction to avoid race // conditions on state updates. -func (s *StateStore) EnsureRegistration(idx uint64, req *structs.RegisterRequest) error { +func (s *Store) EnsureRegistration(idx uint64, req *structs.RegisterRequest) error { tx := s.db.Txn(true) defer tx.Abort() @@ -84,7 +84,7 @@ func (s *StateStore) EnsureRegistration(idx uint64, req *structs.RegisterRequest // ensureRegistrationTxn is used to make sure a node, service, and check // registration is performed within a single transaction to avoid race // conditions on state updates. -func (s *StateStore) ensureRegistrationTxn(tx *memdb.Txn, idx uint64, req *structs.RegisterRequest) error { +func (s *Store) ensureRegistrationTxn(tx *memdb.Txn, idx uint64, req *structs.RegisterRequest) error { // Create a node structure. node := &structs.Node{ ID: req.ID, @@ -152,7 +152,7 @@ func (s *StateStore) ensureRegistrationTxn(tx *memdb.Txn, idx uint64, req *struc } // EnsureNode is used to upsert node registration or modification. -func (s *StateStore) EnsureNode(idx uint64, node *structs.Node) error { +func (s *Store) EnsureNode(idx uint64, node *structs.Node) error { tx := s.db.Txn(true) defer tx.Abort() @@ -168,7 +168,7 @@ func (s *StateStore) EnsureNode(idx uint64, node *structs.Node) error { // ensureNodeTxn is the inner function called to actually create a node // registration or modify an existing one in the state store. It allows // passing in a memdb transaction so it may be part of a larger txn. -func (s *StateStore) ensureNodeTxn(tx *memdb.Txn, idx uint64, node *structs.Node) error { +func (s *Store) ensureNodeTxn(tx *memdb.Txn, idx uint64, node *structs.Node) error { // See if there's an existing node with this UUID, and make sure the // name is the same. var n *structs.Node @@ -218,7 +218,7 @@ func (s *StateStore) ensureNodeTxn(tx *memdb.Txn, idx uint64, node *structs.Node } // GetNode is used to retrieve a node registration by node name ID. -func (s *StateStore) GetNode(id string) (uint64, *structs.Node, error) { +func (s *Store) GetNode(id string) (uint64, *structs.Node, error) { tx := s.db.Txn(false) defer tx.Abort() @@ -237,7 +237,7 @@ func (s *StateStore) GetNode(id string) (uint64, *structs.Node, error) { } // GetNodeID is used to retrieve a node registration by node ID. -func (s *StateStore) GetNodeID(id types.NodeID) (uint64, *structs.Node, error) { +func (s *Store) GetNodeID(id types.NodeID) (uint64, *structs.Node, error) { tx := s.db.Txn(false) defer tx.Abort() @@ -256,7 +256,7 @@ func (s *StateStore) GetNodeID(id types.NodeID) (uint64, *structs.Node, error) { } // Nodes is used to return all of the known nodes. -func (s *StateStore) Nodes(ws memdb.WatchSet) (uint64, structs.Nodes, error) { +func (s *Store) Nodes(ws memdb.WatchSet) (uint64, structs.Nodes, error) { tx := s.db.Txn(false) defer tx.Abort() @@ -279,7 +279,7 @@ func (s *StateStore) Nodes(ws memdb.WatchSet) (uint64, structs.Nodes, error) { } // NodesByMeta is used to return all nodes with the given metadata key/value pairs. -func (s *StateStore) NodesByMeta(ws memdb.WatchSet, filters map[string]string) (uint64, structs.Nodes, error) { +func (s *Store) NodesByMeta(ws memdb.WatchSet, filters map[string]string) (uint64, structs.Nodes, error) { tx := s.db.Txn(false) defer tx.Abort() @@ -310,7 +310,7 @@ func (s *StateStore) NodesByMeta(ws memdb.WatchSet, filters map[string]string) ( } // DeleteNode is used to delete a given node by its ID. -func (s *StateStore) DeleteNode(idx uint64, nodeName string) error { +func (s *Store) DeleteNode(idx uint64, nodeName string) error { tx := s.db.Txn(true) defer tx.Abort() @@ -325,7 +325,7 @@ func (s *StateStore) DeleteNode(idx uint64, nodeName string) error { // deleteNodeTxn is the inner method used for removing a node from // the store within a given transaction. -func (s *StateStore) deleteNodeTxn(tx *memdb.Txn, idx uint64, nodeName string) error { +func (s *Store) deleteNodeTxn(tx *memdb.Txn, idx uint64, nodeName string) error { // Look up the node. node, err := tx.First("nodes", "id", nodeName) if err != nil { @@ -413,7 +413,7 @@ func (s *StateStore) deleteNodeTxn(tx *memdb.Txn, idx uint64, nodeName string) e } // EnsureService is called to upsert creation of a given NodeService. -func (s *StateStore) EnsureService(idx uint64, node string, svc *structs.NodeService) error { +func (s *Store) EnsureService(idx uint64, node string, svc *structs.NodeService) error { tx := s.db.Txn(true) defer tx.Abort() @@ -428,7 +428,7 @@ func (s *StateStore) EnsureService(idx uint64, node string, svc *structs.NodeSer // ensureServiceTxn is used to upsert a service registration within an // existing memdb transaction. -func (s *StateStore) ensureServiceTxn(tx *memdb.Txn, idx uint64, node string, svc *structs.NodeService) error { +func (s *Store) ensureServiceTxn(tx *memdb.Txn, idx uint64, node string, svc *structs.NodeService) error { // Check for existing service existing, err := tx.First("services", "id", node, svc.ID) if err != nil { @@ -468,7 +468,7 @@ func (s *StateStore) ensureServiceTxn(tx *memdb.Txn, idx uint64, node string, sv } // Services returns all services along with a list of associated tags. -func (s *StateStore) Services(ws memdb.WatchSet) (uint64, structs.Services, error) { +func (s *Store) Services(ws memdb.WatchSet) (uint64, structs.Services, error) { tx := s.db.Txn(false) defer tx.Abort() @@ -509,7 +509,7 @@ func (s *StateStore) Services(ws memdb.WatchSet) (uint64, structs.Services, erro } // ServicesByNodeMeta returns all services, filtered by the given node metadata. -func (s *StateStore) ServicesByNodeMeta(ws memdb.WatchSet, filters map[string]string) (uint64, structs.Services, error) { +func (s *Store) ServicesByNodeMeta(ws memdb.WatchSet, filters map[string]string) (uint64, structs.Services, error) { tx := s.db.Txn(false) defer tx.Abort() @@ -578,7 +578,7 @@ func (s *StateStore) ServicesByNodeMeta(ws memdb.WatchSet, filters map[string]st } // ServiceNodes returns the nodes associated with a given service name. -func (s *StateStore) ServiceNodes(ws memdb.WatchSet, serviceName string) (uint64, structs.ServiceNodes, error) { +func (s *Store) ServiceNodes(ws memdb.WatchSet, serviceName string) (uint64, structs.ServiceNodes, error) { tx := s.db.Txn(false) defer tx.Abort() @@ -607,7 +607,7 @@ func (s *StateStore) ServiceNodes(ws memdb.WatchSet, serviceName string) (uint64 // ServiceTagNodes returns the nodes associated with a given service, filtering // out services that don't contain the given tag. -func (s *StateStore) ServiceTagNodes(ws memdb.WatchSet, service string, tag string) (uint64, structs.ServiceNodes, error) { +func (s *Store) ServiceTagNodes(ws memdb.WatchSet, service string, tag string) (uint64, structs.ServiceNodes, error) { tx := s.db.Txn(false) defer tx.Abort() @@ -656,7 +656,7 @@ func serviceTagFilter(sn *structs.ServiceNode, tag string) bool { // parseServiceNodes iterates over a services query and fills in the node details, // returning a ServiceNodes slice. -func (s *StateStore) parseServiceNodes(tx *memdb.Txn, ws memdb.WatchSet, services structs.ServiceNodes) (structs.ServiceNodes, error) { +func (s *Store) parseServiceNodes(tx *memdb.Txn, ws memdb.WatchSet, services structs.ServiceNodes) (structs.ServiceNodes, error) { // We don't want to track an unlimited number of nodes, so we pull a // top-level watch to use as a fallback. allNodes, err := tx.Get("nodes", "id") @@ -697,7 +697,7 @@ func (s *StateStore) parseServiceNodes(tx *memdb.Txn, ws memdb.WatchSet, service // NodeService is used to retrieve a specific service associated with the given // node. -func (s *StateStore) NodeService(nodeName string, serviceID string) (uint64, *structs.NodeService, error) { +func (s *Store) NodeService(nodeName string, serviceID string) (uint64, *structs.NodeService, error) { tx := s.db.Txn(false) defer tx.Abort() @@ -718,7 +718,7 @@ func (s *StateStore) NodeService(nodeName string, serviceID string) (uint64, *st } // NodeServices is used to query service registrations by node name or UUID. -func (s *StateStore) NodeServices(ws memdb.WatchSet, nodeNameOrID string) (uint64, *structs.NodeServices, error) { +func (s *Store) NodeServices(ws memdb.WatchSet, nodeNameOrID string) (uint64, *structs.NodeServices, error) { tx := s.db.Txn(false) defer tx.Abort() @@ -792,7 +792,7 @@ func (s *StateStore) NodeServices(ws memdb.WatchSet, nodeNameOrID string) (uint6 } // DeleteService is used to delete a given service associated with a node. -func (s *StateStore) DeleteService(idx uint64, nodeName, serviceID string) error { +func (s *Store) DeleteService(idx uint64, nodeName, serviceID string) error { tx := s.db.Txn(true) defer tx.Abort() @@ -807,7 +807,7 @@ func (s *StateStore) DeleteService(idx uint64, nodeName, serviceID string) error // deleteServiceTxn is the inner method called to remove a service // registration within an existing transaction. -func (s *StateStore) deleteServiceTxn(tx *memdb.Txn, idx uint64, nodeName, serviceID string) error { +func (s *Store) deleteServiceTxn(tx *memdb.Txn, idx uint64, nodeName, serviceID string) error { // Look up the service. service, err := tx.First("services", "id", nodeName, serviceID) if err != nil { @@ -852,7 +852,7 @@ func (s *StateStore) deleteServiceTxn(tx *memdb.Txn, idx uint64, nodeName, servi } // EnsureCheck is used to store a check registration in the db. -func (s *StateStore) EnsureCheck(idx uint64, hc *structs.HealthCheck) error { +func (s *Store) EnsureCheck(idx uint64, hc *structs.HealthCheck) error { tx := s.db.Txn(true) defer tx.Abort() @@ -868,7 +868,7 @@ func (s *StateStore) EnsureCheck(idx uint64, hc *structs.HealthCheck) error { // ensureCheckTransaction is used as the inner method to handle inserting // a health check into the state store. It ensures safety against inserting // checks with no matching node or service. -func (s *StateStore) ensureCheckTxn(tx *memdb.Txn, idx uint64, hc *structs.HealthCheck) error { +func (s *Store) ensureCheckTxn(tx *memdb.Txn, idx uint64, hc *structs.HealthCheck) error { // Check if we have an existing health check existing, err := tx.First("checks", "id", hc.Node, string(hc.CheckID)) if err != nil { @@ -947,7 +947,7 @@ func (s *StateStore) ensureCheckTxn(tx *memdb.Txn, idx uint64, hc *structs.Healt // NodeCheck is used to retrieve a specific check associated with the given // node. -func (s *StateStore) NodeCheck(nodeName string, checkID types.CheckID) (uint64, *structs.HealthCheck, error) { +func (s *Store) NodeCheck(nodeName string, checkID types.CheckID) (uint64, *structs.HealthCheck, error) { tx := s.db.Txn(false) defer tx.Abort() @@ -969,7 +969,7 @@ func (s *StateStore) NodeCheck(nodeName string, checkID types.CheckID) (uint64, // NodeChecks is used to retrieve checks associated with the // given node from the state store. -func (s *StateStore) NodeChecks(ws memdb.WatchSet, nodeName string) (uint64, structs.HealthChecks, error) { +func (s *Store) NodeChecks(ws memdb.WatchSet, nodeName string) (uint64, structs.HealthChecks, error) { tx := s.db.Txn(false) defer tx.Abort() @@ -993,7 +993,7 @@ func (s *StateStore) NodeChecks(ws memdb.WatchSet, nodeName string) (uint64, str // ServiceChecks is used to get all checks associated with a // given service ID. The query is performed against a service // _name_ instead of a service ID. -func (s *StateStore) ServiceChecks(ws memdb.WatchSet, serviceName string) (uint64, structs.HealthChecks, error) { +func (s *Store) ServiceChecks(ws memdb.WatchSet, serviceName string) (uint64, structs.HealthChecks, error) { tx := s.db.Txn(false) defer tx.Abort() @@ -1017,7 +1017,7 @@ func (s *StateStore) ServiceChecks(ws memdb.WatchSet, serviceName string) (uint6 // ServiceChecksByNodeMeta is used to get all checks associated with a // given service ID, filtered by the given node metadata values. The query // is performed against a service _name_ instead of a service ID. -func (s *StateStore) ServiceChecksByNodeMeta(ws memdb.WatchSet, serviceName string, +func (s *Store) ServiceChecksByNodeMeta(ws memdb.WatchSet, serviceName string, filters map[string]string) (uint64, structs.HealthChecks, error) { tx := s.db.Txn(false) @@ -1038,7 +1038,7 @@ func (s *StateStore) ServiceChecksByNodeMeta(ws memdb.WatchSet, serviceName stri // ChecksInState is used to query the state store for all checks // which are in the provided state. -func (s *StateStore) ChecksInState(ws memdb.WatchSet, state string) (uint64, structs.HealthChecks, error) { +func (s *Store) ChecksInState(ws memdb.WatchSet, state string) (uint64, structs.HealthChecks, error) { tx := s.db.Txn(false) defer tx.Abort() @@ -1070,7 +1070,7 @@ func (s *StateStore) ChecksInState(ws memdb.WatchSet, state string) (uint64, str // ChecksInStateByNodeMeta is used to query the state store for all checks // which are in the provided state, filtered by the given node metadata values. -func (s *StateStore) ChecksInStateByNodeMeta(ws memdb.WatchSet, state string, filters map[string]string) (uint64, structs.HealthChecks, error) { +func (s *Store) ChecksInStateByNodeMeta(ws memdb.WatchSet, state string, filters map[string]string) (uint64, structs.HealthChecks, error) { tx := s.db.Txn(false) defer tx.Abort() @@ -1098,7 +1098,7 @@ func (s *StateStore) ChecksInStateByNodeMeta(ws memdb.WatchSet, state string, fi // parseChecksByNodeMeta is a helper function used to deduplicate some // repetitive code for returning health checks filtered by node metadata fields. -func (s *StateStore) parseChecksByNodeMeta(tx *memdb.Txn, ws memdb.WatchSet, +func (s *Store) parseChecksByNodeMeta(tx *memdb.Txn, ws memdb.WatchSet, idx uint64, iter memdb.ResultIterator, filters map[string]string) (uint64, structs.HealthChecks, error) { // We don't want to track an unlimited number of nodes, so we pull a @@ -1132,7 +1132,7 @@ func (s *StateStore) parseChecksByNodeMeta(tx *memdb.Txn, ws memdb.WatchSet, } // DeleteCheck is used to delete a health check registration. -func (s *StateStore) DeleteCheck(idx uint64, node string, checkID types.CheckID) error { +func (s *Store) DeleteCheck(idx uint64, node string, checkID types.CheckID) error { tx := s.db.Txn(true) defer tx.Abort() @@ -1147,7 +1147,7 @@ func (s *StateStore) DeleteCheck(idx uint64, node string, checkID types.CheckID) // deleteCheckTxn is the inner method used to call a health // check deletion within an existing transaction. -func (s *StateStore) deleteCheckTxn(tx *memdb.Txn, idx uint64, node string, checkID types.CheckID) error { +func (s *Store) deleteCheckTxn(tx *memdb.Txn, idx uint64, node string, checkID types.CheckID) error { // Try to retrieve the existing health check. hc, err := tx.First("checks", "id", node, string(checkID)) if err != nil { @@ -1186,7 +1186,7 @@ func (s *StateStore) deleteCheckTxn(tx *memdb.Txn, idx uint64, node string, chec } // CheckServiceNodes is used to query all nodes and checks for a given service. -func (s *StateStore) CheckServiceNodes(ws memdb.WatchSet, serviceName string) (uint64, structs.CheckServiceNodes, error) { +func (s *Store) CheckServiceNodes(ws memdb.WatchSet, serviceName string) (uint64, structs.CheckServiceNodes, error) { tx := s.db.Txn(false) defer tx.Abort() @@ -1210,7 +1210,7 @@ func (s *StateStore) CheckServiceNodes(ws memdb.WatchSet, serviceName string) (u // CheckServiceTagNodes is used to query all nodes and checks for a given // service, filtering out services that don't contain the given tag. -func (s *StateStore) CheckServiceTagNodes(ws memdb.WatchSet, serviceName, tag string) (uint64, structs.CheckServiceNodes, error) { +func (s *Store) CheckServiceTagNodes(ws memdb.WatchSet, serviceName, tag string) (uint64, structs.CheckServiceNodes, error) { tx := s.db.Txn(false) defer tx.Abort() @@ -1238,7 +1238,7 @@ func (s *StateStore) CheckServiceTagNodes(ws memdb.WatchSet, serviceName, tag st // parseCheckServiceNodes is used to parse through a given set of services, // and query for an associated node and a set of checks. This is the inner // method used to return a rich set of results from a more simple query. -func (s *StateStore) parseCheckServiceNodes( +func (s *Store) parseCheckServiceNodes( tx *memdb.Txn, ws memdb.WatchSet, idx uint64, serviceName string, services structs.ServiceNodes, err error) (uint64, structs.CheckServiceNodes, error) { @@ -1318,7 +1318,7 @@ func (s *StateStore) parseCheckServiceNodes( // NodeInfo is used to generate a dump of a single node. The dump includes // all services and checks which are registered against the node. -func (s *StateStore) NodeInfo(ws memdb.WatchSet, node string) (uint64, structs.NodeDump, error) { +func (s *Store) NodeInfo(ws memdb.WatchSet, node string) (uint64, structs.NodeDump, error) { tx := s.db.Txn(false) defer tx.Abort() @@ -1337,7 +1337,7 @@ func (s *StateStore) NodeInfo(ws memdb.WatchSet, node string) (uint64, structs.N // NodeDump is used to generate a dump of all nodes. This call is expensive // as it has to query every node, service, and check. The response can also // be quite large since there is currently no filtering applied. -func (s *StateStore) NodeDump(ws memdb.WatchSet) (uint64, structs.NodeDump, error) { +func (s *Store) NodeDump(ws memdb.WatchSet) (uint64, structs.NodeDump, error) { tx := s.db.Txn(false) defer tx.Abort() @@ -1356,7 +1356,7 @@ func (s *StateStore) NodeDump(ws memdb.WatchSet) (uint64, structs.NodeDump, erro // parseNodes takes an iterator over a set of nodes and returns a struct // containing the nodes along with all of their associated services // and/or health checks. -func (s *StateStore) parseNodes(tx *memdb.Txn, ws memdb.WatchSet, idx uint64, +func (s *Store) parseNodes(tx *memdb.Txn, ws memdb.WatchSet, idx uint64, iter memdb.ResultIterator) (uint64, structs.NodeDump, error) { // We don't want to track an unlimited number of services, so we pull a diff --git a/consul/state/coordinate.go b/consul/state/coordinate.go index 6cfba415e..2829053f2 100644 --- a/consul/state/coordinate.go +++ b/consul/state/coordinate.go @@ -9,7 +9,7 @@ import ( ) // Coordinates is used to pull all the coordinates from the snapshot. -func (s *StateSnapshot) Coordinates() (memdb.ResultIterator, error) { +func (s *Snapshot) Coordinates() (memdb.ResultIterator, error) { iter, err := s.tx.Get("coordinates", "id") if err != nil { return nil, err @@ -20,7 +20,7 @@ func (s *StateSnapshot) Coordinates() (memdb.ResultIterator, error) { // Coordinates is used when restoring from a snapshot. For general inserts, use // CoordinateBatchUpdate. We do less vetting of the updates here because they // already got checked on the way in during a batch update. -func (s *StateRestore) Coordinates(idx uint64, updates structs.Coordinates) error { +func (s *Restore) Coordinates(idx uint64, updates structs.Coordinates) error { for _, update := range updates { if err := s.tx.Insert("coordinates", update); err != nil { return fmt.Errorf("failed restoring coordinate: %s", err) @@ -39,7 +39,7 @@ func (s *StateRestore) Coordinates(idx uint64, updates structs.Coordinates) erro // nil, none of the Raft or node information is returned. This hits the 90% // internal-to-Consul use case for this data, and this isn't exposed via an // endpoint, so it doesn't matter that the Raft info isn't available. -func (s *StateStore) CoordinateGetRaw(node string) (*coordinate.Coordinate, error) { +func (s *Store) CoordinateGetRaw(node string) (*coordinate.Coordinate, error) { tx := s.db.Txn(false) defer tx.Abort() @@ -57,7 +57,7 @@ func (s *StateStore) CoordinateGetRaw(node string) (*coordinate.Coordinate, erro } // Coordinates queries for all nodes with coordinates. -func (s *StateStore) Coordinates(ws memdb.WatchSet) (uint64, structs.Coordinates, error) { +func (s *Store) Coordinates(ws memdb.WatchSet) (uint64, structs.Coordinates, error) { tx := s.db.Txn(false) defer tx.Abort() @@ -80,7 +80,7 @@ func (s *StateStore) Coordinates(ws memdb.WatchSet) (uint64, structs.Coordinates // CoordinateBatchUpdate processes a batch of coordinate updates and applies // them in a single transaction. -func (s *StateStore) CoordinateBatchUpdate(idx uint64, updates structs.Coordinates) error { +func (s *Store) CoordinateBatchUpdate(idx uint64, updates structs.Coordinates) error { tx := s.db.Txn(true) defer tx.Abort() diff --git a/consul/state/kvs.go b/consul/state/kvs.go index c111380a9..2ae20d445 100644 --- a/consul/state/kvs.go +++ b/consul/state/kvs.go @@ -10,7 +10,7 @@ import ( ) // KVs is used to pull the full list of KVS entries for use during snapshots. -func (s *StateSnapshot) KVs() (memdb.ResultIterator, error) { +func (s *Snapshot) KVs() (memdb.ResultIterator, error) { iter, err := s.tx.Get("kvs", "id_prefix") if err != nil { return nil, err @@ -19,12 +19,12 @@ func (s *StateSnapshot) KVs() (memdb.ResultIterator, error) { } // Tombstones is used to pull all the tombstones from the graveyard. -func (s *StateSnapshot) Tombstones() (memdb.ResultIterator, error) { +func (s *Snapshot) Tombstones() (memdb.ResultIterator, error) { return s.store.kvsGraveyard.DumpTxn(s.tx) } // KVS is used when restoring from a snapshot. Use KVSSet for general inserts. -func (s *StateRestore) KVS(entry *structs.DirEntry) error { +func (s *Restore) KVS(entry *structs.DirEntry) error { if err := s.tx.Insert("kvs", entry); err != nil { return fmt.Errorf("failed inserting kvs entry: %s", err) } @@ -37,7 +37,7 @@ func (s *StateRestore) KVS(entry *structs.DirEntry) error { // Tombstone is used when restoring from a snapshot. For general inserts, use // Graveyard.InsertTxn. -func (s *StateRestore) Tombstone(stone *Tombstone) error { +func (s *Restore) Tombstone(stone *Tombstone) error { if err := s.store.kvsGraveyard.RestoreTxn(s.tx, stone); err != nil { return fmt.Errorf("failed restoring tombstone: %s", err) } @@ -47,7 +47,7 @@ func (s *StateRestore) Tombstone(stone *Tombstone) error { // ReapTombstones is used to delete all the tombstones with an index // less than or equal to the given index. This is used to prevent // unbounded storage growth of the tombstones. -func (s *StateStore) ReapTombstones(index uint64) error { +func (s *Store) ReapTombstones(index uint64) error { tx := s.db.Txn(true) defer tx.Abort() @@ -60,7 +60,7 @@ func (s *StateStore) ReapTombstones(index uint64) error { } // KVSSet is used to store a key/value pair. -func (s *StateStore) KVSSet(idx uint64, entry *structs.DirEntry) error { +func (s *Store) KVSSet(idx uint64, entry *structs.DirEntry) error { tx := s.db.Txn(true) defer tx.Abort() @@ -78,7 +78,7 @@ func (s *StateStore) KVSSet(idx uint64, entry *structs.DirEntry) error { // If updateSession is true, then the incoming entry will set the new // session (should be validated before calling this). Otherwise, we will keep // whatever the existing session is. -func (s *StateStore) kvsSetTxn(tx *memdb.Txn, idx uint64, entry *structs.DirEntry, updateSession bool) error { +func (s *Store) kvsSetTxn(tx *memdb.Txn, idx uint64, entry *structs.DirEntry, updateSession bool) error { // Retrieve an existing KV pair existing, err := tx.First("kvs", "id", entry.Key) if err != nil { @@ -115,7 +115,7 @@ func (s *StateStore) kvsSetTxn(tx *memdb.Txn, idx uint64, entry *structs.DirEntr } // KVSGet is used to retrieve a key/value pair from the state store. -func (s *StateStore) KVSGet(ws memdb.WatchSet, key string) (uint64, *structs.DirEntry, error) { +func (s *Store) KVSGet(ws memdb.WatchSet, key string) (uint64, *structs.DirEntry, error) { tx := s.db.Txn(false) defer tx.Abort() @@ -124,7 +124,7 @@ func (s *StateStore) KVSGet(ws memdb.WatchSet, key string) (uint64, *structs.Dir // kvsGetTxn is the inner method that gets a KVS entry inside an existing // transaction. -func (s *StateStore) kvsGetTxn(tx *memdb.Txn, ws memdb.WatchSet, key string) (uint64, *structs.DirEntry, error) { +func (s *Store) kvsGetTxn(tx *memdb.Txn, ws memdb.WatchSet, key string) (uint64, *structs.DirEntry, error) { // Get the table index. idx := maxIndexTxn(tx, "kvs", "tombstones") @@ -144,7 +144,7 @@ func (s *StateStore) kvsGetTxn(tx *memdb.Txn, ws memdb.WatchSet, key string) (ui // prefix is left empty, all keys in the KVS will be returned. The returned // is the max index of the returned kvs entries or applicable tombstones, or // else it's the full table indexes for kvs and tombstones. -func (s *StateStore) KVSList(ws memdb.WatchSet, prefix string) (uint64, structs.DirEntries, error) { +func (s *Store) KVSList(ws memdb.WatchSet, prefix string) (uint64, structs.DirEntries, error) { tx := s.db.Txn(false) defer tx.Abort() @@ -153,7 +153,7 @@ func (s *StateStore) KVSList(ws memdb.WatchSet, prefix string) (uint64, structs. // kvsListTxn is the inner method that gets a list of KVS entries matching a // prefix. -func (s *StateStore) kvsListTxn(tx *memdb.Txn, ws memdb.WatchSet, prefix string) (uint64, structs.DirEntries, error) { +func (s *Store) kvsListTxn(tx *memdb.Txn, ws memdb.WatchSet, prefix string) (uint64, structs.DirEntries, error) { // Get the table indexes. idx := maxIndexTxn(tx, "kvs", "tombstones") @@ -201,7 +201,7 @@ func (s *StateStore) kvsListTxn(tx *memdb.Txn, ws memdb.WatchSet, prefix string) // An optional separator may be specified, which can be used to slice off a part // of the response so that only a subset of the prefix is returned. In this // mode, the keys which are omitted are still counted in the returned index. -func (s *StateStore) KVSListKeys(ws memdb.WatchSet, prefix, sep string) (uint64, []string, error) { +func (s *Store) KVSListKeys(ws memdb.WatchSet, prefix, sep string) (uint64, []string, error) { tx := s.db.Txn(false) defer tx.Abort() @@ -274,7 +274,7 @@ func (s *StateStore) KVSListKeys(ws memdb.WatchSet, prefix, sep string) (uint64, // KVSDelete is used to perform a shallow delete on a single key in the // the state store. -func (s *StateStore) KVSDelete(idx uint64, key string) error { +func (s *Store) KVSDelete(idx uint64, key string) error { tx := s.db.Txn(true) defer tx.Abort() @@ -289,7 +289,7 @@ func (s *StateStore) KVSDelete(idx uint64, key string) error { // kvsDeleteTxn is the inner method used to perform the actual deletion // of a key/value pair within an existing transaction. -func (s *StateStore) kvsDeleteTxn(tx *memdb.Txn, idx uint64, key string) error { +func (s *Store) kvsDeleteTxn(tx *memdb.Txn, idx uint64, key string) error { // Look up the entry in the state store. entry, err := tx.First("kvs", "id", key) if err != nil { @@ -319,7 +319,7 @@ func (s *StateStore) kvsDeleteTxn(tx *memdb.Txn, idx uint64, key string) error { // raft index. If the CAS index specified is not equal to the last // observed index for the given key, then the call is a noop, otherwise // a normal KV delete is invoked. -func (s *StateStore) KVSDeleteCAS(idx, cidx uint64, key string) (bool, error) { +func (s *Store) KVSDeleteCAS(idx, cidx uint64, key string) (bool, error) { tx := s.db.Txn(true) defer tx.Abort() @@ -334,7 +334,7 @@ func (s *StateStore) KVSDeleteCAS(idx, cidx uint64, key string) (bool, error) { // kvsDeleteCASTxn is the inner method that does a CAS delete within an existing // transaction. -func (s *StateStore) kvsDeleteCASTxn(tx *memdb.Txn, idx, cidx uint64, key string) (bool, error) { +func (s *Store) kvsDeleteCASTxn(tx *memdb.Txn, idx, cidx uint64, key string) (bool, error) { // Retrieve the existing kvs entry, if any exists. entry, err := tx.First("kvs", "id", key) if err != nil { @@ -360,7 +360,7 @@ func (s *StateStore) kvsDeleteCASTxn(tx *memdb.Txn, idx, cidx uint64, key string // ModifyIndex in the provided entry is used to determine if we should // write the entry to the state store or bail. Returns a bool indicating // if a write happened and any error. -func (s *StateStore) KVSSetCAS(idx uint64, entry *structs.DirEntry) (bool, error) { +func (s *Store) KVSSetCAS(idx uint64, entry *structs.DirEntry) (bool, error) { tx := s.db.Txn(true) defer tx.Abort() @@ -375,7 +375,7 @@ func (s *StateStore) KVSSetCAS(idx uint64, entry *structs.DirEntry) (bool, error // kvsSetCASTxn is the inner method used to do a CAS inside an existing // transaction. -func (s *StateStore) kvsSetCASTxn(tx *memdb.Txn, idx uint64, entry *structs.DirEntry) (bool, error) { +func (s *Store) kvsSetCASTxn(tx *memdb.Txn, idx uint64, entry *structs.DirEntry) (bool, error) { // Retrieve the existing entry. existing, err := tx.First("kvs", "id", entry.Key) if err != nil { @@ -405,7 +405,7 @@ func (s *StateStore) kvsSetCASTxn(tx *memdb.Txn, idx uint64, entry *structs.DirE // KVSDeleteTree is used to do a recursive delete on a key prefix // in the state store. If any keys are modified, the last index is // set, otherwise this is a no-op. -func (s *StateStore) KVSDeleteTree(idx uint64, prefix string) error { +func (s *Store) KVSDeleteTree(idx uint64, prefix string) error { tx := s.db.Txn(true) defer tx.Abort() @@ -419,7 +419,7 @@ func (s *StateStore) KVSDeleteTree(idx uint64, prefix string) error { // kvsDeleteTreeTxn is the inner method that does a recursive delete inside an // existing transaction. -func (s *StateStore) kvsDeleteTreeTxn(tx *memdb.Txn, idx uint64, prefix string) error { +func (s *Store) kvsDeleteTreeTxn(tx *memdb.Txn, idx uint64, prefix string) error { // Get an iterator over all of the keys with the given prefix. entries, err := tx.Get("kvs", "id_prefix", prefix) if err != nil { @@ -459,13 +459,13 @@ func (s *StateStore) kvsDeleteTreeTxn(tx *memdb.Txn, idx uint64, prefix string) // KVSLockDelay returns the expiration time for any lock delay associated with // the given key. -func (s *StateStore) KVSLockDelay(key string) time.Time { +func (s *Store) KVSLockDelay(key string) time.Time { return s.lockDelay.GetExpiration(key) } // KVSLock is similar to KVSSet but only performs the set if the lock can be // acquired. -func (s *StateStore) KVSLock(idx uint64, entry *structs.DirEntry) (bool, error) { +func (s *Store) KVSLock(idx uint64, entry *structs.DirEntry) (bool, error) { tx := s.db.Txn(true) defer tx.Abort() @@ -480,7 +480,7 @@ func (s *StateStore) KVSLock(idx uint64, entry *structs.DirEntry) (bool, error) // kvsLockTxn is the inner method that does a lock inside an existing // transaction. -func (s *StateStore) kvsLockTxn(tx *memdb.Txn, idx uint64, entry *structs.DirEntry) (bool, error) { +func (s *Store) kvsLockTxn(tx *memdb.Txn, idx uint64, entry *structs.DirEntry) (bool, error) { // Verify that a session is present. if entry.Session == "" { return false, fmt.Errorf("missing session") @@ -531,7 +531,7 @@ func (s *StateStore) kvsLockTxn(tx *memdb.Txn, idx uint64, entry *structs.DirEnt // KVSUnlock is similar to KVSSet but only performs the set if the lock can be // unlocked (the key must already exist and be locked). -func (s *StateStore) KVSUnlock(idx uint64, entry *structs.DirEntry) (bool, error) { +func (s *Store) KVSUnlock(idx uint64, entry *structs.DirEntry) (bool, error) { tx := s.db.Txn(true) defer tx.Abort() @@ -546,7 +546,7 @@ func (s *StateStore) KVSUnlock(idx uint64, entry *structs.DirEntry) (bool, error // kvsUnlockTxn is the inner method that does an unlock inside an existing // transaction. -func (s *StateStore) kvsUnlockTxn(tx *memdb.Txn, idx uint64, entry *structs.DirEntry) (bool, error) { +func (s *Store) kvsUnlockTxn(tx *memdb.Txn, idx uint64, entry *structs.DirEntry) (bool, error) { // Verify that a session is present. if entry.Session == "" { return false, fmt.Errorf("missing session") @@ -584,7 +584,7 @@ func (s *StateStore) kvsUnlockTxn(tx *memdb.Txn, idx uint64, entry *structs.DirE // kvsCheckSessionTxn checks to see if the given session matches the current // entry for a key. -func (s *StateStore) kvsCheckSessionTxn(tx *memdb.Txn, key string, session string) (*structs.DirEntry, error) { +func (s *Store) kvsCheckSessionTxn(tx *memdb.Txn, key string, session string) (*structs.DirEntry, error) { entry, err := tx.First("kvs", "id", key) if err != nil { return nil, fmt.Errorf("failed kvs lookup: %s", err) @@ -603,7 +603,7 @@ func (s *StateStore) kvsCheckSessionTxn(tx *memdb.Txn, key string, session strin // kvsCheckIndexTxn checks to see if the given modify index matches the current // entry for a key. -func (s *StateStore) kvsCheckIndexTxn(tx *memdb.Txn, key string, cidx uint64) (*structs.DirEntry, error) { +func (s *Store) kvsCheckIndexTxn(tx *memdb.Txn, key string, cidx uint64) (*structs.DirEntry, error) { entry, err := tx.First("kvs", "id", key) if err != nil { return nil, fmt.Errorf("failed kvs lookup: %s", err) diff --git a/consul/state/prepared_query.go b/consul/state/prepared_query.go index 2b15ca990..65f7cdf8b 100644 --- a/consul/state/prepared_query.go +++ b/consul/state/prepared_query.go @@ -45,7 +45,7 @@ func toPreparedQuery(wrapped interface{}) *structs.PreparedQuery { } // PreparedQueries is used to pull all the prepared queries from the snapshot. -func (s *StateSnapshot) PreparedQueries() (structs.PreparedQueries, error) { +func (s *Snapshot) PreparedQueries() (structs.PreparedQueries, error) { queries, err := s.tx.Get("prepared-queries", "id") if err != nil { return nil, err @@ -60,7 +60,7 @@ func (s *StateSnapshot) PreparedQueries() (structs.PreparedQueries, error) { // PreparedQuery is used when restoring from a snapshot. For general inserts, // use PreparedQuerySet. -func (s *StateRestore) PreparedQuery(query *structs.PreparedQuery) error { +func (s *Restore) PreparedQuery(query *structs.PreparedQuery) error { // If this is a template, compile it, otherwise leave the compiled // template field nil. var ct *prepared_query.CompiledTemplate @@ -84,7 +84,7 @@ func (s *StateRestore) PreparedQuery(query *structs.PreparedQuery) error { } // PreparedQuerySet is used to create or update a prepared query. -func (s *StateStore) PreparedQuerySet(idx uint64, query *structs.PreparedQuery) error { +func (s *Store) PreparedQuerySet(idx uint64, query *structs.PreparedQuery) error { tx := s.db.Txn(true) defer tx.Abort() @@ -98,7 +98,7 @@ func (s *StateStore) PreparedQuerySet(idx uint64, query *structs.PreparedQuery) // preparedQuerySetTxn is the inner method used to insert a prepared query with // the proper indexes into the state store. -func (s *StateStore) preparedQuerySetTxn(tx *memdb.Txn, idx uint64, query *structs.PreparedQuery) error { +func (s *Store) preparedQuerySetTxn(tx *memdb.Txn, idx uint64, query *structs.PreparedQuery) error { // Check that the ID is set. if query.ID == "" { return ErrMissingQueryID @@ -201,7 +201,7 @@ func (s *StateStore) preparedQuerySetTxn(tx *memdb.Txn, idx uint64, query *struc } // PreparedQueryDelete deletes the given query by ID. -func (s *StateStore) PreparedQueryDelete(idx uint64, queryID string) error { +func (s *Store) PreparedQueryDelete(idx uint64, queryID string) error { tx := s.db.Txn(true) defer tx.Abort() @@ -215,7 +215,7 @@ func (s *StateStore) PreparedQueryDelete(idx uint64, queryID string) error { // preparedQueryDeleteTxn is the inner method used to delete a prepared query // with the proper indexes into the state store. -func (s *StateStore) preparedQueryDeleteTxn(tx *memdb.Txn, idx uint64, queryID string) error { +func (s *Store) preparedQueryDeleteTxn(tx *memdb.Txn, idx uint64, queryID string) error { // Pull the query. wrapped, err := tx.First("prepared-queries", "id", queryID) if err != nil { @@ -237,7 +237,7 @@ func (s *StateStore) preparedQueryDeleteTxn(tx *memdb.Txn, idx uint64, queryID s } // PreparedQueryGet returns the given prepared query by ID. -func (s *StateStore) PreparedQueryGet(ws memdb.WatchSet, queryID string) (uint64, *structs.PreparedQuery, error) { +func (s *Store) PreparedQueryGet(ws memdb.WatchSet, queryID string) (uint64, *structs.PreparedQuery, error) { tx := s.db.Txn(false) defer tx.Abort() @@ -256,7 +256,7 @@ func (s *StateStore) PreparedQueryGet(ws memdb.WatchSet, queryID string) (uint64 // PreparedQueryResolve returns the given prepared query by looking up an ID or // Name. If the query was looked up by name and it's a template, then the // template will be rendered before it is returned. -func (s *StateStore) PreparedQueryResolve(queryIDOrName string) (uint64, *structs.PreparedQuery, error) { +func (s *Store) PreparedQueryResolve(queryIDOrName string) (uint64, *structs.PreparedQuery, error) { tx := s.db.Txn(false) defer tx.Abort() @@ -331,7 +331,7 @@ func (s *StateStore) PreparedQueryResolve(queryIDOrName string) (uint64, *struct } // PreparedQueryList returns all the prepared queries. -func (s *StateStore) PreparedQueryList(ws memdb.WatchSet) (uint64, structs.PreparedQueries, error) { +func (s *Store) PreparedQueryList(ws memdb.WatchSet) (uint64, structs.PreparedQueries, error) { tx := s.db.Txn(false) defer tx.Abort() diff --git a/consul/state/session.go b/consul/state/session.go index 2320ebf17..e41d43c7b 100644 --- a/consul/state/session.go +++ b/consul/state/session.go @@ -10,7 +10,7 @@ import ( ) // Sessions is used to pull the full list of sessions for use during snapshots. -func (s *StateSnapshot) Sessions() (memdb.ResultIterator, error) { +func (s *Snapshot) Sessions() (memdb.ResultIterator, error) { iter, err := s.tx.Get("sessions", "id") if err != nil { return nil, err @@ -20,7 +20,7 @@ func (s *StateSnapshot) Sessions() (memdb.ResultIterator, error) { // Session is used when restoring from a snapshot. For general inserts, use // SessionCreate. -func (s *StateRestore) Session(sess *structs.Session) error { +func (s *Restore) Session(sess *structs.Session) error { // Insert the session. if err := s.tx.Insert("sessions", sess); err != nil { return fmt.Errorf("failed inserting session: %s", err) @@ -47,7 +47,7 @@ func (s *StateRestore) Session(sess *structs.Session) error { } // SessionCreate is used to register a new session in the state store. -func (s *StateStore) SessionCreate(idx uint64, sess *structs.Session) error { +func (s *Store) SessionCreate(idx uint64, sess *structs.Session) error { tx := s.db.Txn(true) defer tx.Abort() @@ -70,7 +70,7 @@ func (s *StateStore) SessionCreate(idx uint64, sess *structs.Session) error { // sessionCreateTxn is the inner method used for creating session entries in // an open transaction. Any health checks registered with the session will be // checked for failing status. Returns any error encountered. -func (s *StateStore) sessionCreateTxn(tx *memdb.Txn, idx uint64, sess *structs.Session) error { +func (s *Store) sessionCreateTxn(tx *memdb.Txn, idx uint64, sess *structs.Session) error { // Check that we have a session ID if sess.ID == "" { return ErrMissingSessionID @@ -144,7 +144,7 @@ func (s *StateStore) sessionCreateTxn(tx *memdb.Txn, idx uint64, sess *structs.S } // SessionGet is used to retrieve an active session from the state store. -func (s *StateStore) SessionGet(ws memdb.WatchSet, sessionID string) (uint64, *structs.Session, error) { +func (s *Store) SessionGet(ws memdb.WatchSet, sessionID string) (uint64, *structs.Session, error) { tx := s.db.Txn(false) defer tx.Abort() @@ -164,7 +164,7 @@ func (s *StateStore) SessionGet(ws memdb.WatchSet, sessionID string) (uint64, *s } // SessionList returns a slice containing all of the active sessions. -func (s *StateStore) SessionList(ws memdb.WatchSet) (uint64, structs.Sessions, error) { +func (s *Store) SessionList(ws memdb.WatchSet) (uint64, structs.Sessions, error) { tx := s.db.Txn(false) defer tx.Abort() @@ -189,7 +189,7 @@ func (s *StateStore) SessionList(ws memdb.WatchSet) (uint64, structs.Sessions, e // NodeSessions returns a set of active sessions associated // with the given node ID. The returned index is the highest // index seen from the result set. -func (s *StateStore) NodeSessions(ws memdb.WatchSet, nodeID string) (uint64, structs.Sessions, error) { +func (s *Store) NodeSessions(ws memdb.WatchSet, nodeID string) (uint64, structs.Sessions, error) { tx := s.db.Txn(false) defer tx.Abort() @@ -214,7 +214,7 @@ func (s *StateStore) NodeSessions(ws memdb.WatchSet, nodeID string) (uint64, str // SessionDestroy is used to remove an active session. This will // implicitly invalidate the session and invoke the specified // session destroy behavior. -func (s *StateStore) SessionDestroy(idx uint64, sessionID string) error { +func (s *Store) SessionDestroy(idx uint64, sessionID string) error { tx := s.db.Txn(true) defer tx.Abort() @@ -229,7 +229,7 @@ func (s *StateStore) SessionDestroy(idx uint64, sessionID string) error { // deleteSessionTxn is the inner method, which is used to do the actual // session deletion and handle session invalidation, etc. -func (s *StateStore) deleteSessionTxn(tx *memdb.Txn, idx uint64, sessionID string) error { +func (s *Store) deleteSessionTxn(tx *memdb.Txn, idx uint64, sessionID string) error { // Look up the session. sess, err := tx.First("sessions", "id", sessionID) if err != nil { diff --git a/consul/state/state_store.go b/consul/state/state_store.go index 837a4f6f8..94947f366 100644 --- a/consul/state/state_store.go +++ b/consul/state/state_store.go @@ -42,11 +42,11 @@ const ( watchLimit = 2048 ) -// StateStore is where we store all of Consul's state, including +// Store is where we store all of Consul's state, including // records of node registrations, services, checks, key/value // pairs and more. The DB is entirely in-memory and is constructed // from the Raft log through the FSM. -type StateStore struct { +type Store struct { schema *memdb.DBSchema db *memdb.MemDB @@ -61,18 +61,18 @@ type StateStore struct { lockDelay *Delay } -// StateSnapshot is used to provide a point-in-time snapshot. It +// Snapshot is used to provide a point-in-time snapshot. It // works by starting a read transaction against the whole state store. -type StateSnapshot struct { - store *StateStore +type Snapshot struct { + store *Store tx *memdb.Txn lastIndex uint64 } -// StateRestore is used to efficiently manage restoring a large amount of +// Restore is used to efficiently manage restoring a large amount of // data to a state store. -type StateRestore struct { - store *StateStore +type Restore struct { + store *Store tx *memdb.Txn } @@ -93,7 +93,7 @@ type sessionCheck struct { } // NewStateStore creates a new in-memory state storage layer. -func NewStateStore(gc *TombstoneGC) (*StateStore, error) { +func NewStateStore(gc *TombstoneGC) (*Store, error) { // Create the in-memory DB. schema := stateStoreSchema() db, err := memdb.NewMemDB(schema) @@ -102,7 +102,7 @@ func NewStateStore(gc *TombstoneGC) (*StateStore, error) { } // Create and return the state store. - s := &StateStore{ + s := &Store{ schema: schema, db: db, abandonCh: make(chan struct{}), @@ -113,7 +113,7 @@ func NewStateStore(gc *TombstoneGC) (*StateStore, error) { } // Snapshot is used to create a point-in-time snapshot of the entire db. -func (s *StateStore) Snapshot() *StateSnapshot { +func (s *Store) Snapshot() *Snapshot { tx := s.db.Txn(false) var tables []string @@ -122,54 +122,54 @@ func (s *StateStore) Snapshot() *StateSnapshot { } idx := maxIndexTxn(tx, tables...) - return &StateSnapshot{s, tx, idx} + return &Snapshot{s, tx, idx} } // LastIndex returns that last index that affects the snapshotted data. -func (s *StateSnapshot) LastIndex() uint64 { +func (s *Snapshot) LastIndex() uint64 { return s.lastIndex } // Close performs cleanup of a state snapshot. -func (s *StateSnapshot) Close() { +func (s *Snapshot) Close() { s.tx.Abort() } // Restore is used to efficiently manage restoring a large amount of data into // the state store. It works by doing all the restores inside of a single // transaction. -func (s *StateStore) Restore() *StateRestore { +func (s *Store) Restore() *Restore { tx := s.db.Txn(true) - return &StateRestore{s, tx} + return &Restore{s, tx} } // Abort abandons the changes made by a restore. This or Commit should always be // called. -func (s *StateRestore) Abort() { +func (s *Restore) Abort() { s.tx.Abort() } // Commit commits the changes made by a restore. This or Abort should always be // called. -func (s *StateRestore) Commit() { +func (s *Restore) Commit() { s.tx.Commit() } // AbandonCh returns a channel you can wait on to know if the state store was // abandoned. -func (s *StateStore) AbandonCh() <-chan struct{} { +func (s *Store) AbandonCh() <-chan struct{} { return s.abandonCh } // Abandon is used to signal that the given state store has been abandoned. // Calling this more than one time will panic. -func (s *StateStore) Abandon() { +func (s *Store) Abandon() { close(s.abandonCh) } // maxIndex is a helper used to retrieve the highest known index // amongst a set of tables in the db. -func (s *StateStore) maxIndex(tables ...string) uint64 { +func (s *Store) maxIndex(tables ...string) uint64 { tx := s.db.Txn(false) defer tx.Abort() return maxIndexTxn(tx, tables...) diff --git a/consul/state/state_store_test.go b/consul/state/state_store_test.go index e58b71e6d..73f33db3a 100644 --- a/consul/state/state_store_test.go +++ b/consul/state/state_store_test.go @@ -25,7 +25,7 @@ func testUUID() string { buf[10:16]) } -func testStateStore(t *testing.T) *StateStore { +func testStateStore(t *testing.T) *Store { s, err := NewStateStore(nil) if err != nil { t.Fatalf("err: %s", err) @@ -36,11 +36,11 @@ func testStateStore(t *testing.T) *StateStore { return s } -func testRegisterNode(t *testing.T, s *StateStore, idx uint64, nodeID string) { +func testRegisterNode(t *testing.T, s *Store, idx uint64, nodeID string) { testRegisterNodeWithMeta(t, s, idx, nodeID, nil) } -func testRegisterNodeWithMeta(t *testing.T, s *StateStore, idx uint64, nodeID string, meta map[string]string) { +func testRegisterNodeWithMeta(t *testing.T, s *Store, idx uint64, nodeID string, meta map[string]string) { node := &structs.Node{Node: nodeID, Meta: meta} if err := s.EnsureNode(idx, node); err != nil { t.Fatalf("err: %s", err) @@ -57,7 +57,7 @@ func testRegisterNodeWithMeta(t *testing.T, s *StateStore, idx uint64, nodeID st } } -func testRegisterService(t *testing.T, s *StateStore, idx uint64, nodeID, serviceID string) { +func testRegisterService(t *testing.T, s *Store, idx uint64, nodeID, serviceID string) { svc := &structs.NodeService{ ID: serviceID, Service: serviceID, @@ -81,7 +81,7 @@ func testRegisterService(t *testing.T, s *StateStore, idx uint64, nodeID, servic } } -func testRegisterCheck(t *testing.T, s *StateStore, idx uint64, +func testRegisterCheck(t *testing.T, s *Store, idx uint64, nodeID string, serviceID string, checkID types.CheckID, state string) { chk := &structs.HealthCheck{ Node: nodeID, @@ -107,7 +107,7 @@ func testRegisterCheck(t *testing.T, s *StateStore, idx uint64, } } -func testSetKey(t *testing.T, s *StateStore, idx uint64, key, value string) { +func testSetKey(t *testing.T, s *Store, idx uint64, key, value string) { entry := &structs.DirEntry{Key: key, Value: []byte(value)} if err := s.KVSSet(idx, entry); err != nil { t.Fatalf("err: %s", err) diff --git a/consul/state/txn.go b/consul/state/txn.go index 60fb3c2e8..b821d13d3 100644 --- a/consul/state/txn.go +++ b/consul/state/txn.go @@ -9,7 +9,7 @@ import ( ) // txnKVS handles all KV-related operations. -func (s *StateStore) txnKVS(tx *memdb.Txn, idx uint64, op *structs.TxnKVOp) (structs.TxnResults, error) { +func (s *Store) txnKVS(tx *memdb.Txn, idx uint64, op *structs.TxnKVOp) (structs.TxnResults, error) { var entry *structs.DirEntry var err error @@ -111,7 +111,7 @@ func (s *StateStore) txnKVS(tx *memdb.Txn, idx uint64, op *structs.TxnKVOp) (str } // txnDispatch runs the given operations inside the state store transaction. -func (s *StateStore) txnDispatch(tx *memdb.Txn, idx uint64, ops structs.TxnOps) (structs.TxnResults, structs.TxnErrors) { +func (s *Store) txnDispatch(tx *memdb.Txn, idx uint64, ops structs.TxnOps) (structs.TxnResults, structs.TxnErrors) { results := make(structs.TxnResults, 0, len(ops)) errors := make(structs.TxnErrors, 0, len(ops)) for i, op := range ops { @@ -149,7 +149,7 @@ func (s *StateStore) txnDispatch(tx *memdb.Txn, idx uint64, ops structs.TxnOps) // any of the operations fail, the entire transaction will be rolled back. This // is done in a full write transaction on the state store, so reads and writes // are possible -func (s *StateStore) TxnRW(idx uint64, ops structs.TxnOps) (structs.TxnResults, structs.TxnErrors) { +func (s *Store) TxnRW(idx uint64, ops structs.TxnOps) (structs.TxnResults, structs.TxnErrors) { tx := s.db.Txn(true) defer tx.Abort() @@ -165,7 +165,7 @@ func (s *StateStore) TxnRW(idx uint64, ops structs.TxnOps) (structs.TxnResults, // TxnRO runs the given operations inside a single read transaction in the state // store. You must verify outside this function that no write operations are // present, otherwise you'll get an error from the state store. -func (s *StateStore) TxnRO(ops structs.TxnOps) (structs.TxnResults, structs.TxnErrors) { +func (s *Store) TxnRO(ops structs.TxnOps) (structs.TxnResults, structs.TxnErrors) { tx := s.db.Txn(false) defer tx.Abort() diff --git a/watch/funcs.go b/watch/funcs.go index 2d267c163..0fd7fdb9e 100644 --- a/watch/funcs.go +++ b/watch/funcs.go @@ -8,7 +8,7 @@ import ( // watchFactory is a function that can create a new WatchFunc // from a parameter configuration -type watchFactory func(params map[string]interface{}) (WatchFunc, error) +type watchFactory func(params map[string]interface{}) (WatcherFunc, error) // watchFuncFactory maps each type to a factory function var watchFuncFactory map[string]watchFactory @@ -26,7 +26,7 @@ func init() { } // keyWatch is used to return a key watching function -func keyWatch(params map[string]interface{}) (WatchFunc, error) { +func keyWatch(params map[string]interface{}) (WatcherFunc, error) { stale := false if err := assignValueBool(params, "stale", &stale); err != nil { return nil, err @@ -39,7 +39,7 @@ func keyWatch(params map[string]interface{}) (WatchFunc, error) { if key == "" { return nil, fmt.Errorf("Must specify a single key to watch") } - fn := func(p *WatchPlan) (uint64, interface{}, error) { + fn := func(p *Plan) (uint64, interface{}, error) { kv := p.client.KV() opts := consulapi.QueryOptions{AllowStale: stale, WaitIndex: p.lastIndex} pair, meta, err := kv.Get(key, &opts) @@ -55,7 +55,7 @@ func keyWatch(params map[string]interface{}) (WatchFunc, error) { } // keyPrefixWatch is used to return a key prefix watching function -func keyPrefixWatch(params map[string]interface{}) (WatchFunc, error) { +func keyPrefixWatch(params map[string]interface{}) (WatcherFunc, error) { stale := false if err := assignValueBool(params, "stale", &stale); err != nil { return nil, err @@ -68,7 +68,7 @@ func keyPrefixWatch(params map[string]interface{}) (WatchFunc, error) { if prefix == "" { return nil, fmt.Errorf("Must specify a single prefix to watch") } - fn := func(p *WatchPlan) (uint64, interface{}, error) { + fn := func(p *Plan) (uint64, interface{}, error) { kv := p.client.KV() opts := consulapi.QueryOptions{AllowStale: stale, WaitIndex: p.lastIndex} pairs, meta, err := kv.List(prefix, &opts) @@ -81,13 +81,13 @@ func keyPrefixWatch(params map[string]interface{}) (WatchFunc, error) { } // servicesWatch is used to watch the list of available services -func servicesWatch(params map[string]interface{}) (WatchFunc, error) { +func servicesWatch(params map[string]interface{}) (WatcherFunc, error) { stale := false if err := assignValueBool(params, "stale", &stale); err != nil { return nil, err } - fn := func(p *WatchPlan) (uint64, interface{}, error) { + fn := func(p *Plan) (uint64, interface{}, error) { catalog := p.client.Catalog() opts := consulapi.QueryOptions{AllowStale: stale, WaitIndex: p.lastIndex} services, meta, err := catalog.Services(&opts) @@ -100,13 +100,13 @@ func servicesWatch(params map[string]interface{}) (WatchFunc, error) { } // nodesWatch is used to watch the list of available nodes -func nodesWatch(params map[string]interface{}) (WatchFunc, error) { +func nodesWatch(params map[string]interface{}) (WatcherFunc, error) { stale := false if err := assignValueBool(params, "stale", &stale); err != nil { return nil, err } - fn := func(p *WatchPlan) (uint64, interface{}, error) { + fn := func(p *Plan) (uint64, interface{}, error) { catalog := p.client.Catalog() opts := consulapi.QueryOptions{AllowStale: stale, WaitIndex: p.lastIndex} nodes, meta, err := catalog.Nodes(&opts) @@ -119,7 +119,7 @@ func nodesWatch(params map[string]interface{}) (WatchFunc, error) { } // serviceWatch is used to watch a specific service for changes -func serviceWatch(params map[string]interface{}) (WatchFunc, error) { +func serviceWatch(params map[string]interface{}) (WatcherFunc, error) { stale := false if err := assignValueBool(params, "stale", &stale); err != nil { return nil, err @@ -142,7 +142,7 @@ func serviceWatch(params map[string]interface{}) (WatchFunc, error) { return nil, err } - fn := func(p *WatchPlan) (uint64, interface{}, error) { + fn := func(p *Plan) (uint64, interface{}, error) { health := p.client.Health() opts := consulapi.QueryOptions{AllowStale: stale, WaitIndex: p.lastIndex} nodes, meta, err := health.Service(service, tag, passingOnly, &opts) @@ -155,7 +155,7 @@ func serviceWatch(params map[string]interface{}) (WatchFunc, error) { } // checksWatch is used to watch a specific checks in a given state -func checksWatch(params map[string]interface{}) (WatchFunc, error) { +func checksWatch(params map[string]interface{}) (WatcherFunc, error) { stale := false if err := assignValueBool(params, "stale", &stale); err != nil { return nil, err @@ -175,7 +175,7 @@ func checksWatch(params map[string]interface{}) (WatchFunc, error) { state = "any" } - fn := func(p *WatchPlan) (uint64, interface{}, error) { + fn := func(p *Plan) (uint64, interface{}, error) { health := p.client.Health() opts := consulapi.QueryOptions{AllowStale: stale, WaitIndex: p.lastIndex} var checks []*consulapi.HealthCheck @@ -195,7 +195,7 @@ func checksWatch(params map[string]interface{}) (WatchFunc, error) { } // eventWatch is used to watch for events, optionally filtering on name -func eventWatch(params map[string]interface{}) (WatchFunc, error) { +func eventWatch(params map[string]interface{}) (WatcherFunc, error) { // The stale setting doesn't apply to events. var name string @@ -203,7 +203,7 @@ func eventWatch(params map[string]interface{}) (WatchFunc, error) { return nil, err } - fn := func(p *WatchPlan) (uint64, interface{}, error) { + fn := func(p *Plan) (uint64, interface{}, error) { event := p.client.Event() opts := consulapi.QueryOptions{WaitIndex: p.lastIndex} events, meta, err := event.List(name, &opts) diff --git a/watch/plan.go b/watch/plan.go index 0fd4a747e..8dc877968 100644 --- a/watch/plan.go +++ b/watch/plan.go @@ -20,7 +20,7 @@ const ( ) // Run is used to run a watch plan -func (p *WatchPlan) Run(address string) error { +func (p *Plan) Run(address string) error { // Setup the client p.address = address conf := consulapi.DefaultConfig() @@ -45,7 +45,7 @@ func (p *WatchPlan) Run(address string) error { OUTER: for !p.shouldStop() { // Invoke the handler - index, result, err := p.Func(p) + index, result, err := p.Watcher(p) // Check if we should terminate since the function // could have blocked for a while @@ -96,7 +96,7 @@ OUTER: } // Stop is used to stop running the watch plan -func (p *WatchPlan) Stop() { +func (p *Plan) Stop() { p.stopLock.Lock() defer p.stopLock.Unlock() if p.stop { @@ -106,7 +106,7 @@ func (p *WatchPlan) Stop() { close(p.stopCh) } -func (p *WatchPlan) shouldStop() bool { +func (p *Plan) shouldStop() bool { select { case <-p.stopCh: return true diff --git a/watch/plan_test.go b/watch/plan_test.go index d545165a6..4bb7d8038 100644 --- a/watch/plan_test.go +++ b/watch/plan_test.go @@ -9,15 +9,15 @@ func init() { watchFuncFactory["noop"] = noopWatch } -func noopWatch(params map[string]interface{}) (WatchFunc, error) { - fn := func(p *WatchPlan) (uint64, interface{}, error) { +func noopWatch(params map[string]interface{}) (WatcherFunc, error) { + fn := func(p *Plan) (uint64, interface{}, error) { idx := p.lastIndex + 1 return idx, idx, nil } return fn, nil } -func mustParse(t *testing.T, q string) *WatchPlan { +func mustParse(t *testing.T, q string) *Plan { params := makeParams(t, q) plan, err := Parse(params) if err != nil { diff --git a/watch/watch.go b/watch/watch.go index 7283e3bde..79a8bbcde 100644 --- a/watch/watch.go +++ b/watch/watch.go @@ -8,17 +8,17 @@ import ( consulapi "github.com/hashicorp/consul/api" ) -// WatchPlan is the parsed version of a watch specification. A watch provides +// Plan is the parsed version of a watch specification. A watch provides // the details of a query, which generates a view into the Consul data store. // This view is watched for changes and a handler is invoked to take any // appropriate actions. -type WatchPlan struct { +type Plan struct { Datacenter string Token string Type string Exempt map[string]interface{} - Func WatchFunc + Watcher WatcherFunc Handler HandlerFunc LogOutput io.Writer @@ -32,21 +32,21 @@ type WatchPlan struct { stopLock sync.Mutex } -// WatchFunc is used to watch for a diff -type WatchFunc func(*WatchPlan) (uint64, interface{}, error) +// WatcherFunc is used to watch for a diff +type WatcherFunc func(*Plan) (uint64, interface{}, error) // HandlerFunc is used to handle new data type HandlerFunc func(uint64, interface{}) // Parse takes a watch query and compiles it into a WatchPlan or an error -func Parse(params map[string]interface{}) (*WatchPlan, error) { +func Parse(params map[string]interface{}) (*Plan, error) { return ParseExempt(params, nil) } // ParseExempt takes a watch query and compiles it into a WatchPlan or an error // Any exempt parameters are stored in the Exempt map -func ParseExempt(params map[string]interface{}, exempt []string) (*WatchPlan, error) { - plan := &WatchPlan{ +func ParseExempt(params map[string]interface{}, exempt []string) (*Plan, error) { + plan := &Plan{ stopCh: make(chan struct{}), } @@ -77,7 +77,7 @@ func ParseExempt(params map[string]interface{}, exempt []string) (*WatchPlan, er if err != nil { return nil, err } - plan.Func = fn + plan.Watcher = fn // Remove the exempt parameters if len(exempt) > 0 {