Removes remoteConsuls in favor of the new router.

This has the next wave of RTT integration with the router and also
factors some common RTT-related helpers out to lib. While we were
in here we also got rid of the coordinate disable config so we don't
need to deal with the complexity in the router (there was never a
user-visible way to disable coordinates).
This commit is contained in:
James Phillips 2017-03-13 22:56:24 -07:00
parent 746e735dce
commit 28f8aa5559
No known key found for this signature in database
GPG Key ID: 77183E682AC5FC11
18 changed files with 200 additions and 295 deletions

View File

@ -242,16 +242,6 @@ func TestAgent_Self(t *testing.T) {
t.Fatalf("meta fields are not equal: %v != %v", meta, val.Meta) t.Fatalf("meta fields are not equal: %v != %v", meta, val.Meta)
} }
srv.agent.config.DisableCoordinates = true
obj, err = srv.AgentSelf(nil, req)
if err != nil {
t.Fatalf("err: %v", err)
}
val = obj.(AgentSelf)
if val.Coord != nil {
t.Fatalf("should have been nil: %v", val.Coord)
}
// Make sure there's nothing called "token" that's leaked. // Make sure there's nothing called "token" that's leaked.
raw, err := srv.marshalJSON(req, obj) raw, err := srv.marshalJSON(req, obj)
if err != nil { if err != nil {

View File

@ -77,7 +77,6 @@ func nextConfig() *Config {
cons.RaftConfig.HeartbeatTimeout = 40 * time.Millisecond cons.RaftConfig.HeartbeatTimeout = 40 * time.Millisecond
cons.RaftConfig.ElectionTimeout = 40 * time.Millisecond cons.RaftConfig.ElectionTimeout = 40 * time.Millisecond
cons.DisableCoordinates = false
cons.CoordinateUpdatePeriod = 100 * time.Millisecond cons.CoordinateUpdatePeriod = 100 * time.Millisecond
return conf return conf
} }

View File

@ -31,7 +31,7 @@ Usage: consul rtt [options] node1 [node2]
the datacenter (eg. "myserver.dc1"). the datacenter (eg. "myserver.dc1").
It is not possible to measure between LAN coordinates and WAN coordinates It is not possible to measure between LAN coordinates and WAN coordinates
because they are maintained by independent Serf gossip pools, so they are because they are maintained by independent Serf gossip areas, so they are
not compatible. not compatible.
` + c.Command.Help() ` + c.Command.Help()

View File

@ -149,7 +149,7 @@ func (c *Catalog) Deregister(args *structs.DeregisterRequest, reply *struct{}) e
// ListDatacenters is used to query for the list of known datacenters // ListDatacenters is used to query for the list of known datacenters
func (c *Catalog) ListDatacenters(args *struct{}, reply *[]string) error { func (c *Catalog) ListDatacenters(args *struct{}, reply *[]string) error {
dcs, err := c.srv.getDatacentersByDistance() dcs, err := c.srv.router.GetDatacentersByDistance()
if err != nil { if err != nil {
return err return err
} }

View File

@ -873,9 +873,9 @@ func TestCatalog_ListNodes_DistanceSort(t *testing.T) {
// Set all but one of the nodes to known coordinates. // Set all but one of the nodes to known coordinates.
updates := structs.Coordinates{ updates := structs.Coordinates{
{"foo", generateCoordinate(2 * time.Millisecond)}, {"foo", lib.GenerateCoordinate(2 * time.Millisecond)},
{"bar", generateCoordinate(5 * time.Millisecond)}, {"bar", lib.GenerateCoordinate(5 * time.Millisecond)},
{"baz", generateCoordinate(1 * time.Millisecond)}, {"baz", lib.GenerateCoordinate(1 * time.Millisecond)},
} }
if err := s1.fsm.State().CoordinateBatchUpdate(5, updates); err != nil { if err := s1.fsm.State().CoordinateBatchUpdate(5, updates); err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
@ -1467,9 +1467,9 @@ func TestCatalog_ListServiceNodes_DistanceSort(t *testing.T) {
// Set all but one of the nodes to known coordinates. // Set all but one of the nodes to known coordinates.
updates := structs.Coordinates{ updates := structs.Coordinates{
{"foo", generateCoordinate(2 * time.Millisecond)}, {"foo", lib.GenerateCoordinate(2 * time.Millisecond)},
{"bar", generateCoordinate(5 * time.Millisecond)}, {"bar", lib.GenerateCoordinate(5 * time.Millisecond)},
{"baz", generateCoordinate(1 * time.Millisecond)}, {"baz", lib.GenerateCoordinate(1 * time.Millisecond)},
} }
if err := s1.fsm.State().CoordinateBatchUpdate(9, updates); err != nil { if err := s1.fsm.State().CoordinateBatchUpdate(9, updates); err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)

View File

@ -157,7 +157,6 @@ func (c *Client) setupSerf(conf *serf.Config, ch chan serf.Event, path string) (
conf.ProtocolVersion = protocolVersionMap[c.config.ProtocolVersion] conf.ProtocolVersion = protocolVersionMap[c.config.ProtocolVersion]
conf.RejoinAfterLeave = c.config.RejoinAfterLeave conf.RejoinAfterLeave = c.config.RejoinAfterLeave
conf.Merge = &lanMergeDelegate{dc: c.config.Datacenter} conf.Merge = &lanMergeDelegate{dc: c.config.Datacenter}
conf.DisableCoordinates = c.config.DisableCoordinates
if err := lib.EnsurePath(conf.SnapshotPath, false); err != nil { if err := lib.EnsurePath(conf.SnapshotPath, false); err != nil {
return nil, err return nil, err
} }

View File

@ -251,9 +251,6 @@ type Config struct {
// user events. This function should not block. // user events. This function should not block.
UserEventHandler func(serf.UserEvent) UserEventHandler func(serf.UserEvent)
// DisableCoordinates controls features related to network coordinates.
DisableCoordinates bool
// CoordinateUpdatePeriod controls how long a server batches coordinate // CoordinateUpdatePeriod controls how long a server batches coordinate
// updates before applying them in a Raft transaction. A larger period // updates before applying them in a Raft transaction. A larger period
// leads to fewer Raft transactions, but also the stored coordinates // leads to fewer Raft transactions, but also the stored coordinates
@ -344,7 +341,6 @@ func DefaultConfig() *Config {
TombstoneTTL: 15 * time.Minute, TombstoneTTL: 15 * time.Minute,
TombstoneTTLGranularity: 30 * time.Second, TombstoneTTLGranularity: 30 * time.Second,
SessionTTLMin: 10 * time.Second, SessionTTLMin: 10 * time.Second,
DisableCoordinates: false,
// These are tuned to provide a total throughput of 128 updates // These are tuned to provide a total throughput of 128 updates
// per second. If you update these, you should update the client- // per second. If you update these, you should update the client-

View File

@ -2,6 +2,7 @@ package consul
import ( import (
"fmt" "fmt"
"strings"
"sync" "sync"
"time" "time"
@ -146,6 +147,15 @@ func (c *Coordinate) ListDatacenters(args *struct{}, reply *[]structs.Datacenter
return err return err
} }
// Strip the datacenter suffixes from all the node names.
for i := range maps {
suffix := fmt.Sprintf(".%s", maps[i].Datacenter)
for j := range maps[i].Coordinates {
node := maps[i].Coordinates[j].Node
maps[i].Coordinates[j].Node = strings.TrimSuffix(node, suffix)
}
}
*reply = maps *reply = maps
return nil return nil
} }

View File

@ -167,8 +167,8 @@ func TestHealth_ChecksInState_DistanceSort(t *testing.T) {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
updates := structs.Coordinates{ updates := structs.Coordinates{
{"foo", generateCoordinate(1 * time.Millisecond)}, {"foo", lib.GenerateCoordinate(1 * time.Millisecond)},
{"bar", generateCoordinate(2 * time.Millisecond)}, {"bar", lib.GenerateCoordinate(2 * time.Millisecond)},
} }
if err := s1.fsm.State().CoordinateBatchUpdate(3, updates); err != nil { if err := s1.fsm.State().CoordinateBatchUpdate(3, updates); err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
@ -436,8 +436,8 @@ func TestHealth_ServiceChecks_DistanceSort(t *testing.T) {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
updates := structs.Coordinates{ updates := structs.Coordinates{
{"foo", generateCoordinate(1 * time.Millisecond)}, {"foo", lib.GenerateCoordinate(1 * time.Millisecond)},
{"bar", generateCoordinate(2 * time.Millisecond)}, {"bar", lib.GenerateCoordinate(2 * time.Millisecond)},
} }
if err := s1.fsm.State().CoordinateBatchUpdate(3, updates); err != nil { if err := s1.fsm.State().CoordinateBatchUpdate(3, updates); err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
@ -737,8 +737,8 @@ func TestHealth_ServiceNodes_DistanceSort(t *testing.T) {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
updates := structs.Coordinates{ updates := structs.Coordinates{
{"foo", generateCoordinate(1 * time.Millisecond)}, {"foo", lib.GenerateCoordinate(1 * time.Millisecond)},
{"bar", generateCoordinate(2 * time.Millisecond)}, {"bar", lib.GenerateCoordinate(2 * time.Millisecond)},
} }
if err := s1.fsm.State().CoordinateBatchUpdate(3, updates); err != nil { if err := s1.fsm.State().CoordinateBatchUpdate(3, updates); err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)

View File

@ -600,7 +600,9 @@ func (q *queryServerWrapper) GetLogger() *log.Logger {
// GetOtherDatacentersByDistance calls into the server's fn and filters out the // GetOtherDatacentersByDistance calls into the server's fn and filters out the
// server's own DC. // server's own DC.
func (q *queryServerWrapper) GetOtherDatacentersByDistance() ([]string, error) { func (q *queryServerWrapper) GetOtherDatacentersByDistance() ([]string, error) {
dcs, err := q.srv.getDatacentersByDistance() // TODO (slackpad) - We should cache this result since it's expensive to
// compute.
dcs, err := q.srv.router.GetDatacentersByDistance()
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -2,24 +2,13 @@ package consul
import ( import (
"fmt" "fmt"
"math"
"sort" "sort"
"github.com/hashicorp/consul/consul/structs" "github.com/hashicorp/consul/consul/structs"
"github.com/hashicorp/consul/lib"
"github.com/hashicorp/serf/coordinate" "github.com/hashicorp/serf/coordinate"
) )
// computeDistance returns the distance between the two network coordinates in
// seconds. If either of the coordinates is nil then this will return positive
// infinity.
func computeDistance(a *coordinate.Coordinate, b *coordinate.Coordinate) float64 {
if a == nil || b == nil {
return math.Inf(1.0)
}
return a.DistanceTo(b).Seconds()
}
// nodeSorter takes a list of nodes and a parallel vector of distances and // nodeSorter takes a list of nodes and a parallel vector of distances and
// implements sort.Interface, keeping both structures coherent and sorting by // implements sort.Interface, keeping both structures coherent and sorting by
// distance. // distance.
@ -38,7 +27,7 @@ func (s *Server) newNodeSorter(c *coordinate.Coordinate, nodes structs.Nodes) (s
if err != nil { if err != nil {
return nil, err return nil, err
} }
vec[i] = computeDistance(c, coord) vec[i] = lib.ComputeDistance(c, coord)
} }
return &nodeSorter{nodes, vec}, nil return &nodeSorter{nodes, vec}, nil
} }
@ -77,7 +66,7 @@ func (s *Server) newServiceNodeSorter(c *coordinate.Coordinate, nodes structs.Se
if err != nil { if err != nil {
return nil, err return nil, err
} }
vec[i] = computeDistance(c, coord) vec[i] = lib.ComputeDistance(c, coord)
} }
return &serviceNodeSorter{nodes, vec}, nil return &serviceNodeSorter{nodes, vec}, nil
} }
@ -116,7 +105,7 @@ func (s *Server) newHealthCheckSorter(c *coordinate.Coordinate, checks structs.H
if err != nil { if err != nil {
return nil, err return nil, err
} }
vec[i] = computeDistance(c, coord) vec[i] = lib.ComputeDistance(c, coord)
} }
return &healthCheckSorter{checks, vec}, nil return &healthCheckSorter{checks, vec}, nil
} }
@ -155,7 +144,7 @@ func (s *Server) newCheckServiceNodeSorter(c *coordinate.Coordinate, nodes struc
if err != nil { if err != nil {
return nil, err return nil, err
} }
vec[i] = computeDistance(c, coord) vec[i] = lib.ComputeDistance(c, coord)
} }
return &checkServiceNodeSorter{nodes, vec}, nil return &checkServiceNodeSorter{nodes, vec}, nil
} }
@ -198,12 +187,6 @@ func (s *Server) newSorterByDistanceFrom(c *coordinate.Coordinate, subj interfac
// //
// If coordinates are disabled this will be a no-op. // If coordinates are disabled this will be a no-op.
func (s *Server) sortNodesByDistanceFrom(source structs.QuerySource, subj interface{}) error { func (s *Server) sortNodesByDistanceFrom(source structs.QuerySource, subj interface{}) error {
// Make it safe to call this without having to check if coordinates are
// disabled first.
if s.config.DisableCoordinates {
return nil
}
// We can't sort if there's no source node. // We can't sort if there's no source node.
if source.Node == "" { if source.Node == "" {
return nil return nil
@ -233,179 +216,3 @@ func (s *Server) sortNodesByDistanceFrom(source structs.QuerySource, subj interf
sort.Stable(sorter) sort.Stable(sorter)
return nil return nil
} }
// serfer provides the coordinate information we need from the Server in an
// interface that's easy to mock out for testing. Without this, we'd have to
// do some really painful setup to get good unit test coverage of all the cases.
type serfer interface {
GetDatacenter() string
GetCoordinate() (*coordinate.Coordinate, error)
GetCachedCoordinate(node string) (*coordinate.Coordinate, bool)
GetNodesForDatacenter(dc string) []string
}
// serverSerfer wraps a Server with the serfer interface.
type serverSerfer struct {
server *Server
}
// See serfer.
func (s *serverSerfer) GetDatacenter() string {
return s.server.config.Datacenter
}
// See serfer.
func (s *serverSerfer) GetCoordinate() (*coordinate.Coordinate, error) {
return s.server.serfWAN.GetCoordinate()
}
// See serfer.
func (s *serverSerfer) GetCachedCoordinate(node string) (*coordinate.Coordinate, bool) {
return s.server.serfWAN.GetCachedCoordinate(node)
}
// See serfer.
func (s *serverSerfer) GetNodesForDatacenter(dc string) []string {
s.server.remoteLock.RLock()
defer s.server.remoteLock.RUnlock()
nodes := make([]string, 0)
for _, part := range s.server.remoteConsuls[dc] {
nodes = append(nodes, part.Name)
}
return nodes
}
// getDatacenterDistance will return the median round trip time estimate for
// the given DC from the given serfer, in seconds. This will return positive
// infinity if no coordinates are available.
func getDatacenterDistance(s serfer, dc string) (float64, error) {
// If this is the serfer's DC then just bail with zero RTT.
if dc == s.GetDatacenter() {
return 0.0, nil
}
// Otherwise measure from the serfer to the nodes in the other DC.
coord, err := s.GetCoordinate()
if err != nil {
return 0.0, err
}
// Fetch all the nodes in the DC and record their distance, if available.
nodes := s.GetNodesForDatacenter(dc)
subvec := make([]float64, 0, len(nodes))
for _, node := range nodes {
if other, ok := s.GetCachedCoordinate(node); ok {
subvec = append(subvec, computeDistance(coord, other))
}
}
// Compute the median by sorting and taking the middle item.
if len(subvec) > 0 {
sort.Float64s(subvec)
return subvec[len(subvec)/2], nil
}
// Return the default infinity value.
return computeDistance(coord, nil), nil
}
// datacenterSorter takes a list of DC names and a parallel vector of distances
// and implements sort.Interface, keeping both structures coherent and sorting
// by distance.
type datacenterSorter struct {
Names []string
Vec []float64
}
// See sort.Interface.
func (n *datacenterSorter) Len() int {
return len(n.Names)
}
// See sort.Interface.
func (n *datacenterSorter) Swap(i, j int) {
n.Names[i], n.Names[j] = n.Names[j], n.Names[i]
n.Vec[i], n.Vec[j] = n.Vec[j], n.Vec[i]
}
// See sort.Interface.
func (n *datacenterSorter) Less(i, j int) bool {
return n.Vec[i] < n.Vec[j]
}
// sortDatacentersByDistance will sort the given list of DCs based on the
// median RTT to all nodes the given serfer knows about from the WAN gossip
// pool). DCs with missing coordinates will be stable sorted to the end of the
// list.
func sortDatacentersByDistance(s serfer, dcs []string) error {
// Build up a list of median distances to the other DCs.
vec := make([]float64, len(dcs))
for i, dc := range dcs {
rtt, err := getDatacenterDistance(s, dc)
if err != nil {
return err
}
vec[i] = rtt
}
sorter := &datacenterSorter{dcs, vec}
sort.Stable(sorter)
return nil
}
// getDatacenterMaps returns the raw coordinates of all the nodes in the
// given list of DCs (the output list will preserve the incoming order).
func (s *Server) getDatacenterMaps(dcs []string) []structs.DatacenterMap {
serfer := serverSerfer{s}
return getDatacenterMaps(&serfer, dcs)
}
// getDatacenterMaps returns the raw coordinates of all the nodes in the
// given list of DCs (the output list will preserve the incoming order).
func getDatacenterMaps(s serfer, dcs []string) []structs.DatacenterMap {
maps := make([]structs.DatacenterMap, 0, len(dcs))
for _, dc := range dcs {
m := structs.DatacenterMap{Datacenter: dc}
nodes := s.GetNodesForDatacenter(dc)
for _, node := range nodes {
if coord, ok := s.GetCachedCoordinate(node); ok {
entry := &structs.Coordinate{Node: node, Coord: coord}
m.Coordinates = append(m.Coordinates, entry)
}
}
maps = append(maps, m)
}
return maps
}
// getDatacentersByDistance will return the list of DCs, sorted in order
// of increasing distance based on the median distance to that DC from all
// servers we know about in the WAN gossip pool. This will sort by name all
// other things being equal (or if coordinates are disabled).
func (s *Server) getDatacentersByDistance() ([]string, error) {
s.remoteLock.RLock()
dcs := make([]string, 0, len(s.remoteConsuls))
for dc := range s.remoteConsuls {
dcs = append(dcs, dc)
}
s.remoteLock.RUnlock()
// Sort by name first, since the coordinate sort is stable.
sort.Strings(dcs)
// Make it safe to call this without having to check if coordinates are
// disabled first.
if s.config.DisableCoordinates {
return dcs, nil
}
// Do the sort!
serfer := serverSerfer{s}
if err := sortDatacentersByDistance(&serfer, dcs); err != nil {
return nil, err
}
return dcs, nil
}

View File

@ -2,29 +2,18 @@ package consul
import ( import (
"fmt" "fmt"
"math"
"net/rpc" "net/rpc"
"os" "os"
"sort"
"strings" "strings"
"testing" "testing"
"time" "time"
"github.com/hashicorp/consul/consul/structs" "github.com/hashicorp/consul/consul/structs"
"github.com/hashicorp/consul/lib"
"github.com/hashicorp/consul/testutil" "github.com/hashicorp/consul/testutil"
"github.com/hashicorp/net-rpc-msgpackrpc" "github.com/hashicorp/net-rpc-msgpackrpc"
"github.com/hashicorp/serf/coordinate"
) )
// generateCoordinate creates a new coordinate with the given distance from the
// origin.
func generateCoordinate(rtt time.Duration) *coordinate.Coordinate {
coord := coordinate.NewCoordinate(coordinate.DefaultConfig())
coord.Vec[0] = rtt.Seconds()
coord.Height = 0
return coord
}
// verifyNodeSort makes sure the order of the nodes in the slice is the same as // verifyNodeSort makes sure the order of the nodes in the slice is the same as
// the expected order, expressed as a comma-separated string. // the expected order, expressed as a comma-separated string.
func verifyNodeSort(t *testing.T, nodes structs.Nodes, expected string) { func verifyNodeSort(t *testing.T, nodes structs.Nodes, expected string) {
@ -106,27 +95,27 @@ func seedCoordinates(t *testing.T, codec rpc.ClientCodec, server *Server) {
structs.CoordinateUpdateRequest{ structs.CoordinateUpdateRequest{
Datacenter: "dc1", Datacenter: "dc1",
Node: "node1", Node: "node1",
Coord: generateCoordinate(10 * time.Millisecond), Coord: lib.GenerateCoordinate(10 * time.Millisecond),
}, },
structs.CoordinateUpdateRequest{ structs.CoordinateUpdateRequest{
Datacenter: "dc1", Datacenter: "dc1",
Node: "node2", Node: "node2",
Coord: generateCoordinate(2 * time.Millisecond), Coord: lib.GenerateCoordinate(2 * time.Millisecond),
}, },
structs.CoordinateUpdateRequest{ structs.CoordinateUpdateRequest{
Datacenter: "dc1", Datacenter: "dc1",
Node: "node3", Node: "node3",
Coord: generateCoordinate(1 * time.Millisecond), Coord: lib.GenerateCoordinate(1 * time.Millisecond),
}, },
structs.CoordinateUpdateRequest{ structs.CoordinateUpdateRequest{
Datacenter: "dc1", Datacenter: "dc1",
Node: "node4", Node: "node4",
Coord: generateCoordinate(8 * time.Millisecond), Coord: lib.GenerateCoordinate(8 * time.Millisecond),
}, },
structs.CoordinateUpdateRequest{ structs.CoordinateUpdateRequest{
Datacenter: "dc1", Datacenter: "dc1",
Node: "node5", Node: "node5",
Coord: generateCoordinate(3 * time.Millisecond), Coord: lib.GenerateCoordinate(3 * time.Millisecond),
}, },
} }
@ -183,19 +172,10 @@ func TestRTT_sortNodesByDistanceFrom(t *testing.T) {
} }
verifyNodeSort(t, nodes, "apple,node1,node2,node3,node4,node5") verifyNodeSort(t, nodes, "apple,node1,node2,node3,node4,node5")
// Set source to legit values relative to node1 but disable coordinates. // Now sort relative to node1, note that apple doesn't have any seeded
// coordinate info so it should end up at the end, despite its lexical
// hegemony.
source.Node = "node1" source.Node = "node1"
source.Datacenter = "dc1"
server.config.DisableCoordinates = true
if err := server.sortNodesByDistanceFrom(source, nodes); err != nil {
t.Fatalf("err: %v", err)
}
verifyNodeSort(t, nodes, "apple,node1,node2,node3,node4,node5")
// Now enable coordinates and sort relative to node1, note that apple
// doesn't have any seeded coordinate info so it should end up at the
// end, despite its lexical hegemony.
server.config.DisableCoordinates = false
if err := server.sortNodesByDistanceFrom(source, nodes); err != nil { if err := server.sortNodesByDistanceFrom(source, nodes); err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
@ -398,6 +378,8 @@ func TestRTT_sortNodesByDistanceFrom_CheckServiceNodes(t *testing.T) {
verifyCheckServiceNodeSort(t, nodes, "node2,node3,node5,node4,node1,apple") verifyCheckServiceNodeSort(t, nodes, "node2,node3,node5,node4,node1,apple")
} }
/*
// mockNodeMap is keyed by node name and the values are the coordinates of the // mockNodeMap is keyed by node name and the values are the coordinates of the
// node. // node.
type mockNodeMap map[string]*coordinate.Coordinate type mockNodeMap map[string]*coordinate.Coordinate
@ -422,16 +404,16 @@ type mockServer map[string]mockNodeMap
func newMockServer() *mockServer { func newMockServer() *mockServer {
s := make(mockServer) s := make(mockServer)
s["dc0"] = mockNodeMap{ s["dc0"] = mockNodeMap{
"dc0.node1": generateCoordinate(10 * time.Millisecond), "dc0.node1": lib.GenerateCoordinate(10 * time.Millisecond),
} }
s["dc1"] = mockNodeMap{ s["dc1"] = mockNodeMap{
"dc1.node1": generateCoordinate(3 * time.Millisecond), "dc1.node1": lib.GenerateCoordinate(3 * time.Millisecond),
"dc1.node2": generateCoordinate(2 * time.Millisecond), "dc1.node2": lib.GenerateCoordinate(2 * time.Millisecond),
"dc1.node3": generateCoordinate(5 * time.Millisecond), "dc1.node3": lib.GenerateCoordinate(5 * time.Millisecond),
"dc1.node4": nil, // no known coordinate "dc1.node4": nil, // no known coordinate
} }
s["dc2"] = mockNodeMap{ s["dc2"] = mockNodeMap{
"dc2.node1": generateCoordinate(8 * time.Millisecond), "dc2.node1": lib.GenerateCoordinate(8 * time.Millisecond),
} }
s["dcX"] = mockNodeMap{ s["dcX"] = mockNodeMap{
"dcX.node1": nil, // no known coordinate "dcX.node1": nil, // no known coordinate
@ -548,7 +530,7 @@ func TestRTT_getDatacenterMaps(t *testing.T) {
t.Fatalf("bad: %v", maps[0]) t.Fatalf("bad: %v", maps[0])
} }
verifyCoordinatesEqual(t, maps[0].Coordinates[0].Coord, verifyCoordinatesEqual(t, maps[0].Coordinates[0].Coord,
generateCoordinate(10*time.Millisecond)) lib.GenerateCoordinate(10*time.Millisecond))
if maps[1].Datacenter != "acdc" || len(maps[1].Coordinates) != 0 { if maps[1].Datacenter != "acdc" || len(maps[1].Coordinates) != 0 {
t.Fatalf("bad: %v", maps[1]) t.Fatalf("bad: %v", maps[1])
@ -561,18 +543,18 @@ func TestRTT_getDatacenterMaps(t *testing.T) {
t.Fatalf("bad: %v", maps[2]) t.Fatalf("bad: %v", maps[2])
} }
verifyCoordinatesEqual(t, maps[2].Coordinates[0].Coord, verifyCoordinatesEqual(t, maps[2].Coordinates[0].Coord,
generateCoordinate(3*time.Millisecond)) lib.GenerateCoordinate(3*time.Millisecond))
verifyCoordinatesEqual(t, maps[2].Coordinates[1].Coord, verifyCoordinatesEqual(t, maps[2].Coordinates[1].Coord,
generateCoordinate(2*time.Millisecond)) lib.GenerateCoordinate(2*time.Millisecond))
verifyCoordinatesEqual(t, maps[2].Coordinates[2].Coord, verifyCoordinatesEqual(t, maps[2].Coordinates[2].Coord,
generateCoordinate(5*time.Millisecond)) lib.GenerateCoordinate(5*time.Millisecond))
if maps[3].Datacenter != "dc2" || len(maps[3].Coordinates) != 1 || if maps[3].Datacenter != "dc2" || len(maps[3].Coordinates) != 1 ||
maps[3].Coordinates[0].Node != "dc2.node1" { maps[3].Coordinates[0].Node != "dc2.node1" {
t.Fatalf("bad: %v", maps[3]) t.Fatalf("bad: %v", maps[3])
} }
verifyCoordinatesEqual(t, maps[3].Coordinates[0].Coord, verifyCoordinatesEqual(t, maps[3].Coordinates[0].Coord,
generateCoordinate(8*time.Millisecond)) lib.GenerateCoordinate(8*time.Millisecond))
if maps[4].Datacenter != "dcX" || len(maps[4].Coordinates) != 0 { if maps[4].Datacenter != "dcX" || len(maps[4].Coordinates) != 0 {
t.Fatalf("bad: %v", maps[4]) t.Fatalf("bad: %v", maps[4])
@ -646,3 +628,5 @@ func TestRTT_getDatacentersByDistance(t *testing.T) {
t.Fatalf("bad: %v", dcs) t.Fatalf("bad: %v", dcs)
} }
} }
*/

View File

@ -137,11 +137,6 @@ type Server struct {
// updated // updated
reconcileCh chan serf.Member reconcileCh chan serf.Member
// remoteConsuls is used to track the known consuls in
// remote datacenters. Used to do DC forwarding.
remoteConsuls map[string][]*agent.Server
remoteLock sync.RWMutex
// router is used to map out Consul servers in the WAN and in Consul // router is used to map out Consul servers in the WAN and in Consul
// Enterprise user-defined areas. // Enterprise user-defined areas.
router *servers.Router router *servers.Router
@ -256,8 +251,7 @@ func NewServer(config *Config) (*Server, error) {
localConsuls: make(map[raft.ServerAddress]*agent.Server), localConsuls: make(map[raft.ServerAddress]*agent.Server),
logger: logger, logger: logger,
reconcileCh: make(chan serf.Member, 32), reconcileCh: make(chan serf.Member, 32),
remoteConsuls: make(map[string][]*agent.Server, 4), router: servers.NewRouter(logger, shutdownCh, config.Datacenter),
router: servers.NewRouter(logger, shutdownCh),
rpcServer: rpc.NewServer(), rpcServer: rpc.NewServer(),
rpcTLS: incomingTLS, rpcTLS: incomingTLS,
tombstoneGC: gc, tombstoneGC: gc,
@ -386,9 +380,6 @@ func (s *Server) setupSerf(conf *serf.Config, ch chan serf.Event, path string, w
return nil, err return nil, err
} }
// Plumb down the enable coordinates flag.
conf.DisableCoordinates = s.config.DisableCoordinates
return serf.Create(conf) return serf.Create(conf)
} }

View File

@ -73,7 +73,6 @@ func testServerConfig(t *testing.T, NodeName string) (string, *Config) {
config.ReconcileInterval = 100 * time.Millisecond config.ReconcileInterval = 100 * time.Millisecond
config.DisableCoordinates = false
config.CoordinateUpdatePeriod = 100 * time.Millisecond config.CoordinateUpdatePeriod = 100 * time.Millisecond
return dir, config return dir, config
} }
@ -214,13 +213,13 @@ func TestServer_JoinWAN(t *testing.T) {
t.Fatalf("bad len") t.Fatalf("bad len")
}) })
// Check the remoteConsuls has both // Check the router has both
if len(s1.remoteConsuls) != 2 { if len(s1.router.GetDatacenters()) != 2 {
t.Fatalf("remote consul missing") t.Fatalf("remote consul missing")
} }
testutil.WaitForResult(func() (bool, error) { testutil.WaitForResult(func() (bool, error) {
return len(s2.remoteConsuls) == 2, nil return len(s2.router.GetDatacenters()) == 2, nil
}, func(err error) { }, func(err error) {
t.Fatalf("remote consul missing") t.Fatalf("remote consul missing")
}) })
@ -289,12 +288,12 @@ func TestServer_JoinSeparateLanAndWanAddresses(t *testing.T) {
t.Fatalf("bad len") t.Fatalf("bad len")
}) })
// Check the remoteConsuls has both // Check the router has both
if len(s1.remoteConsuls) != 2 { if len(s1.router.GetDatacenters()) != 2 {
t.Fatalf("remote consul missing") t.Fatalf("remote consul missing")
} }
if len(s2.remoteConsuls) != 2 { if len(s2.router.GetDatacenters()) != 2 {
t.Fatalf("remote consul missing") t.Fatalf("remote consul missing")
} }
@ -693,9 +692,9 @@ func TestServer_globalRPCErrors(t *testing.T) {
defer s1.Shutdown() defer s1.Shutdown()
testutil.WaitForResult(func() (bool, error) { testutil.WaitForResult(func() (bool, error) {
return len(s1.remoteConsuls) == 1, nil return len(s1.router.GetDatacenters()) == 1, nil
}, func(err error) { }, func(err error) {
t.Fatalf("Server did not join LAN successfully") t.Fatalf("Server did not join WAN successfully")
}) })
// Check that an error from a remote DC is returned // Check that an error from a remote DC is returned

View File

@ -3,10 +3,12 @@ package servers
import ( import (
"fmt" "fmt"
"log" "log"
"sort"
"sync" "sync"
"github.com/hashicorp/consul/consul/agent" "github.com/hashicorp/consul/consul/agent"
"github.com/hashicorp/consul/consul/structs" "github.com/hashicorp/consul/consul/structs"
"github.com/hashicorp/consul/lib"
"github.com/hashicorp/consul/types" "github.com/hashicorp/consul/types"
"github.com/hashicorp/serf/coordinate" "github.com/hashicorp/serf/coordinate"
"github.com/hashicorp/serf/serf" "github.com/hashicorp/serf/serf"
@ -15,8 +17,9 @@ import (
type Router struct { type Router struct {
logger *log.Logger logger *log.Logger
areas map[types.AreaID]*areaInfo localDatacenter string
managers map[string][]*Manager areas map[types.AreaID]*areaInfo
managers map[string][]*Manager
// This top-level lock covers all the internal state. // This top-level lock covers all the internal state.
sync.RWMutex sync.RWMutex
@ -42,11 +45,12 @@ type areaInfo struct {
managers map[string]*managerInfo managers map[string]*managerInfo
} }
func NewRouter(logger *log.Logger, shutdownCh chan struct{}) *Router { func NewRouter(logger *log.Logger, shutdownCh chan struct{}, localDatacenter string) *Router {
router := &Router{ router := &Router{
logger: logger, logger: logger,
areas: make(map[types.AreaID]*areaInfo), localDatacenter: localDatacenter,
managers: make(map[string][]*Manager), areas: make(map[types.AreaID]*areaInfo),
managers: make(map[string][]*Manager),
} }
// This will propagate a top-level shutdown to all the managers. // This will propagate a top-level shutdown to all the managers.
@ -203,9 +207,105 @@ func (r *Router) GetDatacenters() []string {
for dc, _ := range r.managers { for dc, _ := range r.managers {
dcs = append(dcs, dc) dcs = append(dcs, dc)
} }
sort.Strings(dcs)
return dcs return dcs
} }
// datacenterSorter takes a list of DC names and a parallel vector of distances
// and implements sort.Interface, keeping both structures coherent and sorting
// by distance.
type datacenterSorter struct {
Names []string
Vec []float64
}
// See sort.Interface.
func (n *datacenterSorter) Len() int {
return len(n.Names)
}
// See sort.Interface.
func (n *datacenterSorter) Swap(i, j int) {
n.Names[i], n.Names[j] = n.Names[j], n.Names[i]
n.Vec[i], n.Vec[j] = n.Vec[j], n.Vec[i]
}
// See sort.Interface.
func (n *datacenterSorter) Less(i, j int) bool {
return n.Vec[i] < n.Vec[j]
}
func (r *Router) GetDatacentersByDistance() ([]string, error) {
r.RLock()
defer r.RUnlock()
// Calculate a median RTT to the servers in each datacenter, by area.
dcs := make(map[string]float64)
for areaID, info := range r.areas {
index := make(map[string][]float64)
coord, err := info.cluster.GetCoordinate()
if err != nil {
return nil, err
}
for _, m := range info.cluster.Members() {
ok, parts := agent.IsConsulServer(m)
if !ok {
r.logger.Printf("[WARN]: consul: Non-server %q in server-only area %q",
m.Name, areaID)
continue
}
existing := index[parts.Datacenter]
if parts.Datacenter == r.localDatacenter {
// Everything in the local datacenter looks like zero RTT.
index[parts.Datacenter] = append(existing, 0.0)
} else {
// It's OK to get a nil coordinate back, ComputeDistance
// will put the RTT at positive infinity.
other, _ := info.cluster.GetCachedCoordinate(parts.Name)
rtt := lib.ComputeDistance(coord, other)
index[parts.Datacenter] = append(existing, rtt)
}
}
// Compute the median RTT between this server and the servers
// in each datacenter. We accumulate the lowest RTT to each DC
// in the master map, since a given DC might appear in multiple
// areas.
for dc, rtts := range index {
var rtt float64
if len(rtts) > 0 {
sort.Float64s(rtts)
rtt = rtts[len(rtts)/2]
} else {
rtt = lib.ComputeDistance(coord, nil)
}
current, ok := dcs[dc]
if !ok || (ok && rtt < current) {
dcs[dc] = rtt
}
}
}
// First sort by DC name, since we do a stable sort later.
names := make([]string, 0, len(dcs))
for dc, _ := range dcs {
names = append(names, dc)
}
sort.Strings(names)
// Then stable sort by median RTT.
vec := make([]float64, 0, len(dcs))
for _, dc := range names {
vec = append(vec, dcs[dc])
}
sort.Stable(&datacenterSorter{names, vec})
return names, nil
}
func (r *Router) GetDatacenterMaps() ([]structs.DatacenterMap, error) { func (r *Router) GetDatacenterMaps() ([]structs.DatacenterMap, error) {
r.RLock() r.RLock()
defer r.RUnlock() defer r.RUnlock()

28
lib/rtt.go Normal file
View File

@ -0,0 +1,28 @@
package lib
import (
"math"
"time"
"github.com/hashicorp/serf/coordinate"
)
// ComputeDistance returns the distance between the two network coordinates in
// seconds. If either of the coordinates is nil then this will return positive
// infinity.
func ComputeDistance(a *coordinate.Coordinate, b *coordinate.Coordinate) float64 {
if a == nil || b == nil {
return math.Inf(1.0)
}
return a.DistanceTo(b).Seconds()
}
// GenerateCoordinate creates a new coordinate with the given distance from the
// origin. This should only be used for tests.
func GenerateCoordinate(rtt time.Duration) *coordinate.Coordinate {
coord := coordinate.NewCoordinate(coordinate.DefaultConfig())
coord.Vec[0] = rtt.Seconds()
coord.Height = 0
return coord
}

View File

@ -6,4 +6,4 @@ type AreaID string
// This represents the existing WAN area that's built in to Consul. Consul // This represents the existing WAN area that's built in to Consul. Consul
// Enterprise generalizes areas, which are represented with UUIDs. // Enterprise generalizes areas, which are represented with UUIDs.
const AreaWAN AreaID = "WAN" const AreaWAN AreaID = "wan"

View File

@ -35,7 +35,7 @@ Consul as the `consul members` command would show, not IP addresses.
datacenter and the LAN coordinates are used. If the -wan option is given, datacenter and the LAN coordinates are used. If the -wan option is given,
then the WAN coordinates are used, and the node names must be suffixed by a period then the WAN coordinates are used, and the node names must be suffixed by a period
and the datacenter (eg. "myserver.dc1"). It is not possible to measure between and the datacenter (eg. "myserver.dc1"). It is not possible to measure between
LAN coordinates and WAN coordinates, so both nodes must be in the same pool. LAN coordinates and WAN coordinates, so both nodes must be in the same area.
The following environment variables control accessing the HTTP server via SSL: The following environment variables control accessing the HTTP server via SSL: