consul: send agent source data as separate query source
This commit is contained in:
parent
ec8ade1800
commit
ebacaa2d67
|
@ -599,11 +599,11 @@ func (d *DNSServer) preparedQueryLookup(network, datacenter, query string, req,
|
||||||
AllowStale: d.config.AllowStale,
|
AllowStale: d.config.AllowStale,
|
||||||
},
|
},
|
||||||
|
|
||||||
// Always pass the local agent through as the source. In the DNS
|
// Always pass the local agent through. In the DNS interface, there
|
||||||
// interface, there is no provision for passing additional query
|
// is no provision for passing additional query parameters, so we
|
||||||
// parameters, so we send the local agent's data through to allow
|
// send the local agent's data through to allow distance sorting
|
||||||
// distance sorting relative to ourself on the server side.
|
// relative to ourself on the server side.
|
||||||
Source: structs.QuerySource{
|
Agent: structs.QuerySource{
|
||||||
Datacenter: d.agent.config.Datacenter,
|
Datacenter: d.agent.config.Datacenter,
|
||||||
Node: d.agent.config.NodeName,
|
Node: d.agent.config.NodeName,
|
||||||
},
|
},
|
||||||
|
|
|
@ -531,18 +531,7 @@ func (s *HTTPServer) parseToken(req *http.Request, token *string) {
|
||||||
// DC in the request, if given, or else the agent's DC.
|
// DC in the request, if given, or else the agent's DC.
|
||||||
func (s *HTTPServer) parseSource(req *http.Request, source *structs.QuerySource) {
|
func (s *HTTPServer) parseSource(req *http.Request, source *structs.QuerySource) {
|
||||||
s.parseDC(req, &source.Datacenter)
|
s.parseDC(req, &source.Datacenter)
|
||||||
|
source.Node = req.URL.Query().Get("near")
|
||||||
// Always start with the local node as the source.
|
|
||||||
source.Node = s.agent.config.NodeName
|
|
||||||
|
|
||||||
// If ?near was provided, take the value send it along. We also mark the
|
|
||||||
// fact that an override was provided with the NearRequested bool.
|
|
||||||
if node := req.URL.Query().Get("near"); node != "" {
|
|
||||||
source.NearRequested = true
|
|
||||||
if node != "_agent" {
|
|
||||||
source.Node = node
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// parse is a convenience method for endpoints that need
|
// parse is a convenience method for endpoints that need
|
||||||
|
|
|
@ -345,9 +345,8 @@ func TestParseSource(t *testing.T) {
|
||||||
defer srv.Shutdown()
|
defer srv.Shutdown()
|
||||||
defer srv.agent.Shutdown()
|
defer srv.agent.Shutdown()
|
||||||
|
|
||||||
// Default is agent's DC and the local node, with the near flag false
|
// Default is agent's DC and no node (since the user didn't care,
|
||||||
// (since the user didn't care, then just give them the cheapest possible
|
// then just give them the cheapest possible query).
|
||||||
// query).
|
|
||||||
req, err := http.NewRequest("GET",
|
req, err := http.NewRequest("GET",
|
||||||
"/v1/catalog/nodes", nil)
|
"/v1/catalog/nodes", nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -355,7 +354,7 @@ func TestParseSource(t *testing.T) {
|
||||||
}
|
}
|
||||||
source := structs.QuerySource{}
|
source := structs.QuerySource{}
|
||||||
srv.parseSource(req, &source)
|
srv.parseSource(req, &source)
|
||||||
if source.Datacenter != "dc1" || source.Node != srv.agent.config.NodeName {
|
if source.Datacenter != "dc1" || source.Node != "" {
|
||||||
t.Fatalf("bad: %v", source)
|
t.Fatalf("bad: %v", source)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -367,7 +366,7 @@ func TestParseSource(t *testing.T) {
|
||||||
}
|
}
|
||||||
source = structs.QuerySource{}
|
source = structs.QuerySource{}
|
||||||
srv.parseSource(req, &source)
|
srv.parseSource(req, &source)
|
||||||
if source.Datacenter != "dc1" || source.Node != "bob" || !source.NearRequested {
|
if source.Datacenter != "dc1" || source.Node != "bob" {
|
||||||
t.Fatalf("bad: %v", source)
|
t.Fatalf("bad: %v", source)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -96,6 +96,10 @@ func parseLimit(req *http.Request, limit *int) error {
|
||||||
func (s *HTTPServer) preparedQueryExecute(id string, resp http.ResponseWriter, req *http.Request) (interface{}, error) {
|
func (s *HTTPServer) preparedQueryExecute(id string, resp http.ResponseWriter, req *http.Request) (interface{}, error) {
|
||||||
args := structs.PreparedQueryExecuteRequest{
|
args := structs.PreparedQueryExecuteRequest{
|
||||||
QueryIDOrName: id,
|
QueryIDOrName: id,
|
||||||
|
Agent: structs.QuerySource{
|
||||||
|
Node: s.agent.config.NodeName,
|
||||||
|
Datacenter: s.agent.config.Datacenter,
|
||||||
|
},
|
||||||
}
|
}
|
||||||
s.parseSource(req, &args.Source)
|
s.parseSource(req, &args.Source)
|
||||||
if done := s.parse(resp, req, &args.Datacenter, &args.QueryOptions); done {
|
if done := s.parse(resp, req, &args.Datacenter, &args.QueryOptions); done {
|
||||||
|
@ -131,6 +135,10 @@ func (s *HTTPServer) preparedQueryExecute(id string, resp http.ResponseWriter, r
|
||||||
func (s *HTTPServer) preparedQueryExplain(id string, resp http.ResponseWriter, req *http.Request) (interface{}, error) {
|
func (s *HTTPServer) preparedQueryExplain(id string, resp http.ResponseWriter, req *http.Request) (interface{}, error) {
|
||||||
args := structs.PreparedQueryExecuteRequest{
|
args := structs.PreparedQueryExecuteRequest{
|
||||||
QueryIDOrName: id,
|
QueryIDOrName: id,
|
||||||
|
Agent: structs.QuerySource{
|
||||||
|
Node: s.agent.config.NodeName,
|
||||||
|
Datacenter: s.agent.config.Datacenter,
|
||||||
|
},
|
||||||
}
|
}
|
||||||
s.parseSource(req, &args.Source)
|
s.parseSource(req, &args.Source)
|
||||||
if done := s.parse(resp, req, &args.Datacenter, &args.QueryOptions); done {
|
if done := s.parse(resp, req, &args.Datacenter, &args.QueryOptions); done {
|
||||||
|
|
|
@ -285,7 +285,10 @@ func TestPreparedQuery_Execute(t *testing.T) {
|
||||||
Source: structs.QuerySource{
|
Source: structs.QuerySource{
|
||||||
Datacenter: "dc1",
|
Datacenter: "dc1",
|
||||||
Node: "my-node",
|
Node: "my-node",
|
||||||
NearRequested: true,
|
},
|
||||||
|
Agent: structs.QuerySource{
|
||||||
|
Datacenter: srv.agent.config.Datacenter,
|
||||||
|
Node: srv.agent.config.NodeName,
|
||||||
},
|
},
|
||||||
QueryOptions: structs.QueryOptions{
|
QueryOptions: structs.QueryOptions{
|
||||||
Token: "my-token",
|
Token: "my-token",
|
||||||
|
@ -332,11 +335,15 @@ func TestPreparedQuery_Execute(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
m.executeFn = func(args *structs.PreparedQueryExecuteRequest, reply *structs.PreparedQueryExecuteResponse) error {
|
m.executeFn = func(args *structs.PreparedQueryExecuteRequest, reply *structs.PreparedQueryExecuteResponse) error {
|
||||||
if args.Source.NearRequested {
|
if args.Source.Node != "" {
|
||||||
t.Fatal("expect NearRequested to be false")
|
t.Fatalf("expect node to be empty, got %q", args.Source.Node)
|
||||||
}
|
}
|
||||||
if args.Source.Node == "" {
|
expect := structs.QuerySource{
|
||||||
t.Fatalf("expect Source to be %q, got: %q", srv.agent.config.NodeName, args.Source.Node)
|
Datacenter: srv.agent.config.Datacenter,
|
||||||
|
Node: srv.agent.config.NodeName,
|
||||||
|
}
|
||||||
|
if !reflect.DeepEqual(args.Agent, expect) {
|
||||||
|
t.Fatalf("expect: %#v\nactual: %#v", expect, args.Agent)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -385,7 +392,10 @@ func TestPreparedQuery_Explain(t *testing.T) {
|
||||||
Source: structs.QuerySource{
|
Source: structs.QuerySource{
|
||||||
Datacenter: "dc1",
|
Datacenter: "dc1",
|
||||||
Node: "my-node",
|
Node: "my-node",
|
||||||
NearRequested: true,
|
},
|
||||||
|
Agent: structs.QuerySource{
|
||||||
|
Datacenter: srv.agent.config.Datacenter,
|
||||||
|
Node: srv.agent.config.NodeName,
|
||||||
},
|
},
|
||||||
QueryOptions: structs.QueryOptions{
|
QueryOptions: structs.QueryOptions{
|
||||||
Token: "my-token",
|
Token: "my-token",
|
||||||
|
|
|
@ -369,23 +369,26 @@ func (p *PreparedQuery) Execute(args *structs.PreparedQueryExecuteRequest,
|
||||||
// requested an RTT sort.
|
// requested an RTT sort.
|
||||||
reply.Nodes.Shuffle()
|
reply.Nodes.Shuffle()
|
||||||
|
|
||||||
// Check if the query carries a Near parameter, or if the requestor
|
// Build the query source. This can be provided by the client, or by
|
||||||
// supplied a ?near parameter in the request. We can apply distance
|
// the prepared query. Client-specified takes priority.
|
||||||
// sorting if either of these cases are true, but we don't want to
|
qs := args.Source
|
||||||
// affect the established round-robin default.
|
if qs.Datacenter == "" {
|
||||||
if args.Source.NearRequested || query.Service.Near != "" {
|
qs.Datacenter = args.Agent.Datacenter
|
||||||
// Apply the "near" parameter if it exists on the prepared query and
|
}
|
||||||
// was not provided in the request args.
|
if query.Service.Near != "" && qs.Node == "" {
|
||||||
if !args.Source.NearRequested && query.Service.Near != "_agent" {
|
qs.Node = query.Service.Near
|
||||||
args.Source.Node = query.Service.Near
|
}
|
||||||
|
|
||||||
|
// Respect the magic "_agent" flag.
|
||||||
|
if qs.Node == "_agent" {
|
||||||
|
qs.Node = args.Agent.Node
|
||||||
}
|
}
|
||||||
|
|
||||||
// Perform the distance sort
|
// Perform the distance sort
|
||||||
err := p.srv.sortNodesByDistanceFrom(args.Source, reply.Nodes)
|
err = p.srv.sortNodesByDistanceFrom(qs, reply.Nodes)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
// Apply the limit if given.
|
// Apply the limit if given.
|
||||||
if args.Limit > 0 && len(reply.Nodes) > args.Limit {
|
if args.Limit > 0 && len(reply.Nodes) > args.Limit {
|
||||||
|
|
|
@ -1550,7 +1550,6 @@ func TestPreparedQuery_Execute(t *testing.T) {
|
||||||
Source: structs.QuerySource{
|
Source: structs.QuerySource{
|
||||||
Datacenter: "dc1",
|
Datacenter: "dc1",
|
||||||
Node: "node3",
|
Node: "node3",
|
||||||
NearRequested: true,
|
|
||||||
},
|
},
|
||||||
QueryOptions: structs.QueryOptions{Token: execToken},
|
QueryOptions: structs.QueryOptions{Token: execToken},
|
||||||
}
|
}
|
||||||
|
@ -1617,13 +1616,12 @@ func TestPreparedQuery_Execute(t *testing.T) {
|
||||||
t.Fatalf("err: %v", err)
|
t.Fatalf("err: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Now run the query and make sure the baked-in service is returned.
|
// Now run the query and make sure the sort looks right.
|
||||||
{
|
{
|
||||||
req := structs.PreparedQueryExecuteRequest{
|
req := structs.PreparedQueryExecuteRequest{
|
||||||
Source: structs.QuerySource{
|
Agent: structs.QuerySource{
|
||||||
Datacenter: "dc1",
|
Datacenter: "dc1",
|
||||||
Node: "foo",
|
Node: "node3",
|
||||||
NearRequested: false,
|
|
||||||
},
|
},
|
||||||
Datacenter: "dc1",
|
Datacenter: "dc1",
|
||||||
QueryIDOrName: query.Query.ID,
|
QueryIDOrName: query.Query.ID,
|
||||||
|
@ -1645,8 +1643,8 @@ func TestPreparedQuery_Execute(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Query again, but this time set NearRequested to "true". This should
|
// Query again, but this time set a client-supplied query source. This
|
||||||
// prove that we allow overriding the baked-in value with ?near.
|
// proves that we allow overriding the baked-in value with ?near.
|
||||||
{
|
{
|
||||||
// Set up the query with a non-existent node. This will cause the
|
// Set up the query with a non-existent node. This will cause the
|
||||||
// nodes to be shuffled if the passed node is respected, proving
|
// nodes to be shuffled if the passed node is respected, proving
|
||||||
|
@ -1655,7 +1653,10 @@ func TestPreparedQuery_Execute(t *testing.T) {
|
||||||
Source: structs.QuerySource{
|
Source: structs.QuerySource{
|
||||||
Datacenter: "dc1",
|
Datacenter: "dc1",
|
||||||
Node: "foo",
|
Node: "foo",
|
||||||
NearRequested: true,
|
},
|
||||||
|
Agent: structs.QuerySource{
|
||||||
|
Datacenter: "dc1",
|
||||||
|
Node: "node3",
|
||||||
},
|
},
|
||||||
Datacenter: "dc1",
|
Datacenter: "dc1",
|
||||||
QueryIDOrName: query.Query.ID,
|
QueryIDOrName: query.Query.ID,
|
||||||
|
@ -1683,54 +1684,16 @@ func TestPreparedQuery_Execute(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check that if NearRequested is passed as true, that we sort based
|
// Bake the magic "_agent" flag into the query.
|
||||||
// on the given node and do not use the one stored in the PQ.
|
|
||||||
{
|
|
||||||
req := structs.PreparedQueryExecuteRequest{
|
|
||||||
Source: structs.QuerySource{
|
|
||||||
Datacenter: "dc1",
|
|
||||||
Node: "node1",
|
|
||||||
NearRequested: true,
|
|
||||||
},
|
|
||||||
Datacenter: "dc1",
|
|
||||||
QueryIDOrName: query.Query.ID,
|
|
||||||
QueryOptions: structs.QueryOptions{Token: execToken},
|
|
||||||
}
|
|
||||||
|
|
||||||
var reply structs.PreparedQueryExecuteResponse
|
|
||||||
|
|
||||||
// We just want to check that we get a non-local node, because
|
|
||||||
// the local node is the only one with working coordinates.
|
|
||||||
shuffled := false
|
|
||||||
for i := 0; i < 10; i++ {
|
|
||||||
if err := msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Execute", &req, &reply); err != nil {
|
|
||||||
t.Fatalf("err: %v", err)
|
|
||||||
}
|
|
||||||
if n := len(reply.Nodes); n != 10 {
|
|
||||||
t.Fatalf("expect 10 nodes, got: %d", n)
|
|
||||||
}
|
|
||||||
if node := reply.Nodes[0].Node.Node; node != "node3" {
|
|
||||||
shuffled = true
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if !shuffled {
|
|
||||||
t.Fatal("expect non-local results")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Set the query to prefer a colocated service using the magic _agent token
|
|
||||||
query.Query.Service.Near = "_agent"
|
query.Query.Service.Near = "_agent"
|
||||||
if err := msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Apply", &query, &query.Query.ID); err != nil {
|
if err := msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Apply", &query, &query.Query.ID); err != nil {
|
||||||
t.Fatalf("err: %v", err)
|
t.Fatalf("err: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check that the node returned is the one we asked for in the
|
// Check that we sort the local agent first when the magic flag is set.
|
||||||
// query source. This proves that if the PQ has "_agent" baked
|
|
||||||
// in, we always use the passed-in node.
|
|
||||||
{
|
{
|
||||||
req := structs.PreparedQueryExecuteRequest{
|
req := structs.PreparedQueryExecuteRequest{
|
||||||
Source: structs.QuerySource{
|
Agent: structs.QuerySource{
|
||||||
Datacenter: "dc1",
|
Datacenter: "dc1",
|
||||||
Node: "node3",
|
Node: "node3",
|
||||||
},
|
},
|
||||||
|
@ -1754,14 +1717,55 @@ func TestPreparedQuery_Execute(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Check that the query isn't just sorting "node3" first because we
|
||||||
|
// provided it in the Agent query source. Proves that we use the
|
||||||
|
// Agent source when the magic "_agent" flag is passed.
|
||||||
|
{
|
||||||
|
req := structs.PreparedQueryExecuteRequest{
|
||||||
|
Agent: structs.QuerySource{
|
||||||
|
Datacenter: "dc1",
|
||||||
|
Node: "foo",
|
||||||
|
},
|
||||||
|
Datacenter: "dc1",
|
||||||
|
QueryIDOrName: query.Query.ID,
|
||||||
|
QueryOptions: structs.QueryOptions{Token: execToken},
|
||||||
|
}
|
||||||
|
|
||||||
|
var reply structs.PreparedQueryExecuteResponse
|
||||||
|
|
||||||
|
// Expect the set to be shuffled since we have no coordinates
|
||||||
|
// on the "foo" node.
|
||||||
|
shuffled := false
|
||||||
|
for i := 0; i < 10; i++ {
|
||||||
|
if err := msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Execute", &req, &reply); err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
if n := len(reply.Nodes); n != 10 {
|
||||||
|
t.Fatalf("expect 10 nodes, got: %d", n)
|
||||||
|
}
|
||||||
|
if node := reply.Nodes[0].Node.Node; node != "node3" {
|
||||||
|
shuffled = true
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if !shuffled {
|
||||||
|
t.Fatal("expect nodes to be shuffled")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Shuffles if the response comes from a non-local DC. Proves that the
|
// Shuffles if the response comes from a non-local DC. Proves that the
|
||||||
// near parameter does not affect this order.
|
// agent query source does not interfere with the order.
|
||||||
{
|
{
|
||||||
req := structs.PreparedQueryExecuteRequest{
|
req := structs.PreparedQueryExecuteRequest{
|
||||||
Source: structs.QuerySource{
|
Source: structs.QuerySource{
|
||||||
Datacenter: "dc2",
|
Datacenter: "dc2",
|
||||||
Node: "node3",
|
Node: "node3",
|
||||||
},
|
},
|
||||||
|
Agent: structs.QuerySource{
|
||||||
|
Datacenter: "dc1",
|
||||||
|
Node: "node3",
|
||||||
|
},
|
||||||
Datacenter: "dc1",
|
Datacenter: "dc1",
|
||||||
QueryIDOrName: query.Query.ID,
|
QueryIDOrName: query.Query.ID,
|
||||||
QueryOptions: structs.QueryOptions{Token: execToken},
|
QueryOptions: structs.QueryOptions{Token: execToken},
|
||||||
|
|
|
@ -182,6 +182,10 @@ type PreparedQueryExecuteRequest struct {
|
||||||
// network coordinates.
|
// network coordinates.
|
||||||
Source QuerySource
|
Source QuerySource
|
||||||
|
|
||||||
|
// Agent is used to carry around a reference to the agent which initiated
|
||||||
|
// the execute request. Used to distance-sort relative to the local node.
|
||||||
|
Agent QuerySource
|
||||||
|
|
||||||
// QueryOptions (unfortunately named here) controls the consistency
|
// QueryOptions (unfortunately named here) controls the consistency
|
||||||
// settings for the query lookup itself, as well as the service lookups.
|
// settings for the query lookup itself, as well as the service lookups.
|
||||||
QueryOptions
|
QueryOptions
|
||||||
|
|
|
@ -196,11 +196,6 @@ func (r *DeregisterRequest) RequestDatacenter() string {
|
||||||
type QuerySource struct {
|
type QuerySource struct {
|
||||||
Datacenter string
|
Datacenter string
|
||||||
Node string
|
Node string
|
||||||
|
|
||||||
// NearRequested indicates where the values in this QuerySource came
|
|
||||||
// from. When true, the values were provided by the requestor,
|
|
||||||
// otherwise they were filled by the agent servicing the request.
|
|
||||||
NearRequested bool
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// DCSpecificRequest is used to query about a specific DC
|
// DCSpecificRequest is used to query about a specific DC
|
||||||
|
|
Loading…
Reference in a new issue