Merge pull request #9703 from pierresouchay/streaming_tags_and_case_insensitive

Streaming filter tags + case insensitive lookups for Service Names
This commit is contained in:
Daniel Nephin 2021-02-26 12:06:26 -05:00 committed by GitHub
commit 4ef9578a07
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 469 additions and 112 deletions

3
.changelog/9703.txt Normal file
View File

@ -0,0 +1,3 @@
```release-note:bug
streaming: lookup in health properly handle case-sensitivity and perform filtering based on tags and node-meta
```

View File

@ -5,6 +5,7 @@ import (
"fmt" "fmt"
"reflect" "reflect"
"sort" "sort"
"strings"
"time" "time"
"github.com/hashicorp/go-bexpr" "github.com/hashicorp/go-bexpr"
@ -82,7 +83,7 @@ func (c *StreamingHealthServices) Fetch(opts cache.FetchOptions, req cache.Reque
return req return req
} }
materializer, err := newMaterializer(c.deps, newReqFn, srvReq.Filter) materializer, err := newMaterializer(c.deps, newReqFn, srvReq)
if err != nil { if err != nil {
return cache.FetchResult{}, err return cache.FetchResult{}, err
} }
@ -100,9 +101,9 @@ func (c *StreamingHealthServices) Fetch(opts cache.FetchOptions, req cache.Reque
func newMaterializer( func newMaterializer(
deps MaterializerDeps, deps MaterializerDeps,
newRequestFn func(uint64) pbsubscribe.SubscribeRequest, newRequestFn func(uint64) pbsubscribe.SubscribeRequest,
filter string, req *structs.ServiceSpecificRequest,
) (*submatview.Materializer, error) { ) (*submatview.Materializer, error) {
view, err := newHealthView(filter) view, err := newHealthView(req)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -139,8 +140,8 @@ func (s *streamingHealthState) Fetch(opts cache.FetchOptions) (cache.FetchResult
return result, err return result, err
} }
func newHealthView(filterExpr string) (*healthView, error) { func newHealthView(req *structs.ServiceSpecificRequest) (*healthView, error) {
fe, err := newFilterEvaluator(filterExpr) fe, err := newFilterEvaluator(req)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -192,11 +193,44 @@ type filterEvaluator interface {
Evaluate(datum interface{}) (bool, error) Evaluate(datum interface{}) (bool, error)
} }
func newFilterEvaluator(expr string) (filterEvaluator, error) { func newFilterEvaluator(req *structs.ServiceSpecificRequest) (filterEvaluator, error) {
if expr == "" { var evaluators []filterEvaluator
return noopFilterEvaluator{}, nil
typ := reflect.TypeOf(structs.CheckServiceNode{})
if req.Filter != "" {
e, err := bexpr.CreateEvaluatorForType(req.Filter, nil, typ)
if err != nil {
return nil, err
}
evaluators = append(evaluators, e)
}
if req.ServiceTag != "" {
// Handle backwards compat with old field
req.ServiceTags = []string{req.ServiceTag}
}
if req.TagFilter && len(req.ServiceTags) > 0 {
evaluators = append(evaluators, serviceTagEvaluator{tags: req.ServiceTags})
}
for key, value := range req.NodeMetaFilters {
expr := fmt.Sprintf(`"%s" in Node.Meta.%s`, value, key)
e, err := bexpr.CreateEvaluatorForType(expr, nil, typ)
if err != nil {
return nil, err
}
evaluators = append(evaluators, e)
}
switch len(evaluators) {
case 0:
return noopFilterEvaluator{}, nil
case 1:
return evaluators[0], nil
default:
return &multiFilterEvaluator{evaluators: evaluators}, nil
} }
return bexpr.CreateEvaluatorForType(expr, nil, reflect.TypeOf(structs.CheckServiceNode{}))
} }
// noopFilterEvaluator may be used in place of a bexpr.Evaluator. The Evaluate // noopFilterEvaluator may be used in place of a bexpr.Evaluator. The Evaluate
@ -207,6 +241,20 @@ func (noopFilterEvaluator) Evaluate(_ interface{}) (bool, error) {
return true, nil return true, nil
} }
type multiFilterEvaluator struct {
evaluators []filterEvaluator
}
func (m multiFilterEvaluator) Evaluate(data interface{}) (bool, error) {
for _, e := range m.evaluators {
match, err := e.Evaluate(data)
if !match || err != nil {
return match, err
}
}
return true, nil
}
// sortCheckServiceNodes sorts the results to match memdb semantics // sortCheckServiceNodes sorts the results to match memdb semantics
// Sort results by Node.Node, if 2 instances match, order by Service.ID // Sort results by Node.Node, if 2 instances match, order by Service.ID
// Will allow result to be stable sorted and match queries without cache // Will allow result to be stable sorted and match queries without cache
@ -240,3 +288,34 @@ func (s *healthView) Result(index uint64) (interface{}, error) {
func (s *healthView) Reset() { func (s *healthView) Reset() {
s.state = make(map[string]structs.CheckServiceNode) s.state = make(map[string]structs.CheckServiceNode)
} }
// serviceTagEvaluator implements the filterEvaluator to perform filtering
// by service tags. bexpr can not be used at this time, because the filtering
// must be case insensitive for backwards compatibility. In the future this
// may be replaced with bexpr once case insensitive support is added.
type serviceTagEvaluator struct {
tags []string
}
func (m serviceTagEvaluator) Evaluate(data interface{}) (bool, error) {
csn, ok := data.(structs.CheckServiceNode)
if !ok {
return false, fmt.Errorf("unexpected type %T for structs.CheckServiceNode filter", data)
}
for _, tag := range m.tags {
if !serviceHasTag(csn.Service, tag) {
// If any one of the expected tags was not found, filter the service
return false, nil
}
}
return true, nil
}
func serviceHasTag(sn *structs.NodeService, tag string) bool {
for _, t := range sn.Tags {
if strings.EqualFold(t, tag) {
return true
}
}
return false
}

View File

@ -568,3 +568,211 @@ func runStep(t *testing.T, name string, fn func(t *testing.T)) {
t.FailNow() t.FailNow()
} }
} }
func TestNewFilterEvaluator(t *testing.T) {
type testCase struct {
name string
req structs.ServiceSpecificRequest
data structs.CheckServiceNode
expected bool
}
fn := func(t *testing.T, tc testCase) {
e, err := newFilterEvaluator(&tc.req)
require.NoError(t, err)
actual, err := e.Evaluate(tc.data)
require.NoError(t, err)
require.Equal(t, tc.expected, actual)
}
var testCases = []testCase{
{
name: "single ServiceTags match",
req: structs.ServiceSpecificRequest{
ServiceTags: []string{"match"},
TagFilter: true,
},
data: structs.CheckServiceNode{
Service: &structs.NodeService{
Tags: []string{"extra", "match"},
},
},
expected: true,
},
{
name: "single deprecated ServiceTag match",
req: structs.ServiceSpecificRequest{
ServiceTag: "match",
TagFilter: true,
},
data: structs.CheckServiceNode{
Service: &structs.NodeService{
Tags: []string{"extra", "match"},
},
},
expected: true,
},
{
name: "single ServiceTags mismatch",
req: structs.ServiceSpecificRequest{
ServiceTags: []string{"other"},
TagFilter: true,
},
data: structs.CheckServiceNode{
Service: &structs.NodeService{
Tags: []string{"extra", "match"},
},
},
expected: false,
},
{
name: "multiple ServiceTags match",
req: structs.ServiceSpecificRequest{
ServiceTags: []string{"match", "second"},
TagFilter: true,
},
data: structs.CheckServiceNode{
Service: &structs.NodeService{
Tags: []string{"extra", "match", "second"},
},
},
expected: true,
},
{
name: "multiple ServiceTags mismatch",
req: structs.ServiceSpecificRequest{
ServiceTags: []string{"match", "not"},
TagFilter: true,
},
data: structs.CheckServiceNode{
Service: &structs.NodeService{
Tags: []string{"extra", "match"},
},
},
expected: false,
},
{
name: "single NodeMetaFilter match",
req: structs.ServiceSpecificRequest{
NodeMetaFilters: map[string]string{"meta1": "match"},
},
data: structs.CheckServiceNode{
Node: &structs.Node{
Meta: map[string]string{
"meta1": "match",
"extra": "some",
},
},
},
expected: true,
},
{
name: "single NodeMetaFilter mismatch",
req: structs.ServiceSpecificRequest{
NodeMetaFilters: map[string]string{
"meta1": "match",
},
},
data: structs.CheckServiceNode{
Node: &structs.Node{
Meta: map[string]string{
"meta1": "other",
"extra": "some",
},
},
},
expected: false,
},
{
name: "multiple NodeMetaFilter match",
req: structs.ServiceSpecificRequest{
NodeMetaFilters: map[string]string{"meta1": "match", "meta2": "a"},
},
data: structs.CheckServiceNode{
Node: &structs.Node{
Meta: map[string]string{
"meta1": "match",
"meta2": "a",
"extra": "some",
},
},
},
expected: true,
},
{
name: "multiple NodeMetaFilter mismatch",
req: structs.ServiceSpecificRequest{
NodeMetaFilters: map[string]string{
"meta1": "match",
"meta2": "beta",
},
},
data: structs.CheckServiceNode{
Node: &structs.Node{
Meta: map[string]string{
"meta1": "other",
"meta2": "gamma",
},
},
},
expected: false,
},
{
name: "QueryOptions.Filter match",
req: structs.ServiceSpecificRequest{
QueryOptions: structs.QueryOptions{
Filter: `Node.Node == "node3"`,
},
},
data: structs.CheckServiceNode{
Node: &structs.Node{Node: "node3"},
},
expected: true,
},
{
name: "QueryOptions.Filter mismatch",
req: structs.ServiceSpecificRequest{
QueryOptions: structs.QueryOptions{
Filter: `Node.Node == "node2"`,
},
},
data: structs.CheckServiceNode{
Node: &structs.Node{Node: "node3"},
},
expected: false,
},
{
name: "all match",
req: structs.ServiceSpecificRequest{
QueryOptions: structs.QueryOptions{
Filter: `Node.Node == "node3"`,
},
ServiceTags: []string{"tag1", "tag2"},
NodeMetaFilters: map[string]string{
"meta1": "match1",
"meta2": "match2",
},
},
data: structs.CheckServiceNode{
Node: &structs.Node{
Node: "node3",
Meta: map[string]string{
"meta1": "match1",
"meta2": "match2",
"extra": "other",
},
},
Service: &structs.NodeService{
Tags: []string{"tag1", "tag2", "extra"},
},
},
expected: true,
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
fn(t, tc)
})
}
}

View File

@ -1,6 +1,8 @@
package state package state
import ( import (
"strings"
memdb "github.com/hashicorp/go-memdb" memdb "github.com/hashicorp/go-memdb"
"github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/acl"
@ -42,7 +44,7 @@ func (e EventPayloadCheckServiceNode) MatchesKey(key, namespace string) bool {
name = e.key name = e.key
} }
ns := e.Value.Service.EnterpriseMeta.GetNamespace() ns := e.Value.Service.EnterpriseMeta.GetNamespace()
return (key == "" || key == name) && (namespace == "" || namespace == ns) return (key == "" || strings.EqualFold(key, name)) && (namespace == "" || namespace == ns)
} }
// serviceHealthSnapshot returns a stream.SnapshotFunc that provides a snapshot // serviceHealthSnapshot returns a stream.SnapshotFunc that provides a snapshot

View File

@ -1171,12 +1171,16 @@ func (d *DNSServer) trimDNSResponse(cfg *dnsConfig, network string, req, resp *d
// lookupServiceNodes returns nodes with a given service. // lookupServiceNodes returns nodes with a given service.
func (d *DNSServer) lookupServiceNodes(cfg *dnsConfig, lookup serviceLookup) (structs.IndexedCheckServiceNodes, error) { func (d *DNSServer) lookupServiceNodes(cfg *dnsConfig, lookup serviceLookup) (structs.IndexedCheckServiceNodes, error) {
serviceTags := []string{}
if lookup.Tag != "" {
serviceTags = []string{lookup.Tag}
}
args := structs.ServiceSpecificRequest{ args := structs.ServiceSpecificRequest{
Connect: lookup.Connect, Connect: lookup.Connect,
Ingress: lookup.Ingress, Ingress: lookup.Ingress,
Datacenter: lookup.Datacenter, Datacenter: lookup.Datacenter,
ServiceName: lookup.Service, ServiceName: lookup.Service,
ServiceTags: []string{lookup.Tag}, ServiceTags: serviceTags,
TagFilter: lookup.Tag != "", TagFilter: lookup.Tag != "",
QueryOptions: structs.QueryOptions{ QueryOptions: structs.QueryOptions{
Token: d.agent.tokens.UserToken(), Token: d.agent.tokens.UserToken(),

View File

@ -3016,72 +3016,95 @@ func TestDNS_CaseInsensitiveServiceLookup(t *testing.T) {
} }
t.Parallel() t.Parallel()
a := NewTestAgent(t, "") tests := []struct {
defer a.Shutdown() name string
testrpc.WaitForLeader(t, a.RPC, "dc1") config string
}{
// Register a node with a service. // UDP + EDNS
{ {"normal", ""},
args := &structs.RegisterRequest{ {"cache", `dns_config{ allow_stale=true, max_stale="3h", use_cache=true, "cache_max_age"="3h"}`},
Datacenter: "dc1", {"cache-with-streaming", `
Node: "foo", rpc{
Address: "127.0.0.1", enable_streaming=true
Service: &structs.NodeService{ }
Service: "Db", use_streaming_backend=true
Tags: []string{"Primary"}, dns_config{ allow_stale=true, max_stale="3h", use_cache=true, "cache_max_age"="3h"}
Port: 12345, `},
},
}
var out struct{}
if err := a.RPC("Catalog.Register", args, &out); err != nil {
t.Fatalf("err: %v", err)
}
} }
for _, tst := range tests {
t.Run(fmt.Sprintf("A lookup %v", tst.name), func(t *testing.T) {
a := NewTestAgent(t, tst.config)
defer a.Shutdown()
testrpc.WaitForLeader(t, a.RPC, "dc1")
// Register an equivalent prepared query, as well as a name. // Register a node with a service.
var id string {
{ args := &structs.RegisterRequest{
args := &structs.PreparedQueryRequest{ Datacenter: "dc1",
Datacenter: "dc1", Node: "foo",
Op: structs.PreparedQueryCreate, Address: "127.0.0.1",
Query: &structs.PreparedQuery{ Service: &structs.NodeService{
Name: "somequery", Service: "Db",
Service: structs.ServiceQuery{ Tags: []string{"Primary"},
Service: "db", Port: 12345,
}, },
}, }
}
if err := a.RPC("PreparedQuery.Apply", args, &id); err != nil {
t.Fatalf("err: %v", err)
}
}
// Try some variations to make sure case doesn't matter. var out struct{}
questions := []string{ if err := a.RPC("Catalog.Register", args, &out); err != nil {
"primary.db.service.consul.", t.Fatalf("err: %v", err)
"pRIMARY.dB.service.consul.", }
"PRIMARY.dB.service.consul.", }
"db.service.consul.",
"DB.service.consul.",
"Db.service.consul.",
"somequery.query.consul.",
"SomeQuery.query.consul.",
"SOMEQUERY.query.consul.",
}
for _, question := range questions {
m := new(dns.Msg)
m.SetQuestion(question, dns.TypeSRV)
c := new(dns.Client) // Register an equivalent prepared query, as well as a name.
in, _, err := c.Exchange(m, a.DNSAddr()) var id string
if err != nil { {
t.Fatalf("err: %v", err) args := &structs.PreparedQueryRequest{
} Datacenter: "dc1",
Op: structs.PreparedQueryCreate,
Query: &structs.PreparedQuery{
Name: "somequery",
Service: structs.ServiceQuery{
Service: "db",
},
},
}
if err := a.RPC("PreparedQuery.Apply", args, &id); err != nil {
t.Fatalf("err: %v", err)
}
}
if len(in.Answer) != 1 { // Try some variations to make sure case doesn't matter.
t.Fatalf("empty lookup: %#v", in) questions := []string{
} "primary.Db.service.consul.",
"primary.db.service.consul.",
"pRIMARY.dB.service.consul.",
"PRIMARY.dB.service.consul.",
"db.service.consul.",
"DB.service.consul.",
"Db.service.consul.",
"somequery.query.consul.",
"SomeQuery.query.consul.",
"SOMEQUERY.query.consul.",
}
for _, question := range questions {
m := new(dns.Msg)
m.SetQuestion(question, dns.TypeSRV)
c := new(dns.Client)
retry.Run(t, func(r *retry.R) {
in, _, err := c.Exchange(m, a.DNSAddr())
if err != nil {
t.Fatalf("err: %v", err)
}
if len(in.Answer) != 1 {
t.Fatalf("question %v, empty lookup: %#v", question, in)
}
})
}
})
} }
} }

View File

@ -8,6 +8,7 @@ import (
"net/http/httptest" "net/http/httptest"
"net/url" "net/url"
"reflect" "reflect"
"strconv"
"testing" "testing"
"github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/agent/structs"
@ -733,54 +734,90 @@ func TestHealthServiceNodes_NodeMetaFilter(t *testing.T) {
} }
t.Parallel() t.Parallel()
a := NewTestAgent(t, "")
defer a.Shutdown()
testrpc.WaitForLeader(t, a.RPC, "dc1")
req, _ := http.NewRequest("GET", "/v1/health/service/consul?dc=dc1&node-meta=somekey:somevalue", nil) tests := []struct {
resp := httptest.NewRecorder() name string
obj, err := a.srv.HealthServiceNodes(resp, req) config string
if err != nil { }{
t.Fatalf("err: %v", err) {"normal", ""},
{"cache-with-streaming", `
rpc{
enable_streaming=true
}
use_streaming_backend=true
`},
} }
for _, tst := range tests {
t.Run(tst.name, func(t *testing.T) {
assertIndex(t, resp) a := NewTestAgent(t, tst.config)
defer a.Shutdown()
testrpc.WaitForLeader(t, a.RPC, "dc1")
// Should be a non-nil empty list req, _ := http.NewRequest("GET", "/v1/health/service/consul?dc=dc1&node-meta=somekey:somevalue", nil)
nodes := obj.(structs.CheckServiceNodes) resp := httptest.NewRecorder()
if nodes == nil || len(nodes) != 0 { obj, err := a.srv.HealthServiceNodes(resp, req)
t.Fatalf("bad: %v", obj) if err != nil {
} t.Fatalf("err: %v", err)
}
args := &structs.RegisterRequest{ assertIndex(t, resp)
Datacenter: "dc1",
Node: "bar",
Address: "127.0.0.1",
NodeMeta: map[string]string{"somekey": "somevalue"},
Service: &structs.NodeService{
ID: "test",
Service: "test",
},
}
var out struct{} cIndex, err := strconv.ParseUint(resp.Header().Get("X-Consul-Index"), 10, 64)
if err := a.RPC("Catalog.Register", args, &out); err != nil { require.NoError(t, err)
t.Fatalf("err: %v", err)
}
req, _ = http.NewRequest("GET", "/v1/health/service/test?dc=dc1&node-meta=somekey:somevalue", nil) // Should be a non-nil empty list
resp = httptest.NewRecorder() nodes := obj.(structs.CheckServiceNodes)
obj, err = a.srv.HealthServiceNodes(resp, req) if nodes == nil || len(nodes) != 0 {
if err != nil { t.Fatalf("bad: %v", obj)
t.Fatalf("err: %v", err) }
}
assertIndex(t, resp) args := &structs.RegisterRequest{
Datacenter: "dc1",
Node: "bar",
Address: "127.0.0.1",
NodeMeta: map[string]string{"somekey": "somevalue"},
Service: &structs.NodeService{
ID: "test",
Service: "test",
},
}
// Should be a non-nil empty list for checks var out struct{}
nodes = obj.(structs.CheckServiceNodes) if err := a.RPC("Catalog.Register", args, &out); err != nil {
if len(nodes) != 1 || nodes[0].Checks == nil || len(nodes[0].Checks) != 0 { t.Fatalf("err: %v", err)
t.Fatalf("bad: %v", obj) }
args = &structs.RegisterRequest{
Datacenter: "dc1",
Node: "bar2",
Address: "127.0.0.1",
NodeMeta: map[string]string{"somekey": "othervalue"},
Service: &structs.NodeService{
ID: "test2",
Service: "test",
},
}
if err := a.RPC("Catalog.Register", args, &out); err != nil {
t.Fatalf("err: %v", err)
}
req, _ = http.NewRequest("GET", fmt.Sprintf("/v1/health/service/test?dc=dc1&node-meta=somekey:somevalue&index=%d&wait=10ms", cIndex), nil)
resp = httptest.NewRecorder()
obj, err = a.srv.HealthServiceNodes(resp, req)
if err != nil {
t.Fatalf("err: %v", err)
}
assertIndex(t, resp)
// Should be a non-nil empty list for checks
nodes = obj.(structs.CheckServiceNodes)
if len(nodes) != 1 || nodes[0].Checks == nil || len(nodes[0].Checks) != 0 {
t.Fatalf("bad: %v", obj)
}
})
} }
} }

View File

@ -68,5 +68,6 @@ func (c *Client) getServiceNodes(
if !ok { if !ok {
panic("wrong response type for cachetype.HealthServicesName") panic("wrong response type for cachetype.HealthServicesName")
} }
return *value, md, nil return *value, md, nil
} }

View File

@ -602,7 +602,7 @@ func (r *ServiceSpecificRequest) CacheInfo() cache.RequestInfo {
sort.Strings(r.ServiceTags) sort.Strings(r.ServiceTags)
v, err := hashstructure.Hash([]interface{}{ v, err := hashstructure.Hash([]interface{}{
r.NodeMetaFilters, r.NodeMetaFilters,
r.ServiceName, strings.ToLower(r.ServiceName),
// DEPRECATED (singular-service-tag) - remove this when upgrade RPC compat // DEPRECATED (singular-service-tag) - remove this when upgrade RPC compat
// with 1.2.x is not required. We still need this in because <1.3 agents // with 1.2.x is not required. We still need this in because <1.3 agents
// might still send RPCs with singular tag set. In fact the only place we // might still send RPCs with singular tag set. In fact the only place we