From a73ed8c79a8366308fdc15f6ac85ea7890948071 Mon Sep 17 00:00:00 2001 From: James Phillips Date: Tue, 24 Oct 2017 22:09:06 -0700 Subject: [PATCH 1/4] Adds retry to API metrics test (flaky test). --- api/agent_test.go | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/api/agent_test.go b/api/agent_test.go index 89c2283a8..7d97a3af7 100644 --- a/api/agent_test.go +++ b/api/agent_test.go @@ -9,6 +9,7 @@ import ( "time" "github.com/hashicorp/consul/testutil" + "github.com/hashicorp/consul/testutil/retry" "github.com/hashicorp/serf/serf" ) @@ -42,16 +43,14 @@ func TestAPI_AgentMetrics(t *testing.T) { t.Fatalf("err: %v", err) } - var found bool - for _, g := range metrics.Gauges { - if g.Name == "consul.runtime.alloc_bytes" { - found = true - break + retry.Run(t, func(r *retry.R) { + for _, g := range metrics.Gauges { + if g.Name == "consul.runtime.alloc_bytes" { + return + } } - } - if !found { - t.Fatalf("missing runtime metrics") - } + r.Fatalf("missing runtime metrics") + }) } func TestAPI_AgentReload(t *testing.T) { From b5f8a16ea35985c860048ad3f254246fd3cb2662 Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Wed, 25 Oct 2017 15:09:43 -0700 Subject: [PATCH 2/4] Initialize freeport lazily to avoid runtime issues This PR makes freeport initialize lazily rather than using an init method. --- lib/freeport/freeport.go | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/lib/freeport/freeport.go b/lib/freeport/freeport.go index c09ff4cd6..882998314 100644 --- a/lib/freeport/freeport.go +++ b/lib/freeport/freeport.go @@ -40,11 +40,16 @@ var ( // mu guards nextPort mu sync.Mutex + // once is used to do the initialization on the first call to retrieve free + // ports + once sync.Once + // port is the last allocated port. port int ) -func init() { +// initialize is used to initialize freeport. +func initialize() { if lowPort+maxBlocks*blockSize > 65535 { panic("freeport: block size too big or too many blocks requested") } @@ -108,6 +113,9 @@ func Free(n int) (ports []int, err error) { return nil, fmt.Errorf("freeport: block size too small") } + // Reserve a port block + once.Do(initialize) + for len(ports) < n { port++ From 87206133be6f567e932fd0db6ddcf89377d001a6 Mon Sep 17 00:00:00 2001 From: Frank Schroeder Date: Thu, 26 Oct 2017 14:24:42 +0200 Subject: [PATCH 3/4] agent: add /v1/coordianate/node/:node endpoint This patch adds a /v1/coordinate/node/:node endpoint to get the network coordinates for a single node in the network. Since Consul Enterprise supports network segments it is still possible to receive mutiple entries for a single node - one per segment. --- agent/coordinate_endpoint.go | 68 +++++++++++----- agent/coordinate_endpoint_test.go | 109 ++++++++++++++++++++++++++ agent/http.go | 2 + agent/http_test.go | 1 + website/source/api/coordinate.html.md | 54 ++++++++++++- 5 files changed, 215 insertions(+), 19 deletions(-) diff --git a/agent/coordinate_endpoint.go b/agent/coordinate_endpoint.go index e9ac566d9..8dd944393 100644 --- a/agent/coordinate_endpoint.go +++ b/agent/coordinate_endpoint.go @@ -4,6 +4,7 @@ import ( "fmt" "net/http" "sort" + "strings" "github.com/hashicorp/consul/agent/structs" ) @@ -85,22 +86,53 @@ func (s *HTTPServer) CoordinateNodes(resp http.ResponseWriter, req *http.Request return nil, err } - // Use empty list instead of nil. - if out.Coordinates == nil { - out.Coordinates = make(structs.Coordinates, 0) - } - - // Filter by segment if applicable - if v, ok := req.URL.Query()["segment"]; ok && len(v) > 0 { - segment := v[0] - filtered := make(structs.Coordinates, 0) - for _, coord := range out.Coordinates { - if coord.Segment == segment { - filtered = append(filtered, coord) - } - } - out.Coordinates = filtered - } - - return out.Coordinates, nil + return filterCoordinates(req, "", out.Coordinates), nil +} + +// CoordinateNode returns the LAN node in the given datacenter, along with +// raw network coordinates. +func (s *HTTPServer) CoordinateNode(resp http.ResponseWriter, req *http.Request) (interface{}, error) { + if req.Method != "GET" { + return nil, MethodNotAllowedError{req.Method, []string{"GET"}} + } + + args := structs.DCSpecificRequest{} + if done := s.parse(resp, req, &args.Datacenter, &args.QueryOptions); done { + return nil, nil + } + + var out structs.IndexedCoordinates + defer setMeta(resp, &out.QueryMeta) + if err := s.agent.RPC("Coordinate.ListNodes", &args, &out); err != nil { + sort.Sort(&sorter{out.Coordinates}) + return nil, err + } + + node := strings.TrimPrefix(req.URL.Path, "/v1/coordinate/node/") + return filterCoordinates(req, node, out.Coordinates), nil +} + +func filterCoordinates(req *http.Request, node string, in structs.Coordinates) structs.Coordinates { + out := structs.Coordinates{} + + if in == nil { + return out + } + + segment := "" + v, filterBySegment := req.URL.Query()["segment"] + if filterBySegment && len(v) > 0 { + segment = v[0] + } + + for _, c := range in { + if node != "" && c.Node != node { + continue + } + if filterBySegment && c.Segment != segment { + continue + } + out = append(out, c) + } + return out } diff --git a/agent/coordinate_endpoint_test.go b/agent/coordinate_endpoint_test.go index 455611ea6..74da244c0 100644 --- a/agent/coordinate_endpoint_test.go +++ b/agent/coordinate_endpoint_test.go @@ -140,3 +140,112 @@ func TestCoordinate_Nodes(t *testing.T) { t.Fatalf("bad: %v", coordinates) } } + +func TestCoordinate_Node(t *testing.T) { + t.Parallel() + a := NewTestAgent(t.Name(), "") + defer a.Shutdown() + + // Make sure an empty list is non-nil. + req, _ := http.NewRequest("GET", "/v1/coordinate/node/foo?dc=dc1", nil) + resp := httptest.NewRecorder() + obj, err := a.srv.CoordinateNode(resp, req) + if err != nil { + t.Fatalf("err: %v", err) + } + + coordinates := obj.(structs.Coordinates) + if coordinates == nil || len(coordinates) != 0 { + t.Fatalf("bad: %v", coordinates) + } + + // Register the nodes. + nodes := []string{"foo", "bar"} + for _, node := range nodes { + req := structs.RegisterRequest{ + Datacenter: "dc1", + Node: node, + Address: "127.0.0.1", + } + var reply struct{} + if err := a.RPC("Catalog.Register", &req, &reply); err != nil { + t.Fatalf("err: %s", err) + } + } + + // Send some coordinates for a few nodes, waiting a little while for the + // batch update to run. + arg1 := structs.CoordinateUpdateRequest{ + Datacenter: "dc1", + Node: "foo", + Segment: "alpha", + Coord: coordinate.NewCoordinate(coordinate.DefaultConfig()), + } + var out struct{} + if err := a.RPC("Coordinate.Update", &arg1, &out); err != nil { + t.Fatalf("err: %v", err) + } + + arg2 := structs.CoordinateUpdateRequest{ + Datacenter: "dc1", + Node: "bar", + Coord: coordinate.NewCoordinate(coordinate.DefaultConfig()), + } + if err := a.RPC("Coordinate.Update", &arg2, &out); err != nil { + t.Fatalf("err: %v", err) + } + time.Sleep(300 * time.Millisecond) + + // Query back and check the nodes are present and sorted correctly. + req, _ = http.NewRequest("GET", "/v1/coordinate/node/foo?dc=dc1", nil) + resp = httptest.NewRecorder() + obj, err = a.srv.CoordinateNode(resp, req) + if err != nil { + t.Fatalf("err: %v", err) + } + + coordinates = obj.(structs.Coordinates) + if len(coordinates) != 1 || + coordinates[0].Node != "foo" { + t.Fatalf("bad: %v", coordinates) + } + + // Filter on a nonexistant node segment + req, _ = http.NewRequest("GET", "/v1/coordinate/node/foo?segment=nope", nil) + resp = httptest.NewRecorder() + obj, err = a.srv.CoordinateNode(resp, req) + if err != nil { + t.Fatalf("err: %v", err) + } + + coordinates = obj.(structs.Coordinates) + if len(coordinates) != 0 { + t.Fatalf("bad: %v", coordinates) + } + + // Filter on a real node segment + req, _ = http.NewRequest("GET", "/v1/coordinate/node/foo?segment=alpha", nil) + resp = httptest.NewRecorder() + obj, err = a.srv.CoordinateNode(resp, req) + if err != nil { + t.Fatalf("err: %v", err) + } + + coordinates = obj.(structs.Coordinates) + if len(coordinates) != 1 || coordinates[0].Node != "foo" { + t.Fatalf("bad: %v", coordinates) + } + + // Make sure the empty filter works + req, _ = http.NewRequest("GET", "/v1/coordinate/node/foo?segment=", nil) + resp = httptest.NewRecorder() + obj, err = a.srv.CoordinateNode(resp, req) + if err != nil { + t.Fatalf("err: %v", err) + } + + coordinates = obj.(structs.Coordinates) + if len(coordinates) != 0 { + t.Fatalf("bad: %v", coordinates) + } +} diff --git a/agent/http.go b/agent/http.go index 10189517d..0abaf4234 100644 --- a/agent/http.go +++ b/agent/http.go @@ -139,9 +139,11 @@ func (s *HTTPServer) handler(enableDebug bool) http.Handler { if !s.agent.config.DisableCoordinates { handleFuncMetrics("/v1/coordinate/datacenters", s.wrap(s.CoordinateDatacenters)) handleFuncMetrics("/v1/coordinate/nodes", s.wrap(s.CoordinateNodes)) + handleFuncMetrics("/v1/coordinate/node/", s.wrap(s.CoordinateNode)) } else { handleFuncMetrics("/v1/coordinate/datacenters", s.wrap(coordinateDisabled)) handleFuncMetrics("/v1/coordinate/nodes", s.wrap(coordinateDisabled)) + handleFuncMetrics("/v1/coordinate/node/", s.wrap(coordinateDisabled)) } handleFuncMetrics("/v1/event/fire/", s.wrap(s.EventFire)) handleFuncMetrics("/v1/event/list", s.wrap(s.EventList)) diff --git a/agent/http_test.go b/agent/http_test.go index 3f1c70df7..517b90807 100644 --- a/agent/http_test.go +++ b/agent/http_test.go @@ -350,6 +350,7 @@ func TestHTTPAPI_MethodNotAllowed(t *testing.T) { {"GET", "/v1/catalog/services"}, {"GET", "/v1/coordinate/datacenters"}, {"GET", "/v1/coordinate/nodes"}, + {"GET", "/v1/coordinate/node/"}, {"PUT", "/v1/event/fire/"}, {"GET", "/v1/event/list"}, {"GET", "/v1/health/checks/"}, diff --git a/website/source/api/coordinate.html.md b/website/source/api/coordinate.html.md index f92b8601d..e3d87fcab 100644 --- a/website/source/api/coordinate.html.md +++ b/website/source/api/coordinate.html.md @@ -71,7 +71,7 @@ In **Consul Enterprise**, this will include coordinates for user-added network areas as well, as indicated by the `AreaID`. Coordinates are only compatible within the same area. -## Read LAN Coordinates +## Read LAN Coordinates for all nodes This endpoint returns the LAN network coordinates for all nodes in a given datacenter. @@ -122,3 +122,55 @@ $ curl \ In **Consul Enterprise**, this may include multiple coordinates for the same node, each marked with a different `Segment`. Coordinates are only compatible within the same segment. + +## Read LAN Coordinates for a node + +This endpoint returns the LAN network coordinates for all nodes in a given +datacenter. + +| Method | Path | Produces | +| ------ | ---------------------------- | -------------------------- | +| `GET` | `/coordinate/node/:node` | `application/json` | + +The table below shows this endpoint's support for +[blocking queries](/api/index.html#blocking-queries), +[consistency modes](/api/index.html#consistency-modes), and +[required ACLs](/api/index.html#acls). + +| Blocking Queries | Consistency Modes | ACL Required | +| ---------------- | ----------------- | ------------ | +| `YES` | `all` | `node:read` | + +### Parameters + +- `dc` `(string: "")` - Specifies the datacenter to query. This will default to + the datacenter of the agent being queried. This is specified as part of the + URL as a query parameter. + +### Sample Request + +```text +$ curl \ + https://consul.rocks/v1/coordinate/node/agent-one +``` + +### Sample Response + +```json +[ + { + "Node": "agent-one", + "Segment": "", + "Coord": { + "Adjustment": 0, + "Error": 1.5, + "Height": 0, + "Vec": [0, 0, 0, 0, 0, 0, 0, 0] + } + } +] +``` + +In **Consul Enterprise**, this may include multiple coordinates for the same node, +each marked with a different `Segment`. Coordinates are only compatible within the same +segment. From f80e70271debc8b0f49ffbf1c269a5f6ae09ac04 Mon Sep 17 00:00:00 2001 From: Kyle Havlovitz Date: Thu, 26 Oct 2017 19:16:40 -0700 Subject: [PATCH 4/4] Added Coordinate.Node rpc endpoint and client api method --- agent/consul/coordinate_endpoint.go | 42 +++++++ agent/consul/coordinate_endpoint_test.go | 154 ++++++++++++++++++++++- agent/consul/rtt.go | 10 +- agent/consul/state/coordinate.go | 10 +- agent/consul/state/coordinate_test.go | 10 +- agent/coordinate_endpoint.go | 6 +- api/coordinate.go | 21 ++++ api/coordinate_test.go | 19 +++ 8 files changed, 251 insertions(+), 21 deletions(-) diff --git a/agent/consul/coordinate_endpoint.go b/agent/consul/coordinate_endpoint.go index a001fad7b..abf255fbc 100644 --- a/agent/consul/coordinate_endpoint.go +++ b/agent/consul/coordinate_endpoint.go @@ -200,3 +200,45 @@ func (c *Coordinate) ListNodes(args *structs.DCSpecificRequest, reply *structs.I return nil }) } + +// ListNodes returns the list of nodes with their raw network coordinates (if no +// coordinates are available for a node it won't appear in this list). +func (c *Coordinate) Node(args *structs.NodeSpecificRequest, reply *structs.IndexedCoordinates) error { + if done, err := c.srv.forward("Coordinate.Node", args, args, reply); done { + return err + } + + // Fetch the ACL token, if any, and enforce the node policy if enabled. + rule, err := c.srv.resolveToken(args.Token) + if err != nil { + return err + } + if rule != nil && c.srv.config.ACLEnforceVersion8 { + // We don't enforce the sentinel policy here, since at this time + // sentinel only applies to creating or updating node or service + // info, not updating coordinates. + if !rule.NodeWrite(args.Node, nil) { + return acl.ErrPermissionDenied + } + } + + return c.srv.blockingQuery(&args.QueryOptions, + &reply.QueryMeta, + func(ws memdb.WatchSet, state *state.Store) error { + index, nodeCoords, err := state.Coordinate(args.Node, ws) + if err != nil { + return err + } + + var coords structs.Coordinates + for segment, coord := range nodeCoords { + coords = append(coords, &structs.Coordinate{ + Node: args.Node, + Segment: segment, + Coord: coord, + }) + } + reply.Index, reply.Coordinates = index, coords + return nil + }) +} diff --git a/agent/consul/coordinate_endpoint_test.go b/agent/consul/coordinate_endpoint_test.go index ba9ded9a7..5f23aeb2c 100644 --- a/agent/consul/coordinate_endpoint_test.go +++ b/agent/consul/coordinate_endpoint_test.go @@ -86,13 +86,13 @@ func TestCoordinate_Update(t *testing.T) { // Make sure the updates did not yet apply because the update period // hasn't expired. state := s1.fsm.State() - c, err := state.Coordinate("node1") + _, c, err := state.Coordinate("node1", nil) if err != nil { t.Fatalf("err: %v", err) } verify.Values(t, "", c, lib.CoordinateSet{}) - c, err = state.Coordinate("node2") + _, c, err = state.Coordinate("node2", nil) if err != nil { t.Fatalf("err: %v", err) } @@ -107,7 +107,7 @@ func TestCoordinate_Update(t *testing.T) { // Wait a while and the updates should get picked up. time.Sleep(3 * s1.config.CoordinateUpdatePeriod) - c, err = state.Coordinate("node1") + _, c, err = state.Coordinate("node1", nil) if err != nil { t.Fatalf("err: %v", err) } @@ -116,7 +116,7 @@ func TestCoordinate_Update(t *testing.T) { } verify.Values(t, "", c, expected) - c, err = state.Coordinate("node2") + _, c, err = state.Coordinate("node2", nil) if err != nil { t.Fatalf("err: %v", err) } @@ -155,7 +155,7 @@ func TestCoordinate_Update(t *testing.T) { time.Sleep(3 * s1.config.CoordinateUpdatePeriod) numDropped := 0 for i := 0; i < spamLen; i++ { - c, err = state.Coordinate(fmt.Sprintf("bogusnode%d", i)) + _, c, err = state.Coordinate(fmt.Sprintf("bogusnode%d", i), nil) if err != nil { t.Fatalf("err: %v", err) } @@ -502,3 +502,147 @@ node "foo" { t.Fatalf("bad: %#v", resp.Coordinates) } } + +func TestCoordinate_Node(t *testing.T) { + t.Parallel() + dir1, s1 := testServer(t) + defer os.RemoveAll(dir1) + defer s1.Shutdown() + + codec := rpcClient(t, s1) + defer codec.Close() + testrpc.WaitForLeader(t, s1.RPC, "dc1") + + // Register some nodes. + nodes := []string{"foo", "bar"} + for _, node := range nodes { + req := structs.RegisterRequest{ + Datacenter: "dc1", + Node: node, + Address: "127.0.0.1", + } + var reply struct{} + if err := msgpackrpc.CallWithCodec(codec, "Catalog.Register", &req, &reply); err != nil { + t.Fatalf("err: %v", err) + } + } + + // Send coordinate updates for each node. + arg1 := structs.CoordinateUpdateRequest{ + Datacenter: "dc1", + Node: "foo", + Coord: generateRandomCoordinate(), + } + var out struct{} + if err := msgpackrpc.CallWithCodec(codec, "Coordinate.Update", &arg1, &out); err != nil { + t.Fatalf("err: %v", err) + } + + arg2 := structs.CoordinateUpdateRequest{ + Datacenter: "dc1", + Node: "bar", + Coord: generateRandomCoordinate(), + } + if err := msgpackrpc.CallWithCodec(codec, "Coordinate.Update", &arg2, &out); err != nil { + t.Fatalf("err: %v", err) + } + + // Now query back for a specific node (make sure we only get coordinates for foo). + retry.Run(t, func(r *retry.R) { + arg := structs.NodeSpecificRequest{ + Node: "foo", + Datacenter: "dc1", + } + resp := structs.IndexedCoordinates{} + if err := msgpackrpc.CallWithCodec(codec, "Coordinate.Node", &arg, &resp); err != nil { + r.Fatalf("err: %v", err) + } + if len(resp.Coordinates) != 1 || + resp.Coordinates[0].Node != "foo" { + r.Fatalf("bad: %v", resp.Coordinates) + } + verify.Values(t, "", resp.Coordinates[0].Coord, arg1.Coord) // foo + }) +} + +func TestCoordinate_Node_ACLDeny(t *testing.T) { + t.Parallel() + dir1, s1 := testServerWithConfig(t, func(c *Config) { + c.ACLDatacenter = "dc1" + c.ACLMasterToken = "root" + c.ACLDefaultPolicy = "deny" + c.ACLEnforceVersion8 = false + }) + defer os.RemoveAll(dir1) + defer s1.Shutdown() + codec := rpcClient(t, s1) + defer codec.Close() + + testrpc.WaitForLeader(t, s1.RPC, "dc1") + + // Register some nodes. + nodes := []string{"node1", "node2"} + for _, node := range nodes { + req := structs.RegisterRequest{ + Datacenter: "dc1", + Node: node, + Address: "127.0.0.1", + } + var reply struct{} + if err := msgpackrpc.CallWithCodec(codec, "Catalog.Register", &req, &reply); err != nil { + t.Fatalf("err: %v", err) + } + } + + // Send an update for the first node. This should go through since we + // don't have version 8 ACLs enforced yet. + req := structs.CoordinateUpdateRequest{ + Datacenter: "dc1", + Node: "node1", + Coord: generateRandomCoordinate(), + } + var out struct{} + if err := msgpackrpc.CallWithCodec(codec, "Coordinate.Update", &req, &out); err != nil { + t.Fatalf("err: %v", err) + } + + // Now turn on version 8 enforcement and try again. + s1.config.ACLEnforceVersion8 = true + err := msgpackrpc.CallWithCodec(codec, "Coordinate.Update", &req, &out) + if !acl.IsErrPermissionDenied(err) { + t.Fatalf("err: %v", err) + } + + // Create an ACL that can write to the node. + arg := structs.ACLRequest{ + Datacenter: "dc1", + Op: structs.ACLSet, + ACL: structs.ACL{ + Name: "User token", + Type: structs.ACLTypeClient, + Rules: ` +node "node1" { + policy = "write" +} +`, + }, + WriteRequest: structs.WriteRequest{Token: "root"}, + } + var id string + if err := msgpackrpc.CallWithCodec(codec, "ACL.Apply", &arg, &id); err != nil { + t.Fatalf("err: %v", err) + } + + // With the token, it should now go through. + req.Token = id + if err := msgpackrpc.CallWithCodec(codec, "Coordinate.Update", &req, &out); err != nil { + t.Fatalf("err: %v", err) + } + + // But it should be blocked for the other node. + req.Node = "node2" + err = msgpackrpc.CallWithCodec(codec, "Coordinate.Update", &req, &out) + if !acl.IsErrPermissionDenied(err) { + t.Fatalf("err: %v", err) + } +} diff --git a/agent/consul/rtt.go b/agent/consul/rtt.go index 84a39f34d..8bb1bcaec 100644 --- a/agent/consul/rtt.go +++ b/agent/consul/rtt.go @@ -22,7 +22,7 @@ func (s *Server) newNodeSorter(cs lib.CoordinateSet, nodes structs.Nodes) (sort. state := s.fsm.State() vec := make([]float64, len(nodes)) for i, node := range nodes { - other, err := state.Coordinate(node.Node) + _, other, err := state.Coordinate(node.Node, nil) if err != nil { return nil, err } @@ -62,7 +62,7 @@ func (s *Server) newServiceNodeSorter(cs lib.CoordinateSet, nodes structs.Servic state := s.fsm.State() vec := make([]float64, len(nodes)) for i, node := range nodes { - other, err := state.Coordinate(node.Node) + _, other, err := state.Coordinate(node.Node, nil) if err != nil { return nil, err } @@ -102,7 +102,7 @@ func (s *Server) newHealthCheckSorter(cs lib.CoordinateSet, checks structs.Healt state := s.fsm.State() vec := make([]float64, len(checks)) for i, check := range checks { - other, err := state.Coordinate(check.Node) + _, other, err := state.Coordinate(check.Node, nil) if err != nil { return nil, err } @@ -142,7 +142,7 @@ func (s *Server) newCheckServiceNodeSorter(cs lib.CoordinateSet, nodes structs.C state := s.fsm.State() vec := make([]float64, len(nodes)) for i, node := range nodes { - other, err := state.Coordinate(node.Node.Node) + _, other, err := state.Coordinate(node.Node.Node, nil) if err != nil { return nil, err } @@ -203,7 +203,7 @@ func (s *Server) sortNodesByDistanceFrom(source structs.QuerySource, subj interf // There won't always be coordinates for the source node. If there are // none then we can bail out because there's no meaning for the sort. state := s.fsm.State() - cs, err := state.Coordinate(source.Node) + _, cs, err := state.Coordinate(source.Node, nil) if err != nil { return err } diff --git a/agent/consul/state/coordinate.go b/agent/consul/state/coordinate.go index 83db26455..68087b1cb 100644 --- a/agent/consul/state/coordinate.go +++ b/agent/consul/state/coordinate.go @@ -42,21 +42,25 @@ func (s *Restore) Coordinates(idx uint64, updates structs.Coordinates) error { // Coordinate returns a map of coordinates for the given node, indexed by // network segment. -func (s *Store) Coordinate(node string) (lib.CoordinateSet, error) { +func (s *Store) Coordinate(node string, ws memdb.WatchSet) (uint64, lib.CoordinateSet, error) { tx := s.db.Txn(false) defer tx.Abort() + // Get the table index. + idx := maxIndexTxn(tx, "coordinates") + iter, err := tx.Get("coordinates", "node", node) if err != nil { - return nil, fmt.Errorf("failed coordinate lookup: %s", err) + return 0, nil, fmt.Errorf("failed coordinate lookup: %s", err) } + ws.Add(iter.WatchCh()) results := make(lib.CoordinateSet) for raw := iter.Next(); raw != nil; raw = iter.Next() { coord := raw.(*structs.Coordinate) results[coord.Segment] = coord.Coord } - return results, nil + return idx, results, nil } // Coordinates queries for all nodes with coordinates. diff --git a/agent/consul/state/coordinate_test.go b/agent/consul/state/coordinate_test.go index b126e4478..dcc59c864 100644 --- a/agent/consul/state/coordinate_test.go +++ b/agent/consul/state/coordinate_test.go @@ -42,7 +42,7 @@ func TestStateStore_Coordinate_Updates(t *testing.T) { } verify.Values(t, "", all, structs.Coordinates{}) - coords, err := s.Coordinate("nope") + _, coords, err := s.Coordinate("nope", nil) if err != nil { t.Fatalf("err: %s", err) } @@ -102,7 +102,7 @@ func TestStateStore_Coordinate_Updates(t *testing.T) { // Also verify the per-node coordinate interface. for _, update := range updates { - coords, err := s.Coordinate(update.Node) + _, coords, err := s.Coordinate(update.Node, nil) if err != nil { t.Fatalf("err: %s", err) } @@ -133,7 +133,7 @@ func TestStateStore_Coordinate_Updates(t *testing.T) { // And check the per-node coordinate version of the same thing. for _, update := range updates { - coords, err := s.Coordinate(update.Node) + _, coords, err := s.Coordinate(update.Node, nil) if err != nil { t.Fatalf("err: %s", err) } @@ -188,7 +188,7 @@ func TestStateStore_Coordinate_Cleanup(t *testing.T) { } // Make sure it's in there. - coords, err := s.Coordinate("node1") + _, coords, err := s.Coordinate("node1", nil) if err != nil { t.Fatalf("err: %s", err) } @@ -204,7 +204,7 @@ func TestStateStore_Coordinate_Cleanup(t *testing.T) { } // Make sure the coordinate is gone. - coords, err = s.Coordinate("node1") + _, coords, err = s.Coordinate("node1", nil) if err != nil { t.Fatalf("err: %s", err) } diff --git a/agent/coordinate_endpoint.go b/agent/coordinate_endpoint.go index 8dd944393..ade8b582a 100644 --- a/agent/coordinate_endpoint.go +++ b/agent/coordinate_endpoint.go @@ -96,19 +96,19 @@ func (s *HTTPServer) CoordinateNode(resp http.ResponseWriter, req *http.Request) return nil, MethodNotAllowedError{req.Method, []string{"GET"}} } - args := structs.DCSpecificRequest{} + node := strings.TrimPrefix(req.URL.Path, "/v1/coordinate/node/") + args := structs.NodeSpecificRequest{Node: node} if done := s.parse(resp, req, &args.Datacenter, &args.QueryOptions); done { return nil, nil } var out structs.IndexedCoordinates defer setMeta(resp, &out.QueryMeta) - if err := s.agent.RPC("Coordinate.ListNodes", &args, &out); err != nil { + if err := s.agent.RPC("Coordinate.Node", &args, &out); err != nil { sort.Sort(&sorter{out.Coordinates}) return nil, err } - node := strings.TrimPrefix(req.URL.Path, "/v1/coordinate/node/") return filterCoordinates(req, node, out.Coordinates), nil } diff --git a/api/coordinate.go b/api/coordinate.go index 90214e392..42df0deca 100644 --- a/api/coordinate.go +++ b/api/coordinate.go @@ -66,3 +66,24 @@ func (c *Coordinate) Nodes(q *QueryOptions) ([]*CoordinateEntry, *QueryMeta, err } return out, qm, nil } + +// Node is used to return the coordinates of a single in the LAN pool. +func (c *Coordinate) Node(node string, q *QueryOptions) ([]*CoordinateEntry, *QueryMeta, error) { + r := c.c.newRequest("GET", "/v1/coordinate/node/"+node) + r.setQueryOptions(q) + rtt, resp, err := requireOK(c.c.doRequest(r)) + if err != nil { + return nil, nil, err + } + defer resp.Body.Close() + + qm := &QueryMeta{} + parseQueryMeta(resp, qm) + qm.RequestTime = rtt + + var out []*CoordinateEntry + if err := decodeBody(resp, &out); err != nil { + return nil, nil, err + } + return out, qm, nil +} diff --git a/api/coordinate_test.go b/api/coordinate_test.go index f41756b57..a91d1869b 100644 --- a/api/coordinate_test.go +++ b/api/coordinate_test.go @@ -42,3 +42,22 @@ func TestAPI_CoordinateNodes(t *testing.T) { // get an error. }) } + +func TestAPI_CoordinateNode(t *testing.T) { + t.Parallel() + c, s := makeClient(t) + defer s.Stop() + + coordinate := c.Coordinate() + retry.Run(t, func(r *retry.R) { + _, _, err := coordinate.Node(s.Config.NodeName, nil) + if err != nil { + r.Fatal(err) + } + + // There's not a good way to populate coordinates without + // waiting for them to calculate and update, so the best + // we can do is call the endpoint and make sure we don't + // get an error. + }) +}