diff --git a/consul/server_manager/server_manager.go b/consul/server_manager/server_manager.go
index 92a1583f3..461df1fe2 100644
--- a/consul/server_manager/server_manager.go
+++ b/consul/server_manager/server_manager.go
@@ -59,7 +59,7 @@ type ConnPoolPinger interface {
 	PingConsulServer(server *server_details.ServerDetails) bool
 }
 
-// serverCfg is the thread-safe configuration struct used to maintain the
+// serverConfig is the thread-safe configuration struct used to maintain the
 // list of Consul servers in ServerManager.
 //
 // NOTE(sean@): We are explicitly relying on the fact that serverConfig will
@@ -107,20 +107,20 @@ type ServerManager struct {
 func (sm *ServerManager) AddServer(server *server_details.ServerDetails) {
 	sm.serverConfigLock.Lock()
 	defer sm.serverConfigLock.Unlock()
-	serverCfg := sm.getServerConfig()
+	sc := sm.getServerConfig()
 
 	// Check if this server is known
 	found := false
-	for idx, existing := range serverCfg.servers {
+	for idx, existing := range sc.servers {
 		if existing.Name == server.Name {
-			newServers := make([]*server_details.ServerDetails, len(serverCfg.servers))
-			copy(newServers, serverCfg.servers)
+			newServers := make([]*server_details.ServerDetails, len(sc.servers))
+			copy(newServers, sc.servers)
 
 			// Overwrite the existing server details in order to
 			// possibly update metadata (e.g. server version)
 			newServers[idx] = server
-			serverCfg.servers = newServers
+			sc.servers = newServers
 			found = true
 			break
 		}
@@ -128,13 +128,13 @@ func (sm *ServerManager) AddServer(server *server_details.ServerDetails) {
 
 	// Add to the list if not known
 	if !found {
-		newServers := make([]*server_details.ServerDetails, len(serverCfg.servers), len(serverCfg.servers)+1)
-		copy(newServers, serverCfg.servers)
+		newServers := make([]*server_details.ServerDetails, len(sc.servers), len(sc.servers)+1)
+		copy(newServers, sc.servers)
 		newServers = append(newServers, server)
-		serverCfg.servers = newServers
+		sc.servers = newServers
 	}
 
-	sm.saveServerConfig(serverCfg)
+	sm.saveServerConfig(sc)
 }
 
 // cycleServers returns a new list of servers that has dequeued the first
@@ -193,8 +193,8 @@ func (sc *serverConfig) shuffleServers() {
 // during an RPC call, it is rotated to the end of the list. If there are no
 // servers available, return nil.
 func (sm *ServerManager) FindServer() *server_details.ServerDetails {
-	serverCfg := sm.getServerConfig()
-	numServers := len(serverCfg.servers)
+	sc := sm.getServerConfig()
+	numServers := len(sc.servers)
 	if numServers == 0 {
 		sm.logger.Printf("[WARN] server manager: No servers available")
 		return nil
@@ -203,7 +203,7 @@ func (sm *ServerManager) FindServer() *server_details.ServerDetails {
 		// assumed to be the oldest in the server list (unless -
 		// hypothetically - the server list was rotated right after a
 		// server was added).
-		return serverCfg.servers[0]
+		return sc.servers[0]
 	}
 }
 
@@ -237,7 +237,7 @@ func New(logger *log.Logger, shutdownCh chan struct{}, clusterInfo ConsulCluster
 // NotifyFailedServer marks the passed in server as "failed" by rotating it
 // to the end of the server list.
 func (sm *ServerManager) NotifyFailedServer(server *server_details.ServerDetails) {
-	serverCfg := sm.getServerConfig()
+	sc := sm.getServerConfig()
 
 	// If the server being failed is not the first server on the list,
 	// this is a noop. If, however, the server is failed and first on
@@ -245,7 +245,7 @@ func (sm *ServerManager) NotifyFailedServer(server *server_details.ServerDetails
 	// the server to the end of the list.
 
 	// Only rotate the server list when there is more than one server
-	if len(serverCfg.servers) > 1 && serverCfg.servers[0] == server &&
+	if len(sc.servers) > 1 && sc.servers[0] == server &&
 		// Use atomic.CAS to emulate a TryLock().
 		atomic.CompareAndSwapInt32(&sm.notifyFailedBarrier, 0, 1) {
 		defer atomic.StoreInt32(&sm.notifyFailedBarrier, 0)
@@ -254,11 +254,11 @@ func (sm *ServerManager) NotifyFailedServer(server *server_details.ServerDetails
 		// server to the end.
 		sm.serverConfigLock.Lock()
 		defer sm.serverConfigLock.Unlock()
-		serverCfg = sm.getServerConfig()
+		sc = sm.getServerConfig()
 
-		if len(serverCfg.servers) > 1 && serverCfg.servers[0] == server {
-			serverCfg.servers = serverCfg.cycleServer()
-			sm.saveServerConfig(serverCfg)
+		if len(sc.servers) > 1 && sc.servers[0] == server {
+			sc.servers = sc.cycleServer()
+			sm.saveServerConfig(sc)
 		}
 	}
 }
@@ -266,8 +266,8 @@ func (sm *ServerManager) NotifyFailedServer(server *server_details.ServerDetails
 // NumServers takes out an internal "read lock" and returns the number of
 // servers. numServers includes both healthy and unhealthy servers.
 func (sm *ServerManager) NumServers() (numServers int) {
-	serverCfg := sm.getServerConfig()
-	numServers = len(serverCfg.servers)
+	sc := sm.getServerConfig()
+	numServers = len(sc.servers)
 	return numServers
 }
 
@@ -286,24 +286,24 @@ func (sm *ServerManager) NumServers() (numServers int) {
 func (sm *ServerManager) RebalanceServers() {
 FAILED_SERVER_DURING_REBALANCE:
 	// Obtain a copy of the server config
-	serverCfg := sm.getServerConfig()
+	sc := sm.getServerConfig()
 
 	// Early abort if there is no value to shuffling
-	if len(serverCfg.servers) < 2 {
-		// sm.logger.Printf("[INFO] server manager: can't rebalance with only %d servers", len(serverCfg.servers))
+	if len(sc.servers) < 2 {
+		// sm.logger.Printf("[INFO] server manager: can't rebalance with only %d servers", len(sc.servers))
 		return
 	}
 
-	serverCfg.shuffleServers()
+	sc.shuffleServers()
 
 	// Iterate through the shuffled server list to find a healthy server.
 	// Don't iterate on the list directly, this loop mutates the server
 	// list.
 	var foundHealthyServer bool
-	for n := len(serverCfg.servers); n > 0; n-- {
+	for n := len(sc.servers); n > 0; n-- {
 		// Always test the first server. Failed servers are cycled
 		// while Serf detects the node has failed.
-		selectedServer := serverCfg.servers[0]
+		selectedServer := sc.servers[0]
 
 		// sm.logger.Printf("[INFO] server manager: Preemptively testing server %s before rebalance", selectedServer.String())
 		ok := sm.connPoolPinger.PingConsulServer(selectedServer)
@@ -311,7 +311,7 @@ FAILED_SERVER_DURING_REBALANCE:
 			foundHealthyServer = true
 			break
 		}
-		serverCfg.cycleServer()
+		sc.cycleServer()
 	}
 
 	// If no healthy servers were found, sleep and wait for Serf to make
@@ -341,7 +341,7 @@ FAILED_SERVER_DURING_REBALANCE:
 			state  byte
 		}
 		mergedList := make(map[server_details.Key]*targetServer)
-		for _, s := range serverCfg.servers {
+		for _, s := range sc.servers {
 			mergedList[*s.Key()] = &targetServer{server: s, state: 'o'}
 		}
 		for _, s := range tmpServerCfg.servers {
@@ -355,7 +355,7 @@ FAILED_SERVER_DURING_REBALANCE:
 		}
 
 		// Ensure the selected server has not been removed by Serf
-		selectedServerKey := serverCfg.servers[0].Key()
+		selectedServerKey := sc.servers[0].Key()
 		if v, found := mergedList[*selectedServerKey]; found && v.state == 'o' {
 			return false
 		}
@@ -367,16 +367,16 @@ FAILED_SERVER_DURING_REBALANCE:
 				// Do nothing, server exists in both
 			case 'o':
 				// Server has been removed
-				serverCfg.removeServerByKey(&k)
+				sc.removeServerByKey(&k)
 			case 'n':
 				// Server added
-				serverCfg.servers = append(serverCfg.servers, v.server)
+				sc.servers = append(sc.servers, v.server)
 			default:
 				panic("not implemented")
 			}
 		}
 
-		sm.saveServerConfig(serverCfg)
+		sm.saveServerConfig(sc)
 		return true
 	}
 
@@ -384,7 +384,7 @@ FAILED_SERVER_DURING_REBALANCE:
 		goto FAILED_SERVER_DURING_REBALANCE
 	}
 
-	sm.logger.Printf("[DEBUG] server manager: Rebalanced %d servers, next active server is %s", len(serverCfg.servers), serverCfg.servers[0].String())
+	sm.logger.Printf("[DEBUG] server manager: Rebalanced %d servers, next active server is %s", len(sc.servers), sc.servers[0].String())
 	return
 }
 
@@ -393,17 +393,17 @@ func (sm *ServerManager) RemoveServer(server *server_details.ServerDetails) {
 	sm.serverConfigLock.Lock()
 	defer sm.serverConfigLock.Unlock()
-	serverCfg := sm.getServerConfig()
+	sc := sm.getServerConfig()
 
 	// Remove the server if known
-	for i, _ := range serverCfg.servers {
-		if serverCfg.servers[i].Name == server.Name {
-			newServers := make([]*server_details.ServerDetails, 0, len(serverCfg.servers)-1)
-			newServers = append(newServers, serverCfg.servers[:i]...)
-			newServers = append(newServers, serverCfg.servers[i+1:]...)
-			serverCfg.servers = newServers
+	for i, _ := range sc.servers {
+		if sc.servers[i].Name == server.Name {
+			newServers := make([]*server_details.ServerDetails, 0, len(sc.servers)-1)
+			newServers = append(newServers, sc.servers[:i]...)
+			newServers = append(newServers, sc.servers[i+1:]...)
+			sc.servers = newServers
 
-			sm.saveServerConfig(serverCfg)
+			sm.saveServerConfig(sc)
 			return
 		}
 	}
@@ -411,8 +411,8 @@ func (sm *ServerManager) RemoveServer(server *server_details.ServerDetails) {
 
 // refreshServerRebalanceTimer is only called once sm.rebalanceTimer expires.
 func (sm *ServerManager) refreshServerRebalanceTimer() time.Duration {
-	serverCfg := sm.getServerConfig()
-	numConsulServers := len(serverCfg.servers)
+	sc := sm.getServerConfig()
+	numConsulServers := len(sc.servers)
 	// Limit this connection's life based on the size (and health) of the
 	// cluster. Never rebalance a connection more frequently than
 	// connReuseLowWatermarkDuration, and make sure we never exceed
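
A few notes on the concurrency patterns this rename sweeps through.

The copy-on-write discipline visible in AddServer and RemoveServer is why every mutation builds a fresh slice: getServerConfig hands out a snapshot that concurrent readers may still hold, so writers must never mutate it in place. A minimal, self-contained sketch of the same pattern, assuming an atomic.Value-backed snapshot; Manager, config, and NewManager are illustrative names, not Consul's actual types:

// Sketch only: copy-on-write server list published through atomic.Value.
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

type config struct {
	servers []string // treated as immutable once stored
}

type Manager struct {
	mu    sync.Mutex   // serializes writers, like serverConfigLock
	value atomic.Value // holds *config; readers Load without locking
}

func NewManager() *Manager {
	m := &Manager{}
	m.value.Store(&config{})
	return m
}

// get returns the current snapshot; callers must not mutate it.
func (m *Manager) get() *config { return m.value.Load().(*config) }

// AddServer copies the slice before changing it, so readers holding an
// older snapshot never observe a partial update.
func (m *Manager) AddServer(name string) {
	m.mu.Lock()
	defer m.mu.Unlock()
	old := m.get()
	servers := make([]string, len(old.servers), len(old.servers)+1)
	copy(servers, old.servers)
	servers = append(servers, name)
	m.value.Store(&config{servers: servers})
}

func main() {
	m := NewManager()
	m.AddServer("consul-1")
	m.AddServer("consul-2")
	fmt.Println(m.get().servers) // [consul-1 consul-2]
}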
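NotifyFailedServer combines three ideas: an optimistic unlocked check, an atomic.CompareAndSwapInt32 barrier that emulates TryLock so concurrent failure notifications don't pile up behind the mutex, and a re-check once the lock is actually held. A runnable sketch of that shape with illustrative types; note that in the real code the unlocked read is against an immutable snapshot, which is what makes the fast path safe:

// Sketch only: CAS barrier emulating TryLock, then re-check under the lock.
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

type manager struct {
	mu      sync.Mutex
	barrier int32 // 0 = free, 1 = a rotation is already in flight
	servers []string
}

func (m *manager) notifyFailed(server string) {
	// Fast path: do nothing unless the failed server is currently first
	// and we win the CAS; losers simply return instead of queueing up.
	if len(m.servers) > 1 && m.servers[0] == server &&
		atomic.CompareAndSwapInt32(&m.barrier, 0, 1) {
		defer atomic.StoreInt32(&m.barrier, 0)

		m.mu.Lock()
		defer m.mu.Unlock()
		// Re-check under the lock: another caller may have rotated the
		// list between the unlocked test and acquiring the mutex.
		if len(m.servers) > 1 && m.servers[0] == server {
			m.servers = append(m.servers[1:], m.servers[0])
		}
	}
}

func main() {
	m := &manager{servers: []string{"s1", "s2", "s3"}}
	m.notifyFailed("s1")
	fmt.Println(m.servers) // [s2 s3 s1]
}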
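The rebalance path reconciles its shuffled working copy against the latest Serf view using a one-byte state per server: 'o' for servers only in the old list, 'b' for both, 'n' for newly discovered. Roughly how that merge behaves, sketched over plain strings instead of server_details.Key, with the tmpServerCfg plumbing, locking, and retry loop elided:

// Sketch only: the 'o'/'b'/'n' merge used during rebalance.
package main

import "fmt"

func reconcile(current, fromSerf []string) []string {
	merged := make(map[string]byte, len(current)+len(fromSerf))
	for _, s := range current {
		merged[s] = 'o' // assume removed until serf confirms it
	}
	for _, s := range fromSerf {
		if _, ok := merged[s]; ok {
			merged[s] = 'b' // present in both lists
		} else {
			merged[s] = 'n' // discovered while we were rebalancing
		}
	}

	out := make([]string, 0, len(fromSerf))
	for _, s := range current {
		if merged[s] != 'o' { // drop servers serf no longer knows
			out = append(out, s)
		}
	}
	for s, state := range merged {
		if state == 'n' { // append servers serf added meanwhile
			out = append(out, s)
		}
	}
	return out
}

func main() {
	current := []string{"c", "a", "b"}  // shuffled working copy
	fromSerf := []string{"b", "c", "d"} // what serf reports now
	fmt.Println(reconcile(current, fromSerf)) // [c b d]
}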