diff --git a/.changelog/10514.txt b/.changelog/10514.txt new file mode 100644 index 000000000..f427fdcc2 --- /dev/null +++ b/.changelog/10514.txt @@ -0,0 +1,3 @@ +```release-note:bug +streaming: fix a bug that was preventing streaming from being enabled. +``` diff --git a/agent/agent.go b/agent/agent.go index 14b474ef0..73d22894c 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -394,6 +394,7 @@ func New(bd BaseDeps) (*Agent, error) { Conn: conn, Logger: bd.Logger.Named("rpcclient.health"), }, + UseStreamingBackend: a.config.UseStreamingBackend, } a.serviceManager = NewServiceManager(&a) diff --git a/agent/health_endpoint_test.go b/agent/health_endpoint_test.go index b36c23fa4..075849eb8 100644 --- a/agent/health_endpoint_test.go +++ b/agent/health_endpoint_test.go @@ -739,11 +739,16 @@ func TestHealthServiceNodes(t *testing.T) { func TestHealthServiceNodes_Blocking(t *testing.T) { cases := []struct { - name string - hcl string - grpcMetrics bool + name string + hcl string + grpcMetrics bool + queryBackend string }{ - {name: "no streaming"}, + { + name: "no streaming", + queryBackend: "blocking-query", + hcl: `use_streaming_backend = false`, + }, { name: "streaming", grpcMetrics: true, @@ -751,6 +756,7 @@ func TestHealthServiceNodes_Blocking(t *testing.T) { rpc { enable_streaming = true } use_streaming_backend = true `, + queryBackend: "streaming", }, } @@ -856,6 +862,8 @@ use_streaming_backend = true require.True(t, idx < newIdx, "index should have increased."+ "idx=%d, newIdx=%d", idx, newIdx) + require.Equal(t, tc.queryBackend, resp.Header().Get("X-Consul-Query-Backend")) + idx = newIdx checkErrs() @@ -882,6 +890,7 @@ use_streaming_backend = true newIdx := getIndex(t, resp) require.Equal(t, idx, newIdx) + require.Equal(t, tc.queryBackend, resp.Header().Get("X-Consul-Query-Backend")) } if tc.grpcMetrics { @@ -905,16 +914,25 @@ func TestHealthServiceNodes_NodeMetaFilter(t *testing.T) { t.Parallel() tests := []struct { - name string - config string + name string + config string + queryBackend string }{ - {"normal", ""}, - {"cache-with-streaming", ` + { + name: "blocking-query", + config: `use_streaming_backend=false`, + queryBackend: "blocking-query", + }, + { + name: "cache-with-streaming", + config: ` rpc{ enable_streaming=true } use_streaming_backend=true - `}, + `, + queryBackend: "streaming", + }, } for _, tst := range tests { t.Run(tst.name, func(t *testing.T) { @@ -986,6 +1004,8 @@ func TestHealthServiceNodes_NodeMetaFilter(t *testing.T) { if len(nodes) != 1 || nodes[0].Checks == nil || len(nodes[0].Checks) != 0 { t.Fatalf("bad: %v", obj) } + + require.Equal(t, tst.queryBackend, resp.Header().Get("X-Consul-Query-Backend")) }) } } @@ -1511,6 +1531,8 @@ func testHealthIngressServiceNodes(t *testing.T, agentHCL string) { // Should be a cache miss require.Equal(t, "MISS", resp.Header().Get("X-Cache")) + // always a blocking query, because the ingress endpoint does not yet support streaming. + require.Equal(t, "blocking-query", resp.Header().Get("X-Consul-Query-Backend")) })) require.True(t, t.Run("test caching hit", func(t *testing.T) { @@ -1525,6 +1547,8 @@ func testHealthIngressServiceNodes(t *testing.T, agentHCL string) { // Should be a cache HIT now! require.Equal(t, "HIT", resp.Header().Get("X-Cache")) + // always a blocking query, because the ingress endpoint does not yet support streaming. + require.Equal(t, "blocking-query", resp.Header().Get("X-Consul-Query-Backend")) })) } diff --git a/agent/rpcclient/health/view.go b/agent/rpcclient/health/view.go index a648686c4..2f31dde21 100644 --- a/agent/rpcclient/health/view.go +++ b/agent/rpcclient/health/view.go @@ -171,7 +171,8 @@ func (s *healthView) Result(index uint64) interface{} { result := structs.IndexedCheckServiceNodes{ Nodes: make(structs.CheckServiceNodes, 0, len(s.state)), QueryMeta: structs.QueryMeta{ - Index: index, + Index: index, + Backend: structs.QueryBackendStreaming, }, } for _, node := range s.state { diff --git a/agent/rpcclient/health/view_test.go b/agent/rpcclient/health/view_test.go index bdc59ad52..beba03d88 100644 --- a/agent/rpcclient/health/view_test.go +++ b/agent/rpcclient/health/view_test.go @@ -101,7 +101,8 @@ func TestHealthView_IntegrationWithStore_WithEmptySnapshot(t *testing.T) { empty := &structs.IndexedCheckServiceNodes{ Nodes: structs.CheckServiceNodes{}, QueryMeta: structs.QueryMeta{ - Index: 1, + Index: 1, + Backend: structs.QueryBackendStreaming, }, } @@ -381,6 +382,7 @@ func TestHealthView_IntegrationWithStore_WithFullSnapshot(t *testing.T) { func newExpectedNodes(nodes ...string) *structs.IndexedCheckServiceNodes { result := &structs.IndexedCheckServiceNodes{} + result.QueryMeta.Backend = structs.QueryBackendStreaming for _, node := range nodes { result.Nodes = append(result.Nodes, structs.CheckServiceNode{ Node: &structs.Node{Node: node}, diff --git a/agent/streaming_test.go b/agent/streaming_test.go index 0f45ad9ed..5fa4dd4c0 100644 --- a/agent/streaming_test.go +++ b/agent/streaming_test.go @@ -30,6 +30,7 @@ func testGRPCStreamingWorking(t *testing.T, config string) { assertIndex(t, resp) require.NotEmpty(t, resp.Header().Get("X-Consul-Index")) + require.Equal(t, "streaming", resp.Header().Get("X-Consul-Query-Backend")) } func TestGRPCWithTLSConfigs(t *testing.T) {