diff --git a/consul/catalog_endpoint.go b/consul/catalog_endpoint.go index 4e7bab650..85092cfe5 100644 --- a/consul/catalog_endpoint.go +++ b/consul/catalog_endpoint.go @@ -107,6 +107,11 @@ func (c *Catalog) ListDatacenters(args *struct{}, reply *[]string) error { // Sort the DCs sort.Strings(dcs) + if !c.srv.config.DisableCoordinates { + if err := c.srv.sortDatacentersByDistance(dcs); err != nil { + return err + } + } // Return *reply = dcs @@ -132,7 +137,10 @@ func (c *Catalog) ListNodes(args *structs.DCSpecificRequest, reply *structs.Inde } reply.Index, reply.Nodes = index, nodes - return c.srv.sortByDistanceFrom(args.Source, reply.Nodes) + if c.srv.config.DisableCoordinates { + return nil + } + return c.srv.sortNodesByDistanceFrom(args.Source, reply.Nodes) }) } @@ -192,7 +200,10 @@ func (c *Catalog) ServiceNodes(args *structs.ServiceSpecificRequest, reply *stru if err := c.srv.filterACL(args.Token, reply); err != nil { return err } - return c.srv.sortByDistanceFrom(args.Source, reply.ServiceNodes) + if c.srv.config.DisableCoordinates { + return nil + } + return c.srv.sortNodesByDistanceFrom(args.Source, reply.ServiceNodes) }) // Provide some metrics diff --git a/consul/catalog_endpoint_test.go b/consul/catalog_endpoint_test.go index efc418fbb..5ce1115b4 100644 --- a/consul/catalog_endpoint_test.go +++ b/consul/catalog_endpoint_test.go @@ -4,7 +4,6 @@ import ( "fmt" "net/rpc" "os" - "sort" "strings" "testing" "time" @@ -234,9 +233,7 @@ func TestCatalogListDatacenters(t *testing.T) { t.Fatalf("err: %v", err) } - // Sort the dcs - sort.Strings(out) - + // The DCs should come out sorted by default. if len(out) != 2 { t.Fatalf("bad: %v", out) } @@ -248,6 +245,75 @@ func TestCatalogListDatacenters(t *testing.T) { } } +func TestCatalogListDatacenters_DistanceSort(t *testing.T) { + dir1, s1 := testServer(t) + defer os.RemoveAll(dir1) + defer s1.Shutdown() + client := rpcClient(t, s1) + defer client.Close() + + dir2, s2 := testServerDC(t, "dc2") + defer os.RemoveAll(dir2) + defer s2.Shutdown() + + dir3, s3 := testServerDC(t, "acdc") + defer os.RemoveAll(dir3) + defer s3.Shutdown() + + // Try to join + addr := fmt.Sprintf("127.0.0.1:%d", + s1.config.SerfWANConfig.MemberlistConfig.BindPort) + if _, err := s2.JoinWAN([]string{addr}); err != nil { + t.Fatalf("err: %v", err) + } + if _, err := s3.JoinWAN([]string{addr}); err != nil { + t.Fatalf("err: %v", err) + } + testutil.WaitForLeader(t, client.Call, "dc1") + + var out []string + if err := client.Call("Catalog.ListDatacenters", struct{}{}, &out); err != nil { + t.Fatalf("err: %v", err) + } + + // It's super hard to force the Serfs into a known configuration of + // coordinates, so the best we can do is make sure that the sorting + // function is getting called (it's tested extensively in rtt_test.go). + // Since this is relative to dc1, it will be listed first (proving we + // went into the sort fn) and the other two will be sorted by name since + // there are no known coordinates for them. + if len(out) != 3 { + t.Fatalf("bad: %v", out) + } + if out[0] != "dc1" { + t.Fatalf("bad: %v", out) + } + if out[1] != "acdc" { + t.Fatalf("bad: %v", out) + } + if out[2] != "dc2" { + t.Fatalf("bad: %v", out) + } + + // Make sure we get the natural order if coordinates are disabled. + s1.config.DisableCoordinates = true + if err := client.Call("Catalog.ListDatacenters", struct{}{}, &out); err != nil { + t.Fatalf("err: %v", err) + } + if len(out) != 3 { + t.Fatalf("bad: %v", out) + } + if out[0] != "acdc" { + t.Fatalf("bad: %v", out) + } + if out[1] != "dc1" { + t.Fatalf("bad: %v", out) + } + if out[2] != "dc2" { + t.Fatalf("bad: %v", out) + } +} + func TestCatalogListNodes(t *testing.T) { dir1, s1 := testServer(t) defer os.RemoveAll(dir1) @@ -543,6 +609,34 @@ func TestCatalogListNodes_DistanceSort(t *testing.T) { if out.Nodes[4].Node != s1.config.NodeName { t.Fatalf("bad: %v", out) } + + // Make sure we get the natural order if coordinates are disabled. + s1.config.DisableCoordinates = true + args = structs.DCSpecificRequest{ + Datacenter: "dc1", + Source: structs.QuerySource{Datacenter: "dc1", Node: "foo"}, + } + testutil.WaitForResult(func() (bool, error) { + client.Call("Catalog.ListNodes", &args, &out) + return len(out.Nodes) == 5, nil + }, func(err error) { + t.Fatalf("err: %v", err) + }) + if out.Nodes[0].Node != "aaa" { + t.Fatalf("bad: %v", out) + } + if out.Nodes[1].Node != "bar" { + t.Fatalf("bad: %v", out) + } + if out.Nodes[2].Node != "baz" { + t.Fatalf("bad: %v", out) + } + if out.Nodes[3].Node != "foo" { + t.Fatalf("bad: %v", out) + } + if out.Nodes[4].Node != s1.config.NodeName { + t.Fatalf("bad: %v", out) + } } func BenchmarkCatalogListNodes(t *testing.B) { @@ -888,6 +982,32 @@ func TestCatalogListServiceNodes_DistanceSort(t *testing.T) { if out.ServiceNodes[3].Node != "aaa" { t.Fatalf("bad: %v", out) } + + // Make sure we get the natural order if coordinates are disabled. + s1.config.DisableCoordinates = true + args = structs.ServiceSpecificRequest{ + Datacenter: "dc1", + ServiceName: "db", + Source: structs.QuerySource{Datacenter: "dc1", Node: "foo"}, + } + if err := client.Call("Catalog.ServiceNodes", &args, &out); err != nil { + t.Fatalf("err: %v", err) + } + if len(out.ServiceNodes) != 4 { + t.Fatalf("bad: %v", out) + } + if out.ServiceNodes[0].Node != "aaa" { + t.Fatalf("bad: %v", out) + } + if out.ServiceNodes[1].Node != "foo" { + t.Fatalf("bad: %v", out) + } + if out.ServiceNodes[2].Node != "bar" { + t.Fatalf("bad: %v", out) + } + if out.ServiceNodes[3].Node != "baz" { + t.Fatalf("bad: %v", out) + } } func TestCatalogNodeServices(t *testing.T) { diff --git a/consul/config.go b/consul/config.go index 6336933a5..e5b9dd36f 100644 --- a/consul/config.go +++ b/consul/config.go @@ -296,6 +296,10 @@ func DefaultConfig() *Config { conf.SerfLANConfig.MemberlistConfig.BindPort = DefaultLANSerfPort conf.SerfWANConfig.MemberlistConfig.BindPort = DefaultWANSerfPort + // Cache coordinates for the WAN since the number of servers is small, + // and because we don't store these in the database. + conf.SerfWANConfig.CacheCoordinates = true + // Disable shutdown on removal conf.RaftConfig.ShutdownOnRemove = false diff --git a/consul/rtt.go b/consul/rtt.go index 3cfe90fb4..68c94920e 100644 --- a/consul/rtt.go +++ b/consul/rtt.go @@ -110,9 +110,11 @@ func (s *Server) newSorterByDistanceFrom(c *coordinate.Coordinate, subj interfac } } -// sortByDistanceFrom is used to sort results from our service catalog based on the -// distance (RTT) from the given source node. -func (s *Server) sortByDistanceFrom(source structs.QuerySource, subj interface{}) error { +// sortNodesByDistanceFrom is used to sort results from our service catalog based +// on the round trip time from the given source node. Nodes with missing coordinates +// will get stable sorted at the end of the list. + +func (s *Server) sortNodesByDistanceFrom(source structs.QuerySource, subj interface{}) error { // We can't compare coordinates across DCs. if source.Datacenter != s.config.Datacenter { return nil @@ -137,3 +139,131 @@ func (s *Server) sortByDistanceFrom(source structs.QuerySource, subj interface{} sort.Stable(sorter) return nil } + +// serfer provides the coordinate information we need from the Server in an +// interface that's easy to mock out for testing. Without this, we'd have to +// do some really painful setup to get good unit test coverage of all the cases. +type serfer interface { + GetDatacenter() string + GetCoordinate() (*coordinate.Coordinate, error) + GetCachedCoordinate(node string) (*coordinate.Coordinate, bool) + GetNodesForDatacenter(dc string) []string +} + +// serverSerfer wraps a Server with the serfer interface. +type serverSerfer struct { + server *Server +} + +// See serfer. +func (s *serverSerfer) GetDatacenter() string { + return s.server.config.Datacenter +} + +// See serfer. +func (s *serverSerfer) GetCoordinate() (*coordinate.Coordinate, error) { + return s.server.serfWAN.GetCoordinate() +} + +// See serfer. +func (s *serverSerfer) GetCachedCoordinate(node string) (*coordinate.Coordinate, bool) { + return s.server.serfWAN.GetCachedCoordinate(node) +} + +// See serfer. +func (s *serverSerfer) GetNodesForDatacenter(dc string) []string { + nodes := make([]string, 0) + for _, part := range s.server.remoteConsuls[dc] { + nodes = append(nodes, part.Name) + } + return nodes +} + +// sortDatacentersByDistance will sort the given list of DCs based on the +// median RTT to all nodes we know about from the WAN gossip pool). DCs with +// missing coordinates will be stable sorted to the end of the list. +func (s *Server) sortDatacentersByDistance(dcs []string) error { + serfer := serverSerfer{s} + return sortDatacentersByDistance(&serfer, dcs) +} + +// getDatacenterDistance will return the median round trip time estimate for +// the given DC from the given serfer, in seconds. This will return positive +// infinity if no coordinates are available. +func getDatacenterDistance(s serfer, dc string) (float64, error) { + // If this is the serfer's DC then just bail with zero RTT. + if dc == s.GetDatacenter() { + return 0.0, nil + } + + // Otherwise measure from the serfer to the nodes in the other DC. + coord, err := s.GetCoordinate() + if err != nil { + return 0.0, err + } + + // Fetch all the nodes in the DC. + nodes := s.GetNodesForDatacenter(dc) + subvec := make([]float64, len(nodes)) + for j, node := range nodes { + if other, ok := s.GetCachedCoordinate(node); ok { + subvec[j] = computeDistance(coord, other) + } else { + subvec[j] = computeDistance(coord, nil) + } + } + + // Compute the median by sorting and taking the middle item. + sort.Float64s(subvec) + fmt.Println("%v", subvec) + if len(subvec) > 0 { + return subvec[len(subvec)/2], nil + } + + return computeDistance(coord, nil), nil +} + +// datacenterSorter takes a list of DC names and a parallel vector of distances +// and implements sort.Interface, keeping both structures coherent and sorting +// by distance. +type datacenterSorter struct { + Names []string + Vec []float64 +} + +// See sort.Interface. +func (n *datacenterSorter) Len() int { + return len(n.Names) +} + +// See sort.Interface. +func (n *datacenterSorter) Swap(i, j int) { + n.Names[i], n.Names[j] = n.Names[j], n.Names[i] + n.Vec[i], n.Vec[j] = n.Vec[j], n.Vec[i] +} + +// See sort.Interface. +func (n *datacenterSorter) Less(i, j int) bool { + return n.Vec[i] < n.Vec[j] +} + +// sortDatacentersByDistance will sort the given list of DCs based on the +// median RTT to all nodes the given serfer knows about from the WAN gossip +// pool). DCs with missing coordinates will be stable sorted to the end of the +// list. +func sortDatacentersByDistance(s serfer, dcs []string) error { + // Build up a list of median distances to the other DCs. + vec := make([]float64, len(dcs)) + for i, dc := range dcs { + rtt, err := getDatacenterDistance(s, dc) + if err != nil { + return err + } + + vec[i] = rtt + } + + sorter := &datacenterSorter{dcs, vec} + sort.Stable(sorter) + return nil +} diff --git a/consul/rtt_test.go b/consul/rtt_test.go index 0d4ec3bd5..f2da14e8d 100644 --- a/consul/rtt_test.go +++ b/consul/rtt_test.go @@ -1,6 +1,7 @@ package consul import ( + "math" "net/rpc" "os" "strings" @@ -96,7 +97,7 @@ func seedCoordinates(t *testing.T, client *rpc.Client, server *Server) { time.Sleep(2 * server.config.CoordinateUpdatePeriod) } -func TestRtt_sortByDistanceFrom_Nodes(t *testing.T) { +func TestRtt_sortNodesByDistanceFrom_Nodes(t *testing.T) { dir, server := testServer(t) defer os.RemoveAll(dir) defer server.Shutdown() @@ -117,7 +118,7 @@ func TestRtt_sortByDistanceFrom_Nodes(t *testing.T) { // The zero value for the source should not trigger any sorting. var source structs.QuerySource - if err := server.sortByDistanceFrom(source, nodes); err != nil { + if err := server.sortNodesByDistanceFrom(source, nodes); err != nil { t.Fatalf("err: %v", err) } verifyNodeSort(t, nodes, "apple,node1,node2,node3,node4,node5") @@ -125,7 +126,7 @@ func TestRtt_sortByDistanceFrom_Nodes(t *testing.T) { // Same for a source in some other DC. source.Node = "node1" source.Datacenter = "dc2" - if err := server.sortByDistanceFrom(source, nodes); err != nil { + if err := server.sortNodesByDistanceFrom(source, nodes); err != nil { t.Fatalf("err: %v", err) } verifyNodeSort(t, nodes, "apple,node1,node2,node3,node4,node5") @@ -133,7 +134,7 @@ func TestRtt_sortByDistanceFrom_Nodes(t *testing.T) { // Same for a source node in our DC that we have no coordinate for. source.Node = "apple" source.Datacenter = "dc1" - if err := server.sortByDistanceFrom(source, nodes); err != nil { + if err := server.sortNodesByDistanceFrom(source, nodes); err != nil { t.Fatalf("err: %v", err) } verifyNodeSort(t, nodes, "apple,node1,node2,node3,node4,node5") @@ -143,7 +144,7 @@ func TestRtt_sortByDistanceFrom_Nodes(t *testing.T) { // its lexical hegemony. source.Node = "node1" source.Datacenter = "dc1" - if err := server.sortByDistanceFrom(source, nodes); err != nil { + if err := server.sortNodesByDistanceFrom(source, nodes); err != nil { t.Fatalf("err: %v", err) } verifyNodeSort(t, nodes, "node1,node4,node5,node2,node3,apple") @@ -153,7 +154,7 @@ func TestRtt_sortByDistanceFrom_Nodes(t *testing.T) { // they were in from the previous sort. source.Node = "node2" source.Datacenter = "dc1" - if err := server.sortByDistanceFrom(source, nodes); err != nil { + if err := server.sortNodesByDistanceFrom(source, nodes); err != nil { t.Fatalf("err: %v", err) } verifyNodeSort(t, nodes, "node2,node5,node3,node4,node1,apple") @@ -161,13 +162,13 @@ func TestRtt_sortByDistanceFrom_Nodes(t *testing.T) { // Let's exercise the stable sort explicitly to make sure we didn't // just get lucky. nodes[1], nodes[2] = nodes[2], nodes[1] - if err := server.sortByDistanceFrom(source, nodes); err != nil { + if err := server.sortNodesByDistanceFrom(source, nodes); err != nil { t.Fatalf("err: %v", err) } verifyNodeSort(t, nodes, "node2,node3,node5,node4,node1,apple") } -func TestRtt_sortByDistanceFrom_ServiceNodes(t *testing.T) { +func TestRtt_sortNodesByDistanceFrom_ServiceNodes(t *testing.T) { dir, server := testServer(t) defer os.RemoveAll(dir) defer server.Shutdown() @@ -188,7 +189,7 @@ func TestRtt_sortByDistanceFrom_ServiceNodes(t *testing.T) { // The zero value for the source should not trigger any sorting. var source structs.QuerySource - if err := server.sortByDistanceFrom(source, nodes); err != nil { + if err := server.sortNodesByDistanceFrom(source, nodes); err != nil { t.Fatalf("err: %v", err) } verifyServiceNodeSort(t, nodes, "apple,node1,node2,node3,node4,node5") @@ -196,7 +197,7 @@ func TestRtt_sortByDistanceFrom_ServiceNodes(t *testing.T) { // Same for a source in some other DC. source.Node = "node1" source.Datacenter = "dc2" - if err := server.sortByDistanceFrom(source, nodes); err != nil { + if err := server.sortNodesByDistanceFrom(source, nodes); err != nil { t.Fatalf("err: %v", err) } verifyServiceNodeSort(t, nodes, "apple,node1,node2,node3,node4,node5") @@ -204,7 +205,7 @@ func TestRtt_sortByDistanceFrom_ServiceNodes(t *testing.T) { // Same for a source node in our DC that we have no coordinate for. source.Node = "apple" source.Datacenter = "dc1" - if err := server.sortByDistanceFrom(source, nodes); err != nil { + if err := server.sortNodesByDistanceFrom(source, nodes); err != nil { t.Fatalf("err: %v", err) } verifyServiceNodeSort(t, nodes, "apple,node1,node2,node3,node4,node5") @@ -214,7 +215,7 @@ func TestRtt_sortByDistanceFrom_ServiceNodes(t *testing.T) { // its lexical hegemony. source.Node = "node1" source.Datacenter = "dc1" - if err := server.sortByDistanceFrom(source, nodes); err != nil { + if err := server.sortNodesByDistanceFrom(source, nodes); err != nil { t.Fatalf("err: %v", err) } verifyServiceNodeSort(t, nodes, "node1,node4,node5,node2,node3,apple") @@ -224,7 +225,7 @@ func TestRtt_sortByDistanceFrom_ServiceNodes(t *testing.T) { // they were in from the previous sort. source.Node = "node2" source.Datacenter = "dc1" - if err := server.sortByDistanceFrom(source, nodes); err != nil { + if err := server.sortNodesByDistanceFrom(source, nodes); err != nil { t.Fatalf("err: %v", err) } verifyServiceNodeSort(t, nodes, "node2,node5,node3,node4,node1,apple") @@ -232,8 +233,141 @@ func TestRtt_sortByDistanceFrom_ServiceNodes(t *testing.T) { // Let's exercise the stable sort explicitly to make sure we didn't // just get lucky. nodes[1], nodes[2] = nodes[2], nodes[1] - if err := server.sortByDistanceFrom(source, nodes); err != nil { + if err := server.sortNodesByDistanceFrom(source, nodes); err != nil { t.Fatalf("err: %v", err) } verifyServiceNodeSort(t, nodes, "node2,node3,node5,node4,node1,apple") } + +// mockNodeMap is keyed by node name and the values are the coordinates of the +// node. +type mockNodeMap map[string]*coordinate.Coordinate + +// mockServer is used to provide a serfer interface for unit tests. The key is +// DC, which selects a map from node name to coordinate for that node. +type mockServer map[string]mockNodeMap + +// newMockServer is used to generate a serfer interface that presents a known DC +// topology for unit tests. The server is in dc0. +// +// Here's the layout of the nodes: +// +// /---- dc1 ----\ /- dc2 -\ /- dc0 -\ +// node2 node1 node3 node1 node1 +// | | | | | | | | | | | +// 0 1 2 3 4 5 6 7 8 9 10 (ms) +// +// We also include a node4 in dc1 with no known coordinate, as well as a +// mysterious dcX with no nodes with known coordinates. +// +func newMockServer() *mockServer { + s := make(mockServer) + s["dc0"] = mockNodeMap{ + "dc0.node1": generateCoordinate(10 * time.Millisecond), + } + s["dc1"] = mockNodeMap{ + "dc1.node1": generateCoordinate(3 * time.Millisecond), + "dc1.node2": generateCoordinate(2 * time.Millisecond), + "dc1.node3": generateCoordinate(5 * time.Millisecond), + "dc1.node4": nil, // no known coordinate + } + s["dc2"] = mockNodeMap{ + "dc2.node1": generateCoordinate(8 * time.Millisecond), + } + s["dcX"] = mockNodeMap{ + "dcX.node1": nil, // no known coordinate + } + return &s +} + +// See serfer. +func (s *mockServer) GetDatacenter() string { + return "dc0" +} + +// See serfer. +func (s *mockServer) GetCoordinate() (*coordinate.Coordinate, error) { + return (*s)["dc0"]["dc0.node1"], nil +} + +// See serfer. +func (s *mockServer) GetCachedCoordinate(node string) (*coordinate.Coordinate, bool) { + for _, nodes := range *s { + for n, coord := range nodes { + if n == node && coord != nil { + return coord, true + } + } + } + return nil, false +} + +// See serfer. +func (s *mockServer) GetNodesForDatacenter(dc string) []string { + nodes := make([]string, 0) + if n, ok := (*s)[dc]; ok { + for name := range n { + nodes = append(nodes, name) + } + } + return nodes +} + +func TestRtt_getDatacenterDistance(t *testing.T) { + s := newMockServer() + + // The serfer's own DC is always 0 ms away. + if dist, err := getDatacenterDistance(s, "dc0"); err != nil || dist != 0.0 { + t.Fatalf("bad: %v err: %v", dist, err) + } + + // Check a DC with no coordinates, which should give positive infinity. + if dist, err := getDatacenterDistance(s, "dcX"); err != nil || dist != math.Inf(1.0) { + t.Fatalf("bad: %v err: %v", dist, err) + } + + // Similar for a totally unknown DC. + if dist, err := getDatacenterDistance(s, "acdc"); err != nil || dist != math.Inf(1.0) { + t.Fatalf("bad: %v err: %v", dist, err) + } + + // Check the trivial median case (just one node). + if dist, err := getDatacenterDistance(s, "dc2"); err != nil || dist != 0.002 { + t.Fatalf("bad: %v err: %v", dist, err) + } + + // Check the more interesting median case, note that there's a mystery + // node4 in there that should make the distances sort like this: + // + // [0] node3 (0.005), [1] node1 (0.007), [2] node2 (0.008), [3] node4 (+Inf) + // + // So the median should be at index 4 / 2 = 2 -> 0.008. + if dist, err := getDatacenterDistance(s, "dc1"); err != nil || dist != 0.008 { + t.Fatalf("bad: %v err: %v", dist, err) + } +} + +func TestRtt_sortDatacentersByDistance(t *testing.T) { + s := newMockServer() + + dcs := []string{"acdc", "dc0", "dc1", "dc2", "dcX"} + if err := sortDatacentersByDistance(s, dcs); err != nil { + t.Fatalf("err: %v", err) + } + + expected := "dc0,dc2,dc1,acdc,dcX" + if actual := strings.Join(dcs, ","); actual != expected { + t.Fatalf("bad sort: %s != %s", actual, expected) + } + + // Make sure the sort is stable and we didn't just get lucky. + dcs = []string{"dcX", "dc0", "dc1", "dc2", "acdc"} + if err := sortDatacentersByDistance(s, dcs); err != nil { + t.Fatalf("err: %v", err) + } + + expected = "dc0,dc2,dc1,dcX,acdc" + if actual := strings.Join(dcs, ","); actual != expected { + t.Fatalf("bad sort: %s != %s", actual, expected) + } +} diff --git a/consul/util.go b/consul/util.go index a9544c0f2..f5a29a49b 100644 --- a/consul/util.go +++ b/consul/util.go @@ -141,7 +141,7 @@ func isConsulServer(m serf.Member) (bool, *serverParts) { return true, parts } -// Returns if a member is a consul node. Returns a boo, +// Returns if a member is a consul node. Returns a bool, // and the datacenter. func isConsulNode(m serf.Member) (bool, string) { if m.Tags["role"] != "node" { diff --git a/website/source/docs/agent/http/catalog.html.markdown b/website/source/docs/agent/http/catalog.html.markdown index 554719e5c..ccf67311b 100644 --- a/website/source/docs/agent/http/catalog.html.markdown +++ b/website/source/docs/agent/http/catalog.html.markdown @@ -159,6 +159,10 @@ If the API call succeeds a 200 status code is returned. This endpoint is hit with a GET and is used to return all the datacenters that are known by the Consul server. +The datacenters will be sorted in ascending order based on the +estimated median round trip time from the server to the servers +in that datacenter. + It returns a JSON body like this: ```javascript