diff --git a/consul/client.go b/consul/client.go
index 59a18c0e3..7f7aa5f91 100644
--- a/consul/client.go
+++ b/consul/client.go
@@ -12,11 +12,44 @@ import (
 	"time"
 
 	"github.com/hashicorp/consul/consul/structs"
+	"github.com/hashicorp/consul/lib"
 	"github.com/hashicorp/serf/coordinate"
 	"github.com/hashicorp/serf/serf"
 )
 
 const (
+	// clientRPCMinReuseDuration controls the minimum amount of time RPC
+	// queries are sent over an established connection to a single server
+	clientRPCMinReuseDuration = 120 * time.Second
+
+	// clientRPCJitterFraction determines the amount of jitter added to
+	// clientRPCMinReuseDuration before a connection is expired and a new
+	// connection is established in order to rebalance load across consul
+	// servers. The cluster-wide number of connections per second from
+	// rebalancing is applied after this jitter to ensure the CPU impact
+	// is always finite. See newRebalanceConnsPerSecPerServer's comment
+	// for additional commentary.
+	//
+	// For example, in a 10K consul cluster with 5x servers, this default
+	// averages out to ~13 new connections from rebalancing per server
+	// per second (each connection is reused for 120s to 180s).
+	clientRPCJitterFraction = 2
+
+	// Limit the number of new connections a server receives per second
+	// for connection rebalancing. This limit caps the load caused by
+	// continual rebalancing efforts when a cluster is in equilibrium. A
+	// lower value comes at the cost of increased recovery time after a
+	// partition. This parameter begins to take effect when there are
+	// more than ~48K clients querying 5x servers or at lower server
+	// values when there is a partition.
+	//
+	// For example, in a 100K consul cluster with 5x servers, it will
+	// take ~5min for all servers to rebalance their connections. If
+	// 99,995 agents are in the minority talking to only one server, it
+	// will take ~26min for all servers to rebalance. A 10K cluster in
+	// the same scenario will take ~2.6min to rebalance.
+	newRebalanceConnsPerSecPerServer = 64
+
 	// clientRPCConnMaxIdle controls how long we keep an idle connection
 	// open to a server. 127s was chosen as the first prime above 120s
 	// (arbitrarily chose to use a prime) with the intent of reusing
@@ -60,6 +93,10 @@ type Client struct {
 	lastServer  *serverParts
 	lastRPCTime time.Time
 
+	// connRebalanceTime is the time at which we should change the server
+	// we query for RPC requests.
+	connRebalanceTime time.Time
+
 	// Logger uses the provided LogOutput
 	logger *log.Logger
 
@@ -332,9 +369,23 @@ func (c *Client) localEvent(event serf.UserEvent) {
 
 // RPC is used to forward an RPC call to a consul server, or fail if no servers
 func (c *Client) RPC(method string, args interface{}, reply interface{}) error {
-	// Check the last rpc time
+	// Check to make sure we haven't spent too much time querying a
+	// single server
 	now := time.Now()
+	if !c.connRebalanceTime.IsZero() && now.After(c.connRebalanceTime) {
+		c.logger.Printf("[DEBUG] consul: connection time to server %s exceeded, rotating server connection", c.lastServer.Addr)
+		c.lastServer = nil
+	}
+
+	// Allocate these vars on the stack before the goto
 	var numConsulServers int
+	var clusterWideRebalanceConnsPerSec float64
+	var connReuseLowWaterMark time.Duration
+	var numLANMembers int
+
+	// Check the last RPC time and reuse the cached connection unless it
+	// has been idle for longer than clientRPCConnMaxIdle (the rebalance
+	// deadline above enforces clientRPCMinReuseDuration).
 	lastRPCTime := now.Sub(c.lastRPCTime)
 	var server *serverParts
 	if c.lastServer != nil && lastRPCTime < clientRPCConnMaxIdle {
@@ -356,11 +407,22 @@ func (c *Client) RPC(method string, args interface{}, reply interface{}) error {
 	server = c.consuls[rand.Int31n(int32(numConsulServers))]
 	c.consulLock.RUnlock()
 
+	// Limit this connection's life based on the size (and health) of the
+	// cluster. Never rebalance a connection more frequently than
+	// connReuseLowWaterMark, and make sure we never exceed
+	// clusterWideRebalanceConnsPerSec operations/s across numLANMembers.
+	clusterWideRebalanceConnsPerSec = float64(numConsulServers * newRebalanceConnsPerSecPerServer)
+	connReuseLowWaterMark = clientRPCMinReuseDuration + lib.RandomStagger(clientRPCMinReuseDuration/clientRPCJitterFraction)
+	numLANMembers = len(c.LANMembers())
+	c.connRebalanceTime = now.Add(lib.RateScaledInterval(clusterWideRebalanceConnsPerSec, connReuseLowWaterMark, numLANMembers))
+	c.logger.Printf("[DEBUG] consul: connection to server %s will expire at %v", server.Addr, c.connRebalanceTime)
+
 	// Forward to remote Consul
 TRY_RPC:
 	if err := c.connPool.RPC(c.config.Datacenter, server.Addr, server.Version, method, args, reply); err != nil {
-		c.lastServer = nil
+		c.connRebalanceTime = time.Time{}
 		c.lastRPCTime = time.Time{}
+		c.lastServer = nil
 		return err
 	}
 
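To make the arithmetic in the constants' comments concrete, below is a minimal, self-contained Go sketch of how the reuse deadline is derived. The randomStagger and rateScaledInterval helpers are assumptions modeled on how the diff calls lib.RandomStagger and lib.RateScaledInterval; they are illustrative stand-ins, not the consul/lib implementations, and the constants mirror the ones added above.

// rebalance_sketch.go: standalone illustration of the reuse-deadline math.
// randomStagger and rateScaledInterval are assumed behavior inferred from
// how the diff uses lib.RandomStagger and lib.RateScaledInterval.
package main

import (
	"fmt"
	"math/rand"
	"time"
)

const (
	clientRPCMinReuseDuration        = 120 * time.Second
	clientRPCJitterFraction          = 2
	newRebalanceConnsPerSecPerServer = 64
)

// randomStagger returns a random duration in [0, intv), used to spread
// connection expirations so clients do not rotate in lockstep.
func randomStagger(intv time.Duration) time.Duration {
	return time.Duration(rand.Int63n(int64(intv)))
}

// rateScaledInterval returns how long each of n clients must keep its
// connection so that, collectively, they open at most rate new connections
// per second, and never returns less than min.
func rateScaledInterval(rate float64, min time.Duration, n int) time.Duration {
	interval := time.Duration(float64(time.Second) * float64(n) / rate)
	if interval < min {
		return min
	}
	return interval
}

func main() {
	numConsulServers := 5
	numLANMembers := 10000 // the 10K-node example from the constant comments

	// Reuse a connection for at least 120s plus up to 60s of jitter
	// (clientRPCMinReuseDuration / clientRPCJitterFraction), i.e. 120s-180s.
	connReuseLowWaterMark := clientRPCMinReuseDuration +
		randomStagger(clientRPCMinReuseDuration/clientRPCJitterFraction)

	// Cap cluster-wide rebalancing at 64 new connections/s per server.
	clusterWideRebalanceConnsPerSec := float64(numConsulServers * newRebalanceConnsPerSecPerServer)

	// The connection's time-to-live is the larger of the jittered reuse
	// time and the rate-limited interval (n clients / allowed conns per second).
	ttl := rateScaledInterval(clusterWideRebalanceConnsPerSec, connReuseLowWaterMark, numLANMembers)
	fmt.Printf("low-water mark %v, next rebalance in %v\n", connReuseLowWaterMark, ttl)
}

With 10,000 LAN members and 5 servers, the rate cap alone would allow a rotation roughly every 31s, so the jittered 120s-180s low-water mark dominates and each server sees on the order of 13 new connections per second. Only past roughly 48K clients on 5 servers, or when a partition concentrates clients on fewer servers, does the rate cap stretch the interval, which matches the examples in the constant comments.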