Use rpcHoldTimeout to calculate blocking timeout (#15541)
Adds buffer to clients so that servers have time to respond to blocking queries.
This commit is contained in:
parent
650d4b45fb
commit
d146a3d542
|
@ -0,0 +1,3 @@
|
|||
```release-note:bug
|
||||
agent: Fixed issue where blocking queries with short waits could timeout on the client
|
||||
```
|
|
@ -532,6 +532,7 @@ func newDefaultDeps(t *testing.T, c *Config) Deps {
|
|||
Datacenter: c.Datacenter,
|
||||
DefaultQueryTime: c.DefaultQueryTime,
|
||||
MaxQueryTime: c.MaxQueryTime,
|
||||
RPCHoldTimeout: c.RPCHoldTimeout,
|
||||
}
|
||||
connPool.SetRPCClientTimeout(c.RPCClientTimeout)
|
||||
return Deps{
|
||||
|
@ -881,8 +882,9 @@ func TestClient_RPC_Timeout(t *testing.T) {
|
|||
c.Datacenter = "dc1"
|
||||
c.NodeName = uniqueNodeName(t.Name())
|
||||
c.RPCClientTimeout = 10 * time.Millisecond
|
||||
c.DefaultQueryTime = 100 * time.Millisecond
|
||||
c.DefaultQueryTime = 150 * time.Millisecond
|
||||
c.MaxQueryTime = 200 * time.Millisecond
|
||||
c.RPCHoldTimeout = 50 * time.Millisecond
|
||||
})
|
||||
joinLAN(t, c1, s1)
|
||||
|
||||
|
@ -897,8 +899,8 @@ func TestClient_RPC_Timeout(t *testing.T) {
|
|||
require.NoError(t, s1.RegisterEndpoint("Short", &waiter{duration: 5 * time.Millisecond}))
|
||||
|
||||
t.Run("non-blocking query times out after RPCClientTimeout", func(t *testing.T) {
|
||||
// Requests with QueryOptions have a default timeout of RPCClientTimeout (10ms)
|
||||
// so we expect the RPC call to timeout.
|
||||
// Requests with QueryOptions have a default timeout of
|
||||
// RPCClientTimeout (10ms) so we expect the RPC call to timeout.
|
||||
var out struct{}
|
||||
err := c1.RPC("Long.Wait", &structs.NodeSpecificRequest{}, &out)
|
||||
require.Error(t, err)
|
||||
|
@ -931,8 +933,30 @@ func TestClient_RPC_Timeout(t *testing.T) {
|
|||
}, &out))
|
||||
})
|
||||
|
||||
t.Run("blocking query with short MaxQueryTime fails", func(t *testing.T) {
|
||||
t.Run("blocking query with MaxQueryTime succeeds", func(t *testing.T) {
|
||||
var out struct{}
|
||||
// Although we set MaxQueryTime to 100ms, the client is adding maximum
|
||||
// jitter (100ms / 16 = 6.25ms) as well as RPCHoldTimeout (50ms).
|
||||
// Client waits 156.25ms while the server waits 106.25ms (artifically
|
||||
// adds maximum jitter) so the server will always return first.
|
||||
require.NoError(t, c1.RPC("Long.Wait", &structs.NodeSpecificRequest{
|
||||
QueryOptions: structs.QueryOptions{
|
||||
MinQueryIndex: 1,
|
||||
MaxQueryTime: 100 * time.Millisecond,
|
||||
},
|
||||
}, &out))
|
||||
})
|
||||
|
||||
// This following scenario should not occur in practice since the server
|
||||
// should be aware of RPC timeouts and always return blocking queries before
|
||||
// the client closes the connection. But this is just a hypothetical case
|
||||
// to show waiter can fail since it does not consider QueryOptions.
|
||||
t.Run("blocking query with low MaxQueryTime fails", func(t *testing.T) {
|
||||
var out struct{}
|
||||
// Although we set MaxQueryTime to 20ms, the client is adding maximum
|
||||
// jitter (20ms / 16 = 1.25ms) as well as RPCHoldTimeout (50ms).
|
||||
// Client waits 71.25ms while the server waits 106.25ms (artifically
|
||||
// adds maximum jitter) so the client will error first.
|
||||
err := c1.RPC("Long.Wait", &structs.NodeSpecificRequest{
|
||||
QueryOptions: structs.QueryOptions{
|
||||
MinQueryIndex: 1,
|
||||
|
@ -940,6 +964,6 @@ func TestClient_RPC_Timeout(t *testing.T) {
|
|||
},
|
||||
}, &out)
|
||||
require.Error(t, err)
|
||||
require.Contains(t, err.Error(), "rpc error making call: i/o deadline reached")
|
||||
require.ErrorContains(t, err, "rpc error making call: i/o deadline reached")
|
||||
})
|
||||
}
|
||||
|
|
|
@ -140,6 +140,9 @@ type ConnPool struct {
|
|||
// TODO: consider refactoring to accept a full yamux.Config instead of a logger
|
||||
Logger *log.Logger
|
||||
|
||||
// RPCHoldTimeout is used as a buffer when calculating timeouts to
|
||||
// allow for leader rotation.
|
||||
RPCHoldTimeout time.Duration
|
||||
// MaxQueryTime is used for calculating timeouts on blocking queries.
|
||||
MaxQueryTime time.Duration
|
||||
// DefaultQueryTime is used for calculating timeouts on blocking queries.
|
||||
|
@ -630,8 +633,9 @@ func (p *ConnPool) rpc(dc string, nodeName string, addr net.Addr, method string,
|
|||
if bq, ok := args.(BlockableQuery); ok {
|
||||
blockingTimeout := bq.BlockingTimeout(p.MaxQueryTime, p.DefaultQueryTime)
|
||||
if blockingTimeout > 0 {
|
||||
// override the default client timeout
|
||||
timeout = blockingTimeout
|
||||
// Override the default client timeout but add RPCHoldTimeout
|
||||
// as a buffer for retries during leadership changes.
|
||||
timeout = blockingTimeout + p.RPCHoldTimeout
|
||||
}
|
||||
}
|
||||
if timeout > 0 {
|
||||
|
|
|
@ -186,6 +186,7 @@ func newConnPool(config *config.RuntimeConfig, logger hclog.Logger, tls *tlsutil
|
|||
Logger: logger.StandardLogger(&hclog.StandardLoggerOptions{InferLevels: true}),
|
||||
TLSConfigurator: tls,
|
||||
Datacenter: config.Datacenter,
|
||||
RPCHoldTimeout: config.RPCHoldTimeout,
|
||||
MaxQueryTime: config.MaxQueryTime,
|
||||
DefaultQueryTime: config.DefaultQueryTime,
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue