streaming: set a default timeout

The blocking query backend sets the default value on the server side.
The streaming backend does not using blocking queries, so we must set the timeout on
the client.
This commit is contained in:
Daniel Nephin 2021-07-27 17:55:00 -04:00
parent cfc829275c
commit 057e8320f9
4 changed files with 45 additions and 0 deletions

View File

@ -395,6 +395,7 @@ func New(bd BaseDeps) (*Agent, error) {
Logger: bd.Logger.Named("rpcclient.health"), Logger: bd.Logger.Named("rpcclient.health"),
}, },
UseStreamingBackend: a.config.UseStreamingBackend, UseStreamingBackend: a.config.UseStreamingBackend,
QueryOptionDefaults: config.ApplyDefaultQueryOptions(a.config),
} }
a.serviceManager = NewServiceManager(&a) a.serviceManager = NewServiceManager(&a)

View File

@ -1930,3 +1930,16 @@ func isFloat(t reflect.Type) bool { return t.Kind() == reflect.Float32 || t.Kind
func isComplex(t reflect.Type) bool { func isComplex(t reflect.Type) bool {
return t.Kind() == reflect.Complex64 || t.Kind() == reflect.Complex128 return t.Kind() == reflect.Complex64 || t.Kind() == reflect.Complex128
} }
// ApplyDefaultQueryOptions returns a function which will set default values on
// the options based on the configuration. The RuntimeConfig must not be nil.
func ApplyDefaultQueryOptions(config *RuntimeConfig) func(options *structs.QueryOptions) {
return func(options *structs.QueryOptions) {
switch {
case options.MaxQueryTime > config.MaxQueryTime:
options.MaxQueryTime = config.MaxQueryTime
case options.MaxQueryTime == 0:
options.MaxQueryTime = config.DefaultQueryTime
}
}
}

View File

@ -17,6 +17,7 @@ type Client struct {
MaterializerDeps MaterializerDeps MaterializerDeps MaterializerDeps
CacheName string CacheName string
UseStreamingBackend bool UseStreamingBackend bool
QueryOptionDefaults func(options *structs.QueryOptions)
} }
type NetRPC interface { type NetRPC interface {
@ -38,6 +39,8 @@ func (c *Client) ServiceNodes(
req structs.ServiceSpecificRequest, req structs.ServiceSpecificRequest,
) (structs.IndexedCheckServiceNodes, cache.ResultMeta, error) { ) (structs.IndexedCheckServiceNodes, cache.ResultMeta, error) {
if c.useStreaming(req) && (req.QueryOptions.UseCache || req.QueryOptions.MinQueryIndex > 0) { if c.useStreaming(req) && (req.QueryOptions.UseCache || req.QueryOptions.MinQueryIndex > 0) {
c.QueryOptionDefaults(&req.QueryOptions)
result, err := c.ViewStore.Get(ctx, c.newServiceRequest(req)) result, err := c.ViewStore.Get(ctx, c.newServiceRequest(req))
if err != nil { if err != nil {
return structs.IndexedCheckServiceNodes{}, cache.ResultMeta{}, err return structs.IndexedCheckServiceNodes{}, cache.ResultMeta{}, err

View File

@ -3,10 +3,12 @@ package health
import ( import (
"context" "context"
"testing" "testing"
"time"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"github.com/hashicorp/consul/agent/cache" "github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/agent/config"
"github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/agent/submatview" "github.com/hashicorp/consul/agent/submatview"
) )
@ -25,6 +27,7 @@ func TestClient_ServiceNodes_BackendRouting(t *testing.T) {
ViewStore: &fakeViewStore{}, ViewStore: &fakeViewStore{},
CacheName: "cache-no-streaming", CacheName: "cache-no-streaming",
UseStreamingBackend: true, UseStreamingBackend: true,
QueryOptionDefaults: config.ApplyDefaultQueryOptions(&config.RuntimeConfig{}),
} }
_, _, err := c.ServiceNodes(context.Background(), tc.req) _, _, err := c.ServiceNodes(context.Background(), tc.req)
@ -233,3 +236,28 @@ func TestClient_Notify_BackendRouting(t *testing.T) {
}) })
} }
} }
func TestClient_ServiceNodes_SetsDefaults(t *testing.T) {
store := &fakeViewStore{}
c := &Client{
ViewStore: store,
CacheName: "cache-no-streaming",
UseStreamingBackend: true,
QueryOptionDefaults: config.ApplyDefaultQueryOptions(&config.RuntimeConfig{
MaxQueryTime: 200 * time.Second,
DefaultQueryTime: 100 * time.Second,
}),
}
req := structs.ServiceSpecificRequest{
Datacenter: "dc1",
ServiceName: "web1",
QueryOptions: structs.QueryOptions{MinQueryIndex: 22},
}
_, _, err := c.ServiceNodes(context.Background(), req)
require.NoError(t, err)
require.Len(t, store.calls, 1)
require.Equal(t, 100*time.Second, store.calls[0].CacheInfo().Timeout)
}