Test ServerManager.refreshServerRebalanceTimer
Change the signature of refreshServerRebalanceTimer so that it returns the computed rebalance duration, which allows the function to be tested externally with mock data. See the sample table in TestServerManagerInternal_refreshServerRebalanceTimer() for the rate at which it backs off. The backoff exists mainly so that large clusters are not crippled in the event of a partition.
This commit is contained in:
parent
8e3b3d766d
commit
dcc64d91c6
|
@ -271,7 +271,7 @@ func (sm *ServerManager) RemoveServer(server *server_details.ServerDetails) {
|
||||||
// refreshServerRebalanceTimer is only called once the rebalanceTimer
|
// refreshServerRebalanceTimer is only called once the rebalanceTimer
|
||||||
// expires. Historically this was an expensive routine and is intended to be
|
// expires. Historically this was an expensive routine and is intended to be
|
||||||
// run in isolation in a dedicated, non-concurrent task.
|
// run in isolation in a dedicated, non-concurrent task.
|
||||||
func (sm *ServerManager) refreshServerRebalanceTimer(timer *time.Timer) {
|
func (sm *ServerManager) refreshServerRebalanceTimer(timer *time.Timer) time.Duration {
|
||||||
serverCfg := sm.getServerConfig()
|
serverCfg := sm.getServerConfig()
|
||||||
numConsulServers := len(serverCfg.servers)
|
numConsulServers := len(serverCfg.servers)
|
||||||
// Limit this connection's life based on the size (and health) of the
|
// Limit this connection's life based on the size (and health) of the
|
||||||
|
@ -280,12 +280,11 @@ func (sm *ServerManager) refreshServerRebalanceTimer(timer *time.Timer) {
|
||||||
// clusterWideRebalanceConnsPerSec operations/s across numLANMembers.
|
// clusterWideRebalanceConnsPerSec operations/s across numLANMembers.
|
||||||
clusterWideRebalanceConnsPerSec := float64(numConsulServers * newRebalanceConnsPerSecPerServer)
|
clusterWideRebalanceConnsPerSec := float64(numConsulServers * newRebalanceConnsPerSecPerServer)
|
||||||
connReuseLowWatermarkDuration := clientRPCMinReuseDuration + lib.RandomStagger(clientRPCMinReuseDuration/clientRPCJitterFraction)
|
connReuseLowWatermarkDuration := clientRPCMinReuseDuration + lib.RandomStagger(clientRPCMinReuseDuration/clientRPCJitterFraction)
|
||||||
|
|
||||||
numLANMembers := sm.clusterInfo.NumNodes()
|
numLANMembers := sm.clusterInfo.NumNodes()
|
||||||
connRebalanceTimeout := lib.RateScaledInterval(clusterWideRebalanceConnsPerSec, connReuseLowWatermarkDuration, numLANMembers)
|
connRebalanceTimeout := lib.RateScaledInterval(clusterWideRebalanceConnsPerSec, connReuseLowWatermarkDuration, numLANMembers)
|
||||||
sm.logger.Printf("[DEBUG] consul: connection will be rebalanced in %v", connRebalanceTimeout)
|
|
||||||
|
|
||||||
timer.Reset(connRebalanceTimeout)
|
timer.Reset(connRebalanceTimeout)
|
||||||
|
return connRebalanceTimeout
|
||||||
}
|
}
|
||||||
|
|
||||||
// saveServerConfig is a convenience method which hides the locking semantics
|
// saveServerConfig is a convenience method which hides the locking semantics
|
||||||
|
|
|
@ -2,8 +2,11 @@ package server_manager
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
|
"fmt"
|
||||||
"log"
|
"log"
|
||||||
|
"os"
|
||||||
"testing"
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/hashicorp/consul/consul/server_details"
|
"github.com/hashicorp/consul/consul/server_details"
|
||||||
)
|
)
|
||||||
|
@ -23,16 +26,17 @@ func GetBufferedLogger() *log.Logger {
|
||||||
}
|
}
|
||||||
|
|
||||||
type fauxSerf struct {
|
type fauxSerf struct {
|
||||||
|
numNodes int
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *fauxSerf) NumNodes() int {
|
func (s *fauxSerf) NumNodes() int {
|
||||||
return 16384
|
return s.numNodes
|
||||||
}
|
}
|
||||||
|
|
||||||
func testServerManager() (sm *ServerManager) {
|
func testServerManager() (sm *ServerManager) {
|
||||||
logger := GetBufferedLogger()
|
logger := GetBufferedLogger()
|
||||||
shutdownCh := make(chan struct{})
|
shutdownCh := make(chan struct{})
|
||||||
sm = New(logger, shutdownCh, &fauxSerf{})
|
sm = New(logger, shutdownCh, &fauxSerf{numNodes: 16384})
|
||||||
return sm
|
return sm
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -108,6 +112,10 @@ func TestServerManagerInternal_New(t *testing.T) {
|
||||||
t.Fatalf("ServerManager nil")
|
t.Fatalf("ServerManager nil")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if sm.clusterInfo == nil {
|
||||||
|
t.Fatalf("ServerManager.clusterInfo nil")
|
||||||
|
}
|
||||||
|
|
||||||
if sm.logger == nil {
|
if sm.logger == nil {
|
||||||
t.Fatalf("ServerManager.logger nil")
|
t.Fatalf("ServerManager.logger nil")
|
||||||
}
|
}
|
||||||
|
@ -117,7 +125,65 @@ func TestServerManagerInternal_New(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// func (sc *serverConfig) resetRebalanceTimer(sm *ServerManager) {
|
// func (sm *ServerManager) refreshServerRebalanceTimer(timer *time.Timer) time.Duration {
|
||||||
|
func TestServerManagerInternal_refreshServerRebalanceTimer(t *testing.T) {
|
||||||
|
sm := testServerManager()
|
||||||
|
|
||||||
|
timer := time.NewTimer(time.Duration(1 * time.Nanosecond))
|
||||||
|
time.Sleep(1 * time.Millisecond)
|
||||||
|
sm.refreshServerRebalanceTimer(timer)
|
||||||
|
|
||||||
|
logger := log.New(os.Stderr, "", log.LstdFlags)
|
||||||
|
shutdownCh := make(chan struct{})
|
||||||
|
|
||||||
|
type clusterSizes struct {
|
||||||
|
numNodes int
|
||||||
|
numServers int
|
||||||
|
minRebalance time.Duration
|
||||||
|
}
|
||||||
|
clusters := []clusterSizes{
|
||||||
|
{0, 3, 2 * time.Minute},
|
||||||
|
{1, 0, 2 * time.Minute}, // partitioned cluster
|
||||||
|
{1, 3, 2 * time.Minute},
|
||||||
|
{2, 3, 2 * time.Minute},
|
||||||
|
{100, 0, 2 * time.Minute}, // partitioned
|
||||||
|
{100, 1, 2 * time.Minute}, // partitioned
|
||||||
|
{100, 3, 2 * time.Minute},
|
||||||
|
{1024, 1, 2 * time.Minute}, // partitioned
|
||||||
|
{1024, 3, 2 * time.Minute}, // partitioned
|
||||||
|
{1024, 5, 2 * time.Minute},
|
||||||
|
{16384, 1, 4 * time.Minute}, // partitioned
|
||||||
|
{16384, 2, 2 * time.Minute}, // partitioned
|
||||||
|
{16384, 3, 2 * time.Minute}, // partitioned
|
||||||
|
{16384, 5, 2 * time.Minute},
|
||||||
|
{65535, 0, 2 * time.Minute}, // partitioned
|
||||||
|
{65535, 1, 8 * time.Minute}, // partitioned
|
||||||
|
{65535, 2, 3 * time.Minute}, // partitioned
|
||||||
|
{65535, 3, 5 * time.Minute}, // partitioned
|
||||||
|
{65535, 5, 3 * time.Minute}, // partitioned
|
||||||
|
{65535, 7, 2 * time.Minute},
|
||||||
|
{1000000, 1, 4 * time.Hour}, // partitioned
|
||||||
|
{1000000, 2, 2 * time.Hour}, // partitioned
|
||||||
|
{1000000, 3, 80 * time.Minute}, // partitioned
|
||||||
|
{1000000, 5, 50 * time.Minute}, // partitioned
|
||||||
|
{1000000, 11, 20 * time.Minute}, // partitioned
|
||||||
|
{1000000, 19, 10 * time.Minute},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, s := range clusters {
|
||||||
|
sm := New(logger, shutdownCh, &fauxSerf{numNodes: s.numNodes})
|
||||||
|
|
||||||
|
for i := 0; i < s.numServers; i++ {
|
||||||
|
nodeName := fmt.Sprintf("s%02d", i)
|
||||||
|
sm.AddServer(&server_details.ServerDetails{Name: nodeName})
|
||||||
|
}
|
||||||
|
|
||||||
|
d := sm.refreshServerRebalanceTimer(timer)
|
||||||
|
if d < s.minRebalance {
|
||||||
|
t.Fatalf("duration too short for cluster of size %d and %d servers (%s < %s)", s.numNodes, s.numServers, d, s.minRebalance)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// func (sm *ServerManager) saveServerConfig(sc serverConfig) {
|
// func (sm *ServerManager) saveServerConfig(sc serverConfig) {
|
||||||
func TestServerManagerInternal_saveServerConfig(t *testing.T) {
|
func TestServerManagerInternal_saveServerConfig(t *testing.T) {
|
||||||
|
|
Loading…
Reference in a new issue