Merge pull request #2137 from hashicorp/f-pq-near
Support "near" parameter in prepared query service block
This commit is contained in:
commit
e9960e6c85
|
@ -598,6 +598,15 @@ func (d *DNSServer) preparedQueryLookup(network, datacenter, query string, req,
|
|||
Token: d.agent.config.ACLToken,
|
||||
AllowStale: d.config.AllowStale,
|
||||
},
|
||||
|
||||
// Always pass the local agent through. In the DNS interface, there
|
||||
// is no provision for passing additional query parameters, so we
|
||||
// send the local agent's data through to allow distance sorting
|
||||
// relative to ourself on the server side.
|
||||
Agent: structs.QuerySource{
|
||||
Datacenter: d.agent.config.Datacenter,
|
||||
Node: d.agent.config.NodeName,
|
||||
},
|
||||
}
|
||||
|
||||
// TODO (slackpad) - What's a safe limit we can set here? It seems like
|
||||
|
|
|
@ -3166,3 +3166,37 @@ func TestDNS_InvalidQueries(t *testing.T) {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestDNS_PreparedQuery_AgentSource(t *testing.T) {
|
||||
dir, srv := makeDNSServer(t)
|
||||
defer os.RemoveAll(dir)
|
||||
defer srv.agent.Shutdown()
|
||||
|
||||
testutil.WaitForLeader(t, srv.agent.RPC, "dc1")
|
||||
|
||||
m := MockPreparedQuery{}
|
||||
if err := srv.agent.InjectEndpoint("PreparedQuery", &m); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
m.executeFn = func(args *structs.PreparedQueryExecuteRequest, reply *structs.PreparedQueryExecuteResponse) error {
|
||||
// Check that the agent inserted its self-name and datacenter to
|
||||
// the RPC request body.
|
||||
if args.Agent.Datacenter != srv.agent.config.Datacenter ||
|
||||
args.Agent.Node != srv.agent.config.NodeName {
|
||||
t.Fatalf("bad: %#v", args.Agent)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
{
|
||||
m := new(dns.Msg)
|
||||
m.SetQuestion("foo.query.consul.", dns.TypeSRV)
|
||||
|
||||
c := new(dns.Client)
|
||||
addr, _ := srv.agent.config.ClientListener("", srv.agent.config.Ports.DNS)
|
||||
if _, _, err := c.Exchange(m, addr.String()); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -96,6 +96,10 @@ func parseLimit(req *http.Request, limit *int) error {
|
|||
func (s *HTTPServer) preparedQueryExecute(id string, resp http.ResponseWriter, req *http.Request) (interface{}, error) {
|
||||
args := structs.PreparedQueryExecuteRequest{
|
||||
QueryIDOrName: id,
|
||||
Agent: structs.QuerySource{
|
||||
Node: s.agent.config.NodeName,
|
||||
Datacenter: s.agent.config.Datacenter,
|
||||
},
|
||||
}
|
||||
s.parseSource(req, &args.Source)
|
||||
if done := s.parse(resp, req, &args.Datacenter, &args.QueryOptions); done {
|
||||
|
@ -131,6 +135,10 @@ func (s *HTTPServer) preparedQueryExecute(id string, resp http.ResponseWriter, r
|
|||
func (s *HTTPServer) preparedQueryExplain(id string, resp http.ResponseWriter, req *http.Request) (interface{}, error) {
|
||||
args := structs.PreparedQueryExecuteRequest{
|
||||
QueryIDOrName: id,
|
||||
Agent: structs.QuerySource{
|
||||
Node: s.agent.config.NodeName,
|
||||
Datacenter: s.agent.config.Datacenter,
|
||||
},
|
||||
}
|
||||
s.parseSource(req, &args.Source)
|
||||
if done := s.parse(resp, req, &args.Datacenter, &args.QueryOptions); done {
|
||||
|
|
|
@ -286,6 +286,10 @@ func TestPreparedQuery_Execute(t *testing.T) {
|
|||
Datacenter: "dc1",
|
||||
Node: "my-node",
|
||||
},
|
||||
Agent: structs.QuerySource{
|
||||
Datacenter: srv.agent.config.Datacenter,
|
||||
Node: srv.agent.config.NodeName,
|
||||
},
|
||||
QueryOptions: structs.QueryOptions{
|
||||
Token: "my-token",
|
||||
RequireConsistent: true,
|
||||
|
@ -323,6 +327,38 @@ func TestPreparedQuery_Execute(t *testing.T) {
|
|||
}
|
||||
})
|
||||
|
||||
// Ensure the proper params are set when no special args are passed
|
||||
httpTest(t, func(srv *HTTPServer) {
|
||||
m := MockPreparedQuery{}
|
||||
if err := srv.agent.InjectEndpoint("PreparedQuery", &m); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
m.executeFn = func(args *structs.PreparedQueryExecuteRequest, reply *structs.PreparedQueryExecuteResponse) error {
|
||||
if args.Source.Node != "" {
|
||||
t.Fatalf("expect node to be empty, got %q", args.Source.Node)
|
||||
}
|
||||
expect := structs.QuerySource{
|
||||
Datacenter: srv.agent.config.Datacenter,
|
||||
Node: srv.agent.config.NodeName,
|
||||
}
|
||||
if !reflect.DeepEqual(args.Agent, expect) {
|
||||
t.Fatalf("expect: %#v\nactual: %#v", expect, args.Agent)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
req, err := http.NewRequest("GET", "/v1/query/my-id/execute", nil)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
resp := httptest.NewRecorder()
|
||||
if _, err := srv.PreparedQuerySpecific(resp, req); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
})
|
||||
|
||||
httpTest(t, func(srv *HTTPServer) {
|
||||
body := bytes.NewBuffer(nil)
|
||||
req, err := http.NewRequest("GET", "/v1/query/not-there/execute", body)
|
||||
|
@ -357,6 +393,10 @@ func TestPreparedQuery_Explain(t *testing.T) {
|
|||
Datacenter: "dc1",
|
||||
Node: "my-node",
|
||||
},
|
||||
Agent: structs.QuerySource{
|
||||
Datacenter: srv.agent.config.Datacenter,
|
||||
Node: srv.agent.config.NodeName,
|
||||
},
|
||||
QueryOptions: structs.QueryOptions{
|
||||
Token: "my-token",
|
||||
RequireConsistent: true,
|
||||
|
|
|
@ -20,6 +20,7 @@ func TestWalk_ServiceQuery(t *testing.T) {
|
|||
Failover: structs.QueryDatacenterOptions{
|
||||
Datacenters: []string{"dc1", "dc2"},
|
||||
},
|
||||
Near: "_agent",
|
||||
Tags: []string{"tag1", "tag2", "tag3"},
|
||||
}
|
||||
if err := walk(service, fn); err != nil {
|
||||
|
@ -30,6 +31,7 @@ func TestWalk_ServiceQuery(t *testing.T) {
|
|||
".Service:the-service",
|
||||
".Failover.Datacenters[0]:dc1",
|
||||
".Failover.Datacenters[1]:dc2",
|
||||
".Near:_agent",
|
||||
".Tags[0]:tag1",
|
||||
".Tags[1]:tag2",
|
||||
".Tags[2]:tag3",
|
||||
|
|
|
@ -368,7 +368,25 @@ func (p *PreparedQuery) Execute(args *structs.PreparedQueryExecuteRequest,
|
|||
// Shuffle the results in case coordinates are not available if they
|
||||
// requested an RTT sort.
|
||||
reply.Nodes.Shuffle()
|
||||
if err := p.srv.sortNodesByDistanceFrom(args.Source, reply.Nodes); err != nil {
|
||||
|
||||
// Build the query source. This can be provided by the client, or by
|
||||
// the prepared query. Client-specified takes priority.
|
||||
qs := args.Source
|
||||
if qs.Datacenter == "" {
|
||||
qs.Datacenter = args.Agent.Datacenter
|
||||
}
|
||||
if query.Service.Near != "" && qs.Node == "" {
|
||||
qs.Node = query.Service.Near
|
||||
}
|
||||
|
||||
// Respect the magic "_agent" flag.
|
||||
if qs.Node == "_agent" {
|
||||
qs.Node = args.Agent.Node
|
||||
}
|
||||
|
||||
// Perform the distance sort
|
||||
err = p.srv.sortNodesByDistanceFrom(qs, reply.Nodes)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
|
|
|
@ -1607,6 +1607,197 @@ func TestPreparedQuery_Execute(t *testing.T) {
|
|||
t.Fatalf("unique shuffle ratio too low: %d/100", len(uniques))
|
||||
}
|
||||
|
||||
// Set the query to return results nearest to node3. This is the only
|
||||
// node with coordinates, and it carries the service we are asking for,
|
||||
// so node3 should always show up first.
|
||||
query.Op = structs.PreparedQueryUpdate
|
||||
query.Query.Service.Near = "node3"
|
||||
if err := msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Apply", &query, &query.Query.ID); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
// Now run the query and make sure the sort looks right.
|
||||
{
|
||||
req := structs.PreparedQueryExecuteRequest{
|
||||
Agent: structs.QuerySource{
|
||||
Datacenter: "dc1",
|
||||
Node: "node3",
|
||||
},
|
||||
Datacenter: "dc1",
|
||||
QueryIDOrName: query.Query.ID,
|
||||
QueryOptions: structs.QueryOptions{Token: execToken},
|
||||
}
|
||||
|
||||
var reply structs.PreparedQueryExecuteResponse
|
||||
|
||||
for i := 0; i < 10; i++ {
|
||||
if err := msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Execute", &req, &reply); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if n := len(reply.Nodes); n != 10 {
|
||||
t.Fatalf("expect 10 nodes, got: %d", n)
|
||||
}
|
||||
if node := reply.Nodes[0].Node.Node; node != "node3" {
|
||||
t.Fatalf("expect node3 first, got: %q", node)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Query again, but this time set a client-supplied query source. This
|
||||
// proves that we allow overriding the baked-in value with ?near.
|
||||
{
|
||||
// Set up the query with a non-existent node. This will cause the
|
||||
// nodes to be shuffled if the passed node is respected, proving
|
||||
// that we allow the override to happen.
|
||||
req := structs.PreparedQueryExecuteRequest{
|
||||
Source: structs.QuerySource{
|
||||
Datacenter: "dc1",
|
||||
Node: "foo",
|
||||
},
|
||||
Agent: structs.QuerySource{
|
||||
Datacenter: "dc1",
|
||||
Node: "node3",
|
||||
},
|
||||
Datacenter: "dc1",
|
||||
QueryIDOrName: query.Query.ID,
|
||||
QueryOptions: structs.QueryOptions{Token: execToken},
|
||||
}
|
||||
|
||||
var reply structs.PreparedQueryExecuteResponse
|
||||
|
||||
shuffled := false
|
||||
for i := 0; i < 10; i++ {
|
||||
if err := msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Execute", &req, &reply); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if n := len(reply.Nodes); n != 10 {
|
||||
t.Fatalf("expect 10 nodes, got: %d", n)
|
||||
}
|
||||
if node := reply.Nodes[0].Node.Node; node != "node3" {
|
||||
shuffled = true
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if !shuffled {
|
||||
t.Fatalf("expect nodes to be shuffled")
|
||||
}
|
||||
}
|
||||
|
||||
// Bake the magic "_agent" flag into the query.
|
||||
query.Query.Service.Near = "_agent"
|
||||
if err := msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Apply", &query, &query.Query.ID); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
// Check that we sort the local agent first when the magic flag is set.
|
||||
{
|
||||
req := structs.PreparedQueryExecuteRequest{
|
||||
Agent: structs.QuerySource{
|
||||
Datacenter: "dc1",
|
||||
Node: "node3",
|
||||
},
|
||||
Datacenter: "dc1",
|
||||
QueryIDOrName: query.Query.ID,
|
||||
QueryOptions: structs.QueryOptions{Token: execToken},
|
||||
}
|
||||
|
||||
var reply structs.PreparedQueryExecuteResponse
|
||||
|
||||
for i := 0; i < 10; i++ {
|
||||
if err := msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Execute", &req, &reply); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if n := len(reply.Nodes); n != 10 {
|
||||
t.Fatalf("expect 10 nodes, got: %d", n)
|
||||
}
|
||||
if node := reply.Nodes[0].Node.Node; node != "node3" {
|
||||
t.Fatalf("expect node3 first, got: %q", node)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Check that the query isn't just sorting "node3" first because we
|
||||
// provided it in the Agent query source. Proves that we use the
|
||||
// Agent source when the magic "_agent" flag is passed.
|
||||
{
|
||||
req := structs.PreparedQueryExecuteRequest{
|
||||
Agent: structs.QuerySource{
|
||||
Datacenter: "dc1",
|
||||
Node: "foo",
|
||||
},
|
||||
Datacenter: "dc1",
|
||||
QueryIDOrName: query.Query.ID,
|
||||
QueryOptions: structs.QueryOptions{Token: execToken},
|
||||
}
|
||||
|
||||
var reply structs.PreparedQueryExecuteResponse
|
||||
|
||||
// Expect the set to be shuffled since we have no coordinates
|
||||
// on the "foo" node.
|
||||
shuffled := false
|
||||
for i := 0; i < 10; i++ {
|
||||
if err := msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Execute", &req, &reply); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if n := len(reply.Nodes); n != 10 {
|
||||
t.Fatalf("expect 10 nodes, got: %d", n)
|
||||
}
|
||||
if node := reply.Nodes[0].Node.Node; node != "node3" {
|
||||
shuffled = true
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if !shuffled {
|
||||
t.Fatal("expect nodes to be shuffled")
|
||||
}
|
||||
}
|
||||
|
||||
// Shuffles if the response comes from a non-local DC. Proves that the
|
||||
// agent query source does not interfere with the order.
|
||||
{
|
||||
req := structs.PreparedQueryExecuteRequest{
|
||||
Source: structs.QuerySource{
|
||||
Datacenter: "dc2",
|
||||
Node: "node3",
|
||||
},
|
||||
Agent: structs.QuerySource{
|
||||
Datacenter: "dc1",
|
||||
Node: "node3",
|
||||
},
|
||||
Datacenter: "dc1",
|
||||
QueryIDOrName: query.Query.ID,
|
||||
QueryOptions: structs.QueryOptions{Token: execToken},
|
||||
}
|
||||
|
||||
var reply structs.PreparedQueryExecuteResponse
|
||||
|
||||
shuffled := false
|
||||
for i := 0; i < 10; i++ {
|
||||
if err := msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Execute", &req, &reply); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if n := len(reply.Nodes); n != 10 {
|
||||
t.Fatalf("expect 10 nodes, got: %d", n)
|
||||
}
|
||||
if reply.Nodes[0].Node.Node != "node3" {
|
||||
shuffled = true
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if !shuffled {
|
||||
t.Fatal("expect node shuffle for remote results")
|
||||
}
|
||||
}
|
||||
|
||||
// Un-bake the near parameter.
|
||||
query.Query.Service.Near = ""
|
||||
if err := msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Apply", &query, &query.Query.ID); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
// Update the health of a node to mark it critical.
|
||||
setHealth := func(node string, health string) {
|
||||
req := structs.RegisterRequest{
|
||||
|
@ -1683,7 +1874,6 @@ func TestPreparedQuery_Execute(t *testing.T) {
|
|||
}
|
||||
|
||||
// Make the query more picky so it excludes warning nodes.
|
||||
query.Op = structs.PreparedQueryUpdate
|
||||
query.Query.Service.OnlyPassing = true
|
||||
if err := msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Apply", &query, &query.Query.ID); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
|
|
|
@ -34,6 +34,12 @@ type ServiceQuery struct {
|
|||
// discarded)
|
||||
OnlyPassing bool
|
||||
|
||||
// Near allows the query to always prefer the node nearest the given
|
||||
// node. If the node does not exist, results are returned in their
|
||||
// normal randomly-shuffled order. Supplying the magic "_agent" value
|
||||
// is supported to sort near the agent which initiated the request.
|
||||
Near string
|
||||
|
||||
// Tags are a set of required and/or disallowed tags. If a tag is in
|
||||
// this list it must be present. If the tag is preceded with "!" then
|
||||
// it is disallowed.
|
||||
|
@ -177,6 +183,10 @@ type PreparedQueryExecuteRequest struct {
|
|||
// network coordinates.
|
||||
Source QuerySource
|
||||
|
||||
// Agent is used to carry around a reference to the agent which initiated
|
||||
// the execute request. Used to distance-sort relative to the local node.
|
||||
Agent QuerySource
|
||||
|
||||
// QueryOptions (unfortunately named here) controls the consistency
|
||||
// settings for the query lookup itself, as well as the service lookups.
|
||||
QueryOptions
|
||||
|
|
|
@ -70,6 +70,7 @@ query, like this example:
|
|||
"Name": "my-query",
|
||||
"Session": "adf4238a-882b-9ddc-4a9d-5b6758e4159e",
|
||||
"Token": "",
|
||||
"Near": "node1",
|
||||
"Service": {
|
||||
"Service": "redis",
|
||||
"Failover": {
|
||||
|
@ -114,6 +115,16 @@ attribute which can be set on functions. This change in effect moves Consul
|
|||
from using `SECURITY DEFINER` by default to `SECURITY INVOKER` by default for
|
||||
new Prepared Queries.
|
||||
|
||||
<a name="near"></a>
|
||||
`Near` allows specifying a particular node to sort near based on distance
|
||||
sorting using [Network Coordinates](/docs/internals/coordinates.html). The
|
||||
nearest instance to the specified node will be returned first, and subsequent
|
||||
nodes in the response will be sorted in ascending order of estimated round-trip
|
||||
times. If the node given does not exist, the nodes in the response will
|
||||
be shuffled. Using the magic `_agent` value is supported, and will automatically
|
||||
return results nearest the agent servicing the request. If unspecified, the
|
||||
response will be shuffled by default.
|
||||
|
||||
The set of fields inside the `Service` structure define the query's behavior.
|
||||
|
||||
`Service` is the name of the service to query. This is required.
|
||||
|
@ -365,8 +376,9 @@ blocking queries, but it does support all consistency modes.
|
|||
Adding the optional "?near=" parameter with a node name will sort the resulting
|
||||
list in ascending order based on the estimated round trip time from that node.
|
||||
Passing "?near=_agent" will use the agent's node for the sort. If this is not
|
||||
present, then the nodes will be shuffled randomly and will be in a different
|
||||
order each time the query is executed.
|
||||
present, the default behavior will shuffle the nodes randomly each time the
|
||||
query is executed. Passing this option will override the built-in
|
||||
<a href="#near">near parameter</a> of a prepared query, if present.
|
||||
|
||||
An optional "?limit=" parameter can be used to limit the size of the list to
|
||||
the given number of nodes. This is applied after any sorting or shuffling.
|
||||
|
|
|
@ -14,6 +14,21 @@ details provided for their upgrades as a result of new features or changed
|
|||
behavior. This page is used to document those details separately from the
|
||||
standard upgrade flow.
|
||||
|
||||
## Consul 0.7
|
||||
|
||||
Consul version 0.7 adds a feature which allows prepared queries to store a
|
||||
["Near" parameter](/docs/agent/http/query.html#near) in the query definition
|
||||
itself. This feature enables using the distance sorting features of prepared
|
||||
queries without explicitly providing the node to sort near in requests, but
|
||||
requires the agent servicing a request to send additional information about
|
||||
itself to the Consul servers when executing the prepared query. Agents prior
|
||||
to 0.7.0 do not send this information, which means they are unable to properly
|
||||
execute prepared queries configured with a `Near` parameter. Similarly, any
|
||||
server nodes prior to version 0.7.0 are unable to store the `Near` parameter,
|
||||
making them unable to properly serve requests for prepared queries using the
|
||||
feature. It is recommended that all agents be running version 0.7.0 prior to
|
||||
using this feature.
|
||||
|
||||
## Consul 0.6.4
|
||||
|
||||
Consul 0.6.4 made some substantial changes to how ACLs work with prepared
|
||||
|
|
Loading…
Reference in a new issue