streaming: fix enable of streaming in the client

And add checks to all the tests that explicitly use streaming.
This commit is contained in:
Daniel Nephin 2021-06-28 16:48:10 -04:00
parent 62beaa80f3
commit a4a390d7c5
6 changed files with 43 additions and 11 deletions

3
.changelog/10514.txt Normal file
View File

@ -0,0 +1,3 @@
```release-note:bug
streaming: fix a bug that was preventing streaming from being enabled.
```

View File

@ -394,6 +394,7 @@ func New(bd BaseDeps) (*Agent, error) {
Conn: conn, Conn: conn,
Logger: bd.Logger.Named("rpcclient.health"), Logger: bd.Logger.Named("rpcclient.health"),
}, },
UseStreamingBackend: a.config.UseStreamingBackend,
} }
a.serviceManager = NewServiceManager(&a) a.serviceManager = NewServiceManager(&a)

View File

@ -742,8 +742,13 @@ func TestHealthServiceNodes_Blocking(t *testing.T) {
name string name string
hcl string hcl string
grpcMetrics bool grpcMetrics bool
queryBackend string
}{ }{
{name: "no streaming"}, {
name: "no streaming",
queryBackend: "blocking-query",
hcl: `use_streaming_backend = false`,
},
{ {
name: "streaming", name: "streaming",
grpcMetrics: true, grpcMetrics: true,
@ -751,6 +756,7 @@ func TestHealthServiceNodes_Blocking(t *testing.T) {
rpc { enable_streaming = true } rpc { enable_streaming = true }
use_streaming_backend = true use_streaming_backend = true
`, `,
queryBackend: "streaming",
}, },
} }
@ -856,6 +862,8 @@ use_streaming_backend = true
require.True(t, idx < newIdx, "index should have increased."+ require.True(t, idx < newIdx, "index should have increased."+
"idx=%d, newIdx=%d", idx, newIdx) "idx=%d, newIdx=%d", idx, newIdx)
require.Equal(t, tc.queryBackend, resp.Header().Get("X-Consul-Query-Backend"))
idx = newIdx idx = newIdx
checkErrs() checkErrs()
@ -882,6 +890,7 @@ use_streaming_backend = true
newIdx := getIndex(t, resp) newIdx := getIndex(t, resp)
require.Equal(t, idx, newIdx) require.Equal(t, idx, newIdx)
require.Equal(t, tc.queryBackend, resp.Header().Get("X-Consul-Query-Backend"))
} }
if tc.grpcMetrics { if tc.grpcMetrics {
@ -907,14 +916,23 @@ func TestHealthServiceNodes_NodeMetaFilter(t *testing.T) {
tests := []struct { tests := []struct {
name string name string
config string config string
queryBackend string
}{ }{
{"normal", ""}, {
{"cache-with-streaming", ` name: "blocking-query",
config: `use_streaming_backend=false`,
queryBackend: "blocking-query",
},
{
name: "cache-with-streaming",
config: `
rpc{ rpc{
enable_streaming=true enable_streaming=true
} }
use_streaming_backend=true use_streaming_backend=true
`}, `,
queryBackend: "streaming",
},
} }
for _, tst := range tests { for _, tst := range tests {
t.Run(tst.name, func(t *testing.T) { t.Run(tst.name, func(t *testing.T) {
@ -986,6 +1004,8 @@ func TestHealthServiceNodes_NodeMetaFilter(t *testing.T) {
if len(nodes) != 1 || nodes[0].Checks == nil || len(nodes[0].Checks) != 0 { if len(nodes) != 1 || nodes[0].Checks == nil || len(nodes[0].Checks) != 0 {
t.Fatalf("bad: %v", obj) t.Fatalf("bad: %v", obj)
} }
require.Equal(t, tst.queryBackend, resp.Header().Get("X-Consul-Query-Backend"))
}) })
} }
} }
@ -1511,6 +1531,8 @@ func testHealthIngressServiceNodes(t *testing.T, agentHCL string) {
// Should be a cache miss // Should be a cache miss
require.Equal(t, "MISS", resp.Header().Get("X-Cache")) require.Equal(t, "MISS", resp.Header().Get("X-Cache"))
// always a blocking query, because the ingress endpoint does not yet support streaming.
require.Equal(t, "blocking-query", resp.Header().Get("X-Consul-Query-Backend"))
})) }))
require.True(t, t.Run("test caching hit", func(t *testing.T) { require.True(t, t.Run("test caching hit", func(t *testing.T) {
@ -1525,6 +1547,8 @@ func testHealthIngressServiceNodes(t *testing.T, agentHCL string) {
// Should be a cache HIT now! // Should be a cache HIT now!
require.Equal(t, "HIT", resp.Header().Get("X-Cache")) require.Equal(t, "HIT", resp.Header().Get("X-Cache"))
// always a blocking query, because the ingress endpoint does not yet support streaming.
require.Equal(t, "blocking-query", resp.Header().Get("X-Consul-Query-Backend"))
})) }))
} }

View File

@ -172,6 +172,7 @@ func (s *healthView) Result(index uint64) interface{} {
Nodes: make(structs.CheckServiceNodes, 0, len(s.state)), Nodes: make(structs.CheckServiceNodes, 0, len(s.state)),
QueryMeta: structs.QueryMeta{ QueryMeta: structs.QueryMeta{
Index: index, Index: index,
Backend: structs.QueryBackendStreaming,
}, },
} }
for _, node := range s.state { for _, node := range s.state {

View File

@ -102,6 +102,7 @@ func TestHealthView_IntegrationWithStore_WithEmptySnapshot(t *testing.T) {
Nodes: structs.CheckServiceNodes{}, Nodes: structs.CheckServiceNodes{},
QueryMeta: structs.QueryMeta{ QueryMeta: structs.QueryMeta{
Index: 1, Index: 1,
Backend: structs.QueryBackendStreaming,
}, },
} }
@ -381,6 +382,7 @@ func TestHealthView_IntegrationWithStore_WithFullSnapshot(t *testing.T) {
func newExpectedNodes(nodes ...string) *structs.IndexedCheckServiceNodes { func newExpectedNodes(nodes ...string) *structs.IndexedCheckServiceNodes {
result := &structs.IndexedCheckServiceNodes{} result := &structs.IndexedCheckServiceNodes{}
result.QueryMeta.Backend = structs.QueryBackendStreaming
for _, node := range nodes { for _, node := range nodes {
result.Nodes = append(result.Nodes, structs.CheckServiceNode{ result.Nodes = append(result.Nodes, structs.CheckServiceNode{
Node: &structs.Node{Node: node}, Node: &structs.Node{Node: node},

View File

@ -30,6 +30,7 @@ func testGRPCStreamingWorking(t *testing.T, config string) {
assertIndex(t, resp) assertIndex(t, resp)
require.NotEmpty(t, resp.Header().Get("X-Consul-Index")) require.NotEmpty(t, resp.Header().Get("X-Consul-Index"))
require.Equal(t, "streaming", resp.Header().Get("X-Consul-Query-Backend"))
} }
func TestGRPCWithTLSConfigs(t *testing.T) { func TestGRPCWithTLSConfigs(t *testing.T) {