Rename serverCfg to sc for consistency
This commit is contained in:
parent
988b05700d
commit
fc1edea1ef
|
@ -59,7 +59,7 @@ type ConnPoolPinger interface {
|
||||||
PingConsulServer(server *server_details.ServerDetails) bool
|
PingConsulServer(server *server_details.ServerDetails) bool
|
||||||
}
|
}
|
||||||
|
|
||||||
// serverCfg is the thread-safe configuration struct used to maintain the
|
// serverConfig is the thread-safe configuration struct used to maintain the
|
||||||
// list of Consul servers in ServerManager.
|
// list of Consul servers in ServerManager.
|
||||||
//
|
//
|
||||||
// NOTE(sean@): We are explicitly relying on the fact that serverConfig will
|
// NOTE(sean@): We are explicitly relying on the fact that serverConfig will
|
||||||
|
@ -107,20 +107,20 @@ type ServerManager struct {
|
||||||
func (sm *ServerManager) AddServer(server *server_details.ServerDetails) {
|
func (sm *ServerManager) AddServer(server *server_details.ServerDetails) {
|
||||||
sm.serverConfigLock.Lock()
|
sm.serverConfigLock.Lock()
|
||||||
defer sm.serverConfigLock.Unlock()
|
defer sm.serverConfigLock.Unlock()
|
||||||
serverCfg := sm.getServerConfig()
|
sc := sm.getServerConfig()
|
||||||
|
|
||||||
// Check if this server is known
|
// Check if this server is known
|
||||||
found := false
|
found := false
|
||||||
for idx, existing := range serverCfg.servers {
|
for idx, existing := range sc.servers {
|
||||||
if existing.Name == server.Name {
|
if existing.Name == server.Name {
|
||||||
newServers := make([]*server_details.ServerDetails, len(serverCfg.servers))
|
newServers := make([]*server_details.ServerDetails, len(sc.servers))
|
||||||
copy(newServers, serverCfg.servers)
|
copy(newServers, sc.servers)
|
||||||
|
|
||||||
// Overwrite the existing server details in order to
|
// Overwrite the existing server details in order to
|
||||||
// possibly update metadata (e.g. server version)
|
// possibly update metadata (e.g. server version)
|
||||||
newServers[idx] = server
|
newServers[idx] = server
|
||||||
|
|
||||||
serverCfg.servers = newServers
|
sc.servers = newServers
|
||||||
found = true
|
found = true
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
@ -128,13 +128,13 @@ func (sm *ServerManager) AddServer(server *server_details.ServerDetails) {
|
||||||
|
|
||||||
// Add to the list if not known
|
// Add to the list if not known
|
||||||
if !found {
|
if !found {
|
||||||
newServers := make([]*server_details.ServerDetails, len(serverCfg.servers), len(serverCfg.servers)+1)
|
newServers := make([]*server_details.ServerDetails, len(sc.servers), len(sc.servers)+1)
|
||||||
copy(newServers, serverCfg.servers)
|
copy(newServers, sc.servers)
|
||||||
newServers = append(newServers, server)
|
newServers = append(newServers, server)
|
||||||
serverCfg.servers = newServers
|
sc.servers = newServers
|
||||||
}
|
}
|
||||||
|
|
||||||
sm.saveServerConfig(serverCfg)
|
sm.saveServerConfig(sc)
|
||||||
}
|
}
|
||||||
|
|
||||||
// cycleServers returns a new list of servers that has dequeued the first
|
// cycleServers returns a new list of servers that has dequeued the first
|
||||||
|
@ -193,8 +193,8 @@ func (sc *serverConfig) shuffleServers() {
|
||||||
// during an RPC call, it is rotated to the end of the list. If there are no
|
// during an RPC call, it is rotated to the end of the list. If there are no
|
||||||
// servers available, return nil.
|
// servers available, return nil.
|
||||||
func (sm *ServerManager) FindServer() *server_details.ServerDetails {
|
func (sm *ServerManager) FindServer() *server_details.ServerDetails {
|
||||||
serverCfg := sm.getServerConfig()
|
sc := sm.getServerConfig()
|
||||||
numServers := len(serverCfg.servers)
|
numServers := len(sc.servers)
|
||||||
if numServers == 0 {
|
if numServers == 0 {
|
||||||
sm.logger.Printf("[WARN] server manager: No servers available")
|
sm.logger.Printf("[WARN] server manager: No servers available")
|
||||||
return nil
|
return nil
|
||||||
|
@ -203,7 +203,7 @@ func (sm *ServerManager) FindServer() *server_details.ServerDetails {
|
||||||
// assumed to be the oldest in the server list (unless -
|
// assumed to be the oldest in the server list (unless -
|
||||||
// hypothetically - the server list was rotated right after a
|
// hypothetically - the server list was rotated right after a
|
||||||
// server was added).
|
// server was added).
|
||||||
return serverCfg.servers[0]
|
return sc.servers[0]
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -237,7 +237,7 @@ func New(logger *log.Logger, shutdownCh chan struct{}, clusterInfo ConsulCluster
|
||||||
// NotifyFailedServer marks the passed in server as "failed" by rotating it
|
// NotifyFailedServer marks the passed in server as "failed" by rotating it
|
||||||
// to the end of the server list.
|
// to the end of the server list.
|
||||||
func (sm *ServerManager) NotifyFailedServer(server *server_details.ServerDetails) {
|
func (sm *ServerManager) NotifyFailedServer(server *server_details.ServerDetails) {
|
||||||
serverCfg := sm.getServerConfig()
|
sc := sm.getServerConfig()
|
||||||
|
|
||||||
// If the server being failed is not the first server on the list,
|
// If the server being failed is not the first server on the list,
|
||||||
// this is a noop. If, however, the server is failed and first on
|
// this is a noop. If, however, the server is failed and first on
|
||||||
|
@ -245,7 +245,7 @@ func (sm *ServerManager) NotifyFailedServer(server *server_details.ServerDetails
|
||||||
// the server to the end of the list.
|
// the server to the end of the list.
|
||||||
|
|
||||||
// Only rotate the server list when there is more than one server
|
// Only rotate the server list when there is more than one server
|
||||||
if len(serverCfg.servers) > 1 && serverCfg.servers[0] == server &&
|
if len(sc.servers) > 1 && sc.servers[0] == server &&
|
||||||
// Use atomic.CAS to emulate a TryLock().
|
// Use atomic.CAS to emulate a TryLock().
|
||||||
atomic.CompareAndSwapInt32(&sm.notifyFailedBarrier, 0, 1) {
|
atomic.CompareAndSwapInt32(&sm.notifyFailedBarrier, 0, 1) {
|
||||||
defer atomic.StoreInt32(&sm.notifyFailedBarrier, 0)
|
defer atomic.StoreInt32(&sm.notifyFailedBarrier, 0)
|
||||||
|
@ -254,11 +254,11 @@ func (sm *ServerManager) NotifyFailedServer(server *server_details.ServerDetails
|
||||||
// server to the end.
|
// server to the end.
|
||||||
sm.serverConfigLock.Lock()
|
sm.serverConfigLock.Lock()
|
||||||
defer sm.serverConfigLock.Unlock()
|
defer sm.serverConfigLock.Unlock()
|
||||||
serverCfg = sm.getServerConfig()
|
sc = sm.getServerConfig()
|
||||||
|
|
||||||
if len(serverCfg.servers) > 1 && serverCfg.servers[0] == server {
|
if len(sc.servers) > 1 && sc.servers[0] == server {
|
||||||
serverCfg.servers = serverCfg.cycleServer()
|
sc.servers = sc.cycleServer()
|
||||||
sm.saveServerConfig(serverCfg)
|
sm.saveServerConfig(sc)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -266,8 +266,8 @@ func (sm *ServerManager) NotifyFailedServer(server *server_details.ServerDetails
|
||||||
// NumServers takes out an internal "read lock" and returns the number of
|
// NumServers takes out an internal "read lock" and returns the number of
|
||||||
// servers. numServers includes both healthy and unhealthy servers.
|
// servers. numServers includes both healthy and unhealthy servers.
|
||||||
func (sm *ServerManager) NumServers() (numServers int) {
|
func (sm *ServerManager) NumServers() (numServers int) {
|
||||||
serverCfg := sm.getServerConfig()
|
sc := sm.getServerConfig()
|
||||||
numServers = len(serverCfg.servers)
|
numServers = len(sc.servers)
|
||||||
return numServers
|
return numServers
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -286,24 +286,24 @@ func (sm *ServerManager) NumServers() (numServers int) {
|
||||||
func (sm *ServerManager) RebalanceServers() {
|
func (sm *ServerManager) RebalanceServers() {
|
||||||
FAILED_SERVER_DURING_REBALANCE:
|
FAILED_SERVER_DURING_REBALANCE:
|
||||||
// Obtain a copy of the server config
|
// Obtain a copy of the server config
|
||||||
serverCfg := sm.getServerConfig()
|
sc := sm.getServerConfig()
|
||||||
|
|
||||||
// Early abort if there is no value to shuffling
|
// Early abort if there is no value to shuffling
|
||||||
if len(serverCfg.servers) < 2 {
|
if len(sc.servers) < 2 {
|
||||||
// sm.logger.Printf("[INFO] server manager: can't rebalance with only %d servers", len(serverCfg.servers))
|
// sm.logger.Printf("[INFO] server manager: can't rebalance with only %d servers", len(sc.servers))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
serverCfg.shuffleServers()
|
sc.shuffleServers()
|
||||||
|
|
||||||
// Iterate through the shuffled server list to find a healthy server.
|
// Iterate through the shuffled server list to find a healthy server.
|
||||||
// Don't iterate on the list directly, this loop mutates server the
|
// Don't iterate on the list directly, this loop mutates server the
|
||||||
// list.
|
// list.
|
||||||
var foundHealthyServer bool
|
var foundHealthyServer bool
|
||||||
for n := len(serverCfg.servers); n > 0; n-- {
|
for n := len(sc.servers); n > 0; n-- {
|
||||||
// Always test the first server. Failed servers are cycled
|
// Always test the first server. Failed servers are cycled
|
||||||
// while Serf detects the node has failed.
|
// while Serf detects the node has failed.
|
||||||
selectedServer := serverCfg.servers[0]
|
selectedServer := sc.servers[0]
|
||||||
|
|
||||||
// sm.logger.Printf("[INFO] server manager: Preemptively testing server %s before rebalance", selectedServer.String())
|
// sm.logger.Printf("[INFO] server manager: Preemptively testing server %s before rebalance", selectedServer.String())
|
||||||
ok := sm.connPoolPinger.PingConsulServer(selectedServer)
|
ok := sm.connPoolPinger.PingConsulServer(selectedServer)
|
||||||
|
@ -311,7 +311,7 @@ FAILED_SERVER_DURING_REBALANCE:
|
||||||
foundHealthyServer = true
|
foundHealthyServer = true
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
serverCfg.cycleServer()
|
sc.cycleServer()
|
||||||
}
|
}
|
||||||
|
|
||||||
// If no healthy servers were found, sleep and wait for Serf to make
|
// If no healthy servers were found, sleep and wait for Serf to make
|
||||||
|
@ -341,7 +341,7 @@ FAILED_SERVER_DURING_REBALANCE:
|
||||||
state byte
|
state byte
|
||||||
}
|
}
|
||||||
mergedList := make(map[server_details.Key]*targetServer)
|
mergedList := make(map[server_details.Key]*targetServer)
|
||||||
for _, s := range serverCfg.servers {
|
for _, s := range sc.servers {
|
||||||
mergedList[*s.Key()] = &targetServer{server: s, state: 'o'}
|
mergedList[*s.Key()] = &targetServer{server: s, state: 'o'}
|
||||||
}
|
}
|
||||||
for _, s := range tmpServerCfg.servers {
|
for _, s := range tmpServerCfg.servers {
|
||||||
|
@ -355,7 +355,7 @@ FAILED_SERVER_DURING_REBALANCE:
|
||||||
}
|
}
|
||||||
|
|
||||||
// Ensure the selected server has not been removed by Serf
|
// Ensure the selected server has not been removed by Serf
|
||||||
selectedServerKey := serverCfg.servers[0].Key()
|
selectedServerKey := sc.servers[0].Key()
|
||||||
if v, found := mergedList[*selectedServerKey]; found && v.state == 'o' {
|
if v, found := mergedList[*selectedServerKey]; found && v.state == 'o' {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
@ -367,16 +367,16 @@ FAILED_SERVER_DURING_REBALANCE:
|
||||||
// Do nothing, server exists in both
|
// Do nothing, server exists in both
|
||||||
case 'o':
|
case 'o':
|
||||||
// Server has been removed
|
// Server has been removed
|
||||||
serverCfg.removeServerByKey(&k)
|
sc.removeServerByKey(&k)
|
||||||
case 'n':
|
case 'n':
|
||||||
// Server added
|
// Server added
|
||||||
serverCfg.servers = append(serverCfg.servers, v.server)
|
sc.servers = append(sc.servers, v.server)
|
||||||
default:
|
default:
|
||||||
panic("not implemented")
|
panic("not implemented")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
sm.saveServerConfig(serverCfg)
|
sm.saveServerConfig(sc)
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -384,7 +384,7 @@ FAILED_SERVER_DURING_REBALANCE:
|
||||||
goto FAILED_SERVER_DURING_REBALANCE
|
goto FAILED_SERVER_DURING_REBALANCE
|
||||||
}
|
}
|
||||||
|
|
||||||
sm.logger.Printf("[DEBUG] server manager: Rebalanced %d servers, next active server is %s", len(serverCfg.servers), serverCfg.servers[0].String())
|
sm.logger.Printf("[DEBUG] server manager: Rebalanced %d servers, next active server is %s", len(sc.servers), sc.servers[0].String())
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -393,17 +393,17 @@ FAILED_SERVER_DURING_REBALANCE:
|
||||||
func (sm *ServerManager) RemoveServer(server *server_details.ServerDetails) {
|
func (sm *ServerManager) RemoveServer(server *server_details.ServerDetails) {
|
||||||
sm.serverConfigLock.Lock()
|
sm.serverConfigLock.Lock()
|
||||||
defer sm.serverConfigLock.Unlock()
|
defer sm.serverConfigLock.Unlock()
|
||||||
serverCfg := sm.getServerConfig()
|
sc := sm.getServerConfig()
|
||||||
|
|
||||||
// Remove the server if known
|
// Remove the server if known
|
||||||
for i, _ := range serverCfg.servers {
|
for i, _ := range sc.servers {
|
||||||
if serverCfg.servers[i].Name == server.Name {
|
if sc.servers[i].Name == server.Name {
|
||||||
newServers := make([]*server_details.ServerDetails, 0, len(serverCfg.servers)-1)
|
newServers := make([]*server_details.ServerDetails, 0, len(sc.servers)-1)
|
||||||
newServers = append(newServers, serverCfg.servers[:i]...)
|
newServers = append(newServers, sc.servers[:i]...)
|
||||||
newServers = append(newServers, serverCfg.servers[i+1:]...)
|
newServers = append(newServers, sc.servers[i+1:]...)
|
||||||
serverCfg.servers = newServers
|
sc.servers = newServers
|
||||||
|
|
||||||
sm.saveServerConfig(serverCfg)
|
sm.saveServerConfig(sc)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -411,8 +411,8 @@ func (sm *ServerManager) RemoveServer(server *server_details.ServerDetails) {
|
||||||
|
|
||||||
// refreshServerRebalanceTimer is only called once sm.rebalanceTimer expires.
|
// refreshServerRebalanceTimer is only called once sm.rebalanceTimer expires.
|
||||||
func (sm *ServerManager) refreshServerRebalanceTimer() time.Duration {
|
func (sm *ServerManager) refreshServerRebalanceTimer() time.Duration {
|
||||||
serverCfg := sm.getServerConfig()
|
sc := sm.getServerConfig()
|
||||||
numConsulServers := len(serverCfg.servers)
|
numConsulServers := len(sc.servers)
|
||||||
// Limit this connection's life based on the size (and health) of the
|
// Limit this connection's life based on the size (and health) of the
|
||||||
// cluster. Never rebalance a connection more frequently than
|
// cluster. Never rebalance a connection more frequently than
|
||||||
// connReuseLowWatermarkDuration, and make sure we never exceed
|
// connReuseLowWatermarkDuration, and make sure we never exceed
|
||||||
|
|
Loading…
Reference in New Issue