From d1ad5383452db1be64178afd16ce19a4560b424a Mon Sep 17 00:00:00 2001
From: James Phillips
Date: Tue, 10 Oct 2017 15:19:50 -0700
Subject: [PATCH] Makes RPC handling more robust when rolling servers. (#3561)

* Adds client-side retry for no leader errors.

This paves over the case where the client was connected to the leader
when that server loses leadership.

* Adds a configurable server RPC drain time and a fail-fast path for RPCs.

When a server leaves it gets removed from the Raft configuration, so it
will never know who the new leader server ends up being. Without this
we'd be doomed to wait out the RPC hold timeout and then fail. This
makes things fail a little more quickly while a server is draining, and
since we added a client retry AND since the server doing this has
already shut down and left the Serf LAN, clients should retry against
some other server.

* Makes the RPC hold timeout configurable.

* Reorders struct members.

* Sets the RPC hold timeout default for test servers.

* Bumps the leave drain time up to 5 seconds.

* Robustifies retries with a simpler client-side RPC hold.

* Reverts unintended delete.
---
 agent/agent.go                            |  8 +++
 agent/config/builder.go                   |  2 +
 agent/config/config.go                    |  4 +-
 agent/config/default.go                   |  2 +
 agent/config/runtime.go                   |  2 +
 agent/config/runtime_test.go              | 10 +++-
 agent/consul/client.go                    | 34 +++++++++--
 agent/consul/client_test.go               | 69 +++++++++++++++++++++++
 agent/consul/config.go                    | 10 ++--
 agent/consul/rpc.go                       | 51 ++++++++++++-----
 agent/consul/server.go                    | 18 +++++-
 agent/consul/server_test.go               | 22 +++++---
 agent/pool/pool.go                        |  7 ++-
 agent/structs/errors.go                   |  8 ++-
 lib/eof.go                                | 27 +++++++++
 website/source/docs/agent/options.html.md | 11 ++++
 16 files changed, 244 insertions(+), 41 deletions(-)
 create mode 100644 lib/eof.go
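The heart of this change is a client-side hold-and-retry loop around RPCs. In
sketch form the pattern looks like the following (simplified stand-ins, not
the actual Consul code: `call` represents the connection-pool RPC against the
currently selected server, `canRetry` is the predicate added in
agent/consul/rpc.go, and jitterFraction is assumed to match the existing
server-side constant of 16; the real code draws its jitter via
lib.RandomStagger):

    package sketch

    import (
        "math/rand"
        "time"
    )

    const jitterFraction = 16 // assumption: mirrors the constant used by forward()

    // rpcWithHold retries retryable failures until holdTimeout has elapsed,
    // sleeping a random jitter between attempts so a herd of clients does
    // not stampede whichever server survives.
    func rpcWithHold(call func() error, canRetry func(error) bool,
        holdTimeout time.Duration, shutdownCh <-chan struct{}) error {

        // Start the clock at the first attempt, not the first retry, so a
        // server already applying its own RPC hold doesn't double the delay.
        firstCheck := time.Now()
        for {
            err := call()
            if err == nil || !canRetry(err) {
                return err
            }
            if time.Since(firstCheck) >= holdTimeout {
                return err // hold time exceeded, surface the last error
            }
            jitter := time.Duration(rand.Int63n(int64(holdTimeout / jitterFraction)))
            select {
            case <-time.After(jitter):
                // Loop and retry; the router will pick another server.
            case <-shutdownCh:
                return err
            }
        }
    }

The real implementation in agent/consul/client.go below uses a TRY label and
goto rather than a loop, but the behavior is the same: bounded, jittered
retries that shift traffic to another server chosen by the router.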
diff --git a/agent/agent.go b/agent/agent.go
index c63477670..2d232fe55 100644
--- a/agent/agent.go
+++ b/agent/agent.go
@@ -750,6 +750,14 @@ func (a *Agent) consulConfig() (*consul.Config, error) {
 		base.RPCMaxBurst = a.config.RPCMaxBurst
 	}
 
+	// RPC-related performance configs.
+	if a.config.RPCHoldTimeout > 0 {
+		base.RPCHoldTimeout = a.config.RPCHoldTimeout
+	}
+	if a.config.LeaveDrainTime > 0 {
+		base.LeaveDrainTime = a.config.LeaveDrainTime
+	}
+
 	// set the src address for outgoing rpc connections
 	// Use port 0 so that outgoing connections use a random port.
 	if !ipaddr.IsAny(base.RPCAddr.IP) {
diff --git a/agent/config/builder.go b/agent/config/builder.go
index ac6850c17..e665b53b5 100644
--- a/agent/config/builder.go
+++ b/agent/config/builder.go
@@ -587,6 +587,7 @@ func (b *Builder) Build() (rt RuntimeConfig, err error) {
 		EncryptVerifyIncoming: b.boolVal(c.EncryptVerifyIncoming),
 		EncryptVerifyOutgoing: b.boolVal(c.EncryptVerifyOutgoing),
 		KeyFile:               b.stringVal(c.KeyFile),
+		LeaveDrainTime:        b.durationVal("performance.leave_drain_time", c.Performance.LeaveDrainTime),
 		LeaveOnTerm:           leaveOnTerm,
 		LogLevel:              b.stringVal(c.LogLevel),
 		NodeID:                types.NodeID(b.stringVal(c.NodeID)),
@@ -596,6 +597,7 @@ func (b *Builder) Build() (rt RuntimeConfig, err error) {
 		PidFile:          b.stringVal(c.PidFile),
 		RPCAdvertiseAddr: rpcAdvertiseAddr,
 		RPCBindAddr:      rpcBindAddr,
+		RPCHoldTimeout:   b.durationVal("performance.rpc_hold_timeout", c.Performance.RPCHoldTimeout),
 		RPCMaxBurst:      b.intVal(c.Limits.RPCMaxBurst),
 		RPCProtocol:      b.intVal(c.RPCProtocol),
 		RPCRateLimit:     rate.Limit(b.float64Val(c.Limits.RPCRate)),
diff --git a/agent/config/config.go b/agent/config/config.go
index 19cd8ac84..1f18e0931 100644
--- a/agent/config/config.go
+++ b/agent/config/config.go
@@ -344,7 +344,9 @@ type HTTPConfig struct {
 }
 
 type Performance struct {
-	RaftMultiplier *int `json:"raft_multiplier,omitempty" hcl:"raft_multiplier" mapstructure:"raft_multiplier"` // todo(fs): validate as uint
+	LeaveDrainTime *string `json:"leave_drain_time,omitempty" hcl:"leave_drain_time" mapstructure:"leave_drain_time"`
+	RaftMultiplier *int    `json:"raft_multiplier,omitempty" hcl:"raft_multiplier" mapstructure:"raft_multiplier"` // todo(fs): validate as uint
+	RPCHoldTimeout *string `json:"rpc_hold_timeout,omitempty" hcl:"rpc_hold_timeout" mapstructure:"rpc_hold_timeout"`
 }
 
 type Telemetry struct {
diff --git a/agent/config/default.go b/agent/config/default.go
index 765232bbf..6bc3295c4 100644
--- a/agent/config/default.go
+++ b/agent/config/default.go
@@ -65,7 +65,9 @@ func DefaultSource() Source {
 			rpc_max_burst = 1000
 		}
 		performance = {
+			leave_drain_time = "5s"
 			raft_multiplier = ` + strconv.Itoa(int(consul.DefaultRaftMultiplier)) + `
+			rpc_hold_timeout = "7s"
 		}
 		ports = {
 			dns = 8600
diff --git a/agent/config/runtime.go b/agent/config/runtime.go
index ebbf88fe7..5fc2ab361 100644
--- a/agent/config/runtime.go
+++ b/agent/config/runtime.go
@@ -146,6 +146,7 @@ type RuntimeConfig struct {
 	HTTPSAddrs       []net.Addr
 	HTTPSPort        int
 	KeyFile          string
+	LeaveDrainTime   time.Duration
 	LeaveOnTerm      bool
 	LogLevel         string
 	NodeID           types.NodeID
@@ -154,6 +155,7 @@ type RuntimeConfig struct {
 	PidFile          string
 	RPCAdvertiseAddr *net.TCPAddr
 	RPCBindAddr      *net.TCPAddr
+	RPCHoldTimeout   time.Duration
 	RPCMaxBurst      int
 	RPCProtocol      int
 	RPCRateLimit     rate.Limit
diff --git a/agent/config/runtime_test.go b/agent/config/runtime_test.go
index ee6df1c49..c5699f052 100644
--- a/agent/config/runtime_test.go
+++ b/agent/config/runtime_test.go
@@ -2104,7 +2104,9 @@ func TestFullConfig(t *testing.T) {
 				"node_name": "otlLxGaI",
 				"non_voting_server": true,
 				"performance": {
-					"raft_multiplier": 5
+					"leave_drain_time": "8265s",
+					"raft_multiplier": 5,
+					"rpc_hold_timeout": "15707s"
 				},
 				"pid_file": "43xN80Km",
 				"ports": {
@@ -2535,7 +2537,9 @@ func TestFullConfig(t *testing.T) {
 				node_name = "otlLxGaI"
 				non_voting_server = true
 				performance {
+					leave_drain_time = "8265s"
 					raft_multiplier = 5
+					rpc_hold_timeout = "15707s"
 				}
 				pid_file = "43xN80Km"
 				ports {
@@ -3088,6 +3092,7 @@ func TestFullConfig(t *testing.T) {
 		HTTPSAddrs:       []net.Addr{tcpAddr("95.17.17.19:15127")},
 		HTTPSPort:        15127,
 		KeyFile:
"IEkkwgIA", + LeaveDrainTime: 8265 * time.Second, LeaveOnTerm: true, LogLevel: "k1zo9Spt", NodeID: types.NodeID("AsUIlw99"), @@ -3097,6 +3102,7 @@ func TestFullConfig(t *testing.T) { PidFile: "43xN80Km", RPCAdvertiseAddr: tcpAddr("17.99.29.16:3757"), RPCBindAddr: tcpAddr("16.99.34.17:3757"), + RPCHoldTimeout: 15707 * time.Second, RPCProtocol: 30793, RPCRateLimit: 12029.43, RPCMaxBurst: 44848, @@ -3765,6 +3771,7 @@ func TestSanitize(t *testing.T) { "HTTPSAddrs": [], "HTTPSPort": 0, "KeyFile": "hidden", + "LeaveDrainTime": "0s", "LeaveOnTerm": false, "LogLevel": "", "NodeID": "", @@ -3774,6 +3781,7 @@ func TestSanitize(t *testing.T) { "PidFile": "", "RPCAdvertiseAddr": "", "RPCBindAddr": "", + "RPCHoldTimeout": "0s", "RPCMaxBurst": 0, "RPCProtocol": 0, "RPCRateLimit": 0, diff --git a/agent/consul/client.go b/agent/consul/client.go index b8258bc2f..2e87b9ce3 100644 --- a/agent/consul/client.go +++ b/agent/consul/client.go @@ -233,6 +233,15 @@ func (c *Client) Encrypted() bool { // RPC is used to forward an RPC call to a consul server, or fail if no servers func (c *Client) RPC(method string, args interface{}, reply interface{}) error { + // This is subtle but we start measuring the time on the client side + // right at the time of the first request, vs. on the first retry as + // is done on the server side inside forward(). This is because the + // servers may already be applying the RPCHoldTimeout up there, so by + // starting the timer here we won't potentially double up the delay. + // TODO (slackpad) Plumb a deadline here with a context. + firstCheck := time.Now() + +TRY: server := c.routers.FindServer() if server == nil { return structs.ErrNoServers @@ -248,13 +257,28 @@ func (c *Client) RPC(method string, args interface{}, reply interface{}) error { } // Make the request. - if err := c.connPool.RPC(c.config.Datacenter, server.Addr, server.Version, method, server.UseTLS, args, reply); err != nil { - c.routers.NotifyFailedServer(server) - c.logger.Printf("[ERR] consul: RPC failed to server %s: %v", server.Addr, err) - return err + rpcErr := c.connPool.RPC(c.config.Datacenter, server.Addr, server.Version, method, server.UseTLS, args, reply) + if rpcErr == nil { + return nil } - return nil + // Move off to another server, and see if we can retry. + c.logger.Printf("[ERR] consul: %q RPC failed to server %s: %v", method, server.Addr, rpcErr) + c.routers.NotifyFailedServer(server) + if retry := canRetry(args, rpcErr); !retry { + return rpcErr + } + + // We can wait a bit and retry! 
+ if time.Now().Sub(firstCheck) < c.config.RPCHoldTimeout { + jitter := lib.RandomStagger(c.config.RPCHoldTimeout / jitterFraction) + select { + case <-time.After(jitter): + goto TRY + case <-c.shutdownCh: + } + } + return rpcErr } // SnapshotRPC sends the snapshot request to one of the servers, reading from diff --git a/agent/consul/client_test.go b/agent/consul/client_test.go index 684ced39e..1958ba9fb 100644 --- a/agent/consul/client_test.go +++ b/agent/consul/client_test.go @@ -180,6 +180,75 @@ func TestClient_RPC(t *testing.T) { }) } +type leaderFailer struct { + totalCalls int + onceCalls int +} + +func (l *leaderFailer) Always(args struct{}, reply *struct{}) error { + l.totalCalls++ + return structs.ErrNoLeader +} + +func (l *leaderFailer) Once(args struct{}, reply *struct{}) error { + l.totalCalls++ + l.onceCalls++ + + switch { + case l.onceCalls == 1: + return structs.ErrNoLeader + + default: + return nil + } +} + +func TestClient_RPC_Retry(t *testing.T) { + t.Parallel() + + dir1, s1 := testServer(t) + defer os.RemoveAll(dir1) + defer s1.Shutdown() + + dir2, c1 := testClientWithConfig(t, func(c *Config) { + c.Datacenter = "dc1" + c.NodeName = uniqueNodeName(t.Name()) + c.RPCHoldTimeout = 2 * time.Second + }) + defer os.RemoveAll(dir2) + defer c1.Shutdown() + + joinLAN(t, c1, s1) + retry.Run(t, func(r *retry.R) { + var out struct{} + if err := c1.RPC("Status.Ping", struct{}{}, &out); err != nil { + r.Fatalf("err: %v", err) + } + }) + + failer := &leaderFailer{} + if err := s1.RegisterEndpoint("Fail", failer); err != nil { + t.Fatalf("err: %v", err) + } + + var out struct{} + if err := c1.RPC("Fail.Always", struct{}{}, &out); !structs.IsErrNoLeader(err) { + t.Fatalf("err: %v", err) + } + if got, want := failer.totalCalls, 2; got < want { + t.Fatalf("got %d want >= %d", got, want) + } + if err := c1.RPC("Fail.Once", struct{}{}, &out); err != nil { + t.Fatalf("err: %v", err) + } + if got, want := failer.onceCalls, 2; got < want { + t.Fatalf("got %d want >= %d", got, want) + } + if got, want := failer.totalCalls, 4; got < want { + t.Fatalf("got %d want >= %d", got, want) + } +} + func TestClient_RPC_Pool(t *testing.T) { t.Parallel() dir1, s1 := testServer(t) diff --git a/agent/consul/config.go b/agent/consul/config.go index 1b904593e..c37dd8d67 100644 --- a/agent/consul/config.go +++ b/agent/consul/config.go @@ -329,6 +329,10 @@ type Config struct { RPCRate rate.Limit RPCMaxBurst int + // LeaveDrainTime is used to wait after a server has left the LAN Serf + // pool for RPCs to drain and new requests to be sent to other servers. + LeaveDrainTime time.Duration + // AutopilotConfig is used to apply the initial autopilot config when // bootstrapping. AutopilotConfig *structs.AutopilotConfig @@ -406,12 +410,6 @@ func DefaultConfig() *Config { CoordinateUpdateBatchSize: 128, CoordinateUpdateMaxBatches: 5, - // This holds RPCs during leader elections. For the default Raft - // config the election timeout is 5 seconds, so we set this a - // bit longer to try to cover that period. This should be more - // than enough when running in the high performance mode. - RPCHoldTimeout: 7 * time.Second, - RPCRate: rate.Inf, RPCMaxBurst: 1000, diff --git a/agent/consul/rpc.go b/agent/consul/rpc.go index e08325891..9feed0199 100644 --- a/agent/consul/rpc.go +++ b/agent/consul/rpc.go @@ -177,6 +177,24 @@ func (s *Server) handleSnapshotConn(conn net.Conn) { }() } +// canRetry returns true if the given situation is safe for a retry. 
+func canRetry(args interface{}, err error) bool { + // No leader errors are always safe to retry since no state could have + // been changed. + if structs.IsErrNoLeader(err) { + return true + } + + // Reads are safe to retry for stream errors, such as if a server was + // being shut down. + info, ok := args.(structs.RPCInfo) + if ok && info.IsRead() && lib.IsErrEOF(err) { + return true + } + + return false +} + // forward is used to forward to a remote DC or to forward to the local leader // Returns a bool of if forwarding was performed, as well as any error func (s *Server) forward(method string, info structs.RPCInfo, args interface{}, reply interface{}) (bool, error) { @@ -195,8 +213,15 @@ func (s *Server) forward(method string, info structs.RPCInfo, args interface{}, } CHECK_LEADER: + // Fail fast if we are in the process of leaving + select { + case <-s.leaveCh: + return true, structs.ErrNoLeader + default: + } + // Find the leader - isLeader, remoteServer := s.getLeader() + isLeader, leader := s.getLeader() // Handle the case we are the leader if isLeader { @@ -204,11 +229,17 @@ CHECK_LEADER: } // Handle the case of a known leader - if remoteServer != nil { - err := s.forwardLeader(remoteServer, method, args, reply) - return true, err + rpcErr := structs.ErrNoLeader + if leader != nil { + rpcErr = s.connPool.RPC(s.config.Datacenter, leader.Addr, + leader.Version, method, leader.UseTLS, args, reply) + if rpcErr != nil && canRetry(info, rpcErr) { + goto RETRY + } + return true, rpcErr } +RETRY: // Gate the request until there is a leader if firstCheck.IsZero() { firstCheck = time.Now() @@ -218,12 +249,13 @@ CHECK_LEADER: select { case <-time.After(jitter): goto CHECK_LEADER + case <-s.leaveCh: case <-s.shutdownCh: } } // No leader found and hold time exceeded - return true, structs.ErrNoLeader + return true, rpcErr } // getLeader returns if the current node is the leader, and if not then it @@ -248,15 +280,6 @@ func (s *Server) getLeader() (bool, *metadata.Server) { return false, server } -// forwardLeader is used to forward an RPC call to the leader, or fail if no leader -func (s *Server) forwardLeader(server *metadata.Server, method string, args interface{}, reply interface{}) error { - // Handle a missing server - if server == nil { - return structs.ErrNoLeader - } - return s.connPool.RPC(s.config.Datacenter, server.Addr, server.Version, method, server.UseTLS, args, reply) -} - // forwardDC is used to forward an RPC call to a remote DC, or fail if no servers func (s *Server) forwardDC(method, dc string, args interface{}, reply interface{}) error { manager, server, ok := s.router.FindRoute(dc) diff --git a/agent/consul/server.go b/agent/consul/server.go index ac85326df..09c13a8e7 100644 --- a/agent/consul/server.go +++ b/agent/consul/server.go @@ -148,9 +148,16 @@ type Server struct { // updated reconcileCh chan serf.Member - // used to track when the server is ready to serve consistent reads, updated atomically + // readyForConsistentReads is used to track when the leader server is + // ready to serve consistent reads, after it has applied its initial + // barrier. This is updated atomically. readyForConsistentReads int32 + // leaveCh is used to signal that the server is leaving the cluster + // and trying to shed its RPC traffic onto other Consul servers. This + // is only ever closed. + leaveCh chan struct{} + // router is used to map out Consul servers in the WAN and in Consul // Enterprise user-defined areas. 
router *router.Router @@ -302,6 +309,7 @@ func NewServerLogger(config *Config, logger *log.Logger, tokens *token.Store) (* eventChLAN: make(chan serf.Event, 256), eventChWAN: make(chan serf.Event, 256), logger: logger, + leaveCh: make(chan struct{}), reconcileCh: make(chan serf.Member, 32), router: router.NewRouter(logger, config.Datacenter), rpcServer: rpc.NewServer(), @@ -783,6 +791,14 @@ func (s *Server) Leave() error { } } + // Start refusing RPCs now that we've left the LAN pool. It's important + // to do this *after* we've left the LAN pool so that clients will know + // to shift onto another server if they perform a retry. We also wake up + // all queries in the RPC retry state. + s.logger.Printf("[INFO] consul: Waiting %s to drain RPC traffic", s.config.LeaveDrainTime) + close(s.leaveCh) + time.Sleep(s.config.LeaveDrainTime) + // If we were not leader, wait to be safely removed from the cluster. We // must wait to allow the raft replication to take place, otherwise an // immediate shutdown could cause a loss of quorum. diff --git a/agent/consul/server_test.go b/agent/consul/server_test.go index 39b9644e8..be347fe56 100644 --- a/agent/consul/server_test.go +++ b/agent/consul/server_test.go @@ -93,6 +93,12 @@ func testServerConfig(t *testing.T) (string, *Config) { config.Build = "0.8.0" config.CoordinateUpdatePeriod = 100 * time.Millisecond + config.LeaveDrainTime = 1 * time.Millisecond + + // TODO (slackpad) - We should be able to run all tests w/o this, but it + // looks like several depend on it. + config.RPCHoldTimeout = 5 * time.Second + return dir, config } @@ -395,16 +401,16 @@ func TestServer_LeaveLeader(t *testing.T) { testrpc.WaitForLeader(t, s2.RPC, "dc1") // Issue a leave to the leader - var err error + var leader *Server switch { case s1.IsLeader(): - err = s1.Leave() + leader = s1 case s2.IsLeader(): - err = s2.Leave() + leader = s2 default: t.Fatal("no leader") } - if err != nil { + if err := leader.Leave(); err != nil { t.Fatal("leave failed: ", err) } @@ -433,16 +439,16 @@ func TestServer_Leave(t *testing.T) { testrpc.WaitForLeader(t, s2.RPC, "dc1") // Issue a leave to the non-leader - var err error + var nonleader *Server switch { case s1.IsLeader(): - err = s2.Leave() + nonleader = s2 case s2.IsLeader(): - err = s1.Leave() + nonleader = s1 default: t.Fatal("no leader") } - if err != nil { + if err := nonleader.Leave(); err != nil { t.Fatal("leave failed: ", err) } diff --git a/agent/pool/pool.go b/agent/pool/pool.go index 7fb55baeb..cdf6c6ec6 100644 --- a/agent/pool/pool.go +++ b/agent/pool/pool.go @@ -10,6 +10,7 @@ import ( "sync/atomic" "time" + "github.com/hashicorp/consul/lib" "github.com/hashicorp/consul/tlsutil" "github.com/hashicorp/net-rpc-msgpackrpc" "github.com/hashicorp/yamux" @@ -406,7 +407,7 @@ func (p *ConnPool) RPC(dc string, addr net.Addr, version int, method string, use // Get a usable client conn, sc, err := p.getClient(dc, addr, version, useTLS) if err != nil { - return fmt.Errorf("rpc error: %v", err) + return fmt.Errorf("rpc error getting client: %v", err) } // Make the RPC call @@ -418,12 +419,12 @@ func (p *ConnPool) RPC(dc string, addr net.Addr, version int, method string, use // about how we found this. The tldr is that if we see this // error, we know this connection is toast, so we should clear // it and make a new one on the next attempt. 
- if err == io.EOF { + if lib.IsErrEOF(err) { p.clearConn(conn) } p.releaseConn(conn) - return fmt.Errorf("rpc error: %v", err) + return fmt.Errorf("rpc error making call: %v", err) } // Done with the connection diff --git a/agent/structs/errors.go b/agent/structs/errors.go index fcf6dafe9..66337d2e4 100644 --- a/agent/structs/errors.go +++ b/agent/structs/errors.go @@ -23,6 +23,10 @@ var ( ErrRPCRateExceeded = errors.New(errRPCRateExceeded) ) -func IsErrRPCRateExceeded(err error) bool { - return strings.Contains(err.Error(), errRPCRateExceeded) +func IsErrNoLeader(err error) bool { + return err != nil && strings.Contains(err.Error(), errNoLeader) +} + +func IsErrRPCRateExceeded(err error) bool { + return err != nil && strings.Contains(err.Error(), errRPCRateExceeded) } diff --git a/lib/eof.go b/lib/eof.go new file mode 100644 index 000000000..f77844fd6 --- /dev/null +++ b/lib/eof.go @@ -0,0 +1,27 @@ +package lib + +import ( + "io" + "strings" + + "github.com/hashicorp/yamux" +) + +var yamuxStreamClosed = yamux.ErrStreamClosed.Error() +var yamuxSessionShutdown = yamux.ErrSessionShutdown.Error() + +// IsErrEOF returns true if we get an EOF error from the socket itself, or +// an EOF equivalent error from yamux. +func IsErrEOF(err error) bool { + if err == io.EOF { + return true + } + + errStr := err.Error() + if strings.Contains(errStr, yamuxStreamClosed) || + strings.Contains(errStr, yamuxSessionShutdown) { + return true + } + + return false +} diff --git a/website/source/docs/agent/options.html.md b/website/source/docs/agent/options.html.md index fb4ea5ab9..7eeaae208 100644 --- a/website/source/docs/agent/options.html.md +++ b/website/source/docs/agent/options.html.md @@ -958,6 +958,12 @@ Consul will not enable TLS for the HTTP API unless the `https` port has been ass Consul. See the [Server Performance](/docs/guides/performance.html) guide for more details. The following parameters are available: + * `leave_drain_time` - A duration + that a server will dwell during a graceful leave in order to allow requests to be retried against + other Consul servers. Under normal circumstances, this can prevent clients from experiencing + "no leader" errors when performing a rolling update of the Consul servers. This was added in + Consul 1.0. Must be a duration value such as 10s. Defaults to 5s. + * `raft_multiplier` - An integer multiplier used by Consul servers to scale key Raft timing parameters. Omitting this value or setting it to 0 uses default timing described below. Lower values are used to tighten @@ -975,6 +981,11 @@ Consul will not enable TLS for the HTTP API unless the `https` port has been ass See the note on [last contact](/docs/guides/performance.html#last-contact) timing for more details on tuning this parameter. The maximum allowed value is 10. + * `rpc_hold_timeout` - A duration + that a client or server will retry internal RPC requests during leader elections. Under normal + circumstances, this can prevent clients from experiencing "no leader" errors. This was added in + Consul 1.0. Must be a duration value such as 10s. Defaults to 7s. + * `ports` This is a nested object that allows setting the bind ports for the following keys: * `dns` - The DNS server, -1 to disable. Default 8600.
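For reference, the two new `performance` settings documented above can be
combined in one agent configuration block. A minimal sketch in HCL, using the
default values wired up in agent/config/default.go (JSON configs use the same
keys, as exercised in runtime_test.go):

    performance {
      leave_drain_time = "5s"
      rpc_hold_timeout = "7s"
    }

Raising `rpc_hold_timeout` buys more cover for slow leader elections at the
cost of slower hard failures, while raising `leave_drain_time` gives clients
more time to shift their RPC traffic off a gracefully departing server.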