streaming: move ServiceTag and NodeMetaFiltering to the cache-entry
So that all the client side filtering is in the same place. Previously only the bexpr filter was in the cache-entry. Also makes a small change to the filtering so that instead of rebuilding slices of items, the filtering can return a bool to determine if the event payload is saved or not.
This commit is contained in:
parent
54dbcd0bb9
commit
0683964519
|
@ -5,6 +5,7 @@ import (
|
|||
"fmt"
|
||||
"reflect"
|
||||
"sort"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/go-bexpr"
|
||||
|
@ -82,7 +83,7 @@ func (c *StreamingHealthServices) Fetch(opts cache.FetchOptions, req cache.Reque
|
|||
return req
|
||||
}
|
||||
|
||||
materializer, err := newMaterializer(c.deps, newReqFn, srvReq.Filter)
|
||||
materializer, err := newMaterializer(c.deps, newReqFn, srvReq)
|
||||
if err != nil {
|
||||
return cache.FetchResult{}, err
|
||||
}
|
||||
|
@ -100,9 +101,9 @@ func (c *StreamingHealthServices) Fetch(opts cache.FetchOptions, req cache.Reque
|
|||
func newMaterializer(
|
||||
deps MaterializerDeps,
|
||||
newRequestFn func(uint64) pbsubscribe.SubscribeRequest,
|
||||
filter string,
|
||||
req *structs.ServiceSpecificRequest,
|
||||
) (*submatview.Materializer, error) {
|
||||
view, err := newHealthView(filter)
|
||||
view, err := newHealthView(req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -139,8 +140,8 @@ func (s *streamingHealthState) Fetch(opts cache.FetchOptions) (cache.FetchResult
|
|||
return result, err
|
||||
}
|
||||
|
||||
func newHealthView(filterExpr string) (*healthView, error) {
|
||||
fe, err := newFilterEvaluator(filterExpr)
|
||||
func newHealthView(req *structs.ServiceSpecificRequest) (*healthView, error) {
|
||||
fe, err := newFilterEvaluator(req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -192,11 +193,44 @@ type filterEvaluator interface {
|
|||
Evaluate(datum interface{}) (bool, error)
|
||||
}
|
||||
|
||||
func newFilterEvaluator(expr string) (filterEvaluator, error) {
|
||||
if expr == "" {
|
||||
return noopFilterEvaluator{}, nil
|
||||
func newFilterEvaluator(req *structs.ServiceSpecificRequest) (filterEvaluator, error) {
|
||||
var evaluators []filterEvaluator
|
||||
|
||||
typ := reflect.TypeOf(structs.CheckServiceNode{})
|
||||
if req.Filter != "" {
|
||||
e, err := bexpr.CreateEvaluatorForType(req.Filter, nil, typ)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
evaluators = append(evaluators, e)
|
||||
}
|
||||
|
||||
if req.ServiceTag != "" {
|
||||
// Handle backwards compat with old field
|
||||
req.ServiceTags = []string{req.ServiceTag}
|
||||
}
|
||||
|
||||
if req.TagFilter && len(req.ServiceTags) > 0 {
|
||||
evaluators = append(evaluators, serviceTagEvaluator{tags: req.ServiceTags})
|
||||
}
|
||||
|
||||
for key, value := range req.NodeMetaFilters {
|
||||
expr := fmt.Sprintf(`"%s" in Node.Meta.%s`, value, key)
|
||||
e, err := bexpr.CreateEvaluatorForType(expr, nil, typ)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
evaluators = append(evaluators, e)
|
||||
}
|
||||
|
||||
switch len(evaluators) {
|
||||
case 0:
|
||||
return noopFilterEvaluator{}, nil
|
||||
case 1:
|
||||
return evaluators[0], nil
|
||||
default:
|
||||
return &multiFilterEvaluator{evaluators: evaluators}, nil
|
||||
}
|
||||
return bexpr.CreateEvaluatorForType(expr, nil, reflect.TypeOf(structs.CheckServiceNode{}))
|
||||
}
|
||||
|
||||
// noopFilterEvaluator may be used in place of a bexpr.Evaluator. The Evaluate
|
||||
|
@ -207,6 +241,20 @@ func (noopFilterEvaluator) Evaluate(_ interface{}) (bool, error) {
|
|||
return true, nil
|
||||
}
|
||||
|
||||
type multiFilterEvaluator struct {
|
||||
evaluators []filterEvaluator
|
||||
}
|
||||
|
||||
func (m multiFilterEvaluator) Evaluate(data interface{}) (bool, error) {
|
||||
for _, e := range m.evaluators {
|
||||
match, err := e.Evaluate(data)
|
||||
if !match || err != nil {
|
||||
return match, err
|
||||
}
|
||||
}
|
||||
return true, nil
|
||||
}
|
||||
|
||||
// sortCheckServiceNodes sorts the results to match memdb semantics
|
||||
// Sort results by Node.Node, if 2 instances match, order by Service.ID
|
||||
// Will allow result to be stable sorted and match queries without cache
|
||||
|
@ -240,3 +288,34 @@ func (s *healthView) Result(index uint64) (interface{}, error) {
|
|||
func (s *healthView) Reset() {
|
||||
s.state = make(map[string]structs.CheckServiceNode)
|
||||
}
|
||||
|
||||
// serviceTagEvaluator implements the filterEvaluator to perform filtering
|
||||
// by service tags. bexpr can not be used at this time, because the filtering
|
||||
// must be case insensitive for backwards compatibility. In the future this
|
||||
// may be replaced with bexpr once case insensitive support is added.
|
||||
type serviceTagEvaluator struct {
|
||||
tags []string
|
||||
}
|
||||
|
||||
func (m serviceTagEvaluator) Evaluate(data interface{}) (bool, error) {
|
||||
csn, ok := data.(structs.CheckServiceNode)
|
||||
if !ok {
|
||||
return false, fmt.Errorf("unexpected type %T for structs.CheckServiceNode filter", data)
|
||||
}
|
||||
for _, tag := range m.tags {
|
||||
if !serviceHasTag(csn.Service, tag) {
|
||||
// If any one of the expected tags was not found, filter the service
|
||||
return false, nil
|
||||
}
|
||||
}
|
||||
return true, nil
|
||||
}
|
||||
|
||||
func serviceHasTag(sn *structs.NodeService, tag string) bool {
|
||||
for _, t := range sn.Tags {
|
||||
if strings.EqualFold(t, tag) {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
|
|
@ -568,3 +568,211 @@ func runStep(t *testing.T, name string, fn func(t *testing.T)) {
|
|||
t.FailNow()
|
||||
}
|
||||
}
|
||||
|
||||
func TestNewFilterEvaluator(t *testing.T) {
|
||||
type testCase struct {
|
||||
name string
|
||||
req structs.ServiceSpecificRequest
|
||||
data structs.CheckServiceNode
|
||||
expected bool
|
||||
}
|
||||
|
||||
fn := func(t *testing.T, tc testCase) {
|
||||
e, err := newFilterEvaluator(&tc.req)
|
||||
require.NoError(t, err)
|
||||
actual, err := e.Evaluate(tc.data)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, tc.expected, actual)
|
||||
}
|
||||
|
||||
var testCases = []testCase{
|
||||
{
|
||||
name: "single ServiceTags match",
|
||||
req: structs.ServiceSpecificRequest{
|
||||
ServiceTags: []string{"match"},
|
||||
TagFilter: true,
|
||||
},
|
||||
data: structs.CheckServiceNode{
|
||||
Service: &structs.NodeService{
|
||||
Tags: []string{"extra", "match"},
|
||||
},
|
||||
},
|
||||
expected: true,
|
||||
},
|
||||
{
|
||||
name: "single deprecated ServiceTag match",
|
||||
req: structs.ServiceSpecificRequest{
|
||||
ServiceTag: "match",
|
||||
TagFilter: true,
|
||||
},
|
||||
data: structs.CheckServiceNode{
|
||||
Service: &structs.NodeService{
|
||||
Tags: []string{"extra", "match"},
|
||||
},
|
||||
},
|
||||
expected: true,
|
||||
},
|
||||
{
|
||||
name: "single ServiceTags mismatch",
|
||||
req: structs.ServiceSpecificRequest{
|
||||
ServiceTags: []string{"other"},
|
||||
TagFilter: true,
|
||||
},
|
||||
data: structs.CheckServiceNode{
|
||||
Service: &structs.NodeService{
|
||||
Tags: []string{"extra", "match"},
|
||||
},
|
||||
},
|
||||
expected: false,
|
||||
},
|
||||
{
|
||||
name: "multiple ServiceTags match",
|
||||
req: structs.ServiceSpecificRequest{
|
||||
ServiceTags: []string{"match", "second"},
|
||||
TagFilter: true,
|
||||
},
|
||||
data: structs.CheckServiceNode{
|
||||
Service: &structs.NodeService{
|
||||
Tags: []string{"extra", "match", "second"},
|
||||
},
|
||||
},
|
||||
expected: true,
|
||||
},
|
||||
{
|
||||
name: "multiple ServiceTags mismatch",
|
||||
req: structs.ServiceSpecificRequest{
|
||||
ServiceTags: []string{"match", "not"},
|
||||
TagFilter: true,
|
||||
},
|
||||
data: structs.CheckServiceNode{
|
||||
Service: &structs.NodeService{
|
||||
Tags: []string{"extra", "match"},
|
||||
},
|
||||
},
|
||||
expected: false,
|
||||
},
|
||||
{
|
||||
name: "single NodeMetaFilter match",
|
||||
req: structs.ServiceSpecificRequest{
|
||||
NodeMetaFilters: map[string]string{"meta1": "match"},
|
||||
},
|
||||
data: structs.CheckServiceNode{
|
||||
Node: &structs.Node{
|
||||
Meta: map[string]string{
|
||||
"meta1": "match",
|
||||
"extra": "some",
|
||||
},
|
||||
},
|
||||
},
|
||||
expected: true,
|
||||
},
|
||||
{
|
||||
name: "single NodeMetaFilter mismatch",
|
||||
req: structs.ServiceSpecificRequest{
|
||||
NodeMetaFilters: map[string]string{
|
||||
"meta1": "match",
|
||||
},
|
||||
},
|
||||
data: structs.CheckServiceNode{
|
||||
Node: &structs.Node{
|
||||
Meta: map[string]string{
|
||||
"meta1": "other",
|
||||
"extra": "some",
|
||||
},
|
||||
},
|
||||
},
|
||||
expected: false,
|
||||
},
|
||||
{
|
||||
name: "multiple NodeMetaFilter match",
|
||||
req: structs.ServiceSpecificRequest{
|
||||
NodeMetaFilters: map[string]string{"meta1": "match", "meta2": "a"},
|
||||
},
|
||||
data: structs.CheckServiceNode{
|
||||
Node: &structs.Node{
|
||||
Meta: map[string]string{
|
||||
"meta1": "match",
|
||||
"meta2": "a",
|
||||
"extra": "some",
|
||||
},
|
||||
},
|
||||
},
|
||||
expected: true,
|
||||
},
|
||||
{
|
||||
name: "multiple NodeMetaFilter mismatch",
|
||||
req: structs.ServiceSpecificRequest{
|
||||
NodeMetaFilters: map[string]string{
|
||||
"meta1": "match",
|
||||
"meta2": "beta",
|
||||
},
|
||||
},
|
||||
data: structs.CheckServiceNode{
|
||||
Node: &structs.Node{
|
||||
Meta: map[string]string{
|
||||
"meta1": "other",
|
||||
"meta2": "gamma",
|
||||
},
|
||||
},
|
||||
},
|
||||
expected: false,
|
||||
},
|
||||
{
|
||||
name: "QueryOptions.Filter match",
|
||||
req: structs.ServiceSpecificRequest{
|
||||
QueryOptions: structs.QueryOptions{
|
||||
Filter: `Node.Node == "node3"`,
|
||||
},
|
||||
},
|
||||
data: structs.CheckServiceNode{
|
||||
Node: &structs.Node{Node: "node3"},
|
||||
},
|
||||
expected: true,
|
||||
},
|
||||
{
|
||||
name: "QueryOptions.Filter mismatch",
|
||||
req: structs.ServiceSpecificRequest{
|
||||
QueryOptions: structs.QueryOptions{
|
||||
Filter: `Node.Node == "node2"`,
|
||||
},
|
||||
},
|
||||
data: structs.CheckServiceNode{
|
||||
Node: &structs.Node{Node: "node3"},
|
||||
},
|
||||
expected: false,
|
||||
},
|
||||
{
|
||||
name: "all match",
|
||||
req: structs.ServiceSpecificRequest{
|
||||
QueryOptions: structs.QueryOptions{
|
||||
Filter: `Node.Node == "node3"`,
|
||||
},
|
||||
ServiceTags: []string{"tag1", "tag2"},
|
||||
NodeMetaFilters: map[string]string{
|
||||
"meta1": "match1",
|
||||
"meta2": "match2",
|
||||
},
|
||||
},
|
||||
data: structs.CheckServiceNode{
|
||||
Node: &structs.Node{
|
||||
Node: "node3",
|
||||
Meta: map[string]string{
|
||||
"meta1": "match1",
|
||||
"meta2": "match2",
|
||||
"extra": "other",
|
||||
},
|
||||
},
|
||||
Service: &structs.NodeService{
|
||||
Tags: []string{"tag1", "tag2", "extra"},
|
||||
},
|
||||
},
|
||||
expected: true,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range testCases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
fn(t, tc)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
|
|
@ -3100,7 +3100,7 @@ func TestDNS_CaseInsensitiveServiceLookup(t *testing.T) {
|
|||
}
|
||||
|
||||
if len(in.Answer) != 1 {
|
||||
t.Fatalf("empty lookup: %#v", in)
|
||||
t.Fatalf("question %v, empty lookup: %#v", question, in)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
|
|
@ -2,7 +2,6 @@ package health
|
|||
|
||||
import (
|
||||
"context"
|
||||
"strings"
|
||||
|
||||
"github.com/hashicorp/consul/agent/cache"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
|
@ -70,66 +69,5 @@ func (c *Client) getServiceNodes(
|
|||
panic("wrong response type for cachetype.HealthServicesName")
|
||||
}
|
||||
|
||||
return filterTags(filterNodeMeta(value, req), req), md, nil
|
||||
}
|
||||
|
||||
func filterTags(out *structs.IndexedCheckServiceNodes, req structs.ServiceSpecificRequest) structs.IndexedCheckServiceNodes {
|
||||
if !req.TagFilter || len(req.ServiceTags) == 0 || len(out.Nodes) == 0 {
|
||||
return *out
|
||||
}
|
||||
tags := make([]string, 0, len(req.ServiceTags))
|
||||
for _, r := range req.ServiceTags {
|
||||
tags = append(tags, strings.ToLower(r))
|
||||
}
|
||||
results := make(structs.CheckServiceNodes, 0, len(out.Nodes))
|
||||
for _, service := range out.Nodes {
|
||||
svc := service.Service
|
||||
if !serviceTagsFilter(svc, tags) {
|
||||
results = append(results, service)
|
||||
}
|
||||
}
|
||||
out.Nodes = results
|
||||
return *out
|
||||
}
|
||||
|
||||
// serviceTagsFilter return true if service does not contains all the given tags
|
||||
func serviceTagsFilter(sn *structs.NodeService, tags []string) bool {
|
||||
for _, tag := range tags {
|
||||
if serviceTagFilter(sn, tag) {
|
||||
// If any one of the expected tags was not found, filter the service
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
// If all tags were found, don't filter the service
|
||||
return false
|
||||
}
|
||||
|
||||
// serviceTagFilter returns true (should filter) if the given service node
|
||||
// doesn't contain the given tag.
|
||||
func serviceTagFilter(sn *structs.NodeService, tag string) bool {
|
||||
// Look for the lower cased version of the tag.
|
||||
for _, t := range sn.Tags {
|
||||
if strings.ToLower(t) == tag {
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
// If we didn't hit the tag above then we should filter.
|
||||
return true
|
||||
}
|
||||
|
||||
func filterNodeMeta(out *structs.IndexedCheckServiceNodes, req structs.ServiceSpecificRequest) *structs.IndexedCheckServiceNodes {
|
||||
if len(req.NodeMetaFilters) == 0 || len(out.Nodes) == 0 {
|
||||
return out
|
||||
}
|
||||
results := make(structs.CheckServiceNodes, 0, len(out.Nodes))
|
||||
for _, service := range out.Nodes {
|
||||
serviceNode := service.Node
|
||||
if structs.SatisfiesMetaFilters(serviceNode.Meta, req.NodeMetaFilters) {
|
||||
results = append(results, service)
|
||||
}
|
||||
}
|
||||
out.Nodes = results
|
||||
return out
|
||||
return *value, md, nil
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue