From 897282f77dbe5c925cd3adefbb4c7b3090b477da Mon Sep 17 00:00:00 2001
From: Sean Chittenden
Date: Tue, 29 Mar 2016 16:17:16 -0700
Subject: [PATCH] Rename serverConfig to serverList

serverList is a vastly more accurate name. Chase accordingly. No
functional change other than types and APIs.
---
 consul/servers/manager.go               | 187 ++++++++++++------------
 consul/servers/manager_internal_test.go | 122 ++++++++--------
 2 files changed, 154 insertions(+), 155 deletions(-)

diff --git a/consul/servers/manager.go b/consul/servers/manager.go
index a94cc2b04..2962e9f8c 100644
--- a/consul/servers/manager.go
+++ b/consul/servers/manager.go
@@ -59,22 +59,21 @@ type Pinger interface {
 	PingConsulServer(server *server_details.ServerDetails) (bool, error)
 }
 
-// serverConfig is the thread-safe configuration struct used to maintain the
-// list of Consul servers in Manager.
+// serverList is a local copy of the struct used to maintain the list of
+// Consul servers used by Manager.
 //
-// NOTE(sean@): We are explicitly relying on the fact that serverConfig will
+// NOTE(sean@): We are explicitly relying on the fact that serverList will
 // be copied onto the stack. Please keep this structure light.
-type serverConfig struct {
+type serverList struct {
 	// servers tracks the locally known servers. List membership is
 	// maintained by Serf.
 	servers []*server_details.ServerDetails
 }
 
 type Manager struct {
-	// serverConfig provides the necessary load/store semantics for the
-	// server list.
-	serverConfigValue atomic.Value
-	serverConfigLock  sync.Mutex
+	// listValue manages the atomic load/store of a Manager's serverList
+	listValue atomic.Value
+	listLock  sync.Mutex
 
 	// rebalanceTimer controls the duration of the rebalance interval
 	rebalanceTimer *time.Timer
@@ -95,7 +94,7 @@ type Manager struct {
 	connPoolPinger Pinger
 
 	// notifyFailedBarrier is acts as a barrier to prevent queuing behind
-	// serverConfigLog and acts as a TryLock().
+	// listLock and acts as a TryLock().
 	notifyFailedBarrier int32
 }
 
@@ -105,22 +104,22 @@ type Manager struct {
 // organically. If the server is already known, merge the new server
 // details.
 func (m *Manager) AddServer(server *server_details.ServerDetails) {
-	m.serverConfigLock.Lock()
-	defer m.serverConfigLock.Unlock()
-	sc := m.getServerConfig()
+	m.listLock.Lock()
+	defer m.listLock.Unlock()
+	l := m.getServerList()
 
 	// Check if this server is known
 	found := false
-	for idx, existing := range sc.servers {
+	for idx, existing := range l.servers {
 		if existing.Name == server.Name {
-			newServers := make([]*server_details.ServerDetails, len(sc.servers))
-			copy(newServers, sc.servers)
+			newServers := make([]*server_details.ServerDetails, len(l.servers))
+			copy(newServers, l.servers)
 
 			// Overwrite the existing server details in order to
 			// possibly update metadata (e.g. server version)
 			newServers[idx] = server
 
-			sc.servers = newServers
+			l.servers = newServers
 			found = true
 			break
 		}
 	}
@@ -128,53 +127,53 @@ func (m *Manager) AddServer(server *server_details.ServerDetails) {
 
 	// Add to the list if not known
 	if !found {
-		newServers := make([]*server_details.ServerDetails, len(sc.servers), len(sc.servers)+1)
-		copy(newServers, sc.servers)
+		newServers := make([]*server_details.ServerDetails, len(l.servers), len(l.servers)+1)
+		copy(newServers, l.servers)
 		newServers = append(newServers, server)
-		sc.servers = newServers
+		l.servers = newServers
 	}
 
-	m.saveServerConfig(sc)
+	m.saveServerList(l)
 }
 
 // cycleServers returns a new list of servers that has dequeued the first
 // server and enqueued it at the end of the list. cycleServers assumes the
-// caller is holding the serverConfigLock. cycleServer does not test or ping
+// caller is holding the listLock. cycleServer does not test or ping
 // the next server inline. cycleServer may be called when the environment
 // has just entered an unhealthy situation and blocking on a server test is
 // less desirable than just returning the next server in the firing line. If
 // the next server fails, it will fail fast enough and cycleServer will be
 // called again.
-func (sc *serverConfig) cycleServer() (servers []*server_details.ServerDetails) {
-	numServers := len(sc.servers)
+func (l *serverList) cycleServer() (servers []*server_details.ServerDetails) {
+	numServers := len(l.servers)
 	if numServers < 2 {
 		return servers // No action required
 	}
 
 	newServers := make([]*server_details.ServerDetails, 0, numServers)
-	newServers = append(newServers, sc.servers[1:]...)
-	newServers = append(newServers, sc.servers[0])
+	newServers = append(newServers, l.servers[1:]...)
+	newServers = append(newServers, l.servers[0])
 	return newServers
 }
 
 // removeServerByKey performs an inline removal of the first matching server
-func (sc *serverConfig) removeServerByKey(targetKey *server_details.Key) {
-	for i, s := range sc.servers {
+func (l *serverList) removeServerByKey(targetKey *server_details.Key) {
+	for i, s := range l.servers {
 		if targetKey.Equal(s.Key()) {
-			copy(sc.servers[i:], sc.servers[i+1:])
-			sc.servers[len(sc.servers)-1] = nil
-			sc.servers = sc.servers[:len(sc.servers)-1]
+			copy(l.servers[i:], l.servers[i+1:])
+			l.servers[len(l.servers)-1] = nil
+			l.servers = l.servers[:len(l.servers)-1]
 			return
 		}
 	}
 }
 
 // shuffleServers shuffles the server list in place
-func (sc *serverConfig) shuffleServers() {
-	for i := len(sc.servers) - 1; i > 0; i-- {
+func (l *serverList) shuffleServers() {
+	for i := len(l.servers) - 1; i > 0; i-- {
 		j := rand.Int31n(int32(i + 1))
-		sc.servers[i], sc.servers[j] = sc.servers[j], sc.servers[i]
+		l.servers[i], l.servers[j] = l.servers[j], l.servers[i]
 	}
 }
 
@@ -185,8 +184,8 @@ func (sc *serverConfig) shuffleServers() {
 // during an RPC call, it is rotated to the end of the list. If there are no
 // servers available, return nil.
 func (m *Manager) FindServer() *server_details.ServerDetails {
-	sc := m.getServerConfig()
-	numServers := len(sc.servers)
+	l := m.getServerList()
+	numServers := len(l.servers)
 	if numServers == 0 {
 		m.logger.Printf("[WARN] manager: No servers available")
 		return nil
@@ -195,20 +194,20 @@ func (m *Manager) FindServer() *server_details.ServerDetails {
 		// assumed to be the oldest in the server list (unless -
 		// hypothetically - the server list was rotated right after a
 		// server was added).
-		return sc.servers[0]
+		return l.servers[0]
 	}
 }
 
-// getServerConfig is a convenience method which hides the locking semantics
+// getServerList is a convenience method which hides the locking semantics
 // of atomic.Value from the caller.
-func (m *Manager) getServerConfig() serverConfig {
-	return m.serverConfigValue.Load().(serverConfig)
+func (m *Manager) getServerList() serverList {
+	return m.listValue.Load().(serverList)
 }
 
-// saveServerConfig is a convenience method which hides the locking semantics
+// saveServerList is a convenience method which hides the locking semantics
 // of atomic.Value from the caller.
-func (m *Manager) saveServerConfig(sc serverConfig) {
-	m.serverConfigValue.Store(sc)
+func (m *Manager) saveServerList(l serverList) {
+	m.listValue.Store(l)
 }
 
 // New is the only way to safely create a new Manager struct.
@@ -220,16 +219,16 @@ func New(logger *log.Logger, shutdownCh chan struct{}, clusterInfo ConsulCluster
 	m.rebalanceTimer = time.NewTimer(clientRPCMinReuseDuration)
 	m.shutdownCh = shutdownCh
 
-	sc := serverConfig{}
-	sc.servers = make([]*server_details.ServerDetails, 0)
-	m.saveServerConfig(sc)
+	l := serverList{}
+	l.servers = make([]*server_details.ServerDetails, 0)
+	m.saveServerList(l)
 	return m
 }
 
 // NotifyFailedServer marks the passed in server as "failed" by rotating it
 // to the end of the server list.
 func (m *Manager) NotifyFailedServer(server *server_details.ServerDetails) {
-	sc := m.getServerConfig()
+	l := m.getServerList()
 
 	// If the server being failed is not the first server on the list,
 	// this is a noop. If, however, the server is failed and first on
 	// the list, acquire the lock, retest, and take the penalty of moving
 	// the server to the end of the list.
 
 	// Only rotate the server list when there is more than one server
-	if len(sc.servers) > 1 && sc.servers[0] == server &&
+	if len(l.servers) > 1 && l.servers[0] == server &&
 		// Use atomic.CAS to emulate a TryLock().
 		atomic.CompareAndSwapInt32(&m.notifyFailedBarrier, 0, 1) {
 		defer atomic.StoreInt32(&m.notifyFailedBarrier, 0)
 
 		// Grab a lock, retest, and take the hit of cycling the first
 		// server to the end.
-		m.serverConfigLock.Lock()
-		defer m.serverConfigLock.Unlock()
-		sc = m.getServerConfig()
+		m.listLock.Lock()
+		defer m.listLock.Unlock()
+		l = m.getServerList()
 
-		if len(sc.servers) > 1 && sc.servers[0] == server {
-			sc.servers = sc.cycleServer()
-			m.saveServerConfig(sc)
+		if len(l.servers) > 1 && l.servers[0] == server {
+			l.servers = l.cycleServer()
+			m.saveServerList(l)
 		}
 	}
 }
@@ -258,8 +257,8 @@ func (m *Manager) NotifyFailedServer(server *server_details.ServerDetails) {
 // NumServers takes out an internal "read lock" and returns the number of
 // servers. numServers includes both healthy and unhealthy servers.
 func (m *Manager) NumServers() (numServers int) {
-	sc := m.getServerConfig()
-	numServers = len(sc.servers)
+	l := m.getServerList()
+	numServers = len(l.servers)
 	return numServers
 }
 
@@ -276,24 +275,24 @@ func (m *Manager) NumServers() (numServers int) {
 // deregistered. Before the newly shuffled server list is saved, the new
 // remote endpoint is tested to ensure its responsive.
 func (m *Manager) RebalanceServers() {
-	// Obtain a copy of the current serverConfig
-	sc := m.getServerConfig()
+	// Obtain a copy of the current serverList
+	l := m.getServerList()
 
 	// Early abort if there is no value to shuffling
-	if len(sc.servers) < 2 {
+	if len(l.servers) < 2 {
 		return
 	}
 
-	sc.shuffleServers()
+	l.shuffleServers()
 
 	// Iterate through the shuffled server list to find a healthy server.
 	// Don't iterate on the list directly, this loop mutates the server
 	// list.
 	var foundHealthyServer bool
-	for i := 0; i < len(sc.servers); i++ {
+	for i := 0; i < len(l.servers); i++ {
 		// Always test the first server. Failed servers are cycled
 		// while Serf detects the node has failed.
-		selectedServer := sc.servers[0]
+		selectedServer := l.servers[0]
 
 		ok, err := m.connPoolPinger.PingConsulServer(selectedServer)
 		if ok {
@@ -302,7 +301,7 @@
 		}
 		m.logger.Printf(`[DEBUG] manager: pinging server "%s" failed: %s`, selectedServer.String(), err)
 
-		sc.cycleServer()
+		l.cycleServer()
 	}
 
 	// If no healthy servers were found, sleep and wait for Serf to make
@@ -313,8 +312,8 @@
 	}
 
 	// Verify that all servers are present
-	if m.reconcileServerList(&sc) {
-		m.logger.Printf("[DEBUG] manager: Rebalanced %d servers, next active server is %s", len(sc.servers), sc.servers[0].String())
+	if m.reconcileServerList(&l) {
+		m.logger.Printf("[DEBUG] manager: Rebalanced %d servers, next active server is %s", len(l.servers), l.servers[0].String())
 	} else {
 		// reconcileServerList failed because Serf removed the server
 		// that was at the front of the list that had successfully
@@ -330,23 +329,23 @@
 	return
 }
 
-// reconcileServerList returns true when the first server in serverConfig
-// exists in the receiver's serverConfig. If true, the merged serverConfig
-// is stored as the receiver's serverConfig. Returns false if the first
+// reconcileServerList returns true when the first server in serverList
+// exists in the receiver's serverList. If true, the merged serverList
+// is stored as the receiver's serverList. Returns false if the first
 // server does not exist in the list (i.e. was removed by Serf during a
 // PingConsulServer() call. Newly added servers are appended to the list and
 // other missing servers are removed from the list.
-func (m *Manager) reconcileServerList(sc *serverConfig) bool {
-	m.serverConfigLock.Lock()
-	defer m.serverConfigLock.Unlock()
+func (m *Manager) reconcileServerList(l *serverList) bool {
+	m.listLock.Lock()
+	defer m.listLock.Unlock()
 
-	// newServerCfg is a serverConfig that has been kept up to date with
+	// newServerCfg is a serverList that has been kept up to date with
 	// Serf node join and node leave events.
-	newServerCfg := m.getServerConfig()
+	newServerCfg := m.getServerList()
 
 	// If Serf has removed all nodes, or there is no selected server
-	// (zero nodes in sc), abort early.
-	if len(newServerCfg.servers) == 0 || len(sc.servers) == 0 {
+	// (zero nodes in l), abort early.
+	if len(newServerCfg.servers) == 0 || len(l.servers) == 0 {
 		return false
 	}
 
@@ -358,8 +357,8 @@
 		// 'n' == new
 		state byte
 	}
-	mergedList := make(map[server_details.Key]*targetServer, len(sc.servers))
-	for _, s := range sc.servers {
+	mergedList := make(map[server_details.Key]*targetServer, len(l.servers))
+	for _, s := range l.servers {
 		mergedList[*s.Key()] = &targetServer{server: s, state: 'o'}
 	}
 	for _, s := range newServerCfg.servers {
@@ -373,7 +372,7 @@
 	}
 
 	// Ensure the selected server has not been removed by Serf
-	selectedServerKey := sc.servers[0].Key()
+	selectedServerKey := l.servers[0].Key()
 	if v, found := mergedList[*selectedServerKey]; found && v.state == 'o' {
 		return false
 	}
@@ -385,35 +384,35 @@
 			// Do nothing, server exists in both
 		case 'o':
 			// Server has been removed
-			sc.removeServerByKey(&k)
+			l.removeServerByKey(&k)
 		case 'n':
 			// Server added
-			sc.servers = append(sc.servers, v.server)
+			l.servers = append(l.servers, v.server)
 		default:
 			panic("unknown merge list state")
 		}
 	}
 
-	m.saveServerConfig(*sc)
+	m.saveServerList(*l)
 	return true
 }
 
 // RemoveServer takes out an internal write lock and removes a server from
 // the server list.
 func (m *Manager) RemoveServer(server *server_details.ServerDetails) {
-	m.serverConfigLock.Lock()
-	defer m.serverConfigLock.Unlock()
-	sc := m.getServerConfig()
+	m.listLock.Lock()
+	defer m.listLock.Unlock()
+	l := m.getServerList()
 
 	// Remove the server if known
-	for i, _ := range sc.servers {
-		if sc.servers[i].Name == server.Name {
-			newServers := make([]*server_details.ServerDetails, 0, len(sc.servers)-1)
-			newServers = append(newServers, sc.servers[:i]...)
-			newServers = append(newServers, sc.servers[i+1:]...)
-			sc.servers = newServers
+	for i, _ := range l.servers {
+		if l.servers[i].Name == server.Name {
+			newServers := make([]*server_details.ServerDetails, 0, len(l.servers)-1)
+			newServers = append(newServers, l.servers[:i]...)
+			newServers = append(newServers, l.servers[i+1:]...)
+			l.servers = newServers
 
-			m.saveServerConfig(sc)
+			m.saveServerList(l)
 			return
 		}
 	}
@@ -421,8 +420,8 @@
 
 // refreshServerRebalanceTimer is only called once m.rebalanceTimer expires.
 func (m *Manager) refreshServerRebalanceTimer() time.Duration {
-	sc := m.getServerConfig()
-	numConsulServers := len(sc.servers)
+	l := m.getServerList()
+	numConsulServers := len(l.servers)
 	// Limit this connection's life based on the size (and health) of the
 	// cluster. Never rebalance a connection more frequently than
 	// connReuseLowWatermarkDuration, and make sure we never exceed
@@ -439,8 +438,8 @@
 
 // ResetRebalanceTimer resets the rebalance timer. This method primarily
 // exists for testing and should not be used directly.
 func (m *Manager) ResetRebalanceTimer() {
-	m.serverConfigLock.Lock()
-	defer m.serverConfigLock.Unlock()
+	m.listLock.Lock()
+	defer m.listLock.Unlock()
 	m.rebalanceTimer.Reset(clientRPCMinReuseDuration)
 }
 
diff --git a/consul/servers/manager_internal_test.go b/consul/servers/manager_internal_test.go
index 29a1b35af..eba5f32bc 100644
--- a/consul/servers/manager_internal_test.go
+++ b/consul/servers/manager_internal_test.go
@@ -63,68 +63,68 @@ func testManagerFailProb(failPct float64) (m *Manager) {
 	return m
 }
 
-// func (sc *serverConfig) cycleServer() (servers []*server_details.ServerDetails) {
+// func (l *serverList) cycleServer() (servers []*server_details.ServerDetails) {
 func TestManagerInternal_cycleServer(t *testing.T) {
 	m := testManager()
-	sc := m.getServerConfig()
+	l := m.getServerList()
 
 	server0 := &server_details.ServerDetails{Name: "server1"}
 	server1 := &server_details.ServerDetails{Name: "server2"}
 	server2 := &server_details.ServerDetails{Name: "server3"}
-	sc.servers = append(sc.servers, server0, server1, server2)
-	m.saveServerConfig(sc)
+	l.servers = append(l.servers, server0, server1, server2)
+	m.saveServerList(l)
 
-	sc = m.getServerConfig()
-	if len(sc.servers) != 3 {
-		t.Fatalf("server length incorrect: %d/3", len(sc.servers))
+	l = m.getServerList()
+	if len(l.servers) != 3 {
+		t.Fatalf("server length incorrect: %d/3", len(l.servers))
 	}
-	if sc.servers[0] != server0 &&
-		sc.servers[1] != server1 &&
-		sc.servers[2] != server2 {
+	if l.servers[0] != server0 &&
+		l.servers[1] != server1 &&
+		l.servers[2] != server2 {
 		t.Fatalf("initial server ordering not correct")
 	}
 
-	sc.servers = sc.cycleServer()
-	if len(sc.servers) != 3 {
-		t.Fatalf("server length incorrect: %d/3", len(sc.servers))
+	l.servers = l.cycleServer()
+	if len(l.servers) != 3 {
+		t.Fatalf("server length incorrect: %d/3", len(l.servers))
 	}
-	if sc.servers[0] != server1 &&
-		sc.servers[1] != server2 &&
-		sc.servers[2] != server0 {
+	if l.servers[0] != server1 &&
+		l.servers[1] != server2 &&
+		l.servers[2] != server0 {
 		t.Fatalf("server ordering after one cycle not correct")
 	}
 
-	sc.servers = sc.cycleServer()
-	if len(sc.servers) != 3 {
-		t.Fatalf("server length incorrect: %d/3", len(sc.servers))
+	l.servers = l.cycleServer()
+	if len(l.servers) != 3 {
+		t.Fatalf("server length incorrect: %d/3", len(l.servers))
 	}
-	if sc.servers[0] != server2 &&
-		sc.servers[1] != server0 &&
-		sc.servers[2] != server1 {
+	if l.servers[0] != server2 &&
+		l.servers[1] != server0 &&
+		l.servers[2] != server1 {
 		t.Fatalf("server ordering after two cycles not correct")
 	}
 
-	sc.servers = sc.cycleServer()
-	if len(sc.servers) != 3 {
-		t.Fatalf("server length incorrect: %d/3", len(sc.servers))
+	l.servers = l.cycleServer()
+	if len(l.servers) != 3 {
+		t.Fatalf("server length incorrect: %d/3", len(l.servers))
 	}
-	if sc.servers[0] != server0 &&
-		sc.servers[1] != server1 &&
-		sc.servers[2] != server2 {
+	if l.servers[0] != server0 &&
+		l.servers[1] != server1 &&
+		l.servers[2] != server2 {
 		t.Fatalf("server ordering after three cycles not correct")
 	}
 }
 
-// func (m *Manager) getServerConfig() serverConfig {
-func TestManagerInternal_getServerConfig(t *testing.T) {
+// func (m *Manager) getServerList() serverList {
+func TestManagerInternal_getServerList(t *testing.T) {
 	m := testManager()
-	sc := m.getServerConfig()
-	if sc.servers == nil {
-		t.Fatalf("serverConfig.servers nil")
+	l := m.getServerList()
+	if l.servers == nil {
+		t.Fatalf("serverList.servers nil")
 	}
-	if len(sc.servers) != 0 {
-		t.Fatalf("serverConfig.servers length not zero")
+	if len(l.servers) != 0 {
+		t.Fatalf("serverList.servers length not zero")
 	}
 }
 
@@ -148,7 +148,7 @@ func TestManagerInternal_New(t *testing.T) {
 	}
 }
 
-// func (m *Manager) reconcileServerList(sc *serverConfig) bool {
+// func (m *Manager) reconcileServerList(l *serverList) bool {
 func TestManagerInternal_reconcileServerList(t *testing.T) {
 	tests := []int{0, 1, 2, 3, 4, 5, 10, 100}
 	for _, n := range tests {
@@ -207,36 +207,36 @@ func test_reconcileServerList(maxServers int) (bool, error) {
 	// Update Manager's server list to be "healthy" based on Serf.
 	// Reconcile this with origServers, which is shuffled and has a live
 	// connection, but possibly out of date.
-	origServers := m.getServerConfig()
-	m.saveServerConfig(serverConfig{servers: healthyServers})
+	origServers := m.getServerList()
+	m.saveServerList(serverList{servers: healthyServers})
 
 	// This should always succeed with non-zero server lists
 	if !selectedServerFailed && !m.reconcileServerList(&origServers) &&
-		len(m.getServerConfig().servers) != 0 &&
+		len(m.getServerList().servers) != 0 &&
 		len(origServers.servers) != 0 {
 		// If the random gods are unfavorable and we end up with zero
 		// length lists, expect things to fail and retry the test.
 		return false, fmt.Errorf("Expected reconcile to succeed: %v %d %d",
 			selectedServerFailed,
-			len(m.getServerConfig().servers),
+			len(m.getServerList().servers),
 			len(origServers.servers))
 	}
 
 	// If we have zero-length server lists, test succeeded in degenerate
 	// case.
-	if len(m.getServerConfig().servers) == 0 &&
+	if len(m.getServerList().servers) == 0 &&
 		len(origServers.servers) == 0 {
 		// Failed as expected w/ zero length list
 		return true, nil
 	}
 
 	resultingServerMap := make(map[server_details.Key]bool)
-	for _, s := range m.getServerConfig().servers {
+	for _, s := range m.getServerList().servers {
 		resultingServerMap[*s.Key()] = true
 	}
 
 	// Test to make sure no failed servers are in the Manager's
-	// list. Error if there are any failedServers in sc.servers
+	// list. Error if there are any failedServers in l.servers
 	for _, s := range failedServers {
 		_, ok := resultingServerMap[*s.Key()]
 		if ok {
@@ -245,7 +245,7 @@
 		}
 	}
 
 	// Test to make sure all healthy servers are in the healthy list.
-	if len(healthyServers) != len(m.getServerConfig().servers) {
+	if len(healthyServers) != len(m.getServerList().servers) {
 		return false, fmt.Errorf("Expected healthy map and servers to match: %d/%d", len(healthyServers), len(healthyServers))
 	}
@@ -259,7 +259,7 @@ func test_reconcileServerList(maxServers int) (bool, error) {
 	return true, nil
 }
 
-// func (sc *serverConfig) refreshServerRebalanceTimer() {
+// func (l *serverList) refreshServerRebalanceTimer() {
 func TestManagerInternal_refreshServerRebalanceTimer(t *testing.T) {
 	type clusterSizes struct {
 		numNodes int
@@ -312,41 +312,41 @@ func TestManagerInternal_refreshServerRebalanceTimer(t *testing.T) {
 	}
 }
 
-// func (m *Manager) saveServerConfig(sc serverConfig) {
-func TestManagerInternal_saveServerConfig(t *testing.T) {
+// func (m *Manager) saveServerList(l serverList) {
+func TestManagerInternal_saveServerList(t *testing.T) {
 	m := testManager()
 
 	// Initial condition
 	func() {
-		sc := m.getServerConfig()
-		if len(sc.servers) != 0 {
-			t.Fatalf("Manager.saveServerConfig failed to load init config")
+		l := m.getServerList()
+		if len(l.servers) != 0 {
+			t.Fatalf("Manager.saveServerList failed to load init config")
 		}
 
 		newServer := new(server_details.ServerDetails)
-		sc.servers = append(sc.servers, newServer)
-		m.saveServerConfig(sc)
+		l.servers = append(l.servers, newServer)
+		m.saveServerList(l)
	}()
 
 	// Test that save works
 	func() {
-		sc1 := m.getServerConfig()
-		t1NumServers := len(sc1.servers)
+		l1 := m.getServerList()
+		t1NumServers := len(l1.servers)
 		if t1NumServers != 1 {
-			t.Fatalf("Manager.saveServerConfig failed to save mutated config")
+			t.Fatalf("Manager.saveServerList failed to save mutated config")
 		}
 	}()
 
 	// Verify mutation w/o a save doesn't alter the original
 	func() {
 		newServer := new(server_details.ServerDetails)
-		sc := m.getServerConfig()
-		sc.servers = append(sc.servers, newServer)
+		l := m.getServerList()
+		l.servers = append(l.servers, newServer)
 
-		sc_orig := m.getServerConfig()
-		origNumServers := len(sc_orig.servers)
-		if origNumServers >= len(sc.servers) {
-			t.Fatalf("Manager.saveServerConfig unsaved config overwrote original")
+		l_orig := m.getServerList()
+		origNumServers := len(l_orig.servers)
+		if origNumServers >= len(l.servers) {
+			t.Fatalf("Manager.saveServerList unsaved config overwrote original")
 		}
 	}()
 }
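
Notes (outside the patch): the sketches below illustrate the concurrency patterns manager.go relies on. They are simplified stand-ins, not the real consul/servers API — plain strings replace *server_details.ServerDetails and all names are invented for the examples.

The NOTE(sean@) comment on serverList is the core of the design: readers Load() the struct by value and never block, while writers serialize on listLock, copy the slice, and Store() a fresh value. A minimal sketch of that read-mostly, copy-on-write pattern:

// Sketch of the read-mostly pattern behind serverList: atomic.Value holds the
// struct by value, readers never block, and writers copy-on-write under a
// mutex. Types are simplified stand-ins for the real server_details package.
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

type serverList struct {
	servers []string // stand-in for []*server_details.ServerDetails
}

type manager struct {
	listValue atomic.Value // always holds a serverList
	listLock  sync.Mutex   // serializes writers only
}

func newManager() *manager {
	m := &manager{}
	m.listValue.Store(serverList{servers: make([]string, 0)})
	return m
}

// getServerList is the lock-free read path.
func (m *manager) getServerList() serverList {
	return m.listValue.Load().(serverList)
}

// addServer never mutates the published slice: it copies, appends, and
// stores a new value, so a concurrent reader's copy stays valid.
func (m *manager) addServer(name string) {
	m.listLock.Lock()
	defer m.listLock.Unlock()
	l := m.getServerList()
	newServers := make([]string, len(l.servers), len(l.servers)+1)
	copy(newServers, l.servers)
	l.servers = append(newServers, name)
	m.listValue.Store(l)
}

func main() {
	m := newManager()
	m.addServer("server1")
	m.addServer("server2")
	fmt.Println(m.getServerList().servers) // [server1 server2]
}

Keeping serverList light matters because every getServerList call copies the struct onto the caller's stack; only the slice header is copied, not the backing array.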
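NotifyFailedServer sits on the RPC failure path, so it must never queue behind listLock. The atomic.CompareAndSwapInt32 on notifyFailedBarrier emulates a TryLock: one goroutine wins and pays for the rotation, the rest return immediately. A sketch of the same shape (names are illustrative, not the real Manager):

// Sketch of the CAS-emulated TryLock guarding NotifyFailedServer.
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

type manager struct {
	listLock            sync.Mutex
	notifyFailedBarrier int32
}

// notifyFailed returns immediately when another goroutine already holds the
// barrier, instead of queuing behind listLock.
func (m *manager) notifyFailed() (rotated bool) {
	if !atomic.CompareAndSwapInt32(&m.notifyFailedBarrier, 0, 1) {
		return false // TryLock failed: someone else is rotating
	}
	defer atomic.StoreInt32(&m.notifyFailedBarrier, 0)

	m.listLock.Lock()
	defer m.listLock.Unlock()
	// Under the real lock the Manager retests that the failed server is
	// still first, then cycles the list (elided here).
	return true
}

func main() {
	m := &manager{}
	var wg sync.WaitGroup
	var rotations int32
	for i := 0; i < 8; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			if m.notifyFailed() {
				atomic.AddInt32(&rotations, 1)
			}
		}()
	}
	wg.Wait()
	// Anywhere from 1 to 8 goroutines may win in sequence; the point is
	// that losers return instead of blocking.
	fmt.Printf("rotations: %d of 8 attempts\n", rotations)
}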
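cycleServer and shuffleServers are the two list reorderings: rotation punishes a failed head without blocking anyone, and the in-place Fisher-Yates loop spreads RPC load during rebalance. Sketched with string elements:

// Sketch of the two reorderings on a serverList, with strings standing in
// for server pointers.
package main

import (
	"fmt"
	"math/rand"
)

// cycle moves the head to the tail, exactly like cycleServer: it builds a
// new slice rather than rotating in place, since readers may hold the old
// one.
func cycle(servers []string) []string {
	if len(servers) < 2 {
		return servers // no action required
	}
	newServers := make([]string, 0, len(servers))
	newServers = append(newServers, servers[1:]...)
	return append(newServers, servers[0])
}

// shuffle is the in-place Fisher-Yates loop from shuffleServers.
func shuffle(servers []string) {
	for i := len(servers) - 1; i > 0; i-- {
		j := rand.Int31n(int32(i + 1))
		servers[i], servers[j] = servers[j], servers[i]
	}
}

func main() {
	s := []string{"a", "b", "c"}
	fmt.Println(cycle(s)) // [b c a]
	shuffle(s)
	fmt.Println(s) // some permutation of [a b c]
}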
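Finally, reconcileServerList merges the RPC-side copy with the Serf-maintained list using the 'b'/'o'/'n' state bytes visible in the hunks above: 'b' in both, 'o' only in the original (removed by Serf, so dropped), 'n' only in the new list (joined, so appended). The sketch below reproduces that merge on string keys; the real method additionally bails out when the selected first server was itself removed:

// Sketch of reconcileServerList's three-state merge over string keys instead
// of server_details.Key.
package main

import "fmt"

func reconcile(rpcCopy, serfCopy []string) []string {
	state := make(map[string]byte, len(rpcCopy))
	for _, s := range rpcCopy {
		state[s] = 'o' // assume original-only until proven otherwise
	}
	for _, s := range serfCopy {
		if _, ok := state[s]; ok {
			state[s] = 'b' // present in both lists
		} else {
			state[s] = 'n' // newly joined
		}
	}

	merged := make([]string, 0, len(state))
	for _, s := range rpcCopy { // preserve the RPC copy's ordering
		if state[s] == 'b' {
			merged = append(merged, s)
		}
	}
	for _, s := range serfCopy { // append the servers Serf added
		if state[s] == 'n' {
			merged = append(merged, s)
		}
	}
	return merged
}

func main() {
	rpc := []string{"s3", "s1", "s2"}  // shuffled; s3 just passed a ping
	serf := []string{"s1", "s3", "s4"} // meanwhile s2 left and s4 joined
	fmt.Println(reconcile(rpc, serf))  // [s3 s1 s4]
}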