From cec640a1f21fdcf91f37898a5009373cdbefb955 Mon Sep 17 00:00:00 2001 From: Pierre Souchay Date: Mon, 8 Feb 2021 17:53:18 +0100 Subject: [PATCH] [Streaming] Properly filters node-meta queries on health This wil fix https://github.com/hashicorp/consul/issues/9730 --- agent/health_endpoint_test.go | 115 ++++++++++++++++++++----------- agent/rpcclient/health/health.go | 17 ++++- 2 files changed, 92 insertions(+), 40 deletions(-) diff --git a/agent/health_endpoint_test.go b/agent/health_endpoint_test.go index 975c42d55..edf6187ec 100644 --- a/agent/health_endpoint_test.go +++ b/agent/health_endpoint_test.go @@ -8,6 +8,7 @@ import ( "net/http/httptest" "net/url" "reflect" + "strconv" "testing" "github.com/hashicorp/consul/agent/structs" @@ -733,54 +734,90 @@ func TestHealthServiceNodes_NodeMetaFilter(t *testing.T) { } t.Parallel() - a := NewTestAgent(t, "") - defer a.Shutdown() - testrpc.WaitForLeader(t, a.RPC, "dc1") - req, _ := http.NewRequest("GET", "/v1/health/service/consul?dc=dc1&node-meta=somekey:somevalue", nil) - resp := httptest.NewRecorder() - obj, err := a.srv.HealthServiceNodes(resp, req) - if err != nil { - t.Fatalf("err: %v", err) + tests := []struct { + name string + config string + }{ + {"normal", ""}, + {"cache-with-streaming", ` + rpc{ + enable_streaming=true + } + use_streaming_backend=true + `}, } + for _, tst := range tests { + t.Run(tst.name, func(t *testing.T) { - assertIndex(t, resp) + a := NewTestAgent(t, tst.config) + defer a.Shutdown() + testrpc.WaitForLeader(t, a.RPC, "dc1") - // Should be a non-nil empty list - nodes := obj.(structs.CheckServiceNodes) - if nodes == nil || len(nodes) != 0 { - t.Fatalf("bad: %v", obj) - } + req, _ := http.NewRequest("GET", "/v1/health/service/consul?dc=dc1&node-meta=somekey:somevalue", nil) + resp := httptest.NewRecorder() + obj, err := a.srv.HealthServiceNodes(resp, req) + if err != nil { + t.Fatalf("err: %v", err) + } - args := &structs.RegisterRequest{ - Datacenter: "dc1", - Node: "bar", - Address: "127.0.0.1", - NodeMeta: map[string]string{"somekey": "somevalue"}, - Service: &structs.NodeService{ - ID: "test", - Service: "test", - }, - } + assertIndex(t, resp) - var out struct{} - if err := a.RPC("Catalog.Register", args, &out); err != nil { - t.Fatalf("err: %v", err) - } + cIndex, err := strconv.ParseUint(resp.Header().Get("X-Consul-Index"), 10, 64) + require.NoError(t, err) - req, _ = http.NewRequest("GET", "/v1/health/service/test?dc=dc1&node-meta=somekey:somevalue", nil) - resp = httptest.NewRecorder() - obj, err = a.srv.HealthServiceNodes(resp, req) - if err != nil { - t.Fatalf("err: %v", err) - } + // Should be a non-nil empty list + nodes := obj.(structs.CheckServiceNodes) + if nodes == nil || len(nodes) != 0 { + t.Fatalf("bad: %v", obj) + } - assertIndex(t, resp) + args := &structs.RegisterRequest{ + Datacenter: "dc1", + Node: "bar", + Address: "127.0.0.1", + NodeMeta: map[string]string{"somekey": "somevalue"}, + Service: &structs.NodeService{ + ID: "test", + Service: "test", + }, + } - // Should be a non-nil empty list for checks - nodes = obj.(structs.CheckServiceNodes) - if len(nodes) != 1 || nodes[0].Checks == nil || len(nodes[0].Checks) != 0 { - t.Fatalf("bad: %v", obj) + var out struct{} + if err := a.RPC("Catalog.Register", args, &out); err != nil { + t.Fatalf("err: %v", err) + } + + args = &structs.RegisterRequest{ + Datacenter: "dc1", + Node: "bar2", + Address: "127.0.0.1", + NodeMeta: map[string]string{"somekey": "othervalue"}, + Service: &structs.NodeService{ + ID: "test2", + Service: "test", + }, + } + + if err := a.RPC("Catalog.Register", args, &out); err != nil { + t.Fatalf("err: %v", err) + } + + req, _ = http.NewRequest("GET", fmt.Sprintf("/v1/health/service/test?dc=dc1&node-meta=somekey:somevalue&index=%d&wait=10ms", cIndex), nil) + resp = httptest.NewRecorder() + obj, err = a.srv.HealthServiceNodes(resp, req) + if err != nil { + t.Fatalf("err: %v", err) + } + + assertIndex(t, resp) + + // Should be a non-nil empty list for checks + nodes = obj.(structs.CheckServiceNodes) + if len(nodes) != 1 || nodes[0].Checks == nil || len(nodes[0].Checks) != 0 { + t.Fatalf("bad: %v", obj) + } + }) } } diff --git a/agent/rpcclient/health/health.go b/agent/rpcclient/health/health.go index e2c6bd63e..c374bbb3f 100644 --- a/agent/rpcclient/health/health.go +++ b/agent/rpcclient/health/health.go @@ -71,7 +71,7 @@ func (c *Client) getServiceNodes( panic("wrong response type for cachetype.HealthServicesName") } - return filterTags(value, req), md, nil + return filterTags(filterNodeMeta(value, req), req), md, nil } func filterTags(out *structs.IndexedCheckServiceNodes, req structs.ServiceSpecificRequest) structs.IndexedCheckServiceNodes { @@ -126,3 +126,18 @@ func serviceTagFilter(sn *structs.NodeService, tag string) bool { // If we didn't hit the tag above then we should filter. return true } + +func filterNodeMeta(out *structs.IndexedCheckServiceNodes, req structs.ServiceSpecificRequest) *structs.IndexedCheckServiceNodes { + if len(req.NodeMetaFilters) == 0 || len(out.Nodes) == 0 { + return out + } + results := make(structs.CheckServiceNodes, 0, len(out.Nodes)) + for _, service := range out.Nodes { + serviceNode := service.Node + if structs.SatisfiesMetaFilters(serviceNode.Meta, req.NodeMetaFilters) { + results = append(results, service) + } + } + out.Nodes = results + return out +}