diff --git a/.changelog/12640.txt b/.changelog/12640.txt new file mode 100644 index 000000000..cc326764a --- /dev/null +++ b/.changelog/12640.txt @@ -0,0 +1,3 @@ +```release-note:bug +health: ensure /v1/health/service/:service endpoint returns the most recent results when a filter is used with streaming #12640 +``` diff --git a/agent/consul/health_endpoint.go b/agent/consul/health_endpoint.go index 60dc968c5..5f38ba189 100644 --- a/agent/consul/health_endpoint.go +++ b/agent/consul/health_endpoint.go @@ -236,30 +236,38 @@ func (h *Health) ServiceNodes(args *structs.ServiceSpecificRequest, reply *struc &args.QueryOptions, &reply.QueryMeta, func(ws memdb.WatchSet, state *state.Store) error { + var thisReply structs.IndexedCheckServiceNodes + index, nodes, err := f(ws, state, args) if err != nil { return err } - reply.Index, reply.Nodes = index, nodes + thisReply.Index, thisReply.Nodes = index, nodes + if len(args.NodeMetaFilters) > 0 { - reply.Nodes = nodeMetaFilter(args.NodeMetaFilters, reply.Nodes) + thisReply.Nodes = nodeMetaFilter(args.NodeMetaFilters, thisReply.Nodes) } - raw, err := filter.Execute(reply.Nodes) + raw, err := filter.Execute(thisReply.Nodes) if err != nil { return err } - reply.Nodes = raw.(structs.CheckServiceNodes) + thisReply.Nodes = raw.(structs.CheckServiceNodes) // Note: we filter the results with ACLs *after* applying the user-supplied // bexpr filter, to ensure QueryMeta.ResultsFilteredByACLs does not include // results that would be filtered out even if the user did have permission. - if err := h.srv.filterACL(args.Token, reply); err != nil { + if err := h.srv.filterACL(args.Token, &thisReply); err != nil { return err } - return h.srv.sortNodesByDistanceFrom(args.Source, reply.Nodes) + if err := h.srv.sortNodesByDistanceFrom(args.Source, thisReply.Nodes); err != nil { + return err + } + + *reply = thisReply + return nil }) // Provide some metrics diff --git a/agent/consul/health_endpoint_test.go b/agent/consul/health_endpoint_test.go index c3ebab97c..fbd6e4bcb 100644 --- a/agent/consul/health_endpoint_test.go +++ b/agent/consul/health_endpoint_test.go @@ -663,6 +663,85 @@ func TestHealth_ServiceNodes(t *testing.T) { } } +func TestHealth_ServiceNodes_BlockingQuery_withFilter(t *testing.T) { + if testing.Short() { + t.Skip("too slow for testing.Short") + } + + t.Parallel() + + _, s1 := testServer(t) + codec := rpcClient(t, s1) + + waitForLeaderEstablishment(t, s1) + + register := func(t *testing.T, name, tag string) { + arg := structs.RegisterRequest{ + Datacenter: "dc1", + ID: types.NodeID("43d419c0-433b-42c3-bf8a-193eba0b41a3"), + Node: "node1", + Address: "127.0.0.1", + Service: &structs.NodeService{ + ID: name, + Service: name, + Tags: []string{tag}, + }, + } + var out struct{} + require.NoError(t, msgpackrpc.CallWithCodec(codec, "Catalog.Register", &arg, &out)) + } + + register(t, "web", "foo") + + var lastIndex uint64 + runStep(t, "read original", func(t *testing.T) { + var out structs.IndexedCheckServiceNodes + req := structs.ServiceSpecificRequest{ + Datacenter: "dc1", + ServiceName: "web", + QueryOptions: structs.QueryOptions{ + Filter: "foo in Service.Tags", + }, + } + require.NoError(t, msgpackrpc.CallWithCodec(codec, "Health.ServiceNodes", &req, &out)) + + require.Len(t, out.Nodes, 1) + node := out.Nodes[0] + require.Equal(t, "node1", node.Node.Node) + require.Equal(t, "web", node.Service.Service) + require.Equal(t, []string{"foo"}, node.Service.Tags) + + require.Equal(t, structs.QueryBackendBlocking, out.Backend) + lastIndex = out.Index + }) + + runStep(t, "read blocking query result", func(t *testing.T) { + req := structs.ServiceSpecificRequest{ + Datacenter: "dc1", + ServiceName: "web", + QueryOptions: structs.QueryOptions{ + Filter: "foo in Service.Tags", + }, + } + req.MinQueryIndex = lastIndex + + var out structs.IndexedCheckServiceNodes + errCh := channelCallRPC(s1, "Health.ServiceNodes", &req, &out, nil) + + time.Sleep(200 * time.Millisecond) + + // Change the tags + register(t, "web", "bar") + + if err := <-errCh; err != nil { + require.NoError(t, err) + } + + require.Equal(t, structs.QueryBackendBlocking, out.Backend) + require.Len(t, out.Nodes, 0) + }) +} + func TestHealth_ServiceNodes_MultipleServiceTags(t *testing.T) { if testing.Short() { t.Skip("too slow for testing.Short") diff --git a/agent/health_endpoint_test.go b/agent/health_endpoint_test.go index 8bf37835c..f29c2f4be 100644 --- a/agent/health_endpoint_test.go +++ b/agent/health_endpoint_test.go @@ -20,6 +20,7 @@ import ( "github.com/hashicorp/consul/api" "github.com/hashicorp/consul/sdk/testutil/retry" "github.com/hashicorp/consul/testrpc" + "github.com/hashicorp/consul/types" ) func TestHealthChecksInState(t *testing.T) { @@ -936,6 +937,135 @@ use_streaming_backend = true } } +func TestHealthServiceNodes_Blocking_withFilter(t *testing.T) { + cases := []struct { + name string + hcl string + queryBackend string + }{ + { + name: "no streaming", + queryBackend: "blocking-query", + hcl: `use_streaming_backend = false`, + }, + { + name: "streaming", + hcl: ` +rpc { enable_streaming = true } +use_streaming_backend = true +`, + queryBackend: "streaming", + }, + } + + runStep := func(t *testing.T, name string, fn func(t *testing.T)) { + t.Helper() + if !t.Run(name, fn) { + t.FailNow() + } + } + + register := func(t *testing.T, a *TestAgent, name, tag string) { + args := &structs.RegisterRequest{ + Datacenter: "dc1", + ID: types.NodeID("43d419c0-433b-42c3-bf8a-193eba0b41a3"), + Node: "node1", + Address: "127.0.0.1", + Service: &structs.NodeService{ + ID: name, + Service: name, + Tags: []string{tag}, + }, + } + + var out struct{} + require.NoError(t, a.RPC("Catalog.Register", args, &out)) + } + + for _, tc := range cases { + tc := tc + t.Run(tc.name, func(t *testing.T) { + a := NewTestAgent(t, tc.hcl) + defer a.Shutdown() + testrpc.WaitForTestAgent(t, a.RPC, "dc1") + + // Register one with a tag. + register(t, a, "web", "foo") + + filterUrlPart := "filter=" + url.QueryEscape("foo in Service.Tags") + + // TODO: use other call format + + // Initial request with a filter should return one. + var lastIndex uint64 + runStep(t, "read original", func(t *testing.T) { + req, err := http.NewRequest("GET", "/v1/health/service/web?dc=dc1&"+filterUrlPart, nil) + require.NoError(t, err) + + resp := httptest.NewRecorder() + obj, err := a.srv.HealthServiceNodes(resp, req) + require.NoError(t, err) + + nodes := obj.(structs.CheckServiceNodes) + + require.Len(t, nodes, 1) + + node := nodes[0] + require.Equal(t, "node1", node.Node.Node) + require.Equal(t, "web", node.Service.Service) + require.Equal(t, []string{"foo"}, node.Service.Tags) + + require.Equal(t, "blocking-query", resp.Header().Get("X-Consul-Query-Backend")) + + idx := getIndex(t, resp) + require.True(t, idx > 0) + + lastIndex = idx + }) + + const timeout = 30 * time.Second + runStep(t, "read blocking query result", func(t *testing.T) { + var ( + // out and resp are not safe to read until reading from errCh + out structs.CheckServiceNodes + resp = httptest.NewRecorder() + errCh = make(chan error, 1) + ) + go func() { + url := fmt.Sprintf("/v1/health/service/web?dc=dc1&index=%d&wait=%s&%s", lastIndex, timeout, filterUrlPart) + req, err := http.NewRequest("GET", url, nil) + if err != nil { + errCh <- err + return + } + + obj, err := a.srv.HealthServiceNodes(resp, req) + if err != nil { + errCh <- err + return + } + + nodes := obj.(structs.CheckServiceNodes) + out = nodes + errCh <- nil + }() + + time.Sleep(200 * time.Millisecond) + + // Change the tags. + register(t, a, "web", "bar") + + if err := <-errCh; err != nil { + require.NoError(t, err) + } + + require.Len(t, out, 0) + require.Equal(t, tc.queryBackend, resp.Header().Get("X-Consul-Query-Backend")) + }) + }) + } +} + func TestHealthServiceNodes_NodeMetaFilter(t *testing.T) { if testing.Short() { t.Skip("too slow for testing.Short") diff --git a/agent/rpcclient/health/view.go b/agent/rpcclient/health/view.go index 6343032d9..8c9e6d469 100644 --- a/agent/rpcclient/health/view.go +++ b/agent/rpcclient/health/view.go @@ -80,11 +80,12 @@ func (s *healthView) Update(events []*pbsubscribe.Event) error { return errors.New("check service node was unexpectedly nil") } passed, err := s.filter.Evaluate(*csn) - switch { - case err != nil: + if err != nil { return err - case passed: + } else if passed { s.state[id] = *csn + } else { + delete(s.state, id) } case pbsubscribe.CatalogOp_Deregister: diff --git a/agent/structs/structs.go b/agent/structs/structs.go index 88e5a9b21..d5462bd51 100644 --- a/agent/structs/structs.go +++ b/agent/structs/structs.go @@ -398,6 +398,8 @@ const ( QueryBackendStreaming ) +func (q QueryBackend) GoString() string { return q.String() } + func (q QueryBackend) String() string { switch q { case QueryBackendBlocking: