[Streaming] Properly filters node-meta queries on health
This wil fix https://github.com/hashicorp/consul/issues/9730
This commit is contained in:
parent
c466b08481
commit
cec640a1f2
|
@ -8,6 +8,7 @@ import (
|
||||||
"net/http/httptest"
|
"net/http/httptest"
|
||||||
"net/url"
|
"net/url"
|
||||||
"reflect"
|
"reflect"
|
||||||
|
"strconv"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/hashicorp/consul/agent/structs"
|
"github.com/hashicorp/consul/agent/structs"
|
||||||
|
@ -733,7 +734,23 @@ func TestHealthServiceNodes_NodeMetaFilter(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
t.Parallel()
|
t.Parallel()
|
||||||
a := NewTestAgent(t, "")
|
|
||||||
|
tests := []struct {
|
||||||
|
name string
|
||||||
|
config string
|
||||||
|
}{
|
||||||
|
{"normal", ""},
|
||||||
|
{"cache-with-streaming", `
|
||||||
|
rpc{
|
||||||
|
enable_streaming=true
|
||||||
|
}
|
||||||
|
use_streaming_backend=true
|
||||||
|
`},
|
||||||
|
}
|
||||||
|
for _, tst := range tests {
|
||||||
|
t.Run(tst.name, func(t *testing.T) {
|
||||||
|
|
||||||
|
a := NewTestAgent(t, tst.config)
|
||||||
defer a.Shutdown()
|
defer a.Shutdown()
|
||||||
testrpc.WaitForLeader(t, a.RPC, "dc1")
|
testrpc.WaitForLeader(t, a.RPC, "dc1")
|
||||||
|
|
||||||
|
@ -746,6 +763,9 @@ func TestHealthServiceNodes_NodeMetaFilter(t *testing.T) {
|
||||||
|
|
||||||
assertIndex(t, resp)
|
assertIndex(t, resp)
|
||||||
|
|
||||||
|
cIndex, err := strconv.ParseUint(resp.Header().Get("X-Consul-Index"), 10, 64)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
// Should be a non-nil empty list
|
// Should be a non-nil empty list
|
||||||
nodes := obj.(structs.CheckServiceNodes)
|
nodes := obj.(structs.CheckServiceNodes)
|
||||||
if nodes == nil || len(nodes) != 0 {
|
if nodes == nil || len(nodes) != 0 {
|
||||||
|
@ -768,7 +788,22 @@ func TestHealthServiceNodes_NodeMetaFilter(t *testing.T) {
|
||||||
t.Fatalf("err: %v", err)
|
t.Fatalf("err: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
req, _ = http.NewRequest("GET", "/v1/health/service/test?dc=dc1&node-meta=somekey:somevalue", nil)
|
args = &structs.RegisterRequest{
|
||||||
|
Datacenter: "dc1",
|
||||||
|
Node: "bar2",
|
||||||
|
Address: "127.0.0.1",
|
||||||
|
NodeMeta: map[string]string{"somekey": "othervalue"},
|
||||||
|
Service: &structs.NodeService{
|
||||||
|
ID: "test2",
|
||||||
|
Service: "test",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := a.RPC("Catalog.Register", args, &out); err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
req, _ = http.NewRequest("GET", fmt.Sprintf("/v1/health/service/test?dc=dc1&node-meta=somekey:somevalue&index=%d&wait=10ms", cIndex), nil)
|
||||||
resp = httptest.NewRecorder()
|
resp = httptest.NewRecorder()
|
||||||
obj, err = a.srv.HealthServiceNodes(resp, req)
|
obj, err = a.srv.HealthServiceNodes(resp, req)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -782,6 +817,8 @@ func TestHealthServiceNodes_NodeMetaFilter(t *testing.T) {
|
||||||
if len(nodes) != 1 || nodes[0].Checks == nil || len(nodes[0].Checks) != 0 {
|
if len(nodes) != 1 || nodes[0].Checks == nil || len(nodes[0].Checks) != 0 {
|
||||||
t.Fatalf("bad: %v", obj)
|
t.Fatalf("bad: %v", obj)
|
||||||
}
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestHealthServiceNodes_Filter(t *testing.T) {
|
func TestHealthServiceNodes_Filter(t *testing.T) {
|
||||||
|
|
|
@ -71,7 +71,7 @@ func (c *Client) getServiceNodes(
|
||||||
panic("wrong response type for cachetype.HealthServicesName")
|
panic("wrong response type for cachetype.HealthServicesName")
|
||||||
}
|
}
|
||||||
|
|
||||||
return filterTags(value, req), md, nil
|
return filterTags(filterNodeMeta(value, req), req), md, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func filterTags(out *structs.IndexedCheckServiceNodes, req structs.ServiceSpecificRequest) structs.IndexedCheckServiceNodes {
|
func filterTags(out *structs.IndexedCheckServiceNodes, req structs.ServiceSpecificRequest) structs.IndexedCheckServiceNodes {
|
||||||
|
@ -126,3 +126,18 @@ func serviceTagFilter(sn *structs.NodeService, tag string) bool {
|
||||||
// If we didn't hit the tag above then we should filter.
|
// If we didn't hit the tag above then we should filter.
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func filterNodeMeta(out *structs.IndexedCheckServiceNodes, req structs.ServiceSpecificRequest) *structs.IndexedCheckServiceNodes {
|
||||||
|
if len(req.NodeMetaFilters) == 0 || len(out.Nodes) == 0 {
|
||||||
|
return out
|
||||||
|
}
|
||||||
|
results := make(structs.CheckServiceNodes, 0, len(out.Nodes))
|
||||||
|
for _, service := range out.Nodes {
|
||||||
|
serviceNode := service.Node
|
||||||
|
if structs.SatisfiesMetaFilters(serviceNode.Meta, req.NodeMetaFilters) {
|
||||||
|
results = append(results, service)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
out.Nodes = results
|
||||||
|
return out
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue