Proactively ping server before rotation
Before shuffling the server list, proactively ping the next server in the list to establish the connection and verify the remote endpoint is healthy.
parent b3a8e2f115
commit 2d9982eb27
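The change wires the client's connection pool into the server manager so that, before a freshly shuffled server list is saved, the server now sitting at the front of the list is pinged; unresponsive servers are cycled to the back until a healthy one answers. The following standalone sketch (hypothetical names and types, not the Consul implementation) illustrates that flow:

package main

import (
    "fmt"
    "math/rand"
)

// Pinger stands in for the ConnPoolPinger interface added by this commit.
type Pinger interface {
    Ping(server string) bool
}

// flakyPinger reports health from a fixed map; a real pinger would issue an RPC.
type flakyPinger struct{ healthy map[string]bool }

func (p *flakyPinger) Ping(server string) bool { return p.healthy[server] }

// rebalance shuffles the servers, then rotates failed servers to the back of
// the list until a healthy one sits at index 0. It reports whether a healthy
// server was found.
func rebalance(servers []string, pinger Pinger) bool {
    rand.Shuffle(len(servers), func(i, j int) {
        servers[i], servers[j] = servers[j], servers[i]
    })
    for n := len(servers); n > 0; n-- {
        if pinger.Ping(servers[0]) {
            return true
        }
        // Cycle the failed server to the end of the list.
        servers = append(servers[1:], servers[0])
    }
    return false
}

func main() {
    servers := []string{"s1", "s2", "s3"}
    p := &flakyPinger{healthy: map[string]bool{"s2": true, "s3": true}}
    fmt.Println("found healthy server:", rebalance(servers, p))
}

Note that a failed ping only rotates the candidate to the end of the list rather than removing it, mirroring the commit's reliance on Serf to eventually prune dead servers.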
@@ -119,7 +119,7 @@ func NewClient(config *Config) (*Client, error) {
         shutdownCh: make(chan struct{}),
     }
 
-    c.serverMgr = server_manager.New(c.logger, c.shutdownCh, c.serf)
+    c.serverMgr = server_manager.New(c.logger, c.shutdownCh, c.serf, c.connPool)
 
     // Start maintenance task for serverMgr
     go c.serverMgr.Start()
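For context, go c.serverMgr.Start() launches the maintenance loop that periodically triggers a rebalance; the interval is derived from cluster size (see the refreshServerRebalanceTimer test further down). A minimal sketch of that pattern, with an illustrative interval and function names not taken from the Consul source:

package main

import (
    "fmt"
    "time"
)

// maintain rebalances on a timer until shutdownCh is closed. The interval
// here is illustrative only.
func maintain(shutdownCh <-chan struct{}, rebalance func()) {
    ticker := time.NewTicker(50 * time.Millisecond)
    defer ticker.Stop()
    for {
        select {
        case <-ticker.C:
            rebalance()
        case <-shutdownCh:
            return
        }
    }
}

func main() {
    shutdownCh := make(chan struct{})
    go maintain(shutdownCh, func() { fmt.Println("rebalancing server list") })
    time.Sleep(120 * time.Millisecond) // let a couple of ticks fire
    close(shutdownCh)
}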
@@ -10,6 +10,7 @@ import (
     "sync/atomic"
     "time"
 
+    "github.com/hashicorp/consul/consul/server_details"
     "github.com/hashicorp/consul/tlsutil"
     "github.com/hashicorp/net-rpc-msgpackrpc"
     "github.com/hashicorp/yamux"
@@ -405,6 +406,30 @@ func (p *ConnPool) RPC(dc string, addr net.Addr, version int, method string, arg
     return nil
 }
 
+// PingConsulServer sends a Status.Ping message to the specified server and
+// returns true if healthy, false if an error occurred
+func (p *ConnPool) PingConsulServer(s *server_details.ServerDetails) bool {
+    // Get a usable client
+    conn, sc, err := p.getClient(s.Datacenter, s.Addr, s.Version)
+    if err != nil {
+        return false
+    }
+
+    // Make the RPC call
+    var out struct{}
+    err = msgpackrpc.CallWithCodec(sc.codec, "Status.Ping", struct{}{}, &out)
+    if err != nil {
+        sc.Close()
+        p.releaseConn(conn)
+        return false
+    }
+
+    // Done with the connection
+    conn.returnClient(sc)
+    p.releaseConn(conn)
+    return true
+}
+
 // Reap is used to close conns open over maxTime
 func (p *ConnPool) reap() {
     for {
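PingConsulServer is essentially a cheap RPC round trip: borrow a connection, call the Status.Ping endpoint, and return the connection to the pool only if the call succeeded. The self-contained sketch below reproduces that round trip with the standard library's net/rpc package; Consul itself uses its msgpack-RPC codec and connection pool, and the string argument here exists only to keep the default gob codec happy:

package main

import (
    "fmt"
    "net"
    "net/rpc"
)

// Status mimics the shape of Consul's Status RPC endpoint for illustration.
type Status struct{}

// Ping does nothing useful; answering at all is what proves liveness.
func (s *Status) Ping(args string, reply *string) error {
    *reply = "pong"
    return nil
}

// pingServer dials addr, issues Status.Ping, and reports whether the
// endpoint answered, mirroring the healthy/unhealthy bool above.
func pingServer(addr string) bool {
    conn, err := net.Dial("tcp", addr)
    if err != nil {
        return false
    }
    client := rpc.NewClient(conn)
    defer client.Close()

    var out string
    return client.Call("Status.Ping", "ping", &out) == nil
}

func main() {
    // Throwaway in-process RPC server standing in for a Consul server.
    srv := rpc.NewServer()
    srv.Register(&Status{})
    ln, err := net.Listen("tcp", "127.0.0.1:0")
    if err != nil {
        panic(err)
    }
    go srv.Accept(ln)

    fmt.Println("healthy:", pingServer(ln.Addr().String()))
}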
@@ -53,6 +53,12 @@ type ConsulClusterInfo interface {
     NumNodes() int
 }
 
+// ConnPoolPinger is an interface wrapping client.ConnPool to prevent a
+// cyclic import dependency
+type ConnPoolPinger interface {
+    PingConsulServer(server *server_details.ServerDetails) bool
+}
+
 // serverCfg is the thread-safe configuration struct used to maintain the
 // list of Consul servers in ServerManager.
 //
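Because ConnPoolPinger exists only to break the import cycle between this package and the client's ConnPool, nothing at the point of definition forces the two to stay in sync. A common guard (illustrative names, not part of this commit) is a blank-identifier assertion that fails the build if the concrete type ever stops satisfying the interface:

package main

import "fmt"

// Pinger mirrors the shape of the ConnPoolPinger interface.
type Pinger interface {
    PingServer(name string) bool
}

// pool stands in for the concrete connection pool type.
type pool struct{}

func (p *pool) PingServer(name string) bool { return name != "" }

// Compile-time check: this fails to build if *pool ever stops satisfying
// Pinger, catching interface drift early and at no runtime cost.
var _ Pinger = (*pool)(nil)

func main() {
    var p Pinger = &pool{}
    fmt.Println(p.PingServer("consul-server-1"))
}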
@@ -80,6 +86,11 @@ type ServerManager struct {
     // connections. ConsulClusterInfo is an interface that wraps serf.
     clusterInfo ConsulClusterInfo
 
+    // connPoolPinger is used to test the health of a server in the
+    // connection pool. ConnPoolPinger is an interface that wraps
+    // client.ConnPool.
+    connPoolPinger ConnPoolPinger
+
     // notifyFailedServersBarrier is acts as a barrier to prevent
     // queueing behind serverConfigLog and acts as a TryLock().
     notifyFailedBarrier int32
@@ -142,10 +153,23 @@ func (sc *serverConfig) cycleServer() (servers []*server_details.ServerDetails)
     newServers = append(newServers, sc.servers[0])
 
     // FIXME(sean@): Is it worth it to fire off a go routine and
-    // TestConsulServer?
+    // PingConsulServer?
     return newServers
 }
 
+// removeServerByKey performs an inline removal of the first matching server
+func (sc *serverConfig) removeServerByKey(targetKey *server_details.Key) {
+    for i, s := range sc.servers {
+        if targetKey.Equal(s.Key()) {
+            // Delete the target server
+            copy(sc.servers[i:], sc.servers[i+1:])
+            sc.servers[len(sc.servers)-1] = nil
+            sc.servers = sc.servers[:len(sc.servers)-1]
+            return
+        }
+    }
+}
+
 // shuffleServers shuffles the server list in place
 func (sc *serverConfig) shuffleServers() {
     newServers := make([]*server_details.ServerDetails, len(sc.servers))
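removeServerByKey uses the standard copy-and-truncate idiom for deleting from a slice of pointers: shift the tail left, nil out the now-duplicated last element so the garbage collector can reclaim it, then shrink the length by one. A standalone sketch of the same idiom with illustrative types:

package main

import "fmt"

type server struct{ name string }

// removeByName deletes the first server with the given name in place and
// returns the shortened slice.
func removeByName(servers []*server, name string) []*server {
    for i, s := range servers {
        if s.name == name {
            copy(servers[i:], servers[i+1:]) // shift the tail down by one
            servers[len(servers)-1] = nil    // drop the dangling pointer
            return servers[:len(servers)-1]  // truncate the slice
        }
    }
    return servers
}

func main() {
    servers := []*server{{name: "s1"}, {name: "s2"}, {name: "s3"}}
    servers = removeByName(servers, "s2")
    for _, s := range servers {
        fmt.Println(s.name)
    }
}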
@@ -193,11 +217,11 @@ func (sm *ServerManager) saveServerConfig(sc serverConfig) {
 }
 
 // New is the only way to safely create a new ServerManager struct.
-func New(logger *log.Logger, shutdownCh chan struct{}, clusterInfo ConsulClusterInfo) (sm *ServerManager) {
-    // NOTE(sean@): Can't pass *consul.Client due to an import cycle
+func New(logger *log.Logger, shutdownCh chan struct{}, clusterInfo ConsulClusterInfo, connPoolPinger ConnPoolPinger) (sm *ServerManager) {
     sm = new(ServerManager)
     sm.logger = logger
-    sm.clusterInfo = clusterInfo
+    sm.clusterInfo = clusterInfo       // can't pass *consul.Client: import cycle
+    sm.connPoolPinger = connPoolPinger // can't pass *consul.ConnPool: import cycle
     sm.shutdownCh = shutdownCh
 
     sc := serverConfig{}
@@ -243,27 +267,120 @@ func (sm *ServerManager) NumServers() (numServers int) {
     return numServers
 }
 
-// RebalanceServers takes out an internal write lock and shuffles the list of
-// servers on this agent. This allows for a redistribution of work across
-// consul servers and provides a guarantee that the order of the server list
-// isn't related to the age at which the node was added to the cluster.
-// Elsewhere we rely on the position in the server list as a hint regarding
-// the stability of a server relative to its position in the server list.
-// Servers at or near the front of the list are more stable than servers near
-// the end of the list. Unhealthy servers are removed when serf notices the
-// server has been deregistered.
+// RebalanceServers shuffles the list of servers on this agent. The server
+// at the front of the list is selected for the next RPC. RPC calls that
+// fail for a particular server are rotated to the end of the list. This
+// method reshuffles the list periodically in order to redistribute work
+// across all known consul servers (i.e. guarantee that the order of servers
+// in the server list isn't positively correlated with the age of a server in
+// the consul cluster). Periodically shuffling the server list prevents
+// long-lived clients from fixating on long-lived servers.
+//
+// Unhealthy servers are removed when serf notices the server has been
+// deregistered. Before the newly shuffled server list is saved, the new
+// remote endpoint is tested to ensure it's responsive.
 func (sm *ServerManager) RebalanceServers() {
-    sm.serverConfigLock.Lock()
-    defer sm.serverConfigLock.Unlock()
+FAILED_SERVER_DURING_REBALANCE:
+    // Obtain a copy of the server config
     serverCfg := sm.getServerConfig()
 
-    serverCfg.shuffleServers()
-    serverCfg.servers = newServers
+    // Early abort if there is no value to shuffling
+    if len(serverCfg.servers) < 2 {
+        return
+    }
 
-    sm.saveServerConfig(serverCfg)
+    serverCfg.shuffleServers()
+
+    // Iterate through the shuffled server list to find a healthy server.
+    // Don't iterate on the list directly; this loop mutates the server
+    // list.
+    var foundHealthyServer bool
+    for n := len(serverCfg.servers); n > 0; n-- {
+        // Always test the first server. Failed servers are cycled
+        // while Serf detects the node has failed.
+        selectedServer := serverCfg.servers[0]
+
+        sm.logger.Printf("[INFO] server manager: Preemptively testing server %s before rebalance", selectedServer.String())
+        ok := sm.connPoolPinger.PingConsulServer(selectedServer)
+        if ok {
+            foundHealthyServer = true
+            break
+        }
+        serverCfg.cycleServer()
+    }
+
+    // If no healthy servers were found, sleep and wait for Serf to make
+    // the world a happy place again.
+    if !foundHealthyServer {
+        const backoffDuration = 1 * time.Second
+        sm.logger.Printf("[INFO] server manager: No servers available, sleeping for %v", backoffDuration)
+
+        // Sleep with no locks
+        time.Sleep(backoffDuration)
+        goto FAILED_SERVER_DURING_REBALANCE
+    }
+
+    // Verify that all servers are present. Use an anonymous func to
+    // ensure the lock is released when exiting the critical section.
+    reconcileServerLists := func() bool {
+        sm.serverConfigLock.Lock()
+        defer sm.serverConfigLock.Unlock()
+        tmpServerCfg := sm.getServerConfig()
+
+        type targetServer struct {
+            server *server_details.ServerDetails
+
+            // 'b' == both
+            // 'o' == original
+            // 'n' == new
+            state byte
+        }
+        mergedList := make(map[server_details.Key]*targetServer)
+        for _, s := range serverCfg.servers {
+            mergedList[*s.Key()] = &targetServer{server: s, state: 'o'}
+        }
+        for _, s := range tmpServerCfg.servers {
+            k := s.Key()
+            _, found := mergedList[*k]
+            if found {
+                mergedList[*k].state = 'b'
+            } else {
+                mergedList[*k] = &targetServer{server: s, state: 'n'}
+            }
+        }
+
+        // Ensure the selected server has not been removed by Serf
+        selectedServerKey := serverCfg.servers[0].Key()
+        if v, found := mergedList[*selectedServerKey]; found && v.state == 'o' {
+            return false
+        }
+
+        // Add any new servers and remove any old servers
+        for k, v := range mergedList {
+            switch v.state {
+            case 'b':
+                // Do nothing, server exists in both
+            case 'o':
+                // Server has been removed
+                serverCfg.removeServerByKey(&k)
+            case 'n':
+                // Server added
+                serverCfg.servers = append(serverCfg.servers, v.server)
+            default:
+                panic("not implemented")
+            }
+        }
+
+        sm.saveServerConfig(serverCfg)
+        return true
+    }
+
+    if !reconcileServerLists() {
+        goto FAILED_SERVER_DURING_REBALANCE
+    }
+
+    sm.logger.Printf("[INFO] server manager: Rebalancing server connections complete")
+    return
 }
 
 // RemoveServer takes out an internal write lock and removes a server from
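The reconcile step guards against Serf events that arrived while the lock was released for pinging: every server is tagged 'o' (only in the pre-ping snapshot), 'n' (newly discovered), or 'b' (present in both), after which 'o' entries are dropped and 'n' entries appended. A standalone sketch of that bookkeeping using plain strings instead of ServerDetails:

package main

import "fmt"

// reconcile merges the pre-ping snapshot with the latest server list:
// servers only in the snapshot ('o') were removed by Serf and are dropped,
// servers only in the latest list ('n') joined mid-rebalance and are added,
// and servers in both ('b') are kept in snapshot order.
func reconcile(snapshot, latest []string) []string {
    state := make(map[string]byte)
    for _, s := range snapshot {
        state[s] = 'o'
    }
    for _, s := range latest {
        if _, ok := state[s]; ok {
            state[s] = 'b'
        } else {
            state[s] = 'n'
        }
    }

    merged := make([]string, 0, len(latest))
    for _, s := range snapshot {
        if state[s] == 'b' {
            merged = append(merged, s)
        }
    }
    for _, s := range latest {
        if state[s] == 'n' {
            merged = append(merged, s)
        }
    }
    return merged
}

func main() {
    snapshot := []string{"s1", "s2", "s3"}   // shuffled and pinged without the lock held
    latest := []string{"s2", "s3", "s4"}     // what Serf reports now
    fmt.Println(reconcile(snapshot, latest)) // [s2 s3 s4]
}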
@@ -25,6 +25,13 @@ func GetBufferedLogger() *log.Logger {
     return localLogger
 }
 
+type fauxConnPool struct {
+}
+
+func (s *fauxConnPool) PingConsulServer(server *server_details.ServerDetails) bool {
+    return true
+}
+
 type fauxSerf struct {
     numNodes int
 }
@@ -36,7 +43,7 @@ func (s *fauxSerf) NumNodes() int {
 func testServerManager() (sm *ServerManager) {
     logger := GetBufferedLogger()
     shutdownCh := make(chan struct{})
-    sm = New(logger, shutdownCh, &fauxSerf{numNodes: 16384})
+    sm = New(logger, shutdownCh, &fauxSerf{numNodes: 16384}, &fauxConnPool{})
     return sm
 }
 
@@ -171,7 +178,7 @@ func TestServerManagerInternal_refreshServerRebalanceTimer(t *testing.T) {
     }
 
     for _, s := range clusters {
-        sm := New(logger, shutdownCh, &fauxSerf{numNodes: s.numNodes})
+        sm := New(logger, shutdownCh, &fauxSerf{numNodes: s.numNodes}, &fauxConnPool{})
 
         for i := 0; i < s.numServers; i++ {
             nodeName := fmt.Sprintf("s%02d", i)
@@ -26,6 +26,13 @@ func GetBufferedLogger() *log.Logger {
     return localLogger
 }
 
+type fauxConnPool struct {
+}
+
+func (s *fauxConnPool) PingConsulServer(server *server_details.ServerDetails) bool {
+    return true
+}
+
 type fauxSerf struct {
 }
 
@@ -37,7 +44,7 @@ func testServerManager() (sm *server_manager.ServerManager) {
     logger := GetBufferedLogger()
     logger = log.New(os.Stderr, "", log.LstdFlags)
     shutdownCh := make(chan struct{})
-    sm = server_manager.New(logger, shutdownCh, &fauxSerf{})
+    sm = server_manager.New(logger, shutdownCh, &fauxSerf{}, &fauxConnPool{})
     return sm
 }
 
@@ -124,7 +131,7 @@ func TestServerManager_New(t *testing.T) {
     logger := GetBufferedLogger()
     logger = log.New(os.Stderr, "", log.LstdFlags)
     shutdownCh := make(chan struct{})
-    sm := server_manager.New(logger, shutdownCh, &fauxSerf{})
+    sm := server_manager.New(logger, shutdownCh, &fauxSerf{}, &fauxConnPool{})
     if sm == nil {
         t.Fatalf("ServerManager nil")
     }