consul/state: querying node services works
This commit is contained in:
parent
859ffe14e1
commit
08553f0fef
|
@ -90,8 +90,25 @@ func servicesTableSchema() *memdb.TableSchema {
|
|||
Name: "id",
|
||||
AllowMissing: false,
|
||||
Unique: true,
|
||||
Indexer: &memdb.CompoundIndex{
|
||||
Indexes: []memdb.Indexer{
|
||||
&memdb.StringFieldIndex{
|
||||
Field: "Node",
|
||||
Lowercase: true,
|
||||
},
|
||||
&memdb.StringFieldIndex{
|
||||
Field: "ServiceID",
|
||||
Lowercase: true,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
"node": &memdb.IndexSchema{
|
||||
Name: "node",
|
||||
AllowMissing: false,
|
||||
Unique: false,
|
||||
Indexer: &memdb.StringFieldIndex{
|
||||
Field: "ID",
|
||||
Field: "Node",
|
||||
Lowercase: true,
|
||||
},
|
||||
},
|
||||
|
@ -100,7 +117,7 @@ func servicesTableSchema() *memdb.TableSchema {
|
|||
AllowMissing: true,
|
||||
Unique: false,
|
||||
Indexer: &memdb.StringFieldIndex{
|
||||
Field: "Service",
|
||||
Field: "ServiceName",
|
||||
Lowercase: true,
|
||||
},
|
||||
},
|
||||
|
|
|
@ -100,12 +100,12 @@ func (s *StateStore) GetNode(id string) (*structs.Node, error) {
|
|||
}
|
||||
|
||||
// EnsureService is called to upsert creation of a given NodeService.
|
||||
func (s *StateStore) EnsureService(idx uint64, svc *structs.NodeService) error {
|
||||
func (s *StateStore) EnsureService(idx uint64, node string, svc *structs.NodeService) error {
|
||||
tx := s.db.Txn(true)
|
||||
defer tx.Abort()
|
||||
|
||||
// Call the service registration upsert
|
||||
if err := s.ensureServiceTxn(idx, svc, tx); err != nil {
|
||||
if err := s.ensureServiceTxn(idx, node, svc, tx); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -115,24 +115,34 @@ func (s *StateStore) EnsureService(idx uint64, svc *structs.NodeService) error {
|
|||
|
||||
// ensureServiceTxn is used to upsert a service registration within an
|
||||
// existing memdb transaction.
|
||||
func (s *StateStore) ensureServiceTxn(idx uint64, svc *structs.NodeService, tx *memdb.Txn) error {
|
||||
func (s *StateStore) ensureServiceTxn(idx uint64, node string, svc *structs.NodeService, tx *memdb.Txn) error {
|
||||
// Check for existing service
|
||||
existing, err := tx.First("services", "id", svc.Service)
|
||||
existing, err := tx.First("services", "id", node, svc.Service)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed service lookup: %s", err)
|
||||
}
|
||||
|
||||
// Create the service node entry
|
||||
entry := &structs.ServiceNode{
|
||||
Node: node,
|
||||
ServiceID: svc.ID,
|
||||
ServiceName: svc.Service,
|
||||
ServiceTags: svc.Tags,
|
||||
ServiceAddress: svc.Address,
|
||||
ServicePort: svc.Port,
|
||||
}
|
||||
|
||||
// Populate the indexes
|
||||
if existing != nil {
|
||||
svc.CreateIndex = existing.(*structs.NodeService).CreateIndex
|
||||
svc.ModifyIndex = idx
|
||||
entry.CreateIndex = existing.(*structs.NodeService).CreateIndex
|
||||
entry.ModifyIndex = idx
|
||||
} else {
|
||||
svc.CreateIndex = idx
|
||||
svc.ModifyIndex = idx
|
||||
entry.CreateIndex = idx
|
||||
entry.ModifyIndex = idx
|
||||
}
|
||||
|
||||
// Insert the service and update the index
|
||||
if err := tx.Insert("services", svc); err != nil {
|
||||
if err := tx.Insert("services", entry); err != nil {
|
||||
return fmt.Errorf("failed inserting service: %s", err)
|
||||
}
|
||||
if err := tx.Insert("index", &IndexEntry{"services", idx}); err != nil {
|
||||
|
@ -140,3 +150,45 @@ func (s *StateStore) ensureServiceTxn(idx uint64, svc *structs.NodeService, tx *
|
|||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// NodeServices is used to query service registrations by node ID.
|
||||
func (s *StateStore) NodeServices(nodeID string) (*structs.NodeServices, error) {
|
||||
tx := s.db.Txn(false)
|
||||
defer tx.Abort()
|
||||
|
||||
// Query the node
|
||||
node, err := tx.First("nodes", "id", nodeID)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("node lookup failed: %s", err)
|
||||
}
|
||||
if node == nil {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// Read all of the services
|
||||
services, err := tx.Get("services", "node", nodeID)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed querying services for node %q: %s", nodeID, err)
|
||||
}
|
||||
|
||||
// Initialize the node services struct
|
||||
ns := &structs.NodeServices{
|
||||
Node: *node.(*structs.Node),
|
||||
Services: make(map[string]*structs.NodeService),
|
||||
}
|
||||
|
||||
// Add all of the services to the map
|
||||
for service := services.Next(); service != nil; service = services.Next() {
|
||||
sn := service.(*structs.ServiceNode)
|
||||
svc := &structs.NodeService{
|
||||
ID: sn.ServiceID,
|
||||
Service: sn.ServiceName,
|
||||
Tags: sn.ServiceTags,
|
||||
Address: sn.ServiceAddress,
|
||||
Port: sn.ServicePort,
|
||||
}
|
||||
ns.Services[svc.ID] = svc
|
||||
}
|
||||
|
||||
return ns, nil
|
||||
}
|
||||
|
|
|
@ -2,6 +2,7 @@ package state
|
|||
|
||||
import (
|
||||
"os"
|
||||
"reflect"
|
||||
"testing"
|
||||
|
||||
"github.com/hashicorp/consul/consul/structs"
|
||||
|
@ -83,11 +84,26 @@ func TestStateStore_EnsureNode_GetNode(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestStateStore_EnsureService(t *testing.T) {
|
||||
func TestStateStore_EnsureService_NodeServices(t *testing.T) {
|
||||
s := testStateStore(t)
|
||||
|
||||
// Fetching services for a node with none returns nil
|
||||
if res, err := s.NodeServices("node1"); err != nil || res != nil {
|
||||
t.Fatalf("expected (nil, nil), got: (%#v, %#v)", res, err)
|
||||
}
|
||||
|
||||
// Register the nodes
|
||||
for i, nr := range []*structs.Node{
|
||||
&structs.Node{Node: "node1", Address: "1.1.1.1"},
|
||||
&structs.Node{Node: "node2", Address: "1.1.1.2"},
|
||||
} {
|
||||
if err := s.EnsureNode(uint64(i), nr); err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Create the service registration
|
||||
in := &structs.NodeService{
|
||||
ns1 := &structs.NodeService{
|
||||
ID: "service1",
|
||||
Service: "redis",
|
||||
Tags: []string{"prod"},
|
||||
|
@ -96,7 +112,40 @@ func TestStateStore_EnsureService(t *testing.T) {
|
|||
}
|
||||
|
||||
// Service successfully registers into the state store
|
||||
if err := s.EnsureService(1, in); err != nil {
|
||||
if err := s.EnsureService(10, "node1", ns1); err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
|
||||
// Register a similar service against both nodes
|
||||
ns2 := *ns1
|
||||
ns2.ID = "service2"
|
||||
for _, n := range []string{"node1", "node2"} {
|
||||
if err := s.EnsureService(20, n, &ns2); err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Register a different service on the bad node
|
||||
ns3 := *ns1
|
||||
ns3.ID = "service3"
|
||||
if err := s.EnsureService(30, "node2", &ns3); err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
|
||||
// Retrieve the services
|
||||
out, err := s.NodeServices("node1")
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
|
||||
// Only the services for the requested node are returned
|
||||
if out == nil || len(out.Services) != 2 {
|
||||
t.Fatalf("bad services: %#v", out)
|
||||
}
|
||||
if svc := out.Services["service1"]; !reflect.DeepEqual(ns1, svc) {
|
||||
t.Fatalf("bad: %#v", svc)
|
||||
}
|
||||
if svc := out.Services["service2"]; !reflect.DeepEqual(&ns2, svc) {
|
||||
t.Fatalf("bad: %#v %#v", ns2, svc)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -249,6 +249,8 @@ type ServiceNode struct {
|
|||
ServiceTags []string
|
||||
ServiceAddress string
|
||||
ServicePort int
|
||||
|
||||
Index
|
||||
}
|
||||
type ServiceNodes []ServiceNode
|
||||
|
||||
|
|
Loading…
Reference in New Issue