From a3e0ac1ee3633ac75fba185c9d8cc04a2fe60c25 Mon Sep 17 00:00:00 2001 From: Mitchell Hashimoto Date: Mon, 4 Jun 2018 20:04:45 -0700 Subject: [PATCH] agent/consul/state: support querying by Connect native --- agent/consul/catalog_endpoint_test.go | 44 ++++++++ agent/consul/state/catalog.go | 13 +-- agent/consul/state/catalog_test.go | 11 +- agent/consul/state/index_connect.go | 54 ++++++++++ agent/consul/state/index_connect_test.go | 122 +++++++++++++++++++++++ agent/structs/structs.go | 5 + agent/structs/structs_test.go | 6 ++ 7 files changed, 243 insertions(+), 12 deletions(-) create mode 100644 agent/consul/state/index_connect.go create mode 100644 agent/consul/state/index_connect_test.go diff --git a/agent/consul/catalog_endpoint_test.go b/agent/consul/catalog_endpoint_test.go index 860a993ed..d1864614b 100644 --- a/agent/consul/catalog_endpoint_test.go +++ b/agent/consul/catalog_endpoint_test.go @@ -17,6 +17,7 @@ import ( "github.com/hashicorp/consul/types" "github.com/hashicorp/net-rpc-msgpackrpc" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) func TestCatalog_Register(t *testing.T) { @@ -1823,6 +1824,49 @@ func TestCatalog_ListServiceNodes_ConnectDestination(t *testing.T) { assert.Equal("", v.ServiceProxyDestination) } +// Test that calling ServiceNodes with Connect: true will return +// Connect native services. +func TestCatalog_ListServiceNodes_ConnectDestinationNative(t *testing.T) { + t.Parallel() + + require := require.New(t) + dir1, s1 := testServer(t) + defer os.RemoveAll(dir1) + defer s1.Shutdown() + codec := rpcClient(t, s1) + defer codec.Close() + + testrpc.WaitForLeader(t, s1.RPC, "dc1") + + // Register the native service + args := structs.TestRegisterRequest(t) + args.Service.ConnectNative = true + var out struct{} + require.Nil(msgpackrpc.CallWithCodec(codec, "Catalog.Register", args, &out)) + + // List + req := structs.ServiceSpecificRequest{ + Connect: true, + Datacenter: "dc1", + ServiceName: args.Service.Service, + } + var resp structs.IndexedServiceNodes + require.Nil(msgpackrpc.CallWithCodec(codec, "Catalog.ServiceNodes", &req, &resp)) + require.Len(resp.ServiceNodes, 1) + v := resp.ServiceNodes[0] + require.Equal(args.Service.Service, v.ServiceName) + + // List by non-Connect + req = structs.ServiceSpecificRequest{ + Datacenter: "dc1", + ServiceName: args.Service.Service, + } + require.Nil(msgpackrpc.CallWithCodec(codec, "Catalog.ServiceNodes", &req, &resp)) + require.Len(resp.ServiceNodes, 1) + v = resp.ServiceNodes[0] + require.Equal(args.Service.Service, v.ServiceName) +} + func TestCatalog_ListServiceNodes_ConnectProxy_ACL(t *testing.T) { t.Parallel() diff --git a/agent/consul/state/catalog.go b/agent/consul/state/catalog.go index 90a3dc5eb..e7d5ba3c0 100644 --- a/agent/consul/state/catalog.go +++ b/agent/consul/state/catalog.go @@ -91,14 +91,11 @@ func servicesTableSchema() *memdb.TableSchema { Lowercase: true, }, }, - "proxy_destination": &memdb.IndexSchema{ - Name: "proxy_destination", + "connect": &memdb.IndexSchema{ + Name: "connect", AllowMissing: true, Unique: false, - Indexer: &memdb.StringFieldIndex{ - Field: "ServiceProxyDestination", - Lowercase: true, - }, + Indexer: &IndexConnectService{}, }, }, } @@ -819,7 +816,7 @@ func (s *Store) serviceNodes(ws memdb.WatchSet, serviceName string, connect bool } } else { f = func() (memdb.ResultIterator, error) { - return tx.Get("services", "proxy_destination", serviceName) + return tx.Get("services", "connect", serviceName) } } @@ -1540,7 +1537,7 @@ func (s *Store) checkServiceNodes(ws memdb.WatchSet, serviceName string, connect } } else { f = func() (memdb.ResultIterator, error) { - return tx.Get("services", "proxy_destination", serviceName) + return tx.Get("services", "connect", serviceName) } } diff --git a/agent/consul/state/catalog_test.go b/agent/consul/state/catalog_test.go index 9d771ca48..2dd0fdaf2 100644 --- a/agent/consul/state/catalog_test.go +++ b/agent/consul/state/catalog_test.go @@ -1590,7 +1590,8 @@ func TestStateStore_ConnectServiceNodes(t *testing.T) { assert.Nil(s.EnsureService(13, "bar", &structs.NodeService{ID: "api", Service: "api", Tags: nil, Address: "", Port: 5000})) assert.Nil(s.EnsureService(14, "foo", &structs.NodeService{Kind: structs.ServiceKindConnectProxy, ID: "proxy", Service: "proxy", ProxyDestination: "db", Port: 8000})) assert.Nil(s.EnsureService(15, "bar", &structs.NodeService{Kind: structs.ServiceKindConnectProxy, ID: "proxy", Service: "proxy", ProxyDestination: "db", Port: 8000})) - assert.Nil(s.EnsureService(16, "bar", &structs.NodeService{ID: "db2", Service: "db", Tags: []string{"slave"}, Address: "", Port: 8001})) + assert.Nil(s.EnsureService(16, "bar", &structs.NodeService{ID: "native-db", Service: "db", ConnectNative: true})) + assert.Nil(s.EnsureService(17, "bar", &structs.NodeService{ID: "db2", Service: "db", Tags: []string{"slave"}, Address: "", Port: 8001})) assert.True(watchFired(ws)) // Read everything back. @@ -1598,11 +1599,13 @@ func TestStateStore_ConnectServiceNodes(t *testing.T) { idx, nodes, err = s.ConnectServiceNodes(ws, "db") assert.Nil(err) assert.Equal(idx, uint64(idx)) - assert.Len(nodes, 2) + assert.Len(nodes, 3) for _, n := range nodes { - assert.Equal(structs.ServiceKindConnectProxy, n.ServiceKind) - assert.Equal("db", n.ServiceProxyDestination) + assert.True( + n.ServiceKind == structs.ServiceKindConnectProxy || + n.ServiceConnectNative, + "either proxy or connect native") } // Registering some unrelated node should not fire the watch. diff --git a/agent/consul/state/index_connect.go b/agent/consul/state/index_connect.go new file mode 100644 index 000000000..dc1390af0 --- /dev/null +++ b/agent/consul/state/index_connect.go @@ -0,0 +1,54 @@ +package state + +import ( + "fmt" + "strings" + + "github.com/hashicorp/consul/agent/structs" +) + +// IndexConnectService indexes a *struct.ServiceNode for querying by +// services that support Connect to some target service. This will +// properly index the proxy destination for proxies and the service name +// for native services. +type IndexConnectService struct{} + +func (idx *IndexConnectService) FromObject(obj interface{}) (bool, []byte, error) { + sn, ok := obj.(*structs.ServiceNode) + if !ok { + return false, nil, fmt.Errorf("Object must be ServiceNode, got %T", obj) + } + + var result []byte + switch { + case sn.ServiceKind == structs.ServiceKindConnectProxy: + // For proxies, this service supports Connect for the destination + result = []byte(strings.ToLower(sn.ServiceProxyDestination)) + + case sn.ServiceConnectNative: + // For native, this service supports Connect directly + result = []byte(strings.ToLower(sn.ServiceName)) + + default: + // Doesn't support Connect at all + return false, nil, nil + } + + // Return the result with the null terminator appended so we can + // differentiate prefix vs. non-prefix matches. + return true, append(result, '\x00'), nil +} + +func (idx *IndexConnectService) FromArgs(args ...interface{}) ([]byte, error) { + if len(args) != 1 { + return nil, fmt.Errorf("must provide only a single argument") + } + + arg, ok := args[0].(string) + if !ok { + return nil, fmt.Errorf("argument must be a string: %#v", args[0]) + } + + // Add the null character as a terminator + return append([]byte(strings.ToLower(arg)), '\x00'), nil +} diff --git a/agent/consul/state/index_connect_test.go b/agent/consul/state/index_connect_test.go new file mode 100644 index 000000000..f8e50c91f --- /dev/null +++ b/agent/consul/state/index_connect_test.go @@ -0,0 +1,122 @@ +package state + +import ( + "testing" + + "github.com/hashicorp/consul/agent/structs" + "github.com/stretchr/testify/require" +) + +func TestIndexConnectService_FromObject(t *testing.T) { + cases := []struct { + Name string + Input interface{} + ExpectMatch bool + ExpectVal []byte + ExpectErr string + }{ + { + "not a ServiceNode", + 42, + false, + nil, + "ServiceNode", + }, + + { + "typical service, not native", + &structs.ServiceNode{ + ServiceName: "db", + }, + false, + nil, + "", + }, + + { + "typical service, is native", + &structs.ServiceNode{ + ServiceName: "dB", + ServiceConnectNative: true, + }, + true, + []byte("db\x00"), + "", + }, + + { + "proxy service", + &structs.ServiceNode{ + ServiceKind: structs.ServiceKindConnectProxy, + ServiceName: "db", + ServiceProxyDestination: "fOo", + }, + true, + []byte("foo\x00"), + "", + }, + } + + for _, tc := range cases { + t.Run(tc.Name, func(t *testing.T) { + require := require.New(t) + + var idx IndexConnectService + match, val, err := idx.FromObject(tc.Input) + if tc.ExpectErr != "" { + require.Error(err) + require.Contains(err.Error(), tc.ExpectErr) + return + } + require.NoError(err) + require.Equal(tc.ExpectMatch, match) + require.Equal(tc.ExpectVal, val) + }) + } +} + +func TestIndexConnectService_FromArgs(t *testing.T) { + cases := []struct { + Name string + Args []interface{} + ExpectVal []byte + ExpectErr string + }{ + { + "multiple arguments", + []interface{}{"foo", "bar"}, + nil, + "single", + }, + + { + "not a string", + []interface{}{42}, + nil, + "must be a string", + }, + + { + "string", + []interface{}{"fOO"}, + []byte("foo\x00"), + "", + }, + } + + for _, tc := range cases { + t.Run(tc.Name, func(t *testing.T) { + require := require.New(t) + + var idx IndexConnectService + val, err := idx.FromArgs(tc.Args...) + if tc.ExpectErr != "" { + require.Error(err) + require.Contains(err.Error(), tc.ExpectErr) + return + } + require.NoError(err) + require.Equal(tc.ExpectVal, val) + }) + } +} diff --git a/agent/structs/structs.go b/agent/structs/structs.go index 34d59b575..4f8eb9eb6 100644 --- a/agent/structs/structs.go +++ b/agent/structs/structs.go @@ -555,6 +555,11 @@ func (s *NodeService) Validate() error { result = multierror.Append(result, fmt.Errorf( "Port must be set for a Connect proxy")) } + + if s.ConnectNative { + result = multierror.Append(result, fmt.Errorf( + "A Proxy cannot also be ConnectNative, only typical services")) + } } return result diff --git a/agent/structs/structs_test.go b/agent/structs/structs_test.go index 2e44a1476..95e4f88a7 100644 --- a/agent/structs/structs_test.go +++ b/agent/structs/structs_test.go @@ -246,6 +246,12 @@ func TestStructs_NodeService_ValidateConnectProxy(t *testing.T) { func(x *NodeService) { x.Port = 0 }, "Port must", }, + + { + "connect-proxy: ConnectNative set", + func(x *NodeService) { x.ConnectNative = true }, + "cannot also be", + }, } for _, tc := range cases {