Update comments to reflect reality

This commit is contained in:
Sean Chittenden 2016-02-24 19:51:56 -08:00
parent 1a09a5b2cf
commit b906e40811
1 changed files with 45 additions and 33 deletions

View File

@ -59,23 +59,23 @@ type ConsulClusterInfo interface {
// serverCfg is the thread-safe configuration structure that is used to
// maintain the list of consul servers in Client.
//
// NOTE(sean@): We are explicitly relying on the fact that this is copied.
// Please keep this structure light.
// NOTE(sean@): We are explicitly relying on the fact that serverConfig will
// be copied onto the stack. Please keep this structure light.
type serverConfig struct {
// servers tracks the locally known servers
// servers tracks the locally known servers. List membership is
// maintained by Serf.
servers []*server_details.ServerDetails
}
type ServerManager struct {
// serverConfig provides the necessary load/store semantics to
// serverConfig
// serverConfig provides the necessary load/store semantics for the
// server list.
serverConfigValue atomic.Value
serverConfigLock sync.Mutex
// shutdownCh is a copy of the channel in consul.Client
shutdownCh chan struct{}
// logger uses the provided LogOutput
logger *log.Logger
// serf is used to estimate the approximate number of nodes in a
@ -89,8 +89,10 @@ type ServerManager struct {
}
// AddServer takes out an internal write lock and adds a new server. If the
// server is not known, it adds the new server and schedules a rebalance. If
// it is known, we merge the new server details.
// server is not known, appends the server to the list. The new server will
// begin seeing use after the rebalance timer fires or enough servers fail
// organically. If the server is already known, merge the new server
// details.
func (sm *ServerManager) AddServer(server *server_details.ServerDetails) {
sm.serverConfigLock.Lock()
defer sm.serverConfigLock.Unlock()
@ -130,8 +132,7 @@ func (sm *ServerManager) AddServer(server *server_details.ServerDetails) {
func (sc *serverConfig) cycleServer() (servers []*server_details.ServerDetails) {
numServers := len(sc.servers)
if numServers < 2 {
// No action required
return servers
return servers // No action required
}
newServers := make([]*server_details.ServerDetails, 0, numServers)
@ -141,7 +142,11 @@ func (sc *serverConfig) cycleServer() (servers []*server_details.ServerDetails)
}
// FindHealthyServer takes out an internal "read lock" and searches through
// the list of servers to find a healthy server.
// the list of servers to find a "healthy" server. If the server is actually
// unhealthy, we rely on Serf to detect this and remove the node from the
// server list. If the server at the front of the list has failed or fails
// during an RPC call, it is rotated to the end of the list. If there are no
// servers available, return nil.
func (sm *ServerManager) FindHealthyServer() *server_details.ServerDetails {
serverCfg := sm.getServerConfig()
numServers := len(serverCfg.servers)
@ -149,7 +154,10 @@ func (sm *ServerManager) FindHealthyServer() *server_details.ServerDetails {
sm.logger.Printf("[ERR] consul: No servers found in the server config")
return nil
} else {
// Return whatever is at the front of the list
// Return whatever is at the front of the list because it is
// assumed to be the oldest in the server list (unless -
// hypothetically - the server list was rotated right after a
// server was added).
return serverCfg.servers[0]
}
}
@ -170,28 +178,29 @@ func (sm *ServerManager) getServerConfig() serverConfig {
// NewServerManager is the only way to safely create a new ServerManager
// struct.
func NewServerManager(logger *log.Logger, shutdownCh chan struct{}, cci ConsulClusterInfo) (sm *ServerManager) {
func NewServerManager(logger *log.Logger, shutdownCh chan struct{}, clusterInfo ConsulClusterInfo) (sm *ServerManager) {
// NOTE(sean@): Can't pass *consul.Client due to an import cycle
sm = new(ServerManager)
sm.logger = logger
sm.clusterInfo = cci
sm.clusterInfo = clusterInfo
sm.shutdownCh = shutdownCh
sc := serverConfig{}
sc.servers = make([]*server_details.ServerDetails, 0)
sm.serverConfigValue.Store(sc)
sm.saveServerConfig(sc)
return sm
}
// NotifyFailedServer is an exported convenience function that allows callers
// to pass in a server that has failed an RPC request and mark it as failed.
// If the server being failed is not the first server on the list, this is a
// noop. If, however, the server is failed and first on the list, acquire
// the lock, retest, and take the penalty of moving the server to the end of
// the list.
// NotifyFailedServer marks the passed in server as "failed" by rotating it
// to the end of the server list.
func (sm *ServerManager) NotifyFailedServer(server *server_details.ServerDetails) {
serverCfg := sm.getServerConfig()
// If the server being failed is not the first server on the list,
// this is a noop. If, however, the server is failed and first on
// the list, acquire the lock, retest, and take the penalty of moving
// the server to the end of the list.
// Use atomic.CAS to emulate a TryLock().
if len(serverCfg.servers) > 0 && serverCfg.servers[0] == server &&
atomic.CompareAndSwapInt32(&sm.notifyFailedBarrier, 0, 1) {
@ -212,10 +221,12 @@ func (sm *ServerManager) NotifyFailedServer(server *server_details.ServerDetails
// RebalanceServers takes out an internal write lock and shuffles the list of
// servers on this agent. This allows for a redistribution of work across
// consul servers and provides a guarantee that the order list of
// ServerDetails isn't actually ordered, therefore we can sequentially walk
// the array to pick a server without all agents in the cluster dog piling on
// a single node.
// consul servers and provides a guarantee that the order of the server list
// isn't related to the age at which the node was added to the cluster.
// Elsewhere we rely on the position in the server list as a hint regarding
// the stability of a server relative to its position in the server list.
// Servers at or near the front of the list are more stable than servers near
// the end of the list.
func (sm *ServerManager) RebalanceServers() {
sm.serverConfigLock.Lock()
defer sm.serverConfigLock.Unlock()
@ -237,11 +248,7 @@ func (sm *ServerManager) RebalanceServers() {
}
// RemoveServer takes out an internal write lock and removes a server from
// the server list. No rebalancing happens as a result of the removed server
// because we do not want a network partition which separated a server from
// this agent to cause an increase in work. Instead we rely on the internal
// already existing semantics to handle failure detection after a server has
// been removed.
// the server list.
func (sm *ServerManager) RemoveServer(server *server_details.ServerDetails) {
sm.serverConfigLock.Lock()
defer sm.serverConfigLock.Unlock()
@ -264,7 +271,9 @@ func (sm *ServerManager) RemoveServer(server *server_details.ServerDetails) {
}
}
// refreshServerRebalanceTimer is called
// refreshServerRebalanceTimer is only called once the rebalanceTimer
// expires. Historically this was an expensive routine and is intended to be
// run in isolation in a dedicated, non-concurrent task.
func (sm *ServerManager) refreshServerRebalanceTimer(timer *time.Timer) {
serverCfg := sm.getServerConfig()
numConsulServers := len(serverCfg.servers)
@ -289,8 +298,11 @@ func (sm *ServerManager) saveServerConfig(sc serverConfig) {
}
// Start is used to start and manage the task of automatically shuffling and
// rebalance the list of consul servers. This maintenance happens either
// when a new server is added or when a duration has been exceed.
// rebalancing the list of consul servers. This maintenance only happens
// periodically based on the expiration of the timer. Failed servers are
// automatically cycled to the end of the list. New servers are appended to
// the list. The order of the server list must be shuffled periodically to
// distribute load across all known and available consul servers.
func (sm *ServerManager) Start() {
var rebalanceTimer *time.Timer = time.NewTimer(time.Duration(initialRebalanceTimeoutHours * time.Hour))
var rebalanceTaskDispatched int32