Add support for filtering the 'List Services' API

1. Create a bexpr filter for performing the filtering
2. Change the state store functions to return the raw (not aggregated)
   list of ServiceNodes.
3. Move the aggregate service tags by name logic out of the state store
   functions into a new function called from the RPC endpoint
4. Perform the filtering in the endpoint before aggregation.
This commit is contained in:
Daniel Kimsey 2022-08-10 16:52:32 -05:00
parent 8ca0a872ed
commit 4243e1e05f
7 changed files with 177 additions and 82 deletions

3
.changelog/11742.txt Normal file
View File

@ -0,0 +1,3 @@
```release-note:improvement
api: Add filtering support to Catalog's List Services (v1/catalog/services)
```

View File

@ -565,6 +565,11 @@ func (c *Catalog) ListServices(args *structs.DCSpecificRequest, reply *structs.I
return err return err
} }
filter, err := bexpr.CreateFilter(args.Filter, nil, []*structs.ServiceNode{})
if err != nil {
return err
}
// Set reply enterprise metadata after resolving and validating the token so // Set reply enterprise metadata after resolving and validating the token so
// that we can properly infer metadata from the token. // that we can properly infer metadata from the token.
reply.EnterpriseMeta = args.EnterpriseMeta reply.EnterpriseMeta = args.EnterpriseMeta
@ -574,10 +579,11 @@ func (c *Catalog) ListServices(args *structs.DCSpecificRequest, reply *structs.I
&reply.QueryMeta, &reply.QueryMeta,
func(ws memdb.WatchSet, state *state.Store) error { func(ws memdb.WatchSet, state *state.Store) error {
var err error var err error
var serviceNodes structs.ServiceNodes
if len(args.NodeMetaFilters) > 0 { if len(args.NodeMetaFilters) > 0 {
reply.Index, reply.Services, err = state.ServicesByNodeMeta(ws, args.NodeMetaFilters, &args.EnterpriseMeta, args.PeerName) reply.Index, serviceNodes, err = state.ServicesByNodeMeta(ws, args.NodeMetaFilters, &args.EnterpriseMeta, args.PeerName)
} else { } else {
reply.Index, reply.Services, err = state.Services(ws, &args.EnterpriseMeta, args.PeerName) reply.Index, serviceNodes, err = state.Services(ws, &args.EnterpriseMeta, args.PeerName)
} }
if err != nil { if err != nil {
return err return err
@ -588,11 +594,43 @@ func (c *Catalog) ListServices(args *structs.DCSpecificRequest, reply *structs.I
return nil return nil
} }
raw, err := filter.Execute(serviceNodes)
if err != nil {
return err
}
reply.Services = servicesTagsByName(raw.(structs.ServiceNodes))
c.srv.filterACLWithAuthorizer(authz, reply) c.srv.filterACLWithAuthorizer(authz, reply)
return nil return nil
}) })
} }
func servicesTagsByName(services []*structs.ServiceNode) structs.Services {
unique := make(map[string]map[string]struct{})
for _, svc := range services {
tags, ok := unique[svc.ServiceName]
if !ok {
unique[svc.ServiceName] = make(map[string]struct{})
tags = unique[svc.ServiceName]
}
for _, tag := range svc.ServiceTags {
tags[tag] = struct{}{}
}
}
// Generate the output structure.
var results = make(structs.Services)
for service, tags := range unique {
results[service] = make([]string, 0, len(tags))
for tag := range tags {
results[service] = append(results[service], tag)
}
}
return results
}
// ServiceList is used to query the services in a DC. // ServiceList is used to query the services in a DC.
// Returns services as a list of ServiceNames. // Returns services as a list of ServiceNames.
func (c *Catalog) ServiceList(args *structs.DCSpecificRequest, reply *structs.IndexedServiceList) error { func (c *Catalog) ServiceList(args *structs.DCSpecificRequest, reply *structs.IndexedServiceList) error {

View File

@ -1523,6 +1523,45 @@ func TestCatalog_ListServices_NodeMetaFilter(t *testing.T) {
} }
} }
func TestCatalog_ListServices_Filter(t *testing.T) {
t.Parallel()
_, s1 := testServer(t)
codec := rpcClient(t, s1)
testrpc.WaitForTestAgent(t, s1.RPC, "dc1")
// prep the cluster with some data we can use in our filters
registerTestCatalogEntries(t, codec)
// Run the tests against the test server
t.Run("ListServices", func(t *testing.T) {
args := structs.DCSpecificRequest{
Datacenter: "dc1",
}
args.Filter = "ServiceName == redis"
out := new(structs.IndexedServices)
require.NoError(t, msgpackrpc.CallWithCodec(codec, "Catalog.ListServices", &args, out))
require.Contains(t, out.Services, "redis")
require.ElementsMatch(t, []string{"v1", "v2"}, out.Services["redis"])
args.Filter = "NodeMeta.os == NoSuchOS"
out = new(structs.IndexedServices)
require.NoError(t, msgpackrpc.CallWithCodec(codec, "Catalog.ListServices", &args, out))
require.Len(t, out.Services, 0)
args.Filter = "NodeMeta.NoSuchMetadata == linux"
out = new(structs.IndexedServices)
require.NoError(t, msgpackrpc.CallWithCodec(codec, "Catalog.ListServices", &args, out))
require.Len(t, out.Services, 0)
args.Filter = "InvalidField == linux"
out = new(structs.IndexedServices)
require.Error(t, msgpackrpc.CallWithCodec(codec, "Catalog.ListServices", &args, out))
})
}
func TestCatalog_ListServices_Blocking(t *testing.T) { func TestCatalog_ListServices_Blocking(t *testing.T) {
if testing.Short() { if testing.Short() {
t.Skip("too slow for testing.Short") t.Skip("too slow for testing.Short")

View File

@ -1134,7 +1134,7 @@ func terminatingGatewayVirtualIPsSupported(tx ReadTxn, ws memdb.WatchSet) (bool,
} }
// Services returns all services along with a list of associated tags. // Services returns all services along with a list of associated tags.
func (s *Store) Services(ws memdb.WatchSet, entMeta *acl.EnterpriseMeta, peerName string) (uint64, structs.Services, error) { func (s *Store) Services(ws memdb.WatchSet, entMeta *acl.EnterpriseMeta, peerName string) (uint64, []*structs.ServiceNode, error) {
tx := s.db.Txn(false) tx := s.db.Txn(false)
defer tx.Abort() defer tx.Abort()
@ -1148,30 +1148,11 @@ func (s *Store) Services(ws memdb.WatchSet, entMeta *acl.EnterpriseMeta, peerNam
} }
ws.Add(services.WatchCh()) ws.Add(services.WatchCh())
// Rip through the services and enumerate them and their unique set of var result []*structs.ServiceNode
// tags.
unique := make(map[string]map[string]struct{})
for service := services.Next(); service != nil; service = services.Next() { for service := services.Next(); service != nil; service = services.Next() {
svc := service.(*structs.ServiceNode) result = append(result, service.(*structs.ServiceNode))
tags, ok := unique[svc.ServiceName]
if !ok {
unique[svc.ServiceName] = make(map[string]struct{})
tags = unique[svc.ServiceName]
} }
for _, tag := range svc.ServiceTags { return idx, result, nil
tags[tag] = struct{}{}
}
}
// Generate the output structure.
var results = make(structs.Services)
for service, tags := range unique {
results[service] = make([]string, 0, len(tags))
for tag := range tags {
results[service] = append(results[service], tag)
}
}
return idx, results, nil
} }
func (s *Store) ServiceList(ws memdb.WatchSet, entMeta *acl.EnterpriseMeta, peerName string) (uint64, structs.ServiceList, error) { func (s *Store) ServiceList(ws memdb.WatchSet, entMeta *acl.EnterpriseMeta, peerName string) (uint64, structs.ServiceList, error) {
@ -1212,7 +1193,7 @@ func serviceListTxn(tx ReadTxn, ws memdb.WatchSet, entMeta *acl.EnterpriseMeta,
} }
// ServicesByNodeMeta returns all services, filtered by the given node metadata. // ServicesByNodeMeta returns all services, filtered by the given node metadata.
func (s *Store) ServicesByNodeMeta(ws memdb.WatchSet, filters map[string]string, entMeta *acl.EnterpriseMeta, peerName string) (uint64, structs.Services, error) { func (s *Store) ServicesByNodeMeta(ws memdb.WatchSet, filters map[string]string, entMeta *acl.EnterpriseMeta, peerName string) (uint64, []*structs.ServiceNode, error) {
tx := s.db.Txn(false) tx := s.db.Txn(false)
defer tx.Abort() defer tx.Abort()
@ -1259,8 +1240,7 @@ func (s *Store) ServicesByNodeMeta(ws memdb.WatchSet, filters map[string]string,
} }
allServicesCh := allServices.WatchCh() allServicesCh := allServices.WatchCh()
// Populate the services map var result structs.ServiceNodes
unique := make(map[string]map[string]struct{})
for node := nodes.Next(); node != nil; node = nodes.Next() { for node := nodes.Next(); node != nil; node = nodes.Next() {
n := node.(*structs.Node) n := node.(*structs.Node)
if len(filters) > 1 && !structs.SatisfiesMetaFilters(n.Meta, filters) { if len(filters) > 1 && !structs.SatisfiesMetaFilters(n.Meta, filters) {
@ -1274,30 +1254,11 @@ func (s *Store) ServicesByNodeMeta(ws memdb.WatchSet, filters map[string]string,
} }
ws.AddWithLimit(watchLimit, services.WatchCh(), allServicesCh) ws.AddWithLimit(watchLimit, services.WatchCh(), allServicesCh)
// Rip through the services and enumerate them and their unique set of
// tags.
for service := services.Next(); service != nil; service = services.Next() { for service := services.Next(); service != nil; service = services.Next() {
svc := service.(*structs.ServiceNode) result = append(result, service.(*structs.ServiceNode))
tags, ok := unique[svc.ServiceName]
if !ok {
unique[svc.ServiceName] = make(map[string]struct{})
tags = unique[svc.ServiceName]
}
for _, tag := range svc.ServiceTags {
tags[tag] = struct{}{}
} }
} }
} return idx, result, nil
// Generate the output structure.
var results = make(structs.Services)
for service, tags := range unique {
results[service] = make([]string, 0, len(tags))
for tag := range tags {
results[service] = append(results[service], tag)
}
}
return idx, results, nil
} }
// maxIndexForService return the maximum Raft Index for a service // maxIndexForService return the maximum Raft Index for a service

View File

@ -12,6 +12,8 @@ import (
"github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/acl"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/hashicorp/go-memdb" "github.com/hashicorp/go-memdb"
"github.com/hashicorp/go-uuid" "github.com/hashicorp/go-uuid"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
@ -2105,7 +2107,7 @@ func TestStateStore_Services(t *testing.T) {
if err := s.EnsureService(2, "node1", ns1); err != nil { if err := s.EnsureService(2, "node1", ns1); err != nil {
t.Fatalf("err: %s", err) t.Fatalf("err: %s", err)
} }
testRegisterService(t, s, 3, "node1", "dogs") ns1Dogs := testRegisterService(t, s, 3, "node1", "dogs")
testRegisterNode(t, s, 4, "node2") testRegisterNode(t, s, 4, "node2")
ns2 := &structs.NodeService{ ns2 := &structs.NodeService{
ID: "service3", ID: "service3",
@ -2131,19 +2133,13 @@ func TestStateStore_Services(t *testing.T) {
t.Fatalf("bad index: %d", idx) t.Fatalf("bad index: %d", idx)
} }
// Verify the result. We sort the lists since the order is // Verify the result.
// non-deterministic (it's built using a map internally). expected := []*structs.ServiceNode{
expected := structs.Services{ ns1Dogs.ToServiceNode("node1"),
"redis": []string{"prod", "primary", "replica"}, ns1.ToServiceNode("node1"),
"dogs": []string{}, ns2.ToServiceNode("node2"),
}
sort.Strings(expected["redis"])
for _, tags := range services {
sort.Strings(tags)
}
if !reflect.DeepEqual(expected, services) {
t.Fatalf("bad: %#v", services)
} }
assertDeepEqual(t, services, expected, cmpopts.IgnoreFields(structs.ServiceNode{}, "RaftIndex"))
// Deleting a node with a service should fire the watch. // Deleting a node with a service should fire the watch.
if err := s.DeleteNode(6, "node1", nil, ""); err != nil { if err := s.DeleteNode(6, "node1", nil, ""); err != nil {
@ -2206,11 +2202,10 @@ func TestStateStore_ServicesByNodeMeta(t *testing.T) {
if err != nil { if err != nil {
t.Fatalf("err: %s", err) t.Fatalf("err: %s", err)
} }
expected := structs.Services{ expected := []*structs.ServiceNode{
"redis": []string{"primary", "prod"}, ns1.ToServiceNode("node0"),
} }
sort.Strings(res["redis"]) assertDeepEqual(t, res, expected, cmpopts.IgnoreFields(structs.ServiceNode{}, "RaftIndex"))
require.Equal(t, expected, res)
}) })
t.Run("Get all services using the common meta value", func(t *testing.T) { t.Run("Get all services using the common meta value", func(t *testing.T) {
@ -2218,11 +2213,12 @@ func TestStateStore_ServicesByNodeMeta(t *testing.T) {
if err != nil { if err != nil {
t.Fatalf("err: %s", err) t.Fatalf("err: %s", err)
} }
expected := structs.Services{ require.Len(t, res, 2)
"redis": []string{"primary", "prod", "replica"}, expected := []*structs.ServiceNode{
ns1.ToServiceNode("node0"),
ns2.ToServiceNode("node1"),
} }
sort.Strings(res["redis"]) assertDeepEqual(t, res, expected, cmpopts.IgnoreFields(structs.ServiceNode{}, "RaftIndex"))
require.Equal(t, expected, res)
}) })
t.Run("Get an empty list for an invalid meta value", func(t *testing.T) { t.Run("Get an empty list for an invalid meta value", func(t *testing.T) {
@ -2230,8 +2226,8 @@ func TestStateStore_ServicesByNodeMeta(t *testing.T) {
if err != nil { if err != nil {
t.Fatalf("err: %s", err) t.Fatalf("err: %s", err)
} }
expected := structs.Services{} var expected []*structs.ServiceNode
require.Equal(t, expected, res) assertDeepEqual(t, res, expected, cmpopts.IgnoreFields(structs.ServiceNode{}, "RaftIndex"))
}) })
t.Run("Get the first node's service instance using multiple meta filters", func(t *testing.T) { t.Run("Get the first node's service instance using multiple meta filters", func(t *testing.T) {
@ -2239,11 +2235,10 @@ func TestStateStore_ServicesByNodeMeta(t *testing.T) {
if err != nil { if err != nil {
t.Fatalf("err: %s", err) t.Fatalf("err: %s", err)
} }
expected := structs.Services{ expected := []*structs.ServiceNode{
"redis": []string{"primary", "prod"}, ns1.ToServiceNode("node0"),
} }
sort.Strings(res["redis"]) assertDeepEqual(t, res, expected, cmpopts.IgnoreFields(structs.ServiceNode{}, "RaftIndex"))
require.Equal(t, expected, res)
}) })
t.Run("Registering some unrelated node + service should not fire the watch.", func(t *testing.T) { t.Run("Registering some unrelated node + service should not fire the watch.", func(t *testing.T) {
@ -8807,3 +8802,10 @@ func setVirtualIPFlags(t *testing.T, s *Store) {
Value: "true", Value: "true",
})) }))
} }
func assertDeepEqual(t *testing.T, x, y interface{}, opts ...cmp.Option) {
t.Helper()
if diff := cmp.Diff(x, y, opts...); diff != "" {
t.Fatalf("assertion failed: values are not equal\n--- expected\n+++ actual\n%v", diff)
}
}

View File

@ -146,13 +146,13 @@ func testRegisterServiceOpts(t *testing.T, s *Store, idx uint64, nodeID, service
// testRegisterServiceWithChange registers a service and allow ensuring the consul index is updated // testRegisterServiceWithChange registers a service and allow ensuring the consul index is updated
// even if service already exists if using `modifyAccordingIndex`. // even if service already exists if using `modifyAccordingIndex`.
// This is done by setting the transaction ID in "version" meta so service will be updated if it already exists // This is done by setting the transaction ID in "version" meta so service will be updated if it already exists
func testRegisterServiceWithChange(t *testing.T, s *Store, idx uint64, nodeID, serviceID string, modifyAccordingIndex bool) { func testRegisterServiceWithChange(t *testing.T, s *Store, idx uint64, nodeID, serviceID string, modifyAccordingIndex bool) *structs.NodeService {
testRegisterServiceWithChangeOpts(t, s, idx, nodeID, serviceID, modifyAccordingIndex) return testRegisterServiceWithChangeOpts(t, s, idx, nodeID, serviceID, modifyAccordingIndex)
} }
// testRegisterServiceWithChangeOpts is the same as testRegisterServiceWithChange with the addition of opts that can // testRegisterServiceWithChangeOpts is the same as testRegisterServiceWithChange with the addition of opts that can
// modify the service prior to writing. // modify the service prior to writing.
func testRegisterServiceWithChangeOpts(t *testing.T, s *Store, idx uint64, nodeID, serviceID string, modifyAccordingIndex bool, opts ...func(service *structs.NodeService)) { func testRegisterServiceWithChangeOpts(t *testing.T, s *Store, idx uint64, nodeID, serviceID string, modifyAccordingIndex bool, opts ...func(service *structs.NodeService)) *structs.NodeService {
meta := make(map[string]string) meta := make(map[string]string)
if modifyAccordingIndex { if modifyAccordingIndex {
meta["version"] = fmt.Sprint(idx) meta["version"] = fmt.Sprint(idx)
@ -183,14 +183,15 @@ func testRegisterServiceWithChangeOpts(t *testing.T, s *Store, idx uint64, nodeI
result.ServiceID != serviceID { result.ServiceID != serviceID {
t.Fatalf("bad service: %#v", result) t.Fatalf("bad service: %#v", result)
} }
return svc
} }
// testRegisterService register a service with given transaction idx // testRegisterService register a service with given transaction idx
// If the service already exists, transaction number might not be increased // If the service already exists, transaction number might not be increased
// Use `testRegisterServiceWithChange()` if you want perform a registration that // Use `testRegisterServiceWithChange()` if you want perform a registration that
// ensures the transaction is updated by setting idx in Meta of Service // ensures the transaction is updated by setting idx in Meta of Service
func testRegisterService(t *testing.T, s *Store, idx uint64, nodeID, serviceID string) { func testRegisterService(t *testing.T, s *Store, idx uint64, nodeID, serviceID string) *structs.NodeService {
testRegisterServiceWithChange(t, s, idx, nodeID, serviceID, false) return testRegisterServiceWithChange(t, s, idx, nodeID, serviceID, false)
} }
func testRegisterConnectService(t *testing.T, s *Store, idx uint64, nodeID, serviceID string) { func testRegisterConnectService(t *testing.T, s *Store, idx uint64, nodeID, serviceID string) {

View File

@ -410,13 +410,64 @@ The corresponding CLI command is [`consul catalog services`](/commands/catalog/s
- `dc` `(string: "")` - Specifies the datacenter to query. This will default to - `dc` `(string: "")` - Specifies the datacenter to query. This will default to
the datacenter of the agent being queried. the datacenter of the agent being queried.
- `node-meta` `(string: "")` - Specifies a desired node metadata key/value pair - `node-meta` `(string: "")` **Deprecated** - Use `filter` with the `NodeMeta` selector instead.
This parameter will be removed in a future version of Consul.
Specifies a desired node metadata key/value pair
of the form `key:value`. This parameter can be specified multiple times, and of the form `key:value`. This parameter can be specified multiple times, and
filters the results to nodes with the specified key/value pairs. filters the results to nodes with the specified key/value pairs.
- `ns` `(string: "")` <EnterpriseAlert inline /> - Specifies the namespace of the services you lookup. - `ns` `(string: "")` <EnterpriseAlert inline /> - Specifies the namespace of the services you lookup.
You can also [specify the namespace through other methods](#methods-to-specify-namespace). You can also [specify the namespace through other methods](#methods-to-specify-namespace).
- `filter` `(string: "")` - Specifies the expression used to filter the
queries results prior to returning the data.
### Filtering
The filter will be executed against each Service mapping within the catalog.
The following selectors and filter operations are supported:
| Selector | Supported Operations |
| ---------------------------------------------------- | -------------------------------------------------- |
| `Address` | Equal, Not Equal, In, Not In, Matches, Not Matches |
| `Datacenter` | Equal, Not Equal, In, Not In, Matches, Not Matches |
| `ID` | Equal, Not Equal, In, Not In, Matches, Not Matches |
| `Node` | Equal, Not Equal, In, Not In, Matches, Not Matches |
| `NodeMeta.<any>` | Equal, Not Equal, In, Not In, Matches, Not Matches |
| `NodeMeta` | Is Empty, Is Not Empty, In, Not In |
| `ServiceAddress` | Equal, Not Equal, In, Not In, Matches, Not Matches |
| `ServiceConnect.Native` | Equal, Not Equal |
| `ServiceEnableTagOverride` | Equal, Not Equal |
| `ServiceID` | Equal, Not Equal, In, Not In, Matches, Not Matches |
| `ServiceKind` | Equal, Not Equal, In, Not In, Matches, Not Matches |
| `ServiceMeta.<any>` | Equal, Not Equal, In, Not In, Matches, Not Matches |
| `ServiceMeta` | Is Empty, Is Not Empty, In, Not In |
| `ServiceName` | Equal, Not Equal, In, Not In, Matches, Not Matches |
| `ServicePort` | Equal, Not Equal |
| `ServiceProxy.DestinationServiceID` | Equal, Not Equal, In, Not In, Matches, Not Matches |
| `ServiceProxy.DestinationServiceName` | Equal, Not Equal, In, Not In, Matches, Not Matches |
| `ServiceProxy.LocalServiceAddress` | Equal, Not Equal, In, Not In, Matches, Not Matches |
| `ServiceProxy.LocalServicePort` | Equal, Not Equal |
| `ServiceProxy.Mode` | Equal, Not Equal, In, Not In, Matches, Not Matches |
| `ServiceProxy.TransparentProxy.OutboundListenerPort` | Equal, Not Equal |
| `ServiceProxy.MeshGateway.Mode` | Equal, Not Equal, In, Not In, Matches, Not Matches |
| `ServiceProxy.Upstreams.Datacenter` | Equal, Not Equal, In, Not In, Matches, Not Matches |
| `ServiceProxy.Upstreams.DestinationName` | Equal, Not Equal, In, Not In, Matches, Not Matches |
| `ServiceProxy.Upstreams.DestinationNamespace` | Equal, Not Equal, In, Not In, Matches, Not Matches |
| `ServiceProxy.Upstreams.DestinationType` | Equal, Not Equal, In, Not In, Matches, Not Matches |
| `ServiceProxy.Upstreams.LocalBindAddress` | Equal, Not Equal, In, Not In, Matches, Not Matches |
| `ServiceProxy.Upstreams.LocalBindPort` | Equal, Not Equal |
| `ServiceProxy.Upstreams.MeshGateway.Mode` | Equal, Not Equal, In, Not In, Matches, Not Matches |
| `ServiceProxy.Upstreams` | Is Empty, Is Not Empty |
| `ServiceTaggedAddresses.<any>.Address` | Equal, Not Equal, In, Not In, Matches, Not Matches |
| `ServiceTaggedAddresses.<any>.Port` | Equal, Not Equal |
| `ServiceTaggedAddresses` | Is Empty, Is Not Empty, In, Not In |
| `ServiceTags` | In, Not In, Is Empty, Is Not Empty |
| `ServiceWeights.Passing` | Equal, Not Equal |
| `ServiceWeights.Warning` | Equal, Not Equal |
| `TaggedAddresses.<any>` | Equal, Not Equal, In, Not In, Matches, Not Matches |
| `TaggedAddresses` | Is Empty, Is Not Empty, In, Not In |
### Sample Request ### Sample Request
```shell-session ```shell-session