Add timeout to Client RPC calls (#11500)
Adds a timeout (deadline) to client RPC calls, so that streams will no longer hang indefinitely in unstable network conditions. Co-authored-by: kisunji <ckim@hashicorp.com>
This commit is contained in:
parent
2970c619d2
commit
45ffdc360e
4
.changelog/11500.txt
Normal file
4
.changelog/11500.txt
Normal file
|
@ -0,0 +1,4 @@
|
|||
```release-note:bugfix
|
||||
rpc: Adds a deadline to client RPC calls, so that streams will no longer hang
|
||||
indefinitely in unstable network conditions. [[GH-8504](https://github.com/hashicorp/consul/issues/8504)]
|
||||
```
|
|
@ -1650,6 +1650,7 @@ func TestCatalog_ListServices_Stale(t *testing.T) {
|
|||
c.PrimaryDatacenter = "dc1" // Enable ACLs!
|
||||
c.ACLsEnabled = true
|
||||
c.Bootstrap = false // Disable bootstrap
|
||||
c.RPCHoldTimeout = 10 * time.Millisecond
|
||||
})
|
||||
defer os.RemoveAll(dir2)
|
||||
defer s2.Shutdown()
|
||||
|
|
|
@ -291,20 +291,26 @@ TRY:
|
|||
}
|
||||
|
||||
// Move off to another server, and see if we can retry.
|
||||
c.logger.Error("RPC failed to server",
|
||||
"method", method,
|
||||
"server", server.Addr,
|
||||
"error", rpcErr,
|
||||
)
|
||||
metrics.IncrCounterWithLabels([]string{"client", "rpc", "failed"}, 1, []metrics.Label{{Name: "server", Value: server.Name}})
|
||||
manager.NotifyFailedServer(server)
|
||||
|
||||
// Use the zero value for RPCInfo if the request doesn't implement RPCInfo
|
||||
info, _ := args.(structs.RPCInfo)
|
||||
if retry := canRetry(info, rpcErr, firstCheck, c.config); !retry {
|
||||
c.logger.Error("RPC failed to server",
|
||||
"method", method,
|
||||
"server", server.Addr,
|
||||
"error", rpcErr,
|
||||
)
|
||||
metrics.IncrCounterWithLabels([]string{"client", "rpc", "failed"}, 1, []metrics.Label{{Name: "server", Value: server.Name}})
|
||||
return rpcErr
|
||||
}
|
||||
|
||||
c.logger.Warn("Retrying RPC to server",
|
||||
"method", method,
|
||||
"server", server.Addr,
|
||||
"error", rpcErr,
|
||||
)
|
||||
|
||||
// We can wait a bit and retry!
|
||||
jitter := lib.RandomStagger(c.config.RPCHoldTimeout / structs.JitterFraction)
|
||||
select {
|
||||
|
|
|
@ -48,6 +48,7 @@ func testClientConfig(t *testing.T) (string, *Config) {
|
|||
config.SerfLANConfig.MemberlistConfig.ProbeTimeout = 200 * time.Millisecond
|
||||
config.SerfLANConfig.MemberlistConfig.ProbeInterval = time.Second
|
||||
config.SerfLANConfig.MemberlistConfig.GossipInterval = 100 * time.Millisecond
|
||||
config.RPCHoldTimeout = 10 * time.Second
|
||||
return dir, config
|
||||
}
|
||||
|
||||
|
@ -72,7 +73,7 @@ func testClientWithConfigWithErr(t *testing.T, cb func(c *Config)) (string, *Cli
|
|||
}
|
||||
|
||||
// Apply config to copied fields because many tests only set the old
|
||||
//values.
|
||||
// values.
|
||||
config.ACLResolverSettings.ACLsEnabled = config.ACLsEnabled
|
||||
config.ACLResolverSettings.NodeName = config.NodeName
|
||||
config.ACLResolverSettings.Datacenter = config.Datacenter
|
||||
|
@ -521,13 +522,16 @@ func newDefaultDeps(t *testing.T, c *Config) Deps {
|
|||
resolver.Register(builder)
|
||||
|
||||
connPool := &pool.ConnPool{
|
||||
Server: false,
|
||||
SrcAddr: c.RPCSrcAddr,
|
||||
Logger: logger.StandardLogger(&hclog.StandardLoggerOptions{InferLevels: true}),
|
||||
MaxTime: 2 * time.Minute,
|
||||
MaxStreams: 4,
|
||||
TLSConfigurator: tls,
|
||||
Datacenter: c.Datacenter,
|
||||
Server: false,
|
||||
SrcAddr: c.RPCSrcAddr,
|
||||
Logger: logger.StandardLogger(&hclog.StandardLoggerOptions{InferLevels: true}),
|
||||
MaxTime: 2 * time.Minute,
|
||||
MaxStreams: 4,
|
||||
TLSConfigurator: tls,
|
||||
Datacenter: c.Datacenter,
|
||||
Timeout: c.RPCHoldTimeout,
|
||||
DefaultQueryTime: c.DefaultQueryTime,
|
||||
MaxQueryTime: c.MaxQueryTime,
|
||||
}
|
||||
|
||||
return Deps{
|
||||
|
@ -853,3 +857,67 @@ func TestClient_ShortReconnectTimeout(t *testing.T) {
|
|||
50*time.Millisecond,
|
||||
"The client node was not reaped within the alotted time")
|
||||
}
|
||||
|
||||
type waiter struct {
|
||||
duration time.Duration
|
||||
}
|
||||
|
||||
func (w *waiter) Wait(struct{}, *struct{}) error {
|
||||
time.Sleep(w.duration)
|
||||
return nil
|
||||
}
|
||||
|
||||
func TestClient_RPC_Timeout(t *testing.T) {
|
||||
if testing.Short() {
|
||||
t.Skip("too slow for testing.Short")
|
||||
}
|
||||
t.Parallel()
|
||||
|
||||
_, s1 := testServerWithConfig(t)
|
||||
|
||||
_, c1 := testClientWithConfig(t, func(c *Config) {
|
||||
c.Datacenter = "dc1"
|
||||
c.NodeName = uniqueNodeName(t.Name())
|
||||
c.RPCHoldTimeout = 10 * time.Millisecond
|
||||
c.DefaultQueryTime = 100 * time.Millisecond
|
||||
c.MaxQueryTime = 200 * time.Millisecond
|
||||
})
|
||||
joinLAN(t, c1, s1)
|
||||
|
||||
retry.Run(t, func(r *retry.R) {
|
||||
var out struct{}
|
||||
if err := c1.RPC("Status.Ping", struct{}{}, &out); err != nil {
|
||||
r.Fatalf("err: %v", err)
|
||||
}
|
||||
})
|
||||
|
||||
// waiter will sleep for 50ms
|
||||
require.NoError(t, s1.RegisterEndpoint("Wait", &waiter{duration: 50 * time.Millisecond}))
|
||||
|
||||
// Requests with QueryOptions have a default timeout of RPCHoldTimeout (10ms)
|
||||
// so we expect the RPC call to timeout.
|
||||
var out struct{}
|
||||
err := c1.RPC("Wait.Wait", &structs.NodeSpecificRequest{}, &out)
|
||||
require.Error(t, err)
|
||||
require.Contains(t, err.Error(), "rpc error making call: i/o deadline reached")
|
||||
|
||||
// Blocking requests have a longer timeout (100ms) so this should pass
|
||||
out = struct{}{}
|
||||
err = c1.RPC("Wait.Wait", &structs.NodeSpecificRequest{
|
||||
QueryOptions: structs.QueryOptions{
|
||||
MinQueryIndex: 1,
|
||||
},
|
||||
}, &out)
|
||||
require.NoError(t, err)
|
||||
|
||||
// We pass in a custom MaxQueryTime (20ms) through QueryOptions which should fail
|
||||
out = struct{}{}
|
||||
err = c1.RPC("Wait.Wait", &structs.NodeSpecificRequest{
|
||||
QueryOptions: structs.QueryOptions{
|
||||
MinQueryIndex: 1,
|
||||
MaxQueryTime: 20 * time.Millisecond,
|
||||
},
|
||||
}, &out)
|
||||
require.Error(t, err)
|
||||
require.Contains(t, err.Error(), "rpc error making call: i/o deadline reached")
|
||||
}
|
||||
|
|
|
@ -1374,6 +1374,10 @@ func (r isReadRequest) HasTimedOut(since time.Time, rpcHoldTimeout, maxQueryTime
|
|||
return false, nil
|
||||
}
|
||||
|
||||
func (r isReadRequest) Timeout(rpcHoldTimeout, maxQueryTime, defaultQueryTime time.Duration) time.Duration {
|
||||
return time.Duration(-1)
|
||||
}
|
||||
|
||||
func TestRPC_AuthorizeRaftRPC(t *testing.T) {
|
||||
caPEM, caPK, err := tlsutil.GenerateCA(tlsutil.CAOpts{Days: 5, Domain: "consul"})
|
||||
require.NoError(t, err)
|
||||
|
|
|
@ -165,7 +165,7 @@ func testServerConfig(t *testing.T) (string, *Config) {
|
|||
|
||||
// TODO (slackpad) - We should be able to run all tests w/o this, but it
|
||||
// looks like several depend on it.
|
||||
config.RPCHoldTimeout = 5 * time.Second
|
||||
config.RPCHoldTimeout = 10 * time.Second
|
||||
|
||||
config.ConnectEnabled = true
|
||||
config.CAConfig = &structs.CAConfiguration{
|
||||
|
|
|
@ -31,7 +31,7 @@ type muxSession interface {
|
|||
|
||||
// streamClient is used to wrap a stream with an RPC client
|
||||
type StreamClient struct {
|
||||
stream net.Conn
|
||||
stream *TimeoutConn
|
||||
codec rpc.ClientCodec
|
||||
}
|
||||
|
||||
|
@ -56,6 +56,36 @@ type Conn struct {
|
|||
clientLock sync.Mutex
|
||||
}
|
||||
|
||||
// TimeoutConn wraps net.Conn with a read timeout.
|
||||
// When set, FirstReadTimeout only applies to the very next Read.
|
||||
// DefaultTimeout is used for any other Read.
|
||||
type TimeoutConn struct {
|
||||
net.Conn
|
||||
DefaultTimeout time.Duration
|
||||
FirstReadTimeout time.Duration
|
||||
}
|
||||
|
||||
func (c *TimeoutConn) Read(b []byte) (int, error) {
|
||||
timeout := c.DefaultTimeout
|
||||
// Apply timeout to first read then zero it out
|
||||
if c.FirstReadTimeout > 0 {
|
||||
timeout = c.FirstReadTimeout
|
||||
c.FirstReadTimeout = 0
|
||||
}
|
||||
var deadline time.Time
|
||||
if timeout > 0 {
|
||||
deadline = time.Now().Add(timeout)
|
||||
}
|
||||
if err := c.Conn.SetReadDeadline(deadline); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
return c.Conn.Read(b)
|
||||
}
|
||||
|
||||
func (c *TimeoutConn) Write(b []byte) (int, error) {
|
||||
return c.Conn.Write(b)
|
||||
}
|
||||
|
||||
func (c *Conn) Close() error {
|
||||
return c.session.Close()
|
||||
}
|
||||
|
@ -79,12 +109,14 @@ func (c *Conn) getClient() (*StreamClient, error) {
|
|||
return nil, err
|
||||
}
|
||||
|
||||
timeoutStream := &TimeoutConn{Conn: stream, DefaultTimeout: c.pool.Timeout}
|
||||
|
||||
// Create the RPC client
|
||||
codec := msgpackrpc.NewCodecFromHandle(true, true, stream, structs.MsgpackHandle)
|
||||
codec := msgpackrpc.NewCodecFromHandle(true, true, timeoutStream, structs.MsgpackHandle)
|
||||
|
||||
// Return a new stream client
|
||||
sc := &StreamClient{
|
||||
stream: stream,
|
||||
stream: timeoutStream,
|
||||
codec: codec,
|
||||
}
|
||||
return sc, nil
|
||||
|
@ -101,7 +133,7 @@ func (c *Conn) returnClient(client *StreamClient) {
|
|||
|
||||
// If this is a Yamux stream, shrink the internal buffers so that
|
||||
// we can GC the idle memory
|
||||
if ys, ok := client.stream.(*yamux.Stream); ok {
|
||||
if ys, ok := client.stream.Conn.(*yamux.Stream); ok {
|
||||
ys.Shrink()
|
||||
}
|
||||
}
|
||||
|
@ -133,6 +165,13 @@ type ConnPool struct {
|
|||
// TODO: consider refactoring to accept a full yamux.Config instead of a logger
|
||||
Logger *log.Logger
|
||||
|
||||
// The default timeout for stream reads/writes
|
||||
Timeout time.Duration
|
||||
|
||||
// Used for calculating timeouts on RPC requests
|
||||
MaxQueryTime time.Duration
|
||||
DefaultQueryTime time.Duration
|
||||
|
||||
// The maximum time to keep a connection open
|
||||
MaxTime time.Duration
|
||||
|
||||
|
@ -325,7 +364,7 @@ func (p *ConnPool) dial(
|
|||
tlsRPCType RPCType,
|
||||
) (net.Conn, HalfCloser, error) {
|
||||
// Try to dial the conn
|
||||
d := &net.Dialer{LocalAddr: p.SrcAddr, Timeout: DefaultDialTimeout}
|
||||
d := &net.Dialer{LocalAddr: p.SrcAddr, Timeout: p.Timeout}
|
||||
conn, err := d.Dial("tcp", addr.String())
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
|
@ -590,6 +629,11 @@ func (p *ConnPool) rpc(dc string, nodeName string, addr net.Addr, method string,
|
|||
return fmt.Errorf("rpc error getting client: %w", err)
|
||||
}
|
||||
|
||||
// Use the zero value if the request doesn't implement RPCInfo
|
||||
if info, ok := args.(structs.RPCInfo); ok {
|
||||
sc.stream.FirstReadTimeout = info.Timeout(p.Timeout, p.MaxQueryTime, p.DefaultQueryTime)
|
||||
}
|
||||
|
||||
// Make the RPC call
|
||||
err = msgpackrpc.CallWithCodec(sc.codec, method, args, reply)
|
||||
if err != nil {
|
||||
|
|
|
@ -169,11 +169,14 @@ func newConnPool(config *config.RuntimeConfig, logger hclog.Logger, tls *tlsutil
|
|||
}
|
||||
|
||||
pool := &pool.ConnPool{
|
||||
Server: config.ServerMode,
|
||||
SrcAddr: rpcSrcAddr,
|
||||
Logger: logger.StandardLogger(&hclog.StandardLoggerOptions{InferLevels: true}),
|
||||
TLSConfigurator: tls,
|
||||
Datacenter: config.Datacenter,
|
||||
Server: config.ServerMode,
|
||||
SrcAddr: rpcSrcAddr,
|
||||
Logger: logger.StandardLogger(&hclog.StandardLoggerOptions{InferLevels: true}),
|
||||
TLSConfigurator: tls,
|
||||
Datacenter: config.Datacenter,
|
||||
Timeout: config.RPCHoldTimeout,
|
||||
MaxQueryTime: config.MaxQueryTime,
|
||||
DefaultQueryTime: config.DefaultQueryTime,
|
||||
}
|
||||
if config.ServerMode {
|
||||
pool.MaxTime = 2 * time.Minute
|
||||
|
|
|
@ -18,14 +18,12 @@ import (
|
|||
"github.com/golang/protobuf/ptypes/timestamp"
|
||||
|
||||
"github.com/golang/protobuf/proto"
|
||||
ptypes "github.com/golang/protobuf/ptypes"
|
||||
"github.com/hashicorp/consul-net-rpc/go-msgpack/codec"
|
||||
"github.com/hashicorp/go-multierror"
|
||||
"github.com/hashicorp/serf/coordinate"
|
||||
"github.com/mitchellh/hashstructure"
|
||||
|
||||
"github.com/hashicorp/consul-net-rpc/go-msgpack/codec"
|
||||
|
||||
ptypes "github.com/golang/protobuf/ptypes"
|
||||
|
||||
"github.com/hashicorp/consul/acl"
|
||||
"github.com/hashicorp/consul/agent/cache"
|
||||
"github.com/hashicorp/consul/api"
|
||||
|
@ -217,6 +215,7 @@ type RPCInfo interface {
|
|||
TokenSecret() string
|
||||
SetTokenSecret(string)
|
||||
HasTimedOut(since time.Time, rpcHoldTimeout, maxQueryTime, defaultQueryTime time.Duration) (bool, error)
|
||||
Timeout(rpcHoldTimeout, maxQueryTime, defaultQueryTime time.Duration) time.Duration
|
||||
}
|
||||
|
||||
// QueryOptions is used to specify various flags for read queries
|
||||
|
@ -315,18 +314,24 @@ func (q *QueryOptions) SetTokenSecret(s string) {
|
|||
q.Token = s
|
||||
}
|
||||
|
||||
func (q QueryOptions) HasTimedOut(start time.Time, rpcHoldTimeout, maxQueryTime, defaultQueryTime time.Duration) (bool, error) {
|
||||
func (q QueryOptions) Timeout(rpcHoldTimeout, maxQueryTime, defaultQueryTime time.Duration) time.Duration {
|
||||
// Match logic in Server.blockingQuery.
|
||||
if q.MinQueryIndex > 0 {
|
||||
if q.MaxQueryTime > maxQueryTime {
|
||||
q.MaxQueryTime = maxQueryTime
|
||||
} else if q.MaxQueryTime <= 0 {
|
||||
q.MaxQueryTime = defaultQueryTime
|
||||
}
|
||||
// Timeout after maximum jitter has elapsed.
|
||||
q.MaxQueryTime += lib.RandomStagger(q.MaxQueryTime / JitterFraction)
|
||||
|
||||
return time.Since(start) > (q.MaxQueryTime + rpcHoldTimeout), nil
|
||||
return q.MaxQueryTime + rpcHoldTimeout
|
||||
}
|
||||
return time.Since(start) > rpcHoldTimeout, nil
|
||||
return rpcHoldTimeout
|
||||
}
|
||||
|
||||
func (q QueryOptions) HasTimedOut(start time.Time, rpcHoldTimeout, maxQueryTime, defaultQueryTime time.Duration) (bool, error) {
|
||||
return time.Since(start) > q.Timeout(rpcHoldTimeout, maxQueryTime, defaultQueryTime), nil
|
||||
}
|
||||
|
||||
type WriteRequest struct {
|
||||
|
@ -353,7 +358,11 @@ func (w *WriteRequest) SetTokenSecret(s string) {
|
|||
}
|
||||
|
||||
func (w WriteRequest) HasTimedOut(start time.Time, rpcHoldTimeout, maxQueryTime, defaultQueryTime time.Duration) (bool, error) {
|
||||
return time.Since(start) > rpcHoldTimeout, nil
|
||||
return time.Since(start) > w.Timeout(rpcHoldTimeout, maxQueryTime, defaultQueryTime), nil
|
||||
}
|
||||
|
||||
func (w WriteRequest) Timeout(rpcHoldTimeout, maxQueryTime, defaultQueryTime time.Duration) time.Duration {
|
||||
return rpcHoldTimeout
|
||||
}
|
||||
|
||||
type QueryBackend int
|
||||
|
|
|
@ -344,6 +344,14 @@ func (msg *%[1]s) HasTimedOut(start time.Time, rpcHoldTimeout time.Duration, a t
|
|||
return msg.%[2]s.HasTimedOut(start, rpcHoldTimeout, a, b)
|
||||
}
|
||||
|
||||
// Timeout implements structs.RPCInfo
|
||||
func (msg *%[1]s) Timeout(rpcHoldTimeout time.Duration, a time.Duration, b time.Duration) time.Duration {
|
||||
if msg == nil || msg.%[2]s == nil {
|
||||
return 0
|
||||
}
|
||||
return msg.%[2]s.Timeout(rpcHoldTimeout, a, b)
|
||||
}
|
||||
|
||||
// IsRead implements structs.RPCInfo
|
||||
func (msg *%[1]s) IsRead() bool {
|
||||
return false
|
||||
|
@ -392,6 +400,14 @@ func (msg *%[1]s) HasTimedOut(start time.Time, rpcHoldTimeout time.Duration, a t
|
|||
return msg.%[2]s.HasTimedOut(start, rpcHoldTimeout, a, b)
|
||||
}
|
||||
|
||||
// Timeout implements structs.RPCInfo
|
||||
func (msg *%[1]s) Timeout(rpcHoldTimeout time.Duration, a time.Duration, b time.Duration) time.Duration {
|
||||
if msg == nil || msg.%[2]s == nil {
|
||||
return 0
|
||||
}
|
||||
return msg.%[2]s.Timeout(rpcHoldTimeout, a, b)
|
||||
}
|
||||
|
||||
// SetTokenSecret implements structs.RPCInfo
|
||||
func (msg *%[1]s) SetTokenSecret(s string) {
|
||||
// TODO: initialize if nil
|
||||
|
@ -443,6 +459,15 @@ func (msg *%[1]s) HasTimedOut(start time.Time, rpcHoldTimeout time.Duration, a t
|
|||
}
|
||||
return msg.%[2]s.HasTimedOut(start, rpcHoldTimeout, a, b)
|
||||
}
|
||||
|
||||
// Timeout implements structs.RPCInfo
|
||||
func (msg *%[1]s) Timeout(rpcHoldTimeout time.Duration, a time.Duration, b time.Duration) time.Duration {
|
||||
if msg == nil || msg.%[2]s == nil {
|
||||
return 0
|
||||
}
|
||||
return msg.%[2]s.Timeout(rpcHoldTimeout, a, b)
|
||||
}
|
||||
|
||||
// SetTokenSecret implements structs.RPCInfo
|
||||
func (msg *%[1]s) SetTokenSecret(s string) {
|
||||
// TODO: initialize if nil
|
||||
|
|
|
@ -23,5 +23,9 @@ func (req *AutoConfigRequest) SetTokenSecret(token string) {
|
|||
}
|
||||
|
||||
func (req *AutoConfigRequest) HasTimedOut(start time.Time, rpcHoldTimeout, maxQueryTime, defaultQueryTime time.Duration) (bool, error) {
|
||||
return time.Since(start) > rpcHoldTimeout, nil
|
||||
return time.Since(start) > req.Timeout(rpcHoldTimeout, maxQueryTime, defaultQueryTime), nil
|
||||
}
|
||||
|
||||
func (req *AutoConfigRequest) Timeout(rpcHoldTimeout, maxQueryTime, defaultQueryTime time.Duration) time.Duration {
|
||||
return rpcHoldTimeout
|
||||
}
|
||||
|
|
|
@ -75,12 +75,16 @@ func (q *QueryOptions) SetStaleIfError(staleIfError time.Duration) {
|
|||
}
|
||||
|
||||
func (q *QueryOptions) HasTimedOut(start time.Time, rpcHoldTimeout, maxQueryTime, defaultQueryTime time.Duration) (bool, error) {
|
||||
return time.Since(start) > q.Timeout(rpcHoldTimeout, maxQueryTime, defaultQueryTime), nil
|
||||
}
|
||||
|
||||
func (q *QueryOptions) Timeout(rpcHoldTimeout, maxQueryTime, defaultQueryTime time.Duration) time.Duration {
|
||||
maxTime := structs.DurationFromProto(q.MaxQueryTime)
|
||||
o := structs.QueryOptions{
|
||||
MaxQueryTime: maxTime,
|
||||
MinQueryIndex: q.MinQueryIndex,
|
||||
}
|
||||
return o.HasTimedOut(start, rpcHoldTimeout, maxQueryTime, defaultQueryTime)
|
||||
return o.Timeout(rpcHoldTimeout, maxQueryTime, defaultQueryTime)
|
||||
}
|
||||
|
||||
// SetFilter is needed to implement the structs.QueryOptionsCompat interface
|
||||
|
@ -113,8 +117,13 @@ func (w *WriteRequest) AllowStaleRead() bool {
|
|||
}
|
||||
|
||||
// HasTimedOut implements structs.RPCInfo
|
||||
func (w *WriteRequest) HasTimedOut(start time.Time, rpcHoldTimeout, _, _ time.Duration) (bool, error) {
|
||||
return time.Since(start) > rpcHoldTimeout, nil
|
||||
func (w *WriteRequest) HasTimedOut(start time.Time, rpcHoldTimeout, maxQueryTime, defaultQueryTime time.Duration) (bool, error) {
|
||||
return time.Since(start) > w.Timeout(rpcHoldTimeout, maxQueryTime, defaultQueryTime), nil
|
||||
}
|
||||
|
||||
// Timeout implements structs.RPCInfo
|
||||
func (w *WriteRequest) Timeout(rpcHoldTimeout, _, _ time.Duration) time.Duration {
|
||||
return rpcHoldTimeout
|
||||
}
|
||||
|
||||
// IsRead implements structs.RPCInfo
|
||||
|
@ -140,7 +149,12 @@ func (r *ReadRequest) SetTokenSecret(token string) {
|
|||
|
||||
// HasTimedOut implements structs.RPCInfo
|
||||
func (r *ReadRequest) HasTimedOut(start time.Time, rpcHoldTimeout, maxQueryTime, defaultQueryTime time.Duration) (bool, error) {
|
||||
return time.Since(start) > rpcHoldTimeout, nil
|
||||
return time.Since(start) > r.Timeout(rpcHoldTimeout, maxQueryTime, defaultQueryTime), nil
|
||||
}
|
||||
|
||||
// Timeout implements structs.RPCInfo
|
||||
func (r *ReadRequest) Timeout(rpcHoldTimeout, _, _ time.Duration) time.Duration {
|
||||
return rpcHoldTimeout
|
||||
}
|
||||
|
||||
// RequestDatacenter implements structs.RPCInfo
|
||||
|
|
|
@ -29,5 +29,10 @@ func (req *SubscribeRequest) SetTokenSecret(token string) {
|
|||
|
||||
// HasTimedOut implements structs.RPCInfo
|
||||
func (req *SubscribeRequest) HasTimedOut(start time.Time, rpcHoldTimeout, maxQueryTime, defaultQueryTime time.Duration) (bool, error) {
|
||||
return time.Since(start) > rpcHoldTimeout, nil
|
||||
return time.Since(start) > req.Timeout(rpcHoldTimeout, maxQueryTime, defaultQueryTime), nil
|
||||
}
|
||||
|
||||
// Timeout implements structs.RPCInfo
|
||||
func (req *SubscribeRequest) Timeout(rpcHoldTimeout, maxQueryTime, defaultQueryTime time.Duration) time.Duration {
|
||||
return rpcHoldTimeout
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue