diff --git a/.changelog/11742.txt b/.changelog/11742.txt new file mode 100644 index 000000000..6c6d4c249 --- /dev/null +++ b/.changelog/11742.txt @@ -0,0 +1,3 @@ +```release-note:improvement +api: Add filtering support to Catalog's List Services (v1/catalog/services) +``` diff --git a/agent/consul/catalog_endpoint.go b/agent/consul/catalog_endpoint.go index 111ee7b2b..696ae314a 100644 --- a/agent/consul/catalog_endpoint.go +++ b/agent/consul/catalog_endpoint.go @@ -565,6 +565,11 @@ func (c *Catalog) ListServices(args *structs.DCSpecificRequest, reply *structs.I return err } + filter, err := bexpr.CreateFilter(args.Filter, nil, []*structs.ServiceNode{}) + if err != nil { + return err + } + // Set reply enterprise metadata after resolving and validating the token so // that we can properly infer metadata from the token. reply.EnterpriseMeta = args.EnterpriseMeta @@ -574,10 +579,11 @@ func (c *Catalog) ListServices(args *structs.DCSpecificRequest, reply *structs.I &reply.QueryMeta, func(ws memdb.WatchSet, state *state.Store) error { var err error + var serviceNodes structs.ServiceNodes if len(args.NodeMetaFilters) > 0 { - reply.Index, reply.Services, err = state.ServicesByNodeMeta(ws, args.NodeMetaFilters, &args.EnterpriseMeta, args.PeerName) + reply.Index, serviceNodes, err = state.ServicesByNodeMeta(ws, args.NodeMetaFilters, &args.EnterpriseMeta, args.PeerName) } else { - reply.Index, reply.Services, err = state.Services(ws, &args.EnterpriseMeta, args.PeerName) + reply.Index, serviceNodes, err = state.Services(ws, &args.EnterpriseMeta, args.PeerName) } if err != nil { return err @@ -588,11 +594,43 @@ func (c *Catalog) ListServices(args *structs.DCSpecificRequest, reply *structs.I return nil } + raw, err := filter.Execute(serviceNodes) + if err != nil { + return err + } + + reply.Services = servicesTagsByName(raw.(structs.ServiceNodes)) + c.srv.filterACLWithAuthorizer(authz, reply) + return nil }) } +func servicesTagsByName(services []*structs.ServiceNode) structs.Services { + unique := make(map[string]map[string]struct{}) + for _, svc := range services { + tags, ok := unique[svc.ServiceName] + if !ok { + unique[svc.ServiceName] = make(map[string]struct{}) + tags = unique[svc.ServiceName] + } + for _, tag := range svc.ServiceTags { + tags[tag] = struct{}{} + } + } + + // Generate the output structure. + var results = make(structs.Services) + for service, tags := range unique { + results[service] = make([]string, 0, len(tags)) + for tag := range tags { + results[service] = append(results[service], tag) + } + } + return results +} + // ServiceList is used to query the services in a DC. // Returns services as a list of ServiceNames. func (c *Catalog) ServiceList(args *structs.DCSpecificRequest, reply *structs.IndexedServiceList) error { diff --git a/agent/consul/catalog_endpoint_test.go b/agent/consul/catalog_endpoint_test.go index ca00efaea..daa22c90c 100644 --- a/agent/consul/catalog_endpoint_test.go +++ b/agent/consul/catalog_endpoint_test.go @@ -1523,6 +1523,45 @@ func TestCatalog_ListServices_NodeMetaFilter(t *testing.T) { } } +func TestCatalog_ListServices_Filter(t *testing.T) { + t.Parallel() + _, s1 := testServer(t) + codec := rpcClient(t, s1) + + testrpc.WaitForTestAgent(t, s1.RPC, "dc1") + + // prep the cluster with some data we can use in our filters + registerTestCatalogEntries(t, codec) + + // Run the tests against the test server + + t.Run("ListServices", func(t *testing.T) { + args := structs.DCSpecificRequest{ + Datacenter: "dc1", + } + + args.Filter = "ServiceName == redis" + out := new(structs.IndexedServices) + require.NoError(t, msgpackrpc.CallWithCodec(codec, "Catalog.ListServices", &args, out)) + require.Contains(t, out.Services, "redis") + require.ElementsMatch(t, []string{"v1", "v2"}, out.Services["redis"]) + + args.Filter = "NodeMeta.os == NoSuchOS" + out = new(structs.IndexedServices) + require.NoError(t, msgpackrpc.CallWithCodec(codec, "Catalog.ListServices", &args, out)) + require.Len(t, out.Services, 0) + + args.Filter = "NodeMeta.NoSuchMetadata == linux" + out = new(structs.IndexedServices) + require.NoError(t, msgpackrpc.CallWithCodec(codec, "Catalog.ListServices", &args, out)) + require.Len(t, out.Services, 0) + + args.Filter = "InvalidField == linux" + out = new(structs.IndexedServices) + require.Error(t, msgpackrpc.CallWithCodec(codec, "Catalog.ListServices", &args, out)) + }) +} + func TestCatalog_ListServices_Blocking(t *testing.T) { if testing.Short() { t.Skip("too slow for testing.Short") diff --git a/agent/consul/state/catalog.go b/agent/consul/state/catalog.go index 258519d5b..879c59f74 100644 --- a/agent/consul/state/catalog.go +++ b/agent/consul/state/catalog.go @@ -1134,7 +1134,7 @@ func terminatingGatewayVirtualIPsSupported(tx ReadTxn, ws memdb.WatchSet) (bool, } // Services returns all services along with a list of associated tags. -func (s *Store) Services(ws memdb.WatchSet, entMeta *acl.EnterpriseMeta, peerName string) (uint64, structs.Services, error) { +func (s *Store) Services(ws memdb.WatchSet, entMeta *acl.EnterpriseMeta, peerName string) (uint64, []*structs.ServiceNode, error) { tx := s.db.Txn(false) defer tx.Abort() @@ -1148,30 +1148,11 @@ func (s *Store) Services(ws memdb.WatchSet, entMeta *acl.EnterpriseMeta, peerNam } ws.Add(services.WatchCh()) - // Rip through the services and enumerate them and their unique set of - // tags. - unique := make(map[string]map[string]struct{}) + var result []*structs.ServiceNode for service := services.Next(); service != nil; service = services.Next() { - svc := service.(*structs.ServiceNode) - tags, ok := unique[svc.ServiceName] - if !ok { - unique[svc.ServiceName] = make(map[string]struct{}) - tags = unique[svc.ServiceName] - } - for _, tag := range svc.ServiceTags { - tags[tag] = struct{}{} - } + result = append(result, service.(*structs.ServiceNode)) } - - // Generate the output structure. - var results = make(structs.Services) - for service, tags := range unique { - results[service] = make([]string, 0, len(tags)) - for tag := range tags { - results[service] = append(results[service], tag) - } - } - return idx, results, nil + return idx, result, nil } func (s *Store) ServiceList(ws memdb.WatchSet, entMeta *acl.EnterpriseMeta, peerName string) (uint64, structs.ServiceList, error) { @@ -1212,7 +1193,7 @@ func serviceListTxn(tx ReadTxn, ws memdb.WatchSet, entMeta *acl.EnterpriseMeta, } // ServicesByNodeMeta returns all services, filtered by the given node metadata. -func (s *Store) ServicesByNodeMeta(ws memdb.WatchSet, filters map[string]string, entMeta *acl.EnterpriseMeta, peerName string) (uint64, structs.Services, error) { +func (s *Store) ServicesByNodeMeta(ws memdb.WatchSet, filters map[string]string, entMeta *acl.EnterpriseMeta, peerName string) (uint64, []*structs.ServiceNode, error) { tx := s.db.Txn(false) defer tx.Abort() @@ -1259,8 +1240,7 @@ func (s *Store) ServicesByNodeMeta(ws memdb.WatchSet, filters map[string]string, } allServicesCh := allServices.WatchCh() - // Populate the services map - unique := make(map[string]map[string]struct{}) + var result structs.ServiceNodes for node := nodes.Next(); node != nil; node = nodes.Next() { n := node.(*structs.Node) if len(filters) > 1 && !structs.SatisfiesMetaFilters(n.Meta, filters) { @@ -1274,30 +1254,11 @@ func (s *Store) ServicesByNodeMeta(ws memdb.WatchSet, filters map[string]string, } ws.AddWithLimit(watchLimit, services.WatchCh(), allServicesCh) - // Rip through the services and enumerate them and their unique set of - // tags. for service := services.Next(); service != nil; service = services.Next() { - svc := service.(*structs.ServiceNode) - tags, ok := unique[svc.ServiceName] - if !ok { - unique[svc.ServiceName] = make(map[string]struct{}) - tags = unique[svc.ServiceName] - } - for _, tag := range svc.ServiceTags { - tags[tag] = struct{}{} - } + result = append(result, service.(*structs.ServiceNode)) } } - - // Generate the output structure. - var results = make(structs.Services) - for service, tags := range unique { - results[service] = make([]string, 0, len(tags)) - for tag := range tags { - results[service] = append(results[service], tag) - } - } - return idx, results, nil + return idx, result, nil } // maxIndexForService return the maximum Raft Index for a service diff --git a/agent/consul/state/catalog_test.go b/agent/consul/state/catalog_test.go index 10e7af6db..ca2bded03 100644 --- a/agent/consul/state/catalog_test.go +++ b/agent/consul/state/catalog_test.go @@ -12,6 +12,8 @@ import ( "github.com/hashicorp/consul/acl" + "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" "github.com/hashicorp/go-memdb" "github.com/hashicorp/go-uuid" "github.com/stretchr/testify/assert" @@ -2105,7 +2107,7 @@ func TestStateStore_Services(t *testing.T) { if err := s.EnsureService(2, "node1", ns1); err != nil { t.Fatalf("err: %s", err) } - testRegisterService(t, s, 3, "node1", "dogs") + ns1Dogs := testRegisterService(t, s, 3, "node1", "dogs") testRegisterNode(t, s, 4, "node2") ns2 := &structs.NodeService{ ID: "service3", @@ -2131,19 +2133,13 @@ func TestStateStore_Services(t *testing.T) { t.Fatalf("bad index: %d", idx) } - // Verify the result. We sort the lists since the order is - // non-deterministic (it's built using a map internally). - expected := structs.Services{ - "redis": []string{"prod", "primary", "replica"}, - "dogs": []string{}, - } - sort.Strings(expected["redis"]) - for _, tags := range services { - sort.Strings(tags) - } - if !reflect.DeepEqual(expected, services) { - t.Fatalf("bad: %#v", services) + // Verify the result. + expected := []*structs.ServiceNode{ + ns1Dogs.ToServiceNode("node1"), + ns1.ToServiceNode("node1"), + ns2.ToServiceNode("node2"), } + assertDeepEqual(t, services, expected, cmpopts.IgnoreFields(structs.ServiceNode{}, "RaftIndex")) // Deleting a node with a service should fire the watch. if err := s.DeleteNode(6, "node1", nil, ""); err != nil { @@ -2206,11 +2202,10 @@ func TestStateStore_ServicesByNodeMeta(t *testing.T) { if err != nil { t.Fatalf("err: %s", err) } - expected := structs.Services{ - "redis": []string{"primary", "prod"}, + expected := []*structs.ServiceNode{ + ns1.ToServiceNode("node0"), } - sort.Strings(res["redis"]) - require.Equal(t, expected, res) + assertDeepEqual(t, res, expected, cmpopts.IgnoreFields(structs.ServiceNode{}, "RaftIndex")) }) t.Run("Get all services using the common meta value", func(t *testing.T) { @@ -2218,11 +2213,12 @@ func TestStateStore_ServicesByNodeMeta(t *testing.T) { if err != nil { t.Fatalf("err: %s", err) } - expected := structs.Services{ - "redis": []string{"primary", "prod", "replica"}, + require.Len(t, res, 2) + expected := []*structs.ServiceNode{ + ns1.ToServiceNode("node0"), + ns2.ToServiceNode("node1"), } - sort.Strings(res["redis"]) - require.Equal(t, expected, res) + assertDeepEqual(t, res, expected, cmpopts.IgnoreFields(structs.ServiceNode{}, "RaftIndex")) }) t.Run("Get an empty list for an invalid meta value", func(t *testing.T) { @@ -2230,8 +2226,8 @@ func TestStateStore_ServicesByNodeMeta(t *testing.T) { if err != nil { t.Fatalf("err: %s", err) } - expected := structs.Services{} - require.Equal(t, expected, res) + var expected []*structs.ServiceNode + assertDeepEqual(t, res, expected, cmpopts.IgnoreFields(structs.ServiceNode{}, "RaftIndex")) }) t.Run("Get the first node's service instance using multiple meta filters", func(t *testing.T) { @@ -2239,11 +2235,10 @@ func TestStateStore_ServicesByNodeMeta(t *testing.T) { if err != nil { t.Fatalf("err: %s", err) } - expected := structs.Services{ - "redis": []string{"primary", "prod"}, + expected := []*structs.ServiceNode{ + ns1.ToServiceNode("node0"), } - sort.Strings(res["redis"]) - require.Equal(t, expected, res) + assertDeepEqual(t, res, expected, cmpopts.IgnoreFields(structs.ServiceNode{}, "RaftIndex")) }) t.Run("Registering some unrelated node + service should not fire the watch.", func(t *testing.T) { @@ -8807,3 +8802,10 @@ func setVirtualIPFlags(t *testing.T, s *Store) { Value: "true", })) } + +func assertDeepEqual(t *testing.T, x, y interface{}, opts ...cmp.Option) { + t.Helper() + if diff := cmp.Diff(x, y, opts...); diff != "" { + t.Fatalf("assertion failed: values are not equal\n--- expected\n+++ actual\n%v", diff) + } +} diff --git a/agent/consul/state/state_store_test.go b/agent/consul/state/state_store_test.go index c8460ca82..88e5418c8 100644 --- a/agent/consul/state/state_store_test.go +++ b/agent/consul/state/state_store_test.go @@ -146,13 +146,13 @@ func testRegisterServiceOpts(t *testing.T, s *Store, idx uint64, nodeID, service // testRegisterServiceWithChange registers a service and allow ensuring the consul index is updated // even if service already exists if using `modifyAccordingIndex`. // This is done by setting the transaction ID in "version" meta so service will be updated if it already exists -func testRegisterServiceWithChange(t *testing.T, s *Store, idx uint64, nodeID, serviceID string, modifyAccordingIndex bool) { - testRegisterServiceWithChangeOpts(t, s, idx, nodeID, serviceID, modifyAccordingIndex) +func testRegisterServiceWithChange(t *testing.T, s *Store, idx uint64, nodeID, serviceID string, modifyAccordingIndex bool) *structs.NodeService { + return testRegisterServiceWithChangeOpts(t, s, idx, nodeID, serviceID, modifyAccordingIndex) } // testRegisterServiceWithChangeOpts is the same as testRegisterServiceWithChange with the addition of opts that can // modify the service prior to writing. -func testRegisterServiceWithChangeOpts(t *testing.T, s *Store, idx uint64, nodeID, serviceID string, modifyAccordingIndex bool, opts ...func(service *structs.NodeService)) { +func testRegisterServiceWithChangeOpts(t *testing.T, s *Store, idx uint64, nodeID, serviceID string, modifyAccordingIndex bool, opts ...func(service *structs.NodeService)) *structs.NodeService { meta := make(map[string]string) if modifyAccordingIndex { meta["version"] = fmt.Sprint(idx) @@ -183,14 +183,15 @@ func testRegisterServiceWithChangeOpts(t *testing.T, s *Store, idx uint64, nodeI result.ServiceID != serviceID { t.Fatalf("bad service: %#v", result) } + return svc } // testRegisterService register a service with given transaction idx // If the service already exists, transaction number might not be increased // Use `testRegisterServiceWithChange()` if you want perform a registration that // ensures the transaction is updated by setting idx in Meta of Service -func testRegisterService(t *testing.T, s *Store, idx uint64, nodeID, serviceID string) { - testRegisterServiceWithChange(t, s, idx, nodeID, serviceID, false) +func testRegisterService(t *testing.T, s *Store, idx uint64, nodeID, serviceID string) *structs.NodeService { + return testRegisterServiceWithChange(t, s, idx, nodeID, serviceID, false) } func testRegisterConnectService(t *testing.T, s *Store, idx uint64, nodeID, serviceID string) { diff --git a/website/content/api-docs/catalog.mdx b/website/content/api-docs/catalog.mdx index b25917685..86480ab79 100644 --- a/website/content/api-docs/catalog.mdx +++ b/website/content/api-docs/catalog.mdx @@ -410,13 +410,64 @@ The corresponding CLI command is [`consul catalog services`](/commands/catalog/s - `dc` `(string: "")` - Specifies the datacenter to query. This will default to the datacenter of the agent being queried. -- `node-meta` `(string: "")` - Specifies a desired node metadata key/value pair +- `node-meta` `(string: "")` **Deprecated** - Use `filter` with the `NodeMeta` selector instead. + This parameter will be removed in a future version of Consul. + Specifies a desired node metadata key/value pair of the form `key:value`. This parameter can be specified multiple times, and filters the results to nodes with the specified key/value pairs. - `ns` `(string: "")` - Specifies the namespace of the services you lookup. You can also [specify the namespace through other methods](#methods-to-specify-namespace). +- `filter` `(string: "")` - Specifies the expression used to filter the + queries results prior to returning the data. + +### Filtering + +The filter will be executed against each Service mapping within the catalog. +The following selectors and filter operations are supported: + +| Selector | Supported Operations | +| ---------------------------------------------------- | -------------------------------------------------- | +| `Address` | Equal, Not Equal, In, Not In, Matches, Not Matches | +| `Datacenter` | Equal, Not Equal, In, Not In, Matches, Not Matches | +| `ID` | Equal, Not Equal, In, Not In, Matches, Not Matches | +| `Node` | Equal, Not Equal, In, Not In, Matches, Not Matches | +| `NodeMeta.` | Equal, Not Equal, In, Not In, Matches, Not Matches | +| `NodeMeta` | Is Empty, Is Not Empty, In, Not In | +| `ServiceAddress` | Equal, Not Equal, In, Not In, Matches, Not Matches | +| `ServiceConnect.Native` | Equal, Not Equal | +| `ServiceEnableTagOverride` | Equal, Not Equal | +| `ServiceID` | Equal, Not Equal, In, Not In, Matches, Not Matches | +| `ServiceKind` | Equal, Not Equal, In, Not In, Matches, Not Matches | +| `ServiceMeta.` | Equal, Not Equal, In, Not In, Matches, Not Matches | +| `ServiceMeta` | Is Empty, Is Not Empty, In, Not In | +| `ServiceName` | Equal, Not Equal, In, Not In, Matches, Not Matches | +| `ServicePort` | Equal, Not Equal | +| `ServiceProxy.DestinationServiceID` | Equal, Not Equal, In, Not In, Matches, Not Matches | +| `ServiceProxy.DestinationServiceName` | Equal, Not Equal, In, Not In, Matches, Not Matches | +| `ServiceProxy.LocalServiceAddress` | Equal, Not Equal, In, Not In, Matches, Not Matches | +| `ServiceProxy.LocalServicePort` | Equal, Not Equal | +| `ServiceProxy.Mode` | Equal, Not Equal, In, Not In, Matches, Not Matches | +| `ServiceProxy.TransparentProxy.OutboundListenerPort` | Equal, Not Equal | +| `ServiceProxy.MeshGateway.Mode` | Equal, Not Equal, In, Not In, Matches, Not Matches | +| `ServiceProxy.Upstreams.Datacenter` | Equal, Not Equal, In, Not In, Matches, Not Matches | +| `ServiceProxy.Upstreams.DestinationName` | Equal, Not Equal, In, Not In, Matches, Not Matches | +| `ServiceProxy.Upstreams.DestinationNamespace` | Equal, Not Equal, In, Not In, Matches, Not Matches | +| `ServiceProxy.Upstreams.DestinationType` | Equal, Not Equal, In, Not In, Matches, Not Matches | +| `ServiceProxy.Upstreams.LocalBindAddress` | Equal, Not Equal, In, Not In, Matches, Not Matches | +| `ServiceProxy.Upstreams.LocalBindPort` | Equal, Not Equal | +| `ServiceProxy.Upstreams.MeshGateway.Mode` | Equal, Not Equal, In, Not In, Matches, Not Matches | +| `ServiceProxy.Upstreams` | Is Empty, Is Not Empty | +| `ServiceTaggedAddresses..Address` | Equal, Not Equal, In, Not In, Matches, Not Matches | +| `ServiceTaggedAddresses..Port` | Equal, Not Equal | +| `ServiceTaggedAddresses` | Is Empty, Is Not Empty, In, Not In | +| `ServiceTags` | In, Not In, Is Empty, Is Not Empty | +| `ServiceWeights.Passing` | Equal, Not Equal | +| `ServiceWeights.Warning` | Equal, Not Equal | +| `TaggedAddresses.` | Equal, Not Equal, In, Not In, Matches, Not Matches | +| `TaggedAddresses` | Is Empty, Is Not Empty, In, Not In | + ### Sample Request ```shell-session