agent/consul: Catalog.ServiceNodes supports Connect filtering
This commit is contained in:
parent
06957f6d7f
commit
253256352c
|
@ -269,24 +269,37 @@ func (c *Catalog) ServiceNodes(args *structs.ServiceSpecificRequest, reply *stru
|
|||
return fmt.Errorf("Must provide service name")
|
||||
}
|
||||
|
||||
// Determine the function we'll call
|
||||
var f func(memdb.WatchSet, *state.Store) (uint64, structs.ServiceNodes, error)
|
||||
switch {
|
||||
case args.Connect:
|
||||
f = func(ws memdb.WatchSet, s *state.Store) (uint64, structs.ServiceNodes, error) {
|
||||
return s.ConnectServiceNodes(ws, args.ServiceName)
|
||||
}
|
||||
|
||||
default:
|
||||
f = func(ws memdb.WatchSet, s *state.Store) (uint64, structs.ServiceNodes, error) {
|
||||
if args.ServiceAddress != "" {
|
||||
return s.ServiceAddressNodes(ws, args.ServiceAddress)
|
||||
}
|
||||
|
||||
if args.TagFilter {
|
||||
return s.ServiceTagNodes(ws, args.ServiceName, args.ServiceTag)
|
||||
}
|
||||
|
||||
return s.ServiceNodes(ws, args.ServiceName)
|
||||
}
|
||||
}
|
||||
|
||||
err := c.srv.blockingQuery(
|
||||
&args.QueryOptions,
|
||||
&reply.QueryMeta,
|
||||
func(ws memdb.WatchSet, state *state.Store) error {
|
||||
var index uint64
|
||||
var services structs.ServiceNodes
|
||||
var err error
|
||||
if args.TagFilter {
|
||||
index, services, err = state.ServiceTagNodes(ws, args.ServiceName, args.ServiceTag)
|
||||
} else {
|
||||
index, services, err = state.ServiceNodes(ws, args.ServiceName)
|
||||
}
|
||||
if args.ServiceAddress != "" {
|
||||
index, services, err = state.ServiceAddressNodes(ws, args.ServiceAddress)
|
||||
}
|
||||
index, services, err := f(ws, state)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
reply.Index, reply.ServiceNodes = index, services
|
||||
if len(args.NodeMetaFilters) > 0 {
|
||||
var filtered structs.ServiceNodes
|
||||
|
@ -305,17 +318,24 @@ func (c *Catalog) ServiceNodes(args *structs.ServiceSpecificRequest, reply *stru
|
|||
|
||||
// Provide some metrics
|
||||
if err == nil {
|
||||
metrics.IncrCounterWithLabels([]string{"catalog", "service", "query"}, 1,
|
||||
// For metrics, we separate Connect-based lookups from non-Connect
|
||||
key := "service"
|
||||
if args.Connect {
|
||||
key = "connect"
|
||||
}
|
||||
|
||||
metrics.IncrCounterWithLabels([]string{"catalog", key, "query"}, 1,
|
||||
[]metrics.Label{{Name: "service", Value: args.ServiceName}})
|
||||
if args.ServiceTag != "" {
|
||||
metrics.IncrCounterWithLabels([]string{"catalog", "service", "query-tag"}, 1,
|
||||
metrics.IncrCounterWithLabels([]string{"catalog", key, "query-tag"}, 1,
|
||||
[]metrics.Label{{Name: "service", Value: args.ServiceName}, {Name: "tag", Value: args.ServiceTag}})
|
||||
}
|
||||
if len(reply.ServiceNodes) == 0 {
|
||||
metrics.IncrCounterWithLabels([]string{"catalog", "service", "not-found"}, 1,
|
||||
metrics.IncrCounterWithLabels([]string{"catalog", key, "not-found"}, 1,
|
||||
[]metrics.Label{{Name: "service", Value: args.ServiceName}})
|
||||
}
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
|
|
|
@ -1773,6 +1773,57 @@ func TestCatalog_ListServiceNodes_ConnectProxy(t *testing.T) {
|
|||
assert.Equal(args.Service.ProxyDestination, v.ServiceProxyDestination)
|
||||
}
|
||||
|
||||
func TestCatalog_ListServiceNodes_ConnectDestination(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
assert := assert.New(t)
|
||||
dir1, s1 := testServer(t)
|
||||
defer os.RemoveAll(dir1)
|
||||
defer s1.Shutdown()
|
||||
codec := rpcClient(t, s1)
|
||||
defer codec.Close()
|
||||
|
||||
testrpc.WaitForLeader(t, s1.RPC, "dc1")
|
||||
|
||||
// Register the proxy service
|
||||
args := structs.TestRegisterRequestProxy(t)
|
||||
var out struct{}
|
||||
assert.Nil(msgpackrpc.CallWithCodec(codec, "Catalog.Register", args, &out))
|
||||
|
||||
// Register the service
|
||||
{
|
||||
dst := args.Service.ProxyDestination
|
||||
args := structs.TestRegisterRequest(t)
|
||||
args.Service.Service = dst
|
||||
var out struct{}
|
||||
assert.Nil(msgpackrpc.CallWithCodec(codec, "Catalog.Register", args, &out))
|
||||
}
|
||||
|
||||
// List
|
||||
req := structs.ServiceSpecificRequest{
|
||||
Connect: true,
|
||||
Datacenter: "dc1",
|
||||
ServiceName: args.Service.ProxyDestination,
|
||||
}
|
||||
var resp structs.IndexedServiceNodes
|
||||
assert.Nil(msgpackrpc.CallWithCodec(codec, "Catalog.ServiceNodes", &req, &resp))
|
||||
assert.Len(resp.ServiceNodes, 1)
|
||||
v := resp.ServiceNodes[0]
|
||||
assert.Equal(structs.ServiceKindConnectProxy, v.ServiceKind)
|
||||
assert.Equal(args.Service.ProxyDestination, v.ServiceProxyDestination)
|
||||
|
||||
// List by non-Connect
|
||||
req = structs.ServiceSpecificRequest{
|
||||
Datacenter: "dc1",
|
||||
ServiceName: args.Service.ProxyDestination,
|
||||
}
|
||||
assert.Nil(msgpackrpc.CallWithCodec(codec, "Catalog.ServiceNodes", &req, &resp))
|
||||
assert.Len(resp.ServiceNodes, 1)
|
||||
v = resp.ServiceNodes[0]
|
||||
assert.Equal(args.Service.ProxyDestination, v.ServiceName)
|
||||
assert.Equal("", v.ServiceProxyDestination)
|
||||
}
|
||||
|
||||
func TestCatalog_NodeServices(t *testing.T) {
|
||||
t.Parallel()
|
||||
dir1, s1 := testServer(t)
|
||||
|
|
|
@ -284,6 +284,10 @@ type ServiceSpecificRequest struct {
|
|||
ServiceAddress string
|
||||
TagFilter bool // Controls tag filtering
|
||||
Source QuerySource
|
||||
|
||||
// Connect if true will only search for Connect-compatible services.
|
||||
Connect bool
|
||||
|
||||
QueryOptions
|
||||
}
|
||||
|
||||
|
|
|
@ -4,6 +4,20 @@ import (
|
|||
"github.com/mitchellh/go-testing-interface"
|
||||
)
|
||||
|
||||
// TestRegisterRequest returns a RegisterRequest for registering a typical service.
|
||||
func TestRegisterRequest(t testing.T) *RegisterRequest {
|
||||
return &RegisterRequest{
|
||||
Datacenter: "dc1",
|
||||
Node: "foo",
|
||||
Address: "127.0.0.1",
|
||||
Service: &NodeService{
|
||||
Service: "web",
|
||||
Address: "",
|
||||
Port: 80,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
// TestRegisterRequestProxy returns a RegisterRequest for registering a
|
||||
// Connect proxy.
|
||||
func TestRegisterRequestProxy(t testing.T) *RegisterRequest {
|
||||
|
|
Loading…
Reference in New Issue