consul: use source parameter for near prepared queries
This commit is contained in:
parent
aae9c2eb30
commit
782a081925
|
@ -369,40 +369,21 @@ func (p *PreparedQuery) Execute(args *structs.PreparedQueryExecuteRequest,
|
|||
// requested an RTT sort.
|
||||
reply.Nodes.Shuffle()
|
||||
|
||||
// Get the source to sort by. This can be passed in by the requestor, or
|
||||
// pre-defined using the Near parameter in the prepared query. If the
|
||||
// near parameter was defined, that will be preferred.
|
||||
sortFrom := args.Source
|
||||
if query.Service.Near != "" {
|
||||
sortFrom = structs.QuerySource{
|
||||
Datacenter: args.Datacenter,
|
||||
Node: query.Service.Near,
|
||||
// Check if the query carries a Near parameter, or if the requestor
|
||||
// supplied a ?near parameter in the request. We can apply distance
|
||||
// sorting if either of these cases are true, but we don't want to
|
||||
// affect the established round-robin default.
|
||||
if args.Source.NearRequested || query.Service.Near != "" {
|
||||
// Apply the "near" parameter if it exists on the prepared query and
|
||||
// was not provided in the request args.
|
||||
if !args.Source.NearRequested && query.Service.Near != "_agent" {
|
||||
args.Source.Node = query.Service.Near
|
||||
}
|
||||
}
|
||||
|
||||
// Respect the magic "_agent" flag.
|
||||
preferLocal := false
|
||||
if sortFrom.Node == "_agent" {
|
||||
preferLocal = true
|
||||
sortFrom = args.Origin
|
||||
}
|
||||
|
||||
// Perform the distance sort
|
||||
if err := p.srv.sortNodesByDistanceFrom(sortFrom, reply.Nodes); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Nodes cannot be any "closer" than localhost, so this special case ensures
|
||||
// the local node is returned first if it is present in the result. This
|
||||
// allows the local agent to be preferred even when network coordinates are
|
||||
// not enabled. Only works if the results come from the request origin DC.
|
||||
if preferLocal && reply.Datacenter == args.Origin.Datacenter {
|
||||
for i, node := range reply.Nodes {
|
||||
if node.Node.Node == args.Origin.Node {
|
||||
remote := append(reply.Nodes[:i], reply.Nodes[i+1:]...)
|
||||
reply.Nodes = append([]structs.CheckServiceNode{node}, remote...)
|
||||
break
|
||||
}
|
||||
// Perform the distance sort
|
||||
err := p.srv.sortNodesByDistanceFrom(args.Source, reply.Nodes)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -1548,8 +1548,9 @@ func TestPreparedQuery_Execute(t *testing.T) {
|
|||
Datacenter: "dc1",
|
||||
QueryIDOrName: query.Query.ID,
|
||||
Source: structs.QuerySource{
|
||||
Datacenter: "dc1",
|
||||
Node: "node3",
|
||||
Datacenter: "dc1",
|
||||
Node: "node3",
|
||||
NearRequested: true,
|
||||
},
|
||||
QueryOptions: structs.QueryOptions{Token: execToken},
|
||||
}
|
||||
|
@ -1607,110 +1608,22 @@ func TestPreparedQuery_Execute(t *testing.T) {
|
|||
t.Fatalf("unique shuffle ratio too low: %d/100", len(uniques))
|
||||
}
|
||||
|
||||
// Set the query to prefer a colocated service using the magic _agent token
|
||||
// Set the query to return results nearest to node3. This is the only
|
||||
// node with coordinates, and it carries the service we are asking for,
|
||||
// so node3 should always show up first.
|
||||
query.Op = structs.PreparedQueryUpdate
|
||||
query.Query.Service.Near = "_agent"
|
||||
if err := msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Apply", &query, &query.Query.ID); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
// Now try querying and make sure the local node is preferred.
|
||||
{
|
||||
req := structs.PreparedQueryExecuteRequest{
|
||||
Origin: structs.QuerySource{
|
||||
Datacenter: "dc1",
|
||||
Node: "node1",
|
||||
},
|
||||
Datacenter: "dc1",
|
||||
QueryIDOrName: query.Query.ID,
|
||||
QueryOptions: structs.QueryOptions{Token: execToken},
|
||||
}
|
||||
|
||||
var reply structs.PreparedQueryExecuteResponse
|
||||
|
||||
// Repeat this a few times to make sure we don't just get lucky.
|
||||
for i := 0; i < 10; i++ {
|
||||
if err := msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Execute", &req, &reply); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if n := len(reply.Nodes); n != 10 {
|
||||
t.Fatalf("expect 10 nodes, got: %d", n)
|
||||
}
|
||||
if node := reply.Nodes[0].Node.Node; node != "node1" {
|
||||
t.Fatalf("expect node1 first, got: %q", node)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Falls back to remote nodes if service is not available locally
|
||||
{
|
||||
req := structs.PreparedQueryExecuteRequest{
|
||||
Origin: structs.QuerySource{
|
||||
Datacenter: "dc1",
|
||||
Node: "node1",
|
||||
},
|
||||
Datacenter: "dc1",
|
||||
QueryIDOrName: query.Query.ID,
|
||||
QueryOptions: structs.QueryOptions{Token: execToken},
|
||||
}
|
||||
|
||||
var reply structs.PreparedQueryExecuteResponse
|
||||
|
||||
if err := msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Execute", &req, &reply); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if n := len(reply.Nodes); n != 10 {
|
||||
t.Fatalf("expect 10 nodes, got: %d", n)
|
||||
}
|
||||
}
|
||||
|
||||
// Shuffles if the response comes from a non-local DC. We may
|
||||
// need to try multiple times if at first we get a match by chance.
|
||||
{
|
||||
req := structs.PreparedQueryExecuteRequest{
|
||||
Origin: structs.QuerySource{
|
||||
Datacenter: "dc2",
|
||||
Node: "node1",
|
||||
},
|
||||
Datacenter: "dc1",
|
||||
QueryIDOrName: query.Query.ID,
|
||||
QueryOptions: structs.QueryOptions{Token: execToken},
|
||||
}
|
||||
|
||||
var reply structs.PreparedQueryExecuteResponse
|
||||
|
||||
shuffled := false
|
||||
for i := 0; i < 10; i++ {
|
||||
if err := msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Execute", &req, &reply); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if n := len(reply.Nodes); n != 10 {
|
||||
t.Fatalf("expect 10 nodes, got: %d", n)
|
||||
}
|
||||
if reply.Nodes[0].Node.Node != "node1" {
|
||||
shuffled = true
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if !shuffled {
|
||||
t.Fatal("expect node shuffle for remote results")
|
||||
}
|
||||
}
|
||||
|
||||
// Bake a non-local node name into Near parameter of the query. This
|
||||
// node was seeded with a coordinate above so distance sort works.
|
||||
query.Query.Service.Near = "node3"
|
||||
if err := msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Apply", &query, &query.Query.ID); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
// Try the distance sort again to ensure the nearest node is returned
|
||||
// Now run the query and make sure the baked-in service is returned.
|
||||
{
|
||||
req := structs.PreparedQueryExecuteRequest{
|
||||
Origin: structs.QuerySource{
|
||||
Datacenter: "dc1",
|
||||
Node: "node1",
|
||||
Source: structs.QuerySource{
|
||||
Datacenter: "dc1",
|
||||
Node: "foo",
|
||||
NearRequested: false,
|
||||
},
|
||||
Datacenter: "dc1",
|
||||
QueryIDOrName: query.Query.ID,
|
||||
|
@ -1727,15 +1640,158 @@ func TestPreparedQuery_Execute(t *testing.T) {
|
|||
t.Fatalf("expect 10 nodes, got: %d", n)
|
||||
}
|
||||
if node := reply.Nodes[0].Node.Node; node != "node3" {
|
||||
t.Fatalf("expect node3, got: %q", node)
|
||||
t.Fatalf("expect node3 first, got: %q", node)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Un-bake the Near parameter.
|
||||
// Query again, but this time set NearRequested to "true". This should
|
||||
// prove that we allow overriding the baked-in value with ?near.
|
||||
{
|
||||
// Set up the query with a non-existent node. This will cause the
|
||||
// nodes to be shuffled if the passed node is respected, proving
|
||||
// that we allow the override to happen.
|
||||
req := structs.PreparedQueryExecuteRequest{
|
||||
Source: structs.QuerySource{
|
||||
Datacenter: "dc1",
|
||||
Node: "foo",
|
||||
NearRequested: true,
|
||||
},
|
||||
Datacenter: "dc1",
|
||||
QueryIDOrName: query.Query.ID,
|
||||
QueryOptions: structs.QueryOptions{Token: execToken},
|
||||
}
|
||||
|
||||
var reply structs.PreparedQueryExecuteResponse
|
||||
|
||||
shuffled := false
|
||||
for i := 0; i < 10; i++ {
|
||||
if err := msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Execute", &req, &reply); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if n := len(reply.Nodes); n != 10 {
|
||||
t.Fatalf("expect 10 nodes, got: %d", n)
|
||||
}
|
||||
if node := reply.Nodes[0].Node.Node; node != "node3" {
|
||||
shuffled = true
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if !shuffled {
|
||||
t.Fatalf("expect nodes to be shuffled")
|
||||
}
|
||||
}
|
||||
|
||||
// Check that if NearRequested is passed as true, that we sort based
|
||||
// on the given node and do not use the one stored in the PQ.
|
||||
{
|
||||
req := structs.PreparedQueryExecuteRequest{
|
||||
Source: structs.QuerySource{
|
||||
Datacenter: "dc1",
|
||||
Node: "node1",
|
||||
NearRequested: true,
|
||||
},
|
||||
Datacenter: "dc1",
|
||||
QueryIDOrName: query.Query.ID,
|
||||
QueryOptions: structs.QueryOptions{Token: execToken},
|
||||
}
|
||||
|
||||
var reply structs.PreparedQueryExecuteResponse
|
||||
|
||||
// We just want to check that we get a non-local node, because
|
||||
// the local node is the only one with working coordinates.
|
||||
shuffled := false
|
||||
for i := 0; i < 10; i++ {
|
||||
if err := msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Execute", &req, &reply); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if n := len(reply.Nodes); n != 10 {
|
||||
t.Fatalf("expect 10 nodes, got: %d", n)
|
||||
}
|
||||
if node := reply.Nodes[0].Node.Node; node != "node3" {
|
||||
shuffled = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if !shuffled {
|
||||
t.Fatal("expect non-local results")
|
||||
}
|
||||
}
|
||||
|
||||
// Set the query to prefer a colocated service using the magic _agent token
|
||||
query.Query.Service.Near = "_agent"
|
||||
if err := msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Apply", &query, &query.Query.ID); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
// Check that the node returned is the one we asked for in the
|
||||
// query source. This proves that if the PQ has "_agent" baked
|
||||
// in, we always use the passed-in node.
|
||||
{
|
||||
req := structs.PreparedQueryExecuteRequest{
|
||||
Source: structs.QuerySource{
|
||||
Datacenter: "dc1",
|
||||
Node: "node3",
|
||||
},
|
||||
Datacenter: "dc1",
|
||||
QueryIDOrName: query.Query.ID,
|
||||
QueryOptions: structs.QueryOptions{Token: execToken},
|
||||
}
|
||||
|
||||
var reply structs.PreparedQueryExecuteResponse
|
||||
|
||||
for i := 0; i < 10; i++ {
|
||||
if err := msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Execute", &req, &reply); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if n := len(reply.Nodes); n != 10 {
|
||||
t.Fatalf("expect 10 nodes, got: %d", n)
|
||||
}
|
||||
if node := reply.Nodes[0].Node.Node; node != "node3" {
|
||||
t.Fatalf("expect node3 first, got: %q", node)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Shuffles if the response comes from a non-local DC. Proves that the
|
||||
// near parameter does not affect this order.
|
||||
{
|
||||
req := structs.PreparedQueryExecuteRequest{
|
||||
Source: structs.QuerySource{
|
||||
Datacenter: "dc2",
|
||||
Node: "node3",
|
||||
},
|
||||
Datacenter: "dc1",
|
||||
QueryIDOrName: query.Query.ID,
|
||||
QueryOptions: structs.QueryOptions{Token: execToken},
|
||||
}
|
||||
|
||||
var reply structs.PreparedQueryExecuteResponse
|
||||
|
||||
shuffled := false
|
||||
for i := 0; i < 10; i++ {
|
||||
if err := msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Execute", &req, &reply); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if n := len(reply.Nodes); n != 10 {
|
||||
t.Fatalf("expect 10 nodes, got: %d", n)
|
||||
}
|
||||
if reply.Nodes[0].Node.Node != "node3" {
|
||||
shuffled = true
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if !shuffled {
|
||||
t.Fatal("expect node shuffle for remote results")
|
||||
}
|
||||
}
|
||||
|
||||
// Un-bake the near parameter.
|
||||
query.Query.Service.Near = ""
|
||||
if err := msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Apply", &query, &query.Query.ID); err != nil {
|
||||
t.Fatalf("err:% v", err)
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
// Update the health of a node to mark it critical.
|
||||
|
|
|
@ -178,12 +178,6 @@ type PreparedQueryExecuteRequest struct {
|
|||
// Limit will trim the resulting list down to the given limit.
|
||||
Limit int
|
||||
|
||||
// Origin is used to carry around a reference to the node which
|
||||
// is executing the query on behalf of the client. It is taken
|
||||
// as a QuerySource so that it can be used directly for queries
|
||||
// relating to the agent servicing the request.
|
||||
Origin QuerySource
|
||||
|
||||
// Source is used to sort the results relative to a given node using
|
||||
// network coordinates.
|
||||
Source QuerySource
|
||||
|
|
|
@ -196,6 +196,11 @@ func (r *DeregisterRequest) RequestDatacenter() string {
|
|||
type QuerySource struct {
|
||||
Datacenter string
|
||||
Node string
|
||||
|
||||
// NearRequested indicates where the values in this QuerySource came
|
||||
// from. When true, the values were provided by the requestor,
|
||||
// otherwise they were filled by the agent servicing the request.
|
||||
NearRequested bool
|
||||
}
|
||||
|
||||
// DCSpecificRequest is used to query about a specific DC
|
||||
|
|
Loading…
Reference in New Issue