Introduce asynchronous management of consul server lists

Instead of blocking the RPC call path and performing a potentially expensive calculation (including a call to `c.LANMembers()`), introduce a channel to request a rebalance.  Some events don't force a reshuffle, instead the extend the duration of the current rebalance window because the environment thrashed enough to redistribute a client's load.
This commit is contained in:
Sean Chittenden 2016-02-24 10:55:04 -08:00
parent 6ed37d1d8d
commit 26e51376d9
5 changed files with 327 additions and 29 deletions

View File

@ -118,7 +118,7 @@ func NewClient(config *Config) (*Client, error) {
shutdownCh: make(chan struct{}),
}
c.serverMgr = server_manager.NewServerManager(c.logger, c.shutdownCh)
c.serverMgr = server_manager.NewServerManager(c.logger, c.shutdownCh, c.serf)
// Start consulServers maintenance
go c.serverMgr.StartServerManager()

View File

@ -83,6 +83,12 @@ func TestClient_JoinLAN(t *testing.T) {
if _, err := c1.JoinLAN([]string{addr}); err != nil {
t.Fatalf("err: %v", err)
}
numServers := c1.serverMgr.GetNumServers()
testutil.WaitForResult(func() (bool, error) {
return numServers == 1, nil
}, func(err error) {
t.Fatalf("expected consul server: %d", numServers)
})
// Check the members
testutil.WaitForResult(func() (bool, error) {
@ -93,9 +99,10 @@ func TestClient_JoinLAN(t *testing.T) {
t.Fatalf("bad len")
})
numServers = c1.serverMgr.GetNumServers()
// Check we have a new consul
testutil.WaitForResult(func() (bool, error) {
return c1.serverMgr.GetNumServers() == 1, nil
return numServers == 1, nil
}, func(err error) {
t.Fatalf("expected consul server")
})

View File

@ -9,6 +9,7 @@ import (
"github.com/hashicorp/consul/consul/server_details"
"github.com/hashicorp/consul/lib"
"github.com/hashicorp/serf/serf"
)
type consulServerEventTypes int
@ -23,6 +24,11 @@ const (
// connection load across servers
consulServersRebalance
// consulServersRefreshRebalanceDuration is used to signal when we
// should reset the rebalance duration because the server list has
// changed and we don't need to proactively change our connection
consulServersRefreshRebalanceDuration
// consulServersRPCError is used to signal when a server has either
// timed out or returned an error and we would like to have the
// server manager find a new preferredServer.
@ -47,6 +53,11 @@ const (
// queries are sent over an established connection to a single server
clientRPCMinReuseDuration = 120 * time.Second
// initialRebalanceTimeoutHours is the initial value for the
// rebalanceTimer. This value is discarded immediately after the
// client becomes aware of the first server.
initialRebalanceTimeoutHours = 24
// Limit the number of new connections a server receives per second
// for connection rebalancing. This limit caps the load caused by
// continual rebalancing efforts when a cluster is in equilibrium. A
@ -61,6 +72,14 @@ const (
// will take ~26min for all servers to rebalance. A 10K cluster in
// the same scenario will take ~2.6min to rebalance.
newRebalanceConnsPerSecPerServer = 64
// maxConsulServerManagerEvents is the size of the consulServersCh
// buffer.
maxConsulServerManagerEvents = 16
// defaultClusterSize is the assumed cluster size if no serf cluster
// is available.
defaultClusterSize = 1024
)
// serverCfg is the thread-safe configuration structure that is used to
@ -71,9 +90,6 @@ const (
type serverConfig struct {
// servers tracks the locally known servers
servers []*server_details.ServerDetails
// Timer used to control rebalancing of servers
rebalanceTimer *time.Timer
}
type ServerManager struct {
@ -86,11 +102,20 @@ type ServerManager struct {
// maintenance of the list of consulServers
consulServersCh chan consulServerEventTypes
// refreshRebalanceDurationCh is used to signal that a refresh should
// occur
refreshRebalanceDurationCh chan bool
// shutdownCh is a copy of the channel in consul.Client
shutdownCh chan struct{}
// logger uses the provided LogOutput
logger *log.Logger
// serf is used to estimate the approximate number of nodes in a
// cluster and limit the rate at which it rebalances server
// connections
serf *serf.Serf
}
// AddServer takes out an internal write lock and adds a new server. If the
@ -150,8 +175,8 @@ func (sm *ServerManager) CycleFailedServers() {
}
}
serverCfg.resetRebalanceTimer(sm)
sm.saveServerConfig(serverCfg)
sm.requestRefreshRebalanceDuration()
}
// cycleServers returns a new list of servers that has dequeued the first
@ -211,16 +236,18 @@ func (sm *ServerManager) getServerConfig() serverConfig {
// NewServerManager is the only way to safely create a new ServerManager
// struct.
func NewServerManager(logger *log.Logger, shutdownCh chan struct{}) (sm *ServerManager) {
func NewServerManager(logger *log.Logger, shutdownCh chan struct{}, serf *serf.Serf) (sm *ServerManager) {
// NOTE(sean@): Can't pass *consul.Client due to an import cycle
sm = new(ServerManager)
sm.logger = logger
sm.serf = serf
sm.consulServersCh = make(chan consulServerEventTypes, maxConsulServerManagerEvents)
sm.shutdownCh = shutdownCh
sm.refreshRebalanceDurationCh = make(chan bool, maxConsulServerManagerEvents)
sc := serverConfig{}
sc.servers = make([]*server_details.ServerDetails, 0)
sc.rebalanceTimer = time.NewTimer(time.Duration(initialRebalanceTimeoutHours * time.Hour))
sm.serverConfigValue.Store(sc)
return sm
}
@ -259,8 +286,8 @@ func (sm *ServerManager) RebalanceServers() {
}
serverCfg.servers = newServers
serverCfg.resetRebalanceTimer(sm)
sm.saveServerConfig(serverCfg)
sm.requestRefreshRebalanceDuration()
}
// RemoveServer takes out an internal write lock and removes a server from
@ -291,27 +318,45 @@ func (sm *ServerManager) RemoveServer(server *server_details.ServerDetails) {
}
}
// resetRebalanceTimer assumes:
//
// 1) the serverConfigLock is already held by the caller.
// 2) the caller will call serverConfigValue.Store()
func (sc *serverConfig) resetRebalanceTimer(sm *ServerManager) {
numConsulServers := len(sc.servers)
// requestRefreshRebalanceDuration sends a message to which causes a background
// thread to recalc the duration
func (sm *ServerManager) requestRefreshRebalanceDuration() {
sm.refreshRebalanceDurationCh <- true
}
// requestServerRebalance sends a message to which causes a background thread
// to reshuffle the list of servers
func (sm *ServerManager) requestServerRebalance() {
sm.consulServersCh <- consulServersRebalance
}
// refreshServerRebalanceTimer is called
func (sm *ServerManager) refreshServerRebalanceTimer(timer *time.Timer) {
serverCfg := sm.getServerConfig()
numConsulServers := len(serverCfg.servers)
// Limit this connection's life based on the size (and health) of the
// cluster. Never rebalance a connection more frequently than
// connReuseLowWatermarkDuration, and make sure we never exceed
// clusterWideRebalanceConnsPerSec operations/s across numLANMembers.
clusterWideRebalanceConnsPerSec := float64(numConsulServers * newRebalanceConnsPerSecPerServer)
connReuseLowWatermarkDuration := clientRPCMinReuseDuration + lib.RandomStagger(clientRPCMinReuseDuration/clientRPCJitterFraction)
numLANMembers := 16384 // Assume sufficiently large for now. FIXME: numLanMembers := len(c.LANMembers())
// Assume a moderate sized cluster unless we have an actual serf
// instance we can query.
numLANMembers := defaultClusterSize
if sm.serf != nil {
numLANMembers = sm.serf.NumNodes()
}
connRebalanceTimeout := lib.RateScaledInterval(clusterWideRebalanceConnsPerSec, connReuseLowWatermarkDuration, numLANMembers)
sm.logger.Printf("[DEBUG] consul: connection will be rebalanced in %v", connRebalanceTimeout)
if sc.rebalanceTimer == nil {
sc.rebalanceTimer = time.NewTimer(connRebalanceTimeout)
} else {
sc.rebalanceTimer.Reset(connRebalanceTimeout)
}
timer.Reset(connRebalanceTimeout)
}
// saveServerConfig is a convenience method which hides the locking semantics
// of atomic.Value from the caller.
func (sm *ServerManager) saveServerConfig(sc serverConfig) {
sm.serverConfigValue.Store(sc)
}
// StartServerManager is used to start and manage the task of automatically
@ -319,7 +364,9 @@ func (sc *serverConfig) resetRebalanceTimer(sm *ServerManager) {
// happens either when a new server is added or when a duration has been
// exceed.
func (sm *ServerManager) StartServerManager() {
var rebalanceTimer *time.Timer
var rebalanceTimer *time.Timer = time.NewTimer(time.Duration(initialRebalanceTimeoutHours * time.Hour))
var rebalanceTaskDispatched int32
func() {
sm.serverConfigLock.Lock()
defer sm.serverConfigLock.Unlock()
@ -330,8 +377,6 @@ func (sm *ServerManager) StartServerManager() {
}
var serverCfg serverConfig
serverCfg = serverCfgPtr.(serverConfig)
serverCfg.resetRebalanceTimer(sm)
rebalanceTimer = serverCfg.rebalanceTimer
sm.saveServerConfig(serverCfg)
}()
@ -340,13 +385,14 @@ func (sm *ServerManager) StartServerManager() {
case e := <-sm.consulServersCh:
switch e {
case consulServersNodeJoin:
sm.logger.Printf("[INFO] consul: new node joined cluster")
sm.RebalanceServers()
sm.logger.Printf("[INFO] server manager: new node joined cluster")
// rebalance on new server
sm.requestServerRebalance()
case consulServersRebalance:
sm.logger.Printf("[INFO] consul: rebalancing servers by request")
sm.logger.Printf("[INFO] server manager: rebalancing servers by request")
sm.RebalanceServers()
case consulServersRPCError:
sm.logger.Printf("[INFO] consul: need to find a new server to talk with")
sm.logger.Printf("[INFO] server manager: need to find a new server to talk with")
sm.CycleFailedServers()
// FIXME(sean@): wtb preemptive Status.Ping
// of servers, ideally parallel fan-out of N
@ -360,7 +406,21 @@ func (sm *ServerManager) StartServerManager() {
// their RPC time too low even though the
// Ping did return successfully?
default:
sm.logger.Printf("[WARN] consul: unhandled LAN Serf Event: %#v", e)
sm.logger.Printf("[WARN] server manager: unhandled LAN Serf Event: %#v", e)
}
case <-sm.refreshRebalanceDurationCh:
chanLen := len(sm.refreshRebalanceDurationCh)
// Drain all messages from the rebalance channel
for i := 0; i < chanLen; i++ {
<-sm.refreshRebalanceDurationCh
}
// Only run one rebalance task at a time, but do
// allow for the channel to be drained
if atomic.CompareAndSwapInt32(&rebalanceTaskDispatched, 0, 1) {
go func() {
defer atomic.StoreInt32(&rebalanceTaskDispatched, 0)
sm.refreshServerRebalanceTimer(rebalanceTimer)
}()
}
case <-rebalanceTimer.C:
sm.logger.Printf("[INFO] consul: server rebalance timeout")

View File

@ -0,0 +1,156 @@
package server_manager
import (
"bytes"
"log"
"testing"
"github.com/hashicorp/consul/consul/server_details"
)
var (
localLogger *log.Logger
localLogBuffer *bytes.Buffer
)
func init() {
localLogBuffer = new(bytes.Buffer)
localLogger = log.New(localLogBuffer, "", 0)
}
func GetBufferedLogger() *log.Logger {
return localLogger
}
func testServerManager() (sm *ServerManager) {
logger := GetBufferedLogger()
shutdownCh := make(chan struct{})
sm = NewServerManager(logger, shutdownCh, nil)
return sm
}
// func (sc *serverConfig) cycleServer() (servers []*server_details.ServerDetails) {
func TestServerManagerInternal_cycleServer(t *testing.T) {
sm := testServerManager()
sc := sm.getServerConfig()
server0 := &server_details.ServerDetails{Name: "server1"}
server1 := &server_details.ServerDetails{Name: "server2"}
server2 := &server_details.ServerDetails{Name: "server3"}
sc.servers = append(sc.servers, server0, server1, server2)
sm.saveServerConfig(sc)
sc = sm.getServerConfig()
if len(sc.servers) != 3 {
t.Fatalf("server length incorrect: %d/3", len(sc.servers))
}
if sc.servers[0] != server0 &&
sc.servers[1] != server1 &&
sc.servers[2] != server2 {
t.Fatalf("initial server ordering not correct")
}
sc.servers = sc.cycleServer()
if len(sc.servers) != 3 {
t.Fatalf("server length incorrect: %d/3", len(sc.servers))
}
if sc.servers[0] != server1 &&
sc.servers[1] != server2 &&
sc.servers[2] != server0 {
t.Fatalf("server ordering after one cycle not correct")
}
sc.servers = sc.cycleServer()
if len(sc.servers) != 3 {
t.Fatalf("server length incorrect: %d/3", len(sc.servers))
}
if sc.servers[0] != server2 &&
sc.servers[1] != server0 &&
sc.servers[2] != server1 {
t.Fatalf("server ordering after two cycles not correct")
}
sc.servers = sc.cycleServer()
if len(sc.servers) != 3 {
t.Fatalf("server length incorrect: %d/3", len(sc.servers))
}
if sc.servers[0] != server0 &&
sc.servers[1] != server1 &&
sc.servers[2] != server2 {
t.Fatalf("server ordering after three cycles not correct")
}
}
// func (sm *ServerManager) getServerConfig() serverConfig {
func TestServerManagerInternal_getServerConfig(t *testing.T) {
sm := testServerManager()
sc := sm.getServerConfig()
if sc.servers == nil {
t.Fatalf("serverConfig.servers nil")
}
if len(sc.servers) != 0 {
t.Fatalf("serverConfig.servers length not zero")
}
}
// func NewServerManager(logger *log.Logger, shutdownCh chan struct{}) (sm *ServerManager) {
func TestServerManagerInternal_NewServerManager(t *testing.T) {
sm := testServerManager()
if sm == nil {
t.Fatalf("ServerManager nil")
}
if sm.logger == nil {
t.Fatalf("ServerManager.logger nil")
}
if sm.consulServersCh == nil {
t.Fatalf("ServerManager.consulServersCh nil")
}
if sm.shutdownCh == nil {
t.Fatalf("ServerManager.shutdownCh nil")
}
}
// func (sc *serverConfig) resetRebalanceTimer(sm *ServerManager) {
// func (sm *ServerManager) saveServerConfig(sc serverConfig) {
func TestServerManagerInternal_saveServerConfig(t *testing.T) {
sm := testServerManager()
// Initial condition
func() {
sc := sm.getServerConfig()
if len(sc.servers) != 0 {
t.Fatalf("ServerManager.saveServerConfig failed to load init config")
}
newServer := new(server_details.ServerDetails)
sc.servers = append(sc.servers, newServer)
sm.saveServerConfig(sc)
}()
// Test that save works
func() {
sc1 := sm.getServerConfig()
t1NumServers := len(sc1.servers)
if t1NumServers != 1 {
t.Fatalf("ServerManager.saveServerConfig failed to save mutated config")
}
}()
// Verify mutation w/o a save doesn't alter the original
func() {
newServer := new(server_details.ServerDetails)
sc := sm.getServerConfig()
sc.servers = append(sc.servers, newServer)
sc_orig := sm.getServerConfig()
origNumServers := len(sc_orig.servers)
if origNumServers >= len(sc.servers) {
t.Fatalf("ServerManager.saveServerConfig unsaved config overwrote original")
}
}()
}

View File

@ -0,0 +1,75 @@
package server_manager_test
import (
"bytes"
"log"
"testing"
"github.com/hashicorp/consul/consul/server_details"
"github.com/hashicorp/consul/consul/server_manager"
)
var (
localLogger *log.Logger
localLogBuffer *bytes.Buffer
)
func init() {
localLogBuffer = new(bytes.Buffer)
localLogger = log.New(localLogBuffer, "", 0)
}
func GetBufferedLogger() *log.Logger {
return localLogger
}
func makeMockServerManager() (sm *server_manager.ServerManager) {
logger, shutdownCh := mockServerManager()
sm = server_manager.NewServerManager(logger, shutdownCh, nil)
return sm
}
func mockServerManager() (logger *log.Logger, shutdownCh chan struct{}) {
logger = GetBufferedLogger()
shutdownCh = make(chan struct{})
return logger, shutdownCh
}
// func (sm *ServerManager) AddServer(server *server_details.ServerDetails) {
// func (sm *ServerManager) CycleFailedServers() {
// func (sm *ServerManager) FindHealthyServer() (server *server_details.ServerDetails) {
// func (sm *ServerManager) GetNumServers() (numServers int) {
func TestServerManager_GetNumServers(t *testing.T) {
sm := makeMockServerManager()
var num int
num = sm.GetNumServers()
if num != 0 {
t.Fatalf("Expected zero servers to start")
}
s := &server_details.ServerDetails{}
sm.AddServer(s)
num = sm.GetNumServers()
if num != 1 {
t.Fatalf("Expected one server after AddServer")
}
}
// func NewServerManager(logger *log.Logger, shutdownCh chan struct{}) (sm *ServerManager) {
func TestServerManager_NewServerManager(t *testing.T) {
sm := makeMockServerManager()
if sm == nil {
t.Fatalf("ServerManager nil")
}
}
// func (sm *ServerManager) NotifyFailedServer(server *server_details.ServerDetails) {
// func (sm *ServerManager) RebalanceServers() {
// func (sm *ServerManager) RemoveServer(server *server_details.ServerDetails) {
// func (sm *ServerManager) StartServerManager() {