Fixes remaining non-KV index calclulations and adds a general getWatchTables thing.

This commit is contained in:
James Phillips 2015-10-13 19:18:43 -07:00
parent 32e2439f93
commit 25d7746f38
8 changed files with 242 additions and 197 deletions

View File

@ -52,7 +52,7 @@ type aclCacheEntry struct {
func (s *Server) aclFault(id string) (string, string, error) {
defer metrics.MeasureSince([]string{"consul", "acl", "fault"}, time.Now())
state := s.fsm.State()
acl, err := state.ACLGet(id)
_, acl, err := state.ACLGet(id)
if err != nil {
return "", "", err
}

View File

@ -63,7 +63,7 @@ func (a *ACL) Apply(args *structs.ACLRequest, reply *string) error {
state := a.srv.fsm.State()
for {
args.ACL.ID = generateUUID()
acl, err := state.ACLGet(args.ACL.ID)
_, acl, err := state.ACLGet(args.ACL.ID)
if err != nil {
a.srv.logger.Printf("[ERR] consul.acl: ACL lookup failed: %v", err)
return err
@ -125,14 +125,18 @@ func (a *ACL) Get(args *structs.ACLSpecificRequest,
&reply.QueryMeta,
state.GetQueryWatch("ACLGet"),
func() error {
acl, err := state.ACLGet(args.ACL)
index, acl, err := state.ACLGet(args.ACL)
if err != nil {
return err
}
reply.Index = index
if acl != nil {
reply.Index = acl.ModifyIndex
reply.ACLs = structs.ACLs{acl}
} else {
reply.ACLs = nil
}
return err
return nil
})
}
@ -196,8 +200,12 @@ func (a *ACL) List(args *structs.DCSpecificRequest,
&reply.QueryMeta,
state.GetQueryWatch("ACLList"),
func() error {
var err error
reply.Index, reply.ACLs, err = state.ACLList()
return err
index, acls, err := state.ACLList()
if err != nil {
return err
}
reply.Index, reply.ACLs = index, acls
return nil
})
}

View File

@ -40,7 +40,7 @@ func TestACLEndpoint_Apply(t *testing.T) {
// Verify
state := s1.fsm.State()
s, err := state.ACLGet(out)
_, s, err := state.ACLGet(out)
if err != nil {
t.Fatalf("err: %v", err)
}
@ -62,7 +62,7 @@ func TestACLEndpoint_Apply(t *testing.T) {
}
// Verify
s, err = state.ACLGet(id)
_, s, err = state.ACLGet(id)
if err != nil {
t.Fatalf("err: %v", err)
}
@ -181,7 +181,7 @@ func TestACLEndpoint_Apply_CustomID(t *testing.T) {
// Verify
state := s1.fsm.State()
s, err := state.ACLGet(out)
_, s, err := state.ACLGet(out)
if err != nil {
t.Fatalf("err: %v", err)
}

View File

@ -58,7 +58,7 @@ func TestFSM_RegisterNode(t *testing.T) {
}
// Verify we are registered
node, err := fsm.state.GetNode("foo")
_, node, err := fsm.state.GetNode("foo")
if err != nil {
t.Fatalf("err: %s", err)
}
@ -114,7 +114,7 @@ func TestFSM_RegisterNode_Service(t *testing.T) {
}
// Verify we are registered
node, err := fsm.state.GetNode("foo")
_, node, err := fsm.state.GetNode("foo")
if err != nil {
t.Fatalf("err: %s", err)
}
@ -184,7 +184,7 @@ func TestFSM_DeregisterService(t *testing.T) {
}
// Verify we are registered
node, err := fsm.state.GetNode("foo")
_, node, err := fsm.state.GetNode("foo")
if err != nil {
t.Fatalf("err: %s", err)
}
@ -245,7 +245,7 @@ func TestFSM_DeregisterCheck(t *testing.T) {
}
// Verify we are registered
node, err := fsm.state.GetNode("foo")
_, node, err := fsm.state.GetNode("foo")
if err != nil {
t.Fatalf("err: %s", err)
}
@ -312,7 +312,7 @@ func TestFSM_DeregisterNode(t *testing.T) {
}
// Verify we are not registered
node, err := fsm.state.GetNode("foo")
_, node, err := fsm.state.GetNode("foo")
if err != nil {
t.Fatalf("err: %s", err)
}
@ -459,7 +459,7 @@ func TestFSM_SnapshotRestore(t *testing.T) {
}
// Verify ACL is restored
a, err := fsm2.state.ACLGet(acl.ID)
_, a, err := fsm2.state.ACLGet(acl.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
@ -932,7 +932,7 @@ func TestFSM_ACL_Set_Delete(t *testing.T) {
// Get the ACL
id := resp.(string)
acl, err := fsm.state.ACLGet(id)
_, acl, err := fsm.state.ACLGet(id)
if err != nil {
t.Fatalf("err: %v", err)
}
@ -968,7 +968,7 @@ func TestFSM_ACL_Set_Delete(t *testing.T) {
t.Fatalf("resp: %v", resp)
}
acl, err = fsm.state.ACLGet(id)
_, acl, err = fsm.state.ACLGet(id)
if err != nil {
t.Fatalf("err: %v", err)
}

View File

@ -183,7 +183,7 @@ func (s *Server) initializeACL() error {
// Look for the anonymous token
state := s.fsm.State()
acl, err := state.ACLGet(anonymousToken)
_, acl, err := state.ACLGet(anonymousToken)
if err != nil {
return fmt.Errorf("failed to get anonymous token: %v", err)
}
@ -212,7 +212,7 @@ func (s *Server) initializeACL() error {
}
// Look for the master token
acl, err = state.ACLGet(master)
_, acl, err = state.ACLGet(master)
if err != nil {
return fmt.Errorf("failed to get master token: %v", err)
}
@ -375,7 +375,7 @@ func (s *Server) handleAliveMember(member serf.Member) error {
// Check if the node exists
state := s.fsm.State()
node, err := state.GetNode(member.Name)
_, node, err := state.GetNode(member.Name)
if err != nil {
return fmt.Errorf("failed to lookup node %s: %s", member.Name, err)
}
@ -437,7 +437,7 @@ AFTER_CHECK:
func (s *Server) handleFailedMember(member serf.Member) error {
// Check if the node exists
state := s.fsm.State()
node, err := state.GetNode(member.Name)
_, node, err := state.GetNode(member.Name)
if err != nil {
return fmt.Errorf("failed to lookup node %s: %s", member.Name, err)
}
@ -504,7 +504,7 @@ func (s *Server) handleDeregisterMember(reason string, member serf.Member) error
// Check if the node does not exist
state := s.fsm.State()
node, err := state.GetNode(member.Name)
_, node, err := state.GetNode(member.Name)
if err != nil {
return fmt.Errorf("failed to lookup node %s: %s", member.Name, err)
}

View File

@ -34,7 +34,7 @@ func TestLeader_RegisterMember(t *testing.T) {
// Client should be registered
state := s1.fsm.State()
testutil.WaitForResult(func() (bool, error) {
node, err := state.GetNode(c1.config.NodeName)
_, node, err := state.GetNode(c1.config.NodeName)
if err != nil {
t.Fatalf("err: %v", err)
}
@ -62,7 +62,7 @@ func TestLeader_RegisterMember(t *testing.T) {
}
// Server should be registered
node, err := state.GetNode(s1.config.NodeName)
_, node, err := state.GetNode(s1.config.NodeName)
if err != nil {
t.Fatalf("err: %v", err)
}
@ -104,7 +104,7 @@ func TestLeader_FailedMember(t *testing.T) {
// Should be registered
state := s1.fsm.State()
testutil.WaitForResult(func() (bool, error) {
node, err := state.GetNode(c1.config.NodeName)
_, node, err := state.GetNode(c1.config.NodeName)
if err != nil {
t.Fatalf("err: %v", err)
}
@ -159,7 +159,7 @@ func TestLeader_LeftMember(t *testing.T) {
// Should be registered
testutil.WaitForResult(func() (bool, error) {
node, err := state.GetNode(c1.config.NodeName)
_, node, err := state.GetNode(c1.config.NodeName)
if err != nil {
t.Fatalf("err: %v", err)
}
@ -174,7 +174,7 @@ func TestLeader_LeftMember(t *testing.T) {
// Should be deregistered
testutil.WaitForResult(func() (bool, error) {
node, err := state.GetNode(c1.config.NodeName)
_, node, err := state.GetNode(c1.config.NodeName)
if err != nil {
t.Fatalf("err: %v", err)
}
@ -204,7 +204,7 @@ func TestLeader_ReapMember(t *testing.T) {
// Should be registered
testutil.WaitForResult(func() (bool, error) {
node, err := state.GetNode(c1.config.NodeName)
_, node, err := state.GetNode(c1.config.NodeName)
if err != nil {
t.Fatalf("err: %v", err)
}
@ -227,7 +227,7 @@ func TestLeader_ReapMember(t *testing.T) {
// Should be deregistered
testutil.WaitForResult(func() (bool, error) {
node, err := state.GetNode(c1.config.NodeName)
_, node, err := state.GetNode(c1.config.NodeName)
if err != nil {
t.Fatalf("err: %v", err)
}
@ -268,7 +268,7 @@ func TestLeader_Reconcile_ReapMember(t *testing.T) {
// Node should be gone
state := s1.fsm.State()
node, err := state.GetNode("no-longer-around")
_, node, err := state.GetNode("no-longer-around")
if err != nil {
t.Fatalf("err: %v", err)
}
@ -295,7 +295,7 @@ func TestLeader_Reconcile(t *testing.T) {
// Should not be registered
state := s1.fsm.State()
node, err := state.GetNode(c1.config.NodeName)
_, node, err := state.GetNode(c1.config.NodeName)
if err != nil {
t.Fatalf("err: %v", err)
}
@ -305,7 +305,7 @@ func TestLeader_Reconcile(t *testing.T) {
// Should be registered
testutil.WaitForResult(func() (bool, error) {
node, err = state.GetNode(c1.config.NodeName)
_, node, err = state.GetNode(c1.config.NodeName)
if err != nil {
t.Fatalf("err: %v", err)
}
@ -433,7 +433,7 @@ func TestLeader_LeftLeader(t *testing.T) {
// Verify the old leader is deregistered
state := remain.fsm.State()
testutil.WaitForResult(func() (bool, error) {
node, err := state.GetNode(leader.config.NodeName)
_, node, err := state.GetNode(leader.config.NodeName)
if err != nil {
t.Fatalf("err: %v", err)
}

View File

@ -217,8 +217,7 @@ func (s *StateSnapshot) SessionDump() (structs.Sessions, error) {
// ACLDump is used to pull all the ACLs from the snapshot.
func (s *StateSnapshot) ACLDump() (structs.ACLs, error) {
_, ret, err := aclListTxn(s.tx)
return ret, err
return aclListTxn(s.tx)
}
// maxIndex is a helper used to retrieve the highest known index
@ -282,6 +281,30 @@ func (s *StateStore) ReapTombstones(index uint64) error {
return nil
}
// getWatchTables returns the list of tables that should be watched and used for
// max index calculations for the given query method. This is used for all
// methods except for KVS. This will panic if the method is unknown.
func (s *StateStore) getWatchTables(method string) []string {
switch method {
case "GetNode", "Nodes":
return []string{"nodes"}
case "Services":
return []string{"services"}
case "ServiceNodes", "NodeServices":
return []string{"nodes", "services"}
case "NodeChecks", "ServiceChecks", "ChecksInState":
return []string{"checks"}
case "CheckServiceNodes", "NodeInfo", "NodeDump":
return []string{"nodes", "services", "checks"}
case "SessionGet", "SessionList", "NodeSessions":
return []string{"sessions"}
case "ACLGet", "ACLList":
return []string{"acls"}
}
panic(fmt.Sprintf("Unknown method %s", method))
}
// getTableWatch returns a full table watch for the given table. This will panic
// if the table doesn't have a full table watch.
func (s *StateStore) getTableWatch(table string) Watch {
@ -294,28 +317,18 @@ func (s *StateStore) getTableWatch(table string) Watch {
// GetQueryWatch returns a watch for the given query method. This is
// used for all methods except for KV; you should call GetKVSWatch instead.
// This will panic if the method is unknown.
func (s *StateStore) GetQueryWatch(method string) Watch {
switch method {
case "GetNode", "Nodes":
return s.getTableWatch("nodes")
case "Services":
return s.getTableWatch("services")
case "ServiceNodes", "NodeServices":
return NewMultiWatch(s.getTableWatch("nodes"),
s.getTableWatch("services"))
case "NodeChecks", "ServiceChecks", "ChecksInState":
return s.getTableWatch("checks")
case "CheckServiceNodes", "NodeInfo", "NodeDump":
return NewMultiWatch(s.getTableWatch("nodes"),
s.getTableWatch("services"),
s.getTableWatch("checks"))
case "SessionGet", "SessionList", "NodeSessions":
return s.getTableWatch("sessions")
case "ACLGet", "ACLList":
return s.getTableWatch("acls")
tables := s.getWatchTables(method)
if len(tables) == 1 {
return s.getTableWatch(tables[0])
}
panic(fmt.Sprintf("Unknown method %s", method))
var watches []Watch
for _, table := range tables {
watches = append(watches, s.getTableWatch(table))
}
return NewMultiWatch(watches...)
}
// GetKVSWatch returns a watch for the given prefix in the key value store.
@ -405,19 +418,22 @@ func (s *StateStore) ensureNodeTxn(tx *memdb.Txn, idx uint64, node *structs.Node
}
// GetNode is used to retrieve a node registration by node ID.
func (s *StateStore) GetNode(id string) (*structs.Node, error) {
func (s *StateStore) GetNode(id string) (uint64, *structs.Node, error) {
tx := s.db.Txn(false)
defer tx.Abort()
// Get the table index.
idx := maxIndexTxn(tx, s.getWatchTables("GetNode")...)
// Retrieve the node from the state store
node, err := tx.First("nodes", "id", id)
if err != nil {
return nil, fmt.Errorf("node lookup failed: %s", err)
return 0, nil, fmt.Errorf("node lookup failed: %s", err)
}
if node != nil {
return node.(*structs.Node), nil
return idx, node.(*structs.Node), nil
}
return nil, nil
return idx, nil, nil
}
// Nodes is used to return all of the known nodes.
@ -425,24 +441,21 @@ func (s *StateStore) Nodes() (uint64, structs.Nodes, error) {
tx := s.db.Txn(false)
defer tx.Abort()
// Get the table index.
idx := maxIndexTxn(tx, s.getWatchTables("Nodes")...)
// Retrieve all of the nodes
nodes, err := tx.Get("nodes", "id")
if err != nil {
return 0, nil, fmt.Errorf("failed nodes lookup: %s", err)
}
// Create and return the nodes list, tracking the highest
// index we see.
var lindex uint64
// Create and return the nodes list.
var results structs.Nodes
for node := nodes.Next(); node != nil; node = nodes.Next() {
n := node.(*structs.Node)
if n.ModifyIndex > lindex {
lindex = n.ModifyIndex
}
results = append(results, n)
results = append(results, node.(*structs.Node))
}
return lindex, results, nil
return idx, results, nil
}
// DeleteNode is used to delete a given node by its ID.
@ -610,6 +623,9 @@ func (s *StateStore) Services() (uint64, structs.Services, error) {
tx := s.db.Txn(false)
defer tx.Abort()
// Get the table index.
idx := maxIndexTxn(tx, s.getWatchTables("Services")...)
// List all the services.
services, err := tx.Get("services", "id")
if err != nil {
@ -618,17 +634,9 @@ func (s *StateStore) Services() (uint64, structs.Services, error) {
// Rip through the services and enumerate them and their unique set of
// tags.
var lindex uint64
unique := make(map[string]map[string]struct{})
for service := services.Next(); service != nil; service = services.Next() {
sn := service.(*structs.ServiceNode)
// Track the highest index
if sn.ModifyIndex > lindex {
lindex = sn.ModifyIndex
}
// Capture the unique set of tags.
tags, ok := unique[sn.ServiceName]
if !ok {
unique[sn.ServiceName] = make(map[string]struct{})
@ -647,7 +655,7 @@ func (s *StateStore) Services() (uint64, structs.Services, error) {
results[service] = append(results[service], tag)
}
}
return lindex, results, nil
return idx, results, nil
}
// ServiceNodes returns the nodes associated with a given service.
@ -655,17 +663,25 @@ func (s *StateStore) ServiceNodes(service string) (uint64, structs.ServiceNodes,
tx := s.db.Txn(false)
defer tx.Abort()
// Get the table index.
idx := maxIndexTxn(tx, s.getWatchTables("ServiceNodes")...)
// List all the services.
services, err := tx.Get("services", "service", service)
if err != nil {
return 0, nil, fmt.Errorf("failed service lookup: %s", err)
}
var results structs.ServiceNodes
for s := services.Next(); s != nil; s = services.Next() {
sn := s.(*structs.ServiceNode)
results = append(results, sn)
results = append(results, s.(*structs.ServiceNode))
}
return s.parseServiceNodes(tx, results)
// Fill in the address details.
results, err = s.parseServiceNodes(tx, results)
if err != nil {
return 0, nil, fmt.Errorf("failed parsing service nodes: %s", err)
}
return idx, results, nil
}
// ServiceTagNodes returns the nodes associated with a given service, filtering
@ -674,11 +690,16 @@ func (s *StateStore) ServiceTagNodes(service, tag string) (uint64, structs.Servi
tx := s.db.Txn(false)
defer tx.Abort()
// Get the table index.
idx := maxIndexTxn(tx, s.getWatchTables("ServiceNodes")...)
// List all the services.
services, err := tx.Get("services", "service", service)
if err != nil {
return 0, nil, fmt.Errorf("failed service lookup: %s", err)
}
// Gather all the services and apply the tag filter.
var results structs.ServiceNodes
for s := services.Next(); s != nil; s = services.Next() {
sn := s.(*structs.ServiceNode)
@ -686,7 +707,13 @@ func (s *StateStore) ServiceTagNodes(service, tag string) (uint64, structs.Servi
results = append(results, sn)
}
}
return s.parseServiceNodes(tx, results)
// Fill in the address details.
results, err = s.parseServiceNodes(tx, results)
if err != nil {
return 0, nil, fmt.Errorf("failed parsing service nodes: %s", err)
}
return idx, results, nil
}
// serviceTagFilter returns true (should filter) if the given service node
@ -707,15 +734,9 @@ func serviceTagFilter(sn *structs.ServiceNode, tag string) bool {
// parseServiceNodes iterates over a services query and fills in the node details,
// returning a ServiceNodes slice.
func (s *StateStore) parseServiceNodes(tx *memdb.Txn, services structs.ServiceNodes) (uint64, structs.ServiceNodes, error) {
func (s *StateStore) parseServiceNodes(tx *memdb.Txn, services structs.ServiceNodes) (structs.ServiceNodes, error) {
var results structs.ServiceNodes
var lindex uint64
for _, sn := range services {
// Track the highest index.
if sn.ModifyIndex > lindex {
lindex = sn.ModifyIndex
}
// TODO (slackpad) - This is sketchy because we are altering the
// structure from the database, but we are hitting a non-indexed
// field. Think about this a little and make sure it's really
@ -724,12 +745,12 @@ func (s *StateStore) parseServiceNodes(tx *memdb.Txn, services structs.ServiceNo
// Fill in the address of the node.
n, err := tx.First("nodes", "id", sn.Node)
if err != nil {
return 0, nil, fmt.Errorf("failed node lookup: %s", err)
return nil, fmt.Errorf("failed node lookup: %s", err)
}
sn.Address = n.(*structs.Node).Address
results = append(results, sn)
}
return lindex, results, nil
return results, nil
}
// NodeServices is used to query service registrations by node ID.
@ -737,6 +758,9 @@ func (s *StateStore) NodeServices(nodeID string) (uint64, *structs.NodeServices,
tx := s.db.Txn(false)
defer tx.Abort()
// Get the table index.
idx := maxIndexTxn(tx, s.getWatchTables("NodeServices")...)
// Query the node
n, err := tx.First("nodes", "id", nodeID)
if err != nil {
@ -759,16 +783,10 @@ func (s *StateStore) NodeServices(nodeID string) (uint64, *structs.NodeServices,
Services: make(map[string]*structs.NodeService),
}
// Add all of the services to the map, tracking the highest index
var lindex uint64
// Add all of the services to the map.
for service := services.Next(); service != nil; service = services.Next() {
sn := service.(*structs.ServiceNode)
// Track the highest index
if sn.ModifyIndex > lindex {
lindex = sn.ModifyIndex
}
// Create the NodeService
svc := &structs.NodeService{
ID: sn.ServiceID,
@ -784,7 +802,7 @@ func (s *StateStore) NodeServices(nodeID string) (uint64, *structs.NodeServices,
ns.Services[svc.ID] = svc
}
return lindex, ns, nil
return idx, ns, nil
}
// DeleteService is used to delete a given service associated with a node.
@ -953,7 +971,16 @@ func (s *StateStore) ensureCheckTxn(tx *memdb.Txn, idx uint64, hc *structs.Healt
func (s *StateStore) NodeChecks(nodeID string) (uint64, structs.HealthChecks, error) {
tx := s.db.Txn(false)
defer tx.Abort()
return s.parseChecks(tx.Get("checks", "node", nodeID))
// Get the table index.
idx := maxIndexTxn(tx, s.getWatchTables("NodeChecks")...)
// Return the checks.
checks, err := tx.Get("checks", "node", nodeID)
if err != nil {
return 0, nil, fmt.Errorf("failed check lookup: %s", err)
}
return s.parseChecks(idx, checks)
}
// ServiceChecks is used to get all checks associated with a
@ -962,7 +989,16 @@ func (s *StateStore) NodeChecks(nodeID string) (uint64, structs.HealthChecks, er
func (s *StateStore) ServiceChecks(serviceName string) (uint64, structs.HealthChecks, error) {
tx := s.db.Txn(false)
defer tx.Abort()
return s.parseChecks(tx.Get("checks", "service", serviceName))
// Get the table index.
idx := maxIndexTxn(tx, s.getWatchTables("ServiceChecks")...)
// Return the checks.
checks, err := tx.Get("checks", "service", serviceName)
if err != nil {
return 0, nil, fmt.Errorf("failed check lookup: %s", err)
}
return s.parseChecks(idx, checks)
}
// ChecksInState is used to query the state store for all checks
@ -971,34 +1007,35 @@ func (s *StateStore) ChecksInState(state string) (uint64, structs.HealthChecks,
tx := s.db.Txn(false)
defer tx.Abort()
// Get the table index.
idx := maxIndexTxn(tx, s.getWatchTables("ChecksInState")...)
// Query all checks if HealthAny is passed
if state == structs.HealthAny {
return s.parseChecks(tx.Get("checks", "status"))
checks, err := tx.Get("checks", "status")
if err != nil {
return 0, nil, fmt.Errorf("failed check lookup: %s", err)
}
return s.parseChecks(idx, checks)
}
// Any other state we need to query for explicitly
return s.parseChecks(tx.Get("checks", "status", state))
checks, err := tx.Get("checks", "status", state)
if err != nil {
return 0, nil, fmt.Errorf("failed check lookup: %s", err)
}
return s.parseChecks(idx, checks)
}
// parseChecks is a helper function used to deduplicate some
// repetitive code for returning health checks.
func (s *StateStore) parseChecks(iter memdb.ResultIterator, err error) (uint64, structs.HealthChecks, error) {
if err != nil {
return 0, nil, fmt.Errorf("failed health check lookup: %s", err)
}
func (s *StateStore) parseChecks(idx uint64, iter memdb.ResultIterator) (uint64, structs.HealthChecks, error) {
// Gather the health checks and return them properly type casted.
// Track the highest index along the way.
var results structs.HealthChecks
var lindex uint64
for check := iter.Next(); check != nil; check = iter.Next() {
hc := check.(*structs.HealthCheck)
if hc.ModifyIndex > lindex {
lindex = hc.ModifyIndex
}
results = append(results, hc)
results = append(results, check.(*structs.HealthCheck))
}
return lindex, results, nil
return idx, results, nil
}
// DeleteCheck is used to delete a health check registration.
@ -1066,18 +1103,21 @@ func (s *StateStore) CheckServiceNodes(service string) (uint64, structs.CheckSer
tx := s.db.Txn(false)
defer tx.Abort()
// Get the table index.
idx := maxIndexTxn(tx, s.getWatchTables("CheckServiceNodes")...)
// Query the state store for the service.
services, err := tx.Get("services", "service", service)
if err != nil {
return 0, nil, fmt.Errorf("failed service lookup: %s", err)
}
// Return the results.
var results structs.ServiceNodes
for s := services.Next(); s != nil; s = services.Next() {
sn := s.(*structs.ServiceNode)
results = append(results, sn)
results = append(results, s.(*structs.ServiceNode))
}
return s.parseCheckServiceNodes(tx, results, err)
return s.parseCheckServiceNodes(tx, idx, results, err)
}
// CheckServiceTagNodes is used to query all nodes and checks for a given
@ -1088,12 +1128,16 @@ func (s *StateStore) CheckServiceTagNodes(service, tag string) (uint64, structs.
tx := s.db.Txn(false)
defer tx.Abort()
// Get the table index.
idx := maxIndexTxn(tx, s.getWatchTables("CheckServiceNodes")...)
// Query the state store for the service.
services, err := tx.Get("services", "service", service)
if err != nil {
return 0, nil, fmt.Errorf("failed service lookup: %s", err)
}
// Return the results, filtering by tag.
var results structs.ServiceNodes
for s := services.Next(); s != nil; s = services.Next() {
sn := s.(*structs.ServiceNode)
@ -1101,27 +1145,21 @@ func (s *StateStore) CheckServiceTagNodes(service, tag string) (uint64, structs.
results = append(results, sn)
}
}
return s.parseCheckServiceNodes(tx, results, err)
return s.parseCheckServiceNodes(tx, idx, results, err)
}
// parseCheckServiceNodes is used to parse through a given set of services,
// and query for an associated node and a set of checks. This is the inner
// method used to return a rich set of results from a more simple query.
func (s *StateStore) parseCheckServiceNodes(
tx *memdb.Txn, services structs.ServiceNodes,
tx *memdb.Txn, idx uint64, services structs.ServiceNodes,
err error) (uint64, structs.CheckServiceNodes, error) {
if err != nil {
return 0, nil, err
}
var results structs.CheckServiceNodes
var lindex uint64
for _, sn := range services {
// Compute the index.
if sn.ModifyIndex > lindex {
lindex = sn.ModifyIndex
}
// Retrieve the node.
n, err := tx.First("nodes", "id", sn.Node)
if err != nil {
@ -1131,9 +1169,6 @@ func (s *StateStore) parseCheckServiceNodes(
return 0, nil, ErrMissingNode
}
node := n.(*structs.Node)
if node.ModifyIndex > lindex {
lindex = node.ModifyIndex
}
// We need to return the checks specific to the given service
// as well as the node itself. Unfortunately, memdb won't let
@ -1147,9 +1182,6 @@ func (s *StateStore) parseCheckServiceNodes(
for check := iter.Next(); check != nil; check = iter.Next() {
hc := check.(*structs.HealthCheck)
if hc.ServiceID == "" || hc.ServiceID == sn.ServiceID {
if hc.ModifyIndex > lindex {
lindex = hc.ModifyIndex
}
checks = append(checks, hc)
}
}
@ -1168,7 +1200,7 @@ func (s *StateStore) parseCheckServiceNodes(
})
}
return lindex, results, nil
return idx, results, nil
}
// NodeInfo is used to generate a dump of a single node. The dump includes
@ -1177,12 +1209,15 @@ func (s *StateStore) NodeInfo(node string) (uint64, structs.NodeDump, error) {
tx := s.db.Txn(false)
defer tx.Abort()
// Get the table index.
idx := maxIndexTxn(tx, s.getWatchTables("NodeInfo")...)
// Query the node by the passed node
nodes, err := tx.Get("nodes", "id", node)
if err != nil {
return 0, nil, fmt.Errorf("failed node lookup: %s", err)
}
return s.parseNodes(tx, nodes)
return s.parseNodes(tx, idx, nodes)
}
// NodeDump is used to generate a dump of all nodes. This call is expensive
@ -1192,28 +1227,26 @@ func (s *StateStore) NodeDump() (uint64, structs.NodeDump, error) {
tx := s.db.Txn(false)
defer tx.Abort()
// Get the table index.
idx := maxIndexTxn(tx, s.getWatchTables("NodeDump")...)
// Fetch all of the registered nodes
nodes, err := tx.Get("nodes", "id")
if err != nil {
return 0, nil, fmt.Errorf("failed node lookup: %s", err)
}
return s.parseNodes(tx, nodes)
return s.parseNodes(tx, idx, nodes)
}
// parseNodes takes an iterator over a set of nodes and returns a struct
// containing the nodes along with all of their associated services
// and/or health checks.
func (s *StateStore) parseNodes(
tx *memdb.Txn,
func (s *StateStore) parseNodes(tx *memdb.Txn, idx uint64,
iter memdb.ResultIterator) (uint64, structs.NodeDump, error) {
var results structs.NodeDump
var lindex uint64
for n := iter.Next(); n != nil; n = iter.Next() {
node := n.(*structs.Node)
if node.ModifyIndex > lindex {
lindex = node.ModifyIndex
}
// Create the wrapped node
dump := &structs.NodeInfo{
@ -1228,9 +1261,6 @@ func (s *StateStore) parseNodes(
}
for service := services.Next(); service != nil; service = services.Next() {
svc := service.(*structs.ServiceNode)
if svc.ModifyIndex > lindex {
lindex = svc.ModifyIndex
}
ns := &structs.NodeService{
ID: svc.ServiceID,
Service: svc.ServiceName,
@ -1250,16 +1280,13 @@ func (s *StateStore) parseNodes(
}
for check := checks.Next(); check != nil; check = checks.Next() {
hc := check.(*structs.HealthCheck)
if hc.ModifyIndex > lindex {
lindex = hc.ModifyIndex
}
dump.Checks = append(dump.Checks, hc)
}
// Add the result to the slice
results = append(results, dump)
}
return lindex, results, nil
return idx, results, nil
}
// KVSSet is used to store a key/value pair.
@ -1324,7 +1351,7 @@ func (s *StateStore) KVSGet(key string) (uint64, *structs.DirEntry, error) {
defer tx.Abort()
// Get the table index.
idx := maxIndexTxn(tx, "kvs")
idx := maxIndexTxn(tx, "kvs", "tombstones")
// Retrieve the key.
entry, err := tx.First("kvs", "id", key)
@ -1854,7 +1881,7 @@ func (s *StateStore) SessionGet(sessionID string) (uint64, *structs.Session, err
defer tx.Abort()
// Get the table index.
idx := maxIndexTxn(tx, "sessions")
idx := maxIndexTxn(tx, s.getWatchTables("SessionGet")...)
// Look up the session by its ID
session, err := tx.First("sessions", "id", sessionID)
@ -1873,7 +1900,7 @@ func (s *StateStore) SessionList() (uint64, structs.Sessions, error) {
defer tx.Abort()
// Get the table index.
idx := maxIndexTxn(tx, "sessions")
idx := maxIndexTxn(tx, s.getWatchTables("SessionList")...)
// Query all of the active sessions.
sessions, err := tx.Get("sessions", "id")
@ -1897,7 +1924,7 @@ func (s *StateStore) NodeSessions(nodeID string) (uint64, structs.Sessions, erro
defer tx.Abort()
// Get the table index.
idx := maxIndexTxn(tx, "sessions")
idx := maxIndexTxn(tx, s.getWatchTables("NodeSessions")...)
// Get all of the sessions which belong to the node
sessions, err := tx.Get("sessions", "node", nodeID)
@ -2110,50 +2137,56 @@ func (s *StateStore) aclSetTxn(tx *memdb.Txn, idx uint64, acl *structs.ACL) erro
}
// ACLGet is used to look up an existing ACL by ID.
func (s *StateStore) ACLGet(aclID string) (*structs.ACL, error) {
func (s *StateStore) ACLGet(aclID string) (uint64, *structs.ACL, error) {
tx := s.db.Txn(false)
defer tx.Abort()
// Get the table index.
idx := maxIndexTxn(tx, s.getWatchTables("ACLGet")...)
// Query for the existing ACL
acl, err := tx.First("acls", "id", aclID)
if err != nil {
return nil, fmt.Errorf("failed acl lookup: %s", err)
return 0, nil, fmt.Errorf("failed acl lookup: %s", err)
}
if acl != nil {
return acl.(*structs.ACL), nil
return idx, acl.(*structs.ACL), nil
}
return nil, nil
return idx, nil, nil
}
// ACLList is used to list out all of the ACLs in the state store.
func (s *StateStore) ACLList() (uint64, structs.ACLs, error) {
tx := s.db.Txn(false)
defer tx.Abort()
return aclListTxn(tx)
// Get the table index.
idx := maxIndexTxn(tx, s.getWatchTables("ACLList")...)
// Return the ACLs.
acls, err := aclListTxn(tx)
if err != nil {
return 0, nil, fmt.Errorf("failed acl lookup: %s", err)
}
return idx, acls, nil
}
// aclListTxn is used to list out all of the ACLs in the state store. This is a
// function vs. a method so it can be called from the snapshotter.
func aclListTxn(tx *memdb.Txn) (uint64, structs.ACLs, error) {
func aclListTxn(tx *memdb.Txn) (structs.ACLs, error) {
// Query all of the ACLs in the state store
acls, err := tx.Get("acls", "id")
if err != nil {
return 0, nil, fmt.Errorf("failed acl lookup: %s", err)
return nil, fmt.Errorf("failed acl lookup: %s", err)
}
// Go over all of the ACLs and build the response
var result structs.ACLs
var lindex uint64
for acl := acls.Next(); acl != nil; acl = acls.Next() {
a := acl.(*structs.ACL)
result = append(result, a)
// Accumulate the highest index
if a.ModifyIndex > lindex {
lindex = a.ModifyIndex
}
}
return lindex, result, nil
return result, nil
}
// ACLDelete is used to remove an existing ACL from the state store. If

View File

@ -286,6 +286,9 @@ func TestStateStore_GetWatches(t *testing.T) {
if w := s.GetQueryWatch("Nodes"); w == nil {
t.Fatalf("didn't get a watch")
}
if w := s.GetQueryWatch("NodeDump"); w == nil {
t.Fatalf("didn't get a watch")
}
if w := s.GetKVSWatch("/dogs"); w == nil {
t.Fatalf("didn't get a watch")
}
@ -305,7 +308,7 @@ func TestStateStore_EnsureRegistration(t *testing.T) {
// Retrieve the node and verify its contents.
verifyNode := func(created, modified uint64) {
out, err := s.GetNode("node1")
_, out, err := s.GetNode("node1")
if err != nil {
t.Fatalf("err: %s", err)
}
@ -479,7 +482,7 @@ func TestStateStore_EnsureNode(t *testing.T) {
s := testStateStore(t)
// Fetching a non-existent node returns nil
if node, err := s.GetNode("node1"); node != nil || err != nil {
if _, node, err := s.GetNode("node1"); node != nil || err != nil {
t.Fatalf("expected (nil, nil), got: (%#v, %#v)", node, err)
}
@ -495,7 +498,7 @@ func TestStateStore_EnsureNode(t *testing.T) {
}
// Retrieve the node again
out, err := s.GetNode("node1")
idx, out, err := s.GetNode("node1")
if err != nil {
t.Fatalf("err: %s", err)
}
@ -509,6 +512,9 @@ func TestStateStore_EnsureNode(t *testing.T) {
if out.CreateIndex != 1 || out.ModifyIndex != 1 {
t.Fatalf("bad node index: %#v", out)
}
if idx != 1 {
t.Fatalf("bad index: %d", idx)
}
// Update the node registration
in.Address = "1.1.1.2"
@ -517,7 +523,7 @@ func TestStateStore_EnsureNode(t *testing.T) {
}
// Retrieve the node
out, err = s.GetNode("node1")
idx, out, err = s.GetNode("node1")
if err != nil {
t.Fatalf("err: %s", err)
}
@ -526,21 +532,22 @@ func TestStateStore_EnsureNode(t *testing.T) {
if out.CreateIndex != 1 || out.ModifyIndex != 2 || out.Address != "1.1.1.2" {
t.Fatalf("bad: %#v", out)
}
if idx != 2 {
t.Fatalf("bad index: %d", idx)
}
// Node upsert preserves the create index
if err := s.EnsureNode(3, in); err != nil {
t.Fatalf("err: %s", err)
}
out, err = s.GetNode("node1")
idx, out, err = s.GetNode("node1")
if err != nil {
t.Fatalf("err: %s", err)
}
if out.CreateIndex != 1 || out.ModifyIndex != 3 || out.Address != "1.1.1.2" {
t.Fatalf("node was modified: %#v", out)
}
// Index tables were updated
if idx := s.maxIndex("nodes"); idx != 3 {
if idx != 3 {
t.Fatalf("bad index: %d", idx)
}
}
@ -601,8 +608,8 @@ func TestStateStore_DeleteNode(t *testing.T) {
}
// The node was removed
if n, err := s.GetNode("node1"); err != nil || n != nil {
t.Fatalf("bad: %#v (err: %#v)", n, err)
if idx, n, err := s.GetNode("node1"); err != nil || n != nil || idx != 3 {
t.Fatalf("bad: %#v %d (err: %#v)", n, idx, err)
}
// Associated service was removed. Need to query this directly out of
@ -774,9 +781,7 @@ func TestStateStore_EnsureService(t *testing.T) {
if err != nil {
t.Fatalf("err: %s", err)
}
// Highest index for the result set was returned
if idx != 20 {
if idx != 30 {
t.Fatalf("bad index: %d", idx)
}
@ -1011,7 +1016,7 @@ func TestStateStore_ServiceTagNodes(t *testing.T) {
if err != nil {
t.Fatalf("err: %s", err)
}
if idx != 17 {
if idx != 19 {
t.Fatalf("bad: %v", idx)
}
if len(nodes) != 1 {
@ -1058,7 +1063,7 @@ func TestStateStore_ServiceTagNodes_MultipleTags(t *testing.T) {
if err != nil {
t.Fatalf("err: %s", err)
}
if idx != 17 {
if idx != 19 {
t.Fatalf("bad: %v", idx)
}
if len(nodes) != 1 {
@ -1092,7 +1097,7 @@ func TestStateStore_ServiceTagNodes_MultipleTags(t *testing.T) {
if err != nil {
t.Fatalf("err: %s", err)
}
if idx != 18 {
if idx != 19 {
t.Fatalf("bad: %v", idx)
}
if len(nodes) != 1 {
@ -1381,7 +1386,7 @@ func TestStateStore_NodeChecks(t *testing.T) {
if err != nil {
t.Fatalf("err: %s", err)
}
if idx != 3 {
if idx != 6 {
t.Fatalf("bad index: %d", idx)
}
if len(checks) != 2 || checks[0].CheckID != "check1" || checks[1].CheckID != "check2" {
@ -1420,7 +1425,7 @@ func TestStateStore_ServiceChecks(t *testing.T) {
if err != nil {
t.Fatalf("err: %s", err)
}
if idx != 3 {
if idx != 6 {
t.Fatalf("bad index: %d", idx)
}
if len(checks) != 2 || checks[0].CheckID != "check1" || checks[1].CheckID != "check2" {
@ -1535,11 +1540,7 @@ func TestStateStore_CheckServiceNodes(t *testing.T) {
if err != nil {
t.Fatalf("err: %s", err)
}
// Check the index returned matches the result set. The index
// should be the highest observed from the result, in this case
// this comes from the check registration.
if idx != 6 {
if idx != 7 {
t.Fatalf("bad index: %d", idx)
}
@ -1863,7 +1864,7 @@ func TestStateStore_NodeInfo_NodeDump(t *testing.T) {
if err != nil {
t.Fatalf("err: %s", err)
}
if idx != 8 {
if idx != 9 {
t.Fatalf("bad index: %d", idx)
}
if len(dump) != 1 || !reflect.DeepEqual(dump[0], expect[0]) {
@ -3973,10 +3974,10 @@ func TestStateStore_Session_Invalidate_Key_Delete_Behavior(t *testing.T) {
func TestStateStore_ACLSet_ACLGet(t *testing.T) {
s := testStateStore(t)
// Querying ACL's with no results returns nil
res, err := s.ACLGet("nope")
if res != nil || err != nil {
t.Fatalf("expected (nil, nil), got: (%#v, %#v)", res, err)
// Querying ACLs with no results returns nil
idx, res, err := s.ACLGet("nope")
if idx != 0 || res != nil || err != nil {
t.Fatalf("expected (0, nil, nil), got: (%d, %#v, %#v)", idx, res, err)
}
// Inserting an ACL with empty ID is disallowed
@ -4006,10 +4007,13 @@ func TestStateStore_ACLSet_ACLGet(t *testing.T) {
}
// Retrieve the ACL again
result, err := s.ACLGet("acl1")
idx, result, err := s.ACLGet("acl1")
if err != nil {
t.Fatalf("err: %s", err)
}
if idx != 1 {
t.Fatalf("bad index: %d", idx)
}
// Check that the ACL matches the result
expect := &structs.ACL{