agent/router: refactor calculation of delay between rebalances.
This change attempts to make the delay logic more obvious by: * remove indirection, inline a bunch of function calls * move all the code and constants next to each other * replace the two constant values with a single value * reword the comments.
This commit is contained in:
parent
da1e45745c
commit
a3f922249e
|
@ -12,44 +12,10 @@ import (
|
|||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/consul/agent/metadata"
|
||||
"github.com/hashicorp/consul/lib"
|
||||
"github.com/hashicorp/consul/logging"
|
||||
"github.com/hashicorp/go-hclog"
|
||||
)
|
||||
|
||||
const (
|
||||
// clientRPCJitterFraction determines the amount of jitter added to
|
||||
// clientRPCMinReuseDuration before a connection is expired and a new
|
||||
// connection is established in order to rebalance load across consul
|
||||
// servers. The cluster-wide number of connections per second from
|
||||
// rebalancing is applied after this jitter to ensure the CPU impact
|
||||
// is always finite. See newRebalanceConnsPerSecPerServer's comment
|
||||
// for additional commentary.
|
||||
//
|
||||
// For example, in a 10K consul cluster with 5x servers, this default
|
||||
// averages out to ~13 new connections from rebalancing per server
|
||||
// per second (each connection is reused for 120s to 180s).
|
||||
clientRPCJitterFraction = 2
|
||||
|
||||
// clientRPCMinReuseDuration controls the minimum amount of time RPC
|
||||
// queries are sent over an established connection to a single server
|
||||
clientRPCMinReuseDuration = 120 * time.Second
|
||||
|
||||
// Limit the number of new connections a server receives per second
|
||||
// for connection rebalancing. This limit caps the load caused by
|
||||
// continual rebalancing efforts when a cluster is in equilibrium. A
|
||||
// lower value comes at the cost of increased recovery time after a
|
||||
// partition. This parameter begins to take effect when there are
|
||||
// more than ~48K clients querying 5x servers or at lower server
|
||||
// values when there is a partition.
|
||||
//
|
||||
// For example, in a 100K consul cluster with 5x servers, it will
|
||||
// take ~5min for all servers to rebalance their connections. If
|
||||
// 99,995 agents are in the minority talking to only one server, it
|
||||
// will take ~26min for all servers to rebalance. A 10K cluster in
|
||||
// the same scenario will take ~2.6min to rebalance.
|
||||
newRebalanceConnsPerSecPerServer = 64
|
||||
"github.com/hashicorp/consul/agent/metadata"
|
||||
"github.com/hashicorp/consul/logging"
|
||||
)
|
||||
|
||||
// ManagerSerfCluster is an interface wrapper around Serf in order to make this
|
||||
|
@ -278,7 +244,7 @@ func New(logger hclog.Logger, shutdownCh chan struct{}, clusterInfo ManagerSerfC
|
|||
m.logger = logger.Named(logging.Manager)
|
||||
m.clusterInfo = clusterInfo // can't pass *consul.Client: import cycle
|
||||
m.connPoolPinger = connPoolPinger // can't pass *consul.ConnPool: import cycle
|
||||
m.rebalanceTimer = time.NewTimer(clientRPCMinReuseDuration)
|
||||
m.rebalanceTimer = time.NewTimer(delayer.MinDelay)
|
||||
m.shutdownCh = shutdownCh
|
||||
m.rebalancer = rb
|
||||
m.serverName = serverName
|
||||
|
@ -497,44 +463,28 @@ func (m *Manager) RemoveServer(s *metadata.Server) {
|
|||
}
|
||||
}
|
||||
|
||||
// refreshServerRebalanceTimer is only called once m.rebalanceTimer expires.
|
||||
func (m *Manager) refreshServerRebalanceTimer() time.Duration {
|
||||
l := m.getServerList()
|
||||
numServers := len(l.servers)
|
||||
// Limit this connection's life based on the size (and health) of the
|
||||
// cluster. Never rebalance a connection more frequently than
|
||||
// connReuseLowWatermarkDuration, and make sure we never exceed
|
||||
// clusterWideRebalanceConnsPerSec operations/s across numLANMembers.
|
||||
clusterWideRebalanceConnsPerSec := float64(numServers * newRebalanceConnsPerSecPerServer)
|
||||
connReuseLowWatermarkDuration := clientRPCMinReuseDuration + lib.RandomStagger(clientRPCMinReuseDuration/clientRPCJitterFraction)
|
||||
numLANMembers := m.clusterInfo.NumNodes()
|
||||
connRebalanceTimeout := lib.RateScaledInterval(clusterWideRebalanceConnsPerSec, connReuseLowWatermarkDuration, numLANMembers)
|
||||
|
||||
m.rebalanceTimer.Reset(connRebalanceTimeout)
|
||||
return connRebalanceTimeout
|
||||
}
|
||||
|
||||
// ResetRebalanceTimer resets the rebalance timer. This method exists for
|
||||
// testing and should not be used directly.
|
||||
func (m *Manager) ResetRebalanceTimer() {
|
||||
m.listLock.Lock()
|
||||
defer m.listLock.Unlock()
|
||||
m.rebalanceTimer.Reset(clientRPCMinReuseDuration)
|
||||
m.rebalanceTimer.Reset(delayer.MinDelay)
|
||||
}
|
||||
|
||||
// Start is used to start and manage the task of automatically shuffling and
|
||||
// rebalancing the list of Consul servers. This maintenance only happens
|
||||
// periodically based on the expiration of the timer. Failed servers are
|
||||
// automatically cycled to the end of the list. New servers are appended to
|
||||
// the list. The order of the server list must be shuffled periodically to
|
||||
// distribute load across all known and available Consul servers.
|
||||
func (m *Manager) Start() {
|
||||
// Run periodically shuffles the list of servers to evenly distribute load.
|
||||
// Run exits when shutdownCh is closed.
|
||||
//
|
||||
// When a server fails it is moved to the end of the list, and new servers are
|
||||
// appended to the end of the list. Run ensures that load is distributed evenly
|
||||
// to all servers by randomly shuffling the list.
|
||||
func (m *Manager) Run() {
|
||||
for {
|
||||
select {
|
||||
case <-m.rebalanceTimer.C:
|
||||
m.rebalancer()
|
||||
m.RebalanceServers()
|
||||
m.refreshServerRebalanceTimer()
|
||||
delay := delayer.Delay(len(m.getServerList().servers), m.clusterInfo.NumNodes())
|
||||
m.rebalanceTimer.Reset(delay)
|
||||
|
||||
case <-m.shutdownCh:
|
||||
m.logger.Info("shutting down")
|
||||
|
@ -542,3 +492,59 @@ func (m *Manager) Start() {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
// delayer is used to calculate the time to wait between calls to rebalance the
|
||||
// servers. Rebalancing is necessary to ensure that load is balanced evenly
|
||||
// across all the servers.
|
||||
//
|
||||
// The values used by delayer must balance perfectly distributed server load
|
||||
// against the overhead of a client reconnecting to a server. Rebalancing on
|
||||
// every request would cause a lot of unnecessary load as clients reconnect,
|
||||
// where as never rebalancing would lead to situations where one or two servers
|
||||
// handle a lot more requests than others.
|
||||
//
|
||||
// These values result in a minimum delay of 120-180s. Once the number of
|
||||
// nodes/server exceeds 11520, the value will be determined by multiplying the
|
||||
// node/server ratio by 15.625ms.
|
||||
var delayer = rebalanceDelayer{
|
||||
MinDelay: 2 * time.Minute,
|
||||
MaxJitter: time.Minute,
|
||||
// Once the number of nodes/server exceeds 11520 this value is used to
|
||||
// increase the delay between rebalances to set a limit on the number of
|
||||
// reconnections per server in a given time frame.
|
||||
//
|
||||
// A higher value comes at the cost of increased recovery time after a
|
||||
// partition.
|
||||
//
|
||||
// For example, in a 100,000 node consul cluster with 5 servers, it will
|
||||
// take ~5min for all clients to rebalance their connections. If
|
||||
// 99,995 agents are in the minority talking to only one server, it
|
||||
// will take ~26min for all clients to rebalance. A 10K cluster in
|
||||
// the same scenario will take ~2.6min to rebalance.
|
||||
DelayPerNode: 15*time.Millisecond + 625*time.Microsecond,
|
||||
}
|
||||
|
||||
type rebalanceDelayer struct {
|
||||
// MinDelay that may be returned by Delay
|
||||
MinDelay time.Duration
|
||||
// MaxJitter to add to MinDelay to ensure there is some randomness in the
|
||||
// delay.
|
||||
MaxJitter time.Duration
|
||||
// DelayPerNode is the duration to add to each node when calculating delay.
|
||||
// The value is divided by the number of servers to arrive at the final
|
||||
// delay value.
|
||||
DelayPerNode time.Duration
|
||||
}
|
||||
|
||||
func (d *rebalanceDelayer) Delay(servers int, nodes int) time.Duration {
|
||||
min := d.MinDelay + time.Duration(rand.Int63n(int64(d.MaxJitter)))
|
||||
if servers == 0 {
|
||||
return min
|
||||
}
|
||||
|
||||
delay := time.Duration(float64(nodes) * float64(d.DelayPerNode) / float64(servers))
|
||||
if delay < min {
|
||||
return min
|
||||
}
|
||||
return delay
|
||||
}
|
||||
|
|
|
@ -264,7 +264,7 @@ func test_reconcileServerList(maxServers int) (bool, error) {
|
|||
return true, nil
|
||||
}
|
||||
|
||||
func TestManager_refreshServerRebalanceTimer(t *testing.T) {
|
||||
func TestRebalanceDelayer(t *testing.T) {
|
||||
type testCase struct {
|
||||
servers int
|
||||
nodes int
|
||||
|
@ -319,9 +319,7 @@ func TestManager_refreshServerRebalanceTimer(t *testing.T) {
|
|||
}
|
||||
|
||||
for _, tc := range testCases {
|
||||
m := &Manager{clusterInfo: &fauxSerf{numNodes: tc.nodes}, rebalanceTimer: time.NewTimer(0)}
|
||||
m.saveServerList(serverList{servers: make([]*metadata.Server, tc.servers)})
|
||||
delay := m.refreshServerRebalanceTimer()
|
||||
delay := delayer.Delay(tc.servers, tc.nodes)
|
||||
|
||||
if tc.expected != 0 {
|
||||
assert.Equal(t, tc.expected, delay, "nodes=%d, servers=%d", tc.nodes, tc.servers)
|
||||
|
|
|
@ -5,14 +5,15 @@ import (
|
|||
"sort"
|
||||
"sync"
|
||||
|
||||
"github.com/hashicorp/go-hclog"
|
||||
"github.com/hashicorp/serf/coordinate"
|
||||
"github.com/hashicorp/serf/serf"
|
||||
|
||||
"github.com/hashicorp/consul/agent/metadata"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
"github.com/hashicorp/consul/lib"
|
||||
"github.com/hashicorp/consul/logging"
|
||||
"github.com/hashicorp/consul/types"
|
||||
"github.com/hashicorp/go-hclog"
|
||||
"github.com/hashicorp/serf/coordinate"
|
||||
"github.com/hashicorp/serf/serf"
|
||||
)
|
||||
|
||||
// Router keeps track of a set of network areas and their associated Serf
|
||||
|
@ -269,7 +270,7 @@ func (r *Router) maybeInitializeManager(area *areaInfo, dc string) *Manager {
|
|||
|
||||
managers := r.managers[dc]
|
||||
r.managers[dc] = append(managers, manager)
|
||||
go manager.Start()
|
||||
go manager.Run()
|
||||
|
||||
return manager
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue