Merge branch 'coordinate-node-endpoint' of github.com:hashicorp/consul into esm-changes

This commit is contained in:
Kyle Havlovitz 2017-10-26 19:20:24 -07:00
commit 496dd7ab5b
No known key found for this signature in database
GPG key ID: 8A5E6B173056AD6C
14 changed files with 476 additions and 43 deletions

View file

@ -200,3 +200,45 @@ func (c *Coordinate) ListNodes(args *structs.DCSpecificRequest, reply *structs.I
return nil
})
}
// ListNodes returns the list of nodes with their raw network coordinates (if no
// coordinates are available for a node it won't appear in this list).
func (c *Coordinate) Node(args *structs.NodeSpecificRequest, reply *structs.IndexedCoordinates) error {
if done, err := c.srv.forward("Coordinate.Node", args, args, reply); done {
return err
}
// Fetch the ACL token, if any, and enforce the node policy if enabled.
rule, err := c.srv.resolveToken(args.Token)
if err != nil {
return err
}
if rule != nil && c.srv.config.ACLEnforceVersion8 {
// We don't enforce the sentinel policy here, since at this time
// sentinel only applies to creating or updating node or service
// info, not updating coordinates.
if !rule.NodeWrite(args.Node, nil) {
return acl.ErrPermissionDenied
}
}
return c.srv.blockingQuery(&args.QueryOptions,
&reply.QueryMeta,
func(ws memdb.WatchSet, state *state.Store) error {
index, nodeCoords, err := state.Coordinate(args.Node, ws)
if err != nil {
return err
}
var coords structs.Coordinates
for segment, coord := range nodeCoords {
coords = append(coords, &structs.Coordinate{
Node: args.Node,
Segment: segment,
Coord: coord,
})
}
reply.Index, reply.Coordinates = index, coords
return nil
})
}

View file

@ -86,13 +86,13 @@ func TestCoordinate_Update(t *testing.T) {
// Make sure the updates did not yet apply because the update period
// hasn't expired.
state := s1.fsm.State()
c, err := state.Coordinate("node1")
_, c, err := state.Coordinate("node1", nil)
if err != nil {
t.Fatalf("err: %v", err)
}
verify.Values(t, "", c, lib.CoordinateSet{})
c, err = state.Coordinate("node2")
_, c, err = state.Coordinate("node2", nil)
if err != nil {
t.Fatalf("err: %v", err)
}
@ -107,7 +107,7 @@ func TestCoordinate_Update(t *testing.T) {
// Wait a while and the updates should get picked up.
time.Sleep(3 * s1.config.CoordinateUpdatePeriod)
c, err = state.Coordinate("node1")
_, c, err = state.Coordinate("node1", nil)
if err != nil {
t.Fatalf("err: %v", err)
}
@ -116,7 +116,7 @@ func TestCoordinate_Update(t *testing.T) {
}
verify.Values(t, "", c, expected)
c, err = state.Coordinate("node2")
_, c, err = state.Coordinate("node2", nil)
if err != nil {
t.Fatalf("err: %v", err)
}
@ -155,7 +155,7 @@ func TestCoordinate_Update(t *testing.T) {
time.Sleep(3 * s1.config.CoordinateUpdatePeriod)
numDropped := 0
for i := 0; i < spamLen; i++ {
c, err = state.Coordinate(fmt.Sprintf("bogusnode%d", i))
_, c, err = state.Coordinate(fmt.Sprintf("bogusnode%d", i), nil)
if err != nil {
t.Fatalf("err: %v", err)
}
@ -502,3 +502,147 @@ node "foo" {
t.Fatalf("bad: %#v", resp.Coordinates)
}
}
func TestCoordinate_Node(t *testing.T) {
t.Parallel()
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
codec := rpcClient(t, s1)
defer codec.Close()
testrpc.WaitForLeader(t, s1.RPC, "dc1")
// Register some nodes.
nodes := []string{"foo", "bar"}
for _, node := range nodes {
req := structs.RegisterRequest{
Datacenter: "dc1",
Node: node,
Address: "127.0.0.1",
}
var reply struct{}
if err := msgpackrpc.CallWithCodec(codec, "Catalog.Register", &req, &reply); err != nil {
t.Fatalf("err: %v", err)
}
}
// Send coordinate updates for each node.
arg1 := structs.CoordinateUpdateRequest{
Datacenter: "dc1",
Node: "foo",
Coord: generateRandomCoordinate(),
}
var out struct{}
if err := msgpackrpc.CallWithCodec(codec, "Coordinate.Update", &arg1, &out); err != nil {
t.Fatalf("err: %v", err)
}
arg2 := structs.CoordinateUpdateRequest{
Datacenter: "dc1",
Node: "bar",
Coord: generateRandomCoordinate(),
}
if err := msgpackrpc.CallWithCodec(codec, "Coordinate.Update", &arg2, &out); err != nil {
t.Fatalf("err: %v", err)
}
// Now query back for a specific node (make sure we only get coordinates for foo).
retry.Run(t, func(r *retry.R) {
arg := structs.NodeSpecificRequest{
Node: "foo",
Datacenter: "dc1",
}
resp := structs.IndexedCoordinates{}
if err := msgpackrpc.CallWithCodec(codec, "Coordinate.Node", &arg, &resp); err != nil {
r.Fatalf("err: %v", err)
}
if len(resp.Coordinates) != 1 ||
resp.Coordinates[0].Node != "foo" {
r.Fatalf("bad: %v", resp.Coordinates)
}
verify.Values(t, "", resp.Coordinates[0].Coord, arg1.Coord) // foo
})
}
func TestCoordinate_Node_ACLDeny(t *testing.T) {
t.Parallel()
dir1, s1 := testServerWithConfig(t, func(c *Config) {
c.ACLDatacenter = "dc1"
c.ACLMasterToken = "root"
c.ACLDefaultPolicy = "deny"
c.ACLEnforceVersion8 = false
})
defer os.RemoveAll(dir1)
defer s1.Shutdown()
codec := rpcClient(t, s1)
defer codec.Close()
testrpc.WaitForLeader(t, s1.RPC, "dc1")
// Register some nodes.
nodes := []string{"node1", "node2"}
for _, node := range nodes {
req := structs.RegisterRequest{
Datacenter: "dc1",
Node: node,
Address: "127.0.0.1",
}
var reply struct{}
if err := msgpackrpc.CallWithCodec(codec, "Catalog.Register", &req, &reply); err != nil {
t.Fatalf("err: %v", err)
}
}
// Send an update for the first node. This should go through since we
// don't have version 8 ACLs enforced yet.
req := structs.CoordinateUpdateRequest{
Datacenter: "dc1",
Node: "node1",
Coord: generateRandomCoordinate(),
}
var out struct{}
if err := msgpackrpc.CallWithCodec(codec, "Coordinate.Update", &req, &out); err != nil {
t.Fatalf("err: %v", err)
}
// Now turn on version 8 enforcement and try again.
s1.config.ACLEnforceVersion8 = true
err := msgpackrpc.CallWithCodec(codec, "Coordinate.Update", &req, &out)
if !acl.IsErrPermissionDenied(err) {
t.Fatalf("err: %v", err)
}
// Create an ACL that can write to the node.
arg := structs.ACLRequest{
Datacenter: "dc1",
Op: structs.ACLSet,
ACL: structs.ACL{
Name: "User token",
Type: structs.ACLTypeClient,
Rules: `
node "node1" {
policy = "write"
}
`,
},
WriteRequest: structs.WriteRequest{Token: "root"},
}
var id string
if err := msgpackrpc.CallWithCodec(codec, "ACL.Apply", &arg, &id); err != nil {
t.Fatalf("err: %v", err)
}
// With the token, it should now go through.
req.Token = id
if err := msgpackrpc.CallWithCodec(codec, "Coordinate.Update", &req, &out); err != nil {
t.Fatalf("err: %v", err)
}
// But it should be blocked for the other node.
req.Node = "node2"
err = msgpackrpc.CallWithCodec(codec, "Coordinate.Update", &req, &out)
if !acl.IsErrPermissionDenied(err) {
t.Fatalf("err: %v", err)
}
}

View file

@ -22,7 +22,7 @@ func (s *Server) newNodeSorter(cs lib.CoordinateSet, nodes structs.Nodes) (sort.
state := s.fsm.State()
vec := make([]float64, len(nodes))
for i, node := range nodes {
other, err := state.Coordinate(node.Node)
_, other, err := state.Coordinate(node.Node, nil)
if err != nil {
return nil, err
}
@ -62,7 +62,7 @@ func (s *Server) newServiceNodeSorter(cs lib.CoordinateSet, nodes structs.Servic
state := s.fsm.State()
vec := make([]float64, len(nodes))
for i, node := range nodes {
other, err := state.Coordinate(node.Node)
_, other, err := state.Coordinate(node.Node, nil)
if err != nil {
return nil, err
}
@ -102,7 +102,7 @@ func (s *Server) newHealthCheckSorter(cs lib.CoordinateSet, checks structs.Healt
state := s.fsm.State()
vec := make([]float64, len(checks))
for i, check := range checks {
other, err := state.Coordinate(check.Node)
_, other, err := state.Coordinate(check.Node, nil)
if err != nil {
return nil, err
}
@ -142,7 +142,7 @@ func (s *Server) newCheckServiceNodeSorter(cs lib.CoordinateSet, nodes structs.C
state := s.fsm.State()
vec := make([]float64, len(nodes))
for i, node := range nodes {
other, err := state.Coordinate(node.Node.Node)
_, other, err := state.Coordinate(node.Node.Node, nil)
if err != nil {
return nil, err
}
@ -203,7 +203,7 @@ func (s *Server) sortNodesByDistanceFrom(source structs.QuerySource, subj interf
// There won't always be coordinates for the source node. If there are
// none then we can bail out because there's no meaning for the sort.
state := s.fsm.State()
cs, err := state.Coordinate(source.Node)
_, cs, err := state.Coordinate(source.Node, nil)
if err != nil {
return err
}

View file

@ -42,21 +42,25 @@ func (s *Restore) Coordinates(idx uint64, updates structs.Coordinates) error {
// Coordinate returns a map of coordinates for the given node, indexed by
// network segment.
func (s *Store) Coordinate(node string) (lib.CoordinateSet, error) {
func (s *Store) Coordinate(node string, ws memdb.WatchSet) (uint64, lib.CoordinateSet, error) {
tx := s.db.Txn(false)
defer tx.Abort()
// Get the table index.
idx := maxIndexTxn(tx, "coordinates")
iter, err := tx.Get("coordinates", "node", node)
if err != nil {
return nil, fmt.Errorf("failed coordinate lookup: %s", err)
return 0, nil, fmt.Errorf("failed coordinate lookup: %s", err)
}
ws.Add(iter.WatchCh())
results := make(lib.CoordinateSet)
for raw := iter.Next(); raw != nil; raw = iter.Next() {
coord := raw.(*structs.Coordinate)
results[coord.Segment] = coord.Coord
}
return results, nil
return idx, results, nil
}
// Coordinates queries for all nodes with coordinates.

View file

@ -42,7 +42,7 @@ func TestStateStore_Coordinate_Updates(t *testing.T) {
}
verify.Values(t, "", all, structs.Coordinates{})
coords, err := s.Coordinate("nope")
_, coords, err := s.Coordinate("nope", nil)
if err != nil {
t.Fatalf("err: %s", err)
}
@ -102,7 +102,7 @@ func TestStateStore_Coordinate_Updates(t *testing.T) {
// Also verify the per-node coordinate interface.
for _, update := range updates {
coords, err := s.Coordinate(update.Node)
_, coords, err := s.Coordinate(update.Node, nil)
if err != nil {
t.Fatalf("err: %s", err)
}
@ -133,7 +133,7 @@ func TestStateStore_Coordinate_Updates(t *testing.T) {
// And check the per-node coordinate version of the same thing.
for _, update := range updates {
coords, err := s.Coordinate(update.Node)
_, coords, err := s.Coordinate(update.Node, nil)
if err != nil {
t.Fatalf("err: %s", err)
}
@ -188,7 +188,7 @@ func TestStateStore_Coordinate_Cleanup(t *testing.T) {
}
// Make sure it's in there.
coords, err := s.Coordinate("node1")
_, coords, err := s.Coordinate("node1", nil)
if err != nil {
t.Fatalf("err: %s", err)
}
@ -204,7 +204,7 @@ func TestStateStore_Coordinate_Cleanup(t *testing.T) {
}
// Make sure the coordinate is gone.
coords, err = s.Coordinate("node1")
_, coords, err = s.Coordinate("node1", nil)
if err != nil {
t.Fatalf("err: %s", err)
}

View file

@ -4,6 +4,7 @@ import (
"fmt"
"net/http"
"sort"
"strings"
"github.com/hashicorp/consul/agent/structs"
)
@ -85,24 +86,55 @@ func (s *HTTPServer) CoordinateNodes(resp http.ResponseWriter, req *http.Request
return nil, err
}
// Use empty list instead of nil.
if out.Coordinates == nil {
out.Coordinates = make(structs.Coordinates, 0)
return filterCoordinates(req, "", out.Coordinates), nil
}
// Filter by segment if applicable
if v, ok := req.URL.Query()["segment"]; ok && len(v) > 0 {
segment := v[0]
filtered := make(structs.Coordinates, 0)
for _, coord := range out.Coordinates {
if coord.Segment == segment {
filtered = append(filtered, coord)
}
}
out.Coordinates = filtered
// CoordinateNode returns the LAN node in the given datacenter, along with
// raw network coordinates.
func (s *HTTPServer) CoordinateNode(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
if req.Method != "GET" {
return nil, MethodNotAllowedError{req.Method, []string{"GET"}}
}
return out.Coordinates, nil
node := strings.TrimPrefix(req.URL.Path, "/v1/coordinate/node/")
args := structs.NodeSpecificRequest{Node: node}
if done := s.parse(resp, req, &args.Datacenter, &args.QueryOptions); done {
return nil, nil
}
var out structs.IndexedCoordinates
defer setMeta(resp, &out.QueryMeta)
if err := s.agent.RPC("Coordinate.Node", &args, &out); err != nil {
sort.Sort(&sorter{out.Coordinates})
return nil, err
}
return filterCoordinates(req, node, out.Coordinates), nil
}
func filterCoordinates(req *http.Request, node string, in structs.Coordinates) structs.Coordinates {
out := structs.Coordinates{}
if in == nil {
return out
}
segment := ""
v, filterBySegment := req.URL.Query()["segment"]
if filterBySegment && len(v) > 0 {
segment = v[0]
}
for _, c := range in {
if node != "" && c.Node != node {
continue
}
if filterBySegment && c.Segment != segment {
continue
}
out = append(out, c)
}
return out
}
// CoordinateUpdate inserts or updates the LAN coordinate of a node.

View file

@ -140,3 +140,112 @@ func TestCoordinate_Nodes(t *testing.T) {
t.Fatalf("bad: %v", coordinates)
}
}
func TestCoordinate_Node(t *testing.T) {
t.Parallel()
a := NewTestAgent(t.Name(), "")
defer a.Shutdown()
// Make sure an empty list is non-nil.
req, _ := http.NewRequest("GET", "/v1/coordinate/node/foo?dc=dc1", nil)
resp := httptest.NewRecorder()
obj, err := a.srv.CoordinateNode(resp, req)
if err != nil {
t.Fatalf("err: %v", err)
}
coordinates := obj.(structs.Coordinates)
if coordinates == nil || len(coordinates) != 0 {
t.Fatalf("bad: %v", coordinates)
}
// Register the nodes.
nodes := []string{"foo", "bar"}
for _, node := range nodes {
req := structs.RegisterRequest{
Datacenter: "dc1",
Node: node,
Address: "127.0.0.1",
}
var reply struct{}
if err := a.RPC("Catalog.Register", &req, &reply); err != nil {
t.Fatalf("err: %s", err)
}
}
// Send some coordinates for a few nodes, waiting a little while for the
// batch update to run.
arg1 := structs.CoordinateUpdateRequest{
Datacenter: "dc1",
Node: "foo",
Segment: "alpha",
Coord: coordinate.NewCoordinate(coordinate.DefaultConfig()),
}
var out struct{}
if err := a.RPC("Coordinate.Update", &arg1, &out); err != nil {
t.Fatalf("err: %v", err)
}
arg2 := structs.CoordinateUpdateRequest{
Datacenter: "dc1",
Node: "bar",
Coord: coordinate.NewCoordinate(coordinate.DefaultConfig()),
}
if err := a.RPC("Coordinate.Update", &arg2, &out); err != nil {
t.Fatalf("err: %v", err)
}
time.Sleep(300 * time.Millisecond)
// Query back and check the nodes are present and sorted correctly.
req, _ = http.NewRequest("GET", "/v1/coordinate/node/foo?dc=dc1", nil)
resp = httptest.NewRecorder()
obj, err = a.srv.CoordinateNode(resp, req)
if err != nil {
t.Fatalf("err: %v", err)
}
coordinates = obj.(structs.Coordinates)
if len(coordinates) != 1 ||
coordinates[0].Node != "foo" {
t.Fatalf("bad: %v", coordinates)
}
// Filter on a nonexistant node segment
req, _ = http.NewRequest("GET", "/v1/coordinate/node/foo?segment=nope", nil)
resp = httptest.NewRecorder()
obj, err = a.srv.CoordinateNode(resp, req)
if err != nil {
t.Fatalf("err: %v", err)
}
coordinates = obj.(structs.Coordinates)
if len(coordinates) != 0 {
t.Fatalf("bad: %v", coordinates)
}
// Filter on a real node segment
req, _ = http.NewRequest("GET", "/v1/coordinate/node/foo?segment=alpha", nil)
resp = httptest.NewRecorder()
obj, err = a.srv.CoordinateNode(resp, req)
if err != nil {
t.Fatalf("err: %v", err)
}
coordinates = obj.(structs.Coordinates)
if len(coordinates) != 1 || coordinates[0].Node != "foo" {
t.Fatalf("bad: %v", coordinates)
}
// Make sure the empty filter works
req, _ = http.NewRequest("GET", "/v1/coordinate/node/foo?segment=", nil)
resp = httptest.NewRecorder()
obj, err = a.srv.CoordinateNode(resp, req)
if err != nil {
t.Fatalf("err: %v", err)
}
coordinates = obj.(structs.Coordinates)
if len(coordinates) != 0 {
t.Fatalf("bad: %v", coordinates)
}
}

View file

@ -139,10 +139,12 @@ func (s *HTTPServer) handler(enableDebug bool) http.Handler {
if !s.agent.config.DisableCoordinates {
handleFuncMetrics("/v1/coordinate/datacenters", s.wrap(s.CoordinateDatacenters))
handleFuncMetrics("/v1/coordinate/nodes", s.wrap(s.CoordinateNodes))
handleFuncMetrics("/v1/coordinate/node/", s.wrap(s.CoordinateNode))
handleFuncMetrics("/v1/coordinate/update", s.wrap(s.CoordinateUpdate))
} else {
handleFuncMetrics("/v1/coordinate/datacenters", s.wrap(coordinateDisabled))
handleFuncMetrics("/v1/coordinate/nodes", s.wrap(coordinateDisabled))
handleFuncMetrics("/v1/coordinate/node/", s.wrap(coordinateDisabled))
handleFuncMetrics("/v1/coordinate/update", s.wrap(coordinateDisabled))
}
handleFuncMetrics("/v1/event/fire/", s.wrap(s.EventFire))

View file

@ -350,6 +350,7 @@ func TestHTTPAPI_MethodNotAllowed(t *testing.T) {
{"GET", "/v1/catalog/services"},
{"GET", "/v1/coordinate/datacenters"},
{"GET", "/v1/coordinate/nodes"},
{"GET", "/v1/coordinate/node/"},
{"PUT", "/v1/event/fire/"},
{"GET", "/v1/event/list"},
{"GET", "/v1/health/checks/"},

View file

@ -9,6 +9,7 @@ import (
"time"
"github.com/hashicorp/consul/testutil"
"github.com/hashicorp/consul/testutil/retry"
"github.com/hashicorp/serf/serf"
)
@ -42,16 +43,14 @@ func TestAPI_AgentMetrics(t *testing.T) {
t.Fatalf("err: %v", err)
}
var found bool
retry.Run(t, func(r *retry.R) {
for _, g := range metrics.Gauges {
if g.Name == "consul.runtime.alloc_bytes" {
found = true
break
return
}
}
if !found {
t.Fatalf("missing runtime metrics")
}
r.Fatalf("missing runtime metrics")
})
}
func TestAPI_AgentReload(t *testing.T) {

View file

@ -83,3 +83,24 @@ func (c *Coordinate) Update(coord *CoordinateEntry, q *WriteOptions) (*WriteMeta
return wm, nil
}
// Node is used to return the coordinates of a single in the LAN pool.
func (c *Coordinate) Node(node string, q *QueryOptions) ([]*CoordinateEntry, *QueryMeta, error) {
r := c.c.newRequest("GET", "/v1/coordinate/node/"+node)
r.setQueryOptions(q)
rtt, resp, err := requireOK(c.c.doRequest(r))
if err != nil {
return nil, nil, err
}
defer resp.Body.Close()
qm := &QueryMeta{}
parseQueryMeta(resp, qm)
qm.RequestTime = rtt
var out []*CoordinateEntry
if err := decodeBody(resp, &out); err != nil {
return nil, nil, err
}
return out, qm, nil
}

View file

@ -42,3 +42,22 @@ func TestAPI_CoordinateNodes(t *testing.T) {
// get an error.
})
}
func TestAPI_CoordinateNode(t *testing.T) {
t.Parallel()
c, s := makeClient(t)
defer s.Stop()
coordinate := c.Coordinate()
retry.Run(t, func(r *retry.R) {
_, _, err := coordinate.Node(s.Config.NodeName, nil)
if err != nil {
r.Fatal(err)
}
// There's not a good way to populate coordinates without
// waiting for them to calculate and update, so the best
// we can do is call the endpoint and make sure we don't
// get an error.
})
}

View file

@ -40,11 +40,16 @@ var (
// mu guards nextPort
mu sync.Mutex
// once is used to do the initialization on the first call to retrieve free
// ports
once sync.Once
// port is the last allocated port.
port int
)
func init() {
// initialize is used to initialize freeport.
func initialize() {
if lowPort+maxBlocks*blockSize > 65535 {
panic("freeport: block size too big or too many blocks requested")
}
@ -108,6 +113,9 @@ func Free(n int) (ports []int, err error) {
return nil, fmt.Errorf("freeport: block size too small")
}
// Reserve a port block
once.Do(initialize)
for len(ports) < n {
port++

View file

@ -71,7 +71,7 @@ In **Consul Enterprise**, this will include coordinates for user-added network
areas as well, as indicated by the `AreaID`. Coordinates are only compatible
within the same area.
## Read LAN Coordinates
## Read LAN Coordinates for all nodes
This endpoint returns the LAN network coordinates for all nodes in a given
datacenter.
@ -122,3 +122,55 @@ $ curl \
In **Consul Enterprise**, this may include multiple coordinates for the same node,
each marked with a different `Segment`. Coordinates are only compatible within the same
segment.
## Read LAN Coordinates for a node
This endpoint returns the LAN network coordinates for all nodes in a given
datacenter.
| Method | Path | Produces |
| ------ | ---------------------------- | -------------------------- |
| `GET` | `/coordinate/node/:node` | `application/json` |
The table below shows this endpoint's support for
[blocking queries](/api/index.html#blocking-queries),
[consistency modes](/api/index.html#consistency-modes), and
[required ACLs](/api/index.html#acls).
| Blocking Queries | Consistency Modes | ACL Required |
| ---------------- | ----------------- | ------------ |
| `YES` | `all` | `node:read` |
### Parameters
- `dc` `(string: "")` - Specifies the datacenter to query. This will default to
the datacenter of the agent being queried. This is specified as part of the
URL as a query parameter.
### Sample Request
```text
$ curl \
https://consul.rocks/v1/coordinate/node/agent-one
```
### Sample Response
```json
[
{
"Node": "agent-one",
"Segment": "",
"Coord": {
"Adjustment": 0,
"Error": 1.5,
"Height": 0,
"Vec": [0, 0, 0, 0, 0, 0, 0, 0]
}
}
]
```
In **Consul Enterprise**, this may include multiple coordinates for the same node,
each marked with a different `Segment`. Coordinates are only compatible within the same
segment.