From 7c11580e93eead2f89d9576d76c99412ba6218b7 Mon Sep 17 00:00:00 2001 From: freddygv Date: Mon, 28 Sep 2020 19:42:03 -0600 Subject: [PATCH] Add topology RPC endpoint --- agent/consul/helper_test.go | 286 ++++++++++++++++++++++++ agent/consul/internal_endpoint.go | 39 ++++ agent/consul/internal_endpoint_test.go | 142 ++++++++++++ agent/consul/state/catalog.go | 4 +- agent/consul/state/catalog_test.go | 8 +- agent/consul/state/config_entry.go | 8 +- agent/consul/state/config_entry_test.go | 5 +- 7 files changed, 480 insertions(+), 12 deletions(-) diff --git a/agent/consul/helper_test.go b/agent/consul/helper_test.go index 5ea53cff3..7c28649bd 100644 --- a/agent/consul/helper_test.go +++ b/agent/consul/helper_test.go @@ -559,3 +559,289 @@ func registerTestCatalogEntriesMap(t *testing.T, codec rpc.ClientCodec, registra require.NoError(t, err, "Failed catalog registration %q: %v", name, err) } } + +func registerTestTopologyEntries(t *testing.T, codec rpc.ClientCodec, token string) { + t.Helper() + + // api and api-proxy on node foo - upstream: web + // web and web-proxy on node bar - upstream: redis + // web and web-proxy on node baz - upstream: redis + // redis and redis-proxy on node zip + registrations := map[string]*structs.RegisterRequest{ + "Node foo": { + Datacenter: "dc1", + Node: "foo", + ID: types.NodeID("e0155642-135d-4739-9853-a1ee6c9f945b"), + Address: "127.0.0.2", + Checks: structs.HealthChecks{ + &structs.HealthCheck{ + Node: "foo", + CheckID: "foo:alive", + Name: "foo-liveness", + Status: api.HealthPassing, + }, + }, + WriteRequest: structs.WriteRequest{Token: token}, + }, + "Service api on foo": { + Datacenter: "dc1", + Node: "foo", + SkipNodeUpdate: true, + Service: &structs.NodeService{ + Kind: structs.ServiceKindTypical, + ID: "api", + Service: "api", + Port: 9090, + Address: "198.18.1.2", + }, + Checks: structs.HealthChecks{ + &structs.HealthCheck{ + Node: "foo", + CheckID: "foo:api", + Name: "api-liveness", + Status: api.HealthPassing, + ServiceID: "api", + ServiceName: "api", + }, + }, + WriteRequest: structs.WriteRequest{Token: token}, + }, + "Service api-proxy": { + Datacenter: "dc1", + Node: "foo", + SkipNodeUpdate: true, + Service: &structs.NodeService{ + Kind: structs.ServiceKindConnectProxy, + ID: "api-proxy", + Service: "api-proxy", + Port: 8443, + Address: "198.18.1.2", + Proxy: structs.ConnectProxyConfig{ + DestinationServiceName: "api", + Upstreams: structs.Upstreams{ + { + DestinationName: "web", + LocalBindPort: 8080, + }, + }, + }, + }, + Checks: structs.HealthChecks{ + &structs.HealthCheck{ + Node: "foo", + CheckID: "foo:api-proxy", + Name: "api proxy listening", + Status: api.HealthPassing, + ServiceID: "api-proxy", + ServiceName: "api-proxy", + }, + }, + WriteRequest: structs.WriteRequest{Token: token}, + }, + "Node bar": { + Datacenter: "dc1", + Node: "bar", + ID: types.NodeID("c3e5fc07-3b2d-4c06-b8fc-a1a12432d459"), + Address: "127.0.0.3", + Checks: structs.HealthChecks{ + &structs.HealthCheck{ + Node: "bar", + CheckID: "bar:alive", + Name: "bar-liveness", + Status: api.HealthPassing, + }, + }, + WriteRequest: structs.WriteRequest{Token: token}, + }, + "Service web on bar": { + Datacenter: "dc1", + Node: "bar", + SkipNodeUpdate: true, + Service: &structs.NodeService{ + Kind: structs.ServiceKindTypical, + ID: "web", + Service: "web", + Port: 80, + Address: "198.18.1.20", + }, + Checks: structs.HealthChecks{ + &structs.HealthCheck{ + Node: "bar", + CheckID: "bar:web", + Name: "web-liveness", + Status: api.HealthWarning, + ServiceID: "web", + ServiceName: "web", + }, + }, + WriteRequest: structs.WriteRequest{Token: token}, + }, + "Service web-proxy on bar": { + Datacenter: "dc1", + Node: "bar", + SkipNodeUpdate: true, + Service: &structs.NodeService{ + Kind: structs.ServiceKindConnectProxy, + ID: "web-proxy", + Service: "web-proxy", + Port: 8443, + Address: "198.18.1.20", + Proxy: structs.ConnectProxyConfig{ + DestinationServiceName: "web", + Upstreams: structs.Upstreams{ + { + DestinationName: "redis", + LocalBindPort: 123, + }, + }, + }, + }, + Checks: structs.HealthChecks{ + &structs.HealthCheck{ + Node: "bar", + CheckID: "bar:web-proxy", + Name: "web proxy listening", + Status: api.HealthCritical, + ServiceID: "web-proxy", + ServiceName: "web-proxy", + }, + }, + WriteRequest: structs.WriteRequest{Token: token}, + }, + "Node baz": { + Datacenter: "dc1", + Node: "baz", + ID: types.NodeID("37ea7c44-a2a1-4764-ae28-7dfebeb54a22"), + Address: "127.0.0.4", + Checks: structs.HealthChecks{ + &structs.HealthCheck{ + Node: "baz", + CheckID: "baz:alive", + Name: "baz-liveness", + Status: api.HealthPassing, + }, + }, + WriteRequest: structs.WriteRequest{Token: token}, + }, + "Service web on baz": { + Datacenter: "dc1", + Node: "baz", + SkipNodeUpdate: true, + Service: &structs.NodeService{ + Kind: structs.ServiceKindTypical, + ID: "web", + Service: "web", + Port: 80, + Address: "198.18.1.40", + }, + Checks: structs.HealthChecks{ + &structs.HealthCheck{ + Node: "baz", + CheckID: "baz:web", + Name: "web-liveness", + Status: api.HealthPassing, + ServiceID: "web", + ServiceName: "web", + }, + }, + WriteRequest: structs.WriteRequest{Token: token}, + }, + "Service web-proxy on baz": { + Datacenter: "dc1", + Node: "baz", + SkipNodeUpdate: true, + Service: &structs.NodeService{ + Kind: structs.ServiceKindConnectProxy, + ID: "web-proxy", + Service: "web-proxy", + Port: 8443, + Address: "198.18.1.40", + Proxy: structs.ConnectProxyConfig{ + DestinationServiceName: "web", + Upstreams: structs.Upstreams{ + { + DestinationName: "redis", + LocalBindPort: 123, + }, + }, + }, + }, + Checks: structs.HealthChecks{ + &structs.HealthCheck{ + Node: "baz", + CheckID: "baz:web-proxy", + Name: "web proxy listening", + Status: api.HealthCritical, + ServiceID: "web-proxy", + ServiceName: "web-proxy", + }, + }, + WriteRequest: structs.WriteRequest{Token: token}, + }, + "Node zip": { + Datacenter: "dc1", + Node: "zip", + ID: types.NodeID("dc49fc8c-afc7-4a87-815d-74d144535075"), + Address: "127.0.0.5", + Checks: structs.HealthChecks{ + &structs.HealthCheck{ + Node: "zip", + CheckID: "zip:alive", + Name: "zip-liveness", + Status: api.HealthPassing, + }, + }, + WriteRequest: structs.WriteRequest{Token: token}, + }, + "Service redis on zip": { + Datacenter: "dc1", + Node: "zip", + SkipNodeUpdate: true, + Service: &structs.NodeService{ + Kind: structs.ServiceKindTypical, + ID: "redis", + Service: "redis", + Port: 6379, + Address: "198.18.1.60", + }, + Checks: structs.HealthChecks{ + &structs.HealthCheck{ + Node: "zip", + CheckID: "zip:redis", + Name: "redis-liveness", + Status: api.HealthPassing, + ServiceID: "redis", + ServiceName: "redis", + }, + }, + WriteRequest: structs.WriteRequest{Token: token}, + }, + "Service redis-proxy on zip": { + Datacenter: "dc1", + Node: "zip", + SkipNodeUpdate: true, + Service: &structs.NodeService{ + Kind: structs.ServiceKindConnectProxy, + ID: "redis-proxy", + Service: "redis-proxy", + Port: 8443, + Address: "198.18.1.60", + Proxy: structs.ConnectProxyConfig{ + DestinationServiceName: "redis", + }, + }, + Checks: structs.HealthChecks{ + &structs.HealthCheck{ + Node: "zip", + CheckID: "zip:redis-proxy", + Name: "redis proxy listening", + Status: api.HealthCritical, + ServiceID: "redis-proxy", + ServiceName: "redis-proxy", + }, + }, + WriteRequest: structs.WriteRequest{Token: token}, + }, + } + registerTestCatalogEntriesMap(t, codec, registrations) +} diff --git a/agent/consul/internal_endpoint.go b/agent/consul/internal_endpoint.go index 3f60b1015..1464ea06c 100644 --- a/agent/consul/internal_endpoint.go +++ b/agent/consul/internal_endpoint.go @@ -144,6 +144,45 @@ func (m *Internal) ServiceDump(args *structs.ServiceDumpRequest, reply *structs. }) } +func (m *Internal) ServiceTopology(args *structs.ServiceSpecificRequest, reply *structs.IndexedServiceTopology) error { + if done, err := m.srv.ForwardRPC("Internal.ServiceTopology", args, args, reply); done { + return err + } + if args.ServiceName == "" { + return fmt.Errorf("Must provide a service name") + } + + var authzContext acl.AuthorizerContext + authz, err := m.srv.ResolveTokenAndDefaultMeta(args.Token, &args.EnterpriseMeta, &authzContext) + if err != nil { + return err + } + if err := m.srv.validateEnterpriseRequest(&args.EnterpriseMeta, false); err != nil { + return err + } + if authz != nil && authz.ServiceRead(args.ServiceName, &authzContext) != acl.Allow { + return acl.ErrPermissionDenied + } + + return m.srv.blockingQuery( + &args.QueryOptions, + &reply.QueryMeta, + func(ws memdb.WatchSet, state *state.Store) error { + index, topology, err := state.ServiceTopology(ws, args.Datacenter, args.ServiceName, &args.EnterpriseMeta) + if err != nil { + return err + } + + reply.Index = index + reply.ServiceTopology = topology + + if err := m.srv.filterACL(args.Token, reply); err != nil { + return err + } + return nil + }) +} + // GatewayServiceNodes returns all the nodes for services associated with a gateway along with their gateway config func (m *Internal) GatewayServiceDump(args *structs.ServiceSpecificRequest, reply *structs.IndexedServiceDump) error { if done, err := m.srv.ForwardRPC("Internal.GatewayServiceDump", args, args, reply); done { diff --git a/agent/consul/internal_endpoint_test.go b/agent/consul/internal_endpoint_test.go index f65c64758..b14dc4a57 100644 --- a/agent/consul/internal_endpoint_test.go +++ b/agent/consul/internal_endpoint_test.go @@ -1605,3 +1605,145 @@ service_prefix "terminating-gateway" { policy = "read" } } assert.ElementsMatch(t, expected, actual) } + +func TestInternal_ServiceTopology(t *testing.T) { + t.Parallel() + dir1, s1 := testServer(t) + defer os.RemoveAll(dir1) + defer s1.Shutdown() + + testrpc.WaitForLeader(t, s1.RPC, "dc1") + + codec := rpcClient(t, s1) + defer codec.Close() + + // api and api-proxy on node foo - upstream: web + // web and web-proxy on node bar - upstream: redis + // web and web-proxy on node baz - upstream: redis + // redis and redis-proxy on node zip + registerTestTopologyEntries(t, codec, "") + + t.Run("api", func(t *testing.T) { + args := structs.ServiceSpecificRequest{ + Datacenter: "dc1", + ServiceName: "api", + } + var out structs.IndexedServiceTopology + require.NoError(t, msgpackrpc.CallWithCodec(codec, "Internal.ServiceTopology", &args, &out)) + require.False(t, out.FilteredByACLs) + + // bar/web, bar/web-proxy, baz/web, baz/web-proxy + require.Len(t, out.ServiceTopology.Upstreams, 4) + require.Len(t, out.ServiceTopology.Downstreams, 0) + }) + + t.Run("web", func(t *testing.T) { + args := structs.ServiceSpecificRequest{ + Datacenter: "dc1", + ServiceName: "web", + } + var out structs.IndexedServiceTopology + require.NoError(t, msgpackrpc.CallWithCodec(codec, "Internal.ServiceTopology", &args, &out)) + require.False(t, out.FilteredByACLs) + + // foo/api, foo/api-proxy + require.Len(t, out.ServiceTopology.Upstreams, 2) + + // zip/redis, zip/redis-proxy + require.Len(t, out.ServiceTopology.Downstreams, 2) + }) + + t.Run("redis", func(t *testing.T) { + args := structs.ServiceSpecificRequest{ + Datacenter: "dc1", + ServiceName: "redis", + } + var out structs.IndexedServiceTopology + require.NoError(t, msgpackrpc.CallWithCodec(codec, "Internal.ServiceTopology", &args, &out)) + require.False(t, out.FilteredByACLs) + + require.Len(t, out.ServiceTopology.Upstreams, 0) + + // bar/web, bar/web-proxy, baz/web, baz/web-proxy + require.Len(t, out.ServiceTopology.Downstreams, 4) + }) +} + +func TestInternal_ServiceTopology_ACL(t *testing.T) { + t.Parallel() + dir1, s1 := testServerWithConfig(t, func(c *Config) { + c.ACLDatacenter = "dc1" + c.ACLsEnabled = true + c.ACLMasterToken = TestDefaultMasterToken + c.ACLDefaultPolicy = "deny" + }) + defer os.RemoveAll(dir1) + defer s1.Shutdown() + + testrpc.WaitForLeader(t, s1.RPC, "dc1") + + codec := rpcClient(t, s1) + defer codec.Close() + + // api and api-proxy on node foo - upstream: web + // web and web-proxy on node bar - upstream: redis + // web and web-proxy on node baz - upstream: redis + // redis and redis-proxy on node zip + registerTestTopologyEntries(t, codec, TestDefaultMasterToken) + + // Token grants read to: foo/api, foo/api-proxy, bar/web, baz/web + userToken, err := upsertTestTokenWithPolicyRules(codec, TestDefaultMasterToken, "dc1", ` +node_prefix "" { policy = "read" } +service_prefix "api" { policy = "read" } +service "web" { policy = "read" } +`) + require.NoError(t, err) + + t.Run("api can't read web", func(t *testing.T) { + args := structs.ServiceSpecificRequest{ + Datacenter: "dc1", + ServiceName: "api", + QueryOptions: structs.QueryOptions{Token: userToken.SecretID}, + } + var out structs.IndexedServiceTopology + require.NoError(t, msgpackrpc.CallWithCodec(codec, "Internal.ServiceTopology", &args, &out)) + + require.True(t, out.FilteredByACLs) + + // The web-proxy upstream gets filtered out from both bar and baz + require.Len(t, out.ServiceTopology.Upstreams, 2) + require.Equal(t, "web", out.ServiceTopology.Upstreams[0].Service.Service) + require.Equal(t, "web", out.ServiceTopology.Upstreams[1].Service.Service) + + require.Len(t, out.ServiceTopology.Downstreams, 0) + }) + + t.Run("web can't read redis", func(t *testing.T) { + args := structs.ServiceSpecificRequest{ + Datacenter: "dc1", + ServiceName: "web", + QueryOptions: structs.QueryOptions{Token: userToken.SecretID}, + } + var out structs.IndexedServiceTopology + require.NoError(t, msgpackrpc.CallWithCodec(codec, "Internal.ServiceTopology", &args, &out)) + + require.True(t, out.FilteredByACLs) + + // The redis upstream gets filtered out but the api and proxy downstream are returned + require.Len(t, out.ServiceTopology.Upstreams, 0) + require.Len(t, out.ServiceTopology.Downstreams, 2) + }) + + t.Run("redis can't read self", func(t *testing.T) { + args := structs.ServiceSpecificRequest{ + Datacenter: "dc1", + ServiceName: "redis", + QueryOptions: structs.QueryOptions{Token: userToken.SecretID}, + } + var out structs.IndexedServiceTopology + err := msgpackrpc.CallWithCodec(codec, "Internal.ServiceTopology", &args, &out) + + // Can't read self, fails fast + require.True(t, acl.IsErrPermissionDenied(err)) + }) +} diff --git a/agent/consul/state/catalog.go b/agent/consul/state/catalog.go index bb7bbd53c..534f313aa 100644 --- a/agent/consul/state/catalog.go +++ b/agent/consul/state/catalog.go @@ -3009,7 +3009,7 @@ func (s *Store) UpstreamsForService(ws memdb.WatchSet, dc string, sn structs.Ser ) for _, u := range upstreams { // Evaluate the targets from the upstream's discovery chain - idx, targets, err := s.targetsForSource(ws, tx, dc, u.Name, &u.EnterpriseMeta) + idx, targets, err := s.discoveryChainTargets(ws, dc, u.Name, &u.EnterpriseMeta) if err != nil { return 0, nil, fmt.Errorf("failed to get discovery chain targets for %q: %v", u.String(), err) } @@ -3038,7 +3038,7 @@ func (s *Store) DownstreamsForService(ws memdb.WatchSet, dc string, service stru defer tx.Abort() // First fetch services with discovery chains that list the input as a target - idx, sources, err := s.sourcesForTarget(ws, tx, dc, service) + idx, sources, err := s.discoveryChainSources(ws, tx, dc, service) if err != nil { return 0, nil, fmt.Errorf("failed to get sources for discovery chain target %q: %v", service.String(), err) } diff --git a/agent/consul/state/catalog_test.go b/agent/consul/state/catalog_test.go index 6c73debe7..e0cdc4079 100644 --- a/agent/consul/state/catalog_test.go +++ b/agent/consul/state/catalog_test.go @@ -6194,7 +6194,7 @@ func TestCatalog_catalogDownstreams_Watches(t *testing.T) { ws = memdb.NewWatchSet() tx = s.db.ReadTxn() - idx, names, err = downstreamsFromRegistration(ws, tx, admin) + idx, _, err = downstreamsFromRegistration(ws, tx, admin) require.NoError(t, err) exp = expect{ @@ -6225,7 +6225,7 @@ func TestCatalog_catalogDownstreams_Watches(t *testing.T) { ws = memdb.NewWatchSet() tx = s.db.ReadTxn() - idx, names, err = downstreamsFromRegistration(ws, tx, cache) + idx, _, err = downstreamsFromRegistration(ws, tx, cache) require.NoError(t, err) @@ -6696,7 +6696,7 @@ func TestCatalog_upstreamsFromRegistration_Watches(t *testing.T) { ws = memdb.NewWatchSet() tx = s.db.ReadTxn() - idx, names, err = upstreamsFromRegistration(ws, tx, web) + idx, _, err = upstreamsFromRegistration(ws, tx, web) require.NoError(t, err) @@ -6994,7 +6994,7 @@ func TestCatalog_DownstreamsForService(t *testing.T) { } ws := memdb.NewWatchSet() - sn := structs.NewServiceName("api", structs.DefaultEnterpriseMeta()) + sn := structs.NewServiceName("admin", structs.DefaultEnterpriseMeta()) idx, names, err := s.DownstreamsForService(ws, "dc1", sn) require.NoError(t, err) diff --git a/agent/consul/state/config_entry.go b/agent/consul/state/config_entry.go index 3cb08b19e..091fd1939 100644 --- a/agent/consul/state/config_entry.go +++ b/agent/consul/state/config_entry.go @@ -373,8 +373,8 @@ var serviceGraphKinds = []string{ structs.ServiceResolver, } -// targetsForSource will return a list of services listed as a target for the input's discovery chain -func (s *Store) targetsForSource(ws memdb.WatchSet, tx ReadTxn, dc, service string, entMeta *structs.EnterpriseMeta) (uint64, []structs.ServiceName, error) { +// discoveryChainTargets will return a list of services listed as a target for the input's discovery chain +func (s *Store) discoveryChainTargets(ws memdb.WatchSet, dc, service string, entMeta *structs.EnterpriseMeta) (uint64, []structs.ServiceName, error) { source := structs.NewServiceName(service, entMeta) req := discoverychain.CompileRequest{ ServiceName: source.Name, @@ -402,8 +402,8 @@ func (s *Store) targetsForSource(ws memdb.WatchSet, tx ReadTxn, dc, service stri return idx, resp, nil } -// sourcesForTarget will return a list of services whose discovery chains have the input service as a target -func (s *Store) sourcesForTarget(ws memdb.WatchSet, tx ReadTxn, dc string, destination structs.ServiceName) (uint64, []structs.ServiceName, error) { +// discoveryChainSources will return a list of services whose discovery chains have the input service as a target +func (s *Store) discoveryChainSources(ws memdb.WatchSet, tx ReadTxn, dc string, destination structs.ServiceName) (uint64, []structs.ServiceName, error) { queue := []structs.ServiceName{destination} seenLink := make(map[structs.ServiceName]bool) diff --git a/agent/consul/state/config_entry_test.go b/agent/consul/state/config_entry_test.go index c6ccde415..4d9bb4900 100644 --- a/agent/consul/state/config_entry_test.go +++ b/agent/consul/state/config_entry_test.go @@ -1713,7 +1713,8 @@ func TestSourcesForTarget(t *testing.T) { tx := s.db.ReadTxn() defer tx.Abort() - idx, ids, err := s.sourcesForTarget(ws, tx, "dc1", "sink", nil) + sn := structs.NewServiceName("sink", structs.DefaultEnterpriseMeta()) + idx, ids, err := s.discoveryChainSources(ws, tx, "dc1", sn) require.NoError(t, err) require.Equal(t, tc.expect.idx, idx) @@ -1914,7 +1915,7 @@ func TestTargetsForSource(t *testing.T) { tx := s.db.ReadTxn() defer tx.Abort() - idx, ids, err := s.targetsForSource(ws, tx, "dc1", "web", nil) + idx, ids, err := s.discoveryChainTargets(ws, "dc1", "web", nil) require.NoError(t, err) require.Equal(t, tc.expect.idx, idx)