Refactor out the management of Consul servers
Move the management of c.consulServers (fka c.consuls) into consul/server_manager.go. This commit brings in a background task that proactively manages the server list and: *) reshuffles the list; *) manages the timer outside of the RPC() path; *) uses atomics to detect that a server has failed. This is a WIP; more testing work needs to be completed.
This commit is contained in:
parent
6bda2c007c
commit
09d4c6439c
112
consul/client.go
112
consul/client.go
|
@ -3,7 +3,6 @@ package consul
|
|||
import (
|
||||
"fmt"
|
||||
"log"
|
||||
"math/rand"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strconv"
|
||||
|
@ -54,18 +53,19 @@ type Client struct {
|
|||
// Connection pool to consul servers
|
||||
connPool *ConnPool
|
||||
|
||||
// consulServers tracks the locally known servers
|
||||
consulServers []*serverParts
|
||||
consulLock sync.RWMutex
|
||||
// serverConfigValue holds the current serverConfig and provides the
|
||||
// atomic load/store semantics used to read and replace it
|
||||
serverConfigValue atomic.Value
|
||||
serverConfigMtx sync.Mutex
|
||||
|
||||
// consulServersCh is used to receive events related to the
|
||||
// maintenance of the list of consulServers
|
||||
consulServersCh chan consulServerEventTypes
|
||||
|
||||
// eventCh is used to receive events from the
|
||||
// serf cluster in the datacenter
|
||||
eventCh chan serf.Event
|
||||
|
||||
// preferredServer is the last server we made an RPC call to,
|
||||
// this is used to re-use the last connection
|
||||
preferredServer *serverParts
|
||||
|
||||
// Logger uses the provided LogOutput
|
||||
logger *log.Logger
|
||||
|
||||
|
@ -119,6 +119,13 @@ func NewClient(config *Config) (*Client, error) {
|
|||
shutdownCh: make(chan struct{}),
|
||||
}
|
||||
|
||||
// Create the initial serverConfig
|
||||
serverCfg := serverConfig{}
|
||||
c.serverConfigValue.Store(serverCfg)
|
||||
|
||||
// Start consulServers maintenance
|
||||
go c.consulServersManager()
|
||||
|
||||
// Start the Serf listeners to prevent a deadlock
|
||||
go c.lanEventHandler()
|
||||
|
||||
|
@ -266,23 +273,7 @@ func (c *Client) nodeJoin(me serf.MemberEvent) {
|
|||
continue
|
||||
}
|
||||
c.logger.Printf("[INFO] consul: adding server %s", parts)
|
||||
|
||||
// Check if this server is known
|
||||
found := false
|
||||
c.consulLock.Lock()
|
||||
for idx, existing := range c.consulServers {
|
||||
if existing.Name == parts.Name {
|
||||
c.consulServers[idx] = parts
|
||||
found = true
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
// Add to the list if not known
|
||||
if !found {
|
||||
c.consulServers = append(c.consulServers, parts)
|
||||
}
|
||||
c.consulLock.Unlock()
|
||||
c.AddServer(parts)
|
||||
|
||||
// Trigger the callback
|
||||
if c.config.ServerUp != nil {
|
||||
|
@ -299,18 +290,7 @@ func (c *Client) nodeFail(me serf.MemberEvent) {
|
|||
continue
|
||||
}
|
||||
c.logger.Printf("[INFO] consul: removing server %s", parts)
|
||||
|
||||
// Remove the server if known
|
||||
c.consulLock.Lock()
|
||||
n := len(c.consulServers)
|
||||
for i := 0; i < n; i++ {
|
||||
if c.consulServers[i].Name == parts.Name {
|
||||
c.consulServers[i], c.consulServers[n-1] = c.consulServers[n-1], nil
|
||||
c.consulServers = c.consulServers[:n-1]
|
||||
break
|
||||
}
|
||||
}
|
||||
c.consulLock.Unlock()
|
||||
c.RemoveServer(parts)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -344,61 +324,55 @@ func (c *Client) localEvent(event serf.UserEvent) {
|
|||
|
||||
// RPC is used to forward an RPC call to a consul server, or fail if no servers
|
||||
func (c *Client) RPC(method string, args interface{}, reply interface{}) error {
|
||||
// Check to make sure we haven't spent too much time querying a
|
||||
// single server
|
||||
now := time.Now()
|
||||
if !c.connRebalanceTime.IsZero() && now.After(c.connRebalanceTime) {
|
||||
c.logger.Printf("[DEBUG] consul: connection time to server %s exceeded, rotating server connection", c.preferredServer.Addr)
|
||||
c.preferredServer = nil
|
||||
serverCfgPtr := c.serverConfigValue.Load()
|
||||
if serverCfgPtr == nil {
|
||||
c.logger.Printf("[ERR] consul: Failed to load a server config")
|
||||
return structs.ErrNoServers
|
||||
}
|
||||
serverCfg := serverCfgPtr.(serverConfig)
|
||||
|
||||
// Allocate these vars on the stack before the goto
|
||||
var numConsulServers int
|
||||
var clusterWideRebalanceConnsPerSec float64
|
||||
var connReuseLowWaterMark time.Duration
|
||||
var numLANMembers int
|
||||
|
||||
var server *serverParts
|
||||
if c.preferredServer != nil {
|
||||
server = c.preferredServer
|
||||
goto TRY_RPC
|
||||
}
|
||||
|
||||
// Bail if we can't find any servers
|
||||
c.consulLock.RLock()
|
||||
numConsulServers = len(c.consulServers)
|
||||
if numConsulServers == 0 {
|
||||
c.consulLock.RUnlock()
|
||||
numServers := len(serverCfg.servers)
|
||||
if numServers == 0 {
|
||||
c.logger.Printf("[ERR] consul: No servers found in the server config")
|
||||
return structs.ErrNoServers
|
||||
}
|
||||
|
||||
// Select a random addr
|
||||
server = c.consulServers[rand.Int31n(int32(numConsulServers))]
|
||||
c.consulLock.RUnlock()
|
||||
// Find the first non-failing server in the server list. If this is
|
||||
// not the first server a prior RPC call marked the first server as
|
||||
// failed and we're waiting for the server management task to reorder
|
||||
// a working server to the front of the list.
|
||||
var server *serverParts
|
||||
for i := range serverCfg.servers {
|
||||
failCount := atomic.LoadUint64(&(serverCfg.servers[i].Disabled))
|
||||
if failCount == 0 {
|
||||
server = serverCfg.servers[i]
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
// Forward to remote Consul
|
||||
TRY_RPC:
|
||||
if err := c.connPool.RPC(c.config.Datacenter, server.Addr, server.Version, method, args, reply); err != nil {
|
||||
c.connRebalanceTime = time.Time{}
|
||||
c.preferredServer = nil
|
||||
atomic.AddUint64(&server.Disabled, 1)
|
||||
c.logger.Printf("[ERR] consul: RPC failed to server %s: %v", server.Addr, err)
|
||||
c.consulServersCh <- consulServersRPCError
|
||||
return err
|
||||
}
|
||||
|
||||
// Cache the last server as our preferred server
|
||||
_ = atomic.StorePointer(&c.preferredServer, server)
|
||||
return nil
|
||||
}
|
||||
|
||||
// Stats is used to return statistics for debugging and insight
|
||||
// for various sub-systems
|
||||
func (c *Client) Stats() map[string]map[string]string {
|
||||
serverCfg := c.serverConfigValue.Load().(serverConfig)
|
||||
|
||||
toString := func(v uint64) string {
|
||||
return strconv.FormatUint(v, 10)
|
||||
}
|
||||
stats := map[string]map[string]string{
|
||||
"consul": map[string]string{
|
||||
"server": "false",
|
||||
"known_servers": toString(uint64(len(c.consulServers))),
|
||||
"known_servers": toString(uint64(len(serverCfg.servers))),
|
||||
},
|
||||
"serf_lan": c.serf.Stats(),
|
||||
"runtime": runtimeStats(),
|
||||
|
|
|
@ -95,7 +95,8 @@ func TestClient_JoinLAN(t *testing.T) {
|
|||
|
||||
// Check we have a new consul
|
||||
testutil.WaitForResult(func() (bool, error) {
|
||||
return len(c1.consulServers) == 1, nil
|
||||
serverCfg := c1.serverConfigValue.Load().(serverConfig)
|
||||
return len(serverCfg.servers) == 1, nil
|
||||
}, func(err error) {
|
||||
t.Fatalf("expected consul server")
|
||||
})
|
||||
|
|
|
@ -0,0 +1,212 @@
|
|||
package consul
|
||||
|
||||
import (
|
||||
"math/rand"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/consul/lib"
|
||||
)
|
||||
|
||||
// consulServerEventTypes enumerates the events that can be delivered over
// Client.consulServersCh to the server management task.
type consulServerEventTypes int

const (
	// consulServersNodeJoin is used to notify of a new consulServer.
	// The primary effect of this is a reshuffling of consulServers and
	// finding a new preferredServer.
	//
	// The constants are explicitly typed so that they cannot be mixed up
	// with plain integers.
	consulServersNodeJoin consulServerEventTypes = iota

	// consulServersRebalance is used to signal we should rebalance our
	// connection load across servers.
	consulServersRebalance

	// consulServersRPCError is used to signal when a server has either
	// timed out or returned an error and we would like to have the
	// server manager find a new preferredServer.
	consulServersRPCError
)
|
||||
|
||||
// serverConfig is the configuration snapshot stored in
|
||||
// Client.serverConfigValue to maintain the list of consul servers.
|
||||
//
|
||||
// NOTE(sean@): We are explicitly relying on the fact that this is copied.
|
||||
// Please keep this structure light. A copy shares the servers backing array.
|
||||
type serverConfig struct {
|
||||
// servers tracks the locally known servers
|
||||
servers []*serverParts
|
||||
|
||||
// rebalanceTimer is the timer used to control rebalancing of servers
|
||||
rebalanceTimer *time.Timer
|
||||
}
|
||||
|
||||
// consulServersManager is used to automatically shuffle and rebalance the
|
||||
// list of consulServers. This maintenance happens either when a new server
|
||||
// is added or when a duration has been exceed.
|
||||
func (c *Client) consulServersManager() {
|
||||
defaultTimeout := 5 * time.Second // FIXME(sean@): This is a bullshit value
|
||||
var rebalanceTimer *time.Timer
|
||||
func(c *Client) {
|
||||
c.serverConfigMtx.Lock()
|
||||
defer c.serverConfigMtx.Unlock()
|
||||
|
||||
serverCfgPtr := c.serverConfigValue.Load()
|
||||
if serverCfgPtr == nil {
|
||||
panic("server config has not been initialized")
|
||||
}
|
||||
var serverCfg serverConfig
|
||||
serverCfg = serverCfgPtr.(serverConfig)
|
||||
rebalanceTimer = time.NewTimer(defaultTimeout)
|
||||
serverCfg.rebalanceTimer = rebalanceTimer
|
||||
}(c)
|
||||
|
||||
for {
|
||||
select {
|
||||
case e := <-c.consulServersCh:
|
||||
switch e {
|
||||
case consulServersNodeJoin:
|
||||
c.logger.Printf("[INFO] consul: new node joined cluster")
|
||||
c.RebalanceServers()
|
||||
case consulServersRebalance:
|
||||
c.logger.Printf("[INFO] consul: rebalancing servers by request")
|
||||
c.RebalanceServers()
|
||||
case consulServersRPCError:
|
||||
c.logger.Printf("[INFO] consul: need to find a new server to talk with")
|
||||
c.CycleFailedServers()
|
||||
// FIXME(sean@): wtb preemptive Status.Ping
|
||||
// of servers, ideally parallel fan-out of N
|
||||
// nodes, then settle on the first node which
|
||||
// responds successfully.
|
||||
//
|
||||
// Is there a distinction between slow and
|
||||
// offline? Do we run the Status.Ping with a
|
||||
// fixed timeout (say 30s) that way we can
|
||||
// alert administrators that they've set
|
||||
// their RPC time too low even though the
|
||||
// Ping did return successfully?
|
||||
default:
|
||||
c.logger.Printf("[WARN] consul: unhandled LAN Serf Event: %#v", e)
|
||||
}
|
||||
case <-rebalanceTimer.C:
|
||||
c.logger.Printf("[INFO] consul: server rebalance timeout")
|
||||
c.RebalanceServers()
|
||||
|
||||
case <-c.shutdownCh:
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Client) AddServer(server *serverParts) {
|
||||
c.serverConfigMtx.Lock()
|
||||
defer c.serverConfigMtx.Unlock()
|
||||
serverCfg := c.serverConfigValue.Load().(serverConfig)
|
||||
|
||||
// Check if this server is known
|
||||
found := false
|
||||
for idx, existing := range serverCfg.servers {
|
||||
if existing.Name == server.Name {
|
||||
// Overwrite the existing server parts in order to
|
||||
// possibly update metadata (i.e. server version)
|
||||
serverCfg.servers[idx] = server
|
||||
found = true
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
// Add to the list if not known
|
||||
if !found {
|
||||
serverCfg.servers = append(serverCfg.servers, server)
|
||||
|
||||
// Notify the server maintenance task of a new server
|
||||
c.consulServersCh <- consulServersNodeJoin
|
||||
}
|
||||
|
||||
c.serverConfigValue.Store(serverCfg)
|
||||
|
||||
}
|
||||
|
||||
func (c *Client) CycleFailedServers() {
|
||||
c.serverConfigMtx.Lock()
|
||||
defer c.serverConfigMtx.Unlock()
|
||||
serverCfg := c.serverConfigValue.Load().(serverConfig)
|
||||
|
||||
for i := range serverCfg.servers {
|
||||
failCount := atomic.LoadUint64(&(serverCfg.servers[i].Disabled))
|
||||
if failCount == 0 {
|
||||
break
|
||||
} else if failCount > 0 {
|
||||
serverCfg.servers = serverCfg.cycleServer()
|
||||
}
|
||||
}
|
||||
|
||||
serverCfg.resetRebalanceTimer(c)
|
||||
c.serverConfigValue.Store(serverCfg)
|
||||
}
|
||||
|
||||
func (sc *serverConfig) cycleServer() (servers []*serverParts) {
|
||||
numServers := len(servers)
|
||||
if numServers < 2 {
|
||||
// No action required for zero or one server situations
|
||||
return servers
|
||||
}
|
||||
|
||||
var failedNode *serverParts
|
||||
failedNode, servers = servers[0], servers[1:]
|
||||
servers = append(servers, failedNode)
|
||||
return servers
|
||||
}
|
||||
|
||||
func (c *Client) RebalanceServers() {
|
||||
c.serverConfigMtx.Lock()
|
||||
defer c.serverConfigMtx.Unlock()
|
||||
serverCfg := c.serverConfigValue.Load().(serverConfig)
|
||||
|
||||
// Shuffle the server list on server join. Servers are selected from
|
||||
// the head of the list and are moved to the end of the list on
|
||||
// failure.
|
||||
for i := len(serverCfg.servers) - 1; i > 0; i-- {
|
||||
j := rand.Int31n(int32(i + 1))
|
||||
serverCfg.servers[i], serverCfg.servers[j] = serverCfg.servers[j], serverCfg.servers[i]
|
||||
}
|
||||
|
||||
serverCfg.resetRebalanceTimer(c)
|
||||
c.serverConfigValue.Store(serverCfg)
|
||||
}
|
||||
|
||||
func (c *Client) RemoveServer(server *serverParts) {
|
||||
c.serverConfigMtx.Lock()
|
||||
defer c.serverConfigMtx.Unlock()
|
||||
serverCfg := c.serverConfigValue.Load().(serverConfig)
|
||||
|
||||
// Remove the server if known
|
||||
n := len(serverCfg.servers)
|
||||
for i := 0; i < n; i++ {
|
||||
if serverCfg.servers[i].Name == server.Name {
|
||||
serverCfg.servers[i], serverCfg.servers[n-1] = serverCfg.servers[n-1], nil
|
||||
serverCfg.servers = serverCfg.servers[:n-1]
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
c.serverConfigValue.Store(serverCfg)
|
||||
|
||||
}
|
||||
|
||||
// resetRebalanceTimer assumes:
|
||||
//
|
||||
// 1) the serverConfigMtx is already held by the caller.
|
||||
// 2) the caller will call serverConfigValue.Store()
|
||||
func (sc *serverConfig) resetRebalanceTimer(c *Client) {
|
||||
numConsulServers := len(sc.servers)
|
||||
// Limit this connection's life based on the size (and health) of the
|
||||
// cluster. Never rebalance a connection more frequently than
|
||||
// connReuseLowWatermarkDuration, and make sure we never exceed
|
||||
// clusterWideRebalanceConnsPerSec operations/s across numLANMembers.
|
||||
clusterWideRebalanceConnsPerSec := float64(numConsulServers * newRebalanceConnsPerSecPerServer)
|
||||
connReuseLowWatermarkDuration := clientRPCMinReuseDuration + lib.RandomStagger(clientRPCMinReuseDuration/clientRPCJitterFraction)
|
||||
numLANMembers := len(c.LANMembers())
|
||||
connRebalanceTimeout := lib.RateScaledInterval(clusterWideRebalanceConnsPerSec, connReuseLowWatermarkDuration, numLANMembers)
|
||||
c.logger.Printf("[DEBUG] consul: connection will be rebalanced in %v", connRebalanceTimeout)
|
||||
|
||||
sc.rebalanceTimer.Reset(connRebalanceTimeout)
|
||||
}
|
Loading…
Reference in New Issue