Adds missing unit tests and cleans up some router bugs.
parent 850ac50e99, commit 8cc06ec10d
@@ -377,256 +377,3 @@ func TestRTT_sortNodesByDistanceFrom_CheckServiceNodes(t *testing.T) {
 	}
 	verifyCheckServiceNodeSort(t, nodes, "node2,node3,node5,node4,node1,apple")
 }
-
-/*
-
-// mockNodeMap is keyed by node name and the values are the coordinates of the
-// node.
-type mockNodeMap map[string]*coordinate.Coordinate
-
-// mockServer is used to provide a serfer interface for unit tests. The key is
-// DC, which selects a map from node name to coordinate for that node.
-type mockServer map[string]mockNodeMap
-
-// newMockServer is used to generate a serfer interface that presents a known DC
-// topology for unit tests. The server is in dc0.
-//
-// Here's the layout of the nodes:
-//
-//            /---- dc1 ----\  /- dc2 -\  /- dc0 -\
-//             node2 node1 node3    node1     node1
-//   |     |     |     |     |     |     |     |     |     |     |
-//   0     1     2     3     4     5     6     7     8     9     10  (ms)
-//
-// We also include a node4 in dc1 with no known coordinate, as well as a
-// mysterious dcX with no nodes with known coordinates.
-//
-func newMockServer() *mockServer {
-	s := make(mockServer)
-	s["dc0"] = mockNodeMap{
-		"dc0.node1": lib.GenerateCoordinate(10 * time.Millisecond),
-	}
-	s["dc1"] = mockNodeMap{
-		"dc1.node1": lib.GenerateCoordinate(3 * time.Millisecond),
-		"dc1.node2": lib.GenerateCoordinate(2 * time.Millisecond),
-		"dc1.node3": lib.GenerateCoordinate(5 * time.Millisecond),
-		"dc1.node4": nil, // no known coordinate
-	}
-	s["dc2"] = mockNodeMap{
-		"dc2.node1": lib.GenerateCoordinate(8 * time.Millisecond),
-	}
-	s["dcX"] = mockNodeMap{
-		"dcX.node1": nil, // no known coordinate
-	}
-	return &s
-}
-
-// See serfer.
-func (s *mockServer) GetDatacenter() string {
-	return "dc0"
-}
-
-// See serfer.
-func (s *mockServer) GetCoordinate() (*coordinate.Coordinate, error) {
-	return (*s)["dc0"]["dc0.node1"], nil
-}
-
-// See serfer.
-func (s *mockServer) GetCachedCoordinate(node string) (*coordinate.Coordinate, bool) {
-	for _, nodes := range *s {
-		for n, coord := range nodes {
-			if n == node && coord != nil {
-				return coord, true
-			}
-		}
-	}
-	return nil, false
-}
-
-// See serfer.
-func (s *mockServer) GetNodesForDatacenter(dc string) []string {
-	nodes := make([]string, 0)
-	if n, ok := (*s)[dc]; ok {
-		for name := range n {
-			nodes = append(nodes, name)
-		}
-	}
-	sort.Strings(nodes)
-	return nodes
-}
-
-func TestRTT_getDatacenterDistance(t *testing.T) {
-	s := newMockServer()
-
-	// The serfer's own DC is always 0 ms away.
-	if dist, err := getDatacenterDistance(s, "dc0"); err != nil || dist != 0.0 {
-		t.Fatalf("bad: %v err: %v", dist, err)
-	}
-
-	// Check a DC with no coordinates, which should give positive infinity.
-	if dist, err := getDatacenterDistance(s, "dcX"); err != nil || dist != math.Inf(1.0) {
-		t.Fatalf("bad: %v err: %v", dist, err)
-	}
-
-	// Similar for a totally unknown DC.
-	if dist, err := getDatacenterDistance(s, "acdc"); err != nil || dist != math.Inf(1.0) {
-		t.Fatalf("bad: %v err: %v", dist, err)
-	}
-
-	// Check the trivial median case (just one node).
-	if dist, err := getDatacenterDistance(s, "dc2"); err != nil || dist != 0.002 {
-		t.Fatalf("bad: %v err: %v", dist, err)
-	}
-
-	// Check the more interesting median case, note that there's a mystery
-	// node4 in there that should be excluded to make the distances sort
-	// like this:
-	//
-	// [0] node3 (0.005), [1] node1 (0.007), [2] node2 (0.008)
-	//
-	// So the median should be at index 3 / 2 = 1 -> 0.007.
-	if dist, err := getDatacenterDistance(s, "dc1"); err != nil || dist != 0.007 {
-		t.Fatalf("bad: %v err: %v", dist, err)
-	}
-}
-
-func TestRTT_sortDatacentersByDistance(t *testing.T) {
-	s := newMockServer()
-
-	dcs := []string{"acdc", "dc0", "dc1", "dc2", "dcX"}
-	if err := sortDatacentersByDistance(s, dcs); err != nil {
-		t.Fatalf("err: %v", err)
-	}
-
-	expected := "dc0,dc2,dc1,acdc,dcX"
-	if actual := strings.Join(dcs, ","); actual != expected {
-		t.Fatalf("bad sort: %s != %s", actual, expected)
-	}
-
-	// Make sure the sort is stable and we didn't just get lucky.
-	dcs = []string{"dcX", "dc0", "dc1", "dc2", "acdc"}
-	if err := sortDatacentersByDistance(s, dcs); err != nil {
-		t.Fatalf("err: %v", err)
-	}
-
-	expected = "dc0,dc2,dc1,dcX,acdc"
-	if actual := strings.Join(dcs, ","); actual != expected {
-		t.Fatalf("bad sort: %s != %s", actual, expected)
-	}
-}
-
-func TestRTT_getDatacenterMaps(t *testing.T) {
-	s := newMockServer()
-
-	dcs := []string{"dc0", "acdc", "dc1", "dc2", "dcX"}
-	maps := getDatacenterMaps(s, dcs)
-
-	if len(maps) != 5 {
-		t.Fatalf("bad: %v", maps)
-	}
-
-	if maps[0].Datacenter != "dc0" || len(maps[0].Coordinates) != 1 ||
-		maps[0].Coordinates[0].Node != "dc0.node1" {
-		t.Fatalf("bad: %v", maps[0])
-	}
-	verifyCoordinatesEqual(t, maps[0].Coordinates[0].Coord,
-		lib.GenerateCoordinate(10*time.Millisecond))
-
-	if maps[1].Datacenter != "acdc" || len(maps[1].Coordinates) != 0 {
-		t.Fatalf("bad: %v", maps[1])
-	}
-
-	if maps[2].Datacenter != "dc1" || len(maps[2].Coordinates) != 3 ||
-		maps[2].Coordinates[0].Node != "dc1.node1" ||
-		maps[2].Coordinates[1].Node != "dc1.node2" ||
-		maps[2].Coordinates[2].Node != "dc1.node3" {
-		t.Fatalf("bad: %v", maps[2])
-	}
-	verifyCoordinatesEqual(t, maps[2].Coordinates[0].Coord,
-		lib.GenerateCoordinate(3*time.Millisecond))
-	verifyCoordinatesEqual(t, maps[2].Coordinates[1].Coord,
-		lib.GenerateCoordinate(2*time.Millisecond))
-	verifyCoordinatesEqual(t, maps[2].Coordinates[2].Coord,
-		lib.GenerateCoordinate(5*time.Millisecond))
-
-	if maps[3].Datacenter != "dc2" || len(maps[3].Coordinates) != 1 ||
-		maps[3].Coordinates[0].Node != "dc2.node1" {
-		t.Fatalf("bad: %v", maps[3])
-	}
-	verifyCoordinatesEqual(t, maps[3].Coordinates[0].Coord,
-		lib.GenerateCoordinate(8*time.Millisecond))
-
-	if maps[4].Datacenter != "dcX" || len(maps[4].Coordinates) != 0 {
-		t.Fatalf("bad: %v", maps[4])
-	}
-}
-
-func TestRTT_getDatacentersByDistance(t *testing.T) {
-	dir1, s1 := testServerWithConfig(t, func(c *Config) {
-		c.Datacenter = "xxx"
-	})
-	defer os.RemoveAll(dir1)
-	defer s1.Shutdown()
-	codec1 := rpcClient(t, s1)
-	defer codec1.Close()
-
-	dir2, s2 := testServerWithConfig(t, func(c *Config) {
-		c.Datacenter = "dc1"
-	})
-	defer os.RemoveAll(dir2)
-	defer s2.Shutdown()
-	codec2 := rpcClient(t, s2)
-	defer codec2.Close()
-
-	dir3, s3 := testServerWithConfig(t, func(c *Config) {
-		c.Datacenter = "dc2"
-	})
-	defer os.RemoveAll(dir3)
-	defer s3.Shutdown()
-	codec3 := rpcClient(t, s3)
-	defer codec3.Close()
-
-	testutil.WaitForLeader(t, s1.RPC, "xxx")
-	testutil.WaitForLeader(t, s2.RPC, "dc1")
-	testutil.WaitForLeader(t, s3.RPC, "dc2")
-
-	// Do the WAN joins.
-	addr := fmt.Sprintf("127.0.0.1:%d",
-		s1.config.SerfWANConfig.MemberlistConfig.BindPort)
-	if _, err := s2.JoinWAN([]string{addr}); err != nil {
-		t.Fatalf("err: %v", err)
-	}
-	if _, err := s3.JoinWAN([]string{addr}); err != nil {
-		t.Fatalf("err: %v", err)
-	}
-	testutil.WaitForResult(
-		func() (bool, error) {
-			return len(s1.WANMembers()) > 2, nil
-		},
-		func(err error) {
-			t.Fatalf("Failed waiting for WAN join: %v", err)
-		})
-
-	// Get the DCs by distance. We don't have coordinate updates yet, but
-	// having xxx show up first proves we are calling the distance sort,
-	// since it would normally do a string sort.
-	dcs, err := s1.getDatacentersByDistance()
-	if err != nil {
-		t.Fatalf("err: %s", err)
-	}
-	if len(dcs) != 3 || dcs[0] != "xxx" {
-		t.Fatalf("bad: %v", dcs)
-	}
-
-	// Let's disable coordinates just to be sure.
-	s1.config.DisableCoordinates = true
-	dcs, err = s1.getDatacentersByDistance()
-	if err != nil {
-		t.Fatalf("err: %s", err)
-	}
-	if len(dcs) != 3 || dcs[0] != "dc1" {
-		t.Fatalf("bad: %v", dcs)
-	}
-}
-
-*/
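
For reference, the median rule the deleted tests relied on (drop nil coordinates, sort the distances, take the middle element) can be sketched like this; medianDistance is an illustrative helper, not the actual getDatacenterDistance implementation, and assumes the standard math and sort packages:

	func medianDistance(dists []float64) float64 {
		if len(dists) == 0 {
			return math.Inf(1.0) // no nodes with known coordinates
		}
		sort.Float64s(dists)
		return dists[len(dists)/2] // e.g. [0.005 0.007 0.008] -> index 1 -> 0.007
	}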
@@ -1,7 +1,6 @@
 package servers_test
 
 import (
-	"bytes"
 	"fmt"
 	"log"
 	"math/rand"
@@ -13,20 +12,6 @@ import (
 	"github.com/hashicorp/consul/consul/servers"
 )
 
-var (
-	localLogger    *log.Logger
-	localLogBuffer *bytes.Buffer
-)
-
-func init() {
-	localLogBuffer = new(bytes.Buffer)
-	localLogger = log.New(localLogBuffer, "", 0)
-}
-
-func GetBufferedLogger() *log.Logger {
-	return localLogger
-}
-
 type fauxConnPool struct {
 	// failPct between 0.0 and 1.0 == pct of time a Ping should fail
 	failPct float64
@@ -49,16 +34,14 @@ func (s *fauxSerf) NumNodes() int {
 }
 
 func testManager() (m *servers.Manager) {
-	logger := GetBufferedLogger()
-	logger = log.New(os.Stderr, "", log.LstdFlags)
+	logger := log.New(os.Stderr, "", log.LstdFlags)
 	shutdownCh := make(chan struct{})
 	m = servers.New(logger, shutdownCh, &fauxSerf{}, &fauxConnPool{})
 	return m
 }
 
 func testManagerFailProb(failPct float64) (m *servers.Manager) {
-	logger := GetBufferedLogger()
-	logger = log.New(os.Stderr, "", log.LstdFlags)
+	logger := log.New(os.Stderr, "", log.LstdFlags)
 	shutdownCh := make(chan struct{})
 	m = servers.New(logger, shutdownCh, &fauxSerf{}, &fauxConnPool{failPct: failPct})
 	return m
@@ -144,8 +127,7 @@ func TestServers_FindServer(t *testing.T) {
 
 // func New(logger *log.Logger, shutdownCh chan struct{}) (m *Manager) {
 func TestServers_New(t *testing.T) {
-	logger := GetBufferedLogger()
-	logger = log.New(os.Stderr, "", log.LstdFlags)
+	logger := log.New(os.Stderr, "", log.LstdFlags)
 	shutdownCh := make(chan struct{})
 	m := servers.New(logger, shutdownCh, &fauxSerf{}, &fauxConnPool{})
 	if m == nil {
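
The fauxConnPool above fails a ping failPct of the time. Its ping method falls outside these hunks; a minimal version consistent with the comment would look like the following (the method name and signature are assumed from the Pinger plumbing, not copied from this commit):

	// Succeed unless a uniform draw in [0, 1) lands below failPct.
	func (cp *fauxConnPool) PingConsulServer(s *agent.Server) (bool, error) {
		if rand.Float64() < cp.failPct {
			return false, fmt.Errorf("faux ping failed")
		}
		return true, nil
	}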
@@ -14,12 +14,24 @@
 	"github.com/hashicorp/serf/serf"
 )
 
+// Router keeps track of a set of network areas and their associated Serf
+// membership of Consul servers. It then indexes this by datacenter to provide
+// healthy routes to servers by datacenter.
 type Router struct {
+	// logger is used for diagnostic output.
 	logger *log.Logger
 
+	// localDatacenter has the name of the router's home datacenter. This is
+	// used to short-circuit RTT calculations for local servers.
 	localDatacenter string
-	areas           map[types.AreaID]*areaInfo
-	managers        map[string][]*Manager
+
+	// areas maps area IDs to structures holding information about that
+	// area.
+	areas map[types.AreaID]*areaInfo
+
+	// managers is an index from datacenter names to a list of server
+	// managers for that datacenter. This is used to quickly look up routes.
+	managers map[string][]*Manager
 
 	// This top-level lock covers all the internal state.
 	sync.RWMutex
@@ -31,20 +43,36 @@ type RouterSerfCluster interface {
 	NumNodes() int
 	Members() []serf.Member
 	GetCoordinate() (*coordinate.Coordinate, error)
-	GetCachedCoordinate(name string) (coord *coordinate.Coordinate, ok bool)
+	GetCachedCoordinate(name string) (*coordinate.Coordinate, bool)
 }
 
+// managerInfo holds a server manager for a datacenter along with its associated
+// shutdown channel.
 type managerInfo struct {
-	manager    *Manager
+	// manager is notified about servers for this datacenter.
+	manager *Manager
+
+	// shutdownCh is only given to this manager so we can shut it down when
+	// all servers for this datacenter are gone.
 	shutdownCh chan struct{}
 }
 
+// areaInfo holds information about a given network area.
 type areaInfo struct {
-	cluster  RouterSerfCluster
-	pinger   Pinger
+	// cluster is the Serf instance for this network area.
+	cluster RouterSerfCluster
+
+	// pinger is used to ping servers in this network area when trying to
+	// find a new, healthy server to talk to.
+	pinger Pinger
+
+	// managers maps datacenter names to managers for that datacenter in
+	// this area.
 	managers map[string]*managerInfo
 }
 
+// NewRouter returns a new router with the given configuration. This will also
+// spawn a goroutine that cleans up when the given shutdownCh is closed.
 func NewRouter(logger *log.Logger, shutdownCh chan struct{}, localDatacenter string) *Router {
 	router := &Router{
 		logger: logger,
@@ -72,6 +100,7 @@ func NewRouter(logger *log.Logger, shutdownCh chan struct{}, localDatacenter str
 	return router
 }
 
+// AddArea registers a new network area with the router.
 func (r *Router) AddArea(areaID types.AreaID, cluster RouterSerfCluster, pinger Pinger) error {
 	r.Lock()
 	defer r.Unlock()
@@ -80,11 +109,30 @@ func (r *Router) AddArea(areaID types.AreaID, cluster RouterSerfCluster, pinger
 		return fmt.Errorf("area ID %q already exists", areaID)
 	}
 
-	r.areas[areaID] = &areaInfo{
+	area := &areaInfo{
 		cluster:  cluster,
 		pinger:   pinger,
 		managers: make(map[string]*managerInfo),
 	}
+	r.areas[areaID] = area
+
+	// Do an initial populate of the manager so that we don't have to wait
+	// for events to fire. This lets us attempt to use all the known servers
+	// initially, and then will quickly detect that they are failed if we
+	// can't reach them.
+	for _, m := range cluster.Members() {
+		ok, parts := agent.IsConsulServer(m)
+		if !ok {
+			r.logger.Printf("[WARN]: consul: Non-server %q in server-only area %q",
+				m.Name, areaID)
+			continue
+		}
+
+		if err := r.addServer(area, parts); err != nil {
+			return fmt.Errorf("failed to add server %q to area %q: %v", m.Name, areaID, err)
+		}
+	}
+
 	return nil
 }
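
agent.IsConsulServer is not part of this diff; conceptually it recognizes a server by its Serf tags and extracts its details. A rough sketch of just the check, with an illustrative name and a trimmed return value (the real helper returns a full *agent.Server):

	func isConsulServer(m serf.Member) (bool, string) {
		// A Consul server gossips role=consul along with its datacenter;
		// the mockCluster in the new test file below sets exactly these tags.
		if m.Tags["role"] != "consul" {
			return false, ""
		}
		return true, m.Tags["dc"]
	}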
@@ -96,12 +144,16 @@ func (r *Router) removeManagerFromIndex(datacenter string, manager *Manager) {
 	for i := 0; i < len(managers); i++ {
 		if managers[i] == manager {
 			r.managers[datacenter] = append(managers[:i], managers[i+1:]...)
+			if len(r.managers[datacenter]) == 0 {
+				delete(r.managers, datacenter)
+			}
 			return
 		}
 	}
 	panic("managers index out of sync")
 }
 
+// RemoveArea removes an existing network area from the router.
 func (r *Router) RemoveArea(areaID types.AreaID) error {
 	r.Lock()
 	defer r.Unlock()
@@ -121,15 +173,8 @@ func (r *Router) RemoveArea(areaID types.AreaID) error {
 	return nil
 }
 
-func (r *Router) AddServer(areaID types.AreaID, s *agent.Server) error {
-	r.Lock()
-	defer r.Unlock()
-
-	area, ok := r.areas[areaID]
-	if !ok {
-		return fmt.Errorf("area ID %q does not exist", areaID)
-	}
-
+// addServer does the work of AddServer once the write lock is held.
+func (r *Router) addServer(area *areaInfo, s *agent.Server) error {
 	// Make the manager on the fly if this is the first we've seen of it,
 	// and add it to the index.
 	info, ok := area.managers[s.Datacenter]
@@ -140,6 +185,7 @@ func (r *Router) AddServer(areaID types.AreaID, s *agent.Server) error {
 		manager:    manager,
 		shutdownCh: shutdownCh,
 	}
 	area.managers[s.Datacenter] = info
+
 	managers := r.managers[s.Datacenter]
 	r.managers[s.Datacenter] = append(managers, manager)
@@ -149,6 +195,21 @@ func (r *Router) AddServer(areaID types.AreaID, s *agent.Server) error {
 	return nil
 }
 
+// AddServer should be called whenever a new server joins an area. This is
+// typically hooked into the Serf event handler for this area.
+func (r *Router) AddServer(areaID types.AreaID, s *agent.Server) error {
+	r.Lock()
+	defer r.Unlock()
+
+	area, ok := r.areas[areaID]
+	if !ok {
+		return fmt.Errorf("area ID %q does not exist", areaID)
+	}
+	return r.addServer(area, s)
+}
+
+// RemoveServer should be called whenever a server is removed from an area. This
+// is typically hooked into the Serf event handler for this area.
 func (r *Router) RemoveServer(areaID types.AreaID, s *agent.Server) error {
 	r.Lock()
 	defer r.Unlock()
@@ -178,6 +239,10 @@ func (r *Router) RemoveServer(areaID types.AreaID, s *agent.Server) error {
 	return nil
 }
 
+// FailServer should be called whenever a server is failed in an area. This is
+// typically hooked into the Serf event handler for this area. We will
+// immediately shift traffic away from this server, but it will remain in the
+// list of servers.
 func (r *Router) FailServer(areaID types.AreaID, s *agent.Server) error {
 	r.RLock()
 	defer r.RUnlock()
@@ -199,6 +264,36 @@ func (r *Router) FailServer(areaID types.AreaID, s *agent.Server) error {
 	return nil
 }
 
+// FindRoute returns a healthy server with a route to the given datacenter. The
+// Boolean return parameter will indicate if a server was available. In some
+// cases this may return a best-effort unhealthy server that can be used for a
+// connection attempt. If any problem occurs with the given server, the caller
+// should feed that back to the manager associated with the server, which is
+// also returned, by calling NotifyFailedServer().
+func (r *Router) FindRoute(datacenter string) (*Manager, *agent.Server, bool) {
+	r.RLock()
+	defer r.RUnlock()
+
+	// Get the list of managers for this datacenter. This will usually just
+	// have one entry, but it's possible to have a user-defined area + WAN.
+	managers, ok := r.managers[datacenter]
+	if !ok {
+		return nil, nil, false
+	}
+
+	// Try each manager until we get a server.
+	for _, manager := range managers {
+		if s := manager.FindServer(); s != nil {
+			return manager, s, true
+		}
+	}
+
+	// Didn't find a route (even via an unhealthy server).
+	return nil, nil, false
+}
+
+// GetDatacenters returns a list of datacenters known to the router, sorted by
+// name.
 func (r *Router) GetDatacenters() []string {
 	r.RLock()
 	defer r.RUnlock()
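
Taken together, these methods give the router a simple lifecycle: register areas, then resolve a datacenter to a manager/server pair and report failures back. A usage sketch (the wiring names logger, wanCluster, pinger, and forward are placeholders, not code from this commit):

	shutdownCh := make(chan struct{})
	router := NewRouter(logger, shutdownCh, "dc0")
	if err := router.AddArea(types.AreaWAN, wanCluster, pinger); err != nil {
		return err
	}
	if manager, server, ok := router.FindRoute("dc1"); ok {
		if err := forward(server); err != nil {
			// Feed the failure back so traffic shifts to another server.
			manager.NotifyFailedServer(server)
		}
	}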
@@ -236,6 +331,10 @@ func (n *datacenterSorter) Less(i, j int) bool {
 	return n.Vec[i] < n.Vec[j]
 }
 
+// GetDatacentersByDistance returns a list of datacenters known to the router,
+// sorted by median RTT from this server to the servers in each datacenter. If
+// there are multiple areas that reach a given datacenter, this will use the
+// lowest RTT for the sort.
 func (r *Router) GetDatacentersByDistance() ([]string, error) {
 	r.RLock()
 	defer r.RUnlock()
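
The body of GetDatacentersByDistance sits outside these hunks. The "lowest RTT wins" rule from the comment, followed by the sort through datacenterSorter (exercised by the new tests below), could be sketched like this; dist is a hypothetical map holding the lowest median RTT seen for each datacenter across all areas:

	dcs := make([]string, 0, len(dist))
	for dc := range dist {
		dcs = append(dcs, dc)
	}
	sort.Strings(dcs) // deterministic base order before the stable sort
	vec := make([]float64, len(dcs))
	for i, dc := range dcs {
		vec[i] = dist[dc]
	}
	sort.Stable(&datacenterSorter{Names: dcs, Vec: vec})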
@@ -302,6 +401,8 @@ func (r *Router) GetDatacentersByDistance() ([]string, error) {
 	return names, nil
 }
 
+// GetDatacenterMaps returns a structure with the raw network coordinates of
+// each known server, organized by datacenter and network area.
 func (r *Router) GetDatacenterMaps() ([]structs.DatacenterMap, error) {
 	r.RLock()
 	defer r.RUnlock()
@@ -339,25 +440,3 @@ func (r *Router) GetDatacenterMaps() ([]structs.DatacenterMap, error) {
 	}
 	return maps, nil
 }
-
-func (r *Router) FindRoute(datacenter string) (*Manager, *agent.Server, bool) {
-	r.RLock()
-	defer r.RUnlock()
-
-	// Get the list of managers for this datacenter. This will usually just
-	// have one entry, but it's possible to have a user-defined area + WAN.
-	managers, ok := r.managers[datacenter]
-	if !ok {
-		return nil, nil, false
-	}
-
-	// Try each manager until we get a server.
-	for _, manager := range managers {
-		if s := manager.FindServer(); s != nil {
-			return manager, s, true
-		}
-	}
-
-	// Didn't find a route (even via an unhealthy server).
-	return nil, nil, false
-}
@@ -0,0 +1,358 @@
package servers

import (
	"fmt"
	"log"
	"net"
	"os"
	"reflect"
	"sort"
	"testing"
	"time"

	"github.com/hashicorp/consul/consul/structs"
	"github.com/hashicorp/consul/lib"
	"github.com/hashicorp/consul/types"
	"github.com/hashicorp/serf/coordinate"
	"github.com/hashicorp/serf/serf"
)

type mockCluster struct {
	self    string
	members []serf.Member
	coords  map[string]*coordinate.Coordinate
	addr    int
}

func newMockCluster(self string) *mockCluster {
	return &mockCluster{
		self:   self,
		coords: make(map[string]*coordinate.Coordinate),
		addr:   1,
	}
}

func (m *mockCluster) NumNodes() int {
	return len(m.members)
}

func (m *mockCluster) Members() []serf.Member {
	return m.members
}

func (m *mockCluster) GetCoordinate() (*coordinate.Coordinate, error) {
	return m.coords[m.self], nil
}

func (m *mockCluster) GetCachedCoordinate(name string) (*coordinate.Coordinate, bool) {
	coord, ok := m.coords[name]
	return coord, ok
}

func (m *mockCluster) AddMember(dc string, name string, coord *coordinate.Coordinate) {
	member := serf.Member{
		Name: fmt.Sprintf("%s.%s", name, dc),
		Addr: net.ParseIP(fmt.Sprintf("127.0.0.%d", m.addr)),
		Port: 8300,
		Tags: map[string]string{
			"dc":   dc,
			"role": "consul",
			"port": "8300",
			"vsn":  "3",
		},
	}
	m.members = append(m.members, member)
	if coord != nil {
		m.coords[member.Name] = coord
	}
	m.addr++
}

// testCluster is used to generate a single WAN-like area with a known set of
// members and a known RTT topology.
//
// Here's the layout of the nodes:
//
//            /---- dc1 ----\  /- dc2 -\  /- dc0 -\
//             node2 node1 node3    node1     node0
//   |     |     |     |     |     |     |     |     |     |     |
//   0     1     2     3     4     5     6     7     8     9     10  (ms)
//
// We also include a node4 in dc1 with no known coordinate, as well as a
// mysterious dcX with no nodes with known coordinates.
func testCluster(self string) *mockCluster {
	c := newMockCluster(self)
	c.AddMember("dc0", "node0", lib.GenerateCoordinate(10*time.Millisecond))
	c.AddMember("dc1", "node1", lib.GenerateCoordinate(3*time.Millisecond))
	c.AddMember("dc1", "node2", lib.GenerateCoordinate(2*time.Millisecond))
	c.AddMember("dc1", "node3", lib.GenerateCoordinate(5*time.Millisecond))
	c.AddMember("dc1", "node4", nil)
	c.AddMember("dc2", "node1", lib.GenerateCoordinate(8*time.Millisecond))
	c.AddMember("dcX", "node1", nil)
	return c
}

func testRouter(dc string) *Router {
	logger := log.New(os.Stderr, "", log.LstdFlags)
	shutdownCh := make(chan struct{})
	return NewRouter(logger, shutdownCh, dc)
}

func TestRouter_Routing(t *testing.T) {
	r := testRouter("dc0")

	// Create a WAN-looking area.
	self := "node0.dc0"
	wan := testCluster(self)
	if err := r.AddArea(types.AreaWAN, wan, &fauxConnPool{}); err != nil {
		t.Fatalf("err: %v", err)
	}

	// Adding the area should enable all the routes right away.
	if _, _, ok := r.FindRoute("dc0"); !ok {
		t.Fatalf("bad")
	}
	if _, _, ok := r.FindRoute("dc1"); !ok {
		t.Fatalf("bad")
	}
	if _, _, ok := r.FindRoute("dc2"); !ok {
		t.Fatalf("bad")
	}
	if _, _, ok := r.FindRoute("dcX"); !ok {
		t.Fatalf("bad")
	}

	// This hasn't been added yet.
	if _, _, ok := r.FindRoute("dcY"); ok {
		t.Fatalf("bad")
	}

	// Add another area.
	otherID := types.AreaID("other")
	other := newMockCluster(self)
	other.AddMember("dc0", "node0", nil)
	other.AddMember("dc1", "node1", nil)
	other.AddMember("dcY", "node1", nil)
	if err := r.AddArea(otherID, other, &fauxConnPool{}); err != nil {
		t.Fatalf("err: %v", err)
	}

	// Now we should have a route to every DC.
	if _, _, ok := r.FindRoute("dc0"); !ok {
		t.Fatalf("bad")
	}
	if _, _, ok := r.FindRoute("dc1"); !ok {
		t.Fatalf("bad")
	}
	if _, _, ok := r.FindRoute("dc2"); !ok {
		t.Fatalf("bad")
	}
	if _, _, ok := r.FindRoute("dcX"); !ok {
		t.Fatalf("bad")
	}
	if _, _, ok := r.FindRoute("dcY"); !ok {
		t.Fatalf("bad")
	}

	// Get the route for dcY and then fail the server. This will still
	// give the server back since we have no other choice.
	_, s, ok := r.FindRoute("dcY")
	if !ok {
		t.Fatalf("bad")
	}
	if err := r.FailServer(otherID, s); err != nil {
		t.Fatalf("err: %v", err)
	}
	if _, _, ok := r.FindRoute("dcY"); !ok {
		t.Fatalf("bad")
	}

	// But if we remove the server we won't get a route.
	if err := r.RemoveServer(otherID, s); err != nil {
		t.Fatalf("err: %v", err)
	}
	if _, _, ok := r.FindRoute("dcY"); ok {
		t.Fatalf("bad")
	}

	// Make sure the dcY manager also got removed from the area and from
	// the index we use for routing.
	func() {
		r.RLock()
		defer r.RUnlock()

		area, ok := r.areas[otherID]
		if !ok {
			t.Fatalf("bad")
		}

		if _, ok := area.managers["dcY"]; ok {
			t.Fatalf("bad")
		}

		if _, ok := r.managers["dcY"]; ok {
			t.Fatalf("bad")
		}
	}()

	// Do similar for dc0, which will take two removes because dc0 is
	// reachable from two different areas.
	_, s, ok = r.FindRoute("dc0")
	if !ok {
		t.Fatalf("bad")
	}
	if err := r.RemoveServer(types.AreaWAN, s); err != nil {
		t.Fatalf("err: %v", err)
	}
	if _, _, ok = r.FindRoute("dc0"); !ok {
		t.Fatalf("bad")
	}
	if err := r.RemoveServer(otherID, s); err != nil {
		t.Fatalf("err: %v", err)
	}
	if _, _, ok = r.FindRoute("dc0"); ok {
		t.Fatalf("bad")
	}

	// Now delete some areas.
	if _, _, ok = r.FindRoute("dc1"); !ok {
		t.Fatalf("bad")
	}
	if err := r.RemoveArea(types.AreaWAN); err != nil {
		t.Fatalf("err: %v", err)
	}
	if _, _, ok = r.FindRoute("dc1"); !ok {
		t.Fatalf("bad")
	}
	if err := r.RemoveArea(otherID); err != nil {
		t.Fatalf("err: %v", err)
	}
	if _, _, ok = r.FindRoute("dc1"); ok {
		t.Fatalf("bad")
	}
}

func TestRouter_GetDatacenters(t *testing.T) {
	r := testRouter("dc0")

	self := "node0.dc0"
	wan := testCluster(self)
	if err := r.AddArea(types.AreaWAN, wan, &fauxConnPool{}); err != nil {
		t.Fatalf("err: %v", err)
	}

	actual := r.GetDatacenters()
	expected := []string{"dc0", "dc1", "dc2", "dcX"}
	if !reflect.DeepEqual(actual, expected) {
		t.Fatalf("bad: %#v", actual)
	}
}

func TestRouter_distanceSorter(t *testing.T) {
	actual := &datacenterSorter{
		Names: []string{"foo", "bar", "baz", "zoo"},
		Vec:   []float64{3.0, 1.0, 1.0, 0.0},
	}
	sort.Stable(actual)
	expected := &datacenterSorter{
		Names: []string{"zoo", "bar", "baz", "foo"},
		Vec:   []float64{0.0, 1.0, 1.0, 3.0},
	}
	if !reflect.DeepEqual(actual, expected) {
		t.Fatalf("bad: %#v", *expected)
	}
}

func TestRouter_GetDatacentersByDistance(t *testing.T) {
	r := testRouter("dc0")

	// Start with just the WAN area described in the diagram above.
	self := "node0.dc0"
	wan := testCluster(self)
	if err := r.AddArea(types.AreaWAN, wan, &fauxConnPool{}); err != nil {
		t.Fatalf("err: %v", err)
	}

	actual, err := r.GetDatacentersByDistance()
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	expected := []string{"dc0", "dc2", "dc1", "dcX"}
	if !reflect.DeepEqual(actual, expected) {
		t.Fatalf("bad: %#v", actual)
	}

	// Now add another area with a closer route for dc1.
	otherID := types.AreaID("other")
	other := newMockCluster(self)
	other.AddMember("dc0", "node0", lib.GenerateCoordinate(20*time.Millisecond))
	other.AddMember("dc1", "node1", lib.GenerateCoordinate(21*time.Millisecond))
	if err := r.AddArea(otherID, other, &fauxConnPool{}); err != nil {
		t.Fatalf("err: %v", err)
	}

	actual, err = r.GetDatacentersByDistance()
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	expected = []string{"dc0", "dc1", "dc2", "dcX"}
	if !reflect.DeepEqual(actual, expected) {
		t.Fatalf("bad: %#v", actual)
	}
}
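
A quick worked check of the two expected orders above, using distances from self (node0.dc0, at 10 ms in the WAN area and 20 ms in the other area):

	// WAN only:  dc1 -> median(|10-3|, |10-2|, |10-5|) = 0.007 s
	//            dc2 -> |10-8| = 0.002 s, so dc0, dc2, dc1, dcX.
	// With the other area: dc1 -> |21-20| = 0.001 s, which beats the WAN's
	// 0.007 s and dc2's 0.002 s, so the order becomes dc0, dc1, dc2, dcX.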

func TestRouter_GetDatacenterMaps(t *testing.T) {
	r := testRouter("dc0")

	self := "node0.dc0"
	wan := testCluster(self)
	if err := r.AddArea(types.AreaWAN, wan, &fauxConnPool{}); err != nil {
		t.Fatalf("err: %v", err)
	}

	actual, err := r.GetDatacenterMaps()
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	if len(actual) != 3 {
		t.Fatalf("bad: %#v", actual)
	}
	for _, entry := range actual {
		switch entry.Datacenter {
		case "dc0":
			if !reflect.DeepEqual(entry, structs.DatacenterMap{
				Datacenter: "dc0",
				AreaID:     types.AreaWAN,
				Coordinates: structs.Coordinates{
					&structs.Coordinate{"node0.dc0", lib.GenerateCoordinate(10 * time.Millisecond)},
				},
			}) {
				t.Fatalf("bad: %#v", entry)
			}
		case "dc1":
			if !reflect.DeepEqual(entry, structs.DatacenterMap{
				Datacenter: "dc1",
				AreaID:     types.AreaWAN,
				Coordinates: structs.Coordinates{
					&structs.Coordinate{"node1.dc1", lib.GenerateCoordinate(3 * time.Millisecond)},
					&structs.Coordinate{"node2.dc1", lib.GenerateCoordinate(2 * time.Millisecond)},
					&structs.Coordinate{"node3.dc1", lib.GenerateCoordinate(5 * time.Millisecond)},
				},
			}) {
				t.Fatalf("bad: %#v", entry)
			}
		case "dc2":
			if !reflect.DeepEqual(entry, structs.DatacenterMap{
				Datacenter: "dc2",
				AreaID:     types.AreaWAN,
				Coordinates: structs.Coordinates{
					&structs.Coordinate{"node1.dc2", lib.GenerateCoordinate(8 * time.Millisecond)},
				},
			}) {
				t.Fatalf("bad: %#v", entry)
			}
		default:
			t.Fatalf("bad: %#v", entry)
		}
	}
}
@@ -0,0 +1,54 @@
package lib

import (
	"math"
	"testing"
	"time"

	"github.com/hashicorp/serf/coordinate"
)

func TestRTT(t *testing.T) {
	cases := []struct {
		a    *coordinate.Coordinate
		b    *coordinate.Coordinate
		dist float64
	}{
		{
			GenerateCoordinate(0),
			GenerateCoordinate(10 * time.Millisecond),
			0.010,
		},
		{
			GenerateCoordinate(10 * time.Millisecond),
			GenerateCoordinate(10 * time.Millisecond),
			0.0,
		},
		{
			GenerateCoordinate(8 * time.Millisecond),
			GenerateCoordinate(10 * time.Millisecond),
			0.002,
		},
		{
			GenerateCoordinate(10 * time.Millisecond),
			GenerateCoordinate(8 * time.Millisecond),
			0.002,
		},
		{
			nil,
			GenerateCoordinate(8 * time.Millisecond),
			math.Inf(1.0),
		},
		{
			GenerateCoordinate(8 * time.Millisecond),
			nil,
			math.Inf(1.0),
		},
	}
	for i, c := range cases {
		dist := ComputeDistance(c.a, c.b)
		if c.dist != dist {
			t.Fatalf("bad (%d): %9.6f != %9.6f", i, c.dist, dist)
		}
	}
}
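
The table above pins down the contract of ComputeDistance and GenerateCoordinate, neither of which appears in this diff. A minimal pair consistent with these cases might look like the following sketch (the real lib code may differ in detail):

	// GenerateCoordinate returns a coordinate rtt away from the origin along
	// the first axis, so two generated coordinates are |a-b| apart.
	func GenerateCoordinate(rtt time.Duration) *coordinate.Coordinate {
		coord := coordinate.NewCoordinate(coordinate.DefaultConfig())
		coord.Vec[0] = rtt.Seconds()
		coord.Height = 0 // keep the height term out of the distance
		return coord
	}

	// ComputeDistance returns the RTT between two coordinates in seconds,
	// or positive infinity when either coordinate is unknown.
	func ComputeDistance(a *coordinate.Coordinate, b *coordinate.Coordinate) float64 {
		if a == nil || b == nil {
			return math.Inf(1.0)
		}
		return a.DistanceTo(b).Seconds()
	}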