diff --git a/agent/consul/state/catalog.go b/agent/consul/state/catalog.go index 2a81c1071..3eb733bbe 100644 --- a/agent/consul/state/catalog.go +++ b/agent/consul/state/catalog.go @@ -10,6 +10,10 @@ import ( "github.com/hashicorp/go-memdb" ) +const ( + servicesTableName = "services" +) + // nodesTableSchema returns a new table schema used for storing node // information. func nodesTableSchema() *memdb.TableSchema { @@ -87,6 +91,15 @@ func servicesTableSchema() *memdb.TableSchema { Lowercase: true, }, }, + "proxy_destination": &memdb.IndexSchema{ + Name: "proxy_destination", + AllowMissing: true, + Unique: false, + Indexer: &memdb.StringFieldIndex{ + Field: "ServiceProxyDestination", + Lowercase: true, + }, + }, }, } } @@ -839,6 +852,39 @@ func (s *Store) ServiceTagNodes(ws memdb.WatchSet, service string, tag string) ( return idx, results, nil } +// ConnectServiceNodes returns the nodes associated with a Connect +// compatible destination for the given service name. This will include +// both proxies and native integrations. +func (s *Store) ConnectServiceNodes(ws memdb.WatchSet, serviceName string) (uint64, structs.ServiceNodes, error) { + tx := s.db.Txn(false) + defer tx.Abort() + + // Get the table index. + idx := maxIndexForService(tx, serviceName, false) + + // Find all the proxies. When we support native integrations we'll have + // to perform another table lookup here. + services, err := tx.Get(servicesTableName, "proxy_destination", serviceName) + if err != nil { + return 0, nil, fmt.Errorf("failed service lookup: %s", err) + } + ws.Add(services.WatchCh()) + + // Store them + var results structs.ServiceNodes + for service := services.Next(); service != nil; service = services.Next() { + results = append(results, service.(*structs.ServiceNode)) + } + + // Fill in the node details. + results, err = s.parseServiceNodes(tx, ws, results) + if err != nil { + return 0, nil, fmt.Errorf("failed parsing service nodes: %s", err) + } + + return idx, results, nil +} + // serviceTagFilter returns true (should filter) if the given service node // doesn't contain the given tag. func serviceTagFilter(sn *structs.ServiceNode, tag string) bool { diff --git a/agent/consul/state/catalog_test.go b/agent/consul/state/catalog_test.go index c057ebea6..1f20fb9b8 100644 --- a/agent/consul/state/catalog_test.go +++ b/agent/consul/state/catalog_test.go @@ -1572,6 +1572,48 @@ func TestStateStore_DeleteService(t *testing.T) { } } +func TestStateStore_ConnectServiceNodes(t *testing.T) { + assert := assert.New(t) + s := testStateStore(t) + + // Listing with no results returns an empty list. + ws := memdb.NewWatchSet() + idx, nodes, err := s.ConnectServiceNodes(ws, "db") + assert.Nil(err) + assert.Equal(idx, uint64(0)) + assert.Len(nodes, 0) + + // Create some nodes and services. + assert.Nil(s.EnsureNode(10, &structs.Node{Node: "foo", Address: "127.0.0.1"})) + assert.Nil(s.EnsureNode(11, &structs.Node{Node: "bar", Address: "127.0.0.2"})) + assert.Nil(s.EnsureService(12, "foo", &structs.NodeService{ID: "db", Service: "db", Tags: nil, Address: "", Port: 5000})) + assert.Nil(s.EnsureService(13, "bar", &structs.NodeService{ID: "api", Service: "api", Tags: nil, Address: "", Port: 5000})) + assert.Nil(s.EnsureService(14, "foo", &structs.NodeService{Kind: structs.ServiceKindConnectProxy, ID: "proxy", Service: "proxy", ProxyDestination: "db", Port: 8000})) + assert.Nil(s.EnsureService(15, "bar", &structs.NodeService{Kind: structs.ServiceKindConnectProxy, ID: "proxy", Service: "proxy", ProxyDestination: "db", Port: 8000})) + assert.Nil(s.EnsureService(16, "bar", &structs.NodeService{ID: "db2", Service: "db", Tags: []string{"slave"}, Address: "", Port: 8001})) + assert.True(watchFired(ws)) + + // Read everything back. + ws = memdb.NewWatchSet() + idx, nodes, err = s.ConnectServiceNodes(ws, "db") + assert.Nil(err) + assert.Equal(idx, uint64(idx)) + assert.Len(nodes, 2) + + for _, n := range nodes { + assert.Equal(structs.ServiceKindConnectProxy, n.ServiceKind) + assert.Equal("db", n.ServiceProxyDestination) + } + + // Registering some unrelated node should not fire the watch. + testRegisterNode(t, s, 17, "nope") + assert.False(watchFired(ws)) + + // But removing a node with the "db" service should fire the watch. + assert.Nil(s.DeleteNode(18, "bar")) + assert.True(watchFired(ws)) +} + func TestStateStore_Service_Snapshot(t *testing.T) { s := testStateStore(t)