Simplify error handling

Rely on Serf for liveness.  In the event of a failure, simply cycle the failed server to the end of the list (sketched below).  If the server really is unhealthy, Serf will reap the dead server.
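
A minimal, hypothetical sketch of the "cycle the failed server to the end" idea (names here are illustrative; the real logic lives in server_manager's cycleServer and NotifyFailedServer):

package main

import "fmt"

// cycleServer dequeues the first (failed) server and enqueues it at the end
// of the list so the next RPC targets a different server.  Deciding whether
// the server is actually dead is left entirely to Serf.
func cycleServer(servers []string) []string {
	if len(servers) < 2 {
		return servers // nothing to rotate
	}
	newServers := make([]string, 0, len(servers))
	newServers = append(newServers, servers[1:]...)
	newServers = append(newServers, servers[0])
	return newServers
}

func main() {
	servers := []string{"s1", "s2", "s3"}
	fmt.Println(cycleServer(servers)) // [s2 s3 s1]
}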

Additional simplifications:

*) Only rebalance servers based on timers, not when a new server is re-added to the cluster (see the sketch after this list).
*) Back out the failure count in server_details.ServerDetails
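
A rough sketch of timer-only rebalancing (names and the interval are illustrative; the real duration is derived from clientRPCMinReuseDuration and clientRPCJitterFraction):

package sketch

import (
	"log"
	"math/rand"
	"time"
)

// rebalanceInterval is an illustrative value, not the one ServerManager uses.
const rebalanceInterval = 2 * time.Minute

// runRebalanceLoop shuffles the server list only when the timer fires.
// Membership changes (joins, re-adds) no longer trigger a rebalance; Serf
// keeps the list itself up to date.
func runRebalanceLoop(rebalance func(), logger *log.Logger, shutdownCh chan struct{}) {
	timer := time.NewTimer(rebalanceInterval)
	defer timer.Stop()
	for {
		select {
		case <-timer.C:
			logger.Printf("[INFO] server manager: server rebalance timeout")
			rebalance()
			// Re-arm with jitter so clients don't all rebalance in lockstep.
			timer.Reset(rebalanceInterval + time.Duration(rand.Int63n(int64(rebalanceInterval))))
		case <-shutdownCh:
			return
		}
	}
}
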
Sean Chittenden 2016-02-24 14:48:04 -08:00
parent 0c519aa90d
commit 231768faea
5 changed files with 34 additions and 133 deletions

View File

@@ -121,7 +121,7 @@ func NewClient(config *Config) (*Client, error) {
c.serverMgr = server_manager.NewServerManager(c.logger, c.shutdownCh, c.serf)
// Start consulServers maintenance
go c.serverMgr.StartServerManager()
go c.serverMgr.Start()
// Start the Serf listeners to prevent a deadlock
go c.lanEventHandler()

View File

@@ -17,11 +17,6 @@ type ServerDetails struct {
Expect int
Version int
Addr net.Addr
// Disabled is a uint64 in order to support atomic integer
// operations. Zero means enabled, non-zero is the number of times
// this server has failed without being marked healthy.
Disabled uint64
}
func (s *ServerDetails) String() string {
@@ -37,11 +32,6 @@ func IsConsulServer(m serf.Member) (bool, *ServerDetails) {
datacenter := m.Tags["dc"]
_, bootstrap := m.Tags["bootstrap"]
var disabled uint64 = 0
_, disabledStr := m.Tags["disabled"]
if disabledStr {
disabled = 1
}
expect := 0
expect_str, ok := m.Tags["expect"]
@@ -75,7 +65,6 @@ func IsConsulServer(m serf.Member) (bool, *ServerDetails) {
Expect: expect,
Addr: addr,
Version: vsn,
Disabled: disabled,
}
return true, parts
}

View File

@@ -29,9 +29,6 @@ func TestIsConsulServer(t *testing.T) {
if parts.Bootstrap {
t.Fatalf("unexpected bootstrap")
}
if parts.Disabled > 0 {
t.Fatalf("unexpected disabled")
}
if parts.Expect != 0 {
t.Fatalf("bad: %v", parts.Expect)
}
@@ -44,9 +41,6 @@ func TestIsConsulServer(t *testing.T) {
if !parts.Bootstrap {
t.Fatalf("expected bootstrap")
}
if parts.Disabled == 0 {
t.Fatalf("expected disabled")
}
if parts.Addr.String() != "127.0.0.1:10000" {
t.Fatalf("bad addr: %v", parts.Addr)
}

View File

@@ -14,27 +14,6 @@ import (
type consulServerEventTypes int
const (
// consulServersNodeJoin is used to notify of a new consulServer.
// The primary effect of this is a reshuffling of consulServers and
// finding a new preferredServer.
consulServersNodeJoin = iota
// consulServersRebalance is used to signal we should rebalance our
// connection load across servers
consulServersRebalance
// consulServersRefreshRebalanceDuration is used to signal when we
// should reset the rebalance duration because the server list has
// changed and we don't need to proactively change our connection
consulServersRefreshRebalanceDuration
// consulServersRPCError is used to signal when a server has either
// timed out or returned an error and we would like to have the
// server manager find a new preferredServer.
consulServersRPCError
)
const (
// clientRPCJitterFraction determines the amount of jitter added to
// clientRPCMinReuseDuration before a connection is expired and a new
@@ -149,36 +128,11 @@ func (sm *ServerManager) AddServer(server *server_details.ServerDetails) {
copy(newServers, serverCfg.servers)
newServers = append(newServers, server)
serverCfg.servers = newServers
// Notify the server maintenance task of a new server
sm.consulServersCh <- consulServersNodeJoin
}
sm.saveServerConfig(serverCfg)
}
// CycleFailedServers takes out an internal write lock and dequeues all
// failed servers and re-enqueues them. This method does not reshuffle the
// server list, instead it requests the rebalance duration be refreshed/reset
// further into the future.
func (sm *ServerManager) CycleFailedServers() {
sm.serverConfigLock.Lock()
defer sm.serverConfigLock.Unlock()
serverCfg := sm.getServerConfig()
for i := range serverCfg.servers {
failCount := atomic.LoadUint64(&(serverCfg.servers[i].Disabled))
if failCount == 0 {
break
} else if failCount > 0 {
serverCfg.servers = serverCfg.cycleServer()
}
}
sm.saveServerConfig(serverCfg)
sm.requestRefreshRebalanceDuration()
}
// cycleServers returns a new list of servers that has dequeued the first
// server and enqueued it at the end of the list. cycleServers assumes the
// caller is holding the serverConfigLock.
@@ -197,27 +151,16 @@ func (sc *serverConfig) cycleServer() (servers []*server_details.ServerDetails)
// FindHealthyServer takes out an internal "read lock" and searches through
// the list of servers to find a healthy server.
func (sm *ServerManager) FindHealthyServer() (server *server_details.ServerDetails) {
func (sm *ServerManager) FindHealthyServer() *server_details.ServerDetails {
serverCfg := sm.getServerConfig()
numServers := len(serverCfg.servers)
if numServers == 0 {
sm.logger.Printf("[ERR] consul: No servers found in the server config")
return nil
} else {
// Return whatever is at the front of the list
return serverCfg.servers[0]
}
// Find the first non-failing server in the server list. If this is
// not the first server a prior RPC call marked the first server as
// failed and we're waiting for the server management task to reorder
// a working server to the front of the list.
for i := range serverCfg.servers {
failCount := atomic.LoadUint64(&(serverCfg.servers[i].Disabled))
if failCount == 0 {
server = serverCfg.servers[i]
break
}
}
return server
}
// GetNumServers takes out an internal "read lock" and returns the number of
@@ -254,13 +197,25 @@ func NewServerManager(logger *log.Logger, shutdownCh chan struct{}, serf *serf.S
// NotifyFailedServer is an exported convenience function that allows callers
// to pass in a server that has failed an RPC request and mark it as failed.
// This will initiate a background task that will optimize the failed server
// to the end of the serer list. No locks are required here because we are
// bypassing the serverConfig and sending a message to ServerManager's
// channel.
// If the server being failed is not the first server on the list, this is a
// noop. If, however, the server is failed and first on the list, acquire
// the lock, retest, and take the penalty of moving the server to the end of
// the list.
func (sm *ServerManager) NotifyFailedServer(server *server_details.ServerDetails) {
atomic.AddUint64(&server.Disabled, 1)
sm.consulServersCh <- consulServersRPCError
serverCfg := sm.getServerConfig()
if len(serverCfg.servers) > 0 && serverCfg.servers[0] == server {
// Grab a lock, retest, and take the hit of cycling the first
// server to the end.
sm.serverConfigLock.Lock() // FIXME(sean@): wtb TryLock
defer sm.serverConfigLock.Unlock()
serverCfg = sm.getServerConfig()
if len(serverCfg.servers) > 0 && serverCfg.servers[0] == server {
serverCfg.cycleServer()
sm.saveServerConfig(serverCfg)
}
}
}
// RebalanceServers takes out an internal write lock and shuffles the list of
@@ -287,7 +242,6 @@ func (sm *ServerManager) RebalanceServers() {
serverCfg.servers = newServers
sm.saveServerConfig(serverCfg)
sm.requestRefreshRebalanceDuration()
}
// RemoveServer takes out an internal write lock and removes a server from
@@ -324,12 +278,6 @@ func (sm *ServerManager) requestRefreshRebalanceDuration() {
sm.refreshRebalanceDurationCh <- true
}
// requestServerRebalance sends a message to which causes a background thread
// to reshuffle the list of servers
func (sm *ServerManager) requestServerRebalance() {
sm.consulServersCh <- consulServersRebalance
}
// refreshServerRebalanceTimer is called
func (sm *ServerManager) refreshServerRebalanceTimer(timer *time.Timer) {
serverCfg := sm.getServerConfig()
@@ -359,11 +307,10 @@ func (sm *ServerManager) saveServerConfig(sc serverConfig) {
sm.serverConfigValue.Store(sc)
}
// StartServerManager is used to start and manage the task of automatically
// shuffling and rebalance the list of consul servers. This maintenance
// happens either when a new server is added or when a duration has been
// exceed.
func (sm *ServerManager) StartServerManager() {
// Start is used to start and manage the task of automatically shuffling and
// rebalancing the list of consul servers. This maintenance happens either
// when a new server is added or when a duration has been exceeded.
func (sm *ServerManager) Start() {
var rebalanceTimer *time.Timer = time.NewTimer(time.Duration(initialRebalanceTimeoutHours * time.Hour))
var rebalanceTaskDispatched int32
@@ -382,51 +329,22 @@ func (sm *ServerManager) StartServerManager() {
for {
select {
case e := <-sm.consulServersCh:
switch e {
case consulServersNodeJoin:
sm.logger.Printf("[INFO] server manager: new node joined cluster")
// rebalance on new server
sm.requestServerRebalance()
case consulServersRebalance:
sm.logger.Printf("[INFO] server manager: rebalancing servers by request")
sm.RebalanceServers()
case consulServersRPCError:
sm.logger.Printf("[INFO] server manager: need to find a new server to talk with")
sm.CycleFailedServers()
// FIXME(sean@): wtb preemptive Status.Ping
// of servers, ideally parallel fan-out of N
// nodes, then settle on the first node which
// responds successfully.
//
// Is there a distinction between slow and
// offline? Do we run the Status.Ping with a
// fixed timeout (say 30s) that way we can
// alert administrators that they've set
// their RPC time too low even though the
// Ping did return successfully?
default:
sm.logger.Printf("[WARN] server manager: unhandled LAN Serf Event: %#v", e)
}
case <-sm.refreshRebalanceDurationCh:
chanLen := len(sm.refreshRebalanceDurationCh)
// Drain all messages from the rebalance channel
for i := 0; i < chanLen; i++ {
<-sm.refreshRebalanceDurationCh
}
case <-rebalanceTimer.C:
sm.logger.Printf("[INFO] server manager: server rebalance timeout")
sm.RebalanceServers()
// Only run one rebalance task at a time, but do
// allow for the channel to be drained
if atomic.CompareAndSwapInt32(&rebalanceTaskDispatched, 0, 1) {
sm.logger.Printf("[INFO] server manager: Launching rebalance duration task")
go func() {
defer atomic.StoreInt32(&rebalanceTaskDispatched, 0)
sm.refreshServerRebalanceTimer(rebalanceTimer)
}()
}
case <-rebalanceTimer.C:
sm.logger.Printf("[INFO] consul: server rebalance timeout")
sm.RebalanceServers()
case <-sm.shutdownCh:
sm.logger.Printf("[INFO] server manager: shutting down")
return
}
}

View File

@@ -72,4 +72,4 @@ func TestServerManager_NewServerManager(t *testing.T) {
// func (sm *ServerManager) RemoveServer(server *server_details.ServerDetails) {
// func (sm *ServerManager) StartServerManager() {
// func (sm *ServerManager) Start() {