From 45ffdc360e7f1767f884a6bb7a17dc44a525b535 Mon Sep 17 00:00:00 2001 From: Will Jordan Date: Thu, 21 Apr 2022 13:21:35 -0700 Subject: [PATCH] Add timeout to Client RPC calls (#11500) Adds a timeout (deadline) to client RPC calls, so that streams will no longer hang indefinitely in unstable network conditions. Co-authored-by: kisunji --- .changelog/11500.txt | 4 ++ agent/consul/catalog_endpoint_test.go | 1 + agent/consul/client.go | 18 +++-- agent/consul/client_test.go | 84 ++++++++++++++++++++--- agent/consul/rpc_test.go | 4 ++ agent/consul/server_test.go | 2 +- agent/pool/pool.go | 54 +++++++++++++-- agent/setup.go | 13 ++-- agent/structs/structs.go | 25 ++++--- internal/tools/proto-gen-rpc-glue/main.go | 25 +++++++ proto/pbautoconf/auto_config.go | 6 +- proto/pbcommon/common.go | 22 ++++-- proto/pbsubscribe/subscribe.go | 7 +- 13 files changed, 226 insertions(+), 39 deletions(-) create mode 100644 .changelog/11500.txt diff --git a/.changelog/11500.txt b/.changelog/11500.txt new file mode 100644 index 000000000..b19fe8a52 --- /dev/null +++ b/.changelog/11500.txt @@ -0,0 +1,4 @@ +```release-note:bugfix +rpc: Adds a deadline to client RPC calls, so that streams will no longer hang +indefinitely in unstable network conditions. [[GH-8504](https://github.com/hashicorp/consul/issues/8504)] +``` \ No newline at end of file diff --git a/agent/consul/catalog_endpoint_test.go b/agent/consul/catalog_endpoint_test.go index d6d303c2b..78ac4c36b 100644 --- a/agent/consul/catalog_endpoint_test.go +++ b/agent/consul/catalog_endpoint_test.go @@ -1650,6 +1650,7 @@ func TestCatalog_ListServices_Stale(t *testing.T) { c.PrimaryDatacenter = "dc1" // Enable ACLs! c.ACLsEnabled = true c.Bootstrap = false // Disable bootstrap + c.RPCHoldTimeout = 10 * time.Millisecond }) defer os.RemoveAll(dir2) defer s2.Shutdown() diff --git a/agent/consul/client.go b/agent/consul/client.go index 6a15acb94..7ce00af33 100644 --- a/agent/consul/client.go +++ b/agent/consul/client.go @@ -291,20 +291,26 @@ TRY: } // Move off to another server, and see if we can retry. - c.logger.Error("RPC failed to server", - "method", method, - "server", server.Addr, - "error", rpcErr, - ) - metrics.IncrCounterWithLabels([]string{"client", "rpc", "failed"}, 1, []metrics.Label{{Name: "server", Value: server.Name}}) manager.NotifyFailedServer(server) // Use the zero value for RPCInfo if the request doesn't implement RPCInfo info, _ := args.(structs.RPCInfo) if retry := canRetry(info, rpcErr, firstCheck, c.config); !retry { + c.logger.Error("RPC failed to server", + "method", method, + "server", server.Addr, + "error", rpcErr, + ) + metrics.IncrCounterWithLabels([]string{"client", "rpc", "failed"}, 1, []metrics.Label{{Name: "server", Value: server.Name}}) return rpcErr } + c.logger.Warn("Retrying RPC to server", + "method", method, + "server", server.Addr, + "error", rpcErr, + ) + // We can wait a bit and retry! jitter := lib.RandomStagger(c.config.RPCHoldTimeout / structs.JitterFraction) select { diff --git a/agent/consul/client_test.go b/agent/consul/client_test.go index d593f5aa9..5c35e3f33 100644 --- a/agent/consul/client_test.go +++ b/agent/consul/client_test.go @@ -48,6 +48,7 @@ func testClientConfig(t *testing.T) (string, *Config) { config.SerfLANConfig.MemberlistConfig.ProbeTimeout = 200 * time.Millisecond config.SerfLANConfig.MemberlistConfig.ProbeInterval = time.Second config.SerfLANConfig.MemberlistConfig.GossipInterval = 100 * time.Millisecond + config.RPCHoldTimeout = 10 * time.Second return dir, config } @@ -72,7 +73,7 @@ func testClientWithConfigWithErr(t *testing.T, cb func(c *Config)) (string, *Cli } // Apply config to copied fields because many tests only set the old - //values. + // values. config.ACLResolverSettings.ACLsEnabled = config.ACLsEnabled config.ACLResolverSettings.NodeName = config.NodeName config.ACLResolverSettings.Datacenter = config.Datacenter @@ -521,13 +522,16 @@ func newDefaultDeps(t *testing.T, c *Config) Deps { resolver.Register(builder) connPool := &pool.ConnPool{ - Server: false, - SrcAddr: c.RPCSrcAddr, - Logger: logger.StandardLogger(&hclog.StandardLoggerOptions{InferLevels: true}), - MaxTime: 2 * time.Minute, - MaxStreams: 4, - TLSConfigurator: tls, - Datacenter: c.Datacenter, + Server: false, + SrcAddr: c.RPCSrcAddr, + Logger: logger.StandardLogger(&hclog.StandardLoggerOptions{InferLevels: true}), + MaxTime: 2 * time.Minute, + MaxStreams: 4, + TLSConfigurator: tls, + Datacenter: c.Datacenter, + Timeout: c.RPCHoldTimeout, + DefaultQueryTime: c.DefaultQueryTime, + MaxQueryTime: c.MaxQueryTime, } return Deps{ @@ -853,3 +857,67 @@ func TestClient_ShortReconnectTimeout(t *testing.T) { 50*time.Millisecond, "The client node was not reaped within the alotted time") } + +type waiter struct { + duration time.Duration +} + +func (w *waiter) Wait(struct{}, *struct{}) error { + time.Sleep(w.duration) + return nil +} + +func TestClient_RPC_Timeout(t *testing.T) { + if testing.Short() { + t.Skip("too slow for testing.Short") + } + t.Parallel() + + _, s1 := testServerWithConfig(t) + + _, c1 := testClientWithConfig(t, func(c *Config) { + c.Datacenter = "dc1" + c.NodeName = uniqueNodeName(t.Name()) + c.RPCHoldTimeout = 10 * time.Millisecond + c.DefaultQueryTime = 100 * time.Millisecond + c.MaxQueryTime = 200 * time.Millisecond + }) + joinLAN(t, c1, s1) + + retry.Run(t, func(r *retry.R) { + var out struct{} + if err := c1.RPC("Status.Ping", struct{}{}, &out); err != nil { + r.Fatalf("err: %v", err) + } + }) + + // waiter will sleep for 50ms + require.NoError(t, s1.RegisterEndpoint("Wait", &waiter{duration: 50 * time.Millisecond})) + + // Requests with QueryOptions have a default timeout of RPCHoldTimeout (10ms) + // so we expect the RPC call to timeout. + var out struct{} + err := c1.RPC("Wait.Wait", &structs.NodeSpecificRequest{}, &out) + require.Error(t, err) + require.Contains(t, err.Error(), "rpc error making call: i/o deadline reached") + + // Blocking requests have a longer timeout (100ms) so this should pass + out = struct{}{} + err = c1.RPC("Wait.Wait", &structs.NodeSpecificRequest{ + QueryOptions: structs.QueryOptions{ + MinQueryIndex: 1, + }, + }, &out) + require.NoError(t, err) + + // We pass in a custom MaxQueryTime (20ms) through QueryOptions which should fail + out = struct{}{} + err = c1.RPC("Wait.Wait", &structs.NodeSpecificRequest{ + QueryOptions: structs.QueryOptions{ + MinQueryIndex: 1, + MaxQueryTime: 20 * time.Millisecond, + }, + }, &out) + require.Error(t, err) + require.Contains(t, err.Error(), "rpc error making call: i/o deadline reached") +} diff --git a/agent/consul/rpc_test.go b/agent/consul/rpc_test.go index 5e1323a1e..4103be746 100644 --- a/agent/consul/rpc_test.go +++ b/agent/consul/rpc_test.go @@ -1374,6 +1374,10 @@ func (r isReadRequest) HasTimedOut(since time.Time, rpcHoldTimeout, maxQueryTime return false, nil } +func (r isReadRequest) Timeout(rpcHoldTimeout, maxQueryTime, defaultQueryTime time.Duration) time.Duration { + return time.Duration(-1) +} + func TestRPC_AuthorizeRaftRPC(t *testing.T) { caPEM, caPK, err := tlsutil.GenerateCA(tlsutil.CAOpts{Days: 5, Domain: "consul"}) require.NoError(t, err) diff --git a/agent/consul/server_test.go b/agent/consul/server_test.go index 8e693afab..31972b5cc 100644 --- a/agent/consul/server_test.go +++ b/agent/consul/server_test.go @@ -165,7 +165,7 @@ func testServerConfig(t *testing.T) (string, *Config) { // TODO (slackpad) - We should be able to run all tests w/o this, but it // looks like several depend on it. - config.RPCHoldTimeout = 5 * time.Second + config.RPCHoldTimeout = 10 * time.Second config.ConnectEnabled = true config.CAConfig = &structs.CAConfiguration{ diff --git a/agent/pool/pool.go b/agent/pool/pool.go index 179565dcf..acfe73065 100644 --- a/agent/pool/pool.go +++ b/agent/pool/pool.go @@ -31,7 +31,7 @@ type muxSession interface { // streamClient is used to wrap a stream with an RPC client type StreamClient struct { - stream net.Conn + stream *TimeoutConn codec rpc.ClientCodec } @@ -56,6 +56,36 @@ type Conn struct { clientLock sync.Mutex } +// TimeoutConn wraps net.Conn with a read timeout. +// When set, FirstReadTimeout only applies to the very next Read. +// DefaultTimeout is used for any other Read. +type TimeoutConn struct { + net.Conn + DefaultTimeout time.Duration + FirstReadTimeout time.Duration +} + +func (c *TimeoutConn) Read(b []byte) (int, error) { + timeout := c.DefaultTimeout + // Apply timeout to first read then zero it out + if c.FirstReadTimeout > 0 { + timeout = c.FirstReadTimeout + c.FirstReadTimeout = 0 + } + var deadline time.Time + if timeout > 0 { + deadline = time.Now().Add(timeout) + } + if err := c.Conn.SetReadDeadline(deadline); err != nil { + return 0, err + } + return c.Conn.Read(b) +} + +func (c *TimeoutConn) Write(b []byte) (int, error) { + return c.Conn.Write(b) +} + func (c *Conn) Close() error { return c.session.Close() } @@ -79,12 +109,14 @@ func (c *Conn) getClient() (*StreamClient, error) { return nil, err } + timeoutStream := &TimeoutConn{Conn: stream, DefaultTimeout: c.pool.Timeout} + // Create the RPC client - codec := msgpackrpc.NewCodecFromHandle(true, true, stream, structs.MsgpackHandle) + codec := msgpackrpc.NewCodecFromHandle(true, true, timeoutStream, structs.MsgpackHandle) // Return a new stream client sc := &StreamClient{ - stream: stream, + stream: timeoutStream, codec: codec, } return sc, nil @@ -101,7 +133,7 @@ func (c *Conn) returnClient(client *StreamClient) { // If this is a Yamux stream, shrink the internal buffers so that // we can GC the idle memory - if ys, ok := client.stream.(*yamux.Stream); ok { + if ys, ok := client.stream.Conn.(*yamux.Stream); ok { ys.Shrink() } } @@ -133,6 +165,13 @@ type ConnPool struct { // TODO: consider refactoring to accept a full yamux.Config instead of a logger Logger *log.Logger + // The default timeout for stream reads/writes + Timeout time.Duration + + // Used for calculating timeouts on RPC requests + MaxQueryTime time.Duration + DefaultQueryTime time.Duration + // The maximum time to keep a connection open MaxTime time.Duration @@ -325,7 +364,7 @@ func (p *ConnPool) dial( tlsRPCType RPCType, ) (net.Conn, HalfCloser, error) { // Try to dial the conn - d := &net.Dialer{LocalAddr: p.SrcAddr, Timeout: DefaultDialTimeout} + d := &net.Dialer{LocalAddr: p.SrcAddr, Timeout: p.Timeout} conn, err := d.Dial("tcp", addr.String()) if err != nil { return nil, nil, err @@ -590,6 +629,11 @@ func (p *ConnPool) rpc(dc string, nodeName string, addr net.Addr, method string, return fmt.Errorf("rpc error getting client: %w", err) } + // Use the zero value if the request doesn't implement RPCInfo + if info, ok := args.(structs.RPCInfo); ok { + sc.stream.FirstReadTimeout = info.Timeout(p.Timeout, p.MaxQueryTime, p.DefaultQueryTime) + } + // Make the RPC call err = msgpackrpc.CallWithCodec(sc.codec, method, args, reply) if err != nil { diff --git a/agent/setup.go b/agent/setup.go index 322f170b2..bbe54ae06 100644 --- a/agent/setup.go +++ b/agent/setup.go @@ -169,11 +169,14 @@ func newConnPool(config *config.RuntimeConfig, logger hclog.Logger, tls *tlsutil } pool := &pool.ConnPool{ - Server: config.ServerMode, - SrcAddr: rpcSrcAddr, - Logger: logger.StandardLogger(&hclog.StandardLoggerOptions{InferLevels: true}), - TLSConfigurator: tls, - Datacenter: config.Datacenter, + Server: config.ServerMode, + SrcAddr: rpcSrcAddr, + Logger: logger.StandardLogger(&hclog.StandardLoggerOptions{InferLevels: true}), + TLSConfigurator: tls, + Datacenter: config.Datacenter, + Timeout: config.RPCHoldTimeout, + MaxQueryTime: config.MaxQueryTime, + DefaultQueryTime: config.DefaultQueryTime, } if config.ServerMode { pool.MaxTime = 2 * time.Minute diff --git a/agent/structs/structs.go b/agent/structs/structs.go index 765c039e7..7a06aeeda 100644 --- a/agent/structs/structs.go +++ b/agent/structs/structs.go @@ -18,14 +18,12 @@ import ( "github.com/golang/protobuf/ptypes/timestamp" "github.com/golang/protobuf/proto" + ptypes "github.com/golang/protobuf/ptypes" + "github.com/hashicorp/consul-net-rpc/go-msgpack/codec" "github.com/hashicorp/go-multierror" "github.com/hashicorp/serf/coordinate" "github.com/mitchellh/hashstructure" - "github.com/hashicorp/consul-net-rpc/go-msgpack/codec" - - ptypes "github.com/golang/protobuf/ptypes" - "github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/agent/cache" "github.com/hashicorp/consul/api" @@ -217,6 +215,7 @@ type RPCInfo interface { TokenSecret() string SetTokenSecret(string) HasTimedOut(since time.Time, rpcHoldTimeout, maxQueryTime, defaultQueryTime time.Duration) (bool, error) + Timeout(rpcHoldTimeout, maxQueryTime, defaultQueryTime time.Duration) time.Duration } // QueryOptions is used to specify various flags for read queries @@ -315,18 +314,24 @@ func (q *QueryOptions) SetTokenSecret(s string) { q.Token = s } -func (q QueryOptions) HasTimedOut(start time.Time, rpcHoldTimeout, maxQueryTime, defaultQueryTime time.Duration) (bool, error) { +func (q QueryOptions) Timeout(rpcHoldTimeout, maxQueryTime, defaultQueryTime time.Duration) time.Duration { + // Match logic in Server.blockingQuery. if q.MinQueryIndex > 0 { if q.MaxQueryTime > maxQueryTime { q.MaxQueryTime = maxQueryTime } else if q.MaxQueryTime <= 0 { q.MaxQueryTime = defaultQueryTime } + // Timeout after maximum jitter has elapsed. q.MaxQueryTime += lib.RandomStagger(q.MaxQueryTime / JitterFraction) - return time.Since(start) > (q.MaxQueryTime + rpcHoldTimeout), nil + return q.MaxQueryTime + rpcHoldTimeout } - return time.Since(start) > rpcHoldTimeout, nil + return rpcHoldTimeout +} + +func (q QueryOptions) HasTimedOut(start time.Time, rpcHoldTimeout, maxQueryTime, defaultQueryTime time.Duration) (bool, error) { + return time.Since(start) > q.Timeout(rpcHoldTimeout, maxQueryTime, defaultQueryTime), nil } type WriteRequest struct { @@ -353,7 +358,11 @@ func (w *WriteRequest) SetTokenSecret(s string) { } func (w WriteRequest) HasTimedOut(start time.Time, rpcHoldTimeout, maxQueryTime, defaultQueryTime time.Duration) (bool, error) { - return time.Since(start) > rpcHoldTimeout, nil + return time.Since(start) > w.Timeout(rpcHoldTimeout, maxQueryTime, defaultQueryTime), nil +} + +func (w WriteRequest) Timeout(rpcHoldTimeout, maxQueryTime, defaultQueryTime time.Duration) time.Duration { + return rpcHoldTimeout } type QueryBackend int diff --git a/internal/tools/proto-gen-rpc-glue/main.go b/internal/tools/proto-gen-rpc-glue/main.go index 0618b35b2..57e932f50 100644 --- a/internal/tools/proto-gen-rpc-glue/main.go +++ b/internal/tools/proto-gen-rpc-glue/main.go @@ -344,6 +344,14 @@ func (msg *%[1]s) HasTimedOut(start time.Time, rpcHoldTimeout time.Duration, a t return msg.%[2]s.HasTimedOut(start, rpcHoldTimeout, a, b) } +// Timeout implements structs.RPCInfo +func (msg *%[1]s) Timeout(rpcHoldTimeout time.Duration, a time.Duration, b time.Duration) time.Duration { + if msg == nil || msg.%[2]s == nil { + return 0 + } + return msg.%[2]s.Timeout(rpcHoldTimeout, a, b) +} + // IsRead implements structs.RPCInfo func (msg *%[1]s) IsRead() bool { return false @@ -392,6 +400,14 @@ func (msg *%[1]s) HasTimedOut(start time.Time, rpcHoldTimeout time.Duration, a t return msg.%[2]s.HasTimedOut(start, rpcHoldTimeout, a, b) } +// Timeout implements structs.RPCInfo +func (msg *%[1]s) Timeout(rpcHoldTimeout time.Duration, a time.Duration, b time.Duration) time.Duration { + if msg == nil || msg.%[2]s == nil { + return 0 + } + return msg.%[2]s.Timeout(rpcHoldTimeout, a, b) +} + // SetTokenSecret implements structs.RPCInfo func (msg *%[1]s) SetTokenSecret(s string) { // TODO: initialize if nil @@ -443,6 +459,15 @@ func (msg *%[1]s) HasTimedOut(start time.Time, rpcHoldTimeout time.Duration, a t } return msg.%[2]s.HasTimedOut(start, rpcHoldTimeout, a, b) } + +// Timeout implements structs.RPCInfo +func (msg *%[1]s) Timeout(rpcHoldTimeout time.Duration, a time.Duration, b time.Duration) time.Duration { + if msg == nil || msg.%[2]s == nil { + return 0 + } + return msg.%[2]s.Timeout(rpcHoldTimeout, a, b) +} + // SetTokenSecret implements structs.RPCInfo func (msg *%[1]s) SetTokenSecret(s string) { // TODO: initialize if nil diff --git a/proto/pbautoconf/auto_config.go b/proto/pbautoconf/auto_config.go index 96ef19cad..74a7cf4ab 100644 --- a/proto/pbautoconf/auto_config.go +++ b/proto/pbautoconf/auto_config.go @@ -23,5 +23,9 @@ func (req *AutoConfigRequest) SetTokenSecret(token string) { } func (req *AutoConfigRequest) HasTimedOut(start time.Time, rpcHoldTimeout, maxQueryTime, defaultQueryTime time.Duration) (bool, error) { - return time.Since(start) > rpcHoldTimeout, nil + return time.Since(start) > req.Timeout(rpcHoldTimeout, maxQueryTime, defaultQueryTime), nil +} + +func (req *AutoConfigRequest) Timeout(rpcHoldTimeout, maxQueryTime, defaultQueryTime time.Duration) time.Duration { + return rpcHoldTimeout } diff --git a/proto/pbcommon/common.go b/proto/pbcommon/common.go index 79b1592e5..faca038b6 100644 --- a/proto/pbcommon/common.go +++ b/proto/pbcommon/common.go @@ -75,12 +75,16 @@ func (q *QueryOptions) SetStaleIfError(staleIfError time.Duration) { } func (q *QueryOptions) HasTimedOut(start time.Time, rpcHoldTimeout, maxQueryTime, defaultQueryTime time.Duration) (bool, error) { + return time.Since(start) > q.Timeout(rpcHoldTimeout, maxQueryTime, defaultQueryTime), nil +} + +func (q *QueryOptions) Timeout(rpcHoldTimeout, maxQueryTime, defaultQueryTime time.Duration) time.Duration { maxTime := structs.DurationFromProto(q.MaxQueryTime) o := structs.QueryOptions{ MaxQueryTime: maxTime, MinQueryIndex: q.MinQueryIndex, } - return o.HasTimedOut(start, rpcHoldTimeout, maxQueryTime, defaultQueryTime) + return o.Timeout(rpcHoldTimeout, maxQueryTime, defaultQueryTime) } // SetFilter is needed to implement the structs.QueryOptionsCompat interface @@ -113,8 +117,13 @@ func (w *WriteRequest) AllowStaleRead() bool { } // HasTimedOut implements structs.RPCInfo -func (w *WriteRequest) HasTimedOut(start time.Time, rpcHoldTimeout, _, _ time.Duration) (bool, error) { - return time.Since(start) > rpcHoldTimeout, nil +func (w *WriteRequest) HasTimedOut(start time.Time, rpcHoldTimeout, maxQueryTime, defaultQueryTime time.Duration) (bool, error) { + return time.Since(start) > w.Timeout(rpcHoldTimeout, maxQueryTime, defaultQueryTime), nil +} + +// Timeout implements structs.RPCInfo +func (w *WriteRequest) Timeout(rpcHoldTimeout, _, _ time.Duration) time.Duration { + return rpcHoldTimeout } // IsRead implements structs.RPCInfo @@ -140,7 +149,12 @@ func (r *ReadRequest) SetTokenSecret(token string) { // HasTimedOut implements structs.RPCInfo func (r *ReadRequest) HasTimedOut(start time.Time, rpcHoldTimeout, maxQueryTime, defaultQueryTime time.Duration) (bool, error) { - return time.Since(start) > rpcHoldTimeout, nil + return time.Since(start) > r.Timeout(rpcHoldTimeout, maxQueryTime, defaultQueryTime), nil +} + +// Timeout implements structs.RPCInfo +func (r *ReadRequest) Timeout(rpcHoldTimeout, _, _ time.Duration) time.Duration { + return rpcHoldTimeout } // RequestDatacenter implements structs.RPCInfo diff --git a/proto/pbsubscribe/subscribe.go b/proto/pbsubscribe/subscribe.go index 961c3c1cd..05a34a0f0 100644 --- a/proto/pbsubscribe/subscribe.go +++ b/proto/pbsubscribe/subscribe.go @@ -29,5 +29,10 @@ func (req *SubscribeRequest) SetTokenSecret(token string) { // HasTimedOut implements structs.RPCInfo func (req *SubscribeRequest) HasTimedOut(start time.Time, rpcHoldTimeout, maxQueryTime, defaultQueryTime time.Duration) (bool, error) { - return time.Since(start) > rpcHoldTimeout, nil + return time.Since(start) > req.Timeout(rpcHoldTimeout, maxQueryTime, defaultQueryTime), nil +} + +// Timeout implements structs.RPCInfo +func (req *SubscribeRequest) Timeout(rpcHoldTimeout, maxQueryTime, defaultQueryTime time.Duration) time.Duration { + return rpcHoldTimeout }