Adds sort of DCs in catalog queries based on RTT. Cleans up.

* Makes the catalog endpoint respect disabling coordinates for all
  RTT-sorting query types.
This commit is contained in:
James Phillips 2015-07-02 15:36:59 -07:00
parent b63909cf67
commit 033e8e6625
7 changed files with 427 additions and 24 deletions

View File

@ -107,6 +107,11 @@ func (c *Catalog) ListDatacenters(args *struct{}, reply *[]string) error {
// Sort the DCs
sort.Strings(dcs)
if !c.srv.config.DisableCoordinates {
if err := c.srv.sortDatacentersByDistance(dcs); err != nil {
return err
}
}
// Return
*reply = dcs
@ -132,7 +137,10 @@ func (c *Catalog) ListNodes(args *structs.DCSpecificRequest, reply *structs.Inde
}
reply.Index, reply.Nodes = index, nodes
return c.srv.sortByDistanceFrom(args.Source, reply.Nodes)
if c.srv.config.DisableCoordinates {
return nil
}
return c.srv.sortNodesByDistanceFrom(args.Source, reply.Nodes)
})
}
@ -192,7 +200,10 @@ func (c *Catalog) ServiceNodes(args *structs.ServiceSpecificRequest, reply *stru
if err := c.srv.filterACL(args.Token, reply); err != nil {
return err
}
return c.srv.sortByDistanceFrom(args.Source, reply.ServiceNodes)
if c.srv.config.DisableCoordinates {
return nil
}
return c.srv.sortNodesByDistanceFrom(args.Source, reply.ServiceNodes)
})
// Provide some metrics

View File

@ -4,7 +4,6 @@ import (
"fmt"
"net/rpc"
"os"
"sort"
"strings"
"testing"
"time"
@ -234,9 +233,7 @@ func TestCatalogListDatacenters(t *testing.T) {
t.Fatalf("err: %v", err)
}
// Sort the dcs
sort.Strings(out)
// The DCs should come out sorted by default.
if len(out) != 2 {
t.Fatalf("bad: %v", out)
}
@ -248,6 +245,75 @@ func TestCatalogListDatacenters(t *testing.T) {
}
}
func TestCatalogListDatacenters_DistanceSort(t *testing.T) {
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
client := rpcClient(t, s1)
defer client.Close()
dir2, s2 := testServerDC(t, "dc2")
defer os.RemoveAll(dir2)
defer s2.Shutdown()
dir3, s3 := testServerDC(t, "acdc")
defer os.RemoveAll(dir3)
defer s3.Shutdown()
// Try to join
addr := fmt.Sprintf("127.0.0.1:%d",
s1.config.SerfWANConfig.MemberlistConfig.BindPort)
if _, err := s2.JoinWAN([]string{addr}); err != nil {
t.Fatalf("err: %v", err)
}
if _, err := s3.JoinWAN([]string{addr}); err != nil {
t.Fatalf("err: %v", err)
}
testutil.WaitForLeader(t, client.Call, "dc1")
var out []string
if err := client.Call("Catalog.ListDatacenters", struct{}{}, &out); err != nil {
t.Fatalf("err: %v", err)
}
// It's super hard to force the Serfs into a known configuration of
// coordinates, so the best we can do is make sure that the sorting
// function is getting called (it's tested extensively in rtt_test.go).
// Since this is relative to dc1, it will be listed first (proving we
// went into the sort fn) and the other two will be sorted by name since
// there are no known coordinates for them.
if len(out) != 3 {
t.Fatalf("bad: %v", out)
}
if out[0] != "dc1" {
t.Fatalf("bad: %v", out)
}
if out[1] != "acdc" {
t.Fatalf("bad: %v", out)
}
if out[2] != "dc2" {
t.Fatalf("bad: %v", out)
}
// Make sure we get the natural order if coordinates are disabled.
s1.config.DisableCoordinates = true
if err := client.Call("Catalog.ListDatacenters", struct{}{}, &out); err != nil {
t.Fatalf("err: %v", err)
}
if len(out) != 3 {
t.Fatalf("bad: %v", out)
}
if out[0] != "acdc" {
t.Fatalf("bad: %v", out)
}
if out[1] != "dc1" {
t.Fatalf("bad: %v", out)
}
if out[2] != "dc2" {
t.Fatalf("bad: %v", out)
}
}
func TestCatalogListNodes(t *testing.T) {
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
@ -543,6 +609,34 @@ func TestCatalogListNodes_DistanceSort(t *testing.T) {
if out.Nodes[4].Node != s1.config.NodeName {
t.Fatalf("bad: %v", out)
}
// Make sure we get the natural order if coordinates are disabled.
s1.config.DisableCoordinates = true
args = structs.DCSpecificRequest{
Datacenter: "dc1",
Source: structs.QuerySource{Datacenter: "dc1", Node: "foo"},
}
testutil.WaitForResult(func() (bool, error) {
client.Call("Catalog.ListNodes", &args, &out)
return len(out.Nodes) == 5, nil
}, func(err error) {
t.Fatalf("err: %v", err)
})
if out.Nodes[0].Node != "aaa" {
t.Fatalf("bad: %v", out)
}
if out.Nodes[1].Node != "bar" {
t.Fatalf("bad: %v", out)
}
if out.Nodes[2].Node != "baz" {
t.Fatalf("bad: %v", out)
}
if out.Nodes[3].Node != "foo" {
t.Fatalf("bad: %v", out)
}
if out.Nodes[4].Node != s1.config.NodeName {
t.Fatalf("bad: %v", out)
}
}
func BenchmarkCatalogListNodes(t *testing.B) {
@ -888,6 +982,32 @@ func TestCatalogListServiceNodes_DistanceSort(t *testing.T) {
if out.ServiceNodes[3].Node != "aaa" {
t.Fatalf("bad: %v", out)
}
// Make sure we get the natural order if coordinates are disabled.
s1.config.DisableCoordinates = true
args = structs.ServiceSpecificRequest{
Datacenter: "dc1",
ServiceName: "db",
Source: structs.QuerySource{Datacenter: "dc1", Node: "foo"},
}
if err := client.Call("Catalog.ServiceNodes", &args, &out); err != nil {
t.Fatalf("err: %v", err)
}
if len(out.ServiceNodes) != 4 {
t.Fatalf("bad: %v", out)
}
if out.ServiceNodes[0].Node != "aaa" {
t.Fatalf("bad: %v", out)
}
if out.ServiceNodes[1].Node != "foo" {
t.Fatalf("bad: %v", out)
}
if out.ServiceNodes[2].Node != "bar" {
t.Fatalf("bad: %v", out)
}
if out.ServiceNodes[3].Node != "baz" {
t.Fatalf("bad: %v", out)
}
}
func TestCatalogNodeServices(t *testing.T) {

View File

@ -296,6 +296,10 @@ func DefaultConfig() *Config {
conf.SerfLANConfig.MemberlistConfig.BindPort = DefaultLANSerfPort
conf.SerfWANConfig.MemberlistConfig.BindPort = DefaultWANSerfPort
// Cache coordinates for the WAN since the number of servers is small,
// and because we don't store these in the database.
conf.SerfWANConfig.CacheCoordinates = true
// Disable shutdown on removal
conf.RaftConfig.ShutdownOnRemove = false

View File

@ -110,9 +110,11 @@ func (s *Server) newSorterByDistanceFrom(c *coordinate.Coordinate, subj interfac
}
}
// sortByDistanceFrom is used to sort results from our service catalog based on the
// distance (RTT) from the given source node.
func (s *Server) sortByDistanceFrom(source structs.QuerySource, subj interface{}) error {
// sortNodesByDistanceFrom is used to sort results from our service catalog based
// on the round trip time from the given source node. Nodes with missing coordinates
// will get stable sorted at the end of the list.
func (s *Server) sortNodesByDistanceFrom(source structs.QuerySource, subj interface{}) error {
// We can't compare coordinates across DCs.
if source.Datacenter != s.config.Datacenter {
return nil
@ -137,3 +139,131 @@ func (s *Server) sortByDistanceFrom(source structs.QuerySource, subj interface{}
sort.Stable(sorter)
return nil
}
// serfer provides the coordinate information we need from the Server in an
// interface that's easy to mock out for testing. Without this, we'd have to
// do some really painful setup to get good unit test coverage of all the cases.
type serfer interface {
GetDatacenter() string
GetCoordinate() (*coordinate.Coordinate, error)
GetCachedCoordinate(node string) (*coordinate.Coordinate, bool)
GetNodesForDatacenter(dc string) []string
}
// serverSerfer wraps a Server with the serfer interface.
type serverSerfer struct {
server *Server
}
// See serfer.
func (s *serverSerfer) GetDatacenter() string {
return s.server.config.Datacenter
}
// See serfer.
func (s *serverSerfer) GetCoordinate() (*coordinate.Coordinate, error) {
return s.server.serfWAN.GetCoordinate()
}
// See serfer.
func (s *serverSerfer) GetCachedCoordinate(node string) (*coordinate.Coordinate, bool) {
return s.server.serfWAN.GetCachedCoordinate(node)
}
// See serfer.
func (s *serverSerfer) GetNodesForDatacenter(dc string) []string {
nodes := make([]string, 0)
for _, part := range s.server.remoteConsuls[dc] {
nodes = append(nodes, part.Name)
}
return nodes
}
// sortDatacentersByDistance will sort the given list of DCs based on the
// median RTT to all nodes we know about from the WAN gossip pool). DCs with
// missing coordinates will be stable sorted to the end of the list.
func (s *Server) sortDatacentersByDistance(dcs []string) error {
serfer := serverSerfer{s}
return sortDatacentersByDistance(&serfer, dcs)
}
// getDatacenterDistance will return the median round trip time estimate for
// the given DC from the given serfer, in seconds. This will return positive
// infinity if no coordinates are available.
func getDatacenterDistance(s serfer, dc string) (float64, error) {
// If this is the serfer's DC then just bail with zero RTT.
if dc == s.GetDatacenter() {
return 0.0, nil
}
// Otherwise measure from the serfer to the nodes in the other DC.
coord, err := s.GetCoordinate()
if err != nil {
return 0.0, err
}
// Fetch all the nodes in the DC.
nodes := s.GetNodesForDatacenter(dc)
subvec := make([]float64, len(nodes))
for j, node := range nodes {
if other, ok := s.GetCachedCoordinate(node); ok {
subvec[j] = computeDistance(coord, other)
} else {
subvec[j] = computeDistance(coord, nil)
}
}
// Compute the median by sorting and taking the middle item.
sort.Float64s(subvec)
fmt.Println("%v", subvec)
if len(subvec) > 0 {
return subvec[len(subvec)/2], nil
}
return computeDistance(coord, nil), nil
}
// datacenterSorter takes a list of DC names and a parallel vector of distances
// and implements sort.Interface, keeping both structures coherent and sorting
// by distance.
type datacenterSorter struct {
Names []string
Vec []float64
}
// See sort.Interface.
func (n *datacenterSorter) Len() int {
return len(n.Names)
}
// See sort.Interface.
func (n *datacenterSorter) Swap(i, j int) {
n.Names[i], n.Names[j] = n.Names[j], n.Names[i]
n.Vec[i], n.Vec[j] = n.Vec[j], n.Vec[i]
}
// See sort.Interface.
func (n *datacenterSorter) Less(i, j int) bool {
return n.Vec[i] < n.Vec[j]
}
// sortDatacentersByDistance will sort the given list of DCs based on the
// median RTT to all nodes the given serfer knows about from the WAN gossip
// pool). DCs with missing coordinates will be stable sorted to the end of the
// list.
func sortDatacentersByDistance(s serfer, dcs []string) error {
// Build up a list of median distances to the other DCs.
vec := make([]float64, len(dcs))
for i, dc := range dcs {
rtt, err := getDatacenterDistance(s, dc)
if err != nil {
return err
}
vec[i] = rtt
}
sorter := &datacenterSorter{dcs, vec}
sort.Stable(sorter)
return nil
}

View File

@ -1,6 +1,7 @@
package consul
import (
"math"
"net/rpc"
"os"
"strings"
@ -96,7 +97,7 @@ func seedCoordinates(t *testing.T, client *rpc.Client, server *Server) {
time.Sleep(2 * server.config.CoordinateUpdatePeriod)
}
func TestRtt_sortByDistanceFrom_Nodes(t *testing.T) {
func TestRtt_sortNodesByDistanceFrom_Nodes(t *testing.T) {
dir, server := testServer(t)
defer os.RemoveAll(dir)
defer server.Shutdown()
@ -117,7 +118,7 @@ func TestRtt_sortByDistanceFrom_Nodes(t *testing.T) {
// The zero value for the source should not trigger any sorting.
var source structs.QuerySource
if err := server.sortByDistanceFrom(source, nodes); err != nil {
if err := server.sortNodesByDistanceFrom(source, nodes); err != nil {
t.Fatalf("err: %v", err)
}
verifyNodeSort(t, nodes, "apple,node1,node2,node3,node4,node5")
@ -125,7 +126,7 @@ func TestRtt_sortByDistanceFrom_Nodes(t *testing.T) {
// Same for a source in some other DC.
source.Node = "node1"
source.Datacenter = "dc2"
if err := server.sortByDistanceFrom(source, nodes); err != nil {
if err := server.sortNodesByDistanceFrom(source, nodes); err != nil {
t.Fatalf("err: %v", err)
}
verifyNodeSort(t, nodes, "apple,node1,node2,node3,node4,node5")
@ -133,7 +134,7 @@ func TestRtt_sortByDistanceFrom_Nodes(t *testing.T) {
// Same for a source node in our DC that we have no coordinate for.
source.Node = "apple"
source.Datacenter = "dc1"
if err := server.sortByDistanceFrom(source, nodes); err != nil {
if err := server.sortNodesByDistanceFrom(source, nodes); err != nil {
t.Fatalf("err: %v", err)
}
verifyNodeSort(t, nodes, "apple,node1,node2,node3,node4,node5")
@ -143,7 +144,7 @@ func TestRtt_sortByDistanceFrom_Nodes(t *testing.T) {
// its lexical hegemony.
source.Node = "node1"
source.Datacenter = "dc1"
if err := server.sortByDistanceFrom(source, nodes); err != nil {
if err := server.sortNodesByDistanceFrom(source, nodes); err != nil {
t.Fatalf("err: %v", err)
}
verifyNodeSort(t, nodes, "node1,node4,node5,node2,node3,apple")
@ -153,7 +154,7 @@ func TestRtt_sortByDistanceFrom_Nodes(t *testing.T) {
// they were in from the previous sort.
source.Node = "node2"
source.Datacenter = "dc1"
if err := server.sortByDistanceFrom(source, nodes); err != nil {
if err := server.sortNodesByDistanceFrom(source, nodes); err != nil {
t.Fatalf("err: %v", err)
}
verifyNodeSort(t, nodes, "node2,node5,node3,node4,node1,apple")
@ -161,13 +162,13 @@ func TestRtt_sortByDistanceFrom_Nodes(t *testing.T) {
// Let's exercise the stable sort explicitly to make sure we didn't
// just get lucky.
nodes[1], nodes[2] = nodes[2], nodes[1]
if err := server.sortByDistanceFrom(source, nodes); err != nil {
if err := server.sortNodesByDistanceFrom(source, nodes); err != nil {
t.Fatalf("err: %v", err)
}
verifyNodeSort(t, nodes, "node2,node3,node5,node4,node1,apple")
}
func TestRtt_sortByDistanceFrom_ServiceNodes(t *testing.T) {
func TestRtt_sortNodesByDistanceFrom_ServiceNodes(t *testing.T) {
dir, server := testServer(t)
defer os.RemoveAll(dir)
defer server.Shutdown()
@ -188,7 +189,7 @@ func TestRtt_sortByDistanceFrom_ServiceNodes(t *testing.T) {
// The zero value for the source should not trigger any sorting.
var source structs.QuerySource
if err := server.sortByDistanceFrom(source, nodes); err != nil {
if err := server.sortNodesByDistanceFrom(source, nodes); err != nil {
t.Fatalf("err: %v", err)
}
verifyServiceNodeSort(t, nodes, "apple,node1,node2,node3,node4,node5")
@ -196,7 +197,7 @@ func TestRtt_sortByDistanceFrom_ServiceNodes(t *testing.T) {
// Same for a source in some other DC.
source.Node = "node1"
source.Datacenter = "dc2"
if err := server.sortByDistanceFrom(source, nodes); err != nil {
if err := server.sortNodesByDistanceFrom(source, nodes); err != nil {
t.Fatalf("err: %v", err)
}
verifyServiceNodeSort(t, nodes, "apple,node1,node2,node3,node4,node5")
@ -204,7 +205,7 @@ func TestRtt_sortByDistanceFrom_ServiceNodes(t *testing.T) {
// Same for a source node in our DC that we have no coordinate for.
source.Node = "apple"
source.Datacenter = "dc1"
if err := server.sortByDistanceFrom(source, nodes); err != nil {
if err := server.sortNodesByDistanceFrom(source, nodes); err != nil {
t.Fatalf("err: %v", err)
}
verifyServiceNodeSort(t, nodes, "apple,node1,node2,node3,node4,node5")
@ -214,7 +215,7 @@ func TestRtt_sortByDistanceFrom_ServiceNodes(t *testing.T) {
// its lexical hegemony.
source.Node = "node1"
source.Datacenter = "dc1"
if err := server.sortByDistanceFrom(source, nodes); err != nil {
if err := server.sortNodesByDistanceFrom(source, nodes); err != nil {
t.Fatalf("err: %v", err)
}
verifyServiceNodeSort(t, nodes, "node1,node4,node5,node2,node3,apple")
@ -224,7 +225,7 @@ func TestRtt_sortByDistanceFrom_ServiceNodes(t *testing.T) {
// they were in from the previous sort.
source.Node = "node2"
source.Datacenter = "dc1"
if err := server.sortByDistanceFrom(source, nodes); err != nil {
if err := server.sortNodesByDistanceFrom(source, nodes); err != nil {
t.Fatalf("err: %v", err)
}
verifyServiceNodeSort(t, nodes, "node2,node5,node3,node4,node1,apple")
@ -232,8 +233,141 @@ func TestRtt_sortByDistanceFrom_ServiceNodes(t *testing.T) {
// Let's exercise the stable sort explicitly to make sure we didn't
// just get lucky.
nodes[1], nodes[2] = nodes[2], nodes[1]
if err := server.sortByDistanceFrom(source, nodes); err != nil {
if err := server.sortNodesByDistanceFrom(source, nodes); err != nil {
t.Fatalf("err: %v", err)
}
verifyServiceNodeSort(t, nodes, "node2,node3,node5,node4,node1,apple")
}
// mockNodeMap is keyed by node name and the values are the coordinates of the
// node.
type mockNodeMap map[string]*coordinate.Coordinate
// mockServer is used to provide a serfer interface for unit tests. The key is
// DC, which selects a map from node name to coordinate for that node.
type mockServer map[string]mockNodeMap
// newMockServer is used to generate a serfer interface that presents a known DC
// topology for unit tests. The server is in dc0.
//
// Here's the layout of the nodes:
//
// /---- dc1 ----\ /- dc2 -\ /- dc0 -\
// node2 node1 node3 node1 node1
// | | | | | | | | | | |
// 0 1 2 3 4 5 6 7 8 9 10 (ms)
//
// We also include a node4 in dc1 with no known coordinate, as well as a
// mysterious dcX with no nodes with known coordinates.
//
func newMockServer() *mockServer {
s := make(mockServer)
s["dc0"] = mockNodeMap{
"dc0.node1": generateCoordinate(10 * time.Millisecond),
}
s["dc1"] = mockNodeMap{
"dc1.node1": generateCoordinate(3 * time.Millisecond),
"dc1.node2": generateCoordinate(2 * time.Millisecond),
"dc1.node3": generateCoordinate(5 * time.Millisecond),
"dc1.node4": nil, // no known coordinate
}
s["dc2"] = mockNodeMap{
"dc2.node1": generateCoordinate(8 * time.Millisecond),
}
s["dcX"] = mockNodeMap{
"dcX.node1": nil, // no known coordinate
}
return &s
}
// See serfer.
func (s *mockServer) GetDatacenter() string {
return "dc0"
}
// See serfer.
func (s *mockServer) GetCoordinate() (*coordinate.Coordinate, error) {
return (*s)["dc0"]["dc0.node1"], nil
}
// See serfer.
func (s *mockServer) GetCachedCoordinate(node string) (*coordinate.Coordinate, bool) {
for _, nodes := range *s {
for n, coord := range nodes {
if n == node && coord != nil {
return coord, true
}
}
}
return nil, false
}
// See serfer.
func (s *mockServer) GetNodesForDatacenter(dc string) []string {
nodes := make([]string, 0)
if n, ok := (*s)[dc]; ok {
for name := range n {
nodes = append(nodes, name)
}
}
return nodes
}
func TestRtt_getDatacenterDistance(t *testing.T) {
s := newMockServer()
// The serfer's own DC is always 0 ms away.
if dist, err := getDatacenterDistance(s, "dc0"); err != nil || dist != 0.0 {
t.Fatalf("bad: %v err: %v", dist, err)
}
// Check a DC with no coordinates, which should give positive infinity.
if dist, err := getDatacenterDistance(s, "dcX"); err != nil || dist != math.Inf(1.0) {
t.Fatalf("bad: %v err: %v", dist, err)
}
// Similar for a totally unknown DC.
if dist, err := getDatacenterDistance(s, "acdc"); err != nil || dist != math.Inf(1.0) {
t.Fatalf("bad: %v err: %v", dist, err)
}
// Check the trivial median case (just one node).
if dist, err := getDatacenterDistance(s, "dc2"); err != nil || dist != 0.002 {
t.Fatalf("bad: %v err: %v", dist, err)
}
// Check the more interesting median case, note that there's a mystery
// node4 in there that should make the distances sort like this:
//
// [0] node3 (0.005), [1] node1 (0.007), [2] node2 (0.008), [3] node4 (+Inf)
//
// So the median should be at index 4 / 2 = 2 -> 0.008.
if dist, err := getDatacenterDistance(s, "dc1"); err != nil || dist != 0.008 {
t.Fatalf("bad: %v err: %v", dist, err)
}
}
func TestRtt_sortDatacentersByDistance(t *testing.T) {
s := newMockServer()
dcs := []string{"acdc", "dc0", "dc1", "dc2", "dcX"}
if err := sortDatacentersByDistance(s, dcs); err != nil {
t.Fatalf("err: %v", err)
}
expected := "dc0,dc2,dc1,acdc,dcX"
if actual := strings.Join(dcs, ","); actual != expected {
t.Fatalf("bad sort: %s != %s", actual, expected)
}
// Make sure the sort is stable and we didn't just get lucky.
dcs = []string{"dcX", "dc0", "dc1", "dc2", "acdc"}
if err := sortDatacentersByDistance(s, dcs); err != nil {
t.Fatalf("err: %v", err)
}
expected = "dc0,dc2,dc1,dcX,acdc"
if actual := strings.Join(dcs, ","); actual != expected {
t.Fatalf("bad sort: %s != %s", actual, expected)
}
}

View File

@ -141,7 +141,7 @@ func isConsulServer(m serf.Member) (bool, *serverParts) {
return true, parts
}
// Returns if a member is a consul node. Returns a boo,
// Returns if a member is a consul node. Returns a bool,
// and the datacenter.
func isConsulNode(m serf.Member) (bool, string) {
if m.Tags["role"] != "node" {

View File

@ -159,6 +159,10 @@ If the API call succeeds a 200 status code is returned.
This endpoint is hit with a GET and is used to return all the
datacenters that are known by the Consul server.
The datacenters will be sorted in ascending order based on the
estimated median round trip time from the server to the servers
in that datacenter.
It returns a JSON body like this:
```javascript