Merge pull request #10707 from hashicorp/dnephin/streaming-setup-default-timeout

streaming: set default query timeout
This commit is contained in:
Daniel Nephin 2021-07-28 18:29:28 -04:00 committed by GitHub
commit f2f5aba1bf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 65 additions and 6 deletions

6
.changelog/10707.txt Normal file
View File

@ -0,0 +1,6 @@
```release-note:bug
streaming: set the default wait timeout for health queries
```
```release-note:bug
http: log cancelled requests as such at the INFO level, instead of logging them as errored requests.
```

View File

@ -395,6 +395,7 @@ func New(bd BaseDeps) (*Agent, error) {
Logger: bd.Logger.Named("rpcclient.health"),
},
UseStreamingBackend: a.config.UseStreamingBackend,
QueryOptionDefaults: config.ApplyDefaultQueryOptions(a.config),
}
a.serviceManager = NewServiceManager(&a)

View File

@ -1930,3 +1930,16 @@ func isFloat(t reflect.Type) bool { return t.Kind() == reflect.Float32 || t.Kind
func isComplex(t reflect.Type) bool {
return t.Kind() == reflect.Complex64 || t.Kind() == reflect.Complex128
}
// ApplyDefaultQueryOptions returns a function which will set default values on
// the options based on the configuration. The RuntimeConfig must not be nil.
func ApplyDefaultQueryOptions(config *RuntimeConfig) func(options *structs.QueryOptions) {
return func(options *structs.QueryOptions) {
switch {
case options.MaxQueryTime > config.MaxQueryTime:
options.MaxQueryTime = config.MaxQueryTime
case options.MaxQueryTime == 0:
options.MaxQueryTime = config.DefaultQueryTime
}
}
}

View File

@ -432,12 +432,20 @@ func (s *HTTPHandlers) wrap(handler endpoint, methods []string) http.HandlerFunc
}
handleErr := func(err error) {
if req.Context().Err() != nil {
httpLogger.Info("Request cancelled",
"method", req.Method,
"url", logURL,
"from", req.RemoteAddr,
"error", err)
} else {
httpLogger.Error("Request error",
"method", req.Method,
"url", logURL,
"from", req.RemoteAddr,
"error", err,
)
"error", err)
}
switch {
case isForbidden(err):
resp.WriteHeader(http.StatusForbidden)

View File

@ -17,6 +17,7 @@ type Client struct {
MaterializerDeps MaterializerDeps
CacheName string
UseStreamingBackend bool
QueryOptionDefaults func(options *structs.QueryOptions)
}
type NetRPC interface {
@ -38,6 +39,8 @@ func (c *Client) ServiceNodes(
req structs.ServiceSpecificRequest,
) (structs.IndexedCheckServiceNodes, cache.ResultMeta, error) {
if c.useStreaming(req) && (req.QueryOptions.UseCache || req.QueryOptions.MinQueryIndex > 0) {
c.QueryOptionDefaults(&req.QueryOptions)
result, err := c.ViewStore.Get(ctx, c.newServiceRequest(req))
if err != nil {
return structs.IndexedCheckServiceNodes{}, cache.ResultMeta{}, err

View File

@ -3,10 +3,12 @@ package health
import (
"context"
"testing"
"time"
"github.com/stretchr/testify/require"
"github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/agent/config"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/agent/submatview"
)
@ -25,6 +27,7 @@ func TestClient_ServiceNodes_BackendRouting(t *testing.T) {
ViewStore: &fakeViewStore{},
CacheName: "cache-no-streaming",
UseStreamingBackend: true,
QueryOptionDefaults: config.ApplyDefaultQueryOptions(&config.RuntimeConfig{}),
}
_, _, err := c.ServiceNodes(context.Background(), tc.req)
@ -233,3 +236,28 @@ func TestClient_Notify_BackendRouting(t *testing.T) {
})
}
}
func TestClient_ServiceNodes_SetsDefaults(t *testing.T) {
store := &fakeViewStore{}
c := &Client{
ViewStore: store,
CacheName: "cache-no-streaming",
UseStreamingBackend: true,
QueryOptionDefaults: config.ApplyDefaultQueryOptions(&config.RuntimeConfig{
MaxQueryTime: 200 * time.Second,
DefaultQueryTime: 100 * time.Second,
}),
}
req := structs.ServiceSpecificRequest{
Datacenter: "dc1",
ServiceName: "web1",
QueryOptions: structs.QueryOptions{MinQueryIndex: 22},
}
_, _, err := c.ServiceNodes(context.Background(), req)
require.NoError(t, err)
require.Len(t, store.calls, 1)
require.Equal(t, 100*time.Second, store.calls[0].CacheInfo().Timeout)
}