golint: No stutter
This commit is contained in:
parent
8c7bb7b65a
commit
f50d6871f9
|
@ -16,7 +16,7 @@ import (
|
||||||
"github.com/hashicorp/serf/serf"
|
"github.com/hashicorp/serf/serf"
|
||||||
)
|
)
|
||||||
|
|
||||||
type AgentSelf struct {
|
type Self struct {
|
||||||
Config *Config
|
Config *Config
|
||||||
Coord *coordinate.Coordinate
|
Coord *coordinate.Coordinate
|
||||||
Member serf.Member
|
Member serf.Member
|
||||||
|
@ -44,7 +44,7 @@ func (s *HTTPServer) AgentSelf(resp http.ResponseWriter, req *http.Request) (int
|
||||||
return nil, errPermissionDenied
|
return nil, errPermissionDenied
|
||||||
}
|
}
|
||||||
|
|
||||||
return AgentSelf{
|
return Self{
|
||||||
Config: s.agent.config,
|
Config: s.agent.config,
|
||||||
Coord: c,
|
Coord: c,
|
||||||
Member: s.agent.LocalMember(),
|
Member: s.agent.LocalMember(),
|
||||||
|
|
|
@ -223,7 +223,7 @@ func TestAgent_Self(t *testing.T) {
|
||||||
t.Fatalf("err: %v", err)
|
t.Fatalf("err: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
val := obj.(AgentSelf)
|
val := obj.(Self)
|
||||||
if int(val.Member.Port) != srv.agent.config.Ports.SerfLan {
|
if int(val.Member.Port) != srv.agent.config.Ports.SerfLan {
|
||||||
t.Fatalf("incorrect port: %v", obj)
|
t.Fatalf("incorrect port: %v", obj)
|
||||||
}
|
}
|
||||||
|
|
|
@ -1090,7 +1090,7 @@ func (c *Command) Run(args []string) int {
|
||||||
|
|
||||||
// Register the watches
|
// Register the watches
|
||||||
for _, wp := range config.WatchPlans {
|
for _, wp := range config.WatchPlans {
|
||||||
go func(wp *watch.WatchPlan) {
|
go func(wp *watch.Plan) {
|
||||||
wp.Handler = makeWatchHandler(logOutput, wp.Exempt["handler"])
|
wp.Handler = makeWatchHandler(logOutput, wp.Exempt["handler"])
|
||||||
wp.LogOutput = c.logOutput
|
wp.LogOutput = c.logOutput
|
||||||
addr := httpAddr.String()
|
addr := httpAddr.String()
|
||||||
|
@ -1307,7 +1307,7 @@ func (c *Command) handleReload(config *Config) (*Config, error) {
|
||||||
|
|
||||||
// Register the new watches
|
// Register the new watches
|
||||||
for _, wp := range newConf.WatchPlans {
|
for _, wp := range newConf.WatchPlans {
|
||||||
go func(wp *watch.WatchPlan) {
|
go func(wp *watch.Plan) {
|
||||||
wp.Handler = makeWatchHandler(c.logOutput, wp.Exempt["handler"])
|
wp.Handler = makeWatchHandler(c.logOutput, wp.Exempt["handler"])
|
||||||
wp.LogOutput = c.logOutput
|
wp.LogOutput = c.logOutput
|
||||||
if err := wp.Run(httpAddr.String()); err != nil {
|
if err := wp.Run(httpAddr.String()); err != nil {
|
||||||
|
|
|
@ -718,7 +718,7 @@ type Config struct {
|
||||||
VersionPrerelease string `mapstructure:"-"`
|
VersionPrerelease string `mapstructure:"-"`
|
||||||
|
|
||||||
// WatchPlans contains the compiled watches
|
// WatchPlans contains the compiled watches
|
||||||
WatchPlans []*watch.WatchPlan `mapstructure:"-" json:"-"`
|
WatchPlans []*watch.Plan `mapstructure:"-" json:"-"`
|
||||||
|
|
||||||
// UnixSockets is a map of socket configuration data
|
// UnixSockets is a map of socket configuration data
|
||||||
UnixSockets UnixSocketConfig `mapstructure:"unix_sockets"`
|
UnixSockets UnixSocketConfig `mapstructure:"unix_sockets"`
|
||||||
|
|
|
@ -148,7 +148,7 @@ func (a *ACL) Get(args *structs.ACLSpecificRequest,
|
||||||
|
|
||||||
return a.srv.blockingQuery(&args.QueryOptions,
|
return a.srv.blockingQuery(&args.QueryOptions,
|
||||||
&reply.QueryMeta,
|
&reply.QueryMeta,
|
||||||
func(ws memdb.WatchSet, state *state.StateStore) error {
|
func(ws memdb.WatchSet, state *state.Store) error {
|
||||||
index, acl, err := state.ACLGet(ws, args.ACL)
|
index, acl, err := state.ACLGet(ws, args.ACL)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
@ -225,7 +225,7 @@ func (a *ACL) List(args *structs.DCSpecificRequest,
|
||||||
|
|
||||||
return a.srv.blockingQuery(&args.QueryOptions,
|
return a.srv.blockingQuery(&args.QueryOptions,
|
||||||
&reply.QueryMeta,
|
&reply.QueryMeta,
|
||||||
func(ws memdb.WatchSet, state *state.StateStore) error {
|
func(ws memdb.WatchSet, state *state.Store) error {
|
||||||
index, acls, err := state.ACLList(ws)
|
index, acls, err := state.ACLList(ws)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
|
|
@ -169,7 +169,7 @@ func (c *Catalog) ListNodes(args *structs.DCSpecificRequest, reply *structs.Inde
|
||||||
return c.srv.blockingQuery(
|
return c.srv.blockingQuery(
|
||||||
&args.QueryOptions,
|
&args.QueryOptions,
|
||||||
&reply.QueryMeta,
|
&reply.QueryMeta,
|
||||||
func(ws memdb.WatchSet, state *state.StateStore) error {
|
func(ws memdb.WatchSet, state *state.Store) error {
|
||||||
var index uint64
|
var index uint64
|
||||||
var nodes structs.Nodes
|
var nodes structs.Nodes
|
||||||
var err error
|
var err error
|
||||||
|
@ -199,7 +199,7 @@ func (c *Catalog) ListServices(args *structs.DCSpecificRequest, reply *structs.I
|
||||||
return c.srv.blockingQuery(
|
return c.srv.blockingQuery(
|
||||||
&args.QueryOptions,
|
&args.QueryOptions,
|
||||||
&reply.QueryMeta,
|
&reply.QueryMeta,
|
||||||
func(ws memdb.WatchSet, state *state.StateStore) error {
|
func(ws memdb.WatchSet, state *state.Store) error {
|
||||||
var index uint64
|
var index uint64
|
||||||
var services structs.Services
|
var services structs.Services
|
||||||
var err error
|
var err error
|
||||||
|
@ -231,7 +231,7 @@ func (c *Catalog) ServiceNodes(args *structs.ServiceSpecificRequest, reply *stru
|
||||||
err := c.srv.blockingQuery(
|
err := c.srv.blockingQuery(
|
||||||
&args.QueryOptions,
|
&args.QueryOptions,
|
||||||
&reply.QueryMeta,
|
&reply.QueryMeta,
|
||||||
func(ws memdb.WatchSet, state *state.StateStore) error {
|
func(ws memdb.WatchSet, state *state.Store) error {
|
||||||
var index uint64
|
var index uint64
|
||||||
var services structs.ServiceNodes
|
var services structs.ServiceNodes
|
||||||
var err error
|
var err error
|
||||||
|
@ -286,7 +286,7 @@ func (c *Catalog) NodeServices(args *structs.NodeSpecificRequest, reply *structs
|
||||||
return c.srv.blockingQuery(
|
return c.srv.blockingQuery(
|
||||||
&args.QueryOptions,
|
&args.QueryOptions,
|
||||||
&reply.QueryMeta,
|
&reply.QueryMeta,
|
||||||
func(ws memdb.WatchSet, state *state.StateStore) error {
|
func(ws memdb.WatchSet, state *state.Store) error {
|
||||||
index, services, err := state.NodeServices(ws, args.Node)
|
index, services, err := state.NodeServices(ws, args.Node)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
|
|
@ -169,7 +169,7 @@ func (c *Coordinate) ListNodes(args *structs.DCSpecificRequest, reply *structs.I
|
||||||
|
|
||||||
return c.srv.blockingQuery(&args.QueryOptions,
|
return c.srv.blockingQuery(&args.QueryOptions,
|
||||||
&reply.QueryMeta,
|
&reply.QueryMeta,
|
||||||
func(ws memdb.WatchSet, state *state.StateStore) error {
|
func(ws memdb.WatchSet, state *state.Store) error {
|
||||||
index, coords, err := state.Coordinates(ws)
|
index, coords, err := state.Coordinates(ws)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
|
|
@ -31,7 +31,7 @@ type consulFSM struct {
|
||||||
// new state store). Everything internal here is synchronized by the
|
// new state store). Everything internal here is synchronized by the
|
||||||
// Raft side, so doesn't need to lock this.
|
// Raft side, so doesn't need to lock this.
|
||||||
stateLock sync.RWMutex
|
stateLock sync.RWMutex
|
||||||
state *state.StateStore
|
state *state.Store
|
||||||
|
|
||||||
gc *state.TombstoneGC
|
gc *state.TombstoneGC
|
||||||
}
|
}
|
||||||
|
@ -40,7 +40,7 @@ type consulFSM struct {
|
||||||
// state in a way that can be accessed concurrently with operations
|
// state in a way that can be accessed concurrently with operations
|
||||||
// that may modify the live state.
|
// that may modify the live state.
|
||||||
type consulSnapshot struct {
|
type consulSnapshot struct {
|
||||||
state *state.StateSnapshot
|
state *state.Snapshot
|
||||||
}
|
}
|
||||||
|
|
||||||
// snapshotHeader is the first entry in our snapshot
|
// snapshotHeader is the first entry in our snapshot
|
||||||
|
@ -67,7 +67,7 @@ func NewFSM(gc *state.TombstoneGC, logOutput io.Writer) (*consulFSM, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// State is used to return a handle to the current state
|
// State is used to return a handle to the current state
|
||||||
func (c *consulFSM) State() *state.StateStore {
|
func (c *consulFSM) State() *state.Store {
|
||||||
c.stateLock.RLock()
|
c.stateLock.RLock()
|
||||||
defer c.stateLock.RUnlock()
|
defer c.stateLock.RUnlock()
|
||||||
return c.state
|
return c.state
|
||||||
|
|
|
@ -23,7 +23,7 @@ func (h *Health) ChecksInState(args *structs.ChecksInStateRequest,
|
||||||
return h.srv.blockingQuery(
|
return h.srv.blockingQuery(
|
||||||
&args.QueryOptions,
|
&args.QueryOptions,
|
||||||
&reply.QueryMeta,
|
&reply.QueryMeta,
|
||||||
func(ws memdb.WatchSet, state *state.StateStore) error {
|
func(ws memdb.WatchSet, state *state.Store) error {
|
||||||
var index uint64
|
var index uint64
|
||||||
var checks structs.HealthChecks
|
var checks structs.HealthChecks
|
||||||
var err error
|
var err error
|
||||||
|
@ -53,7 +53,7 @@ func (h *Health) NodeChecks(args *structs.NodeSpecificRequest,
|
||||||
return h.srv.blockingQuery(
|
return h.srv.blockingQuery(
|
||||||
&args.QueryOptions,
|
&args.QueryOptions,
|
||||||
&reply.QueryMeta,
|
&reply.QueryMeta,
|
||||||
func(ws memdb.WatchSet, state *state.StateStore) error {
|
func(ws memdb.WatchSet, state *state.Store) error {
|
||||||
index, checks, err := state.NodeChecks(ws, args.Node)
|
index, checks, err := state.NodeChecks(ws, args.Node)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
@ -79,7 +79,7 @@ func (h *Health) ServiceChecks(args *structs.ServiceSpecificRequest,
|
||||||
return h.srv.blockingQuery(
|
return h.srv.blockingQuery(
|
||||||
&args.QueryOptions,
|
&args.QueryOptions,
|
||||||
&reply.QueryMeta,
|
&reply.QueryMeta,
|
||||||
func(ws memdb.WatchSet, state *state.StateStore) error {
|
func(ws memdb.WatchSet, state *state.Store) error {
|
||||||
var index uint64
|
var index uint64
|
||||||
var checks structs.HealthChecks
|
var checks structs.HealthChecks
|
||||||
var err error
|
var err error
|
||||||
|
@ -113,7 +113,7 @@ func (h *Health) ServiceNodes(args *structs.ServiceSpecificRequest, reply *struc
|
||||||
err := h.srv.blockingQuery(
|
err := h.srv.blockingQuery(
|
||||||
&args.QueryOptions,
|
&args.QueryOptions,
|
||||||
&reply.QueryMeta,
|
&reply.QueryMeta,
|
||||||
func(ws memdb.WatchSet, state *state.StateStore) error {
|
func(ws memdb.WatchSet, state *state.Store) error {
|
||||||
var index uint64
|
var index uint64
|
||||||
var nodes structs.CheckServiceNodes
|
var nodes structs.CheckServiceNodes
|
||||||
var err error
|
var err error
|
||||||
|
|
|
@ -26,7 +26,7 @@ func (m *Internal) NodeInfo(args *structs.NodeSpecificRequest,
|
||||||
return m.srv.blockingQuery(
|
return m.srv.blockingQuery(
|
||||||
&args.QueryOptions,
|
&args.QueryOptions,
|
||||||
&reply.QueryMeta,
|
&reply.QueryMeta,
|
||||||
func(ws memdb.WatchSet, state *state.StateStore) error {
|
func(ws memdb.WatchSet, state *state.Store) error {
|
||||||
index, dump, err := state.NodeInfo(ws, args.Node)
|
index, dump, err := state.NodeInfo(ws, args.Node)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
@ -47,7 +47,7 @@ func (m *Internal) NodeDump(args *structs.DCSpecificRequest,
|
||||||
return m.srv.blockingQuery(
|
return m.srv.blockingQuery(
|
||||||
&args.QueryOptions,
|
&args.QueryOptions,
|
||||||
&reply.QueryMeta,
|
&reply.QueryMeta,
|
||||||
func(ws memdb.WatchSet, state *state.StateStore) error {
|
func(ws memdb.WatchSet, state *state.Store) error {
|
||||||
index, dump, err := state.NodeDump(ws)
|
index, dump, err := state.NodeDump(ws)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
|
|
@ -123,7 +123,7 @@ func (k *KVS) Get(args *structs.KeyRequest, reply *structs.IndexedDirEntries) er
|
||||||
return k.srv.blockingQuery(
|
return k.srv.blockingQuery(
|
||||||
&args.QueryOptions,
|
&args.QueryOptions,
|
||||||
&reply.QueryMeta,
|
&reply.QueryMeta,
|
||||||
func(ws memdb.WatchSet, state *state.StateStore) error {
|
func(ws memdb.WatchSet, state *state.Store) error {
|
||||||
index, ent, err := state.KVSGet(ws, args.Key)
|
index, ent, err := state.KVSGet(ws, args.Key)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
@ -162,7 +162,7 @@ func (k *KVS) List(args *structs.KeyRequest, reply *structs.IndexedDirEntries) e
|
||||||
return k.srv.blockingQuery(
|
return k.srv.blockingQuery(
|
||||||
&args.QueryOptions,
|
&args.QueryOptions,
|
||||||
&reply.QueryMeta,
|
&reply.QueryMeta,
|
||||||
func(ws memdb.WatchSet, state *state.StateStore) error {
|
func(ws memdb.WatchSet, state *state.Store) error {
|
||||||
index, ent, err := state.KVSList(ws, args.Key)
|
index, ent, err := state.KVSList(ws, args.Key)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
@ -202,7 +202,7 @@ func (k *KVS) ListKeys(args *structs.KeyListRequest, reply *structs.IndexedKeyLi
|
||||||
return k.srv.blockingQuery(
|
return k.srv.blockingQuery(
|
||||||
&args.QueryOptions,
|
&args.QueryOptions,
|
||||||
&reply.QueryMeta,
|
&reply.QueryMeta,
|
||||||
func(ws memdb.WatchSet, state *state.StateStore) error {
|
func(ws memdb.WatchSet, state *state.Store) error {
|
||||||
index, keys, err := state.KVSListKeys(ws, args.Prefix, args.Seperator)
|
index, keys, err := state.KVSListKeys(ws, args.Prefix, args.Seperator)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
|
|
@ -221,7 +221,7 @@ func (p *PreparedQuery) Get(args *structs.PreparedQuerySpecificRequest,
|
||||||
return p.srv.blockingQuery(
|
return p.srv.blockingQuery(
|
||||||
&args.QueryOptions,
|
&args.QueryOptions,
|
||||||
&reply.QueryMeta,
|
&reply.QueryMeta,
|
||||||
func(ws memdb.WatchSet, state *state.StateStore) error {
|
func(ws memdb.WatchSet, state *state.Store) error {
|
||||||
index, query, err := state.PreparedQueryGet(ws, args.QueryID)
|
index, query, err := state.PreparedQueryGet(ws, args.QueryID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
@ -265,7 +265,7 @@ func (p *PreparedQuery) List(args *structs.DCSpecificRequest, reply *structs.Ind
|
||||||
return p.srv.blockingQuery(
|
return p.srv.blockingQuery(
|
||||||
&args.QueryOptions,
|
&args.QueryOptions,
|
||||||
&reply.QueryMeta,
|
&reply.QueryMeta,
|
||||||
func(ws memdb.WatchSet, state *state.StateStore) error {
|
func(ws memdb.WatchSet, state *state.Store) error {
|
||||||
index, queries, err := state.PreparedQueryList(ws)
|
index, queries, err := state.PreparedQueryList(ws)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
|
|
@ -344,7 +344,7 @@ func (s *Server) raftApply(t structs.MessageType, msg interface{}) (interface{},
|
||||||
// store should be used (vs. calling fsm.State()) since the given state store
|
// store should be used (vs. calling fsm.State()) since the given state store
|
||||||
// will be correctly watched for changes if the state store is restored from
|
// will be correctly watched for changes if the state store is restored from
|
||||||
// a snapshot.
|
// a snapshot.
|
||||||
type queryFn func(memdb.WatchSet, *state.StateStore) error
|
type queryFn func(memdb.WatchSet, *state.Store) error
|
||||||
|
|
||||||
// blockingQuery is used to process a potentially blocking query operation.
|
// blockingQuery is used to process a potentially blocking query operation.
|
||||||
func (s *Server) blockingQuery(queryOpts *structs.QueryOptions, queryMeta *structs.QueryMeta,
|
func (s *Server) blockingQuery(queryOpts *structs.QueryOptions, queryMeta *structs.QueryMeta,
|
||||||
|
|
|
@ -84,7 +84,7 @@ func TestRPC_blockingQuery(t *testing.T) {
|
||||||
var opts structs.QueryOptions
|
var opts structs.QueryOptions
|
||||||
var meta structs.QueryMeta
|
var meta structs.QueryMeta
|
||||||
var calls int
|
var calls int
|
||||||
fn := func(ws memdb.WatchSet, state *state.StateStore) error {
|
fn := func(ws memdb.WatchSet, state *state.Store) error {
|
||||||
calls++
|
calls++
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -103,7 +103,7 @@ func TestRPC_blockingQuery(t *testing.T) {
|
||||||
}
|
}
|
||||||
var meta structs.QueryMeta
|
var meta structs.QueryMeta
|
||||||
var calls int
|
var calls int
|
||||||
fn := func(ws memdb.WatchSet, state *state.StateStore) error {
|
fn := func(ws memdb.WatchSet, state *state.Store) error {
|
||||||
if calls == 0 {
|
if calls == 0 {
|
||||||
meta.Index = 3
|
meta.Index = 3
|
||||||
|
|
||||||
|
@ -132,7 +132,7 @@ func TestRPC_blockingQuery(t *testing.T) {
|
||||||
}
|
}
|
||||||
var meta structs.QueryMeta
|
var meta structs.QueryMeta
|
||||||
var calls int
|
var calls int
|
||||||
fn := func(ws memdb.WatchSet, state *state.StateStore) error {
|
fn := func(ws memdb.WatchSet, state *state.Store) error {
|
||||||
if calls == 0 {
|
if calls == 0 {
|
||||||
meta.Index = 3
|
meta.Index = 3
|
||||||
|
|
||||||
|
|
|
@ -147,7 +147,7 @@ func (s *Session) Get(args *structs.SessionSpecificRequest,
|
||||||
return s.srv.blockingQuery(
|
return s.srv.blockingQuery(
|
||||||
&args.QueryOptions,
|
&args.QueryOptions,
|
||||||
&reply.QueryMeta,
|
&reply.QueryMeta,
|
||||||
func(ws memdb.WatchSet, state *state.StateStore) error {
|
func(ws memdb.WatchSet, state *state.Store) error {
|
||||||
index, session, err := state.SessionGet(ws, args.Session)
|
index, session, err := state.SessionGet(ws, args.Session)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
@ -176,7 +176,7 @@ func (s *Session) List(args *structs.DCSpecificRequest,
|
||||||
return s.srv.blockingQuery(
|
return s.srv.blockingQuery(
|
||||||
&args.QueryOptions,
|
&args.QueryOptions,
|
||||||
&reply.QueryMeta,
|
&reply.QueryMeta,
|
||||||
func(ws memdb.WatchSet, state *state.StateStore) error {
|
func(ws memdb.WatchSet, state *state.Store) error {
|
||||||
index, sessions, err := state.SessionList(ws)
|
index, sessions, err := state.SessionList(ws)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
@ -200,7 +200,7 @@ func (s *Session) NodeSessions(args *structs.NodeSpecificRequest,
|
||||||
return s.srv.blockingQuery(
|
return s.srv.blockingQuery(
|
||||||
&args.QueryOptions,
|
&args.QueryOptions,
|
||||||
&reply.QueryMeta,
|
&reply.QueryMeta,
|
||||||
func(ws memdb.WatchSet, state *state.StateStore) error {
|
func(ws memdb.WatchSet, state *state.Store) error {
|
||||||
index, sessions, err := state.NodeSessions(ws, args.Node)
|
index, sessions, err := state.NodeSessions(ws, args.Node)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
|
|
@ -8,7 +8,7 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
// ACLs is used to pull all the ACLs from the snapshot.
|
// ACLs is used to pull all the ACLs from the snapshot.
|
||||||
func (s *StateSnapshot) ACLs() (memdb.ResultIterator, error) {
|
func (s *Snapshot) ACLs() (memdb.ResultIterator, error) {
|
||||||
iter, err := s.tx.Get("acls", "id")
|
iter, err := s.tx.Get("acls", "id")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -17,7 +17,7 @@ func (s *StateSnapshot) ACLs() (memdb.ResultIterator, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// ACL is used when restoring from a snapshot. For general inserts, use ACLSet.
|
// ACL is used when restoring from a snapshot. For general inserts, use ACLSet.
|
||||||
func (s *StateRestore) ACL(acl *structs.ACL) error {
|
func (s *Restore) ACL(acl *structs.ACL) error {
|
||||||
if err := s.tx.Insert("acls", acl); err != nil {
|
if err := s.tx.Insert("acls", acl); err != nil {
|
||||||
return fmt.Errorf("failed restoring acl: %s", err)
|
return fmt.Errorf("failed restoring acl: %s", err)
|
||||||
}
|
}
|
||||||
|
@ -30,7 +30,7 @@ func (s *StateRestore) ACL(acl *structs.ACL) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
// ACLSet is used to insert an ACL rule into the state store.
|
// ACLSet is used to insert an ACL rule into the state store.
|
||||||
func (s *StateStore) ACLSet(idx uint64, acl *structs.ACL) error {
|
func (s *Store) ACLSet(idx uint64, acl *structs.ACL) error {
|
||||||
tx := s.db.Txn(true)
|
tx := s.db.Txn(true)
|
||||||
defer tx.Abort()
|
defer tx.Abort()
|
||||||
|
|
||||||
|
@ -45,7 +45,7 @@ func (s *StateStore) ACLSet(idx uint64, acl *structs.ACL) error {
|
||||||
|
|
||||||
// aclSetTxn is the inner method used to insert an ACL rule with the
|
// aclSetTxn is the inner method used to insert an ACL rule with the
|
||||||
// proper indexes into the state store.
|
// proper indexes into the state store.
|
||||||
func (s *StateStore) aclSetTxn(tx *memdb.Txn, idx uint64, acl *structs.ACL) error {
|
func (s *Store) aclSetTxn(tx *memdb.Txn, idx uint64, acl *structs.ACL) error {
|
||||||
// Check that the ID is set
|
// Check that the ID is set
|
||||||
if acl.ID == "" {
|
if acl.ID == "" {
|
||||||
return ErrMissingACLID
|
return ErrMissingACLID
|
||||||
|
@ -78,7 +78,7 @@ func (s *StateStore) aclSetTxn(tx *memdb.Txn, idx uint64, acl *structs.ACL) erro
|
||||||
}
|
}
|
||||||
|
|
||||||
// ACLGet is used to look up an existing ACL by ID.
|
// ACLGet is used to look up an existing ACL by ID.
|
||||||
func (s *StateStore) ACLGet(ws memdb.WatchSet, aclID string) (uint64, *structs.ACL, error) {
|
func (s *Store) ACLGet(ws memdb.WatchSet, aclID string) (uint64, *structs.ACL, error) {
|
||||||
tx := s.db.Txn(false)
|
tx := s.db.Txn(false)
|
||||||
defer tx.Abort()
|
defer tx.Abort()
|
||||||
|
|
||||||
|
@ -99,7 +99,7 @@ func (s *StateStore) ACLGet(ws memdb.WatchSet, aclID string) (uint64, *structs.A
|
||||||
}
|
}
|
||||||
|
|
||||||
// ACLList is used to list out all of the ACLs in the state store.
|
// ACLList is used to list out all of the ACLs in the state store.
|
||||||
func (s *StateStore) ACLList(ws memdb.WatchSet) (uint64, structs.ACLs, error) {
|
func (s *Store) ACLList(ws memdb.WatchSet) (uint64, structs.ACLs, error) {
|
||||||
tx := s.db.Txn(false)
|
tx := s.db.Txn(false)
|
||||||
defer tx.Abort()
|
defer tx.Abort()
|
||||||
|
|
||||||
|
@ -116,7 +116,7 @@ func (s *StateStore) ACLList(ws memdb.WatchSet) (uint64, structs.ACLs, error) {
|
||||||
|
|
||||||
// aclListTxn is used to list out all of the ACLs in the state store. This is a
|
// aclListTxn is used to list out all of the ACLs in the state store. This is a
|
||||||
// function vs. a method so it can be called from the snapshotter.
|
// function vs. a method so it can be called from the snapshotter.
|
||||||
func (s *StateStore) aclListTxn(tx *memdb.Txn, ws memdb.WatchSet) (structs.ACLs, error) {
|
func (s *Store) aclListTxn(tx *memdb.Txn, ws memdb.WatchSet) (structs.ACLs, error) {
|
||||||
// Query all of the ACLs in the state store
|
// Query all of the ACLs in the state store
|
||||||
iter, err := tx.Get("acls", "id")
|
iter, err := tx.Get("acls", "id")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -135,7 +135,7 @@ func (s *StateStore) aclListTxn(tx *memdb.Txn, ws memdb.WatchSet) (structs.ACLs,
|
||||||
|
|
||||||
// ACLDelete is used to remove an existing ACL from the state store. If
|
// ACLDelete is used to remove an existing ACL from the state store. If
|
||||||
// the ACL does not exist this is a no-op and no error is returned.
|
// the ACL does not exist this is a no-op and no error is returned.
|
||||||
func (s *StateStore) ACLDelete(idx uint64, aclID string) error {
|
func (s *Store) ACLDelete(idx uint64, aclID string) error {
|
||||||
tx := s.db.Txn(true)
|
tx := s.db.Txn(true)
|
||||||
defer tx.Abort()
|
defer tx.Abort()
|
||||||
|
|
||||||
|
@ -150,7 +150,7 @@ func (s *StateStore) ACLDelete(idx uint64, aclID string) error {
|
||||||
|
|
||||||
// aclDeleteTxn is used to delete an ACL from the state store within
|
// aclDeleteTxn is used to delete an ACL from the state store within
|
||||||
// an existing transaction.
|
// an existing transaction.
|
||||||
func (s *StateStore) aclDeleteTxn(tx *memdb.Txn, idx uint64, aclID string) error {
|
func (s *Store) aclDeleteTxn(tx *memdb.Txn, idx uint64, aclID string) error {
|
||||||
// Look up the existing ACL
|
// Look up the existing ACL
|
||||||
acl, err := tx.First("acls", "id", aclID)
|
acl, err := tx.First("acls", "id", aclID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -8,7 +8,7 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
// Autopilot is used to pull the autopilot config from the snapshot.
|
// Autopilot is used to pull the autopilot config from the snapshot.
|
||||||
func (s *StateSnapshot) Autopilot() (*structs.AutopilotConfig, error) {
|
func (s *Snapshot) Autopilot() (*structs.AutopilotConfig, error) {
|
||||||
c, err := s.tx.First("autopilot-config", "id")
|
c, err := s.tx.First("autopilot-config", "id")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -23,7 +23,7 @@ func (s *StateSnapshot) Autopilot() (*structs.AutopilotConfig, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Autopilot is used when restoring from a snapshot.
|
// Autopilot is used when restoring from a snapshot.
|
||||||
func (s *StateRestore) Autopilot(config *structs.AutopilotConfig) error {
|
func (s *Restore) Autopilot(config *structs.AutopilotConfig) error {
|
||||||
if err := s.tx.Insert("autopilot-config", config); err != nil {
|
if err := s.tx.Insert("autopilot-config", config); err != nil {
|
||||||
return fmt.Errorf("failed restoring autopilot config: %s", err)
|
return fmt.Errorf("failed restoring autopilot config: %s", err)
|
||||||
}
|
}
|
||||||
|
@ -32,7 +32,7 @@ func (s *StateRestore) Autopilot(config *structs.AutopilotConfig) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
// AutopilotConfig is used to get the current Autopilot configuration.
|
// AutopilotConfig is used to get the current Autopilot configuration.
|
||||||
func (s *StateStore) AutopilotConfig() (uint64, *structs.AutopilotConfig, error) {
|
func (s *Store) AutopilotConfig() (uint64, *structs.AutopilotConfig, error) {
|
||||||
tx := s.db.Txn(false)
|
tx := s.db.Txn(false)
|
||||||
defer tx.Abort()
|
defer tx.Abort()
|
||||||
|
|
||||||
|
@ -51,7 +51,7 @@ func (s *StateStore) AutopilotConfig() (uint64, *structs.AutopilotConfig, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
// AutopilotSetConfig is used to set the current Autopilot configuration.
|
// AutopilotSetConfig is used to set the current Autopilot configuration.
|
||||||
func (s *StateStore) AutopilotSetConfig(idx uint64, config *structs.AutopilotConfig) error {
|
func (s *Store) AutopilotSetConfig(idx uint64, config *structs.AutopilotConfig) error {
|
||||||
tx := s.db.Txn(true)
|
tx := s.db.Txn(true)
|
||||||
defer tx.Abort()
|
defer tx.Abort()
|
||||||
|
|
||||||
|
@ -64,7 +64,7 @@ func (s *StateStore) AutopilotSetConfig(idx uint64, config *structs.AutopilotCon
|
||||||
// AutopilotCASConfig is used to try updating the Autopilot configuration with a
|
// AutopilotCASConfig is used to try updating the Autopilot configuration with a
|
||||||
// given Raft index. If the CAS index specified is not equal to the last observed index
|
// given Raft index. If the CAS index specified is not equal to the last observed index
|
||||||
// for the config, then the call is a noop,
|
// for the config, then the call is a noop,
|
||||||
func (s *StateStore) AutopilotCASConfig(idx, cidx uint64, config *structs.AutopilotConfig) (bool, error) {
|
func (s *Store) AutopilotCASConfig(idx, cidx uint64, config *structs.AutopilotConfig) (bool, error) {
|
||||||
tx := s.db.Txn(true)
|
tx := s.db.Txn(true)
|
||||||
defer tx.Abort()
|
defer tx.Abort()
|
||||||
|
|
||||||
|
@ -88,7 +88,7 @@ func (s *StateStore) AutopilotCASConfig(idx, cidx uint64, config *structs.Autopi
|
||||||
return true, nil
|
return true, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *StateStore) autopilotSetConfigTxn(idx uint64, tx *memdb.Txn, config *structs.AutopilotConfig) error {
|
func (s *Store) autopilotSetConfigTxn(idx uint64, tx *memdb.Txn, config *structs.AutopilotConfig) error {
|
||||||
// Check for an existing config
|
// Check for an existing config
|
||||||
existing, err := tx.First("autopilot-config", "id")
|
existing, err := tx.First("autopilot-config", "id")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -28,7 +28,7 @@ func resizeNodeLookupKey(s string) string {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Nodes is used to pull the full list of nodes for use during snapshots.
|
// Nodes is used to pull the full list of nodes for use during snapshots.
|
||||||
func (s *StateSnapshot) Nodes() (memdb.ResultIterator, error) {
|
func (s *Snapshot) Nodes() (memdb.ResultIterator, error) {
|
||||||
iter, err := s.tx.Get("nodes", "id")
|
iter, err := s.tx.Get("nodes", "id")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -38,7 +38,7 @@ func (s *StateSnapshot) Nodes() (memdb.ResultIterator, error) {
|
||||||
|
|
||||||
// Services is used to pull the full list of services for a given node for use
|
// Services is used to pull the full list of services for a given node for use
|
||||||
// during snapshots.
|
// during snapshots.
|
||||||
func (s *StateSnapshot) Services(node string) (memdb.ResultIterator, error) {
|
func (s *Snapshot) Services(node string) (memdb.ResultIterator, error) {
|
||||||
iter, err := s.tx.Get("services", "node", node)
|
iter, err := s.tx.Get("services", "node", node)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -48,7 +48,7 @@ func (s *StateSnapshot) Services(node string) (memdb.ResultIterator, error) {
|
||||||
|
|
||||||
// Checks is used to pull the full list of checks for a given node for use
|
// Checks is used to pull the full list of checks for a given node for use
|
||||||
// during snapshots.
|
// during snapshots.
|
||||||
func (s *StateSnapshot) Checks(node string) (memdb.ResultIterator, error) {
|
func (s *Snapshot) Checks(node string) (memdb.ResultIterator, error) {
|
||||||
iter, err := s.tx.Get("checks", "node", node)
|
iter, err := s.tx.Get("checks", "node", node)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -59,7 +59,7 @@ func (s *StateSnapshot) Checks(node string) (memdb.ResultIterator, error) {
|
||||||
// Registration is used to make sure a node, service, and check registration is
|
// Registration is used to make sure a node, service, and check registration is
|
||||||
// performed within a single transaction to avoid race conditions on state
|
// performed within a single transaction to avoid race conditions on state
|
||||||
// updates.
|
// updates.
|
||||||
func (s *StateRestore) Registration(idx uint64, req *structs.RegisterRequest) error {
|
func (s *Restore) Registration(idx uint64, req *structs.RegisterRequest) error {
|
||||||
if err := s.store.ensureRegistrationTxn(s.tx, idx, req); err != nil {
|
if err := s.store.ensureRegistrationTxn(s.tx, idx, req); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -69,7 +69,7 @@ func (s *StateRestore) Registration(idx uint64, req *structs.RegisterRequest) er
|
||||||
// EnsureRegistration is used to make sure a node, service, and check
|
// EnsureRegistration is used to make sure a node, service, and check
|
||||||
// registration is performed within a single transaction to avoid race
|
// registration is performed within a single transaction to avoid race
|
||||||
// conditions on state updates.
|
// conditions on state updates.
|
||||||
func (s *StateStore) EnsureRegistration(idx uint64, req *structs.RegisterRequest) error {
|
func (s *Store) EnsureRegistration(idx uint64, req *structs.RegisterRequest) error {
|
||||||
tx := s.db.Txn(true)
|
tx := s.db.Txn(true)
|
||||||
defer tx.Abort()
|
defer tx.Abort()
|
||||||
|
|
||||||
|
@ -84,7 +84,7 @@ func (s *StateStore) EnsureRegistration(idx uint64, req *structs.RegisterRequest
|
||||||
// ensureRegistrationTxn is used to make sure a node, service, and check
|
// ensureRegistrationTxn is used to make sure a node, service, and check
|
||||||
// registration is performed within a single transaction to avoid race
|
// registration is performed within a single transaction to avoid race
|
||||||
// conditions on state updates.
|
// conditions on state updates.
|
||||||
func (s *StateStore) ensureRegistrationTxn(tx *memdb.Txn, idx uint64, req *structs.RegisterRequest) error {
|
func (s *Store) ensureRegistrationTxn(tx *memdb.Txn, idx uint64, req *structs.RegisterRequest) error {
|
||||||
// Create a node structure.
|
// Create a node structure.
|
||||||
node := &structs.Node{
|
node := &structs.Node{
|
||||||
ID: req.ID,
|
ID: req.ID,
|
||||||
|
@ -152,7 +152,7 @@ func (s *StateStore) ensureRegistrationTxn(tx *memdb.Txn, idx uint64, req *struc
|
||||||
}
|
}
|
||||||
|
|
||||||
// EnsureNode is used to upsert node registration or modification.
|
// EnsureNode is used to upsert node registration or modification.
|
||||||
func (s *StateStore) EnsureNode(idx uint64, node *structs.Node) error {
|
func (s *Store) EnsureNode(idx uint64, node *structs.Node) error {
|
||||||
tx := s.db.Txn(true)
|
tx := s.db.Txn(true)
|
||||||
defer tx.Abort()
|
defer tx.Abort()
|
||||||
|
|
||||||
|
@ -168,7 +168,7 @@ func (s *StateStore) EnsureNode(idx uint64, node *structs.Node) error {
|
||||||
// ensureNodeTxn is the inner function called to actually create a node
|
// ensureNodeTxn is the inner function called to actually create a node
|
||||||
// registration or modify an existing one in the state store. It allows
|
// registration or modify an existing one in the state store. It allows
|
||||||
// passing in a memdb transaction so it may be part of a larger txn.
|
// passing in a memdb transaction so it may be part of a larger txn.
|
||||||
func (s *StateStore) ensureNodeTxn(tx *memdb.Txn, idx uint64, node *structs.Node) error {
|
func (s *Store) ensureNodeTxn(tx *memdb.Txn, idx uint64, node *structs.Node) error {
|
||||||
// See if there's an existing node with this UUID, and make sure the
|
// See if there's an existing node with this UUID, and make sure the
|
||||||
// name is the same.
|
// name is the same.
|
||||||
var n *structs.Node
|
var n *structs.Node
|
||||||
|
@ -218,7 +218,7 @@ func (s *StateStore) ensureNodeTxn(tx *memdb.Txn, idx uint64, node *structs.Node
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetNode is used to retrieve a node registration by node name ID.
|
// GetNode is used to retrieve a node registration by node name ID.
|
||||||
func (s *StateStore) GetNode(id string) (uint64, *structs.Node, error) {
|
func (s *Store) GetNode(id string) (uint64, *structs.Node, error) {
|
||||||
tx := s.db.Txn(false)
|
tx := s.db.Txn(false)
|
||||||
defer tx.Abort()
|
defer tx.Abort()
|
||||||
|
|
||||||
|
@ -237,7 +237,7 @@ func (s *StateStore) GetNode(id string) (uint64, *structs.Node, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetNodeID is used to retrieve a node registration by node ID.
|
// GetNodeID is used to retrieve a node registration by node ID.
|
||||||
func (s *StateStore) GetNodeID(id types.NodeID) (uint64, *structs.Node, error) {
|
func (s *Store) GetNodeID(id types.NodeID) (uint64, *structs.Node, error) {
|
||||||
tx := s.db.Txn(false)
|
tx := s.db.Txn(false)
|
||||||
defer tx.Abort()
|
defer tx.Abort()
|
||||||
|
|
||||||
|
@ -256,7 +256,7 @@ func (s *StateStore) GetNodeID(id types.NodeID) (uint64, *structs.Node, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Nodes is used to return all of the known nodes.
|
// Nodes is used to return all of the known nodes.
|
||||||
func (s *StateStore) Nodes(ws memdb.WatchSet) (uint64, structs.Nodes, error) {
|
func (s *Store) Nodes(ws memdb.WatchSet) (uint64, structs.Nodes, error) {
|
||||||
tx := s.db.Txn(false)
|
tx := s.db.Txn(false)
|
||||||
defer tx.Abort()
|
defer tx.Abort()
|
||||||
|
|
||||||
|
@ -279,7 +279,7 @@ func (s *StateStore) Nodes(ws memdb.WatchSet) (uint64, structs.Nodes, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// NodesByMeta is used to return all nodes with the given metadata key/value pairs.
|
// NodesByMeta is used to return all nodes with the given metadata key/value pairs.
|
||||||
func (s *StateStore) NodesByMeta(ws memdb.WatchSet, filters map[string]string) (uint64, structs.Nodes, error) {
|
func (s *Store) NodesByMeta(ws memdb.WatchSet, filters map[string]string) (uint64, structs.Nodes, error) {
|
||||||
tx := s.db.Txn(false)
|
tx := s.db.Txn(false)
|
||||||
defer tx.Abort()
|
defer tx.Abort()
|
||||||
|
|
||||||
|
@ -310,7 +310,7 @@ func (s *StateStore) NodesByMeta(ws memdb.WatchSet, filters map[string]string) (
|
||||||
}
|
}
|
||||||
|
|
||||||
// DeleteNode is used to delete a given node by its ID.
|
// DeleteNode is used to delete a given node by its ID.
|
||||||
func (s *StateStore) DeleteNode(idx uint64, nodeName string) error {
|
func (s *Store) DeleteNode(idx uint64, nodeName string) error {
|
||||||
tx := s.db.Txn(true)
|
tx := s.db.Txn(true)
|
||||||
defer tx.Abort()
|
defer tx.Abort()
|
||||||
|
|
||||||
|
@ -325,7 +325,7 @@ func (s *StateStore) DeleteNode(idx uint64, nodeName string) error {
|
||||||
|
|
||||||
// deleteNodeTxn is the inner method used for removing a node from
|
// deleteNodeTxn is the inner method used for removing a node from
|
||||||
// the store within a given transaction.
|
// the store within a given transaction.
|
||||||
func (s *StateStore) deleteNodeTxn(tx *memdb.Txn, idx uint64, nodeName string) error {
|
func (s *Store) deleteNodeTxn(tx *memdb.Txn, idx uint64, nodeName string) error {
|
||||||
// Look up the node.
|
// Look up the node.
|
||||||
node, err := tx.First("nodes", "id", nodeName)
|
node, err := tx.First("nodes", "id", nodeName)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -413,7 +413,7 @@ func (s *StateStore) deleteNodeTxn(tx *memdb.Txn, idx uint64, nodeName string) e
|
||||||
}
|
}
|
||||||
|
|
||||||
// EnsureService is called to upsert creation of a given NodeService.
|
// EnsureService is called to upsert creation of a given NodeService.
|
||||||
func (s *StateStore) EnsureService(idx uint64, node string, svc *structs.NodeService) error {
|
func (s *Store) EnsureService(idx uint64, node string, svc *structs.NodeService) error {
|
||||||
tx := s.db.Txn(true)
|
tx := s.db.Txn(true)
|
||||||
defer tx.Abort()
|
defer tx.Abort()
|
||||||
|
|
||||||
|
@ -428,7 +428,7 @@ func (s *StateStore) EnsureService(idx uint64, node string, svc *structs.NodeSer
|
||||||
|
|
||||||
// ensureServiceTxn is used to upsert a service registration within an
|
// ensureServiceTxn is used to upsert a service registration within an
|
||||||
// existing memdb transaction.
|
// existing memdb transaction.
|
||||||
func (s *StateStore) ensureServiceTxn(tx *memdb.Txn, idx uint64, node string, svc *structs.NodeService) error {
|
func (s *Store) ensureServiceTxn(tx *memdb.Txn, idx uint64, node string, svc *structs.NodeService) error {
|
||||||
// Check for existing service
|
// Check for existing service
|
||||||
existing, err := tx.First("services", "id", node, svc.ID)
|
existing, err := tx.First("services", "id", node, svc.ID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -468,7 +468,7 @@ func (s *StateStore) ensureServiceTxn(tx *memdb.Txn, idx uint64, node string, sv
|
||||||
}
|
}
|
||||||
|
|
||||||
// Services returns all services along with a list of associated tags.
|
// Services returns all services along with a list of associated tags.
|
||||||
func (s *StateStore) Services(ws memdb.WatchSet) (uint64, structs.Services, error) {
|
func (s *Store) Services(ws memdb.WatchSet) (uint64, structs.Services, error) {
|
||||||
tx := s.db.Txn(false)
|
tx := s.db.Txn(false)
|
||||||
defer tx.Abort()
|
defer tx.Abort()
|
||||||
|
|
||||||
|
@ -509,7 +509,7 @@ func (s *StateStore) Services(ws memdb.WatchSet) (uint64, structs.Services, erro
|
||||||
}
|
}
|
||||||
|
|
||||||
// ServicesByNodeMeta returns all services, filtered by the given node metadata.
|
// ServicesByNodeMeta returns all services, filtered by the given node metadata.
|
||||||
func (s *StateStore) ServicesByNodeMeta(ws memdb.WatchSet, filters map[string]string) (uint64, structs.Services, error) {
|
func (s *Store) ServicesByNodeMeta(ws memdb.WatchSet, filters map[string]string) (uint64, structs.Services, error) {
|
||||||
tx := s.db.Txn(false)
|
tx := s.db.Txn(false)
|
||||||
defer tx.Abort()
|
defer tx.Abort()
|
||||||
|
|
||||||
|
@ -578,7 +578,7 @@ func (s *StateStore) ServicesByNodeMeta(ws memdb.WatchSet, filters map[string]st
|
||||||
}
|
}
|
||||||
|
|
||||||
// ServiceNodes returns the nodes associated with a given service name.
|
// ServiceNodes returns the nodes associated with a given service name.
|
||||||
func (s *StateStore) ServiceNodes(ws memdb.WatchSet, serviceName string) (uint64, structs.ServiceNodes, error) {
|
func (s *Store) ServiceNodes(ws memdb.WatchSet, serviceName string) (uint64, structs.ServiceNodes, error) {
|
||||||
tx := s.db.Txn(false)
|
tx := s.db.Txn(false)
|
||||||
defer tx.Abort()
|
defer tx.Abort()
|
||||||
|
|
||||||
|
@ -607,7 +607,7 @@ func (s *StateStore) ServiceNodes(ws memdb.WatchSet, serviceName string) (uint64
|
||||||
|
|
||||||
// ServiceTagNodes returns the nodes associated with a given service, filtering
|
// ServiceTagNodes returns the nodes associated with a given service, filtering
|
||||||
// out services that don't contain the given tag.
|
// out services that don't contain the given tag.
|
||||||
func (s *StateStore) ServiceTagNodes(ws memdb.WatchSet, service string, tag string) (uint64, structs.ServiceNodes, error) {
|
func (s *Store) ServiceTagNodes(ws memdb.WatchSet, service string, tag string) (uint64, structs.ServiceNodes, error) {
|
||||||
tx := s.db.Txn(false)
|
tx := s.db.Txn(false)
|
||||||
defer tx.Abort()
|
defer tx.Abort()
|
||||||
|
|
||||||
|
@ -656,7 +656,7 @@ func serviceTagFilter(sn *structs.ServiceNode, tag string) bool {
|
||||||
|
|
||||||
// parseServiceNodes iterates over a services query and fills in the node details,
|
// parseServiceNodes iterates over a services query and fills in the node details,
|
||||||
// returning a ServiceNodes slice.
|
// returning a ServiceNodes slice.
|
||||||
func (s *StateStore) parseServiceNodes(tx *memdb.Txn, ws memdb.WatchSet, services structs.ServiceNodes) (structs.ServiceNodes, error) {
|
func (s *Store) parseServiceNodes(tx *memdb.Txn, ws memdb.WatchSet, services structs.ServiceNodes) (structs.ServiceNodes, error) {
|
||||||
// We don't want to track an unlimited number of nodes, so we pull a
|
// We don't want to track an unlimited number of nodes, so we pull a
|
||||||
// top-level watch to use as a fallback.
|
// top-level watch to use as a fallback.
|
||||||
allNodes, err := tx.Get("nodes", "id")
|
allNodes, err := tx.Get("nodes", "id")
|
||||||
|
@ -697,7 +697,7 @@ func (s *StateStore) parseServiceNodes(tx *memdb.Txn, ws memdb.WatchSet, service
|
||||||
|
|
||||||
// NodeService is used to retrieve a specific service associated with the given
|
// NodeService is used to retrieve a specific service associated with the given
|
||||||
// node.
|
// node.
|
||||||
func (s *StateStore) NodeService(nodeName string, serviceID string) (uint64, *structs.NodeService, error) {
|
func (s *Store) NodeService(nodeName string, serviceID string) (uint64, *structs.NodeService, error) {
|
||||||
tx := s.db.Txn(false)
|
tx := s.db.Txn(false)
|
||||||
defer tx.Abort()
|
defer tx.Abort()
|
||||||
|
|
||||||
|
@ -718,7 +718,7 @@ func (s *StateStore) NodeService(nodeName string, serviceID string) (uint64, *st
|
||||||
}
|
}
|
||||||
|
|
||||||
// NodeServices is used to query service registrations by node name or UUID.
|
// NodeServices is used to query service registrations by node name or UUID.
|
||||||
func (s *StateStore) NodeServices(ws memdb.WatchSet, nodeNameOrID string) (uint64, *structs.NodeServices, error) {
|
func (s *Store) NodeServices(ws memdb.WatchSet, nodeNameOrID string) (uint64, *structs.NodeServices, error) {
|
||||||
tx := s.db.Txn(false)
|
tx := s.db.Txn(false)
|
||||||
defer tx.Abort()
|
defer tx.Abort()
|
||||||
|
|
||||||
|
@ -792,7 +792,7 @@ func (s *StateStore) NodeServices(ws memdb.WatchSet, nodeNameOrID string) (uint6
|
||||||
}
|
}
|
||||||
|
|
||||||
// DeleteService is used to delete a given service associated with a node.
|
// DeleteService is used to delete a given service associated with a node.
|
||||||
func (s *StateStore) DeleteService(idx uint64, nodeName, serviceID string) error {
|
func (s *Store) DeleteService(idx uint64, nodeName, serviceID string) error {
|
||||||
tx := s.db.Txn(true)
|
tx := s.db.Txn(true)
|
||||||
defer tx.Abort()
|
defer tx.Abort()
|
||||||
|
|
||||||
|
@ -807,7 +807,7 @@ func (s *StateStore) DeleteService(idx uint64, nodeName, serviceID string) error
|
||||||
|
|
||||||
// deleteServiceTxn is the inner method called to remove a service
|
// deleteServiceTxn is the inner method called to remove a service
|
||||||
// registration within an existing transaction.
|
// registration within an existing transaction.
|
||||||
func (s *StateStore) deleteServiceTxn(tx *memdb.Txn, idx uint64, nodeName, serviceID string) error {
|
func (s *Store) deleteServiceTxn(tx *memdb.Txn, idx uint64, nodeName, serviceID string) error {
|
||||||
// Look up the service.
|
// Look up the service.
|
||||||
service, err := tx.First("services", "id", nodeName, serviceID)
|
service, err := tx.First("services", "id", nodeName, serviceID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -852,7 +852,7 @@ func (s *StateStore) deleteServiceTxn(tx *memdb.Txn, idx uint64, nodeName, servi
|
||||||
}
|
}
|
||||||
|
|
||||||
// EnsureCheck is used to store a check registration in the db.
|
// EnsureCheck is used to store a check registration in the db.
|
||||||
func (s *StateStore) EnsureCheck(idx uint64, hc *structs.HealthCheck) error {
|
func (s *Store) EnsureCheck(idx uint64, hc *structs.HealthCheck) error {
|
||||||
tx := s.db.Txn(true)
|
tx := s.db.Txn(true)
|
||||||
defer tx.Abort()
|
defer tx.Abort()
|
||||||
|
|
||||||
|
@ -868,7 +868,7 @@ func (s *StateStore) EnsureCheck(idx uint64, hc *structs.HealthCheck) error {
|
||||||
// ensureCheckTransaction is used as the inner method to handle inserting
|
// ensureCheckTransaction is used as the inner method to handle inserting
|
||||||
// a health check into the state store. It ensures safety against inserting
|
// a health check into the state store. It ensures safety against inserting
|
||||||
// checks with no matching node or service.
|
// checks with no matching node or service.
|
||||||
func (s *StateStore) ensureCheckTxn(tx *memdb.Txn, idx uint64, hc *structs.HealthCheck) error {
|
func (s *Store) ensureCheckTxn(tx *memdb.Txn, idx uint64, hc *structs.HealthCheck) error {
|
||||||
// Check if we have an existing health check
|
// Check if we have an existing health check
|
||||||
existing, err := tx.First("checks", "id", hc.Node, string(hc.CheckID))
|
existing, err := tx.First("checks", "id", hc.Node, string(hc.CheckID))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -947,7 +947,7 @@ func (s *StateStore) ensureCheckTxn(tx *memdb.Txn, idx uint64, hc *structs.Healt
|
||||||
|
|
||||||
// NodeCheck is used to retrieve a specific check associated with the given
|
// NodeCheck is used to retrieve a specific check associated with the given
|
||||||
// node.
|
// node.
|
||||||
func (s *StateStore) NodeCheck(nodeName string, checkID types.CheckID) (uint64, *structs.HealthCheck, error) {
|
func (s *Store) NodeCheck(nodeName string, checkID types.CheckID) (uint64, *structs.HealthCheck, error) {
|
||||||
tx := s.db.Txn(false)
|
tx := s.db.Txn(false)
|
||||||
defer tx.Abort()
|
defer tx.Abort()
|
||||||
|
|
||||||
|
@ -969,7 +969,7 @@ func (s *StateStore) NodeCheck(nodeName string, checkID types.CheckID) (uint64,
|
||||||
|
|
||||||
// NodeChecks is used to retrieve checks associated with the
|
// NodeChecks is used to retrieve checks associated with the
|
||||||
// given node from the state store.
|
// given node from the state store.
|
||||||
func (s *StateStore) NodeChecks(ws memdb.WatchSet, nodeName string) (uint64, structs.HealthChecks, error) {
|
func (s *Store) NodeChecks(ws memdb.WatchSet, nodeName string) (uint64, structs.HealthChecks, error) {
|
||||||
tx := s.db.Txn(false)
|
tx := s.db.Txn(false)
|
||||||
defer tx.Abort()
|
defer tx.Abort()
|
||||||
|
|
||||||
|
@ -993,7 +993,7 @@ func (s *StateStore) NodeChecks(ws memdb.WatchSet, nodeName string) (uint64, str
|
||||||
// ServiceChecks is used to get all checks associated with a
|
// ServiceChecks is used to get all checks associated with a
|
||||||
// given service ID. The query is performed against a service
|
// given service ID. The query is performed against a service
|
||||||
// _name_ instead of a service ID.
|
// _name_ instead of a service ID.
|
||||||
func (s *StateStore) ServiceChecks(ws memdb.WatchSet, serviceName string) (uint64, structs.HealthChecks, error) {
|
func (s *Store) ServiceChecks(ws memdb.WatchSet, serviceName string) (uint64, structs.HealthChecks, error) {
|
||||||
tx := s.db.Txn(false)
|
tx := s.db.Txn(false)
|
||||||
defer tx.Abort()
|
defer tx.Abort()
|
||||||
|
|
||||||
|
@ -1017,7 +1017,7 @@ func (s *StateStore) ServiceChecks(ws memdb.WatchSet, serviceName string) (uint6
|
||||||
// ServiceChecksByNodeMeta is used to get all checks associated with a
|
// ServiceChecksByNodeMeta is used to get all checks associated with a
|
||||||
// given service ID, filtered by the given node metadata values. The query
|
// given service ID, filtered by the given node metadata values. The query
|
||||||
// is performed against a service _name_ instead of a service ID.
|
// is performed against a service _name_ instead of a service ID.
|
||||||
func (s *StateStore) ServiceChecksByNodeMeta(ws memdb.WatchSet, serviceName string,
|
func (s *Store) ServiceChecksByNodeMeta(ws memdb.WatchSet, serviceName string,
|
||||||
filters map[string]string) (uint64, structs.HealthChecks, error) {
|
filters map[string]string) (uint64, structs.HealthChecks, error) {
|
||||||
|
|
||||||
tx := s.db.Txn(false)
|
tx := s.db.Txn(false)
|
||||||
|
@ -1038,7 +1038,7 @@ func (s *StateStore) ServiceChecksByNodeMeta(ws memdb.WatchSet, serviceName stri
|
||||||
|
|
||||||
// ChecksInState is used to query the state store for all checks
|
// ChecksInState is used to query the state store for all checks
|
||||||
// which are in the provided state.
|
// which are in the provided state.
|
||||||
func (s *StateStore) ChecksInState(ws memdb.WatchSet, state string) (uint64, structs.HealthChecks, error) {
|
func (s *Store) ChecksInState(ws memdb.WatchSet, state string) (uint64, structs.HealthChecks, error) {
|
||||||
tx := s.db.Txn(false)
|
tx := s.db.Txn(false)
|
||||||
defer tx.Abort()
|
defer tx.Abort()
|
||||||
|
|
||||||
|
@ -1070,7 +1070,7 @@ func (s *StateStore) ChecksInState(ws memdb.WatchSet, state string) (uint64, str
|
||||||
|
|
||||||
// ChecksInStateByNodeMeta is used to query the state store for all checks
|
// ChecksInStateByNodeMeta is used to query the state store for all checks
|
||||||
// which are in the provided state, filtered by the given node metadata values.
|
// which are in the provided state, filtered by the given node metadata values.
|
||||||
func (s *StateStore) ChecksInStateByNodeMeta(ws memdb.WatchSet, state string, filters map[string]string) (uint64, structs.HealthChecks, error) {
|
func (s *Store) ChecksInStateByNodeMeta(ws memdb.WatchSet, state string, filters map[string]string) (uint64, structs.HealthChecks, error) {
|
||||||
tx := s.db.Txn(false)
|
tx := s.db.Txn(false)
|
||||||
defer tx.Abort()
|
defer tx.Abort()
|
||||||
|
|
||||||
|
@ -1098,7 +1098,7 @@ func (s *StateStore) ChecksInStateByNodeMeta(ws memdb.WatchSet, state string, fi
|
||||||
|
|
||||||
// parseChecksByNodeMeta is a helper function used to deduplicate some
|
// parseChecksByNodeMeta is a helper function used to deduplicate some
|
||||||
// repetitive code for returning health checks filtered by node metadata fields.
|
// repetitive code for returning health checks filtered by node metadata fields.
|
||||||
func (s *StateStore) parseChecksByNodeMeta(tx *memdb.Txn, ws memdb.WatchSet,
|
func (s *Store) parseChecksByNodeMeta(tx *memdb.Txn, ws memdb.WatchSet,
|
||||||
idx uint64, iter memdb.ResultIterator, filters map[string]string) (uint64, structs.HealthChecks, error) {
|
idx uint64, iter memdb.ResultIterator, filters map[string]string) (uint64, structs.HealthChecks, error) {
|
||||||
|
|
||||||
// We don't want to track an unlimited number of nodes, so we pull a
|
// We don't want to track an unlimited number of nodes, so we pull a
|
||||||
|
@ -1132,7 +1132,7 @@ func (s *StateStore) parseChecksByNodeMeta(tx *memdb.Txn, ws memdb.WatchSet,
|
||||||
}
|
}
|
||||||
|
|
||||||
// DeleteCheck is used to delete a health check registration.
|
// DeleteCheck is used to delete a health check registration.
|
||||||
func (s *StateStore) DeleteCheck(idx uint64, node string, checkID types.CheckID) error {
|
func (s *Store) DeleteCheck(idx uint64, node string, checkID types.CheckID) error {
|
||||||
tx := s.db.Txn(true)
|
tx := s.db.Txn(true)
|
||||||
defer tx.Abort()
|
defer tx.Abort()
|
||||||
|
|
||||||
|
@ -1147,7 +1147,7 @@ func (s *StateStore) DeleteCheck(idx uint64, node string, checkID types.CheckID)
|
||||||
|
|
||||||
// deleteCheckTxn is the inner method used to call a health
|
// deleteCheckTxn is the inner method used to call a health
|
||||||
// check deletion within an existing transaction.
|
// check deletion within an existing transaction.
|
||||||
func (s *StateStore) deleteCheckTxn(tx *memdb.Txn, idx uint64, node string, checkID types.CheckID) error {
|
func (s *Store) deleteCheckTxn(tx *memdb.Txn, idx uint64, node string, checkID types.CheckID) error {
|
||||||
// Try to retrieve the existing health check.
|
// Try to retrieve the existing health check.
|
||||||
hc, err := tx.First("checks", "id", node, string(checkID))
|
hc, err := tx.First("checks", "id", node, string(checkID))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -1186,7 +1186,7 @@ func (s *StateStore) deleteCheckTxn(tx *memdb.Txn, idx uint64, node string, chec
|
||||||
}
|
}
|
||||||
|
|
||||||
// CheckServiceNodes is used to query all nodes and checks for a given service.
|
// CheckServiceNodes is used to query all nodes and checks for a given service.
|
||||||
func (s *StateStore) CheckServiceNodes(ws memdb.WatchSet, serviceName string) (uint64, structs.CheckServiceNodes, error) {
|
func (s *Store) CheckServiceNodes(ws memdb.WatchSet, serviceName string) (uint64, structs.CheckServiceNodes, error) {
|
||||||
tx := s.db.Txn(false)
|
tx := s.db.Txn(false)
|
||||||
defer tx.Abort()
|
defer tx.Abort()
|
||||||
|
|
||||||
|
@ -1210,7 +1210,7 @@ func (s *StateStore) CheckServiceNodes(ws memdb.WatchSet, serviceName string) (u
|
||||||
|
|
||||||
// CheckServiceTagNodes is used to query all nodes and checks for a given
|
// CheckServiceTagNodes is used to query all nodes and checks for a given
|
||||||
// service, filtering out services that don't contain the given tag.
|
// service, filtering out services that don't contain the given tag.
|
||||||
func (s *StateStore) CheckServiceTagNodes(ws memdb.WatchSet, serviceName, tag string) (uint64, structs.CheckServiceNodes, error) {
|
func (s *Store) CheckServiceTagNodes(ws memdb.WatchSet, serviceName, tag string) (uint64, structs.CheckServiceNodes, error) {
|
||||||
tx := s.db.Txn(false)
|
tx := s.db.Txn(false)
|
||||||
defer tx.Abort()
|
defer tx.Abort()
|
||||||
|
|
||||||
|
@ -1238,7 +1238,7 @@ func (s *StateStore) CheckServiceTagNodes(ws memdb.WatchSet, serviceName, tag st
|
||||||
// parseCheckServiceNodes is used to parse through a given set of services,
|
// parseCheckServiceNodes is used to parse through a given set of services,
|
||||||
// and query for an associated node and a set of checks. This is the inner
|
// and query for an associated node and a set of checks. This is the inner
|
||||||
// method used to return a rich set of results from a more simple query.
|
// method used to return a rich set of results from a more simple query.
|
||||||
func (s *StateStore) parseCheckServiceNodes(
|
func (s *Store) parseCheckServiceNodes(
|
||||||
tx *memdb.Txn, ws memdb.WatchSet, idx uint64,
|
tx *memdb.Txn, ws memdb.WatchSet, idx uint64,
|
||||||
serviceName string, services structs.ServiceNodes,
|
serviceName string, services structs.ServiceNodes,
|
||||||
err error) (uint64, structs.CheckServiceNodes, error) {
|
err error) (uint64, structs.CheckServiceNodes, error) {
|
||||||
|
@ -1318,7 +1318,7 @@ func (s *StateStore) parseCheckServiceNodes(
|
||||||
|
|
||||||
// NodeInfo is used to generate a dump of a single node. The dump includes
|
// NodeInfo is used to generate a dump of a single node. The dump includes
|
||||||
// all services and checks which are registered against the node.
|
// all services and checks which are registered against the node.
|
||||||
func (s *StateStore) NodeInfo(ws memdb.WatchSet, node string) (uint64, structs.NodeDump, error) {
|
func (s *Store) NodeInfo(ws memdb.WatchSet, node string) (uint64, structs.NodeDump, error) {
|
||||||
tx := s.db.Txn(false)
|
tx := s.db.Txn(false)
|
||||||
defer tx.Abort()
|
defer tx.Abort()
|
||||||
|
|
||||||
|
@ -1337,7 +1337,7 @@ func (s *StateStore) NodeInfo(ws memdb.WatchSet, node string) (uint64, structs.N
|
||||||
// NodeDump is used to generate a dump of all nodes. This call is expensive
|
// NodeDump is used to generate a dump of all nodes. This call is expensive
|
||||||
// as it has to query every node, service, and check. The response can also
|
// as it has to query every node, service, and check. The response can also
|
||||||
// be quite large since there is currently no filtering applied.
|
// be quite large since there is currently no filtering applied.
|
||||||
func (s *StateStore) NodeDump(ws memdb.WatchSet) (uint64, structs.NodeDump, error) {
|
func (s *Store) NodeDump(ws memdb.WatchSet) (uint64, structs.NodeDump, error) {
|
||||||
tx := s.db.Txn(false)
|
tx := s.db.Txn(false)
|
||||||
defer tx.Abort()
|
defer tx.Abort()
|
||||||
|
|
||||||
|
@ -1356,7 +1356,7 @@ func (s *StateStore) NodeDump(ws memdb.WatchSet) (uint64, structs.NodeDump, erro
|
||||||
// parseNodes takes an iterator over a set of nodes and returns a struct
|
// parseNodes takes an iterator over a set of nodes and returns a struct
|
||||||
// containing the nodes along with all of their associated services
|
// containing the nodes along with all of their associated services
|
||||||
// and/or health checks.
|
// and/or health checks.
|
||||||
func (s *StateStore) parseNodes(tx *memdb.Txn, ws memdb.WatchSet, idx uint64,
|
func (s *Store) parseNodes(tx *memdb.Txn, ws memdb.WatchSet, idx uint64,
|
||||||
iter memdb.ResultIterator) (uint64, structs.NodeDump, error) {
|
iter memdb.ResultIterator) (uint64, structs.NodeDump, error) {
|
||||||
|
|
||||||
// We don't want to track an unlimited number of services, so we pull a
|
// We don't want to track an unlimited number of services, so we pull a
|
||||||
|
|
|
@ -9,7 +9,7 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
// Coordinates is used to pull all the coordinates from the snapshot.
|
// Coordinates is used to pull all the coordinates from the snapshot.
|
||||||
func (s *StateSnapshot) Coordinates() (memdb.ResultIterator, error) {
|
func (s *Snapshot) Coordinates() (memdb.ResultIterator, error) {
|
||||||
iter, err := s.tx.Get("coordinates", "id")
|
iter, err := s.tx.Get("coordinates", "id")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -20,7 +20,7 @@ func (s *StateSnapshot) Coordinates() (memdb.ResultIterator, error) {
|
||||||
// Coordinates is used when restoring from a snapshot. For general inserts, use
|
// Coordinates is used when restoring from a snapshot. For general inserts, use
|
||||||
// CoordinateBatchUpdate. We do less vetting of the updates here because they
|
// CoordinateBatchUpdate. We do less vetting of the updates here because they
|
||||||
// already got checked on the way in during a batch update.
|
// already got checked on the way in during a batch update.
|
||||||
func (s *StateRestore) Coordinates(idx uint64, updates structs.Coordinates) error {
|
func (s *Restore) Coordinates(idx uint64, updates structs.Coordinates) error {
|
||||||
for _, update := range updates {
|
for _, update := range updates {
|
||||||
if err := s.tx.Insert("coordinates", update); err != nil {
|
if err := s.tx.Insert("coordinates", update); err != nil {
|
||||||
return fmt.Errorf("failed restoring coordinate: %s", err)
|
return fmt.Errorf("failed restoring coordinate: %s", err)
|
||||||
|
@ -39,7 +39,7 @@ func (s *StateRestore) Coordinates(idx uint64, updates structs.Coordinates) erro
|
||||||
// nil, none of the Raft or node information is returned. This hits the 90%
|
// nil, none of the Raft or node information is returned. This hits the 90%
|
||||||
// internal-to-Consul use case for this data, and this isn't exposed via an
|
// internal-to-Consul use case for this data, and this isn't exposed via an
|
||||||
// endpoint, so it doesn't matter that the Raft info isn't available.
|
// endpoint, so it doesn't matter that the Raft info isn't available.
|
||||||
func (s *StateStore) CoordinateGetRaw(node string) (*coordinate.Coordinate, error) {
|
func (s *Store) CoordinateGetRaw(node string) (*coordinate.Coordinate, error) {
|
||||||
tx := s.db.Txn(false)
|
tx := s.db.Txn(false)
|
||||||
defer tx.Abort()
|
defer tx.Abort()
|
||||||
|
|
||||||
|
@ -57,7 +57,7 @@ func (s *StateStore) CoordinateGetRaw(node string) (*coordinate.Coordinate, erro
|
||||||
}
|
}
|
||||||
|
|
||||||
// Coordinates queries for all nodes with coordinates.
|
// Coordinates queries for all nodes with coordinates.
|
||||||
func (s *StateStore) Coordinates(ws memdb.WatchSet) (uint64, structs.Coordinates, error) {
|
func (s *Store) Coordinates(ws memdb.WatchSet) (uint64, structs.Coordinates, error) {
|
||||||
tx := s.db.Txn(false)
|
tx := s.db.Txn(false)
|
||||||
defer tx.Abort()
|
defer tx.Abort()
|
||||||
|
|
||||||
|
@ -80,7 +80,7 @@ func (s *StateStore) Coordinates(ws memdb.WatchSet) (uint64, structs.Coordinates
|
||||||
|
|
||||||
// CoordinateBatchUpdate processes a batch of coordinate updates and applies
|
// CoordinateBatchUpdate processes a batch of coordinate updates and applies
|
||||||
// them in a single transaction.
|
// them in a single transaction.
|
||||||
func (s *StateStore) CoordinateBatchUpdate(idx uint64, updates structs.Coordinates) error {
|
func (s *Store) CoordinateBatchUpdate(idx uint64, updates structs.Coordinates) error {
|
||||||
tx := s.db.Txn(true)
|
tx := s.db.Txn(true)
|
||||||
defer tx.Abort()
|
defer tx.Abort()
|
||||||
|
|
||||||
|
|
|
@ -10,7 +10,7 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
// KVs is used to pull the full list of KVS entries for use during snapshots.
|
// KVs is used to pull the full list of KVS entries for use during snapshots.
|
||||||
func (s *StateSnapshot) KVs() (memdb.ResultIterator, error) {
|
func (s *Snapshot) KVs() (memdb.ResultIterator, error) {
|
||||||
iter, err := s.tx.Get("kvs", "id_prefix")
|
iter, err := s.tx.Get("kvs", "id_prefix")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -19,12 +19,12 @@ func (s *StateSnapshot) KVs() (memdb.ResultIterator, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Tombstones is used to pull all the tombstones from the graveyard.
|
// Tombstones is used to pull all the tombstones from the graveyard.
|
||||||
func (s *StateSnapshot) Tombstones() (memdb.ResultIterator, error) {
|
func (s *Snapshot) Tombstones() (memdb.ResultIterator, error) {
|
||||||
return s.store.kvsGraveyard.DumpTxn(s.tx)
|
return s.store.kvsGraveyard.DumpTxn(s.tx)
|
||||||
}
|
}
|
||||||
|
|
||||||
// KVS is used when restoring from a snapshot. Use KVSSet for general inserts.
|
// KVS is used when restoring from a snapshot. Use KVSSet for general inserts.
|
||||||
func (s *StateRestore) KVS(entry *structs.DirEntry) error {
|
func (s *Restore) KVS(entry *structs.DirEntry) error {
|
||||||
if err := s.tx.Insert("kvs", entry); err != nil {
|
if err := s.tx.Insert("kvs", entry); err != nil {
|
||||||
return fmt.Errorf("failed inserting kvs entry: %s", err)
|
return fmt.Errorf("failed inserting kvs entry: %s", err)
|
||||||
}
|
}
|
||||||
|
@ -37,7 +37,7 @@ func (s *StateRestore) KVS(entry *structs.DirEntry) error {
|
||||||
|
|
||||||
// Tombstone is used when restoring from a snapshot. For general inserts, use
|
// Tombstone is used when restoring from a snapshot. For general inserts, use
|
||||||
// Graveyard.InsertTxn.
|
// Graveyard.InsertTxn.
|
||||||
func (s *StateRestore) Tombstone(stone *Tombstone) error {
|
func (s *Restore) Tombstone(stone *Tombstone) error {
|
||||||
if err := s.store.kvsGraveyard.RestoreTxn(s.tx, stone); err != nil {
|
if err := s.store.kvsGraveyard.RestoreTxn(s.tx, stone); err != nil {
|
||||||
return fmt.Errorf("failed restoring tombstone: %s", err)
|
return fmt.Errorf("failed restoring tombstone: %s", err)
|
||||||
}
|
}
|
||||||
|
@ -47,7 +47,7 @@ func (s *StateRestore) Tombstone(stone *Tombstone) error {
|
||||||
// ReapTombstones is used to delete all the tombstones with an index
|
// ReapTombstones is used to delete all the tombstones with an index
|
||||||
// less than or equal to the given index. This is used to prevent
|
// less than or equal to the given index. This is used to prevent
|
||||||
// unbounded storage growth of the tombstones.
|
// unbounded storage growth of the tombstones.
|
||||||
func (s *StateStore) ReapTombstones(index uint64) error {
|
func (s *Store) ReapTombstones(index uint64) error {
|
||||||
tx := s.db.Txn(true)
|
tx := s.db.Txn(true)
|
||||||
defer tx.Abort()
|
defer tx.Abort()
|
||||||
|
|
||||||
|
@ -60,7 +60,7 @@ func (s *StateStore) ReapTombstones(index uint64) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
// KVSSet is used to store a key/value pair.
|
// KVSSet is used to store a key/value pair.
|
||||||
func (s *StateStore) KVSSet(idx uint64, entry *structs.DirEntry) error {
|
func (s *Store) KVSSet(idx uint64, entry *structs.DirEntry) error {
|
||||||
tx := s.db.Txn(true)
|
tx := s.db.Txn(true)
|
||||||
defer tx.Abort()
|
defer tx.Abort()
|
||||||
|
|
||||||
|
@ -78,7 +78,7 @@ func (s *StateStore) KVSSet(idx uint64, entry *structs.DirEntry) error {
|
||||||
// If updateSession is true, then the incoming entry will set the new
|
// If updateSession is true, then the incoming entry will set the new
|
||||||
// session (should be validated before calling this). Otherwise, we will keep
|
// session (should be validated before calling this). Otherwise, we will keep
|
||||||
// whatever the existing session is.
|
// whatever the existing session is.
|
||||||
func (s *StateStore) kvsSetTxn(tx *memdb.Txn, idx uint64, entry *structs.DirEntry, updateSession bool) error {
|
func (s *Store) kvsSetTxn(tx *memdb.Txn, idx uint64, entry *structs.DirEntry, updateSession bool) error {
|
||||||
// Retrieve an existing KV pair
|
// Retrieve an existing KV pair
|
||||||
existing, err := tx.First("kvs", "id", entry.Key)
|
existing, err := tx.First("kvs", "id", entry.Key)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -115,7 +115,7 @@ func (s *StateStore) kvsSetTxn(tx *memdb.Txn, idx uint64, entry *structs.DirEntr
|
||||||
}
|
}
|
||||||
|
|
||||||
// KVSGet is used to retrieve a key/value pair from the state store.
|
// KVSGet is used to retrieve a key/value pair from the state store.
|
||||||
func (s *StateStore) KVSGet(ws memdb.WatchSet, key string) (uint64, *structs.DirEntry, error) {
|
func (s *Store) KVSGet(ws memdb.WatchSet, key string) (uint64, *structs.DirEntry, error) {
|
||||||
tx := s.db.Txn(false)
|
tx := s.db.Txn(false)
|
||||||
defer tx.Abort()
|
defer tx.Abort()
|
||||||
|
|
||||||
|
@ -124,7 +124,7 @@ func (s *StateStore) KVSGet(ws memdb.WatchSet, key string) (uint64, *structs.Dir
|
||||||
|
|
||||||
// kvsGetTxn is the inner method that gets a KVS entry inside an existing
|
// kvsGetTxn is the inner method that gets a KVS entry inside an existing
|
||||||
// transaction.
|
// transaction.
|
||||||
func (s *StateStore) kvsGetTxn(tx *memdb.Txn, ws memdb.WatchSet, key string) (uint64, *structs.DirEntry, error) {
|
func (s *Store) kvsGetTxn(tx *memdb.Txn, ws memdb.WatchSet, key string) (uint64, *structs.DirEntry, error) {
|
||||||
// Get the table index.
|
// Get the table index.
|
||||||
idx := maxIndexTxn(tx, "kvs", "tombstones")
|
idx := maxIndexTxn(tx, "kvs", "tombstones")
|
||||||
|
|
||||||
|
@ -144,7 +144,7 @@ func (s *StateStore) kvsGetTxn(tx *memdb.Txn, ws memdb.WatchSet, key string) (ui
|
||||||
// prefix is left empty, all keys in the KVS will be returned. The returned
|
// prefix is left empty, all keys in the KVS will be returned. The returned
|
||||||
// is the max index of the returned kvs entries or applicable tombstones, or
|
// is the max index of the returned kvs entries or applicable tombstones, or
|
||||||
// else it's the full table indexes for kvs and tombstones.
|
// else it's the full table indexes for kvs and tombstones.
|
||||||
func (s *StateStore) KVSList(ws memdb.WatchSet, prefix string) (uint64, structs.DirEntries, error) {
|
func (s *Store) KVSList(ws memdb.WatchSet, prefix string) (uint64, structs.DirEntries, error) {
|
||||||
tx := s.db.Txn(false)
|
tx := s.db.Txn(false)
|
||||||
defer tx.Abort()
|
defer tx.Abort()
|
||||||
|
|
||||||
|
@ -153,7 +153,7 @@ func (s *StateStore) KVSList(ws memdb.WatchSet, prefix string) (uint64, structs.
|
||||||
|
|
||||||
// kvsListTxn is the inner method that gets a list of KVS entries matching a
|
// kvsListTxn is the inner method that gets a list of KVS entries matching a
|
||||||
// prefix.
|
// prefix.
|
||||||
func (s *StateStore) kvsListTxn(tx *memdb.Txn, ws memdb.WatchSet, prefix string) (uint64, structs.DirEntries, error) {
|
func (s *Store) kvsListTxn(tx *memdb.Txn, ws memdb.WatchSet, prefix string) (uint64, structs.DirEntries, error) {
|
||||||
// Get the table indexes.
|
// Get the table indexes.
|
||||||
idx := maxIndexTxn(tx, "kvs", "tombstones")
|
idx := maxIndexTxn(tx, "kvs", "tombstones")
|
||||||
|
|
||||||
|
@ -201,7 +201,7 @@ func (s *StateStore) kvsListTxn(tx *memdb.Txn, ws memdb.WatchSet, prefix string)
|
||||||
// An optional separator may be specified, which can be used to slice off a part
|
// An optional separator may be specified, which can be used to slice off a part
|
||||||
// of the response so that only a subset of the prefix is returned. In this
|
// of the response so that only a subset of the prefix is returned. In this
|
||||||
// mode, the keys which are omitted are still counted in the returned index.
|
// mode, the keys which are omitted are still counted in the returned index.
|
||||||
func (s *StateStore) KVSListKeys(ws memdb.WatchSet, prefix, sep string) (uint64, []string, error) {
|
func (s *Store) KVSListKeys(ws memdb.WatchSet, prefix, sep string) (uint64, []string, error) {
|
||||||
tx := s.db.Txn(false)
|
tx := s.db.Txn(false)
|
||||||
defer tx.Abort()
|
defer tx.Abort()
|
||||||
|
|
||||||
|
@ -274,7 +274,7 @@ func (s *StateStore) KVSListKeys(ws memdb.WatchSet, prefix, sep string) (uint64,
|
||||||
|
|
||||||
// KVSDelete is used to perform a shallow delete on a single key in the
|
// KVSDelete is used to perform a shallow delete on a single key in the
|
||||||
// the state store.
|
// the state store.
|
||||||
func (s *StateStore) KVSDelete(idx uint64, key string) error {
|
func (s *Store) KVSDelete(idx uint64, key string) error {
|
||||||
tx := s.db.Txn(true)
|
tx := s.db.Txn(true)
|
||||||
defer tx.Abort()
|
defer tx.Abort()
|
||||||
|
|
||||||
|
@ -289,7 +289,7 @@ func (s *StateStore) KVSDelete(idx uint64, key string) error {
|
||||||
|
|
||||||
// kvsDeleteTxn is the inner method used to perform the actual deletion
|
// kvsDeleteTxn is the inner method used to perform the actual deletion
|
||||||
// of a key/value pair within an existing transaction.
|
// of a key/value pair within an existing transaction.
|
||||||
func (s *StateStore) kvsDeleteTxn(tx *memdb.Txn, idx uint64, key string) error {
|
func (s *Store) kvsDeleteTxn(tx *memdb.Txn, idx uint64, key string) error {
|
||||||
// Look up the entry in the state store.
|
// Look up the entry in the state store.
|
||||||
entry, err := tx.First("kvs", "id", key)
|
entry, err := tx.First("kvs", "id", key)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -319,7 +319,7 @@ func (s *StateStore) kvsDeleteTxn(tx *memdb.Txn, idx uint64, key string) error {
|
||||||
// raft index. If the CAS index specified is not equal to the last
|
// raft index. If the CAS index specified is not equal to the last
|
||||||
// observed index for the given key, then the call is a noop, otherwise
|
// observed index for the given key, then the call is a noop, otherwise
|
||||||
// a normal KV delete is invoked.
|
// a normal KV delete is invoked.
|
||||||
func (s *StateStore) KVSDeleteCAS(idx, cidx uint64, key string) (bool, error) {
|
func (s *Store) KVSDeleteCAS(idx, cidx uint64, key string) (bool, error) {
|
||||||
tx := s.db.Txn(true)
|
tx := s.db.Txn(true)
|
||||||
defer tx.Abort()
|
defer tx.Abort()
|
||||||
|
|
||||||
|
@ -334,7 +334,7 @@ func (s *StateStore) KVSDeleteCAS(idx, cidx uint64, key string) (bool, error) {
|
||||||
|
|
||||||
// kvsDeleteCASTxn is the inner method that does a CAS delete within an existing
|
// kvsDeleteCASTxn is the inner method that does a CAS delete within an existing
|
||||||
// transaction.
|
// transaction.
|
||||||
func (s *StateStore) kvsDeleteCASTxn(tx *memdb.Txn, idx, cidx uint64, key string) (bool, error) {
|
func (s *Store) kvsDeleteCASTxn(tx *memdb.Txn, idx, cidx uint64, key string) (bool, error) {
|
||||||
// Retrieve the existing kvs entry, if any exists.
|
// Retrieve the existing kvs entry, if any exists.
|
||||||
entry, err := tx.First("kvs", "id", key)
|
entry, err := tx.First("kvs", "id", key)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -360,7 +360,7 @@ func (s *StateStore) kvsDeleteCASTxn(tx *memdb.Txn, idx, cidx uint64, key string
|
||||||
// ModifyIndex in the provided entry is used to determine if we should
|
// ModifyIndex in the provided entry is used to determine if we should
|
||||||
// write the entry to the state store or bail. Returns a bool indicating
|
// write the entry to the state store or bail. Returns a bool indicating
|
||||||
// if a write happened and any error.
|
// if a write happened and any error.
|
||||||
func (s *StateStore) KVSSetCAS(idx uint64, entry *structs.DirEntry) (bool, error) {
|
func (s *Store) KVSSetCAS(idx uint64, entry *structs.DirEntry) (bool, error) {
|
||||||
tx := s.db.Txn(true)
|
tx := s.db.Txn(true)
|
||||||
defer tx.Abort()
|
defer tx.Abort()
|
||||||
|
|
||||||
|
@ -375,7 +375,7 @@ func (s *StateStore) KVSSetCAS(idx uint64, entry *structs.DirEntry) (bool, error
|
||||||
|
|
||||||
// kvsSetCASTxn is the inner method used to do a CAS inside an existing
|
// kvsSetCASTxn is the inner method used to do a CAS inside an existing
|
||||||
// transaction.
|
// transaction.
|
||||||
func (s *StateStore) kvsSetCASTxn(tx *memdb.Txn, idx uint64, entry *structs.DirEntry) (bool, error) {
|
func (s *Store) kvsSetCASTxn(tx *memdb.Txn, idx uint64, entry *structs.DirEntry) (bool, error) {
|
||||||
// Retrieve the existing entry.
|
// Retrieve the existing entry.
|
||||||
existing, err := tx.First("kvs", "id", entry.Key)
|
existing, err := tx.First("kvs", "id", entry.Key)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -405,7 +405,7 @@ func (s *StateStore) kvsSetCASTxn(tx *memdb.Txn, idx uint64, entry *structs.DirE
|
||||||
// KVSDeleteTree is used to do a recursive delete on a key prefix
|
// KVSDeleteTree is used to do a recursive delete on a key prefix
|
||||||
// in the state store. If any keys are modified, the last index is
|
// in the state store. If any keys are modified, the last index is
|
||||||
// set, otherwise this is a no-op.
|
// set, otherwise this is a no-op.
|
||||||
func (s *StateStore) KVSDeleteTree(idx uint64, prefix string) error {
|
func (s *Store) KVSDeleteTree(idx uint64, prefix string) error {
|
||||||
tx := s.db.Txn(true)
|
tx := s.db.Txn(true)
|
||||||
defer tx.Abort()
|
defer tx.Abort()
|
||||||
|
|
||||||
|
@ -419,7 +419,7 @@ func (s *StateStore) KVSDeleteTree(idx uint64, prefix string) error {
|
||||||
|
|
||||||
// kvsDeleteTreeTxn is the inner method that does a recursive delete inside an
|
// kvsDeleteTreeTxn is the inner method that does a recursive delete inside an
|
||||||
// existing transaction.
|
// existing transaction.
|
||||||
func (s *StateStore) kvsDeleteTreeTxn(tx *memdb.Txn, idx uint64, prefix string) error {
|
func (s *Store) kvsDeleteTreeTxn(tx *memdb.Txn, idx uint64, prefix string) error {
|
||||||
// Get an iterator over all of the keys with the given prefix.
|
// Get an iterator over all of the keys with the given prefix.
|
||||||
entries, err := tx.Get("kvs", "id_prefix", prefix)
|
entries, err := tx.Get("kvs", "id_prefix", prefix)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -459,13 +459,13 @@ func (s *StateStore) kvsDeleteTreeTxn(tx *memdb.Txn, idx uint64, prefix string)
|
||||||
|
|
||||||
// KVSLockDelay returns the expiration time for any lock delay associated with
|
// KVSLockDelay returns the expiration time for any lock delay associated with
|
||||||
// the given key.
|
// the given key.
|
||||||
func (s *StateStore) KVSLockDelay(key string) time.Time {
|
func (s *Store) KVSLockDelay(key string) time.Time {
|
||||||
return s.lockDelay.GetExpiration(key)
|
return s.lockDelay.GetExpiration(key)
|
||||||
}
|
}
|
||||||
|
|
||||||
// KVSLock is similar to KVSSet but only performs the set if the lock can be
|
// KVSLock is similar to KVSSet but only performs the set if the lock can be
|
||||||
// acquired.
|
// acquired.
|
||||||
func (s *StateStore) KVSLock(idx uint64, entry *structs.DirEntry) (bool, error) {
|
func (s *Store) KVSLock(idx uint64, entry *structs.DirEntry) (bool, error) {
|
||||||
tx := s.db.Txn(true)
|
tx := s.db.Txn(true)
|
||||||
defer tx.Abort()
|
defer tx.Abort()
|
||||||
|
|
||||||
|
@ -480,7 +480,7 @@ func (s *StateStore) KVSLock(idx uint64, entry *structs.DirEntry) (bool, error)
|
||||||
|
|
||||||
// kvsLockTxn is the inner method that does a lock inside an existing
|
// kvsLockTxn is the inner method that does a lock inside an existing
|
||||||
// transaction.
|
// transaction.
|
||||||
func (s *StateStore) kvsLockTxn(tx *memdb.Txn, idx uint64, entry *structs.DirEntry) (bool, error) {
|
func (s *Store) kvsLockTxn(tx *memdb.Txn, idx uint64, entry *structs.DirEntry) (bool, error) {
|
||||||
// Verify that a session is present.
|
// Verify that a session is present.
|
||||||
if entry.Session == "" {
|
if entry.Session == "" {
|
||||||
return false, fmt.Errorf("missing session")
|
return false, fmt.Errorf("missing session")
|
||||||
|
@ -531,7 +531,7 @@ func (s *StateStore) kvsLockTxn(tx *memdb.Txn, idx uint64, entry *structs.DirEnt
|
||||||
|
|
||||||
// KVSUnlock is similar to KVSSet but only performs the set if the lock can be
|
// KVSUnlock is similar to KVSSet but only performs the set if the lock can be
|
||||||
// unlocked (the key must already exist and be locked).
|
// unlocked (the key must already exist and be locked).
|
||||||
func (s *StateStore) KVSUnlock(idx uint64, entry *structs.DirEntry) (bool, error) {
|
func (s *Store) KVSUnlock(idx uint64, entry *structs.DirEntry) (bool, error) {
|
||||||
tx := s.db.Txn(true)
|
tx := s.db.Txn(true)
|
||||||
defer tx.Abort()
|
defer tx.Abort()
|
||||||
|
|
||||||
|
@ -546,7 +546,7 @@ func (s *StateStore) KVSUnlock(idx uint64, entry *structs.DirEntry) (bool, error
|
||||||
|
|
||||||
// kvsUnlockTxn is the inner method that does an unlock inside an existing
|
// kvsUnlockTxn is the inner method that does an unlock inside an existing
|
||||||
// transaction.
|
// transaction.
|
||||||
func (s *StateStore) kvsUnlockTxn(tx *memdb.Txn, idx uint64, entry *structs.DirEntry) (bool, error) {
|
func (s *Store) kvsUnlockTxn(tx *memdb.Txn, idx uint64, entry *structs.DirEntry) (bool, error) {
|
||||||
// Verify that a session is present.
|
// Verify that a session is present.
|
||||||
if entry.Session == "" {
|
if entry.Session == "" {
|
||||||
return false, fmt.Errorf("missing session")
|
return false, fmt.Errorf("missing session")
|
||||||
|
@ -584,7 +584,7 @@ func (s *StateStore) kvsUnlockTxn(tx *memdb.Txn, idx uint64, entry *structs.DirE
|
||||||
|
|
||||||
// kvsCheckSessionTxn checks to see if the given session matches the current
|
// kvsCheckSessionTxn checks to see if the given session matches the current
|
||||||
// entry for a key.
|
// entry for a key.
|
||||||
func (s *StateStore) kvsCheckSessionTxn(tx *memdb.Txn, key string, session string) (*structs.DirEntry, error) {
|
func (s *Store) kvsCheckSessionTxn(tx *memdb.Txn, key string, session string) (*structs.DirEntry, error) {
|
||||||
entry, err := tx.First("kvs", "id", key)
|
entry, err := tx.First("kvs", "id", key)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("failed kvs lookup: %s", err)
|
return nil, fmt.Errorf("failed kvs lookup: %s", err)
|
||||||
|
@ -603,7 +603,7 @@ func (s *StateStore) kvsCheckSessionTxn(tx *memdb.Txn, key string, session strin
|
||||||
|
|
||||||
// kvsCheckIndexTxn checks to see if the given modify index matches the current
|
// kvsCheckIndexTxn checks to see if the given modify index matches the current
|
||||||
// entry for a key.
|
// entry for a key.
|
||||||
func (s *StateStore) kvsCheckIndexTxn(tx *memdb.Txn, key string, cidx uint64) (*structs.DirEntry, error) {
|
func (s *Store) kvsCheckIndexTxn(tx *memdb.Txn, key string, cidx uint64) (*structs.DirEntry, error) {
|
||||||
entry, err := tx.First("kvs", "id", key)
|
entry, err := tx.First("kvs", "id", key)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("failed kvs lookup: %s", err)
|
return nil, fmt.Errorf("failed kvs lookup: %s", err)
|
||||||
|
|
|
@ -45,7 +45,7 @@ func toPreparedQuery(wrapped interface{}) *structs.PreparedQuery {
|
||||||
}
|
}
|
||||||
|
|
||||||
// PreparedQueries is used to pull all the prepared queries from the snapshot.
|
// PreparedQueries is used to pull all the prepared queries from the snapshot.
|
||||||
func (s *StateSnapshot) PreparedQueries() (structs.PreparedQueries, error) {
|
func (s *Snapshot) PreparedQueries() (structs.PreparedQueries, error) {
|
||||||
queries, err := s.tx.Get("prepared-queries", "id")
|
queries, err := s.tx.Get("prepared-queries", "id")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -60,7 +60,7 @@ func (s *StateSnapshot) PreparedQueries() (structs.PreparedQueries, error) {
|
||||||
|
|
||||||
// PreparedQuery is used when restoring from a snapshot. For general inserts,
|
// PreparedQuery is used when restoring from a snapshot. For general inserts,
|
||||||
// use PreparedQuerySet.
|
// use PreparedQuerySet.
|
||||||
func (s *StateRestore) PreparedQuery(query *structs.PreparedQuery) error {
|
func (s *Restore) PreparedQuery(query *structs.PreparedQuery) error {
|
||||||
// If this is a template, compile it, otherwise leave the compiled
|
// If this is a template, compile it, otherwise leave the compiled
|
||||||
// template field nil.
|
// template field nil.
|
||||||
var ct *prepared_query.CompiledTemplate
|
var ct *prepared_query.CompiledTemplate
|
||||||
|
@ -84,7 +84,7 @@ func (s *StateRestore) PreparedQuery(query *structs.PreparedQuery) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
// PreparedQuerySet is used to create or update a prepared query.
|
// PreparedQuerySet is used to create or update a prepared query.
|
||||||
func (s *StateStore) PreparedQuerySet(idx uint64, query *structs.PreparedQuery) error {
|
func (s *Store) PreparedQuerySet(idx uint64, query *structs.PreparedQuery) error {
|
||||||
tx := s.db.Txn(true)
|
tx := s.db.Txn(true)
|
||||||
defer tx.Abort()
|
defer tx.Abort()
|
||||||
|
|
||||||
|
@ -98,7 +98,7 @@ func (s *StateStore) PreparedQuerySet(idx uint64, query *structs.PreparedQuery)
|
||||||
|
|
||||||
// preparedQuerySetTxn is the inner method used to insert a prepared query with
|
// preparedQuerySetTxn is the inner method used to insert a prepared query with
|
||||||
// the proper indexes into the state store.
|
// the proper indexes into the state store.
|
||||||
func (s *StateStore) preparedQuerySetTxn(tx *memdb.Txn, idx uint64, query *structs.PreparedQuery) error {
|
func (s *Store) preparedQuerySetTxn(tx *memdb.Txn, idx uint64, query *structs.PreparedQuery) error {
|
||||||
// Check that the ID is set.
|
// Check that the ID is set.
|
||||||
if query.ID == "" {
|
if query.ID == "" {
|
||||||
return ErrMissingQueryID
|
return ErrMissingQueryID
|
||||||
|
@ -201,7 +201,7 @@ func (s *StateStore) preparedQuerySetTxn(tx *memdb.Txn, idx uint64, query *struc
|
||||||
}
|
}
|
||||||
|
|
||||||
// PreparedQueryDelete deletes the given query by ID.
|
// PreparedQueryDelete deletes the given query by ID.
|
||||||
func (s *StateStore) PreparedQueryDelete(idx uint64, queryID string) error {
|
func (s *Store) PreparedQueryDelete(idx uint64, queryID string) error {
|
||||||
tx := s.db.Txn(true)
|
tx := s.db.Txn(true)
|
||||||
defer tx.Abort()
|
defer tx.Abort()
|
||||||
|
|
||||||
|
@ -215,7 +215,7 @@ func (s *StateStore) PreparedQueryDelete(idx uint64, queryID string) error {
|
||||||
|
|
||||||
// preparedQueryDeleteTxn is the inner method used to delete a prepared query
|
// preparedQueryDeleteTxn is the inner method used to delete a prepared query
|
||||||
// with the proper indexes into the state store.
|
// with the proper indexes into the state store.
|
||||||
func (s *StateStore) preparedQueryDeleteTxn(tx *memdb.Txn, idx uint64, queryID string) error {
|
func (s *Store) preparedQueryDeleteTxn(tx *memdb.Txn, idx uint64, queryID string) error {
|
||||||
// Pull the query.
|
// Pull the query.
|
||||||
wrapped, err := tx.First("prepared-queries", "id", queryID)
|
wrapped, err := tx.First("prepared-queries", "id", queryID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -237,7 +237,7 @@ func (s *StateStore) preparedQueryDeleteTxn(tx *memdb.Txn, idx uint64, queryID s
|
||||||
}
|
}
|
||||||
|
|
||||||
// PreparedQueryGet returns the given prepared query by ID.
|
// PreparedQueryGet returns the given prepared query by ID.
|
||||||
func (s *StateStore) PreparedQueryGet(ws memdb.WatchSet, queryID string) (uint64, *structs.PreparedQuery, error) {
|
func (s *Store) PreparedQueryGet(ws memdb.WatchSet, queryID string) (uint64, *structs.PreparedQuery, error) {
|
||||||
tx := s.db.Txn(false)
|
tx := s.db.Txn(false)
|
||||||
defer tx.Abort()
|
defer tx.Abort()
|
||||||
|
|
||||||
|
@ -256,7 +256,7 @@ func (s *StateStore) PreparedQueryGet(ws memdb.WatchSet, queryID string) (uint64
|
||||||
// PreparedQueryResolve returns the given prepared query by looking up an ID or
|
// PreparedQueryResolve returns the given prepared query by looking up an ID or
|
||||||
// Name. If the query was looked up by name and it's a template, then the
|
// Name. If the query was looked up by name and it's a template, then the
|
||||||
// template will be rendered before it is returned.
|
// template will be rendered before it is returned.
|
||||||
func (s *StateStore) PreparedQueryResolve(queryIDOrName string) (uint64, *structs.PreparedQuery, error) {
|
func (s *Store) PreparedQueryResolve(queryIDOrName string) (uint64, *structs.PreparedQuery, error) {
|
||||||
tx := s.db.Txn(false)
|
tx := s.db.Txn(false)
|
||||||
defer tx.Abort()
|
defer tx.Abort()
|
||||||
|
|
||||||
|
@ -331,7 +331,7 @@ func (s *StateStore) PreparedQueryResolve(queryIDOrName string) (uint64, *struct
|
||||||
}
|
}
|
||||||
|
|
||||||
// PreparedQueryList returns all the prepared queries.
|
// PreparedQueryList returns all the prepared queries.
|
||||||
func (s *StateStore) PreparedQueryList(ws memdb.WatchSet) (uint64, structs.PreparedQueries, error) {
|
func (s *Store) PreparedQueryList(ws memdb.WatchSet) (uint64, structs.PreparedQueries, error) {
|
||||||
tx := s.db.Txn(false)
|
tx := s.db.Txn(false)
|
||||||
defer tx.Abort()
|
defer tx.Abort()
|
||||||
|
|
||||||
|
|
|
@ -10,7 +10,7 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
// Sessions is used to pull the full list of sessions for use during snapshots.
|
// Sessions is used to pull the full list of sessions for use during snapshots.
|
||||||
func (s *StateSnapshot) Sessions() (memdb.ResultIterator, error) {
|
func (s *Snapshot) Sessions() (memdb.ResultIterator, error) {
|
||||||
iter, err := s.tx.Get("sessions", "id")
|
iter, err := s.tx.Get("sessions", "id")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -20,7 +20,7 @@ func (s *StateSnapshot) Sessions() (memdb.ResultIterator, error) {
|
||||||
|
|
||||||
// Session is used when restoring from a snapshot. For general inserts, use
|
// Session is used when restoring from a snapshot. For general inserts, use
|
||||||
// SessionCreate.
|
// SessionCreate.
|
||||||
func (s *StateRestore) Session(sess *structs.Session) error {
|
func (s *Restore) Session(sess *structs.Session) error {
|
||||||
// Insert the session.
|
// Insert the session.
|
||||||
if err := s.tx.Insert("sessions", sess); err != nil {
|
if err := s.tx.Insert("sessions", sess); err != nil {
|
||||||
return fmt.Errorf("failed inserting session: %s", err)
|
return fmt.Errorf("failed inserting session: %s", err)
|
||||||
|
@ -47,7 +47,7 @@ func (s *StateRestore) Session(sess *structs.Session) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
// SessionCreate is used to register a new session in the state store.
|
// SessionCreate is used to register a new session in the state store.
|
||||||
func (s *StateStore) SessionCreate(idx uint64, sess *structs.Session) error {
|
func (s *Store) SessionCreate(idx uint64, sess *structs.Session) error {
|
||||||
tx := s.db.Txn(true)
|
tx := s.db.Txn(true)
|
||||||
defer tx.Abort()
|
defer tx.Abort()
|
||||||
|
|
||||||
|
@ -70,7 +70,7 @@ func (s *StateStore) SessionCreate(idx uint64, sess *structs.Session) error {
|
||||||
// sessionCreateTxn is the inner method used for creating session entries in
|
// sessionCreateTxn is the inner method used for creating session entries in
|
||||||
// an open transaction. Any health checks registered with the session will be
|
// an open transaction. Any health checks registered with the session will be
|
||||||
// checked for failing status. Returns any error encountered.
|
// checked for failing status. Returns any error encountered.
|
||||||
func (s *StateStore) sessionCreateTxn(tx *memdb.Txn, idx uint64, sess *structs.Session) error {
|
func (s *Store) sessionCreateTxn(tx *memdb.Txn, idx uint64, sess *structs.Session) error {
|
||||||
// Check that we have a session ID
|
// Check that we have a session ID
|
||||||
if sess.ID == "" {
|
if sess.ID == "" {
|
||||||
return ErrMissingSessionID
|
return ErrMissingSessionID
|
||||||
|
@ -144,7 +144,7 @@ func (s *StateStore) sessionCreateTxn(tx *memdb.Txn, idx uint64, sess *structs.S
|
||||||
}
|
}
|
||||||
|
|
||||||
// SessionGet is used to retrieve an active session from the state store.
|
// SessionGet is used to retrieve an active session from the state store.
|
||||||
func (s *StateStore) SessionGet(ws memdb.WatchSet, sessionID string) (uint64, *structs.Session, error) {
|
func (s *Store) SessionGet(ws memdb.WatchSet, sessionID string) (uint64, *structs.Session, error) {
|
||||||
tx := s.db.Txn(false)
|
tx := s.db.Txn(false)
|
||||||
defer tx.Abort()
|
defer tx.Abort()
|
||||||
|
|
||||||
|
@ -164,7 +164,7 @@ func (s *StateStore) SessionGet(ws memdb.WatchSet, sessionID string) (uint64, *s
|
||||||
}
|
}
|
||||||
|
|
||||||
// SessionList returns a slice containing all of the active sessions.
|
// SessionList returns a slice containing all of the active sessions.
|
||||||
func (s *StateStore) SessionList(ws memdb.WatchSet) (uint64, structs.Sessions, error) {
|
func (s *Store) SessionList(ws memdb.WatchSet) (uint64, structs.Sessions, error) {
|
||||||
tx := s.db.Txn(false)
|
tx := s.db.Txn(false)
|
||||||
defer tx.Abort()
|
defer tx.Abort()
|
||||||
|
|
||||||
|
@ -189,7 +189,7 @@ func (s *StateStore) SessionList(ws memdb.WatchSet) (uint64, structs.Sessions, e
|
||||||
// NodeSessions returns a set of active sessions associated
|
// NodeSessions returns a set of active sessions associated
|
||||||
// with the given node ID. The returned index is the highest
|
// with the given node ID. The returned index is the highest
|
||||||
// index seen from the result set.
|
// index seen from the result set.
|
||||||
func (s *StateStore) NodeSessions(ws memdb.WatchSet, nodeID string) (uint64, structs.Sessions, error) {
|
func (s *Store) NodeSessions(ws memdb.WatchSet, nodeID string) (uint64, structs.Sessions, error) {
|
||||||
tx := s.db.Txn(false)
|
tx := s.db.Txn(false)
|
||||||
defer tx.Abort()
|
defer tx.Abort()
|
||||||
|
|
||||||
|
@ -214,7 +214,7 @@ func (s *StateStore) NodeSessions(ws memdb.WatchSet, nodeID string) (uint64, str
|
||||||
// SessionDestroy is used to remove an active session. This will
|
// SessionDestroy is used to remove an active session. This will
|
||||||
// implicitly invalidate the session and invoke the specified
|
// implicitly invalidate the session and invoke the specified
|
||||||
// session destroy behavior.
|
// session destroy behavior.
|
||||||
func (s *StateStore) SessionDestroy(idx uint64, sessionID string) error {
|
func (s *Store) SessionDestroy(idx uint64, sessionID string) error {
|
||||||
tx := s.db.Txn(true)
|
tx := s.db.Txn(true)
|
||||||
defer tx.Abort()
|
defer tx.Abort()
|
||||||
|
|
||||||
|
@ -229,7 +229,7 @@ func (s *StateStore) SessionDestroy(idx uint64, sessionID string) error {
|
||||||
|
|
||||||
// deleteSessionTxn is the inner method, which is used to do the actual
|
// deleteSessionTxn is the inner method, which is used to do the actual
|
||||||
// session deletion and handle session invalidation, etc.
|
// session deletion and handle session invalidation, etc.
|
||||||
func (s *StateStore) deleteSessionTxn(tx *memdb.Txn, idx uint64, sessionID string) error {
|
func (s *Store) deleteSessionTxn(tx *memdb.Txn, idx uint64, sessionID string) error {
|
||||||
// Look up the session.
|
// Look up the session.
|
||||||
sess, err := tx.First("sessions", "id", sessionID)
|
sess, err := tx.First("sessions", "id", sessionID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -42,11 +42,11 @@ const (
|
||||||
watchLimit = 2048
|
watchLimit = 2048
|
||||||
)
|
)
|
||||||
|
|
||||||
// StateStore is where we store all of Consul's state, including
|
// Store is where we store all of Consul's state, including
|
||||||
// records of node registrations, services, checks, key/value
|
// records of node registrations, services, checks, key/value
|
||||||
// pairs and more. The DB is entirely in-memory and is constructed
|
// pairs and more. The DB is entirely in-memory and is constructed
|
||||||
// from the Raft log through the FSM.
|
// from the Raft log through the FSM.
|
||||||
type StateStore struct {
|
type Store struct {
|
||||||
schema *memdb.DBSchema
|
schema *memdb.DBSchema
|
||||||
db *memdb.MemDB
|
db *memdb.MemDB
|
||||||
|
|
||||||
|
@ -61,18 +61,18 @@ type StateStore struct {
|
||||||
lockDelay *Delay
|
lockDelay *Delay
|
||||||
}
|
}
|
||||||
|
|
||||||
// StateSnapshot is used to provide a point-in-time snapshot. It
|
// Snapshot is used to provide a point-in-time snapshot. It
|
||||||
// works by starting a read transaction against the whole state store.
|
// works by starting a read transaction against the whole state store.
|
||||||
type StateSnapshot struct {
|
type Snapshot struct {
|
||||||
store *StateStore
|
store *Store
|
||||||
tx *memdb.Txn
|
tx *memdb.Txn
|
||||||
lastIndex uint64
|
lastIndex uint64
|
||||||
}
|
}
|
||||||
|
|
||||||
// StateRestore is used to efficiently manage restoring a large amount of
|
// Restore is used to efficiently manage restoring a large amount of
|
||||||
// data to a state store.
|
// data to a state store.
|
||||||
type StateRestore struct {
|
type Restore struct {
|
||||||
store *StateStore
|
store *Store
|
||||||
tx *memdb.Txn
|
tx *memdb.Txn
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -93,7 +93,7 @@ type sessionCheck struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewStateStore creates a new in-memory state storage layer.
|
// NewStateStore creates a new in-memory state storage layer.
|
||||||
func NewStateStore(gc *TombstoneGC) (*StateStore, error) {
|
func NewStateStore(gc *TombstoneGC) (*Store, error) {
|
||||||
// Create the in-memory DB.
|
// Create the in-memory DB.
|
||||||
schema := stateStoreSchema()
|
schema := stateStoreSchema()
|
||||||
db, err := memdb.NewMemDB(schema)
|
db, err := memdb.NewMemDB(schema)
|
||||||
|
@ -102,7 +102,7 @@ func NewStateStore(gc *TombstoneGC) (*StateStore, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Create and return the state store.
|
// Create and return the state store.
|
||||||
s := &StateStore{
|
s := &Store{
|
||||||
schema: schema,
|
schema: schema,
|
||||||
db: db,
|
db: db,
|
||||||
abandonCh: make(chan struct{}),
|
abandonCh: make(chan struct{}),
|
||||||
|
@ -113,7 +113,7 @@ func NewStateStore(gc *TombstoneGC) (*StateStore, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Snapshot is used to create a point-in-time snapshot of the entire db.
|
// Snapshot is used to create a point-in-time snapshot of the entire db.
|
||||||
func (s *StateStore) Snapshot() *StateSnapshot {
|
func (s *Store) Snapshot() *Snapshot {
|
||||||
tx := s.db.Txn(false)
|
tx := s.db.Txn(false)
|
||||||
|
|
||||||
var tables []string
|
var tables []string
|
||||||
|
@ -122,54 +122,54 @@ func (s *StateStore) Snapshot() *StateSnapshot {
|
||||||
}
|
}
|
||||||
idx := maxIndexTxn(tx, tables...)
|
idx := maxIndexTxn(tx, tables...)
|
||||||
|
|
||||||
return &StateSnapshot{s, tx, idx}
|
return &Snapshot{s, tx, idx}
|
||||||
}
|
}
|
||||||
|
|
||||||
// LastIndex returns that last index that affects the snapshotted data.
|
// LastIndex returns that last index that affects the snapshotted data.
|
||||||
func (s *StateSnapshot) LastIndex() uint64 {
|
func (s *Snapshot) LastIndex() uint64 {
|
||||||
return s.lastIndex
|
return s.lastIndex
|
||||||
}
|
}
|
||||||
|
|
||||||
// Close performs cleanup of a state snapshot.
|
// Close performs cleanup of a state snapshot.
|
||||||
func (s *StateSnapshot) Close() {
|
func (s *Snapshot) Close() {
|
||||||
s.tx.Abort()
|
s.tx.Abort()
|
||||||
}
|
}
|
||||||
|
|
||||||
// Restore is used to efficiently manage restoring a large amount of data into
|
// Restore is used to efficiently manage restoring a large amount of data into
|
||||||
// the state store. It works by doing all the restores inside of a single
|
// the state store. It works by doing all the restores inside of a single
|
||||||
// transaction.
|
// transaction.
|
||||||
func (s *StateStore) Restore() *StateRestore {
|
func (s *Store) Restore() *Restore {
|
||||||
tx := s.db.Txn(true)
|
tx := s.db.Txn(true)
|
||||||
return &StateRestore{s, tx}
|
return &Restore{s, tx}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Abort abandons the changes made by a restore. This or Commit should always be
|
// Abort abandons the changes made by a restore. This or Commit should always be
|
||||||
// called.
|
// called.
|
||||||
func (s *StateRestore) Abort() {
|
func (s *Restore) Abort() {
|
||||||
s.tx.Abort()
|
s.tx.Abort()
|
||||||
}
|
}
|
||||||
|
|
||||||
// Commit commits the changes made by a restore. This or Abort should always be
|
// Commit commits the changes made by a restore. This or Abort should always be
|
||||||
// called.
|
// called.
|
||||||
func (s *StateRestore) Commit() {
|
func (s *Restore) Commit() {
|
||||||
s.tx.Commit()
|
s.tx.Commit()
|
||||||
}
|
}
|
||||||
|
|
||||||
// AbandonCh returns a channel you can wait on to know if the state store was
|
// AbandonCh returns a channel you can wait on to know if the state store was
|
||||||
// abandoned.
|
// abandoned.
|
||||||
func (s *StateStore) AbandonCh() <-chan struct{} {
|
func (s *Store) AbandonCh() <-chan struct{} {
|
||||||
return s.abandonCh
|
return s.abandonCh
|
||||||
}
|
}
|
||||||
|
|
||||||
// Abandon is used to signal that the given state store has been abandoned.
|
// Abandon is used to signal that the given state store has been abandoned.
|
||||||
// Calling this more than one time will panic.
|
// Calling this more than one time will panic.
|
||||||
func (s *StateStore) Abandon() {
|
func (s *Store) Abandon() {
|
||||||
close(s.abandonCh)
|
close(s.abandonCh)
|
||||||
}
|
}
|
||||||
|
|
||||||
// maxIndex is a helper used to retrieve the highest known index
|
// maxIndex is a helper used to retrieve the highest known index
|
||||||
// amongst a set of tables in the db.
|
// amongst a set of tables in the db.
|
||||||
func (s *StateStore) maxIndex(tables ...string) uint64 {
|
func (s *Store) maxIndex(tables ...string) uint64 {
|
||||||
tx := s.db.Txn(false)
|
tx := s.db.Txn(false)
|
||||||
defer tx.Abort()
|
defer tx.Abort()
|
||||||
return maxIndexTxn(tx, tables...)
|
return maxIndexTxn(tx, tables...)
|
||||||
|
|
|
@ -25,7 +25,7 @@ func testUUID() string {
|
||||||
buf[10:16])
|
buf[10:16])
|
||||||
}
|
}
|
||||||
|
|
||||||
func testStateStore(t *testing.T) *StateStore {
|
func testStateStore(t *testing.T) *Store {
|
||||||
s, err := NewStateStore(nil)
|
s, err := NewStateStore(nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("err: %s", err)
|
t.Fatalf("err: %s", err)
|
||||||
|
@ -36,11 +36,11 @@ func testStateStore(t *testing.T) *StateStore {
|
||||||
return s
|
return s
|
||||||
}
|
}
|
||||||
|
|
||||||
func testRegisterNode(t *testing.T, s *StateStore, idx uint64, nodeID string) {
|
func testRegisterNode(t *testing.T, s *Store, idx uint64, nodeID string) {
|
||||||
testRegisterNodeWithMeta(t, s, idx, nodeID, nil)
|
testRegisterNodeWithMeta(t, s, idx, nodeID, nil)
|
||||||
}
|
}
|
||||||
|
|
||||||
func testRegisterNodeWithMeta(t *testing.T, s *StateStore, idx uint64, nodeID string, meta map[string]string) {
|
func testRegisterNodeWithMeta(t *testing.T, s *Store, idx uint64, nodeID string, meta map[string]string) {
|
||||||
node := &structs.Node{Node: nodeID, Meta: meta}
|
node := &structs.Node{Node: nodeID, Meta: meta}
|
||||||
if err := s.EnsureNode(idx, node); err != nil {
|
if err := s.EnsureNode(idx, node); err != nil {
|
||||||
t.Fatalf("err: %s", err)
|
t.Fatalf("err: %s", err)
|
||||||
|
@ -57,7 +57,7 @@ func testRegisterNodeWithMeta(t *testing.T, s *StateStore, idx uint64, nodeID st
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func testRegisterService(t *testing.T, s *StateStore, idx uint64, nodeID, serviceID string) {
|
func testRegisterService(t *testing.T, s *Store, idx uint64, nodeID, serviceID string) {
|
||||||
svc := &structs.NodeService{
|
svc := &structs.NodeService{
|
||||||
ID: serviceID,
|
ID: serviceID,
|
||||||
Service: serviceID,
|
Service: serviceID,
|
||||||
|
@ -81,7 +81,7 @@ func testRegisterService(t *testing.T, s *StateStore, idx uint64, nodeID, servic
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func testRegisterCheck(t *testing.T, s *StateStore, idx uint64,
|
func testRegisterCheck(t *testing.T, s *Store, idx uint64,
|
||||||
nodeID string, serviceID string, checkID types.CheckID, state string) {
|
nodeID string, serviceID string, checkID types.CheckID, state string) {
|
||||||
chk := &structs.HealthCheck{
|
chk := &structs.HealthCheck{
|
||||||
Node: nodeID,
|
Node: nodeID,
|
||||||
|
@ -107,7 +107,7 @@ func testRegisterCheck(t *testing.T, s *StateStore, idx uint64,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func testSetKey(t *testing.T, s *StateStore, idx uint64, key, value string) {
|
func testSetKey(t *testing.T, s *Store, idx uint64, key, value string) {
|
||||||
entry := &structs.DirEntry{Key: key, Value: []byte(value)}
|
entry := &structs.DirEntry{Key: key, Value: []byte(value)}
|
||||||
if err := s.KVSSet(idx, entry); err != nil {
|
if err := s.KVSSet(idx, entry); err != nil {
|
||||||
t.Fatalf("err: %s", err)
|
t.Fatalf("err: %s", err)
|
||||||
|
|
|
@ -9,7 +9,7 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
// txnKVS handles all KV-related operations.
|
// txnKVS handles all KV-related operations.
|
||||||
func (s *StateStore) txnKVS(tx *memdb.Txn, idx uint64, op *structs.TxnKVOp) (structs.TxnResults, error) {
|
func (s *Store) txnKVS(tx *memdb.Txn, idx uint64, op *structs.TxnKVOp) (structs.TxnResults, error) {
|
||||||
var entry *structs.DirEntry
|
var entry *structs.DirEntry
|
||||||
var err error
|
var err error
|
||||||
|
|
||||||
|
@ -111,7 +111,7 @@ func (s *StateStore) txnKVS(tx *memdb.Txn, idx uint64, op *structs.TxnKVOp) (str
|
||||||
}
|
}
|
||||||
|
|
||||||
// txnDispatch runs the given operations inside the state store transaction.
|
// txnDispatch runs the given operations inside the state store transaction.
|
||||||
func (s *StateStore) txnDispatch(tx *memdb.Txn, idx uint64, ops structs.TxnOps) (structs.TxnResults, structs.TxnErrors) {
|
func (s *Store) txnDispatch(tx *memdb.Txn, idx uint64, ops structs.TxnOps) (structs.TxnResults, structs.TxnErrors) {
|
||||||
results := make(structs.TxnResults, 0, len(ops))
|
results := make(structs.TxnResults, 0, len(ops))
|
||||||
errors := make(structs.TxnErrors, 0, len(ops))
|
errors := make(structs.TxnErrors, 0, len(ops))
|
||||||
for i, op := range ops {
|
for i, op := range ops {
|
||||||
|
@ -149,7 +149,7 @@ func (s *StateStore) txnDispatch(tx *memdb.Txn, idx uint64, ops structs.TxnOps)
|
||||||
// any of the operations fail, the entire transaction will be rolled back. This
|
// any of the operations fail, the entire transaction will be rolled back. This
|
||||||
// is done in a full write transaction on the state store, so reads and writes
|
// is done in a full write transaction on the state store, so reads and writes
|
||||||
// are possible
|
// are possible
|
||||||
func (s *StateStore) TxnRW(idx uint64, ops structs.TxnOps) (structs.TxnResults, structs.TxnErrors) {
|
func (s *Store) TxnRW(idx uint64, ops structs.TxnOps) (structs.TxnResults, structs.TxnErrors) {
|
||||||
tx := s.db.Txn(true)
|
tx := s.db.Txn(true)
|
||||||
defer tx.Abort()
|
defer tx.Abort()
|
||||||
|
|
||||||
|
@ -165,7 +165,7 @@ func (s *StateStore) TxnRW(idx uint64, ops structs.TxnOps) (structs.TxnResults,
|
||||||
// TxnRO runs the given operations inside a single read transaction in the state
|
// TxnRO runs the given operations inside a single read transaction in the state
|
||||||
// store. You must verify outside this function that no write operations are
|
// store. You must verify outside this function that no write operations are
|
||||||
// present, otherwise you'll get an error from the state store.
|
// present, otherwise you'll get an error from the state store.
|
||||||
func (s *StateStore) TxnRO(ops structs.TxnOps) (structs.TxnResults, structs.TxnErrors) {
|
func (s *Store) TxnRO(ops structs.TxnOps) (structs.TxnResults, structs.TxnErrors) {
|
||||||
tx := s.db.Txn(false)
|
tx := s.db.Txn(false)
|
||||||
defer tx.Abort()
|
defer tx.Abort()
|
||||||
|
|
||||||
|
|
|
@ -8,7 +8,7 @@ import (
|
||||||
|
|
||||||
// watchFactory is a function that can create a new WatchFunc
|
// watchFactory is a function that can create a new WatchFunc
|
||||||
// from a parameter configuration
|
// from a parameter configuration
|
||||||
type watchFactory func(params map[string]interface{}) (WatchFunc, error)
|
type watchFactory func(params map[string]interface{}) (WatcherFunc, error)
|
||||||
|
|
||||||
// watchFuncFactory maps each type to a factory function
|
// watchFuncFactory maps each type to a factory function
|
||||||
var watchFuncFactory map[string]watchFactory
|
var watchFuncFactory map[string]watchFactory
|
||||||
|
@ -26,7 +26,7 @@ func init() {
|
||||||
}
|
}
|
||||||
|
|
||||||
// keyWatch is used to return a key watching function
|
// keyWatch is used to return a key watching function
|
||||||
func keyWatch(params map[string]interface{}) (WatchFunc, error) {
|
func keyWatch(params map[string]interface{}) (WatcherFunc, error) {
|
||||||
stale := false
|
stale := false
|
||||||
if err := assignValueBool(params, "stale", &stale); err != nil {
|
if err := assignValueBool(params, "stale", &stale); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -39,7 +39,7 @@ func keyWatch(params map[string]interface{}) (WatchFunc, error) {
|
||||||
if key == "" {
|
if key == "" {
|
||||||
return nil, fmt.Errorf("Must specify a single key to watch")
|
return nil, fmt.Errorf("Must specify a single key to watch")
|
||||||
}
|
}
|
||||||
fn := func(p *WatchPlan) (uint64, interface{}, error) {
|
fn := func(p *Plan) (uint64, interface{}, error) {
|
||||||
kv := p.client.KV()
|
kv := p.client.KV()
|
||||||
opts := consulapi.QueryOptions{AllowStale: stale, WaitIndex: p.lastIndex}
|
opts := consulapi.QueryOptions{AllowStale: stale, WaitIndex: p.lastIndex}
|
||||||
pair, meta, err := kv.Get(key, &opts)
|
pair, meta, err := kv.Get(key, &opts)
|
||||||
|
@ -55,7 +55,7 @@ func keyWatch(params map[string]interface{}) (WatchFunc, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// keyPrefixWatch is used to return a key prefix watching function
|
// keyPrefixWatch is used to return a key prefix watching function
|
||||||
func keyPrefixWatch(params map[string]interface{}) (WatchFunc, error) {
|
func keyPrefixWatch(params map[string]interface{}) (WatcherFunc, error) {
|
||||||
stale := false
|
stale := false
|
||||||
if err := assignValueBool(params, "stale", &stale); err != nil {
|
if err := assignValueBool(params, "stale", &stale); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -68,7 +68,7 @@ func keyPrefixWatch(params map[string]interface{}) (WatchFunc, error) {
|
||||||
if prefix == "" {
|
if prefix == "" {
|
||||||
return nil, fmt.Errorf("Must specify a single prefix to watch")
|
return nil, fmt.Errorf("Must specify a single prefix to watch")
|
||||||
}
|
}
|
||||||
fn := func(p *WatchPlan) (uint64, interface{}, error) {
|
fn := func(p *Plan) (uint64, interface{}, error) {
|
||||||
kv := p.client.KV()
|
kv := p.client.KV()
|
||||||
opts := consulapi.QueryOptions{AllowStale: stale, WaitIndex: p.lastIndex}
|
opts := consulapi.QueryOptions{AllowStale: stale, WaitIndex: p.lastIndex}
|
||||||
pairs, meta, err := kv.List(prefix, &opts)
|
pairs, meta, err := kv.List(prefix, &opts)
|
||||||
|
@ -81,13 +81,13 @@ func keyPrefixWatch(params map[string]interface{}) (WatchFunc, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// servicesWatch is used to watch the list of available services
|
// servicesWatch is used to watch the list of available services
|
||||||
func servicesWatch(params map[string]interface{}) (WatchFunc, error) {
|
func servicesWatch(params map[string]interface{}) (WatcherFunc, error) {
|
||||||
stale := false
|
stale := false
|
||||||
if err := assignValueBool(params, "stale", &stale); err != nil {
|
if err := assignValueBool(params, "stale", &stale); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
fn := func(p *WatchPlan) (uint64, interface{}, error) {
|
fn := func(p *Plan) (uint64, interface{}, error) {
|
||||||
catalog := p.client.Catalog()
|
catalog := p.client.Catalog()
|
||||||
opts := consulapi.QueryOptions{AllowStale: stale, WaitIndex: p.lastIndex}
|
opts := consulapi.QueryOptions{AllowStale: stale, WaitIndex: p.lastIndex}
|
||||||
services, meta, err := catalog.Services(&opts)
|
services, meta, err := catalog.Services(&opts)
|
||||||
|
@ -100,13 +100,13 @@ func servicesWatch(params map[string]interface{}) (WatchFunc, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// nodesWatch is used to watch the list of available nodes
|
// nodesWatch is used to watch the list of available nodes
|
||||||
func nodesWatch(params map[string]interface{}) (WatchFunc, error) {
|
func nodesWatch(params map[string]interface{}) (WatcherFunc, error) {
|
||||||
stale := false
|
stale := false
|
||||||
if err := assignValueBool(params, "stale", &stale); err != nil {
|
if err := assignValueBool(params, "stale", &stale); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
fn := func(p *WatchPlan) (uint64, interface{}, error) {
|
fn := func(p *Plan) (uint64, interface{}, error) {
|
||||||
catalog := p.client.Catalog()
|
catalog := p.client.Catalog()
|
||||||
opts := consulapi.QueryOptions{AllowStale: stale, WaitIndex: p.lastIndex}
|
opts := consulapi.QueryOptions{AllowStale: stale, WaitIndex: p.lastIndex}
|
||||||
nodes, meta, err := catalog.Nodes(&opts)
|
nodes, meta, err := catalog.Nodes(&opts)
|
||||||
|
@ -119,7 +119,7 @@ func nodesWatch(params map[string]interface{}) (WatchFunc, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// serviceWatch is used to watch a specific service for changes
|
// serviceWatch is used to watch a specific service for changes
|
||||||
func serviceWatch(params map[string]interface{}) (WatchFunc, error) {
|
func serviceWatch(params map[string]interface{}) (WatcherFunc, error) {
|
||||||
stale := false
|
stale := false
|
||||||
if err := assignValueBool(params, "stale", &stale); err != nil {
|
if err := assignValueBool(params, "stale", &stale); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -142,7 +142,7 @@ func serviceWatch(params map[string]interface{}) (WatchFunc, error) {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
fn := func(p *WatchPlan) (uint64, interface{}, error) {
|
fn := func(p *Plan) (uint64, interface{}, error) {
|
||||||
health := p.client.Health()
|
health := p.client.Health()
|
||||||
opts := consulapi.QueryOptions{AllowStale: stale, WaitIndex: p.lastIndex}
|
opts := consulapi.QueryOptions{AllowStale: stale, WaitIndex: p.lastIndex}
|
||||||
nodes, meta, err := health.Service(service, tag, passingOnly, &opts)
|
nodes, meta, err := health.Service(service, tag, passingOnly, &opts)
|
||||||
|
@ -155,7 +155,7 @@ func serviceWatch(params map[string]interface{}) (WatchFunc, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// checksWatch is used to watch a specific checks in a given state
|
// checksWatch is used to watch a specific checks in a given state
|
||||||
func checksWatch(params map[string]interface{}) (WatchFunc, error) {
|
func checksWatch(params map[string]interface{}) (WatcherFunc, error) {
|
||||||
stale := false
|
stale := false
|
||||||
if err := assignValueBool(params, "stale", &stale); err != nil {
|
if err := assignValueBool(params, "stale", &stale); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -175,7 +175,7 @@ func checksWatch(params map[string]interface{}) (WatchFunc, error) {
|
||||||
state = "any"
|
state = "any"
|
||||||
}
|
}
|
||||||
|
|
||||||
fn := func(p *WatchPlan) (uint64, interface{}, error) {
|
fn := func(p *Plan) (uint64, interface{}, error) {
|
||||||
health := p.client.Health()
|
health := p.client.Health()
|
||||||
opts := consulapi.QueryOptions{AllowStale: stale, WaitIndex: p.lastIndex}
|
opts := consulapi.QueryOptions{AllowStale: stale, WaitIndex: p.lastIndex}
|
||||||
var checks []*consulapi.HealthCheck
|
var checks []*consulapi.HealthCheck
|
||||||
|
@ -195,7 +195,7 @@ func checksWatch(params map[string]interface{}) (WatchFunc, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// eventWatch is used to watch for events, optionally filtering on name
|
// eventWatch is used to watch for events, optionally filtering on name
|
||||||
func eventWatch(params map[string]interface{}) (WatchFunc, error) {
|
func eventWatch(params map[string]interface{}) (WatcherFunc, error) {
|
||||||
// The stale setting doesn't apply to events.
|
// The stale setting doesn't apply to events.
|
||||||
|
|
||||||
var name string
|
var name string
|
||||||
|
@ -203,7 +203,7 @@ func eventWatch(params map[string]interface{}) (WatchFunc, error) {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
fn := func(p *WatchPlan) (uint64, interface{}, error) {
|
fn := func(p *Plan) (uint64, interface{}, error) {
|
||||||
event := p.client.Event()
|
event := p.client.Event()
|
||||||
opts := consulapi.QueryOptions{WaitIndex: p.lastIndex}
|
opts := consulapi.QueryOptions{WaitIndex: p.lastIndex}
|
||||||
events, meta, err := event.List(name, &opts)
|
events, meta, err := event.List(name, &opts)
|
||||||
|
|
|
@ -20,7 +20,7 @@ const (
|
||||||
)
|
)
|
||||||
|
|
||||||
// Run is used to run a watch plan
|
// Run is used to run a watch plan
|
||||||
func (p *WatchPlan) Run(address string) error {
|
func (p *Plan) Run(address string) error {
|
||||||
// Setup the client
|
// Setup the client
|
||||||
p.address = address
|
p.address = address
|
||||||
conf := consulapi.DefaultConfig()
|
conf := consulapi.DefaultConfig()
|
||||||
|
@ -45,7 +45,7 @@ func (p *WatchPlan) Run(address string) error {
|
||||||
OUTER:
|
OUTER:
|
||||||
for !p.shouldStop() {
|
for !p.shouldStop() {
|
||||||
// Invoke the handler
|
// Invoke the handler
|
||||||
index, result, err := p.Func(p)
|
index, result, err := p.Watcher(p)
|
||||||
|
|
||||||
// Check if we should terminate since the function
|
// Check if we should terminate since the function
|
||||||
// could have blocked for a while
|
// could have blocked for a while
|
||||||
|
@ -96,7 +96,7 @@ OUTER:
|
||||||
}
|
}
|
||||||
|
|
||||||
// Stop is used to stop running the watch plan
|
// Stop is used to stop running the watch plan
|
||||||
func (p *WatchPlan) Stop() {
|
func (p *Plan) Stop() {
|
||||||
p.stopLock.Lock()
|
p.stopLock.Lock()
|
||||||
defer p.stopLock.Unlock()
|
defer p.stopLock.Unlock()
|
||||||
if p.stop {
|
if p.stop {
|
||||||
|
@ -106,7 +106,7 @@ func (p *WatchPlan) Stop() {
|
||||||
close(p.stopCh)
|
close(p.stopCh)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *WatchPlan) shouldStop() bool {
|
func (p *Plan) shouldStop() bool {
|
||||||
select {
|
select {
|
||||||
case <-p.stopCh:
|
case <-p.stopCh:
|
||||||
return true
|
return true
|
||||||
|
|
|
@ -9,15 +9,15 @@ func init() {
|
||||||
watchFuncFactory["noop"] = noopWatch
|
watchFuncFactory["noop"] = noopWatch
|
||||||
}
|
}
|
||||||
|
|
||||||
func noopWatch(params map[string]interface{}) (WatchFunc, error) {
|
func noopWatch(params map[string]interface{}) (WatcherFunc, error) {
|
||||||
fn := func(p *WatchPlan) (uint64, interface{}, error) {
|
fn := func(p *Plan) (uint64, interface{}, error) {
|
||||||
idx := p.lastIndex + 1
|
idx := p.lastIndex + 1
|
||||||
return idx, idx, nil
|
return idx, idx, nil
|
||||||
}
|
}
|
||||||
return fn, nil
|
return fn, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func mustParse(t *testing.T, q string) *WatchPlan {
|
func mustParse(t *testing.T, q string) *Plan {
|
||||||
params := makeParams(t, q)
|
params := makeParams(t, q)
|
||||||
plan, err := Parse(params)
|
plan, err := Parse(params)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -8,17 +8,17 @@ import (
|
||||||
consulapi "github.com/hashicorp/consul/api"
|
consulapi "github.com/hashicorp/consul/api"
|
||||||
)
|
)
|
||||||
|
|
||||||
// WatchPlan is the parsed version of a watch specification. A watch provides
|
// Plan is the parsed version of a watch specification. A watch provides
|
||||||
// the details of a query, which generates a view into the Consul data store.
|
// the details of a query, which generates a view into the Consul data store.
|
||||||
// This view is watched for changes and a handler is invoked to take any
|
// This view is watched for changes and a handler is invoked to take any
|
||||||
// appropriate actions.
|
// appropriate actions.
|
||||||
type WatchPlan struct {
|
type Plan struct {
|
||||||
Datacenter string
|
Datacenter string
|
||||||
Token string
|
Token string
|
||||||
Type string
|
Type string
|
||||||
Exempt map[string]interface{}
|
Exempt map[string]interface{}
|
||||||
|
|
||||||
Func WatchFunc
|
Watcher WatcherFunc
|
||||||
Handler HandlerFunc
|
Handler HandlerFunc
|
||||||
LogOutput io.Writer
|
LogOutput io.Writer
|
||||||
|
|
||||||
|
@ -32,21 +32,21 @@ type WatchPlan struct {
|
||||||
stopLock sync.Mutex
|
stopLock sync.Mutex
|
||||||
}
|
}
|
||||||
|
|
||||||
// WatchFunc is used to watch for a diff
|
// WatcherFunc is used to watch for a diff
|
||||||
type WatchFunc func(*WatchPlan) (uint64, interface{}, error)
|
type WatcherFunc func(*Plan) (uint64, interface{}, error)
|
||||||
|
|
||||||
// HandlerFunc is used to handle new data
|
// HandlerFunc is used to handle new data
|
||||||
type HandlerFunc func(uint64, interface{})
|
type HandlerFunc func(uint64, interface{})
|
||||||
|
|
||||||
// Parse takes a watch query and compiles it into a WatchPlan or an error
|
// Parse takes a watch query and compiles it into a WatchPlan or an error
|
||||||
func Parse(params map[string]interface{}) (*WatchPlan, error) {
|
func Parse(params map[string]interface{}) (*Plan, error) {
|
||||||
return ParseExempt(params, nil)
|
return ParseExempt(params, nil)
|
||||||
}
|
}
|
||||||
|
|
||||||
// ParseExempt takes a watch query and compiles it into a WatchPlan or an error
|
// ParseExempt takes a watch query and compiles it into a WatchPlan or an error
|
||||||
// Any exempt parameters are stored in the Exempt map
|
// Any exempt parameters are stored in the Exempt map
|
||||||
func ParseExempt(params map[string]interface{}, exempt []string) (*WatchPlan, error) {
|
func ParseExempt(params map[string]interface{}, exempt []string) (*Plan, error) {
|
||||||
plan := &WatchPlan{
|
plan := &Plan{
|
||||||
stopCh: make(chan struct{}),
|
stopCh: make(chan struct{}),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -77,7 +77,7 @@ func ParseExempt(params map[string]interface{}, exempt []string) (*WatchPlan, er
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
plan.Func = fn
|
plan.Watcher = fn
|
||||||
|
|
||||||
// Remove the exempt parameters
|
// Remove the exempt parameters
|
||||||
if len(exempt) > 0 {
|
if len(exempt) > 0 {
|
||||||
|
|
Loading…
Reference in New Issue