agent/consul: prepared query supports "Connect" field
This commit is contained in:
parent
e3562e39cc
commit
e8c899b1b8
|
@ -511,7 +511,15 @@ func (p *PreparedQuery) ExecuteRemote(args *structs.PreparedQueryExecuteRemoteRe
|
|||
func (p *PreparedQuery) execute(query *structs.PreparedQuery,
|
||||
reply *structs.PreparedQueryExecuteResponse) error {
|
||||
state := p.srv.fsm.State()
|
||||
_, nodes, err := state.CheckServiceNodes(nil, query.Service.Service)
|
||||
|
||||
// If we're requesting Connect-capable services, then switch the
|
||||
// lookup to be the Connect function.
|
||||
f := state.CheckServiceNodes
|
||||
if query.Service.Connect {
|
||||
f = state.CheckConnectServiceNodes
|
||||
}
|
||||
|
||||
_, nodes, err := f(nil, query.Service.Service)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -20,6 +20,7 @@ import (
|
|||
"github.com/hashicorp/consul/types"
|
||||
"github.com/hashicorp/net-rpc-msgpackrpc"
|
||||
"github.com/hashicorp/serf/coordinate"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestPreparedQuery_Apply(t *testing.T) {
|
||||
|
@ -2617,6 +2618,128 @@ func TestPreparedQuery_Execute_ForwardLeader(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestPreparedQuery_Execute_ConnectExact(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
require := require.New(t)
|
||||
dir1, s1 := testServer(t)
|
||||
defer os.RemoveAll(dir1)
|
||||
defer s1.Shutdown()
|
||||
codec := rpcClient(t, s1)
|
||||
defer codec.Close()
|
||||
|
||||
// Setup 3 services on 3 nodes: one is non-Connect, one is Connect native,
|
||||
// and one is a proxy to the non-Connect one.
|
||||
for i := 0; i < 3; i++ {
|
||||
req := structs.RegisterRequest{
|
||||
Datacenter: "dc1",
|
||||
Node: fmt.Sprintf("node%d", i+1),
|
||||
Address: fmt.Sprintf("127.0.0.%d", i+1),
|
||||
Service: &structs.NodeService{
|
||||
Service: "foo",
|
||||
Port: 8000,
|
||||
},
|
||||
}
|
||||
|
||||
switch i {
|
||||
case 0:
|
||||
// Default do nothing
|
||||
|
||||
case 1:
|
||||
// Connect native
|
||||
req.Service.Connect.Native = true
|
||||
|
||||
case 2:
|
||||
// Connect proxy
|
||||
req.Service.Kind = structs.ServiceKindConnectProxy
|
||||
req.Service.ProxyDestination = req.Service.Service
|
||||
req.Service.Service = "proxy"
|
||||
}
|
||||
|
||||
var reply struct{}
|
||||
require.NoError(msgpackrpc.CallWithCodec(codec, "Catalog.Register", &req, &reply))
|
||||
}
|
||||
|
||||
// The query, start with connect disabled
|
||||
query := structs.PreparedQueryRequest{
|
||||
Datacenter: "dc1",
|
||||
Op: structs.PreparedQueryCreate,
|
||||
Query: &structs.PreparedQuery{
|
||||
Name: "test",
|
||||
Service: structs.ServiceQuery{
|
||||
Service: "foo",
|
||||
},
|
||||
DNS: structs.QueryDNSOptions{
|
||||
TTL: "10s",
|
||||
},
|
||||
},
|
||||
}
|
||||
require.NoError(msgpackrpc.CallWithCodec(
|
||||
codec, "PreparedQuery.Apply", &query, &query.Query.ID))
|
||||
|
||||
// In the future we'll run updates
|
||||
query.Op = structs.PreparedQueryUpdate
|
||||
|
||||
// Run the registered query.
|
||||
{
|
||||
req := structs.PreparedQueryExecuteRequest{
|
||||
Datacenter: "dc1",
|
||||
QueryIDOrName: query.Query.ID,
|
||||
}
|
||||
|
||||
var reply structs.PreparedQueryExecuteResponse
|
||||
require.NoError(msgpackrpc.CallWithCodec(
|
||||
codec, "PreparedQuery.Execute", &req, &reply))
|
||||
|
||||
// Result should have two because it omits the proxy whose name
|
||||
// doesn't match the query.
|
||||
require.Len(reply.Nodes, 2)
|
||||
require.Equal(query.Query.Service.Service, reply.Service)
|
||||
require.Equal(query.Query.DNS, reply.DNS)
|
||||
require.True(reply.QueryMeta.KnownLeader, "queried leader")
|
||||
}
|
||||
|
||||
// Update the query
|
||||
query.Query.Service.Connect = true
|
||||
require.NoError(msgpackrpc.CallWithCodec(
|
||||
codec, "PreparedQuery.Apply", &query, &query.Query.ID))
|
||||
|
||||
// Run the registered query.
|
||||
{
|
||||
req := structs.PreparedQueryExecuteRequest{
|
||||
Datacenter: "dc1",
|
||||
QueryIDOrName: query.Query.ID,
|
||||
}
|
||||
|
||||
var reply structs.PreparedQueryExecuteResponse
|
||||
require.NoError(msgpackrpc.CallWithCodec(
|
||||
codec, "PreparedQuery.Execute", &req, &reply))
|
||||
|
||||
// Result should have two because we should get the native AND
|
||||
// the proxy (since the destination matches our service name).
|
||||
require.Len(reply.Nodes, 2)
|
||||
require.Equal(query.Query.Service.Service, reply.Service)
|
||||
require.Equal(query.Query.DNS, reply.DNS)
|
||||
require.True(reply.QueryMeta.KnownLeader, "queried leader")
|
||||
|
||||
// Make sure the native is the first one
|
||||
if !reply.Nodes[0].Service.Connect.Native {
|
||||
reply.Nodes[0], reply.Nodes[1] = reply.Nodes[1], reply.Nodes[0]
|
||||
}
|
||||
|
||||
require.True(reply.Nodes[0].Service.Connect.Native, "native")
|
||||
require.Equal(reply.Service, reply.Nodes[0].Service.Service)
|
||||
|
||||
require.Equal(structs.ServiceKindConnectProxy, reply.Nodes[1].Service.Kind)
|
||||
require.Equal(reply.Service, reply.Nodes[1].Service.ProxyDestination)
|
||||
}
|
||||
|
||||
// Unset the query
|
||||
query.Query.Service.Connect = false
|
||||
require.NoError(msgpackrpc.CallWithCodec(
|
||||
codec, "PreparedQuery.Apply", &query, &query.Query.ID))
|
||||
}
|
||||
|
||||
func TestPreparedQuery_tagFilter(t *testing.T) {
|
||||
t.Parallel()
|
||||
testNodes := func() structs.CheckServiceNodes {
|
||||
|
|
|
@ -57,6 +57,14 @@ type ServiceQuery struct {
|
|||
// pair is in this map it must be present on the node in order for the
|
||||
// service entry to be returned.
|
||||
NodeMeta map[string]string
|
||||
|
||||
// Connect if true will filter the prepared query results to only
|
||||
// include Connect-capable services. These include both native services
|
||||
// and proxies for matching services. Note that if a proxy matches,
|
||||
// the constraints in the query above (Near, OnlyPassing, etc.) apply
|
||||
// to the _proxy_ and not the service being proxied. In practice, proxies
|
||||
// should be directly next to their services so this isn't an issue.
|
||||
Connect bool
|
||||
}
|
||||
|
||||
const (
|
||||
|
|
Loading…
Reference in a new issue