agent/consul/state: support querying by Connect native
This commit is contained in:
parent
bb98686ec8
commit
a3e0ac1ee3
|
@ -17,6 +17,7 @@ import (
|
|||
"github.com/hashicorp/consul/types"
|
||||
"github.com/hashicorp/net-rpc-msgpackrpc"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestCatalog_Register(t *testing.T) {
|
||||
|
@ -1823,6 +1824,49 @@ func TestCatalog_ListServiceNodes_ConnectDestination(t *testing.T) {
|
|||
assert.Equal("", v.ServiceProxyDestination)
|
||||
}
|
||||
|
||||
// Test that calling ServiceNodes with Connect: true will return
|
||||
// Connect native services.
|
||||
func TestCatalog_ListServiceNodes_ConnectDestinationNative(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
require := require.New(t)
|
||||
dir1, s1 := testServer(t)
|
||||
defer os.RemoveAll(dir1)
|
||||
defer s1.Shutdown()
|
||||
codec := rpcClient(t, s1)
|
||||
defer codec.Close()
|
||||
|
||||
testrpc.WaitForLeader(t, s1.RPC, "dc1")
|
||||
|
||||
// Register the native service
|
||||
args := structs.TestRegisterRequest(t)
|
||||
args.Service.ConnectNative = true
|
||||
var out struct{}
|
||||
require.Nil(msgpackrpc.CallWithCodec(codec, "Catalog.Register", args, &out))
|
||||
|
||||
// List
|
||||
req := structs.ServiceSpecificRequest{
|
||||
Connect: true,
|
||||
Datacenter: "dc1",
|
||||
ServiceName: args.Service.Service,
|
||||
}
|
||||
var resp structs.IndexedServiceNodes
|
||||
require.Nil(msgpackrpc.CallWithCodec(codec, "Catalog.ServiceNodes", &req, &resp))
|
||||
require.Len(resp.ServiceNodes, 1)
|
||||
v := resp.ServiceNodes[0]
|
||||
require.Equal(args.Service.Service, v.ServiceName)
|
||||
|
||||
// List by non-Connect
|
||||
req = structs.ServiceSpecificRequest{
|
||||
Datacenter: "dc1",
|
||||
ServiceName: args.Service.Service,
|
||||
}
|
||||
require.Nil(msgpackrpc.CallWithCodec(codec, "Catalog.ServiceNodes", &req, &resp))
|
||||
require.Len(resp.ServiceNodes, 1)
|
||||
v = resp.ServiceNodes[0]
|
||||
require.Equal(args.Service.Service, v.ServiceName)
|
||||
}
|
||||
|
||||
func TestCatalog_ListServiceNodes_ConnectProxy_ACL(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
|
|
|
@ -91,14 +91,11 @@ func servicesTableSchema() *memdb.TableSchema {
|
|||
Lowercase: true,
|
||||
},
|
||||
},
|
||||
"proxy_destination": &memdb.IndexSchema{
|
||||
Name: "proxy_destination",
|
||||
"connect": &memdb.IndexSchema{
|
||||
Name: "connect",
|
||||
AllowMissing: true,
|
||||
Unique: false,
|
||||
Indexer: &memdb.StringFieldIndex{
|
||||
Field: "ServiceProxyDestination",
|
||||
Lowercase: true,
|
||||
},
|
||||
Indexer: &IndexConnectService{},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
@ -819,7 +816,7 @@ func (s *Store) serviceNodes(ws memdb.WatchSet, serviceName string, connect bool
|
|||
}
|
||||
} else {
|
||||
f = func() (memdb.ResultIterator, error) {
|
||||
return tx.Get("services", "proxy_destination", serviceName)
|
||||
return tx.Get("services", "connect", serviceName)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1540,7 +1537,7 @@ func (s *Store) checkServiceNodes(ws memdb.WatchSet, serviceName string, connect
|
|||
}
|
||||
} else {
|
||||
f = func() (memdb.ResultIterator, error) {
|
||||
return tx.Get("services", "proxy_destination", serviceName)
|
||||
return tx.Get("services", "connect", serviceName)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -1590,7 +1590,8 @@ func TestStateStore_ConnectServiceNodes(t *testing.T) {
|
|||
assert.Nil(s.EnsureService(13, "bar", &structs.NodeService{ID: "api", Service: "api", Tags: nil, Address: "", Port: 5000}))
|
||||
assert.Nil(s.EnsureService(14, "foo", &structs.NodeService{Kind: structs.ServiceKindConnectProxy, ID: "proxy", Service: "proxy", ProxyDestination: "db", Port: 8000}))
|
||||
assert.Nil(s.EnsureService(15, "bar", &structs.NodeService{Kind: structs.ServiceKindConnectProxy, ID: "proxy", Service: "proxy", ProxyDestination: "db", Port: 8000}))
|
||||
assert.Nil(s.EnsureService(16, "bar", &structs.NodeService{ID: "db2", Service: "db", Tags: []string{"slave"}, Address: "", Port: 8001}))
|
||||
assert.Nil(s.EnsureService(16, "bar", &structs.NodeService{ID: "native-db", Service: "db", ConnectNative: true}))
|
||||
assert.Nil(s.EnsureService(17, "bar", &structs.NodeService{ID: "db2", Service: "db", Tags: []string{"slave"}, Address: "", Port: 8001}))
|
||||
assert.True(watchFired(ws))
|
||||
|
||||
// Read everything back.
|
||||
|
@ -1598,11 +1599,13 @@ func TestStateStore_ConnectServiceNodes(t *testing.T) {
|
|||
idx, nodes, err = s.ConnectServiceNodes(ws, "db")
|
||||
assert.Nil(err)
|
||||
assert.Equal(idx, uint64(idx))
|
||||
assert.Len(nodes, 2)
|
||||
assert.Len(nodes, 3)
|
||||
|
||||
for _, n := range nodes {
|
||||
assert.Equal(structs.ServiceKindConnectProxy, n.ServiceKind)
|
||||
assert.Equal("db", n.ServiceProxyDestination)
|
||||
assert.True(
|
||||
n.ServiceKind == structs.ServiceKindConnectProxy ||
|
||||
n.ServiceConnectNative,
|
||||
"either proxy or connect native")
|
||||
}
|
||||
|
||||
// Registering some unrelated node should not fire the watch.
|
||||
|
|
|
@ -0,0 +1,54 @@
|
|||
package state
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
)
|
||||
|
||||
// IndexConnectService indexes a *struct.ServiceNode for querying by
|
||||
// services that support Connect to some target service. This will
|
||||
// properly index the proxy destination for proxies and the service name
|
||||
// for native services.
|
||||
type IndexConnectService struct{}
|
||||
|
||||
func (idx *IndexConnectService) FromObject(obj interface{}) (bool, []byte, error) {
|
||||
sn, ok := obj.(*structs.ServiceNode)
|
||||
if !ok {
|
||||
return false, nil, fmt.Errorf("Object must be ServiceNode, got %T", obj)
|
||||
}
|
||||
|
||||
var result []byte
|
||||
switch {
|
||||
case sn.ServiceKind == structs.ServiceKindConnectProxy:
|
||||
// For proxies, this service supports Connect for the destination
|
||||
result = []byte(strings.ToLower(sn.ServiceProxyDestination))
|
||||
|
||||
case sn.ServiceConnectNative:
|
||||
// For native, this service supports Connect directly
|
||||
result = []byte(strings.ToLower(sn.ServiceName))
|
||||
|
||||
default:
|
||||
// Doesn't support Connect at all
|
||||
return false, nil, nil
|
||||
}
|
||||
|
||||
// Return the result with the null terminator appended so we can
|
||||
// differentiate prefix vs. non-prefix matches.
|
||||
return true, append(result, '\x00'), nil
|
||||
}
|
||||
|
||||
func (idx *IndexConnectService) FromArgs(args ...interface{}) ([]byte, error) {
|
||||
if len(args) != 1 {
|
||||
return nil, fmt.Errorf("must provide only a single argument")
|
||||
}
|
||||
|
||||
arg, ok := args[0].(string)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("argument must be a string: %#v", args[0])
|
||||
}
|
||||
|
||||
// Add the null character as a terminator
|
||||
return append([]byte(strings.ToLower(arg)), '\x00'), nil
|
||||
}
|
|
@ -0,0 +1,122 @@
|
|||
package state
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestIndexConnectService_FromObject(t *testing.T) {
|
||||
cases := []struct {
|
||||
Name string
|
||||
Input interface{}
|
||||
ExpectMatch bool
|
||||
ExpectVal []byte
|
||||
ExpectErr string
|
||||
}{
|
||||
{
|
||||
"not a ServiceNode",
|
||||
42,
|
||||
false,
|
||||
nil,
|
||||
"ServiceNode",
|
||||
},
|
||||
|
||||
{
|
||||
"typical service, not native",
|
||||
&structs.ServiceNode{
|
||||
ServiceName: "db",
|
||||
},
|
||||
false,
|
||||
nil,
|
||||
"",
|
||||
},
|
||||
|
||||
{
|
||||
"typical service, is native",
|
||||
&structs.ServiceNode{
|
||||
ServiceName: "dB",
|
||||
ServiceConnectNative: true,
|
||||
},
|
||||
true,
|
||||
[]byte("db\x00"),
|
||||
"",
|
||||
},
|
||||
|
||||
{
|
||||
"proxy service",
|
||||
&structs.ServiceNode{
|
||||
ServiceKind: structs.ServiceKindConnectProxy,
|
||||
ServiceName: "db",
|
||||
ServiceProxyDestination: "fOo",
|
||||
},
|
||||
true,
|
||||
[]byte("foo\x00"),
|
||||
"",
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range cases {
|
||||
t.Run(tc.Name, func(t *testing.T) {
|
||||
require := require.New(t)
|
||||
|
||||
var idx IndexConnectService
|
||||
match, val, err := idx.FromObject(tc.Input)
|
||||
if tc.ExpectErr != "" {
|
||||
require.Error(err)
|
||||
require.Contains(err.Error(), tc.ExpectErr)
|
||||
return
|
||||
}
|
||||
require.NoError(err)
|
||||
require.Equal(tc.ExpectMatch, match)
|
||||
require.Equal(tc.ExpectVal, val)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestIndexConnectService_FromArgs(t *testing.T) {
|
||||
cases := []struct {
|
||||
Name string
|
||||
Args []interface{}
|
||||
ExpectVal []byte
|
||||
ExpectErr string
|
||||
}{
|
||||
{
|
||||
"multiple arguments",
|
||||
[]interface{}{"foo", "bar"},
|
||||
nil,
|
||||
"single",
|
||||
},
|
||||
|
||||
{
|
||||
"not a string",
|
||||
[]interface{}{42},
|
||||
nil,
|
||||
"must be a string",
|
||||
},
|
||||
|
||||
{
|
||||
"string",
|
||||
[]interface{}{"fOO"},
|
||||
[]byte("foo\x00"),
|
||||
"",
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range cases {
|
||||
t.Run(tc.Name, func(t *testing.T) {
|
||||
require := require.New(t)
|
||||
|
||||
var idx IndexConnectService
|
||||
val, err := idx.FromArgs(tc.Args...)
|
||||
if tc.ExpectErr != "" {
|
||||
require.Error(err)
|
||||
require.Contains(err.Error(), tc.ExpectErr)
|
||||
return
|
||||
}
|
||||
require.NoError(err)
|
||||
require.Equal(tc.ExpectVal, val)
|
||||
})
|
||||
}
|
||||
}
|
|
@ -555,6 +555,11 @@ func (s *NodeService) Validate() error {
|
|||
result = multierror.Append(result, fmt.Errorf(
|
||||
"Port must be set for a Connect proxy"))
|
||||
}
|
||||
|
||||
if s.ConnectNative {
|
||||
result = multierror.Append(result, fmt.Errorf(
|
||||
"A Proxy cannot also be ConnectNative, only typical services"))
|
||||
}
|
||||
}
|
||||
|
||||
return result
|
||||
|
|
|
@ -246,6 +246,12 @@ func TestStructs_NodeService_ValidateConnectProxy(t *testing.T) {
|
|||
func(x *NodeService) { x.Port = 0 },
|
||||
"Port must",
|
||||
},
|
||||
|
||||
{
|
||||
"connect-proxy: ConnectNative set",
|
||||
func(x *NodeService) { x.ConnectNative = true },
|
||||
"cannot also be",
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range cases {
|
||||
|
|
Loading…
Reference in New Issue