consul: use the Near field instead of PreferLocal
This commit is contained in:
parent
89fe991ab7
commit
114e57fff1
|
@ -531,13 +531,7 @@ func (s *HTTPServer) parseToken(req *http.Request, token *string) {
|
||||||
// DC in the request, if given, or else the agent's DC.
|
// DC in the request, if given, or else the agent's DC.
|
||||||
func (s *HTTPServer) parseSource(req *http.Request, source *structs.QuerySource) {
|
func (s *HTTPServer) parseSource(req *http.Request, source *structs.QuerySource) {
|
||||||
s.parseDC(req, &source.Datacenter)
|
s.parseDC(req, &source.Datacenter)
|
||||||
if node := req.URL.Query().Get("near"); node != "" {
|
source.Node = req.URL.Query().Get("near")
|
||||||
if node == "_agent" {
|
|
||||||
source.Node = s.agent.config.NodeName
|
|
||||||
} else {
|
|
||||||
source.Node = node
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// parse is a convenience method for endpoints that need
|
// parse is a convenience method for endpoints that need
|
||||||
|
|
|
@ -382,18 +382,6 @@ func TestParseSource(t *testing.T) {
|
||||||
if source.Datacenter != "foo" || source.Node != "bob" {
|
if source.Datacenter != "foo" || source.Node != "bob" {
|
||||||
t.Fatalf("bad: %v", source)
|
t.Fatalf("bad: %v", source)
|
||||||
}
|
}
|
||||||
|
|
||||||
// The magic "_agent" node name will use the agent's local node name.
|
|
||||||
req, err = http.NewRequest("GET",
|
|
||||||
"/v1/catalog/nodes?near=_agent", nil)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("err: %v", err)
|
|
||||||
}
|
|
||||||
source = structs.QuerySource{}
|
|
||||||
srv.parseSource(req, &source)
|
|
||||||
if source.Datacenter != "dc1" || source.Node != srv.agent.config.NodeName {
|
|
||||||
t.Fatalf("bad: %v", source)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestParseWait(t *testing.T) {
|
func TestParseWait(t *testing.T) {
|
||||||
|
|
|
@ -279,6 +279,7 @@ func TestPreparedQuery_Execute(t *testing.T) {
|
||||||
|
|
||||||
m.executeFn = func(args *structs.PreparedQueryExecuteRequest, reply *structs.PreparedQueryExecuteResponse) error {
|
m.executeFn = func(args *structs.PreparedQueryExecuteRequest, reply *structs.PreparedQueryExecuteResponse) error {
|
||||||
expected := &structs.PreparedQueryExecuteRequest{
|
expected := &structs.PreparedQueryExecuteRequest{
|
||||||
|
Origin: srv.agent.config.NodeName,
|
||||||
Datacenter: "dc1",
|
Datacenter: "dc1",
|
||||||
QueryIDOrName: "my-id",
|
QueryIDOrName: "my-id",
|
||||||
Limit: 5,
|
Limit: 5,
|
||||||
|
@ -350,6 +351,7 @@ func TestPreparedQuery_Explain(t *testing.T) {
|
||||||
|
|
||||||
m.explainFn = func(args *structs.PreparedQueryExecuteRequest, reply *structs.PreparedQueryExplainResponse) error {
|
m.explainFn = func(args *structs.PreparedQueryExecuteRequest, reply *structs.PreparedQueryExplainResponse) error {
|
||||||
expected := &structs.PreparedQueryExecuteRequest{
|
expected := &structs.PreparedQueryExecuteRequest{
|
||||||
|
Origin: srv.agent.config.NodeName,
|
||||||
Datacenter: "dc1",
|
Datacenter: "dc1",
|
||||||
QueryIDOrName: "my-id",
|
QueryIDOrName: "my-id",
|
||||||
Limit: 5,
|
Limit: 5,
|
||||||
|
|
|
@ -368,12 +368,35 @@ func (p *PreparedQuery) Execute(args *structs.PreparedQueryExecuteRequest,
|
||||||
// Shuffle the results in case coordinates are not available if they
|
// Shuffle the results in case coordinates are not available if they
|
||||||
// requested an RTT sort.
|
// requested an RTT sort.
|
||||||
reply.Nodes.Shuffle()
|
reply.Nodes.Shuffle()
|
||||||
if err := p.srv.sortNodesByDistanceFrom(args.Source, reply.Nodes); err != nil {
|
|
||||||
|
// Get the source to sort by. This can be passed in by the requestor, or
|
||||||
|
// pre-defined using the Near parameter in the prepared query. If the
|
||||||
|
// near parameter was defined, that will be preferred.
|
||||||
|
sortFrom := args.Source
|
||||||
|
if query.Service.Near != "" {
|
||||||
|
sortFrom = structs.QuerySource{
|
||||||
|
Datacenter: args.Datacenter,
|
||||||
|
Node: query.Service.Near,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Respect the magic "_agent" flag.
|
||||||
|
preferLocal := false
|
||||||
|
if sortFrom.Node == "_agent" {
|
||||||
|
preferLocal = true
|
||||||
|
sortFrom.Node = args.Origin
|
||||||
|
}
|
||||||
|
|
||||||
|
// Perform the distance sort
|
||||||
|
if err := p.srv.sortNodesByDistanceFrom(sortFrom, reply.Nodes); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// Prefer the local service if it exists and is in the set.
|
// Nodes cannot be any "closer" than localhost, so this special case ensures
|
||||||
if query.Service.PreferLocal {
|
// the local node is returned first if it is present in the result. This
|
||||||
|
// allows the local agent to be preferred even when network coordinates are
|
||||||
|
// not enabled.
|
||||||
|
if preferLocal {
|
||||||
for i, node := range reply.Nodes {
|
for i, node := range reply.Nodes {
|
||||||
if node.Node.Node == args.Origin {
|
if node.Node.Node == args.Origin {
|
||||||
remote := append(reply.Nodes[:i], reply.Nodes[i+1:]...)
|
remote := append(reply.Nodes[:i], reply.Nodes[i+1:]...)
|
||||||
|
|
|
@ -1609,7 +1609,7 @@ func TestPreparedQuery_Execute(t *testing.T) {
|
||||||
|
|
||||||
// Set the query to prefer a colocated service
|
// Set the query to prefer a colocated service
|
||||||
query.Op = structs.PreparedQueryUpdate
|
query.Op = structs.PreparedQueryUpdate
|
||||||
query.Query.Service.PreferLocal = true
|
query.Query.Service.Near = "_agent"
|
||||||
if err := msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Apply", &query, &query.Query.ID); err != nil {
|
if err := msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Apply", &query, &query.Query.ID); err != nil {
|
||||||
t.Fatalf("err: %v", err)
|
t.Fatalf("err: %v", err)
|
||||||
}
|
}
|
||||||
|
@ -1630,7 +1630,6 @@ func TestPreparedQuery_Execute(t *testing.T) {
|
||||||
if err := msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Execute", &req, &reply); err != nil {
|
if err := msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Execute", &req, &reply); err != nil {
|
||||||
t.Fatalf("err: %v", err)
|
t.Fatalf("err: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if n := len(reply.Nodes); n != 10 {
|
if n := len(reply.Nodes); n != 10 {
|
||||||
t.Fatalf("expect 10 nodes, got: %d", n)
|
t.Fatalf("expect 10 nodes, got: %d", n)
|
||||||
}
|
}
|
||||||
|
@ -1640,8 +1639,27 @@ func TestPreparedQuery_Execute(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Falls back to remote nodes if service is not available locally
|
||||||
|
{
|
||||||
|
req := structs.PreparedQueryExecuteRequest{
|
||||||
|
Origin: "not-in-result",
|
||||||
|
Datacenter: "dc1",
|
||||||
|
QueryIDOrName: query.Query.ID,
|
||||||
|
QueryOptions: structs.QueryOptions{Token: execToken},
|
||||||
|
}
|
||||||
|
|
||||||
|
var reply structs.PreparedQueryExecuteResponse
|
||||||
|
|
||||||
|
if err := msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Execute", &req, &reply); err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
if n := len(reply.Nodes); n != 10 {
|
||||||
|
t.Fatalf("expect 10 nodes, got: %d", n)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Remove local preference.
|
// Remove local preference.
|
||||||
query.Query.Service.PreferLocal = false
|
query.Query.Service.Near = ""
|
||||||
if err := msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Apply", &query, &query.Query.ID); err != nil {
|
if err := msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Apply", &query, &query.Query.ID); err != nil {
|
||||||
t.Fatalf("err:% v", err)
|
t.Fatalf("err:% v", err)
|
||||||
}
|
}
|
||||||
|
|
|
@ -34,10 +34,10 @@ type ServiceQuery struct {
|
||||||
// discarded)
|
// discarded)
|
||||||
OnlyPassing bool
|
OnlyPassing bool
|
||||||
|
|
||||||
// If PreferLocal is true, the local agent will be checked for a local
|
// Near allows the query to always prefer the node nearest the given
|
||||||
// copy of the service before continuing to remote machines. This is
|
// node. If the node does not exist, results are returned in their
|
||||||
// useful to prefer colocated services but fall back when unavailable.
|
// normal randomly-shuffled order.
|
||||||
PreferLocal bool
|
Near string
|
||||||
|
|
||||||
// Tags are a set of required and/or disallowed tags. If a tag is in
|
// Tags are a set of required and/or disallowed tags. If a tag is in
|
||||||
// this list it must be present. If the tag is preceded with "!" then
|
// this list it must be present. If the tag is preceded with "!" then
|
||||||
|
@ -168,10 +168,6 @@ func (q *PreparedQuerySpecificRequest) RequestDatacenter() string {
|
||||||
|
|
||||||
// PreparedQueryExecuteRequest is used to execute a prepared query.
|
// PreparedQueryExecuteRequest is used to execute a prepared query.
|
||||||
type PreparedQueryExecuteRequest struct {
|
type PreparedQueryExecuteRequest struct {
|
||||||
// Origin is used to carry around a reference to the node which
|
|
||||||
// is executing the query on behalf of the client.
|
|
||||||
Origin string
|
|
||||||
|
|
||||||
// Datacenter is the target this request is intended for.
|
// Datacenter is the target this request is intended for.
|
||||||
Datacenter string
|
Datacenter string
|
||||||
|
|
||||||
|
@ -182,6 +178,10 @@ type PreparedQueryExecuteRequest struct {
|
||||||
// Limit will trim the resulting list down to the given limit.
|
// Limit will trim the resulting list down to the given limit.
|
||||||
Limit int
|
Limit int
|
||||||
|
|
||||||
|
// Origin is used to carry around a reference to the node which
|
||||||
|
// is executing the query on behalf of the client.
|
||||||
|
Origin string
|
||||||
|
|
||||||
// Source is used to sort the results relative to a given node using
|
// Source is used to sort the results relative to a given node using
|
||||||
// network coordinates.
|
// network coordinates.
|
||||||
Source QuerySource
|
Source QuerySource
|
||||||
|
|
Loading…
Reference in a new issue