diff --git a/agent/consul/prepared_query_endpoint.go b/agent/consul/prepared_query_endpoint.go index a56ddd35a..1c006ca1d 100644 --- a/agent/consul/prepared_query_endpoint.go +++ b/agent/consul/prepared_query_endpoint.go @@ -511,7 +511,15 @@ func (p *PreparedQuery) ExecuteRemote(args *structs.PreparedQueryExecuteRemoteRe func (p *PreparedQuery) execute(query *structs.PreparedQuery, reply *structs.PreparedQueryExecuteResponse) error { state := p.srv.fsm.State() - _, nodes, err := state.CheckServiceNodes(nil, query.Service.Service) + + // If we're requesting Connect-capable services, then switch the + // lookup to be the Connect function. + f := state.CheckServiceNodes + if query.Service.Connect { + f = state.CheckConnectServiceNodes + } + + _, nodes, err := f(nil, query.Service.Service) if err != nil { return err } diff --git a/agent/consul/prepared_query_endpoint_test.go b/agent/consul/prepared_query_endpoint_test.go index e4bc49e51..5e16eff0f 100644 --- a/agent/consul/prepared_query_endpoint_test.go +++ b/agent/consul/prepared_query_endpoint_test.go @@ -20,6 +20,7 @@ import ( "github.com/hashicorp/consul/types" "github.com/hashicorp/net-rpc-msgpackrpc" "github.com/hashicorp/serf/coordinate" + "github.com/stretchr/testify/require" ) func TestPreparedQuery_Apply(t *testing.T) { @@ -2617,6 +2618,128 @@ func TestPreparedQuery_Execute_ForwardLeader(t *testing.T) { } } +func TestPreparedQuery_Execute_ConnectExact(t *testing.T) { + t.Parallel() + + require := require.New(t) + dir1, s1 := testServer(t) + defer os.RemoveAll(dir1) + defer s1.Shutdown() + codec := rpcClient(t, s1) + defer codec.Close() + + // Setup 3 services on 3 nodes: one is non-Connect, one is Connect native, + // and one is a proxy to the non-Connect one. + for i := 0; i < 3; i++ { + req := structs.RegisterRequest{ + Datacenter: "dc1", + Node: fmt.Sprintf("node%d", i+1), + Address: fmt.Sprintf("127.0.0.%d", i+1), + Service: &structs.NodeService{ + Service: "foo", + Port: 8000, + }, + } + + switch i { + case 0: + // Default do nothing + + case 1: + // Connect native + req.Service.Connect.Native = true + + case 2: + // Connect proxy + req.Service.Kind = structs.ServiceKindConnectProxy + req.Service.ProxyDestination = req.Service.Service + req.Service.Service = "proxy" + } + + var reply struct{} + require.NoError(msgpackrpc.CallWithCodec(codec, "Catalog.Register", &req, &reply)) + } + + // The query, start with connect disabled + query := structs.PreparedQueryRequest{ + Datacenter: "dc1", + Op: structs.PreparedQueryCreate, + Query: &structs.PreparedQuery{ + Name: "test", + Service: structs.ServiceQuery{ + Service: "foo", + }, + DNS: structs.QueryDNSOptions{ + TTL: "10s", + }, + }, + } + require.NoError(msgpackrpc.CallWithCodec( + codec, "PreparedQuery.Apply", &query, &query.Query.ID)) + + // In the future we'll run updates + query.Op = structs.PreparedQueryUpdate + + // Run the registered query. + { + req := structs.PreparedQueryExecuteRequest{ + Datacenter: "dc1", + QueryIDOrName: query.Query.ID, + } + + var reply structs.PreparedQueryExecuteResponse + require.NoError(msgpackrpc.CallWithCodec( + codec, "PreparedQuery.Execute", &req, &reply)) + + // Result should have two because it omits the proxy whose name + // doesn't match the query. + require.Len(reply.Nodes, 2) + require.Equal(query.Query.Service.Service, reply.Service) + require.Equal(query.Query.DNS, reply.DNS) + require.True(reply.QueryMeta.KnownLeader, "queried leader") + } + + // Update the query + query.Query.Service.Connect = true + require.NoError(msgpackrpc.CallWithCodec( + codec, "PreparedQuery.Apply", &query, &query.Query.ID)) + + // Run the registered query. + { + req := structs.PreparedQueryExecuteRequest{ + Datacenter: "dc1", + QueryIDOrName: query.Query.ID, + } + + var reply structs.PreparedQueryExecuteResponse + require.NoError(msgpackrpc.CallWithCodec( + codec, "PreparedQuery.Execute", &req, &reply)) + + // Result should have two because we should get the native AND + // the proxy (since the destination matches our service name). + require.Len(reply.Nodes, 2) + require.Equal(query.Query.Service.Service, reply.Service) + require.Equal(query.Query.DNS, reply.DNS) + require.True(reply.QueryMeta.KnownLeader, "queried leader") + + // Make sure the native is the first one + if !reply.Nodes[0].Service.Connect.Native { + reply.Nodes[0], reply.Nodes[1] = reply.Nodes[1], reply.Nodes[0] + } + + require.True(reply.Nodes[0].Service.Connect.Native, "native") + require.Equal(reply.Service, reply.Nodes[0].Service.Service) + + require.Equal(structs.ServiceKindConnectProxy, reply.Nodes[1].Service.Kind) + require.Equal(reply.Service, reply.Nodes[1].Service.ProxyDestination) + } + + // Unset the query + query.Query.Service.Connect = false + require.NoError(msgpackrpc.CallWithCodec( + codec, "PreparedQuery.Apply", &query, &query.Query.ID)) +} + func TestPreparedQuery_tagFilter(t *testing.T) { t.Parallel() testNodes := func() structs.CheckServiceNodes { diff --git a/agent/structs/prepared_query.go b/agent/structs/prepared_query.go index 8171aaefe..842a9b716 100644 --- a/agent/structs/prepared_query.go +++ b/agent/structs/prepared_query.go @@ -57,6 +57,14 @@ type ServiceQuery struct { // pair is in this map it must be present on the node in order for the // service entry to be returned. NodeMeta map[string]string + + // Connect if true will filter the prepared query results to only + // include Connect-capable services. These include both native services + // and proxies for matching services. Note that if a proxy matches, + // the constraints in the query above (Near, OnlyPassing, etc.) apply + // to the _proxy_ and not the service being proxied. In practice, proxies + // should be directly next to their services so this isn't an issue. + Connect bool } const (