Add topology RPC endpoint

This commit is contained in:
freddygv 2020-09-28 19:42:03 -06:00
parent 21c4708fe9
commit 7c11580e93
7 changed files with 480 additions and 12 deletions

View File

@ -559,3 +559,289 @@ func registerTestCatalogEntriesMap(t *testing.T, codec rpc.ClientCodec, registra
require.NoError(t, err, "Failed catalog registration %q: %v", name, err)
}
}
func registerTestTopologyEntries(t *testing.T, codec rpc.ClientCodec, token string) {
t.Helper()
// api and api-proxy on node foo - upstream: web
// web and web-proxy on node bar - upstream: redis
// web and web-proxy on node baz - upstream: redis
// redis and redis-proxy on node zip
registrations := map[string]*structs.RegisterRequest{
"Node foo": {
Datacenter: "dc1",
Node: "foo",
ID: types.NodeID("e0155642-135d-4739-9853-a1ee6c9f945b"),
Address: "127.0.0.2",
Checks: structs.HealthChecks{
&structs.HealthCheck{
Node: "foo",
CheckID: "foo:alive",
Name: "foo-liveness",
Status: api.HealthPassing,
},
},
WriteRequest: structs.WriteRequest{Token: token},
},
"Service api on foo": {
Datacenter: "dc1",
Node: "foo",
SkipNodeUpdate: true,
Service: &structs.NodeService{
Kind: structs.ServiceKindTypical,
ID: "api",
Service: "api",
Port: 9090,
Address: "198.18.1.2",
},
Checks: structs.HealthChecks{
&structs.HealthCheck{
Node: "foo",
CheckID: "foo:api",
Name: "api-liveness",
Status: api.HealthPassing,
ServiceID: "api",
ServiceName: "api",
},
},
WriteRequest: structs.WriteRequest{Token: token},
},
"Service api-proxy": {
Datacenter: "dc1",
Node: "foo",
SkipNodeUpdate: true,
Service: &structs.NodeService{
Kind: structs.ServiceKindConnectProxy,
ID: "api-proxy",
Service: "api-proxy",
Port: 8443,
Address: "198.18.1.2",
Proxy: structs.ConnectProxyConfig{
DestinationServiceName: "api",
Upstreams: structs.Upstreams{
{
DestinationName: "web",
LocalBindPort: 8080,
},
},
},
},
Checks: structs.HealthChecks{
&structs.HealthCheck{
Node: "foo",
CheckID: "foo:api-proxy",
Name: "api proxy listening",
Status: api.HealthPassing,
ServiceID: "api-proxy",
ServiceName: "api-proxy",
},
},
WriteRequest: structs.WriteRequest{Token: token},
},
"Node bar": {
Datacenter: "dc1",
Node: "bar",
ID: types.NodeID("c3e5fc07-3b2d-4c06-b8fc-a1a12432d459"),
Address: "127.0.0.3",
Checks: structs.HealthChecks{
&structs.HealthCheck{
Node: "bar",
CheckID: "bar:alive",
Name: "bar-liveness",
Status: api.HealthPassing,
},
},
WriteRequest: structs.WriteRequest{Token: token},
},
"Service web on bar": {
Datacenter: "dc1",
Node: "bar",
SkipNodeUpdate: true,
Service: &structs.NodeService{
Kind: structs.ServiceKindTypical,
ID: "web",
Service: "web",
Port: 80,
Address: "198.18.1.20",
},
Checks: structs.HealthChecks{
&structs.HealthCheck{
Node: "bar",
CheckID: "bar:web",
Name: "web-liveness",
Status: api.HealthWarning,
ServiceID: "web",
ServiceName: "web",
},
},
WriteRequest: structs.WriteRequest{Token: token},
},
"Service web-proxy on bar": {
Datacenter: "dc1",
Node: "bar",
SkipNodeUpdate: true,
Service: &structs.NodeService{
Kind: structs.ServiceKindConnectProxy,
ID: "web-proxy",
Service: "web-proxy",
Port: 8443,
Address: "198.18.1.20",
Proxy: structs.ConnectProxyConfig{
DestinationServiceName: "web",
Upstreams: structs.Upstreams{
{
DestinationName: "redis",
LocalBindPort: 123,
},
},
},
},
Checks: structs.HealthChecks{
&structs.HealthCheck{
Node: "bar",
CheckID: "bar:web-proxy",
Name: "web proxy listening",
Status: api.HealthCritical,
ServiceID: "web-proxy",
ServiceName: "web-proxy",
},
},
WriteRequest: structs.WriteRequest{Token: token},
},
"Node baz": {
Datacenter: "dc1",
Node: "baz",
ID: types.NodeID("37ea7c44-a2a1-4764-ae28-7dfebeb54a22"),
Address: "127.0.0.4",
Checks: structs.HealthChecks{
&structs.HealthCheck{
Node: "baz",
CheckID: "baz:alive",
Name: "baz-liveness",
Status: api.HealthPassing,
},
},
WriteRequest: structs.WriteRequest{Token: token},
},
"Service web on baz": {
Datacenter: "dc1",
Node: "baz",
SkipNodeUpdate: true,
Service: &structs.NodeService{
Kind: structs.ServiceKindTypical,
ID: "web",
Service: "web",
Port: 80,
Address: "198.18.1.40",
},
Checks: structs.HealthChecks{
&structs.HealthCheck{
Node: "baz",
CheckID: "baz:web",
Name: "web-liveness",
Status: api.HealthPassing,
ServiceID: "web",
ServiceName: "web",
},
},
WriteRequest: structs.WriteRequest{Token: token},
},
"Service web-proxy on baz": {
Datacenter: "dc1",
Node: "baz",
SkipNodeUpdate: true,
Service: &structs.NodeService{
Kind: structs.ServiceKindConnectProxy,
ID: "web-proxy",
Service: "web-proxy",
Port: 8443,
Address: "198.18.1.40",
Proxy: structs.ConnectProxyConfig{
DestinationServiceName: "web",
Upstreams: structs.Upstreams{
{
DestinationName: "redis",
LocalBindPort: 123,
},
},
},
},
Checks: structs.HealthChecks{
&structs.HealthCheck{
Node: "baz",
CheckID: "baz:web-proxy",
Name: "web proxy listening",
Status: api.HealthCritical,
ServiceID: "web-proxy",
ServiceName: "web-proxy",
},
},
WriteRequest: structs.WriteRequest{Token: token},
},
"Node zip": {
Datacenter: "dc1",
Node: "zip",
ID: types.NodeID("dc49fc8c-afc7-4a87-815d-74d144535075"),
Address: "127.0.0.5",
Checks: structs.HealthChecks{
&structs.HealthCheck{
Node: "zip",
CheckID: "zip:alive",
Name: "zip-liveness",
Status: api.HealthPassing,
},
},
WriteRequest: structs.WriteRequest{Token: token},
},
"Service redis on zip": {
Datacenter: "dc1",
Node: "zip",
SkipNodeUpdate: true,
Service: &structs.NodeService{
Kind: structs.ServiceKindTypical,
ID: "redis",
Service: "redis",
Port: 6379,
Address: "198.18.1.60",
},
Checks: structs.HealthChecks{
&structs.HealthCheck{
Node: "zip",
CheckID: "zip:redis",
Name: "redis-liveness",
Status: api.HealthPassing,
ServiceID: "redis",
ServiceName: "redis",
},
},
WriteRequest: structs.WriteRequest{Token: token},
},
"Service redis-proxy on zip": {
Datacenter: "dc1",
Node: "zip",
SkipNodeUpdate: true,
Service: &structs.NodeService{
Kind: structs.ServiceKindConnectProxy,
ID: "redis-proxy",
Service: "redis-proxy",
Port: 8443,
Address: "198.18.1.60",
Proxy: structs.ConnectProxyConfig{
DestinationServiceName: "redis",
},
},
Checks: structs.HealthChecks{
&structs.HealthCheck{
Node: "zip",
CheckID: "zip:redis-proxy",
Name: "redis proxy listening",
Status: api.HealthCritical,
ServiceID: "redis-proxy",
ServiceName: "redis-proxy",
},
},
WriteRequest: structs.WriteRequest{Token: token},
},
}
registerTestCatalogEntriesMap(t, codec, registrations)
}

View File

@ -144,6 +144,45 @@ func (m *Internal) ServiceDump(args *structs.ServiceDumpRequest, reply *structs.
})
}
func (m *Internal) ServiceTopology(args *structs.ServiceSpecificRequest, reply *structs.IndexedServiceTopology) error {
if done, err := m.srv.ForwardRPC("Internal.ServiceTopology", args, args, reply); done {
return err
}
if args.ServiceName == "" {
return fmt.Errorf("Must provide a service name")
}
var authzContext acl.AuthorizerContext
authz, err := m.srv.ResolveTokenAndDefaultMeta(args.Token, &args.EnterpriseMeta, &authzContext)
if err != nil {
return err
}
if err := m.srv.validateEnterpriseRequest(&args.EnterpriseMeta, false); err != nil {
return err
}
if authz != nil && authz.ServiceRead(args.ServiceName, &authzContext) != acl.Allow {
return acl.ErrPermissionDenied
}
return m.srv.blockingQuery(
&args.QueryOptions,
&reply.QueryMeta,
func(ws memdb.WatchSet, state *state.Store) error {
index, topology, err := state.ServiceTopology(ws, args.Datacenter, args.ServiceName, &args.EnterpriseMeta)
if err != nil {
return err
}
reply.Index = index
reply.ServiceTopology = topology
if err := m.srv.filterACL(args.Token, reply); err != nil {
return err
}
return nil
})
}
// GatewayServiceNodes returns all the nodes for services associated with a gateway along with their gateway config
func (m *Internal) GatewayServiceDump(args *structs.ServiceSpecificRequest, reply *structs.IndexedServiceDump) error {
if done, err := m.srv.ForwardRPC("Internal.GatewayServiceDump", args, args, reply); done {

View File

@ -1605,3 +1605,145 @@ service_prefix "terminating-gateway" { policy = "read" }
}
assert.ElementsMatch(t, expected, actual)
}
func TestInternal_ServiceTopology(t *testing.T) {
t.Parallel()
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
testrpc.WaitForLeader(t, s1.RPC, "dc1")
codec := rpcClient(t, s1)
defer codec.Close()
// api and api-proxy on node foo - upstream: web
// web and web-proxy on node bar - upstream: redis
// web and web-proxy on node baz - upstream: redis
// redis and redis-proxy on node zip
registerTestTopologyEntries(t, codec, "")
t.Run("api", func(t *testing.T) {
args := structs.ServiceSpecificRequest{
Datacenter: "dc1",
ServiceName: "api",
}
var out structs.IndexedServiceTopology
require.NoError(t, msgpackrpc.CallWithCodec(codec, "Internal.ServiceTopology", &args, &out))
require.False(t, out.FilteredByACLs)
// bar/web, bar/web-proxy, baz/web, baz/web-proxy
require.Len(t, out.ServiceTopology.Upstreams, 4)
require.Len(t, out.ServiceTopology.Downstreams, 0)
})
t.Run("web", func(t *testing.T) {
args := structs.ServiceSpecificRequest{
Datacenter: "dc1",
ServiceName: "web",
}
var out structs.IndexedServiceTopology
require.NoError(t, msgpackrpc.CallWithCodec(codec, "Internal.ServiceTopology", &args, &out))
require.False(t, out.FilteredByACLs)
// foo/api, foo/api-proxy
require.Len(t, out.ServiceTopology.Upstreams, 2)
// zip/redis, zip/redis-proxy
require.Len(t, out.ServiceTopology.Downstreams, 2)
})
t.Run("redis", func(t *testing.T) {
args := structs.ServiceSpecificRequest{
Datacenter: "dc1",
ServiceName: "redis",
}
var out structs.IndexedServiceTopology
require.NoError(t, msgpackrpc.CallWithCodec(codec, "Internal.ServiceTopology", &args, &out))
require.False(t, out.FilteredByACLs)
require.Len(t, out.ServiceTopology.Upstreams, 0)
// bar/web, bar/web-proxy, baz/web, baz/web-proxy
require.Len(t, out.ServiceTopology.Downstreams, 4)
})
}
func TestInternal_ServiceTopology_ACL(t *testing.T) {
t.Parallel()
dir1, s1 := testServerWithConfig(t, func(c *Config) {
c.ACLDatacenter = "dc1"
c.ACLsEnabled = true
c.ACLMasterToken = TestDefaultMasterToken
c.ACLDefaultPolicy = "deny"
})
defer os.RemoveAll(dir1)
defer s1.Shutdown()
testrpc.WaitForLeader(t, s1.RPC, "dc1")
codec := rpcClient(t, s1)
defer codec.Close()
// api and api-proxy on node foo - upstream: web
// web and web-proxy on node bar - upstream: redis
// web and web-proxy on node baz - upstream: redis
// redis and redis-proxy on node zip
registerTestTopologyEntries(t, codec, TestDefaultMasterToken)
// Token grants read to: foo/api, foo/api-proxy, bar/web, baz/web
userToken, err := upsertTestTokenWithPolicyRules(codec, TestDefaultMasterToken, "dc1", `
node_prefix "" { policy = "read" }
service_prefix "api" { policy = "read" }
service "web" { policy = "read" }
`)
require.NoError(t, err)
t.Run("api can't read web", func(t *testing.T) {
args := structs.ServiceSpecificRequest{
Datacenter: "dc1",
ServiceName: "api",
QueryOptions: structs.QueryOptions{Token: userToken.SecretID},
}
var out structs.IndexedServiceTopology
require.NoError(t, msgpackrpc.CallWithCodec(codec, "Internal.ServiceTopology", &args, &out))
require.True(t, out.FilteredByACLs)
// The web-proxy upstream gets filtered out from both bar and baz
require.Len(t, out.ServiceTopology.Upstreams, 2)
require.Equal(t, "web", out.ServiceTopology.Upstreams[0].Service.Service)
require.Equal(t, "web", out.ServiceTopology.Upstreams[1].Service.Service)
require.Len(t, out.ServiceTopology.Downstreams, 0)
})
t.Run("web can't read redis", func(t *testing.T) {
args := structs.ServiceSpecificRequest{
Datacenter: "dc1",
ServiceName: "web",
QueryOptions: structs.QueryOptions{Token: userToken.SecretID},
}
var out structs.IndexedServiceTopology
require.NoError(t, msgpackrpc.CallWithCodec(codec, "Internal.ServiceTopology", &args, &out))
require.True(t, out.FilteredByACLs)
// The redis upstream gets filtered out but the api and proxy downstream are returned
require.Len(t, out.ServiceTopology.Upstreams, 0)
require.Len(t, out.ServiceTopology.Downstreams, 2)
})
t.Run("redis can't read self", func(t *testing.T) {
args := structs.ServiceSpecificRequest{
Datacenter: "dc1",
ServiceName: "redis",
QueryOptions: structs.QueryOptions{Token: userToken.SecretID},
}
var out structs.IndexedServiceTopology
err := msgpackrpc.CallWithCodec(codec, "Internal.ServiceTopology", &args, &out)
// Can't read self, fails fast
require.True(t, acl.IsErrPermissionDenied(err))
})
}

View File

@ -3009,7 +3009,7 @@ func (s *Store) UpstreamsForService(ws memdb.WatchSet, dc string, sn structs.Ser
)
for _, u := range upstreams {
// Evaluate the targets from the upstream's discovery chain
idx, targets, err := s.targetsForSource(ws, tx, dc, u.Name, &u.EnterpriseMeta)
idx, targets, err := s.discoveryChainTargets(ws, dc, u.Name, &u.EnterpriseMeta)
if err != nil {
return 0, nil, fmt.Errorf("failed to get discovery chain targets for %q: %v", u.String(), err)
}
@ -3038,7 +3038,7 @@ func (s *Store) DownstreamsForService(ws memdb.WatchSet, dc string, service stru
defer tx.Abort()
// First fetch services with discovery chains that list the input as a target
idx, sources, err := s.sourcesForTarget(ws, tx, dc, service)
idx, sources, err := s.discoveryChainSources(ws, tx, dc, service)
if err != nil {
return 0, nil, fmt.Errorf("failed to get sources for discovery chain target %q: %v", service.String(), err)
}

View File

@ -6194,7 +6194,7 @@ func TestCatalog_catalogDownstreams_Watches(t *testing.T) {
ws = memdb.NewWatchSet()
tx = s.db.ReadTxn()
idx, names, err = downstreamsFromRegistration(ws, tx, admin)
idx, _, err = downstreamsFromRegistration(ws, tx, admin)
require.NoError(t, err)
exp = expect{
@ -6225,7 +6225,7 @@ func TestCatalog_catalogDownstreams_Watches(t *testing.T) {
ws = memdb.NewWatchSet()
tx = s.db.ReadTxn()
idx, names, err = downstreamsFromRegistration(ws, tx, cache)
idx, _, err = downstreamsFromRegistration(ws, tx, cache)
require.NoError(t, err)
@ -6696,7 +6696,7 @@ func TestCatalog_upstreamsFromRegistration_Watches(t *testing.T) {
ws = memdb.NewWatchSet()
tx = s.db.ReadTxn()
idx, names, err = upstreamsFromRegistration(ws, tx, web)
idx, _, err = upstreamsFromRegistration(ws, tx, web)
require.NoError(t, err)
@ -6994,7 +6994,7 @@ func TestCatalog_DownstreamsForService(t *testing.T) {
}
ws := memdb.NewWatchSet()
sn := structs.NewServiceName("api", structs.DefaultEnterpriseMeta())
sn := structs.NewServiceName("admin", structs.DefaultEnterpriseMeta())
idx, names, err := s.DownstreamsForService(ws, "dc1", sn)
require.NoError(t, err)

View File

@ -373,8 +373,8 @@ var serviceGraphKinds = []string{
structs.ServiceResolver,
}
// targetsForSource will return a list of services listed as a target for the input's discovery chain
func (s *Store) targetsForSource(ws memdb.WatchSet, tx ReadTxn, dc, service string, entMeta *structs.EnterpriseMeta) (uint64, []structs.ServiceName, error) {
// discoveryChainTargets will return a list of services listed as a target for the input's discovery chain
func (s *Store) discoveryChainTargets(ws memdb.WatchSet, dc, service string, entMeta *structs.EnterpriseMeta) (uint64, []structs.ServiceName, error) {
source := structs.NewServiceName(service, entMeta)
req := discoverychain.CompileRequest{
ServiceName: source.Name,
@ -402,8 +402,8 @@ func (s *Store) targetsForSource(ws memdb.WatchSet, tx ReadTxn, dc, service stri
return idx, resp, nil
}
// sourcesForTarget will return a list of services whose discovery chains have the input service as a target
func (s *Store) sourcesForTarget(ws memdb.WatchSet, tx ReadTxn, dc string, destination structs.ServiceName) (uint64, []structs.ServiceName, error) {
// discoveryChainSources will return a list of services whose discovery chains have the input service as a target
func (s *Store) discoveryChainSources(ws memdb.WatchSet, tx ReadTxn, dc string, destination structs.ServiceName) (uint64, []structs.ServiceName, error) {
queue := []structs.ServiceName{destination}
seenLink := make(map[structs.ServiceName]bool)

View File

@ -1713,7 +1713,8 @@ func TestSourcesForTarget(t *testing.T) {
tx := s.db.ReadTxn()
defer tx.Abort()
idx, ids, err := s.sourcesForTarget(ws, tx, "dc1", "sink", nil)
sn := structs.NewServiceName("sink", structs.DefaultEnterpriseMeta())
idx, ids, err := s.discoveryChainSources(ws, tx, "dc1", sn)
require.NoError(t, err)
require.Equal(t, tc.expect.idx, idx)
@ -1914,7 +1915,7 @@ func TestTargetsForSource(t *testing.T) {
tx := s.db.ReadTxn()
defer tx.Abort()
idx, ids, err := s.targetsForSource(ws, tx, "dc1", "web", nil)
idx, ids, err := s.discoveryChainTargets(ws, "dc1", "web", nil)
require.NoError(t, err)
require.Equal(t, tc.expect.idx, idx)