diff --git a/consul/client.go b/consul/client.go index 82a4bd7fa..da1241f28 100644 --- a/consul/client.go +++ b/consul/client.go @@ -11,7 +11,7 @@ import ( "time" "github.com/hashicorp/consul/consul/server_details" - "github.com/hashicorp/consul/consul/server_manager" + "github.com/hashicorp/consul/consul/servers" "github.com/hashicorp/consul/consul/structs" "github.com/hashicorp/serf/coordinate" "github.com/hashicorp/serf/serf" @@ -58,9 +58,9 @@ type Client struct { // Connection pool to consul servers connPool *ConnPool - // serverMgr is responsible for the selection and maintenance of + // servers is responsible for the selection and maintenance of // Consul servers this agent uses for RPC requests - serverMgr *server_manager.ServerManager + servers *servers.Manager // eventCh is used to receive events from the // serf cluster in the datacenter @@ -130,9 +130,9 @@ func NewClient(config *Config) (*Client, error) { return nil, fmt.Errorf("Failed to start lan serf: %v", err) } - // Start maintenance task for server_manager - c.serverMgr = server_manager.New(c.logger, c.shutdownCh, c.serf, c.connPool) - go c.serverMgr.Start() + // Start maintenance task for servers + c.servers = servers.New(c.logger, c.shutdownCh, c.serf, c.connPool) + go c.servers.Start() return c, nil } @@ -271,7 +271,7 @@ func (c *Client) nodeJoin(me serf.MemberEvent) { continue } c.logger.Printf("[INFO] consul: adding server %s", parts) - c.serverMgr.AddServer(parts) + c.servers.AddServer(parts) // Trigger the callback if c.config.ServerUp != nil { @@ -288,7 +288,7 @@ func (c *Client) nodeFail(me serf.MemberEvent) { continue } c.logger.Printf("[INFO] consul: removing server %s", parts) - c.serverMgr.RemoveServer(parts) + c.servers.RemoveServer(parts) } } @@ -322,14 +322,14 @@ func (c *Client) localEvent(event serf.UserEvent) { // RPC is used to forward an RPC call to a consul server, or fail if no servers func (c *Client) RPC(method string, args interface{}, reply interface{}) error { - server := c.serverMgr.FindServer() + server := c.servers.FindServer() if server == nil { return structs.ErrNoServers } // Forward to remote Consul if err := c.connPool.RPC(c.config.Datacenter, server.Addr, server.Version, method, args, reply); err != nil { - c.serverMgr.NotifyFailedServer(server) + c.servers.NotifyFailedServer(server) c.logger.Printf("[ERR] consul: RPC failed to server %s: %v", server.Addr, err) return err } @@ -340,7 +340,7 @@ func (c *Client) RPC(method string, args interface{}, reply interface{}) error { // Stats is used to return statistics for debugging and insight // for various sub-systems func (c *Client) Stats() map[string]map[string]string { - numServers := c.serverMgr.NumServers() + numServers := c.servers.NumServers() toString := func(v uint64) string { return strconv.FormatUint(v, 10) diff --git a/consul/client_test.go b/consul/client_test.go index 124a7fb81..3e4d7706c 100644 --- a/consul/client_test.go +++ b/consul/client_test.go @@ -84,7 +84,7 @@ func TestClient_JoinLAN(t *testing.T) { t.Fatalf("err: %v", err) } testutil.WaitForResult(func() (bool, error) { - return c1.serverMgr.NumServers() == 1, nil + return c1.servers.NumServers() == 1, nil }, func(err error) { t.Fatalf("expected consul server") }) @@ -100,7 +100,7 @@ func TestClient_JoinLAN(t *testing.T) { // Check we have a new consul testutil.WaitForResult(func() (bool, error) { - return c1.serverMgr.NumServers() == 1, nil + return c1.servers.NumServers() == 1, nil }, func(err error) { t.Fatalf("expected consul server") }) @@ -270,7 +270,7 @@ func TestClient_RPC_ConsulServerPing(t *testing.T) { // Sleep to allow Serf to sync, shuffle, and let the shuffle complete time.Sleep(1 * time.Second) - c.serverMgr.ResetRebalanceTimer() + c.servers.ResetRebalanceTimer() time.Sleep(1 * time.Second) if len(c.LANMembers()) != numServers+numClients { @@ -286,7 +286,7 @@ func TestClient_RPC_ConsulServerPing(t *testing.T) { var pingCount int for range servers { time.Sleep(1 * time.Second) - s := c.serverMgr.FindServer() + s := c.servers.FindServer() ok, err := c.connPool.PingConsulServer(s) if !ok { t.Errorf("Unable to ping server %v: %s", s.String(), err) @@ -295,7 +295,7 @@ func TestClient_RPC_ConsulServerPing(t *testing.T) { // Artificially fail the server in order to rotate the server // list - c.serverMgr.NotifyFailedServer(s) + c.servers.NotifyFailedServer(s) } if pingCount != numServers { diff --git a/consul/server_manager/server_manager.go b/consul/servers/manager.go similarity index 81% rename from consul/server_manager/server_manager.go rename to consul/servers/manager.go index 24844b0f9..a94cc2b04 100644 --- a/consul/server_manager/server_manager.go +++ b/consul/servers/manager.go @@ -1,4 +1,4 @@ -package server_manager +package servers import ( "log" @@ -60,7 +60,7 @@ type Pinger interface { } // serverConfig is the thread-safe configuration struct used to maintain the -// list of Consul servers in ServerManager. +// list of Consul servers in Manager. // // NOTE(sean@): We are explicitly relying on the fact that serverConfig will // be copied onto the stack. Please keep this structure light. @@ -70,7 +70,7 @@ type serverConfig struct { servers []*server_details.ServerDetails } -type ServerManager struct { +type Manager struct { // serverConfig provides the necessary load/store semantics for the // server list. serverConfigValue atomic.Value @@ -104,10 +104,10 @@ type ServerManager struct { // begin seeing use after the rebalance timer fires or enough servers fail // organically. If the server is already known, merge the new server // details. -func (sm *ServerManager) AddServer(server *server_details.ServerDetails) { - sm.serverConfigLock.Lock() - defer sm.serverConfigLock.Unlock() - sc := sm.getServerConfig() +func (m *Manager) AddServer(server *server_details.ServerDetails) { + m.serverConfigLock.Lock() + defer m.serverConfigLock.Unlock() + sc := m.getServerConfig() // Check if this server is known found := false @@ -134,7 +134,7 @@ func (sm *ServerManager) AddServer(server *server_details.ServerDetails) { sc.servers = newServers } - sm.saveServerConfig(sc) + m.saveServerConfig(sc) } // cycleServers returns a new list of servers that has dequeued the first @@ -184,11 +184,11 @@ func (sc *serverConfig) shuffleServers() { // server list. If the server at the front of the list has failed or fails // during an RPC call, it is rotated to the end of the list. If there are no // servers available, return nil. -func (sm *ServerManager) FindServer() *server_details.ServerDetails { - sc := sm.getServerConfig() +func (m *Manager) FindServer() *server_details.ServerDetails { + sc := m.getServerConfig() numServers := len(sc.servers) if numServers == 0 { - sm.logger.Printf("[WARN] server manager: No servers available") + m.logger.Printf("[WARN] manager: No servers available") return nil } else { // Return whatever is at the front of the list because it is @@ -201,35 +201,35 @@ func (sm *ServerManager) FindServer() *server_details.ServerDetails { // getServerConfig is a convenience method which hides the locking semantics // of atomic.Value from the caller. -func (sm *ServerManager) getServerConfig() serverConfig { - return sm.serverConfigValue.Load().(serverConfig) +func (m *Manager) getServerConfig() serverConfig { + return m.serverConfigValue.Load().(serverConfig) } // saveServerConfig is a convenience method which hides the locking semantics // of atomic.Value from the caller. -func (sm *ServerManager) saveServerConfig(sc serverConfig) { - sm.serverConfigValue.Store(sc) +func (m *Manager) saveServerConfig(sc serverConfig) { + m.serverConfigValue.Store(sc) } -// New is the only way to safely create a new ServerManager struct. -func New(logger *log.Logger, shutdownCh chan struct{}, clusterInfo ConsulClusterInfo, connPoolPinger Pinger) (sm *ServerManager) { - sm = new(ServerManager) - sm.logger = logger - sm.clusterInfo = clusterInfo // can't pass *consul.Client: import cycle - sm.connPoolPinger = connPoolPinger // can't pass *consul.ConnPool: import cycle - sm.rebalanceTimer = time.NewTimer(clientRPCMinReuseDuration) - sm.shutdownCh = shutdownCh +// New is the only way to safely create a new Manager struct. +func New(logger *log.Logger, shutdownCh chan struct{}, clusterInfo ConsulClusterInfo, connPoolPinger Pinger) (m *Manager) { + m = new(Manager) + m.logger = logger + m.clusterInfo = clusterInfo // can't pass *consul.Client: import cycle + m.connPoolPinger = connPoolPinger // can't pass *consul.ConnPool: import cycle + m.rebalanceTimer = time.NewTimer(clientRPCMinReuseDuration) + m.shutdownCh = shutdownCh sc := serverConfig{} sc.servers = make([]*server_details.ServerDetails, 0) - sm.saveServerConfig(sc) - return sm + m.saveServerConfig(sc) + return m } // NotifyFailedServer marks the passed in server as "failed" by rotating it // to the end of the server list. -func (sm *ServerManager) NotifyFailedServer(server *server_details.ServerDetails) { - sc := sm.getServerConfig() +func (m *Manager) NotifyFailedServer(server *server_details.ServerDetails) { + sc := m.getServerConfig() // If the server being failed is not the first server on the list, // this is a noop. If, however, the server is failed and first on @@ -239,26 +239,26 @@ func (sm *ServerManager) NotifyFailedServer(server *server_details.ServerDetails // Only rotate the server list when there is more than one server if len(sc.servers) > 1 && sc.servers[0] == server && // Use atomic.CAS to emulate a TryLock(). - atomic.CompareAndSwapInt32(&sm.notifyFailedBarrier, 0, 1) { - defer atomic.StoreInt32(&sm.notifyFailedBarrier, 0) + atomic.CompareAndSwapInt32(&m.notifyFailedBarrier, 0, 1) { + defer atomic.StoreInt32(&m.notifyFailedBarrier, 0) // Grab a lock, retest, and take the hit of cycling the first // server to the end. - sm.serverConfigLock.Lock() - defer sm.serverConfigLock.Unlock() - sc = sm.getServerConfig() + m.serverConfigLock.Lock() + defer m.serverConfigLock.Unlock() + sc = m.getServerConfig() if len(sc.servers) > 1 && sc.servers[0] == server { sc.servers = sc.cycleServer() - sm.saveServerConfig(sc) + m.saveServerConfig(sc) } } } // NumServers takes out an internal "read lock" and returns the number of // servers. numServers includes both healthy and unhealthy servers. -func (sm *ServerManager) NumServers() (numServers int) { - sc := sm.getServerConfig() +func (m *Manager) NumServers() (numServers int) { + sc := m.getServerConfig() numServers = len(sc.servers) return numServers } @@ -275,9 +275,9 @@ func (sm *ServerManager) NumServers() (numServers int) { // Unhealthy servers are removed when serf notices the server has been // deregistered. Before the newly shuffled server list is saved, the new // remote endpoint is tested to ensure its responsive. -func (sm *ServerManager) RebalanceServers() { +func (m *Manager) RebalanceServers() { // Obtain a copy of the current serverConfig - sc := sm.getServerConfig() + sc := m.getServerConfig() // Early abort if there is no value to shuffling if len(sc.servers) < 2 { @@ -295,12 +295,12 @@ func (sm *ServerManager) RebalanceServers() { // while Serf detects the node has failed. selectedServer := sc.servers[0] - ok, err := sm.connPoolPinger.PingConsulServer(selectedServer) + ok, err := m.connPoolPinger.PingConsulServer(selectedServer) if ok { foundHealthyServer = true break } - sm.logger.Printf(`[DEBUG] server manager: pinging server "%s" failed: %s`, selectedServer.String(), err) + m.logger.Printf(`[DEBUG] manager: pinging server "%s" failed: %s`, selectedServer.String(), err) sc.cycleServer() } @@ -308,13 +308,13 @@ func (sm *ServerManager) RebalanceServers() { // If no healthy servers were found, sleep and wait for Serf to make // the world a happy place again. if !foundHealthyServer { - sm.logger.Printf("[DEBUG] server manager: No healthy servers during rebalance, aborting") + m.logger.Printf("[DEBUG] manager: No healthy servers during rebalance, aborting") return } // Verify that all servers are present - if sm.reconcileServerList(&sc) { - sm.logger.Printf("[DEBUG] server manager: Rebalanced %d servers, next active server is %s", len(sc.servers), sc.servers[0].String()) + if m.reconcileServerList(&sc) { + m.logger.Printf("[DEBUG] manager: Rebalanced %d servers, next active server is %s", len(sc.servers), sc.servers[0].String()) } else { // reconcileServerList failed because Serf removed the server // that was at the front of the list that had successfully @@ -336,13 +336,13 @@ func (sm *ServerManager) RebalanceServers() { // server does not exist in the list (i.e. was removed by Serf during a // PingConsulServer() call. Newly added servers are appended to the list and // other missing servers are removed from the list. -func (sm *ServerManager) reconcileServerList(sc *serverConfig) bool { - sm.serverConfigLock.Lock() - defer sm.serverConfigLock.Unlock() +func (m *Manager) reconcileServerList(sc *serverConfig) bool { + m.serverConfigLock.Lock() + defer m.serverConfigLock.Unlock() // newServerCfg is a serverConfig that has been kept up to date with // Serf node join and node leave events. - newServerCfg := sm.getServerConfig() + newServerCfg := m.getServerConfig() // If Serf has removed all nodes, or there is no selected server // (zero nodes in sc), abort early. @@ -394,16 +394,16 @@ func (sm *ServerManager) reconcileServerList(sc *serverConfig) bool { } } - sm.saveServerConfig(*sc) + m.saveServerConfig(*sc) return true } // RemoveServer takes out an internal write lock and removes a server from // the server list. -func (sm *ServerManager) RemoveServer(server *server_details.ServerDetails) { - sm.serverConfigLock.Lock() - defer sm.serverConfigLock.Unlock() - sc := sm.getServerConfig() +func (m *Manager) RemoveServer(server *server_details.ServerDetails) { + m.serverConfigLock.Lock() + defer m.serverConfigLock.Unlock() + sc := m.getServerConfig() // Remove the server if known for i, _ := range sc.servers { @@ -413,15 +413,15 @@ func (sm *ServerManager) RemoveServer(server *server_details.ServerDetails) { newServers = append(newServers, sc.servers[i+1:]...) sc.servers = newServers - sm.saveServerConfig(sc) + m.saveServerConfig(sc) return } } } -// refreshServerRebalanceTimer is only called once sm.rebalanceTimer expires. -func (sm *ServerManager) refreshServerRebalanceTimer() time.Duration { - sc := sm.getServerConfig() +// refreshServerRebalanceTimer is only called once m.rebalanceTimer expires. +func (m *Manager) refreshServerRebalanceTimer() time.Duration { + sc := m.getServerConfig() numConsulServers := len(sc.servers) // Limit this connection's life based on the size (and health) of the // cluster. Never rebalance a connection more frequently than @@ -429,19 +429,19 @@ func (sm *ServerManager) refreshServerRebalanceTimer() time.Duration { // clusterWideRebalanceConnsPerSec operations/s across numLANMembers. clusterWideRebalanceConnsPerSec := float64(numConsulServers * newRebalanceConnsPerSecPerServer) connReuseLowWatermarkDuration := clientRPCMinReuseDuration + lib.RandomStagger(clientRPCMinReuseDuration/clientRPCJitterFraction) - numLANMembers := sm.clusterInfo.NumNodes() + numLANMembers := m.clusterInfo.NumNodes() connRebalanceTimeout := lib.RateScaledInterval(clusterWideRebalanceConnsPerSec, connReuseLowWatermarkDuration, numLANMembers) - sm.rebalanceTimer.Reset(connRebalanceTimeout) + m.rebalanceTimer.Reset(connRebalanceTimeout) return connRebalanceTimeout } // ResetRebalanceTimer resets the rebalance timer. This method primarily // exists for testing and should not be used directly. -func (sm *ServerManager) ResetRebalanceTimer() { - sm.serverConfigLock.Lock() - defer sm.serverConfigLock.Unlock() - sm.rebalanceTimer.Reset(clientRPCMinReuseDuration) +func (m *Manager) ResetRebalanceTimer() { + m.serverConfigLock.Lock() + defer m.serverConfigLock.Unlock() + m.rebalanceTimer.Reset(clientRPCMinReuseDuration) } // Start is used to start and manage the task of automatically shuffling and @@ -450,15 +450,15 @@ func (sm *ServerManager) ResetRebalanceTimer() { // automatically cycled to the end of the list. New servers are appended to // the list. The order of the server list must be shuffled periodically to // distribute load across all known and available consul servers. -func (sm *ServerManager) Start() { +func (m *Manager) Start() { for { select { - case <-sm.rebalanceTimer.C: - sm.RebalanceServers() - sm.refreshServerRebalanceTimer() + case <-m.rebalanceTimer.C: + m.RebalanceServers() + m.refreshServerRebalanceTimer() - case <-sm.shutdownCh: - sm.logger.Printf("[INFO] server manager: shutting down") + case <-m.shutdownCh: + m.logger.Printf("[INFO] manager: shutting down") return } } diff --git a/consul/server_manager/server_manager_internal_test.go b/consul/servers/manager_internal_test.go similarity index 72% rename from consul/server_manager/server_manager_internal_test.go rename to consul/servers/manager_internal_test.go index 4a5a6d43a..29a1b35af 100644 --- a/consul/server_manager/server_manager_internal_test.go +++ b/consul/servers/manager_internal_test.go @@ -1,4 +1,4 @@ -package server_manager +package servers import ( "bytes" @@ -48,33 +48,33 @@ func (s *fauxSerf) NumNodes() int { return s.numNodes } -func testServerManager() (sm *ServerManager) { +func testManager() (m *Manager) { logger := GetBufferedLogger() shutdownCh := make(chan struct{}) - sm = New(logger, shutdownCh, &fauxSerf{numNodes: 16384}, &fauxConnPool{}) - return sm + m = New(logger, shutdownCh, &fauxSerf{numNodes: 16384}, &fauxConnPool{}) + return m } -func testServerManagerFailProb(failPct float64) (sm *ServerManager) { +func testManagerFailProb(failPct float64) (m *Manager) { logger := GetBufferedLogger() logger = log.New(os.Stderr, "", log.LstdFlags) shutdownCh := make(chan struct{}) - sm = New(logger, shutdownCh, &fauxSerf{}, &fauxConnPool{failPct: failPct}) - return sm + m = New(logger, shutdownCh, &fauxSerf{}, &fauxConnPool{failPct: failPct}) + return m } // func (sc *serverConfig) cycleServer() (servers []*server_details.ServerDetails) { -func TestServerManagerInternal_cycleServer(t *testing.T) { - sm := testServerManager() - sc := sm.getServerConfig() +func TestManagerInternal_cycleServer(t *testing.T) { + m := testManager() + sc := m.getServerConfig() server0 := &server_details.ServerDetails{Name: "server1"} server1 := &server_details.ServerDetails{Name: "server2"} server2 := &server_details.ServerDetails{Name: "server3"} sc.servers = append(sc.servers, server0, server1, server2) - sm.saveServerConfig(sc) + m.saveServerConfig(sc) - sc = sm.getServerConfig() + sc = m.getServerConfig() if len(sc.servers) != 3 { t.Fatalf("server length incorrect: %d/3", len(sc.servers)) } @@ -115,10 +115,10 @@ func TestServerManagerInternal_cycleServer(t *testing.T) { } } -// func (sm *ServerManager) getServerConfig() serverConfig { -func TestServerManagerInternal_getServerConfig(t *testing.T) { - sm := testServerManager() - sc := sm.getServerConfig() +// func (m *Manager) getServerConfig() serverConfig { +func TestManagerInternal_getServerConfig(t *testing.T) { + m := testManager() + sc := m.getServerConfig() if sc.servers == nil { t.Fatalf("serverConfig.servers nil") } @@ -128,28 +128,28 @@ func TestServerManagerInternal_getServerConfig(t *testing.T) { } } -// func New(logger *log.Logger, shutdownCh chan struct{}, clusterInfo ConsulClusterInfo) (sm *ServerManager) { -func TestServerManagerInternal_New(t *testing.T) { - sm := testServerManager() - if sm == nil { - t.Fatalf("ServerManager nil") +// func New(logger *log.Logger, shutdownCh chan struct{}, clusterInfo ConsulClusterInfo) (m *Manager) { +func TestManagerInternal_New(t *testing.T) { + m := testManager() + if m == nil { + t.Fatalf("Manager nil") } - if sm.clusterInfo == nil { - t.Fatalf("ServerManager.clusterInfo nil") + if m.clusterInfo == nil { + t.Fatalf("Manager.clusterInfo nil") } - if sm.logger == nil { - t.Fatalf("ServerManager.logger nil") + if m.logger == nil { + t.Fatalf("Manager.logger nil") } - if sm.shutdownCh == nil { - t.Fatalf("ServerManager.shutdownCh nil") + if m.shutdownCh == nil { + t.Fatalf("Manager.shutdownCh nil") } } -// func (sm *ServerManager) reconcileServerList(sc *serverConfig) bool { -func TestServerManagerInternal_reconcileServerList(t *testing.T) { +// func (m *Manager) reconcileServerList(sc *serverConfig) bool { +func TestManagerInternal_reconcileServerList(t *testing.T) { tests := []int{0, 1, 2, 3, 4, 5, 10, 100} for _, n := range tests { ok, err := test_reconcileServerList(n) @@ -164,22 +164,22 @@ func test_reconcileServerList(maxServers int) (bool, error) { // missing, the added have been added, and the original server is // present. const failPct = 0.5 - sm := testServerManagerFailProb(failPct) + m := testManagerFailProb(failPct) var failedServers, healthyServers []*server_details.ServerDetails for i := 0; i < maxServers; i++ { nodeName := fmt.Sprintf("s%02d", i) node := &server_details.ServerDetails{Name: nodeName} - // Add 66% of servers to ServerManager + // Add 66% of servers to Manager if rand.Float64() > 0.33 { - sm.AddServer(node) + m.AddServer(node) // Of healthy servers, (ab)use connPoolPinger to // failPct of the servers for the reconcile. This // allows for the selected server to no longer be // healthy for the reconcile below. - if ok, _ := sm.connPoolPinger.PingConsulServer(node); ok { + if ok, _ := m.connPoolPinger.PingConsulServer(node); ok { // Will still be present healthyServers = append(healthyServers, node) } else { @@ -192,9 +192,9 @@ func test_reconcileServerList(maxServers int) (bool, error) { } } - // Randomize ServerManager's server list - sm.RebalanceServers() - selectedServer := sm.FindServer() + // Randomize Manager's server list + m.RebalanceServers() + selectedServer := m.FindServer() var selectedServerFailed bool for _, s := range failedServers { @@ -204,38 +204,38 @@ func test_reconcileServerList(maxServers int) (bool, error) { } } - // Update ServerManager's server list to be "healthy" based on Serf. + // Update Manager's server list to be "healthy" based on Serf. // Reconcile this with origServers, which is shuffled and has a live // connection, but possibly out of date. - origServers := sm.getServerConfig() - sm.saveServerConfig(serverConfig{servers: healthyServers}) + origServers := m.getServerConfig() + m.saveServerConfig(serverConfig{servers: healthyServers}) // This should always succeed with non-zero server lists - if !selectedServerFailed && !sm.reconcileServerList(&origServers) && - len(sm.getServerConfig().servers) != 0 && + if !selectedServerFailed && !m.reconcileServerList(&origServers) && + len(m.getServerConfig().servers) != 0 && len(origServers.servers) != 0 { // If the random gods are unfavorable and we end up with zero // length lists, expect things to fail and retry the test. return false, fmt.Errorf("Expected reconcile to succeed: %v %d %d", selectedServerFailed, - len(sm.getServerConfig().servers), + len(m.getServerConfig().servers), len(origServers.servers)) } // If we have zero-length server lists, test succeeded in degenerate // case. - if len(sm.getServerConfig().servers) == 0 && + if len(m.getServerConfig().servers) == 0 && len(origServers.servers) == 0 { // Failed as expected w/ zero length list return true, nil } resultingServerMap := make(map[server_details.Key]bool) - for _, s := range sm.getServerConfig().servers { + for _, s := range m.getServerConfig().servers { resultingServerMap[*s.Key()] = true } - // Test to make sure no failed servers are in the ServerManager's + // Test to make sure no failed servers are in the Manager's // list. Error if there are any failedServers in sc.servers for _, s := range failedServers { _, ok := resultingServerMap[*s.Key()] @@ -245,7 +245,7 @@ func test_reconcileServerList(maxServers int) (bool, error) { } // Test to make sure all healthy servers are in the healthy list. - if len(healthyServers) != len(sm.getServerConfig().servers) { + if len(healthyServers) != len(m.getServerConfig().servers) { return false, fmt.Errorf("Expected healthy map and servers to match: %d/%d", len(healthyServers), len(healthyServers)) } @@ -260,7 +260,7 @@ func test_reconcileServerList(maxServers int) (bool, error) { } // func (sc *serverConfig) refreshServerRebalanceTimer() { -func TestServerManagerInternal_refreshServerRebalanceTimer(t *testing.T) { +func TestManagerInternal_refreshServerRebalanceTimer(t *testing.T) { type clusterSizes struct { numNodes int numServers int @@ -299,54 +299,54 @@ func TestServerManagerInternal_refreshServerRebalanceTimer(t *testing.T) { shutdownCh := make(chan struct{}) for _, s := range clusters { - sm := New(logger, shutdownCh, &fauxSerf{numNodes: s.numNodes}, &fauxConnPool{}) + m := New(logger, shutdownCh, &fauxSerf{numNodes: s.numNodes}, &fauxConnPool{}) for i := 0; i < s.numServers; i++ { nodeName := fmt.Sprintf("s%02d", i) - sm.AddServer(&server_details.ServerDetails{Name: nodeName}) + m.AddServer(&server_details.ServerDetails{Name: nodeName}) } - d := sm.refreshServerRebalanceTimer() + d := m.refreshServerRebalanceTimer() if d < s.minRebalance { t.Errorf("duration too short for cluster of size %d and %d servers (%s < %s)", s.numNodes, s.numServers, d, s.minRebalance) } } } -// func (sm *ServerManager) saveServerConfig(sc serverConfig) { -func TestServerManagerInternal_saveServerConfig(t *testing.T) { - sm := testServerManager() +// func (m *Manager) saveServerConfig(sc serverConfig) { +func TestManagerInternal_saveServerConfig(t *testing.T) { + m := testManager() // Initial condition func() { - sc := sm.getServerConfig() + sc := m.getServerConfig() if len(sc.servers) != 0 { - t.Fatalf("ServerManager.saveServerConfig failed to load init config") + t.Fatalf("Manager.saveServerConfig failed to load init config") } newServer := new(server_details.ServerDetails) sc.servers = append(sc.servers, newServer) - sm.saveServerConfig(sc) + m.saveServerConfig(sc) }() // Test that save works func() { - sc1 := sm.getServerConfig() + sc1 := m.getServerConfig() t1NumServers := len(sc1.servers) if t1NumServers != 1 { - t.Fatalf("ServerManager.saveServerConfig failed to save mutated config") + t.Fatalf("Manager.saveServerConfig failed to save mutated config") } }() // Verify mutation w/o a save doesn't alter the original func() { newServer := new(server_details.ServerDetails) - sc := sm.getServerConfig() + sc := m.getServerConfig() sc.servers = append(sc.servers, newServer) - sc_orig := sm.getServerConfig() + sc_orig := m.getServerConfig() origNumServers := len(sc_orig.servers) if origNumServers >= len(sc.servers) { - t.Fatalf("ServerManager.saveServerConfig unsaved config overwrote original") + t.Fatalf("Manager.saveServerConfig unsaved config overwrote original") } }() } diff --git a/consul/server_manager/server_manager_test.go b/consul/servers/manager_test.go similarity index 58% rename from consul/server_manager/server_manager_test.go rename to consul/servers/manager_test.go index 25673140a..150313247 100644 --- a/consul/server_manager/server_manager_test.go +++ b/consul/servers/manager_test.go @@ -1,4 +1,4 @@ -package server_manager_test +package servers_test import ( "bytes" @@ -10,7 +10,7 @@ import ( "testing" "github.com/hashicorp/consul/consul/server_details" - "github.com/hashicorp/consul/consul/server_manager" + "github.com/hashicorp/consul/consul/servers" ) var ( @@ -48,66 +48,66 @@ func (s *fauxSerf) NumNodes() int { return 16384 } -func testServerManager() (sm *server_manager.ServerManager) { +func testManager() (m *servers.Manager) { logger := GetBufferedLogger() logger = log.New(os.Stderr, "", log.LstdFlags) shutdownCh := make(chan struct{}) - sm = server_manager.New(logger, shutdownCh, &fauxSerf{}, &fauxConnPool{}) - return sm + m = servers.New(logger, shutdownCh, &fauxSerf{}, &fauxConnPool{}) + return m } -func testServerManagerFailProb(failPct float64) (sm *server_manager.ServerManager) { +func testManagerFailProb(failPct float64) (m *servers.Manager) { logger := GetBufferedLogger() logger = log.New(os.Stderr, "", log.LstdFlags) shutdownCh := make(chan struct{}) - sm = server_manager.New(logger, shutdownCh, &fauxSerf{}, &fauxConnPool{failPct: failPct}) - return sm + m = servers.New(logger, shutdownCh, &fauxSerf{}, &fauxConnPool{failPct: failPct}) + return m } -// func (sm *ServerManager) AddServer(server *server_details.ServerDetails) { -func TestServerManager_AddServer(t *testing.T) { - sm := testServerManager() +// func (m *Manager) AddServer(server *server_details.ServerDetails) { +func TestServers_AddServer(t *testing.T) { + m := testManager() var num int - num = sm.NumServers() + num = m.NumServers() if num != 0 { t.Fatalf("Expected zero servers to start") } s1 := &server_details.ServerDetails{Name: "s1"} - sm.AddServer(s1) - num = sm.NumServers() + m.AddServer(s1) + num = m.NumServers() if num != 1 { t.Fatalf("Expected one server") } - sm.AddServer(s1) - num = sm.NumServers() + m.AddServer(s1) + num = m.NumServers() if num != 1 { t.Fatalf("Expected one server (still)") } s2 := &server_details.ServerDetails{Name: "s2"} - sm.AddServer(s2) - num = sm.NumServers() + m.AddServer(s2) + num = m.NumServers() if num != 2 { t.Fatalf("Expected two servers") } } -// func (sm *ServerManager) FindServer() (server *server_details.ServerDetails) { -func TestServerManager_FindServer(t *testing.T) { - sm := testServerManager() +// func (m *Manager) FindServer() (server *server_details.ServerDetails) { +func TestServers_FindServer(t *testing.T) { + m := testManager() - if sm.FindServer() != nil { + if m.FindServer() != nil { t.Fatalf("Expected nil return") } - sm.AddServer(&server_details.ServerDetails{Name: "s1"}) - if sm.NumServers() != 1 { + m.AddServer(&server_details.ServerDetails{Name: "s1"}) + if m.NumServers() != 1 { t.Fatalf("Expected one server") } - s1 := sm.FindServer() + s1 := m.FindServer() if s1 == nil { t.Fatalf("Expected non-nil server") } @@ -115,118 +115,118 @@ func TestServerManager_FindServer(t *testing.T) { t.Fatalf("Expected s1 server") } - s1 = sm.FindServer() + s1 = m.FindServer() if s1 == nil || s1.Name != "s1" { t.Fatalf("Expected s1 server (still)") } - sm.AddServer(&server_details.ServerDetails{Name: "s2"}) - if sm.NumServers() != 2 { + m.AddServer(&server_details.ServerDetails{Name: "s2"}) + if m.NumServers() != 2 { t.Fatalf("Expected two servers") } - s1 = sm.FindServer() + s1 = m.FindServer() if s1 == nil || s1.Name != "s1" { t.Fatalf("Expected s1 server (still)") } - sm.NotifyFailedServer(s1) - s2 := sm.FindServer() + m.NotifyFailedServer(s1) + s2 := m.FindServer() if s2 == nil || s2.Name != "s2" { t.Fatalf("Expected s2 server") } - sm.NotifyFailedServer(s2) - s1 = sm.FindServer() + m.NotifyFailedServer(s2) + s1 = m.FindServer() if s1 == nil || s1.Name != "s1" { t.Fatalf("Expected s1 server") } } -// func New(logger *log.Logger, shutdownCh chan struct{}) (sm *ServerManager) { -func TestServerManager_New(t *testing.T) { +// func New(logger *log.Logger, shutdownCh chan struct{}) (m *Manager) { +func TestServers_New(t *testing.T) { logger := GetBufferedLogger() logger = log.New(os.Stderr, "", log.LstdFlags) shutdownCh := make(chan struct{}) - sm := server_manager.New(logger, shutdownCh, &fauxSerf{}, &fauxConnPool{}) - if sm == nil { - t.Fatalf("ServerManager nil") + m := servers.New(logger, shutdownCh, &fauxSerf{}, &fauxConnPool{}) + if m == nil { + t.Fatalf("Manager nil") } } -// func (sm *ServerManager) NotifyFailedServer(server *server_details.ServerDetails) { -func TestServerManager_NotifyFailedServer(t *testing.T) { - sm := testServerManager() +// func (m *Manager) NotifyFailedServer(server *server_details.ServerDetails) { +func TestServers_NotifyFailedServer(t *testing.T) { + m := testManager() - if sm.NumServers() != 0 { + if m.NumServers() != 0 { t.Fatalf("Expected zero servers to start") } s1 := &server_details.ServerDetails{Name: "s1"} s2 := &server_details.ServerDetails{Name: "s2"} - // Try notifying for a server that is not part of the server manager - sm.NotifyFailedServer(s1) - if sm.NumServers() != 0 { + // Try notifying for a server that is not managed by Manager + m.NotifyFailedServer(s1) + if m.NumServers() != 0 { t.Fatalf("Expected zero servers to start") } - sm.AddServer(s1) + m.AddServer(s1) // Test again w/ a server not in the list - sm.NotifyFailedServer(s2) - if sm.NumServers() != 1 { + m.NotifyFailedServer(s2) + if m.NumServers() != 1 { t.Fatalf("Expected one server") } - sm.AddServer(s2) - if sm.NumServers() != 2 { + m.AddServer(s2) + if m.NumServers() != 2 { t.Fatalf("Expected two servers") } - s1 = sm.FindServer() + s1 = m.FindServer() if s1 == nil || s1.Name != "s1" { t.Fatalf("Expected s1 server") } - sm.NotifyFailedServer(s2) - s1 = sm.FindServer() + m.NotifyFailedServer(s2) + s1 = m.FindServer() if s1 == nil || s1.Name != "s1" { t.Fatalf("Expected s1 server (still)") } - sm.NotifyFailedServer(s1) - s2 = sm.FindServer() + m.NotifyFailedServer(s1) + s2 = m.FindServer() if s2 == nil || s2.Name != "s2" { t.Fatalf("Expected s2 server") } - sm.NotifyFailedServer(s2) - s1 = sm.FindServer() + m.NotifyFailedServer(s2) + s1 = m.FindServer() if s1 == nil || s1.Name != "s1" { t.Fatalf("Expected s1 server") } } -// func (sm *ServerManager) NumServers() (numServers int) { -func TestServerManager_NumServers(t *testing.T) { - sm := testServerManager() +// func (m *Manager) NumServers() (numServers int) { +func TestServers_NumServers(t *testing.T) { + m := testManager() var num int - num = sm.NumServers() + num = m.NumServers() if num != 0 { t.Fatalf("Expected zero servers to start") } s := &server_details.ServerDetails{} - sm.AddServer(s) - num = sm.NumServers() + m.AddServer(s) + num = m.NumServers() if num != 1 { t.Fatalf("Expected one server after AddServer") } } -// func (sm *ServerManager) RebalanceServers() { -func TestServerManager_RebalanceServers(t *testing.T) { +// func (m *Manager) RebalanceServers() { +func TestServers_RebalanceServers(t *testing.T) { const failPct = 0.5 - sm := testServerManagerFailProb(failPct) + m := testManagerFailProb(failPct) const maxServers = 100 const numShuffleTests = 100 const uniquePassRate = 0.5 @@ -234,18 +234,18 @@ func TestServerManager_RebalanceServers(t *testing.T) { // Make a huge list of nodes. for i := 0; i < maxServers; i++ { nodeName := fmt.Sprintf("s%02d", i) - sm.AddServer(&server_details.ServerDetails{Name: nodeName}) + m.AddServer(&server_details.ServerDetails{Name: nodeName}) } // Keep track of how many unique shuffles we get. uniques := make(map[string]struct{}, maxServers) for i := 0; i < numShuffleTests; i++ { - sm.RebalanceServers() + m.RebalanceServers() var names []string for j := 0; j < maxServers; j++ { - server := sm.FindServer() - sm.NotifyFailedServer(server) + server := m.FindServer() + m.NotifyFailedServer(server) names = append(names, server.Name) } key := strings.Join(names, "|") @@ -260,25 +260,25 @@ func TestServerManager_RebalanceServers(t *testing.T) { } } -// func (sm *ServerManager) RemoveServer(server *server_details.ServerDetails) { -func TestServerManager_RemoveServer(t *testing.T) { +// func (m *Manager) RemoveServer(server *server_details.ServerDetails) { +func TestManager_RemoveServer(t *testing.T) { const nodeNameFmt = "s%02d" - sm := testServerManager() + m := testManager() - if sm.NumServers() != 0 { + if m.NumServers() != 0 { t.Fatalf("Expected zero servers to start") } // Test removing server before its added nodeName := fmt.Sprintf(nodeNameFmt, 1) s1 := &server_details.ServerDetails{Name: nodeName} - sm.RemoveServer(s1) - sm.AddServer(s1) + m.RemoveServer(s1) + m.AddServer(s1) nodeName = fmt.Sprintf(nodeNameFmt, 2) s2 := &server_details.ServerDetails{Name: nodeName} - sm.RemoveServer(s2) - sm.AddServer(s2) + m.RemoveServer(s2) + m.AddServer(s2) const maxServers = 19 servers := make([]*server_details.ServerDetails, maxServers) @@ -287,21 +287,21 @@ func TestServerManager_RemoveServer(t *testing.T) { nodeName := fmt.Sprintf(nodeNameFmt, i) server := &server_details.ServerDetails{Name: nodeName} servers = append(servers, server) - sm.AddServer(server) + m.AddServer(server) } - if sm.NumServers() != maxServers { - t.Fatalf("Expected %d servers, received %d", maxServers, sm.NumServers()) + if m.NumServers() != maxServers { + t.Fatalf("Expected %d servers, received %d", maxServers, m.NumServers()) } - sm.RebalanceServers() + m.RebalanceServers() - if sm.NumServers() != maxServers { - t.Fatalf("Expected %d servers, received %d", maxServers, sm.NumServers()) + if m.NumServers() != maxServers { + t.Fatalf("Expected %d servers, received %d", maxServers, m.NumServers()) } findServer := func(server *server_details.ServerDetails) bool { - for i := sm.NumServers(); i > 0; i-- { - s := sm.FindServer() + for i := m.NumServers(); i > 0; i-- { + s := m.FindServer() if s == server { return true } @@ -314,14 +314,14 @@ func TestServerManager_RemoveServer(t *testing.T) { // Remove servers from the front of the list for i := 3; i > 0; i-- { - server := sm.FindServer() + server := m.FindServer() if server == nil { t.Fatalf("FindServer returned nil") } - sm.RemoveServer(server) + m.RemoveServer(server) expectedNumServers-- - if sm.NumServers() != expectedNumServers { - t.Fatalf("Expected %d servers (got %d)", expectedNumServers, sm.NumServers()) + if m.NumServers() != expectedNumServers { + t.Fatalf("Expected %d servers (got %d)", expectedNumServers, m.NumServers()) } if findServer(server) == true { t.Fatalf("Did not expect to find server %s after removal from the front", server.Name) @@ -331,12 +331,12 @@ func TestServerManager_RemoveServer(t *testing.T) { // Remove server from the end of the list for i := 3; i > 0; i-- { - server := sm.FindServer() - sm.NotifyFailedServer(server) - sm.RemoveServer(server) + server := m.FindServer() + m.NotifyFailedServer(server) + m.RemoveServer(server) expectedNumServers-- - if sm.NumServers() != expectedNumServers { - t.Fatalf("Expected %d servers (got %d)", expectedNumServers, sm.NumServers()) + if m.NumServers() != expectedNumServers { + t.Fatalf("Expected %d servers (got %d)", expectedNumServers, m.NumServers()) } if findServer(server) == true { t.Fatalf("Did not expect to find server %s", server.Name) @@ -346,15 +346,15 @@ func TestServerManager_RemoveServer(t *testing.T) { // Remove server from the middle of the list for i := 3; i > 0; i-- { - server := sm.FindServer() - sm.NotifyFailedServer(server) - server2 := sm.FindServer() - sm.NotifyFailedServer(server2) // server2 now at end of the list + server := m.FindServer() + m.NotifyFailedServer(server) + server2 := m.FindServer() + m.NotifyFailedServer(server2) // server2 now at end of the list - sm.RemoveServer(server) + m.RemoveServer(server) expectedNumServers-- - if sm.NumServers() != expectedNumServers { - t.Fatalf("Expected %d servers (got %d)", expectedNumServers, sm.NumServers()) + if m.NumServers() != expectedNumServers { + t.Fatalf("Expected %d servers (got %d)", expectedNumServers, m.NumServers()) } if findServer(server) == true { t.Fatalf("Did not expect to find server %s", server.Name) @@ -362,21 +362,21 @@ func TestServerManager_RemoveServer(t *testing.T) { removedServers = append(removedServers, server) } - if sm.NumServers()+len(removedServers) != maxServers { - t.Fatalf("Expected %d+%d=%d servers", sm.NumServers(), len(removedServers), maxServers) + if m.NumServers()+len(removedServers) != maxServers { + t.Fatalf("Expected %d+%d=%d servers", m.NumServers(), len(removedServers), maxServers) } // Drain the remaining servers from the middle - for i := sm.NumServers(); i > 0; i-- { - server := sm.FindServer() - sm.NotifyFailedServer(server) - server2 := sm.FindServer() - sm.NotifyFailedServer(server2) // server2 now at end of the list - sm.RemoveServer(server) + for i := m.NumServers(); i > 0; i-- { + server := m.FindServer() + m.NotifyFailedServer(server) + server2 := m.FindServer() + m.NotifyFailedServer(server2) // server2 now at end of the list + m.RemoveServer(server) removedServers = append(removedServers, server) } - if sm.NumServers() != 0 { + if m.NumServers() != 0 { t.Fatalf("Expected an empty server list") } if len(removedServers) != maxServers { @@ -384,4 +384,4 @@ func TestServerManager_RemoveServer(t *testing.T) { } } -// func (sm *ServerManager) Start() { +// func (m *Manager) Start() {