consul/state: return highest index for queries with compound results
This commit is contained in:
parent
e6a9db17d7
commit
9fe029abc3
|
@ -131,23 +131,28 @@ func (s *StateStore) GetNode(id string) (*structs.Node, error) {
|
|||
}
|
||||
|
||||
// Nodes is used to return all of the known nodes.
|
||||
func (s *StateStore) Nodes() (structs.Nodes, error) {
|
||||
func (s *StateStore) Nodes() (uint64, structs.Nodes, error) {
|
||||
tx := s.db.Txn(false)
|
||||
defer tx.Abort()
|
||||
|
||||
// Retrieve all of the nodes
|
||||
nodes, err := tx.Get("nodes", "id")
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed nodes lookup: %s", err)
|
||||
return 0, nil, fmt.Errorf("failed nodes lookup: %s", err)
|
||||
}
|
||||
|
||||
// Create and return the nodes list.
|
||||
// TODO: Optimize by returning an iterator.
|
||||
// Create and return the nodes list, tracking the highest
|
||||
// index we see.
|
||||
var lindex uint64
|
||||
var results structs.Nodes
|
||||
for node := nodes.Next(); node != nil; node = nodes.Next() {
|
||||
n := node.(*structs.Node)
|
||||
if n.ModifyIndex > lindex {
|
||||
lindex = n.ModifyIndex
|
||||
}
|
||||
results = append(results, node.(*structs.Node))
|
||||
}
|
||||
return results, nil
|
||||
return lindex, results, nil
|
||||
}
|
||||
|
||||
// DeleteNode is used to delete a given node by its ID.
|
||||
|
@ -263,24 +268,24 @@ func (s *StateStore) ensureServiceTxn(idx uint64, node string, svc *structs.Node
|
|||
}
|
||||
|
||||
// NodeServices is used to query service registrations by node ID.
|
||||
func (s *StateStore) NodeServices(nodeID string) (*structs.NodeServices, error) {
|
||||
func (s *StateStore) NodeServices(nodeID string) (uint64, *structs.NodeServices, error) {
|
||||
tx := s.db.Txn(false)
|
||||
defer tx.Abort()
|
||||
|
||||
// Query the node
|
||||
n, err := tx.First("nodes", "id", nodeID)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("node lookup failed: %s", err)
|
||||
return 0, nil, fmt.Errorf("node lookup failed: %s", err)
|
||||
}
|
||||
if n == nil {
|
||||
return nil, nil
|
||||
return 0, nil, nil
|
||||
}
|
||||
node := n.(*structs.Node)
|
||||
|
||||
// Read all of the services
|
||||
services, err := tx.Get("services", "node", nodeID)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed querying services for node %q: %s", nodeID, err)
|
||||
return 0, nil, fmt.Errorf("failed querying services for node %q: %s", nodeID, err)
|
||||
}
|
||||
|
||||
// Initialize the node services struct
|
||||
|
@ -288,19 +293,15 @@ func (s *StateStore) NodeServices(nodeID string) (*structs.NodeServices, error)
|
|||
Node: node,
|
||||
Services: make(map[string]*structs.NodeService),
|
||||
}
|
||||
ns.CreateIndex = node.CreateIndex
|
||||
ns.CreateIndex = node.CreateIndex
|
||||
|
||||
// Add all of the services to the map
|
||||
// Add all of the services to the map, tracking the highest index
|
||||
var lindex uint64
|
||||
for service := services.Next(); service != nil; service = services.Next() {
|
||||
sn := service.(*structs.ServiceNode)
|
||||
|
||||
// Track the highest index
|
||||
if sn.CreateIndex > ns.CreateIndex {
|
||||
ns.CreateIndex = sn.CreateIndex
|
||||
}
|
||||
if sn.ModifyIndex > ns.ModifyIndex {
|
||||
ns.ModifyIndex = sn.ModifyIndex
|
||||
if sn.CreateIndex > lindex {
|
||||
lindex = sn.CreateIndex
|
||||
}
|
||||
|
||||
// Create the NodeService
|
||||
|
@ -318,7 +319,7 @@ func (s *StateStore) NodeServices(nodeID string) (*structs.NodeServices, error)
|
|||
ns.Services[svc.ID] = svc
|
||||
}
|
||||
|
||||
return ns, nil
|
||||
return lindex, ns, nil
|
||||
}
|
||||
|
||||
// DeleteService is used to delete a given service associated with a node.
|
||||
|
@ -450,7 +451,7 @@ func (s *StateStore) ensureCheckTxn(idx uint64, hc *structs.HealthCheck, tx *mem
|
|||
|
||||
// NodeChecks is used to retrieve checks associated with the
|
||||
// given node from the state store.
|
||||
func (s *StateStore) NodeChecks(nodeID string) (structs.HealthChecks, error) {
|
||||
func (s *StateStore) NodeChecks(nodeID string) (uint64, structs.HealthChecks, error) {
|
||||
tx := s.db.Txn(false)
|
||||
defer tx.Abort()
|
||||
return s.parseChecks(tx.Get("checks", "node", nodeID))
|
||||
|
@ -458,17 +459,23 @@ func (s *StateStore) NodeChecks(nodeID string) (structs.HealthChecks, error) {
|
|||
|
||||
// parseChecks is a helper function used to deduplicate some
|
||||
// repetitive code for returning health checks.
|
||||
func (s *StateStore) parseChecks(iter memdb.ResultIterator, err error) (structs.HealthChecks, error) {
|
||||
func (s *StateStore) parseChecks(iter memdb.ResultIterator, err error) (uint64, structs.HealthChecks, error) {
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed health check lookup: %s", err)
|
||||
return 0, nil, fmt.Errorf("failed health check lookup: %s", err)
|
||||
}
|
||||
|
||||
// Gather the health checks and return them properly type casted
|
||||
// Gather the health checks and return them properly type casted.
|
||||
// Track the highest index along the way.
|
||||
var results structs.HealthChecks
|
||||
var lindex uint64
|
||||
for hc := iter.Next(); hc != nil; hc = iter.Next() {
|
||||
results = append(results, hc.(*structs.HealthCheck))
|
||||
check := hc.(*structs.HealthCheck)
|
||||
if check.ModifyIndex > lindex {
|
||||
lindex = check.ModifyIndex
|
||||
}
|
||||
results = append(results, check)
|
||||
}
|
||||
return results, nil
|
||||
return lindex, results, nil
|
||||
}
|
||||
|
||||
// DeleteCheck is used to delete a health check registration.
|
||||
|
|
|
@ -158,11 +158,16 @@ func TestStateStore_GetNodes(t *testing.T) {
|
|||
testRegisterNode(t, s, 2, "node2")
|
||||
|
||||
// Retrieve the nodes
|
||||
nodes, err := s.Nodes()
|
||||
idx, nodes, err := s.Nodes()
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
|
||||
// Highest index was returned
|
||||
if idx != 2 {
|
||||
t.Fatalf("bad index: %d", idx)
|
||||
}
|
||||
|
||||
// All nodes were returned
|
||||
if n := len(nodes); n != 3 {
|
||||
t.Fatalf("bad node count: %d", n)
|
||||
|
@ -231,8 +236,8 @@ func TestStateStore_EnsureService_NodeServices(t *testing.T) {
|
|||
s := testStateStore(t)
|
||||
|
||||
// Fetching services for a node with none returns nil
|
||||
if res, err := s.NodeServices("node1"); err != nil || res != nil {
|
||||
t.Fatalf("expected (nil, nil), got: (%#v, %#v)", res, err)
|
||||
if idx, res, err := s.NodeServices("node1"); err != nil || res != nil || idx != 0 {
|
||||
t.Fatalf("expected (0, nil, nil), got: (%d, %#v, %#v)", idx, res, err)
|
||||
}
|
||||
|
||||
// Register the nodes
|
||||
|
@ -270,11 +275,16 @@ func TestStateStore_EnsureService_NodeServices(t *testing.T) {
|
|||
}
|
||||
|
||||
// Retrieve the services
|
||||
out, err := s.NodeServices("node1")
|
||||
idx, out, err := s.NodeServices("node1")
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
|
||||
// Highest index for the result set was returned
|
||||
if idx != 20 {
|
||||
t.Fatalf("bad index: %d", idx)
|
||||
}
|
||||
|
||||
// Only the services for the requested node are returned
|
||||
if out == nil || len(out.Services) != 2 {
|
||||
t.Fatalf("bad services: %#v", out)
|
||||
|
@ -293,11 +303,6 @@ func TestStateStore_EnsureService_NodeServices(t *testing.T) {
|
|||
t.Fatalf("bad: %#v %#v", ns2, svc)
|
||||
}
|
||||
|
||||
// Lastly, ensure that the highest index was preserved.
|
||||
if out.CreateIndex != 20 || out.ModifyIndex != 20 {
|
||||
t.Fatalf("bad index: %d, %d", out.CreateIndex, out.ModifyIndex)
|
||||
}
|
||||
|
||||
// Index tables were updated
|
||||
if idx := s.maxIndex("services"); idx != 30 {
|
||||
t.Fatalf("bad index: %d", idx)
|
||||
|
@ -307,17 +312,9 @@ func TestStateStore_EnsureService_NodeServices(t *testing.T) {
|
|||
func TestStateStore_DeleteService(t *testing.T) {
|
||||
s := testStateStore(t)
|
||||
|
||||
// Register a node with one service
|
||||
// Register a node with one service and a check
|
||||
testRegisterNode(t, s, 1, "node1")
|
||||
testRegisterService(t, s, 2, "node1", "service1")
|
||||
|
||||
// The service exists
|
||||
ns, err := s.NodeServices("node1")
|
||||
if err != nil || ns == nil || len(ns.Services) != 1 {
|
||||
t.Fatalf("bad: %#v (err: %#v)", ns, err)
|
||||
}
|
||||
|
||||
// Register a check with the service
|
||||
testRegisterCheck(t, s, 3, "node1", "service1", "check1")
|
||||
|
||||
// Delete the service
|
||||
|
@ -325,14 +322,19 @@ func TestStateStore_DeleteService(t *testing.T) {
|
|||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
|
||||
// The service and check don't exist
|
||||
ns, err = s.NodeServices("node1")
|
||||
// Service doesn't exist.
|
||||
_, ns, err := s.NodeServices("node1")
|
||||
if err != nil || ns == nil || len(ns.Services) != 0 {
|
||||
t.Fatalf("bad: %#v (err: %#v)", ns, err)
|
||||
}
|
||||
checks, err := s.NodeChecks("node1")
|
||||
if err != nil || len(checks) != 0 {
|
||||
t.Fatalf("bad: %#v (err: %s)", checks, err)
|
||||
|
||||
// Check doesn't exist. Check using the raw DB so we can test
|
||||
// that it actually is removed in the state store.
|
||||
tx := s.db.Txn(false)
|
||||
defer tx.Abort()
|
||||
check, err := tx.First("checks", "id", "node1", "check1")
|
||||
if err != nil || check != nil {
|
||||
t.Fatalf("bad: %#v (err: %s)", check, err)
|
||||
}
|
||||
|
||||
// Index tables were updated
|
||||
|
@ -381,10 +383,13 @@ func TestStateStore_EnsureCheck(t *testing.T) {
|
|||
}
|
||||
|
||||
// Retrieve the check and make sure it matches
|
||||
checks, err := s.NodeChecks("node1")
|
||||
idx, checks, err := s.NodeChecks("node1")
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
if idx != 3 {
|
||||
t.Fatalf("bad index: %d", idx)
|
||||
}
|
||||
if len(checks) != 1 {
|
||||
t.Fatalf("wrong number of checks: %d", len(checks))
|
||||
}
|
||||
|
@ -399,10 +404,13 @@ func TestStateStore_EnsureCheck(t *testing.T) {
|
|||
}
|
||||
|
||||
// Check that we successfully updated
|
||||
checks, err = s.NodeChecks("node1")
|
||||
idx, checks, err = s.NodeChecks("node1")
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
if idx != 4 {
|
||||
t.Fatalf("bad index: %d", idx)
|
||||
}
|
||||
if len(checks) != 1 {
|
||||
t.Fatalf("wrong number of checks: %d", len(checks))
|
||||
}
|
||||
|
@ -432,7 +440,7 @@ func TestStateStore_DeleteCheck(t *testing.T) {
|
|||
}
|
||||
|
||||
// Check is gone
|
||||
checks, err := s.NodeChecks("node1")
|
||||
_, checks, err := s.NodeChecks("node1")
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
|
|
|
@ -269,8 +269,6 @@ type NodeService struct {
|
|||
type NodeServices struct {
|
||||
Node *Node
|
||||
Services map[string]*NodeService
|
||||
|
||||
RaftIndex
|
||||
}
|
||||
|
||||
// HealthCheck represents a single check on a given node
|
||||
|
|
Loading…
Reference in New Issue