Merge pull request #11742 from dekimsey/catalog-service-list-filter

Add support for filtering the 'List Services' API
This commit is contained in:
Chris S. Kim 2022-08-26 11:35:20 -04:00 committed by GitHub
commit a8b2278968
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 183 additions and 82 deletions

3
.changelog/11742.txt Normal file
View File

@ -0,0 +1,3 @@
```release-note:improvement
api: Add filtering support to Catalog's List Services (v1/catalog/services)
```

View File

@ -565,6 +565,11 @@ func (c *Catalog) ListServices(args *structs.DCSpecificRequest, reply *structs.I
return err
}
filter, err := bexpr.CreateFilter(args.Filter, nil, []*structs.ServiceNode{})
if err != nil {
return err
}
// Set reply enterprise metadata after resolving and validating the token so
// that we can properly infer metadata from the token.
reply.EnterpriseMeta = args.EnterpriseMeta
@ -574,10 +579,11 @@ func (c *Catalog) ListServices(args *structs.DCSpecificRequest, reply *structs.I
&reply.QueryMeta,
func(ws memdb.WatchSet, state *state.Store) error {
var err error
var serviceNodes structs.ServiceNodes
if len(args.NodeMetaFilters) > 0 {
reply.Index, reply.Services, err = state.ServicesByNodeMeta(ws, args.NodeMetaFilters, &args.EnterpriseMeta, args.PeerName)
reply.Index, serviceNodes, err = state.ServicesByNodeMeta(ws, args.NodeMetaFilters, &args.EnterpriseMeta, args.PeerName)
} else {
reply.Index, reply.Services, err = state.Services(ws, &args.EnterpriseMeta, args.PeerName)
reply.Index, serviceNodes, err = state.Services(ws, &args.EnterpriseMeta, args.PeerName)
}
if err != nil {
return err
@ -588,11 +594,43 @@ func (c *Catalog) ListServices(args *structs.DCSpecificRequest, reply *structs.I
return nil
}
raw, err := filter.Execute(serviceNodes)
if err != nil {
return err
}
reply.Services = servicesTagsByName(raw.(structs.ServiceNodes))
c.srv.filterACLWithAuthorizer(authz, reply)
return nil
})
}
func servicesTagsByName(services []*structs.ServiceNode) structs.Services {
unique := make(map[string]map[string]struct{})
for _, svc := range services {
tags, ok := unique[svc.ServiceName]
if !ok {
unique[svc.ServiceName] = make(map[string]struct{})
tags = unique[svc.ServiceName]
}
for _, tag := range svc.ServiceTags {
tags[tag] = struct{}{}
}
}
// Generate the output structure.
var results = make(structs.Services)
for service, tags := range unique {
results[service] = make([]string, 0, len(tags))
for tag := range tags {
results[service] = append(results[service], tag)
}
}
return results
}
// ServiceList is used to query the services in a DC.
// Returns services as a list of ServiceNames.
func (c *Catalog) ServiceList(args *structs.DCSpecificRequest, reply *structs.IndexedServiceList) error {

View File

@ -1523,6 +1523,45 @@ func TestCatalog_ListServices_NodeMetaFilter(t *testing.T) {
}
}
func TestCatalog_ListServices_Filter(t *testing.T) {
t.Parallel()
_, s1 := testServer(t)
codec := rpcClient(t, s1)
testrpc.WaitForTestAgent(t, s1.RPC, "dc1")
// prep the cluster with some data we can use in our filters
registerTestCatalogEntries(t, codec)
// Run the tests against the test server
t.Run("ListServices", func(t *testing.T) {
args := structs.DCSpecificRequest{
Datacenter: "dc1",
}
args.Filter = "ServiceName == redis"
out := new(structs.IndexedServices)
require.NoError(t, msgpackrpc.CallWithCodec(codec, "Catalog.ListServices", &args, out))
require.Contains(t, out.Services, "redis")
require.ElementsMatch(t, []string{"v1", "v2"}, out.Services["redis"])
args.Filter = "NodeMeta.os == NoSuchOS"
out = new(structs.IndexedServices)
require.NoError(t, msgpackrpc.CallWithCodec(codec, "Catalog.ListServices", &args, out))
require.Len(t, out.Services, 0)
args.Filter = "NodeMeta.NoSuchMetadata == linux"
out = new(structs.IndexedServices)
require.NoError(t, msgpackrpc.CallWithCodec(codec, "Catalog.ListServices", &args, out))
require.Len(t, out.Services, 0)
args.Filter = "InvalidField == linux"
out = new(structs.IndexedServices)
require.Error(t, msgpackrpc.CallWithCodec(codec, "Catalog.ListServices", &args, out))
})
}
func TestCatalog_ListServices_Blocking(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")

View File

@ -1134,7 +1134,7 @@ func terminatingGatewayVirtualIPsSupported(tx ReadTxn, ws memdb.WatchSet) (bool,
}
// Services returns all services along with a list of associated tags.
func (s *Store) Services(ws memdb.WatchSet, entMeta *acl.EnterpriseMeta, peerName string) (uint64, structs.Services, error) {
func (s *Store) Services(ws memdb.WatchSet, entMeta *acl.EnterpriseMeta, peerName string) (uint64, []*structs.ServiceNode, error) {
tx := s.db.Txn(false)
defer tx.Abort()
@ -1148,30 +1148,11 @@ func (s *Store) Services(ws memdb.WatchSet, entMeta *acl.EnterpriseMeta, peerNam
}
ws.Add(services.WatchCh())
// Rip through the services and enumerate them and their unique set of
// tags.
unique := make(map[string]map[string]struct{})
var result []*structs.ServiceNode
for service := services.Next(); service != nil; service = services.Next() {
svc := service.(*structs.ServiceNode)
tags, ok := unique[svc.ServiceName]
if !ok {
unique[svc.ServiceName] = make(map[string]struct{})
tags = unique[svc.ServiceName]
}
for _, tag := range svc.ServiceTags {
tags[tag] = struct{}{}
}
result = append(result, service.(*structs.ServiceNode))
}
// Generate the output structure.
var results = make(structs.Services)
for service, tags := range unique {
results[service] = make([]string, 0, len(tags))
for tag := range tags {
results[service] = append(results[service], tag)
}
}
return idx, results, nil
return idx, result, nil
}
func (s *Store) ServiceList(ws memdb.WatchSet, entMeta *acl.EnterpriseMeta, peerName string) (uint64, structs.ServiceList, error) {
@ -1212,7 +1193,7 @@ func serviceListTxn(tx ReadTxn, ws memdb.WatchSet, entMeta *acl.EnterpriseMeta,
}
// ServicesByNodeMeta returns all services, filtered by the given node metadata.
func (s *Store) ServicesByNodeMeta(ws memdb.WatchSet, filters map[string]string, entMeta *acl.EnterpriseMeta, peerName string) (uint64, structs.Services, error) {
func (s *Store) ServicesByNodeMeta(ws memdb.WatchSet, filters map[string]string, entMeta *acl.EnterpriseMeta, peerName string) (uint64, []*structs.ServiceNode, error) {
tx := s.db.Txn(false)
defer tx.Abort()
@ -1259,8 +1240,7 @@ func (s *Store) ServicesByNodeMeta(ws memdb.WatchSet, filters map[string]string,
}
allServicesCh := allServices.WatchCh()
// Populate the services map
unique := make(map[string]map[string]struct{})
var result structs.ServiceNodes
for node := nodes.Next(); node != nil; node = nodes.Next() {
n := node.(*structs.Node)
if len(filters) > 1 && !structs.SatisfiesMetaFilters(n.Meta, filters) {
@ -1274,30 +1254,11 @@ func (s *Store) ServicesByNodeMeta(ws memdb.WatchSet, filters map[string]string,
}
ws.AddWithLimit(watchLimit, services.WatchCh(), allServicesCh)
// Rip through the services and enumerate them and their unique set of
// tags.
for service := services.Next(); service != nil; service = services.Next() {
svc := service.(*structs.ServiceNode)
tags, ok := unique[svc.ServiceName]
if !ok {
unique[svc.ServiceName] = make(map[string]struct{})
tags = unique[svc.ServiceName]
}
for _, tag := range svc.ServiceTags {
tags[tag] = struct{}{}
}
result = append(result, service.(*structs.ServiceNode))
}
}
// Generate the output structure.
var results = make(structs.Services)
for service, tags := range unique {
results[service] = make([]string, 0, len(tags))
for tag := range tags {
results[service] = append(results[service], tag)
}
}
return idx, results, nil
return idx, result, nil
}
// maxIndexForService return the maximum Raft Index for a service

View File

@ -12,6 +12,8 @@ import (
"github.com/hashicorp/consul/acl"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/hashicorp/go-memdb"
"github.com/hashicorp/go-uuid"
"github.com/stretchr/testify/assert"
@ -2105,10 +2107,13 @@ func TestStateStore_Services(t *testing.T) {
Address: "1.1.1.1",
Port: 1111,
}
ns1.EnterpriseMeta.Normalize()
if err := s.EnsureService(2, "node1", ns1); err != nil {
t.Fatalf("err: %s", err)
}
testRegisterService(t, s, 3, "node1", "dogs")
ns1Dogs := testRegisterService(t, s, 3, "node1", "dogs")
ns1Dogs.EnterpriseMeta.Normalize()
testRegisterNode(t, s, 4, "node2")
ns2 := &structs.NodeService{
ID: "service3",
@ -2117,6 +2122,7 @@ func TestStateStore_Services(t *testing.T) {
Address: "1.1.1.1",
Port: 1111,
}
ns2.EnterpriseMeta.Normalize()
if err := s.EnsureService(5, "node2", ns2); err != nil {
t.Fatalf("err: %s", err)
}
@ -2134,19 +2140,13 @@ func TestStateStore_Services(t *testing.T) {
t.Fatalf("bad index: %d", idx)
}
// Verify the result. We sort the lists since the order is
// non-deterministic (it's built using a map internally).
expected := structs.Services{
"redis": []string{"prod", "primary", "replica"},
"dogs": []string{},
}
sort.Strings(expected["redis"])
for _, tags := range services {
sort.Strings(tags)
}
if !reflect.DeepEqual(expected, services) {
t.Fatalf("bad: %#v", services)
// Verify the result.
expected := []*structs.ServiceNode{
ns1Dogs.ToServiceNode("node1"),
ns1.ToServiceNode("node1"),
ns2.ToServiceNode("node2"),
}
assertDeepEqual(t, expected, services, cmpopts.IgnoreFields(structs.ServiceNode{}, "RaftIndex"))
// Deleting a node with a service should fire the watch.
if err := s.DeleteNode(6, "node1", nil, ""); err != nil {
@ -2185,6 +2185,7 @@ func TestStateStore_ServicesByNodeMeta(t *testing.T) {
Address: "1.1.1.1",
Port: 1111,
}
ns1.EnterpriseMeta.Normalize()
if err := s.EnsureService(2, "node0", ns1); err != nil {
t.Fatalf("err: %s", err)
}
@ -2195,6 +2196,7 @@ func TestStateStore_ServicesByNodeMeta(t *testing.T) {
Address: "1.1.1.1",
Port: 1111,
}
ns2.EnterpriseMeta.Normalize()
if err := s.EnsureService(3, "node1", ns2); err != nil {
t.Fatalf("err: %s", err)
}
@ -2209,11 +2211,10 @@ func TestStateStore_ServicesByNodeMeta(t *testing.T) {
if err != nil {
t.Fatalf("err: %s", err)
}
expected := structs.Services{
"redis": []string{"primary", "prod"},
expected := []*structs.ServiceNode{
ns1.ToServiceNode("node0"),
}
sort.Strings(res["redis"])
require.Equal(t, expected, res)
assertDeepEqual(t, res, expected, cmpopts.IgnoreFields(structs.ServiceNode{}, "RaftIndex"))
})
t.Run("Get all services using the common meta value", func(t *testing.T) {
@ -2221,11 +2222,12 @@ func TestStateStore_ServicesByNodeMeta(t *testing.T) {
if err != nil {
t.Fatalf("err: %s", err)
}
expected := structs.Services{
"redis": []string{"primary", "prod", "replica"},
require.Len(t, res, 2)
expected := []*structs.ServiceNode{
ns1.ToServiceNode("node0"),
ns2.ToServiceNode("node1"),
}
sort.Strings(res["redis"])
require.Equal(t, expected, res)
assertDeepEqual(t, res, expected, cmpopts.IgnoreFields(structs.ServiceNode{}, "RaftIndex"))
})
t.Run("Get an empty list for an invalid meta value", func(t *testing.T) {
@ -2233,8 +2235,8 @@ func TestStateStore_ServicesByNodeMeta(t *testing.T) {
if err != nil {
t.Fatalf("err: %s", err)
}
expected := structs.Services{}
require.Equal(t, expected, res)
var expected []*structs.ServiceNode
assertDeepEqual(t, res, expected, cmpopts.IgnoreFields(structs.ServiceNode{}, "RaftIndex"))
})
t.Run("Get the first node's service instance using multiple meta filters", func(t *testing.T) {
@ -2242,11 +2244,10 @@ func TestStateStore_ServicesByNodeMeta(t *testing.T) {
if err != nil {
t.Fatalf("err: %s", err)
}
expected := structs.Services{
"redis": []string{"primary", "prod"},
expected := []*structs.ServiceNode{
ns1.ToServiceNode("node0"),
}
sort.Strings(res["redis"])
require.Equal(t, expected, res)
assertDeepEqual(t, res, expected, cmpopts.IgnoreFields(structs.ServiceNode{}, "RaftIndex"))
})
t.Run("Registering some unrelated node + service should not fire the watch.", func(t *testing.T) {
@ -8810,3 +8811,10 @@ func setVirtualIPFlags(t *testing.T, s *Store) {
Value: "true",
}))
}
func assertDeepEqual(t *testing.T, x, y interface{}, opts ...cmp.Option) {
t.Helper()
if diff := cmp.Diff(x, y, opts...); diff != "" {
t.Fatalf("assertion failed: values are not equal\n--- expected\n+++ actual\n%v", diff)
}
}

View File

@ -146,13 +146,13 @@ func testRegisterServiceOpts(t *testing.T, s *Store, idx uint64, nodeID, service
// testRegisterServiceWithChange registers a service and allow ensuring the consul index is updated
// even if service already exists if using `modifyAccordingIndex`.
// This is done by setting the transaction ID in "version" meta so service will be updated if it already exists
func testRegisterServiceWithChange(t *testing.T, s *Store, idx uint64, nodeID, serviceID string, modifyAccordingIndex bool) {
testRegisterServiceWithChangeOpts(t, s, idx, nodeID, serviceID, modifyAccordingIndex)
func testRegisterServiceWithChange(t *testing.T, s *Store, idx uint64, nodeID, serviceID string, modifyAccordingIndex bool) *structs.NodeService {
return testRegisterServiceWithChangeOpts(t, s, idx, nodeID, serviceID, modifyAccordingIndex)
}
// testRegisterServiceWithChangeOpts is the same as testRegisterServiceWithChange with the addition of opts that can
// modify the service prior to writing.
func testRegisterServiceWithChangeOpts(t *testing.T, s *Store, idx uint64, nodeID, serviceID string, modifyAccordingIndex bool, opts ...func(service *structs.NodeService)) {
func testRegisterServiceWithChangeOpts(t *testing.T, s *Store, idx uint64, nodeID, serviceID string, modifyAccordingIndex bool, opts ...func(service *structs.NodeService)) *structs.NodeService {
meta := make(map[string]string)
if modifyAccordingIndex {
meta["version"] = fmt.Sprint(idx)
@ -183,14 +183,15 @@ func testRegisterServiceWithChangeOpts(t *testing.T, s *Store, idx uint64, nodeI
result.ServiceID != serviceID {
t.Fatalf("bad service: %#v", result)
}
return svc
}
// testRegisterService register a service with given transaction idx
// If the service already exists, transaction number might not be increased
// Use `testRegisterServiceWithChange()` if you want perform a registration that
// ensures the transaction is updated by setting idx in Meta of Service
func testRegisterService(t *testing.T, s *Store, idx uint64, nodeID, serviceID string) {
testRegisterServiceWithChange(t, s, idx, nodeID, serviceID, false)
func testRegisterService(t *testing.T, s *Store, idx uint64, nodeID, serviceID string) *structs.NodeService {
return testRegisterServiceWithChange(t, s, idx, nodeID, serviceID, false)
}
func testRegisterConnectService(t *testing.T, s *Store, idx uint64, nodeID, serviceID string) {

View File

@ -410,13 +410,64 @@ The corresponding CLI command is [`consul catalog services`](/commands/catalog/s
- `dc` `(string: "")` - Specifies the datacenter to query. This will default to
the datacenter of the agent being queried.
- `node-meta` `(string: "")` - Specifies a desired node metadata key/value pair
- `node-meta` `(string: "")` **Deprecated** - Use `filter` with the `NodeMeta` selector instead.
This parameter will be removed in a future version of Consul.
Specifies a desired node metadata key/value pair
of the form `key:value`. This parameter can be specified multiple times, and
filters the results to nodes with the specified key/value pairs.
- `ns` `(string: "")` <EnterpriseAlert inline /> - Specifies the namespace of the services you lookup.
You can also [specify the namespace through other methods](#methods-to-specify-namespace).
- `filter` `(string: "")` - Specifies the expression used to filter the
queries results prior to returning the data.
### Filtering
The filter will be executed against each Service mapping within the catalog.
The following selectors and filter operations are supported:
| Selector | Supported Operations |
| ---------------------------------------------------- | -------------------------------------------------- |
| `Address` | Equal, Not Equal, In, Not In, Matches, Not Matches |
| `Datacenter` | Equal, Not Equal, In, Not In, Matches, Not Matches |
| `ID` | Equal, Not Equal, In, Not In, Matches, Not Matches |
| `Node` | Equal, Not Equal, In, Not In, Matches, Not Matches |
| `NodeMeta.<any>` | Equal, Not Equal, In, Not In, Matches, Not Matches |
| `NodeMeta` | Is Empty, Is Not Empty, In, Not In |
| `ServiceAddress` | Equal, Not Equal, In, Not In, Matches, Not Matches |
| `ServiceConnect.Native` | Equal, Not Equal |
| `ServiceEnableTagOverride` | Equal, Not Equal |
| `ServiceID` | Equal, Not Equal, In, Not In, Matches, Not Matches |
| `ServiceKind` | Equal, Not Equal, In, Not In, Matches, Not Matches |
| `ServiceMeta.<any>` | Equal, Not Equal, In, Not In, Matches, Not Matches |
| `ServiceMeta` | Is Empty, Is Not Empty, In, Not In |
| `ServiceName` | Equal, Not Equal, In, Not In, Matches, Not Matches |
| `ServicePort` | Equal, Not Equal |
| `ServiceProxy.DestinationServiceID` | Equal, Not Equal, In, Not In, Matches, Not Matches |
| `ServiceProxy.DestinationServiceName` | Equal, Not Equal, In, Not In, Matches, Not Matches |
| `ServiceProxy.LocalServiceAddress` | Equal, Not Equal, In, Not In, Matches, Not Matches |
| `ServiceProxy.LocalServicePort` | Equal, Not Equal |
| `ServiceProxy.Mode` | Equal, Not Equal, In, Not In, Matches, Not Matches |
| `ServiceProxy.TransparentProxy.OutboundListenerPort` | Equal, Not Equal |
| `ServiceProxy.MeshGateway.Mode` | Equal, Not Equal, In, Not In, Matches, Not Matches |
| `ServiceProxy.Upstreams.Datacenter` | Equal, Not Equal, In, Not In, Matches, Not Matches |
| `ServiceProxy.Upstreams.DestinationName` | Equal, Not Equal, In, Not In, Matches, Not Matches |
| `ServiceProxy.Upstreams.DestinationNamespace` | Equal, Not Equal, In, Not In, Matches, Not Matches |
| `ServiceProxy.Upstreams.DestinationType` | Equal, Not Equal, In, Not In, Matches, Not Matches |
| `ServiceProxy.Upstreams.LocalBindAddress` | Equal, Not Equal, In, Not In, Matches, Not Matches |
| `ServiceProxy.Upstreams.LocalBindPort` | Equal, Not Equal |
| `ServiceProxy.Upstreams.MeshGateway.Mode` | Equal, Not Equal, In, Not In, Matches, Not Matches |
| `ServiceProxy.Upstreams` | Is Empty, Is Not Empty |
| `ServiceTaggedAddresses.<any>.Address` | Equal, Not Equal, In, Not In, Matches, Not Matches |
| `ServiceTaggedAddresses.<any>.Port` | Equal, Not Equal |
| `ServiceTaggedAddresses` | Is Empty, Is Not Empty, In, Not In |
| `ServiceTags` | In, Not In, Is Empty, Is Not Empty |
| `ServiceWeights.Passing` | Equal, Not Equal |
| `ServiceWeights.Warning` | Equal, Not Equal |
| `TaggedAddresses.<any>` | Equal, Not Equal, In, Not In, Matches, Not Matches |
| `TaggedAddresses` | Is Empty, Is Not Empty, In, Not In |
### Sample Request
```shell-session