Merge pull request #1667 from hashicorp/b-redistribute-clients

Continually redistribute client RPC connections
This commit is contained in:
sean- 2016-02-02 11:15:19 -08:00
commit 4a56b5e50e
1 changed files with 81 additions and 13 deletions

View File

@ -12,14 +12,51 @@ import (
"time"
"github.com/hashicorp/consul/consul/structs"
"github.com/hashicorp/consul/lib"
"github.com/hashicorp/serf/coordinate"
"github.com/hashicorp/serf/serf"
)
const (
// clientRPCCache controls how long we keep an idle connection
// open to a server
clientRPCCache = 30 * time.Second
// clientRPCMinReuseDuration controls the minimum amount of time RPC
// queries are sent over an established connection to a single server
clientRPCMinReuseDuration = 120 * time.Second
// clientRPCJitterFraction determines the amount of jitter added to
// clientRPCMinReuseDuration before a connection is expired and a new
// connection is established in order to rebalance load across consul
// servers. The cluster-wide number of connections per second from
// rebalancing is applied after this jitter to ensure the CPU impact
// is always finite. See newRebalanceConnsPerSecPerServer's comment
// for additional commentary.
//
// For example, in a 10K consul cluster with 5x servers, this default
// averages out to ~13 new connections from rebalancing per server
// per second (each connection is reused for 120s to 180s).
clientRPCJitterFraction = 2
// Limit the number of new connections a server receives per second
// for connection rebalancing. This limit caps the load caused by
// continual rebalancing efforts when a cluster is in equilibrium. A
// lower value comes at the cost of increased recovery time after a
// partition. This parameter begins to take effect when there are
// more than ~48K clients querying 5x servers or at lower server
// values when there is a partition.
//
// For example, in a 100K consul cluster with 5x servers, it will
// take ~5min for all servers to rebalance their connections. If
// 99,995 agents are in the minority talking to only one server, it
// will take ~26min for all servers to rebalance. A 10K cluster in
// the same scenario will take ~2.6min to rebalance.
newRebalanceConnsPerSecPerServer = 64
// clientRPCConnMaxIdle controls how long we keep an idle connection
// open to a server. 127s was chosen as the first prime above 120s
// (arbitrarily chose to use a prime) with the intent of reusing
// connections who are used by once-a-minute cron(8) jobs *and* who
// use a 60s jitter window (e.g. in vixie cron job execution can
// drift by up to 59s per job, or 119s for a once-a-minute cron job).
clientRPCConnMaxIdle = 127 * time.Second
// clientMaxStreams controls how many idle streams we keep
// open to a server
@ -56,6 +93,10 @@ type Client struct {
lastServer *serverParts
lastRPCTime time.Time
// connRebalanceTime is the time at which we should change the server
// we query for RPC requests.
connRebalanceTime time.Time
// Logger uses the provided LogOutput
logger *log.Logger
@ -103,7 +144,7 @@ func NewClient(config *Config) (*Client, error) {
// Create server
c := &Client{
config: config,
connPool: NewPool(config.LogOutput, clientRPCCache, clientMaxStreams, tlsWrap),
connPool: NewPool(config.LogOutput, clientRPCConnMaxIdle, clientMaxStreams, tlsWrap),
eventCh: make(chan serf.Event, 256),
logger: logger,
shutdownCh: make(chan struct{}),
@ -328,37 +369,64 @@ func (c *Client) localEvent(event serf.UserEvent) {
// RPC is used to forward an RPC call to a consul server, or fail if no servers
func (c *Client) RPC(method string, args interface{}, reply interface{}) error {
// Check the last rpc time
// Check to make sure we haven't spent too much time querying a
// single server
now := time.Now()
if !c.connRebalanceTime.IsZero() && now.After(c.connRebalanceTime) {
c.logger.Printf("[DEBUG] consul: connection time to server %s exceeded, rotating server connection", c.lastServer.Addr)
c.lastServer = nil
}
// Allocate these vars on the stack before the goto
var numConsulServers int
var clusterWideRebalanceConnsPerSec float64
var connReuseLowWaterMark time.Duration
var numLANMembers int
// Check the last RPC time, continue to reuse cached connection for
// up to clientRPCMinReuseDuration unless exceeded
// clientRPCConnMaxIdle
lastRPCTime := now.Sub(c.lastRPCTime)
var server *serverParts
if time.Now().Sub(c.lastRPCTime) < clientRPCCache {
if c.lastServer != nil && lastRPCTime < clientRPCConnMaxIdle {
server = c.lastServer
if server != nil {
goto TRY_RPC
}
goto TRY_RPC
}
// Bail if we can't find any servers
c.consulLock.RLock()
if len(c.consuls) == 0 {
numConsulServers = len(c.consuls)
if numConsulServers == 0 {
c.consulLock.RUnlock()
return structs.ErrNoServers
}
// Select a random addr
server = c.consuls[rand.Int31()%int32(len(c.consuls))]
server = c.consuls[rand.Int31n(int32(numConsulServers))]
c.consulLock.RUnlock()
// Limit this connection's life based on the size (and health) of the
// cluster. Never rebalance a connection more frequently than
// connReuseLowWaterMark, and make sure we never exceed
// clusterWideRebalanceConnsPerSec operations/s across numLANMembers.
clusterWideRebalanceConnsPerSec = float64(numConsulServers * newRebalanceConnsPerSecPerServer)
connReuseLowWaterMark = clientRPCMinReuseDuration + lib.RandomStagger(clientRPCMinReuseDuration/clientRPCJitterFraction)
numLANMembers = len(c.LANMembers())
c.connRebalanceTime = now.Add(lib.RateScaledInterval(clusterWideRebalanceConnsPerSec, connReuseLowWaterMark, numLANMembers))
c.logger.Printf("[DEBUG] consul: connection to server %s will expire at %v", c.lastServer.Addr, c.connRebalanceTime)
// Forward to remote Consul
TRY_RPC:
if err := c.connPool.RPC(c.config.Datacenter, server.Addr, server.Version, method, args, reply); err != nil {
c.lastServer = nil
c.connRebalanceTime = time.Time{}
c.lastRPCTime = time.Time{}
c.lastServer = nil
return err
}
// Cache the last server
c.lastServer = server
c.lastRPCTime = time.Now()
c.lastRPCTime = now
return nil
}