Refactor client RPC timeouts (#14965)

Fix an issue where rpc_hold_timeout was being used as the timeout for non-blocking queries. Users should be able to tune read timeouts without fiddling with rpc_hold_timeout. A new configuration `rpc_read_timeout` is created.

Refactor some implementation from the original PR 11500 to remove the misleading linkage between RPCInfo's timeout (used to retry in case of certain modes of failures) and the client RPC timeouts.
This commit is contained in:
Chris S. Kim 2022-10-18 15:05:09 -04:00 committed by GitHub
parent 0712e1a456
commit e4c20ec190
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
27 changed files with 216 additions and 174 deletions

3
.changelog/14965.txt Normal file
View File

@ -0,0 +1,3 @@
```release-note:feature
agent: Added a new config option `rpc_client_timeout` to tune timeouts for client RPC requests
```

View File

@ -1407,6 +1407,7 @@ func newConsulConfig(runtimeCfg *config.RuntimeConfig, logger hclog.Logger) (*co
// RPC-related performance configs. We allow explicit zero value to disable so
// copy it whatever the value.
cfg.RPCHoldTimeout = runtimeCfg.RPCHoldTimeout
cfg.RPCClientTimeout = runtimeCfg.RPCClientTimeout
cfg.RPCConfig = runtimeCfg.RPCConfig
@ -4142,6 +4143,7 @@ func (a *Agent) reloadConfigInternal(newCfg *config.RuntimeConfig) error {
}
cc := consul.ReloadableConfig{
RPCClientTimeout: newCfg.RPCClientTimeout,
RPCRateLimit: newCfg.RPCRateLimit,
RPCMaxBurst: newCfg.RPCMaxBurst,
RPCMaxConnsPerClient: newCfg.RPCMaxConnsPerClient,

View File

@ -4198,6 +4198,36 @@ func TestAgent_consulConfig_AutoEncryptAllowTLS(t *testing.T) {
require.True(t, a.consulConfig().AutoEncryptAllowTLS)
}
func TestAgent_ReloadConfigRPCClientConfig(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")
}
dataDir := testutil.TempDir(t, "agent") // we manage the data dir
hcl := `
data_dir = "` + dataDir + `"
server = false
bootstrap = false
`
a := NewTestAgent(t, hcl)
defaultRPCTimeout := 60 * time.Second
require.Equal(t, defaultRPCTimeout, a.baseDeps.ConnPool.RPCClientTimeout())
hcl = `
data_dir = "` + dataDir + `"
server = false
bootstrap = false
limits {
rpc_client_timeout = "2m"
}
`
c := TestConfig(testutil.Logger(t), config.FileSource{Name: t.Name(), Format: "hcl", Data: hcl})
require.NoError(t, a.reloadConfigInternal(c))
require.Equal(t, 2*time.Minute, a.baseDeps.ConnPool.RPCClientTimeout())
}
func TestAgent_consulConfig_RaftTrailingLogs(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")

View File

@ -19,7 +19,6 @@ import (
"time"
"github.com/armon/go-metrics/prometheus"
hcpconfig "github.com/hashicorp/consul/agent/hcp/config"
"github.com/hashicorp/go-bexpr"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-multierror"
@ -27,6 +26,8 @@ import (
"github.com/hashicorp/memberlist"
"golang.org/x/time/rate"
hcpconfig "github.com/hashicorp/consul/agent/hcp/config"
"github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/agent/checks"
"github.com/hashicorp/consul/agent/connect/ca"
@ -1030,6 +1031,7 @@ func (b *builder) build() (rt RuntimeConfig, err error) {
RPCBindAddr: rpcBindAddr,
RPCHandshakeTimeout: b.durationVal("limits.rpc_handshake_timeout", c.Limits.RPCHandshakeTimeout),
RPCHoldTimeout: b.durationVal("performance.rpc_hold_timeout", c.Performance.RPCHoldTimeout),
RPCClientTimeout: b.durationVal("limits.rpc_client_timeout", c.Limits.RPCClientTimeout),
RPCMaxBurst: intVal(c.Limits.RPCMaxBurst),
RPCMaxConnsPerClient: intVal(c.Limits.RPCMaxConnsPerClient),
RPCProtocol: intVal(c.RPCProtocol),

View File

@ -716,6 +716,7 @@ type UnixSocket struct {
type Limits struct {
HTTPMaxConnsPerClient *int `mapstructure:"http_max_conns_per_client"`
HTTPSHandshakeTimeout *string `mapstructure:"https_handshake_timeout"`
RPCClientTimeout *string `mapstructure:"rpc_client_timeout"`
RPCHandshakeTimeout *string `mapstructure:"rpc_handshake_timeout"`
RPCMaxBurst *int `mapstructure:"rpc_max_burst"`
RPCMaxConnsPerClient *int `mapstructure:"rpc_max_conns_per_client"`

View File

@ -98,6 +98,7 @@ func DefaultSource() Source {
http_max_conns_per_client = 200
https_handshake_timeout = "5s"
rpc_handshake_timeout = "5s"
rpc_client_timeout = "60s"
rpc_rate = -1
rpc_max_burst = 1000
rpc_max_conns_per_client = 100

View File

@ -133,7 +133,7 @@ type RuntimeConfig struct {
// AutopilotMinQuorum sets the minimum number of servers required in a cluster
// before autopilot can prune dead servers.
//
//hcl: autopilot { min_quorum = int }
// hcl: autopilot { min_quorum = int }
AutopilotMinQuorum uint
// AutopilotRedundancyZoneTag is the Meta tag to use for separating servers
@ -908,6 +908,18 @@ type RuntimeConfig struct {
// hcl: performance { rpc_hold_timeout = "duration" }
RPCHoldTimeout time.Duration
// RPCClientTimeout limits how long a client is allowed to read from an RPC
// connection. This is used to set an upper bound for requests to eventually
// terminate so that RPC connections are not held indefinitely.
// It may be set to 0 explicitly to disable the timeout but this should never
// be used in production. Default is 60 seconds.
//
// Note: Blocking queries use MaxQueryTime and DefaultQueryTime to calculate
// timeouts.
//
// hcl: limits { rpc_client_timeout = "duration" }
RPCClientTimeout time.Duration
// RPCRateLimit and RPCMaxBurst control how frequently RPC calls are allowed
// to happen. In any large enough time interval, rate limiter limits the
// rate to RPCRateLimit tokens per second, with a maximum burst size of
@ -1345,7 +1357,7 @@ type RuntimeConfig struct {
SkipLeaveOnInt bool
// AutoReloadConfig indicate if the config will be
//auto reloaded bases on config file modification
// auto reloaded bases on config file modification
// hcl: auto_reload_config = (true|false)
AutoReloadConfig bool

View File

@ -19,9 +19,10 @@ import (
"github.com/armon/go-metrics/prometheus"
"github.com/google/go-cmp/cmp/cmpopts"
hcpconfig "github.com/hashicorp/consul/agent/hcp/config"
"github.com/stretchr/testify/require"
hcpconfig "github.com/hashicorp/consul/agent/hcp/config"
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/agent/checks"
@ -4577,6 +4578,7 @@ func TestLoad_IntegrationWithFlags(t *testing.T) {
// defaults are changed from these values forcing that change to be
// intentional.
rt.RPCHandshakeTimeout = 5 * time.Second
rt.RPCClientTimeout = 60 * time.Second
rt.HTTPSHandshakeTimeout = 5 * time.Second
rt.HTTPMaxConnsPerClient = 200
rt.RPCMaxConnsPerClient = 100
@ -6115,6 +6117,7 @@ func TestLoad_FullConfig(t *testing.T) {
RPCAdvertiseAddr: tcpAddr("17.99.29.16:3757"),
RPCBindAddr: tcpAddr("16.99.34.17:3757"),
RPCHandshakeTimeout: 1932 * time.Millisecond,
RPCClientTimeout: 62 * time.Second,
RPCHoldTimeout: 15707 * time.Second,
RPCProtocol: 30793,
RPCRateLimit: 12029.43,

View File

@ -265,6 +265,7 @@
"RPCMaxConnsPerClient": 0,
"RPCProtocol": 0,
"RPCRateLimit": 0,
"RPCClientTimeout": "0s",
"RaftBoltDBConfig": {
"NoFreelistSync": false
},

View File

@ -300,6 +300,7 @@ limits {
http_max_conns_per_client = 100
https_handshake_timeout = "2391ms"
rpc_handshake_timeout = "1932ms"
rpc_client_timeout = "62s"
rpc_rate = 12029.43
rpc_max_burst = 44848
rpc_max_conns_per_client = 2954

View File

@ -300,6 +300,7 @@
"http_max_conns_per_client": 100,
"https_handshake_timeout": "2391ms",
"rpc_handshake_timeout": "1932ms",
"rpc_client_timeout": "62s",
"rpc_rate": 12029.43,
"rpc_max_burst": 44848,
"rpc_max_conns_per_client": 2954,

View File

@ -1690,7 +1690,6 @@ func TestCatalog_ListServices_Stale(t *testing.T) {
c.PrimaryDatacenter = "dc1" // Enable ACLs!
c.ACLsEnabled = true
c.Bootstrap = false // Disable bootstrap
c.RPCHoldTimeout = 10 * time.Millisecond
})
defer os.RemoveAll(dir2)
defer s2.Shutdown()

View File

@ -397,12 +397,12 @@ func (c *Client) Stats() map[string]map[string]string {
// GetLANCoordinate returns the coordinate of the node in the LAN gossip
// pool.
//
// - Clients return a single coordinate for the single gossip pool they are
// in (default, segment, or partition).
// - Clients return a single coordinate for the single gossip pool they are
// in (default, segment, or partition).
//
// - Servers return one coordinate for their canonical gossip pool (i.e.
// default partition/segment) and one per segment they are also ancillary
// members of.
// - Servers return one coordinate for their canonical gossip pool (i.e.
// default partition/segment) and one per segment they are also ancillary
// members of.
//
// NOTE: servers do not emit coordinates for partitioned gossip pools they
// are ancillary members of.
@ -422,6 +422,7 @@ func (c *Client) GetLANCoordinate() (lib.CoordinateSet, error) {
// relevant configuration information
func (c *Client) ReloadConfig(config ReloadableConfig) error {
c.rpcLimiter.Store(rate.NewLimiter(config.RPCRateLimit, config.RPCMaxBurst))
c.connPool.SetRPCClientTimeout(config.RPCClientTimeout)
return nil
}

View File

@ -50,7 +50,6 @@ func testClientConfig(t *testing.T) (string, *Config) {
config.SerfLANConfig.MemberlistConfig.ProbeTimeout = 200 * time.Millisecond
config.SerfLANConfig.MemberlistConfig.ProbeInterval = time.Second
config.SerfLANConfig.MemberlistConfig.GossipInterval = 100 * time.Millisecond
config.RPCHoldTimeout = 10 * time.Second
return dir, config
}
@ -531,11 +530,10 @@ func newDefaultDeps(t *testing.T, c *Config) Deps {
MaxStreams: 4,
TLSConfigurator: tls,
Datacenter: c.Datacenter,
Timeout: c.RPCHoldTimeout,
DefaultQueryTime: c.DefaultQueryTime,
MaxQueryTime: c.MaxQueryTime,
}
connPool.SetRPCClientTimeout(c.RPCClientTimeout)
return Deps{
EventPublisher: stream.NewEventPublisher(10 * time.Second),
Logger: logger,
@ -882,7 +880,7 @@ func TestClient_RPC_Timeout(t *testing.T) {
_, c1 := testClientWithConfig(t, func(c *Config) {
c.Datacenter = "dc1"
c.NodeName = uniqueNodeName(t.Name())
c.RPCHoldTimeout = 10 * time.Millisecond
c.RPCClientTimeout = 10 * time.Millisecond
c.DefaultQueryTime = 100 * time.Millisecond
c.MaxQueryTime = 200 * time.Millisecond
})
@ -895,34 +893,53 @@ func TestClient_RPC_Timeout(t *testing.T) {
}
})
// waiter will sleep for 101ms which is 1ms more than the DefaultQueryTime
require.NoError(t, s1.RegisterEndpoint("Wait", &waiter{duration: 101 * time.Millisecond}))
require.NoError(t, s1.RegisterEndpoint("Long", &waiter{duration: 100 * time.Millisecond}))
require.NoError(t, s1.RegisterEndpoint("Short", &waiter{duration: 5 * time.Millisecond}))
// Requests with QueryOptions have a default timeout of RPCHoldTimeout (10ms)
// so we expect the RPC call to timeout.
var out struct{}
err := c1.RPC("Wait.Wait", &structs.NodeSpecificRequest{}, &out)
require.Error(t, err)
require.Contains(t, err.Error(), "rpc error making call: i/o deadline reached")
t.Run("non-blocking query times out after RPCClientTimeout", func(t *testing.T) {
// Requests with QueryOptions have a default timeout of RPCClientTimeout (10ms)
// so we expect the RPC call to timeout.
var out struct{}
err := c1.RPC("Long.Wait", &structs.NodeSpecificRequest{}, &out)
require.Error(t, err)
require.Contains(t, err.Error(), "rpc error making call: i/o deadline reached")
})
// Blocking requests have a longer timeout (100ms) so this should pass since we
// add the maximum jitter which should be 16ms
out = struct{}{}
err = c1.RPC("Wait.Wait", &structs.NodeSpecificRequest{
QueryOptions: structs.QueryOptions{
MinQueryIndex: 1,
},
}, &out)
require.NoError(t, err)
t.Run("non-blocking query succeeds", func(t *testing.T) {
var out struct{}
require.NoError(t, c1.RPC("Short.Wait", &structs.NodeSpecificRequest{}, &out))
})
// We pass in a custom MaxQueryTime (20ms) through QueryOptions which should fail
out = struct{}{}
err = c1.RPC("Wait.Wait", &structs.NodeSpecificRequest{
QueryOptions: structs.QueryOptions{
MinQueryIndex: 1,
MaxQueryTime: 20 * time.Millisecond,
},
}, &out)
require.Error(t, err)
require.Contains(t, err.Error(), "rpc error making call: i/o deadline reached")
t.Run("check that deadline does not persist across calls", func(t *testing.T) {
var out struct{}
err := c1.RPC("Long.Wait", &structs.NodeSpecificRequest{}, &out)
require.Error(t, err)
require.Contains(t, err.Error(), "rpc error making call: i/o deadline reached")
require.NoError(t, c1.RPC("Long.Wait", &structs.NodeSpecificRequest{
QueryOptions: structs.QueryOptions{
MinQueryIndex: 1,
},
}, &out))
})
t.Run("blocking query succeeds", func(t *testing.T) {
var out struct{}
require.NoError(t, c1.RPC("Long.Wait", &structs.NodeSpecificRequest{
QueryOptions: structs.QueryOptions{
MinQueryIndex: 1,
},
}, &out))
})
t.Run("blocking query with short MaxQueryTime fails", func(t *testing.T) {
var out struct{}
err := c1.RPC("Long.Wait", &structs.NodeSpecificRequest{
QueryOptions: structs.QueryOptions{
MinQueryIndex: 1,
MaxQueryTime: 20 * time.Millisecond,
},
}, &out)
require.Error(t, err)
require.Contains(t, err.Error(), "rpc error making call: i/o deadline reached")
})
}

View File

@ -331,6 +331,13 @@ type Config struct {
// place, and a small jitter is applied to avoid a thundering herd.
RPCHoldTimeout time.Duration
// RPCClientTimeout limits how long a client is allowed to read from an RPC
// connection. This is used to set an upper bound for non-blocking queries to
// eventually terminate so that RPC connections are not held indefinitely.
// Blocking queries will use MaxQueryTime and DefaultQueryTime to calculate
// their own timeouts.
RPCClientTimeout time.Duration
// RPCRateLimit and RPCMaxBurst control how frequently RPC calls are allowed
// to happen. In any large enough time interval, rate limiter limits the
// rate to RPCRateLimit tokens per second, with a maximum burst size of
@ -612,6 +619,7 @@ type RPCConfig struct {
// ReloadableConfig is the configuration that is passed to ReloadConfig when
// application config is reloaded.
type ReloadableConfig struct {
RPCClientTimeout time.Duration
RPCRateLimit rate.Limit
RPCMaxBurst int
RPCMaxConnsPerClient int

View File

@ -1378,14 +1378,10 @@ func (r isReadRequest) IsRead() bool {
return true
}
func (r isReadRequest) HasTimedOut(since time.Time, rpcHoldTimeout, maxQueryTime, defaultQueryTime time.Duration) (bool, error) {
func (r isReadRequest) HasTimedOut(_ time.Time, _, _, _ time.Duration) (bool, error) {
return false, nil
}
func (r isReadRequest) Timeout(rpcHoldTimeout, maxQueryTime, defaultQueryTime time.Duration) time.Duration {
return time.Duration(-1)
}
func TestRPC_AuthorizeRaftRPC(t *testing.T) {
caPEM, caPK, err := tlsutil.GenerateCA(tlsutil.CAOpts{Days: 5, Domain: "consul"})
require.NoError(t, err)

View File

@ -18,8 +18,8 @@ import (
"time"
"github.com/armon/go-metrics"
"github.com/hashicorp/consul/agent/hcp"
connlimit "github.com/hashicorp/go-connlimit"
"github.com/hashicorp/consul-net-rpc/net/rpc"
"github.com/hashicorp/go-connlimit"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-memdb"
"github.com/hashicorp/go-version"
@ -31,8 +31,6 @@ import (
"golang.org/x/time/rate"
"google.golang.org/grpc"
"github.com/hashicorp/consul-net-rpc/net/rpc"
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/consul/authmethod"
"github.com/hashicorp/consul/agent/consul/authmethod/ssoauth"
@ -49,6 +47,7 @@ import (
"github.com/hashicorp/consul/agent/grpc-external/services/serverdiscovery"
agentgrpc "github.com/hashicorp/consul/agent/grpc-internal"
"github.com/hashicorp/consul/agent/grpc-internal/services/subscribe"
"github.com/hashicorp/consul/agent/hcp"
"github.com/hashicorp/consul/agent/metadata"
"github.com/hashicorp/consul/agent/pool"
"github.com/hashicorp/consul/agent/router"
@ -1646,6 +1645,7 @@ func (s *Server) ReloadConfig(config ReloadableConfig) error {
s.rpcConnLimiter.SetConfig(connlimit.Config{
MaxConnsPerClientIP: config.RPCMaxConnsPerClient,
})
s.connPool.SetRPCClientTimeout(config.RPCClientTimeout)
if s.IsLeader() {
// only bootstrap the config entries if we are the leader

View File

@ -16,9 +16,8 @@ import (
"github.com/armon/go-metrics"
"github.com/google/tcpproxy"
"github.com/hashicorp/consul/agent/hcp"
"github.com/hashicorp/go-hclog"
uuid "github.com/hashicorp/go-uuid"
"github.com/hashicorp/go-uuid"
"github.com/hashicorp/memberlist"
"github.com/hashicorp/raft"
"github.com/stretchr/testify/mock"
@ -26,6 +25,8 @@ import (
"golang.org/x/time/rate"
"google.golang.org/grpc"
"github.com/hashicorp/consul/agent/hcp"
"github.com/hashicorp/consul-net-rpc/net/rpc"
"github.com/hashicorp/consul/agent/connect"
@ -1815,6 +1816,7 @@ func TestServer_ReloadConfig(t *testing.T) {
c.Build = "1.5.0"
c.RPCRateLimit = 500
c.RPCMaxBurst = 5000
c.RPCClientTimeout = 60 * time.Second
// Set one raft param to be non-default in the initial config, others are
// default.
c.RaftConfig.TrailingLogs = 1234
@ -1828,7 +1830,10 @@ func TestServer_ReloadConfig(t *testing.T) {
require.Equal(t, rate.Limit(500), limiter.Limit())
require.Equal(t, 5000, limiter.Burst())
require.Equal(t, 60*time.Second, s.connPool.RPCClientTimeout())
rc := ReloadableConfig{
RPCClientTimeout: 2 * time.Minute,
RPCRateLimit: 1000,
RPCMaxBurst: 10000,
ConfigEntryBootstrap: []structs.ConfigEntry{entryInit},
@ -1857,6 +1862,9 @@ func TestServer_ReloadConfig(t *testing.T) {
require.Equal(t, rate.Limit(1000), limiter.Limit())
require.Equal(t, 10000, limiter.Burst())
// Check RPC client timeout got updated
require.Equal(t, 2*time.Minute, s.connPool.RPCClientTimeout())
// Check raft config
defaults := DefaultConfig()
got := s.raft.ReloadableConfig()

View File

@ -18,6 +18,7 @@ import (
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/lib"
"github.com/hashicorp/consul/proto/pbcommon"
"github.com/hashicorp/consul/tlsutil"
)
@ -31,7 +32,7 @@ type muxSession interface {
// streamClient is used to wrap a stream with an RPC client
type StreamClient struct {
stream *TimeoutConn
stream net.Conn
codec rpc.ClientCodec
}
@ -56,36 +57,6 @@ type Conn struct {
clientLock sync.Mutex
}
// TimeoutConn wraps net.Conn with a read timeout.
// When set, FirstReadTimeout only applies to the very next Read.
// DefaultTimeout is used for any other Read.
type TimeoutConn struct {
net.Conn
DefaultTimeout time.Duration
FirstReadTimeout time.Duration
}
func (c *TimeoutConn) Read(b []byte) (int, error) {
timeout := c.DefaultTimeout
// Apply timeout to first read then zero it out
if c.FirstReadTimeout > 0 {
timeout = c.FirstReadTimeout
c.FirstReadTimeout = 0
}
var deadline time.Time
if timeout > 0 {
deadline = time.Now().Add(timeout)
}
if err := c.Conn.SetReadDeadline(deadline); err != nil {
return 0, err
}
return c.Conn.Read(b)
}
func (c *TimeoutConn) Write(b []byte) (int, error) {
return c.Conn.Write(b)
}
func (c *Conn) Close() error {
return c.session.Close()
}
@ -109,14 +80,12 @@ func (c *Conn) getClient() (*StreamClient, error) {
return nil, err
}
timeoutStream := &TimeoutConn{Conn: stream, DefaultTimeout: c.pool.Timeout}
// Create the RPC client
codec := msgpackrpc.NewCodecFromHandle(true, true, timeoutStream, structs.MsgpackHandle)
codec := msgpackrpc.NewCodecFromHandle(true, true, stream, structs.MsgpackHandle)
// Return a new stream client
sc := &StreamClient{
stream: timeoutStream,
stream: stream,
codec: codec,
}
return sc, nil
@ -133,7 +102,7 @@ func (c *Conn) returnClient(client *StreamClient) {
// If this is a Yamux stream, shrink the internal buffers so that
// we can GC the idle memory
if ys, ok := client.stream.Conn.(*yamux.Stream); ok {
if ys, ok := client.stream.(*yamux.Stream); ok {
ys.Shrink()
}
}
@ -158,6 +127,12 @@ func (c *Conn) markForUse() {
// streams allowed. If TLS settings are provided outgoing connections
// use TLS.
type ConnPool struct {
// clientTimeoutMs is the default timeout for client RPC requests
// in milliseconds. Stored as an atomic uint32 value to allow for
// reloading.
// TODO: once we move to go1.19, change to atomic.Uint32.
clientTimeoutMs uint32
// SrcAddr is the source address for outgoing connections.
SrcAddr *net.TCPAddr
@ -165,11 +140,9 @@ type ConnPool struct {
// TODO: consider refactoring to accept a full yamux.Config instead of a logger
Logger *log.Logger
// The default timeout for stream reads/writes
Timeout time.Duration
// Used for calculating timeouts on RPC requests
MaxQueryTime time.Duration
// MaxQueryTime is used for calculating timeouts on blocking queries.
MaxQueryTime time.Duration
// DefaultQueryTime is used for calculating timeouts on blocking queries.
DefaultQueryTime time.Duration
// The maximum time to keep a connection open
@ -364,7 +337,7 @@ func (p *ConnPool) dial(
tlsRPCType RPCType,
) (net.Conn, HalfCloser, error) {
// Try to dial the conn
d := &net.Dialer{LocalAddr: p.SrcAddr, Timeout: p.Timeout}
d := &net.Dialer{LocalAddr: p.SrcAddr, Timeout: DefaultDialTimeout}
conn, err := d.Dial("tcp", addr.String())
if err != nil {
return nil, nil, err
@ -417,6 +390,18 @@ func (p *ConnPool) dial(
return conn, hc, nil
}
func (p *ConnPool) RPCClientTimeout() time.Duration {
return time.Duration(atomic.LoadUint32(&p.clientTimeoutMs)) * time.Millisecond
}
func (p *ConnPool) SetRPCClientTimeout(timeout time.Duration) {
if timeout > time.Hour {
// Prevent unreasonably large timeouts that might overflow a uint32
timeout = time.Hour
}
atomic.StoreUint32(&p.clientTimeoutMs, uint32(timeout.Milliseconds()))
}
// DialRPCViaMeshGateway dials the destination node and sets up the connection
// to be the correct RPC type using ALPN. This currently is exclusively used to
// dial other servers in foreign datacenters via mesh gateways.
@ -620,6 +605,17 @@ func (p *ConnPool) rpcInsecure(dc string, addr net.Addr, method string, args int
return nil
}
// BlockableQuery represents a read query which can be blocking or non-blocking.
// This interface is used to override the rpc_client_timeout for blocking queries.
type BlockableQuery interface {
// BlockingTimeout returns duration > 0 if the query is blocking.
// Otherwise returns 0 for non-blocking queries.
BlockingTimeout(maxQueryTime, defaultQueryTime time.Duration) time.Duration
}
var _ BlockableQuery = (*structs.QueryOptions)(nil)
var _ BlockableQuery = (*pbcommon.QueryOptions)(nil)
func (p *ConnPool) rpc(dc string, nodeName string, addr net.Addr, method string, args interface{}, reply interface{}) error {
p.once.Do(p.init)
@ -629,9 +625,20 @@ func (p *ConnPool) rpc(dc string, nodeName string, addr net.Addr, method string,
return fmt.Errorf("rpc error getting client: %w", err)
}
// Use the zero value if the request doesn't implement RPCInfo
if info, ok := args.(structs.RPCInfo); ok {
sc.stream.FirstReadTimeout = info.Timeout(p.Timeout, p.MaxQueryTime, p.DefaultQueryTime)
var deadline time.Time
timeout := p.RPCClientTimeout()
if bq, ok := args.(BlockableQuery); ok {
blockingTimeout := bq.BlockingTimeout(p.MaxQueryTime, p.DefaultQueryTime)
if blockingTimeout > 0 {
// override the default client timeout
timeout = blockingTimeout
}
}
if timeout > 0 {
deadline = time.Now().Add(timeout)
}
if err := sc.stream.SetReadDeadline(deadline); err != nil {
return fmt.Errorf("rpc error setting read deadline: %w", err)
}
// Make the RPC call

View File

@ -8,7 +8,6 @@ import (
"time"
"github.com/armon/go-metrics/prometheus"
"github.com/hashicorp/consul/agent/hcp"
"github.com/hashicorp/go-hclog"
"google.golang.org/grpc/grpclog"
@ -24,6 +23,7 @@ import (
grpcInt "github.com/hashicorp/consul/agent/grpc-internal"
"github.com/hashicorp/consul/agent/grpc-internal/resolver"
grpcWare "github.com/hashicorp/consul/agent/grpc-middleware"
"github.com/hashicorp/consul/agent/hcp"
"github.com/hashicorp/consul/agent/local"
"github.com/hashicorp/consul/agent/pool"
"github.com/hashicorp/consul/agent/router"
@ -181,10 +181,10 @@ func newConnPool(config *config.RuntimeConfig, logger hclog.Logger, tls *tlsutil
Logger: logger.StandardLogger(&hclog.StandardLoggerOptions{InferLevels: true}),
TLSConfigurator: tls,
Datacenter: config.Datacenter,
Timeout: config.RPCHoldTimeout,
MaxQueryTime: config.MaxQueryTime,
DefaultQueryTime: config.DefaultQueryTime,
}
pool.SetRPCClientTimeout(config.RPCClientTimeout)
if config.ServerMode {
pool.MaxTime = 2 * time.Minute
pool.MaxStreams = 64

View File

@ -245,7 +245,6 @@ type RPCInfo interface {
TokenSecret() string
SetTokenSecret(string)
HasTimedOut(since time.Time, rpcHoldTimeout, maxQueryTime, defaultQueryTime time.Duration) (bool, error)
Timeout(rpcHoldTimeout, maxQueryTime, defaultQueryTime time.Duration) time.Duration
}
// QueryOptions is used to specify various flags for read queries
@ -344,7 +343,8 @@ func (q *QueryOptions) SetTokenSecret(s string) {
q.Token = s
}
func (q QueryOptions) Timeout(rpcHoldTimeout, maxQueryTime, defaultQueryTime time.Duration) time.Duration {
// BlockingTimeout implements pool.BlockableQuery
func (q QueryOptions) BlockingTimeout(maxQueryTime, defaultQueryTime time.Duration) time.Duration {
// Match logic in Server.blockingQuery.
if q.MinQueryIndex > 0 {
if q.MaxQueryTime > maxQueryTime {
@ -355,13 +355,15 @@ func (q QueryOptions) Timeout(rpcHoldTimeout, maxQueryTime, defaultQueryTime tim
// Timeout after maximum jitter has elapsed.
q.MaxQueryTime += q.MaxQueryTime / JitterFraction
return q.MaxQueryTime + rpcHoldTimeout
return q.MaxQueryTime
}
return rpcHoldTimeout
return 0
}
func (q QueryOptions) HasTimedOut(start time.Time, rpcHoldTimeout, maxQueryTime, defaultQueryTime time.Duration) (bool, error) {
return time.Since(start) > q.Timeout(rpcHoldTimeout, maxQueryTime, defaultQueryTime), nil
// In addition to BlockingTimeout, allow for an additional rpcHoldTimeout buffer
// in case we need to wait for a leader election.
return time.Since(start) > rpcHoldTimeout+q.BlockingTimeout(maxQueryTime, defaultQueryTime), nil
}
type WriteRequest struct {
@ -387,12 +389,8 @@ func (w *WriteRequest) SetTokenSecret(s string) {
w.Token = s
}
func (w WriteRequest) HasTimedOut(start time.Time, rpcHoldTimeout, maxQueryTime, defaultQueryTime time.Duration) (bool, error) {
return time.Since(start) > w.Timeout(rpcHoldTimeout, maxQueryTime, defaultQueryTime), nil
}
func (w WriteRequest) Timeout(rpcHoldTimeout, maxQueryTime, defaultQueryTime time.Duration) time.Duration {
return rpcHoldTimeout
func (w WriteRequest) HasTimedOut(start time.Time, rpcHoldTimeout, _, _ time.Duration) (bool, error) {
return time.Since(start) > rpcHoldTimeout, nil
}
type QueryBackend int

View File

@ -385,14 +385,6 @@ func (msg *%[1]s) HasTimedOut(start time.Time, rpcHoldTimeout time.Duration, a t
return msg.%[2]s.HasTimedOut(start, rpcHoldTimeout, a, b)
}
// Timeout implements structs.RPCInfo
func (msg *%[1]s) Timeout(rpcHoldTimeout time.Duration, a time.Duration, b time.Duration) time.Duration {
if msg == nil || msg.%[2]s == nil {
return 0
}
return msg.%[2]s.Timeout(rpcHoldTimeout, a, b)
}
// IsRead implements structs.RPCInfo
func (msg *%[1]s) IsRead() bool {
return false
@ -441,14 +433,6 @@ func (msg *%[1]s) HasTimedOut(start time.Time, rpcHoldTimeout time.Duration, a t
return msg.%[2]s.HasTimedOut(start, rpcHoldTimeout, a, b)
}
// Timeout implements structs.RPCInfo
func (msg *%[1]s) Timeout(rpcHoldTimeout time.Duration, a time.Duration, b time.Duration) time.Duration {
if msg == nil || msg.%[2]s == nil {
return 0
}
return msg.%[2]s.Timeout(rpcHoldTimeout, a, b)
}
// SetTokenSecret implements structs.RPCInfo
func (msg *%[1]s) SetTokenSecret(s string) {
// TODO: initialize if nil
@ -494,12 +478,6 @@ func (msg *%[1]s) HasTimedOut(start time.Time, rpcHoldTimeout time.Duration, a t
return time.Since(start) > rpcHoldTimeout, nil
}
// Timeout implements structs.RPCInfo
func (msg *%[1]s) Timeout(rpcHoldTimeout time.Duration, a time.Duration, b time.Duration) time.Duration {
// TODO(peering): figure out read semantics here
return rpcHoldTimeout
}
// SetTokenSecret implements structs.RPCInfo
func (msg *%[1]s) SetTokenSecret(s string) {
// TODO(peering): figure out read semantics here

View File

@ -22,10 +22,6 @@ func (req *AutoConfigRequest) SetTokenSecret(token string) {
req.ConsulToken = token
}
func (req *AutoConfigRequest) HasTimedOut(start time.Time, rpcHoldTimeout, maxQueryTime, defaultQueryTime time.Duration) (bool, error) {
return time.Since(start) > req.Timeout(rpcHoldTimeout, maxQueryTime, defaultQueryTime), nil
}
func (req *AutoConfigRequest) Timeout(rpcHoldTimeout, maxQueryTime, defaultQueryTime time.Duration) time.Duration {
return rpcHoldTimeout
func (req *AutoConfigRequest) HasTimedOut(start time.Time, rpcHoldTimeout, _, _ time.Duration) (bool, error) {
return time.Since(start) > rpcHoldTimeout, nil
}

View File

@ -75,16 +75,19 @@ func (q *QueryOptions) SetStaleIfError(staleIfError time.Duration) {
}
func (q *QueryOptions) HasTimedOut(start time.Time, rpcHoldTimeout, maxQueryTime, defaultQueryTime time.Duration) (bool, error) {
return time.Since(start) > q.Timeout(rpcHoldTimeout, maxQueryTime, defaultQueryTime), nil
// In addition to BlockingTimeout, allow for an additional rpcHoldTimeout buffer
// in case we need to wait for a leader election.
return time.Since(start) > rpcHoldTimeout+q.BlockingTimeout(maxQueryTime, defaultQueryTime), nil
}
func (q *QueryOptions) Timeout(rpcHoldTimeout, maxQueryTime, defaultQueryTime time.Duration) time.Duration {
// BlockingTimeout implements pool.BlockableQuery
func (q *QueryOptions) BlockingTimeout(maxQueryTime, defaultQueryTime time.Duration) time.Duration {
maxTime := structs.DurationFromProto(q.MaxQueryTime)
o := structs.QueryOptions{
MaxQueryTime: maxTime,
MinQueryIndex: q.MinQueryIndex,
}
return o.Timeout(rpcHoldTimeout, maxQueryTime, defaultQueryTime)
return o.BlockingTimeout(maxQueryTime, defaultQueryTime)
}
// SetFilter is needed to implement the structs.QueryOptionsCompat interface
@ -118,12 +121,7 @@ func (w *WriteRequest) AllowStaleRead() bool {
// HasTimedOut implements structs.RPCInfo
func (w *WriteRequest) HasTimedOut(start time.Time, rpcHoldTimeout, maxQueryTime, defaultQueryTime time.Duration) (bool, error) {
return time.Since(start) > w.Timeout(rpcHoldTimeout, maxQueryTime, defaultQueryTime), nil
}
// Timeout implements structs.RPCInfo
func (w *WriteRequest) Timeout(rpcHoldTimeout, _, _ time.Duration) time.Duration {
return rpcHoldTimeout
return time.Since(start) > rpcHoldTimeout, nil
}
// IsRead implements structs.RPCInfo
@ -148,13 +146,8 @@ func (r *ReadRequest) SetTokenSecret(token string) {
}
// HasTimedOut implements structs.RPCInfo
func (r *ReadRequest) HasTimedOut(start time.Time, rpcHoldTimeout, maxQueryTime, defaultQueryTime time.Duration) (bool, error) {
return time.Since(start) > r.Timeout(rpcHoldTimeout, maxQueryTime, defaultQueryTime), nil
}
// Timeout implements structs.RPCInfo
func (r *ReadRequest) Timeout(rpcHoldTimeout, _, _ time.Duration) time.Duration {
return rpcHoldTimeout
func (r *ReadRequest) HasTimedOut(start time.Time, rpcHoldTimeout, _, _ time.Duration) (bool, error) {
return time.Since(start) > rpcHoldTimeout, nil
}
// RequestDatacenter implements structs.RPCInfo

View File

@ -33,12 +33,6 @@ func (msg *PeeringReadRequest) HasTimedOut(start time.Time, rpcHoldTimeout time.
return time.Since(start) > rpcHoldTimeout, nil
}
// Timeout implements structs.RPCInfo
func (msg *PeeringReadRequest) Timeout(rpcHoldTimeout time.Duration, a time.Duration, b time.Duration) time.Duration {
// TODO(peering): figure out read semantics here
return rpcHoldTimeout
}
// SetTokenSecret implements structs.RPCInfo
func (msg *PeeringReadRequest) SetTokenSecret(s string) {
// TODO(peering): figure out read semantics here
@ -77,12 +71,6 @@ func (msg *PeeringListRequest) HasTimedOut(start time.Time, rpcHoldTimeout time.
return time.Since(start) > rpcHoldTimeout, nil
}
// Timeout implements structs.RPCInfo
func (msg *PeeringListRequest) Timeout(rpcHoldTimeout time.Duration, a time.Duration, b time.Duration) time.Duration {
// TODO(peering): figure out read semantics here
return rpcHoldTimeout
}
// SetTokenSecret implements structs.RPCInfo
func (msg *PeeringListRequest) SetTokenSecret(s string) {
// TODO(peering): figure out read semantics here

View File

@ -32,13 +32,8 @@ func (req *SubscribeRequest) SetTokenSecret(token string) {
}
// HasTimedOut implements structs.RPCInfo
func (req *SubscribeRequest) HasTimedOut(start time.Time, rpcHoldTimeout, maxQueryTime, defaultQueryTime time.Duration) (bool, error) {
return time.Since(start) > req.Timeout(rpcHoldTimeout, maxQueryTime, defaultQueryTime), nil
}
// Timeout implements structs.RPCInfo
func (req *SubscribeRequest) Timeout(rpcHoldTimeout, maxQueryTime, defaultQueryTime time.Duration) time.Duration {
return rpcHoldTimeout
func (req *SubscribeRequest) HasTimedOut(start time.Time, rpcHoldTimeout, _, _ time.Duration) (bool, error) {
return time.Since(start) > rpcHoldTimeout, nil
}
// EnterpriseMeta returns the EnterpriseMeta encoded in the request's Subject.

View File

@ -541,6 +541,7 @@ Valid time units are 'ns', 'us' (or 'µs'), 'ms', 's', 'm', 'h'."
- `http_max_conns_per_client` - Configures a limit of how many concurrent TCP connections a single client IP address is allowed to open to the agent's HTTP(S) server. This affects the HTTP(S) servers in both client and server agents. Default value is `200`.
- `https_handshake_timeout` - Configures the limit for how long the HTTPS server in both client and server agents will wait for a client to complete a TLS handshake. This should be kept conservative as it limits how many connections an unauthenticated attacker can open if `verify_incoming` is being using to authenticate clients (strongly recommended in production). Default value is `5s`.
- `rpc_handshake_timeout` - Configures the limit for how long servers will wait after a client TCP connection is established before they complete the connection handshake. When TLS is used, the same timeout applies to the TLS handshake separately from the initial protocol negotiation. All Consul clients should perform this immediately on establishing a new connection. This should be kept conservative as it limits how many connections an unauthenticated attacker can open if `verify_incoming` is being using to authenticate clients (strongly recommended in production). When `verify_incoming` is true on servers, this limits how long the connection socket and associated goroutines will be held open before the client successfully authenticates. Default value is `5s`.
- `rpc_client_timeout` - Configures the limit for how long a client is allowed to read from an RPC connection. This is used to set an upper bound for calls to eventually terminate so that RPC connections are not held indefinitely. Blocking queries can override this timeout. Default is `60s`.
- `rpc_max_conns_per_client` - Configures a limit of how many concurrent TCP connections a single source IP address is allowed to open to a single server. It affects both clients connections and other server connections. In general Consul clients multiplex many RPC calls over a single TCP connection so this can typically be kept low. It needs to be more than one though since servers open at least one additional connection for raft RPC, possibly more for WAN federation when using network areas, and snapshot requests from clients run over a separate TCP conn. A reasonably low limit significantly reduces the ability of an unauthenticated attacker to consume unbounded resources by holding open many connections. You may need to increase this if WAN federated servers connect via proxies or NAT gateways or similar causing many legitimate connections from a single source IP. Default value is `100` which is designed to be extremely conservative to limit issues with certain deployment patterns. Most deployments can probably reduce this safely. 100 connections on modern server hardware should not cause a significant impact on resource usage from an unauthenticated attacker though.
- `rpc_rate` - Configures the RPC rate limiter on Consul _clients_ by setting the maximum request rate that this agent is allowed to make for RPC requests to Consul servers, in requests per second. Defaults to infinite, which disables rate limiting.
- `rpc_max_burst` - The size of the token bucket used to recharge the RPC rate limiter on Consul _clients_. Defaults to 1000 tokens, and each token is good for a single RPC call to a Consul server. See https://en.wikipedia.org/wiki/Token_bucket for more details about how token bucket rate limiters operate.