Continually rebalance client connections
Introduce a low-level background connection expiration mechanism in which client RPC connections are periodically recycled based on the size and health of the cluster. For the vast majority of Consul users this means an average connection age of 150s. A 10K-node cluster will take ~3min to rebalance its connections; in the pathological case of a 100K-node cluster where 99K clients are in the minority talking to a single server, it will take ~26min to rebalance all connections.

At present it is possible for clients recovering from a partition to become fixated on a single server until either the server or the agent is restarted. This is of particular interest in long-running environments with stable agents, where `allow_stale` is true, and where partitions occur periodically.
parent 8a37e76cb0
commit ef8bbca48f
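The figures quoted above follow from the constants introduced in this change. Below is a rough back-of-the-envelope illustration only, not part of the commit; the short names are stand-ins for clientRPCMinReuseDuration, clientRPCJitterFraction, and newRebalanceConnsPerSecPerServer from the diff.

package main

import (
	"fmt"
	"time"
)

func main() {
	const (
		minReuse             = 120 * time.Second // clientRPCMinReuseDuration
		jitterFraction       = 2                 // clientRPCJitterFraction
		connsPerSecPerServer = 64                // newRebalanceConnsPerSecPerServer
	)

	// Connections are reused for a uniformly random duration in
	// [120s, 120s + 120s/2] = [120s, 180s], so the average age is 150s.
	fmt.Println("average connection age:", minReuse+minReuse/(2*jitterFraction)) // 2m30s

	// Pathological 100K-node cluster: ~99K clients pinned to one server can
	// only be shed at 64 rebalanced connections per second per server.
	fmt.Println("100K-node worst case:", 99000/connsPerSecPerServer*time.Second) // ~26min

	// The same scenario in a 10K-node cluster.
	fmt.Println("10K-node worst case:", 9900/connsPerSecPerServer*time.Second) // ~2.6min
}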
@@ -12,11 +12,44 @@ import (
	"time"

	"github.com/hashicorp/consul/consul/structs"
	"github.com/hashicorp/consul/lib"
	"github.com/hashicorp/serf/coordinate"
	"github.com/hashicorp/serf/serf"
)

const (
	// clientRPCMinReuseDuration controls the minimum amount of time RPC
	// queries are sent over an established connection to a single server
	clientRPCMinReuseDuration = 120 * time.Second

	// clientRPCJitterFraction determines the amount of jitter added to
	// clientRPCMinReuseDuration before a connection is expired and a new
	// connection is established in order to rebalance load across consul
	// servers. The cluster-wide number of connections per second from
	// rebalancing is applied after this jitter to ensure the CPU impact
	// is always finite. See newRebalanceConnsPerSecPerServer's comment
	// for additional commentary.
	//
	// For example, in a 10K consul cluster with 5x servers, this default
	// averages out to ~13 new connections from rebalancing per server
	// per second (each connection is reused for 120s to 180s).
	clientRPCJitterFraction = 2

	// Limit the number of new connections a server receives per second
	// for connection rebalancing. This limit caps the load caused by
	// continual rebalancing efforts when a cluster is in equilibrium. A
	// lower value comes at the cost of increased recovery time after a
	// partition. This parameter begins to take effect when there are
	// more than ~48K clients querying 5x servers or at lower server
	// values when there is a partition.
	//
	// For example, in a 100K consul cluster with 5x servers, it will
	// take ~5min for all servers to rebalance their connections. If
	// 99,995 agents are in the minority talking to only one server, it
	// will take ~26min for all servers to rebalance. A 10K cluster in
	// the same scenario will take ~2.6min to rebalance.
	newRebalanceConnsPerSecPerServer = 64

	// clientRPCConnMaxIdle controls how long we keep an idle connection
	// open to a server. 127s was chosen as the first prime above 120s
	// (arbitrarily chose to use a prime) with the intent of reusing
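A quick sanity check on the ~13 and ~48K figures in the comments above: a 10K-node cluster cycling connections every ~150s on average produces roughly 10,000 / 150 ≈ 67 new connections per second cluster-wide, or ~13 per server across 5 servers. And since 5 servers accepting 64 rebalanced connections per second each can absorb about 5 × 64 × 150 ≈ 48,000 clients within one average reuse window, the rate cap only starts to stretch connection lifetimes beyond roughly that size, or when a partition concentrates clients on fewer servers.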
@@ -60,6 +93,10 @@ type Client struct {
	lastServer  *serverParts
	lastRPCTime time.Time

	// connRebalanceTime is the time at which we should change the server
	// we query for RPC requests.
	connRebalanceTime time.Time

	// Logger uses the provided LogOutput
	logger *log.Logger
@@ -332,9 +369,23 @@ func (c *Client) localEvent(event serf.UserEvent) {

// RPC is used to forward an RPC call to a consul server, or fail if no servers
func (c *Client) RPC(method string, args interface{}, reply interface{}) error {
	// Check the last rpc time
	// Check to make sure we haven't spent too much time querying a
	// single server
	now := time.Now()
	if !c.connRebalanceTime.IsZero() && now.After(c.connRebalanceTime) {
		c.logger.Printf("[DEBUG] consul: connection time to server %s exceeded, rotating server connection", c.lastServer.Addr)
		c.lastServer = nil
	}

	// Allocate these vars on the stack before the goto
	var numConsulServers int
	var clusterWideRebalanceConnsPerSec float64
	var connReuseLowWaterMark time.Duration
	var numLANMembers int

	// Check the last RPC time, continue to reuse cached connection for
	// up to clientRPCMinReuseDuration unless exceeded
	// clientRPCConnMaxIdle
	lastRPCTime := now.Sub(c.lastRPCTime)
	var server *serverParts
	if c.lastServer != nil && lastRPCTime < clientRPCConnMaxIdle {
@@ -356,11 +407,22 @@ func (c *Client) RPC(method string, args interface{}, reply interface{}) error {
	server = c.consuls[rand.Int31n(int32(numConsulServers))]
	c.consulLock.RUnlock()

	// Limit this connection's life based on the size (and health) of the
	// cluster. Never rebalance a connection more frequently than
	// connReuseLowWaterMark, and make sure we never exceed
	// clusterWideRebalanceConnsPerSec operations/s across numLANMembers.
	clusterWideRebalanceConnsPerSec = float64(numConsulServers * newRebalanceConnsPerSecPerServer)
	connReuseLowWaterMark = clientRPCMinReuseDuration + lib.RandomStagger(clientRPCMinReuseDuration/clientRPCJitterFraction)
	numLANMembers = len(c.LANMembers())
	c.connRebalanceTime = now.Add(lib.RateScaledInterval(clusterWideRebalanceConnsPerSec, connReuseLowWaterMark, numLANMembers))
	c.logger.Printf("[DEBUG] consul: connection to server %s will expire at %v", c.lastServer.Addr, c.connRebalanceTime)

	// Forward to remote Consul
TRY_RPC:
	if err := c.connPool.RPC(c.config.Datacenter, server.Addr, server.Version, method, args, reply); err != nil {
		c.lastServer = nil
		c.connRebalanceTime = time.Time{}
		c.lastRPCTime = time.Time{}
		c.lastServer = nil
		return err
	}
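The expiration deadline computed above relies on two helpers from github.com/hashicorp/consul/lib. The sketch below models their apparent semantics as inferred from the comments and call sites in this diff; it is an illustration under those assumptions, not the library's actual source, and the lower-case names are local stand-ins.

package main

import (
	"fmt"
	"math/rand"
	"time"
)

// randomStagger stands in for lib.RandomStagger as used above: a uniformly
// random duration in [0, intv), which spreads the reuse window to 120s-180s.
func randomStagger(intv time.Duration) time.Duration {
	return time.Duration(rand.Int63n(int64(intv)))
}

// rateScaledInterval stands in for lib.RateScaledInterval as used above: the
// wait for a single node so that n nodes collectively stay at or below rate
// operations per second, never dropping below the min floor.
func rateScaledInterval(rate float64, min time.Duration, n int) time.Duration {
	interval := time.Duration(float64(n) / rate * float64(time.Second))
	if interval < min {
		return min
	}
	return interval
}

func main() {
	const (
		minReuse       = 120 * time.Second // clientRPCMinReuseDuration
		jitterFraction = 2                 // clientRPCJitterFraction
		perServerRate  = 64                // newRebalanceConnsPerSecPerServer
	)
	numServers, numLANMembers := 5, 100000

	// Mirror the calculation in Client.RPC above.
	clusterWideRate := float64(numServers * perServerRate)
	lowWaterMark := minReuse + randomStagger(minReuse/jitterFraction)
	rebalanceIn := rateScaledInterval(clusterWideRate, lowWaterMark, numLANMembers)

	// With 5 servers and 100K LAN members this is ~312s (100,000 / 320),
	// matching the "~5min for all servers to rebalance" comment above.
	fmt.Println("connection will be rotated in:", rebalanceIn)
}

In smaller clusters the jittered 120s-180s floor dominates, so connections rotate on that cadence; only very large or heavily partitioned clusters fall back to the per-server rate cap.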