From 06839645190858c40716802cf091a35ec4df19a7 Mon Sep 17 00:00:00 2001 From: Daniel Nephin Date: Mon, 8 Feb 2021 18:54:37 -0500 Subject: [PATCH] streaming: move ServiceTag and NodeMetaFiltering to the cache-entry So that all the client side filtering is in the same place. Previously only the bexpr filter was in the cache-entry. Also makes a small change to the filtering so that instead of rebuilding slices of items, the filtering can return a bool to determine if the event payload is saved or not. --- .../cache-types/streaming_health_services.go | 97 +++++++- .../streaming_health_services_test.go | 208 ++++++++++++++++++ agent/dns_test.go | 2 +- agent/rpcclient/health/health.go | 64 +----- 4 files changed, 298 insertions(+), 73 deletions(-) diff --git a/agent/cache-types/streaming_health_services.go b/agent/cache-types/streaming_health_services.go index efb1dba87..58c5fd278 100644 --- a/agent/cache-types/streaming_health_services.go +++ b/agent/cache-types/streaming_health_services.go @@ -5,6 +5,7 @@ import ( "fmt" "reflect" "sort" + "strings" "time" "github.com/hashicorp/go-bexpr" @@ -82,7 +83,7 @@ func (c *StreamingHealthServices) Fetch(opts cache.FetchOptions, req cache.Reque return req } - materializer, err := newMaterializer(c.deps, newReqFn, srvReq.Filter) + materializer, err := newMaterializer(c.deps, newReqFn, srvReq) if err != nil { return cache.FetchResult{}, err } @@ -100,9 +101,9 @@ func (c *StreamingHealthServices) Fetch(opts cache.FetchOptions, req cache.Reque func newMaterializer( deps MaterializerDeps, newRequestFn func(uint64) pbsubscribe.SubscribeRequest, - filter string, + req *structs.ServiceSpecificRequest, ) (*submatview.Materializer, error) { - view, err := newHealthView(filter) + view, err := newHealthView(req) if err != nil { return nil, err } @@ -139,8 +140,8 @@ func (s *streamingHealthState) Fetch(opts cache.FetchOptions) (cache.FetchResult return result, err } -func newHealthView(filterExpr string) (*healthView, error) { - fe, err := newFilterEvaluator(filterExpr) +func newHealthView(req *structs.ServiceSpecificRequest) (*healthView, error) { + fe, err := newFilterEvaluator(req) if err != nil { return nil, err } @@ -192,11 +193,44 @@ type filterEvaluator interface { Evaluate(datum interface{}) (bool, error) } -func newFilterEvaluator(expr string) (filterEvaluator, error) { - if expr == "" { - return noopFilterEvaluator{}, nil +func newFilterEvaluator(req *structs.ServiceSpecificRequest) (filterEvaluator, error) { + var evaluators []filterEvaluator + + typ := reflect.TypeOf(structs.CheckServiceNode{}) + if req.Filter != "" { + e, err := bexpr.CreateEvaluatorForType(req.Filter, nil, typ) + if err != nil { + return nil, err + } + evaluators = append(evaluators, e) + } + + if req.ServiceTag != "" { + // Handle backwards compat with old field + req.ServiceTags = []string{req.ServiceTag} + } + + if req.TagFilter && len(req.ServiceTags) > 0 { + evaluators = append(evaluators, serviceTagEvaluator{tags: req.ServiceTags}) + } + + for key, value := range req.NodeMetaFilters { + expr := fmt.Sprintf(`"%s" in Node.Meta.%s`, value, key) + e, err := bexpr.CreateEvaluatorForType(expr, nil, typ) + if err != nil { + return nil, err + } + evaluators = append(evaluators, e) + } + + switch len(evaluators) { + case 0: + return noopFilterEvaluator{}, nil + case 1: + return evaluators[0], nil + default: + return &multiFilterEvaluator{evaluators: evaluators}, nil } - return bexpr.CreateEvaluatorForType(expr, nil, reflect.TypeOf(structs.CheckServiceNode{})) } // noopFilterEvaluator may be used in place of a bexpr.Evaluator. The Evaluate @@ -207,6 +241,20 @@ func (noopFilterEvaluator) Evaluate(_ interface{}) (bool, error) { return true, nil } +type multiFilterEvaluator struct { + evaluators []filterEvaluator +} + +func (m multiFilterEvaluator) Evaluate(data interface{}) (bool, error) { + for _, e := range m.evaluators { + match, err := e.Evaluate(data) + if !match || err != nil { + return match, err + } + } + return true, nil +} + // sortCheckServiceNodes sorts the results to match memdb semantics // Sort results by Node.Node, if 2 instances match, order by Service.ID // Will allow result to be stable sorted and match queries without cache @@ -240,3 +288,34 @@ func (s *healthView) Result(index uint64) (interface{}, error) { func (s *healthView) Reset() { s.state = make(map[string]structs.CheckServiceNode) } + +// serviceTagEvaluator implements the filterEvaluator to perform filtering +// by service tags. bexpr can not be used at this time, because the filtering +// must be case insensitive for backwards compatibility. In the future this +// may be replaced with bexpr once case insensitive support is added. +type serviceTagEvaluator struct { + tags []string +} + +func (m serviceTagEvaluator) Evaluate(data interface{}) (bool, error) { + csn, ok := data.(structs.CheckServiceNode) + if !ok { + return false, fmt.Errorf("unexpected type %T for structs.CheckServiceNode filter", data) + } + for _, tag := range m.tags { + if !serviceHasTag(csn.Service, tag) { + // If any one of the expected tags was not found, filter the service + return false, nil + } + } + return true, nil +} + +func serviceHasTag(sn *structs.NodeService, tag string) bool { + for _, t := range sn.Tags { + if strings.EqualFold(t, tag) { + return true + } + } + return false +} diff --git a/agent/cache-types/streaming_health_services_test.go b/agent/cache-types/streaming_health_services_test.go index 496484820..a58967644 100644 --- a/agent/cache-types/streaming_health_services_test.go +++ b/agent/cache-types/streaming_health_services_test.go @@ -568,3 +568,211 @@ func runStep(t *testing.T, name string, fn func(t *testing.T)) { t.FailNow() } } + +func TestNewFilterEvaluator(t *testing.T) { + type testCase struct { + name string + req structs.ServiceSpecificRequest + data structs.CheckServiceNode + expected bool + } + + fn := func(t *testing.T, tc testCase) { + e, err := newFilterEvaluator(&tc.req) + require.NoError(t, err) + actual, err := e.Evaluate(tc.data) + require.NoError(t, err) + require.Equal(t, tc.expected, actual) + } + + var testCases = []testCase{ + { + name: "single ServiceTags match", + req: structs.ServiceSpecificRequest{ + ServiceTags: []string{"match"}, + TagFilter: true, + }, + data: structs.CheckServiceNode{ + Service: &structs.NodeService{ + Tags: []string{"extra", "match"}, + }, + }, + expected: true, + }, + { + name: "single deprecated ServiceTag match", + req: structs.ServiceSpecificRequest{ + ServiceTag: "match", + TagFilter: true, + }, + data: structs.CheckServiceNode{ + Service: &structs.NodeService{ + Tags: []string{"extra", "match"}, + }, + }, + expected: true, + }, + { + name: "single ServiceTags mismatch", + req: structs.ServiceSpecificRequest{ + ServiceTags: []string{"other"}, + TagFilter: true, + }, + data: structs.CheckServiceNode{ + Service: &structs.NodeService{ + Tags: []string{"extra", "match"}, + }, + }, + expected: false, + }, + { + name: "multiple ServiceTags match", + req: structs.ServiceSpecificRequest{ + ServiceTags: []string{"match", "second"}, + TagFilter: true, + }, + data: structs.CheckServiceNode{ + Service: &structs.NodeService{ + Tags: []string{"extra", "match", "second"}, + }, + }, + expected: true, + }, + { + name: "multiple ServiceTags mismatch", + req: structs.ServiceSpecificRequest{ + ServiceTags: []string{"match", "not"}, + TagFilter: true, + }, + data: structs.CheckServiceNode{ + Service: &structs.NodeService{ + Tags: []string{"extra", "match"}, + }, + }, + expected: false, + }, + { + name: "single NodeMetaFilter match", + req: structs.ServiceSpecificRequest{ + NodeMetaFilters: map[string]string{"meta1": "match"}, + }, + data: structs.CheckServiceNode{ + Node: &structs.Node{ + Meta: map[string]string{ + "meta1": "match", + "extra": "some", + }, + }, + }, + expected: true, + }, + { + name: "single NodeMetaFilter mismatch", + req: structs.ServiceSpecificRequest{ + NodeMetaFilters: map[string]string{ + "meta1": "match", + }, + }, + data: structs.CheckServiceNode{ + Node: &structs.Node{ + Meta: map[string]string{ + "meta1": "other", + "extra": "some", + }, + }, + }, + expected: false, + }, + { + name: "multiple NodeMetaFilter match", + req: structs.ServiceSpecificRequest{ + NodeMetaFilters: map[string]string{"meta1": "match", "meta2": "a"}, + }, + data: structs.CheckServiceNode{ + Node: &structs.Node{ + Meta: map[string]string{ + "meta1": "match", + "meta2": "a", + "extra": "some", + }, + }, + }, + expected: true, + }, + { + name: "multiple NodeMetaFilter mismatch", + req: structs.ServiceSpecificRequest{ + NodeMetaFilters: map[string]string{ + "meta1": "match", + "meta2": "beta", + }, + }, + data: structs.CheckServiceNode{ + Node: &structs.Node{ + Meta: map[string]string{ + "meta1": "other", + "meta2": "gamma", + }, + }, + }, + expected: false, + }, + { + name: "QueryOptions.Filter match", + req: structs.ServiceSpecificRequest{ + QueryOptions: structs.QueryOptions{ + Filter: `Node.Node == "node3"`, + }, + }, + data: structs.CheckServiceNode{ + Node: &structs.Node{Node: "node3"}, + }, + expected: true, + }, + { + name: "QueryOptions.Filter mismatch", + req: structs.ServiceSpecificRequest{ + QueryOptions: structs.QueryOptions{ + Filter: `Node.Node == "node2"`, + }, + }, + data: structs.CheckServiceNode{ + Node: &structs.Node{Node: "node3"}, + }, + expected: false, + }, + { + name: "all match", + req: structs.ServiceSpecificRequest{ + QueryOptions: structs.QueryOptions{ + Filter: `Node.Node == "node3"`, + }, + ServiceTags: []string{"tag1", "tag2"}, + NodeMetaFilters: map[string]string{ + "meta1": "match1", + "meta2": "match2", + }, + }, + data: structs.CheckServiceNode{ + Node: &structs.Node{ + Node: "node3", + Meta: map[string]string{ + "meta1": "match1", + "meta2": "match2", + "extra": "other", + }, + }, + Service: &structs.NodeService{ + Tags: []string{"tag1", "tag2", "extra"}, + }, + }, + expected: true, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + fn(t, tc) + }) + } +} diff --git a/agent/dns_test.go b/agent/dns_test.go index e7506db0f..08583caee 100644 --- a/agent/dns_test.go +++ b/agent/dns_test.go @@ -3100,7 +3100,7 @@ func TestDNS_CaseInsensitiveServiceLookup(t *testing.T) { } if len(in.Answer) != 1 { - t.Fatalf("empty lookup: %#v", in) + t.Fatalf("question %v, empty lookup: %#v", question, in) } }) } diff --git a/agent/rpcclient/health/health.go b/agent/rpcclient/health/health.go index 2b28cbf3f..adc85e83c 100644 --- a/agent/rpcclient/health/health.go +++ b/agent/rpcclient/health/health.go @@ -2,7 +2,6 @@ package health import ( "context" - "strings" "github.com/hashicorp/consul/agent/cache" "github.com/hashicorp/consul/agent/structs" @@ -70,66 +69,5 @@ func (c *Client) getServiceNodes( panic("wrong response type for cachetype.HealthServicesName") } - return filterTags(filterNodeMeta(value, req), req), md, nil -} - -func filterTags(out *structs.IndexedCheckServiceNodes, req structs.ServiceSpecificRequest) structs.IndexedCheckServiceNodes { - if !req.TagFilter || len(req.ServiceTags) == 0 || len(out.Nodes) == 0 { - return *out - } - tags := make([]string, 0, len(req.ServiceTags)) - for _, r := range req.ServiceTags { - tags = append(tags, strings.ToLower(r)) - } - results := make(structs.CheckServiceNodes, 0, len(out.Nodes)) - for _, service := range out.Nodes { - svc := service.Service - if !serviceTagsFilter(svc, tags) { - results = append(results, service) - } - } - out.Nodes = results - return *out -} - -// serviceTagsFilter return true if service does not contains all the given tags -func serviceTagsFilter(sn *structs.NodeService, tags []string) bool { - for _, tag := range tags { - if serviceTagFilter(sn, tag) { - // If any one of the expected tags was not found, filter the service - return true - } - } - - // If all tags were found, don't filter the service - return false -} - -// serviceTagFilter returns true (should filter) if the given service node -// doesn't contain the given tag. -func serviceTagFilter(sn *structs.NodeService, tag string) bool { - // Look for the lower cased version of the tag. - for _, t := range sn.Tags { - if strings.ToLower(t) == tag { - return false - } - } - - // If we didn't hit the tag above then we should filter. - return true -} - -func filterNodeMeta(out *structs.IndexedCheckServiceNodes, req structs.ServiceSpecificRequest) *structs.IndexedCheckServiceNodes { - if len(req.NodeMetaFilters) == 0 || len(out.Nodes) == 0 { - return out - } - results := make(structs.CheckServiceNodes, 0, len(out.Nodes)) - for _, service := range out.Nodes { - serviceNode := service.Node - if structs.SatisfiesMetaFilters(serviceNode.Meta, req.NodeMetaFilters) { - results = append(results, service) - } - } - out.Nodes = results - return out + return *value, md, nil }