Service mesh topology visualization endpoint MVP

This commit is contained in:
Freddy 2020-10-05 10:53:11 -06:00 committed by GitHub
commit 3deb5ce28b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 3247 additions and 203 deletions

View File

@ -1409,6 +1409,18 @@ func (f *aclFilter) filterCheckServiceNodes(nodes *structs.CheckServiceNodes) {
*nodes = csn
}
// filterServiceTopology is used to filter upstreams/downstreams based on ACL rules.
// this filter is unlike others in that it also returns whether the result was filtered by ACLs
func (f *aclFilter) filterServiceTopology(topology *structs.ServiceTopology) bool {
numUp := len(topology.Upstreams)
numDown := len(topology.Downstreams)
f.filterCheckServiceNodes(&topology.Upstreams)
f.filterCheckServiceNodes(&topology.Downstreams)
return numUp != len(topology.Upstreams) || numDown != len(topology.Downstreams)
}
// filterDatacenterCheckServiceNodes is used to filter nodes based on ACL rules.
func (f *aclFilter) filterDatacenterCheckServiceNodes(datacenterNodes *map[string]structs.CheckServiceNodes) {
dn := *datacenterNodes
@ -1846,6 +1858,12 @@ func (r *ACLResolver) filterACLWithAuthorizer(authorizer acl.Authorizer, subj in
case *structs.IndexedCheckServiceNodes:
filt.filterCheckServiceNodes(&v.Nodes)
case *structs.IndexedServiceTopology:
filtered := filt.filterServiceTopology(v.ServiceTopology)
if filtered {
v.FilteredByACLs = true
}
case *structs.DatacenterIndexedCheckServiceNodes:
filt.filterDatacenterCheckServiceNodes(&v.DatacenterNodes)

View File

@ -2766,6 +2766,166 @@ node "node1" {
}
}
func TestACL_filterServiceTopology(t *testing.T) {
t.Parallel()
// Create some nodes.
fill := func() structs.ServiceTopology {
return structs.ServiceTopology{
Upstreams: structs.CheckServiceNodes{
structs.CheckServiceNode{
Node: &structs.Node{
Node: "node1",
},
Service: &structs.NodeService{
ID: "foo",
Service: "foo",
},
Checks: structs.HealthChecks{
&structs.HealthCheck{
Node: "node1",
CheckID: "check1",
ServiceName: "foo",
},
},
},
},
Downstreams: structs.CheckServiceNodes{
structs.CheckServiceNode{
Node: &structs.Node{
Node: "node2",
},
Service: &structs.NodeService{
ID: "bar",
Service: "bar",
},
Checks: structs.HealthChecks{
&structs.HealthCheck{
Node: "node2",
CheckID: "check1",
ServiceName: "bar",
},
},
},
},
}
}
original := fill()
t.Run("allow all without permissions", func(t *testing.T) {
topo := fill()
f := newACLFilter(acl.AllowAll(), nil)
filtered := f.filterServiceTopology(&topo)
if filtered {
t.Fatalf("should not have been filtered")
}
assert.Equal(t, original, topo)
})
t.Run("deny all without permissions", func(t *testing.T) {
topo := fill()
f := newACLFilter(acl.DenyAll(), nil)
filtered := f.filterServiceTopology(&topo)
if !filtered {
t.Fatalf("should have been marked as filtered")
}
assert.Len(t, topo.Upstreams, 0)
assert.Len(t, topo.Upstreams, 0)
})
t.Run("only upstream permissions", func(t *testing.T) {
rules := `
node "node1" {
policy = "read"
}
service "foo" {
policy = "read"
}`
policy, err := acl.NewPolicyFromSource("", 0, rules, acl.SyntaxLegacy, nil, nil)
if err != nil {
t.Fatalf("err %v", err)
}
perms, err := acl.NewPolicyAuthorizerWithDefaults(acl.DenyAll(), []*acl.Policy{policy}, nil)
if err != nil {
t.Fatalf("err: %v", err)
}
topo := fill()
f := newACLFilter(perms, nil)
filtered := f.filterServiceTopology(&topo)
if !filtered {
t.Fatalf("should have been marked as filtered")
}
assert.Equal(t, original.Upstreams, topo.Upstreams)
assert.Len(t, topo.Downstreams, 0)
})
t.Run("only downstream permissions", func(t *testing.T) {
rules := `
node "node2" {
policy = "read"
}
service "bar" {
policy = "read"
}`
policy, err := acl.NewPolicyFromSource("", 0, rules, acl.SyntaxLegacy, nil, nil)
if err != nil {
t.Fatalf("err %v", err)
}
perms, err := acl.NewPolicyAuthorizerWithDefaults(acl.DenyAll(), []*acl.Policy{policy}, nil)
if err != nil {
t.Fatalf("err: %v", err)
}
topo := fill()
f := newACLFilter(perms, nil)
filtered := f.filterServiceTopology(&topo)
if !filtered {
t.Fatalf("should have been marked as filtered")
}
assert.Equal(t, original.Downstreams, topo.Downstreams)
assert.Len(t, topo.Upstreams, 0)
})
t.Run("upstream and downstream permissions", func(t *testing.T) {
rules := `
node "node1" {
policy = "read"
}
service "foo" {
policy = "read"
}
node "node2" {
policy = "read"
}
service "bar" {
policy = "read"
}`
policy, err := acl.NewPolicyFromSource("", 0, rules, acl.SyntaxLegacy, nil, nil)
if err != nil {
t.Fatalf("err %v", err)
}
perms, err := acl.NewPolicyAuthorizerWithDefaults(acl.DenyAll(), []*acl.Policy{policy}, nil)
if err != nil {
t.Fatalf("err: %v", err)
}
topo := fill()
f := newACLFilter(perms, nil)
filtered := f.filterServiceTopology(&topo)
if filtered {
t.Fatalf("should not have been filtered")
}
original := fill()
assert.Equal(t, original, topo)
})
}
func TestACL_filterCoordinates(t *testing.T) {
t.Parallel()
// Create some coordinates.

View File

@ -1,13 +1,11 @@
package consul
import (
"errors"
"fmt"
"time"
metrics "github.com/armon/go-metrics"
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/connect"
"github.com/hashicorp/consul/agent/consul/discoverychain"
"github.com/hashicorp/consul/agent/consul/state"
"github.com/hashicorp/consul/agent/structs"
@ -53,39 +51,16 @@ func (c *DiscoveryChain) Get(args *structs.DiscoveryChainRequest, reply *structs
&args.QueryOptions,
&reply.QueryMeta,
func(ws memdb.WatchSet, state *state.Store) error {
index, entries, err := state.ReadDiscoveryChainConfigEntries(ws, args.Name, entMeta)
if err != nil {
return err
}
_, config, err := state.CAConfig(ws)
if err != nil {
return err
} else if config == nil {
return errors.New("no cluster ca config setup")
}
// Build TrustDomain based on the ClusterID stored.
signingID := connect.SpiffeIDSigningForCluster(config)
if signingID == nil {
// If CA is bootstrapped at all then this should never happen but be
// defensive.
return errors.New("no cluster trust domain setup")
}
currentTrustDomain := signingID.Host()
// Then we compile it into something useful.
chain, err := discoverychain.Compile(discoverychain.CompileRequest{
req := discoverychain.CompileRequest{
ServiceName: args.Name,
EvaluateInNamespace: entMeta.NamespaceOrDefault(),
EvaluateInDatacenter: evalDC,
EvaluateInTrustDomain: currentTrustDomain,
UseInDatacenter: c.srv.config.Datacenter,
OverrideMeshGateway: args.OverrideMeshGateway,
OverrideProtocol: args.OverrideProtocol,
OverrideConnectTimeout: args.OverrideConnectTimeout,
Entries: entries,
})
}
index, chain, err := state.ServiceDiscoveryChain(ws, args.Name, entMeta, req)
if err != nil {
return err
}

View File

@ -559,3 +559,289 @@ func registerTestCatalogEntriesMap(t *testing.T, codec rpc.ClientCodec, registra
require.NoError(t, err, "Failed catalog registration %q: %v", name, err)
}
}
func registerTestTopologyEntries(t *testing.T, codec rpc.ClientCodec, token string) {
t.Helper()
// api and api-proxy on node foo - upstream: web
// web and web-proxy on node bar - upstream: redis
// web and web-proxy on node baz - upstream: redis
// redis and redis-proxy on node zip
registrations := map[string]*structs.RegisterRequest{
"Node foo": {
Datacenter: "dc1",
Node: "foo",
ID: types.NodeID("e0155642-135d-4739-9853-a1ee6c9f945b"),
Address: "127.0.0.2",
Checks: structs.HealthChecks{
&structs.HealthCheck{
Node: "foo",
CheckID: "foo:alive",
Name: "foo-liveness",
Status: api.HealthPassing,
},
},
WriteRequest: structs.WriteRequest{Token: token},
},
"Service api on foo": {
Datacenter: "dc1",
Node: "foo",
SkipNodeUpdate: true,
Service: &structs.NodeService{
Kind: structs.ServiceKindTypical,
ID: "api",
Service: "api",
Port: 9090,
Address: "198.18.1.2",
},
Checks: structs.HealthChecks{
&structs.HealthCheck{
Node: "foo",
CheckID: "foo:api",
Name: "api-liveness",
Status: api.HealthPassing,
ServiceID: "api",
ServiceName: "api",
},
},
WriteRequest: structs.WriteRequest{Token: token},
},
"Service api-proxy": {
Datacenter: "dc1",
Node: "foo",
SkipNodeUpdate: true,
Service: &structs.NodeService{
Kind: structs.ServiceKindConnectProxy,
ID: "api-proxy",
Service: "api-proxy",
Port: 8443,
Address: "198.18.1.2",
Proxy: structs.ConnectProxyConfig{
DestinationServiceName: "api",
Upstreams: structs.Upstreams{
{
DestinationName: "web",
LocalBindPort: 8080,
},
},
},
},
Checks: structs.HealthChecks{
&structs.HealthCheck{
Node: "foo",
CheckID: "foo:api-proxy",
Name: "api proxy listening",
Status: api.HealthPassing,
ServiceID: "api-proxy",
ServiceName: "api-proxy",
},
},
WriteRequest: structs.WriteRequest{Token: token},
},
"Node bar": {
Datacenter: "dc1",
Node: "bar",
ID: types.NodeID("c3e5fc07-3b2d-4c06-b8fc-a1a12432d459"),
Address: "127.0.0.3",
Checks: structs.HealthChecks{
&structs.HealthCheck{
Node: "bar",
CheckID: "bar:alive",
Name: "bar-liveness",
Status: api.HealthPassing,
},
},
WriteRequest: structs.WriteRequest{Token: token},
},
"Service web on bar": {
Datacenter: "dc1",
Node: "bar",
SkipNodeUpdate: true,
Service: &structs.NodeService{
Kind: structs.ServiceKindTypical,
ID: "web",
Service: "web",
Port: 80,
Address: "198.18.1.20",
},
Checks: structs.HealthChecks{
&structs.HealthCheck{
Node: "bar",
CheckID: "bar:web",
Name: "web-liveness",
Status: api.HealthWarning,
ServiceID: "web",
ServiceName: "web",
},
},
WriteRequest: structs.WriteRequest{Token: token},
},
"Service web-proxy on bar": {
Datacenter: "dc1",
Node: "bar",
SkipNodeUpdate: true,
Service: &structs.NodeService{
Kind: structs.ServiceKindConnectProxy,
ID: "web-proxy",
Service: "web-proxy",
Port: 8443,
Address: "198.18.1.20",
Proxy: structs.ConnectProxyConfig{
DestinationServiceName: "web",
Upstreams: structs.Upstreams{
{
DestinationName: "redis",
LocalBindPort: 123,
},
},
},
},
Checks: structs.HealthChecks{
&structs.HealthCheck{
Node: "bar",
CheckID: "bar:web-proxy",
Name: "web proxy listening",
Status: api.HealthCritical,
ServiceID: "web-proxy",
ServiceName: "web-proxy",
},
},
WriteRequest: structs.WriteRequest{Token: token},
},
"Node baz": {
Datacenter: "dc1",
Node: "baz",
ID: types.NodeID("37ea7c44-a2a1-4764-ae28-7dfebeb54a22"),
Address: "127.0.0.4",
Checks: structs.HealthChecks{
&structs.HealthCheck{
Node: "baz",
CheckID: "baz:alive",
Name: "baz-liveness",
Status: api.HealthPassing,
},
},
WriteRequest: structs.WriteRequest{Token: token},
},
"Service web on baz": {
Datacenter: "dc1",
Node: "baz",
SkipNodeUpdate: true,
Service: &structs.NodeService{
Kind: structs.ServiceKindTypical,
ID: "web",
Service: "web",
Port: 80,
Address: "198.18.1.40",
},
Checks: structs.HealthChecks{
&structs.HealthCheck{
Node: "baz",
CheckID: "baz:web",
Name: "web-liveness",
Status: api.HealthPassing,
ServiceID: "web",
ServiceName: "web",
},
},
WriteRequest: structs.WriteRequest{Token: token},
},
"Service web-proxy on baz": {
Datacenter: "dc1",
Node: "baz",
SkipNodeUpdate: true,
Service: &structs.NodeService{
Kind: structs.ServiceKindConnectProxy,
ID: "web-proxy",
Service: "web-proxy",
Port: 8443,
Address: "198.18.1.40",
Proxy: structs.ConnectProxyConfig{
DestinationServiceName: "web",
Upstreams: structs.Upstreams{
{
DestinationName: "redis",
LocalBindPort: 123,
},
},
},
},
Checks: structs.HealthChecks{
&structs.HealthCheck{
Node: "baz",
CheckID: "baz:web-proxy",
Name: "web proxy listening",
Status: api.HealthCritical,
ServiceID: "web-proxy",
ServiceName: "web-proxy",
},
},
WriteRequest: structs.WriteRequest{Token: token},
},
"Node zip": {
Datacenter: "dc1",
Node: "zip",
ID: types.NodeID("dc49fc8c-afc7-4a87-815d-74d144535075"),
Address: "127.0.0.5",
Checks: structs.HealthChecks{
&structs.HealthCheck{
Node: "zip",
CheckID: "zip:alive",
Name: "zip-liveness",
Status: api.HealthPassing,
},
},
WriteRequest: structs.WriteRequest{Token: token},
},
"Service redis on zip": {
Datacenter: "dc1",
Node: "zip",
SkipNodeUpdate: true,
Service: &structs.NodeService{
Kind: structs.ServiceKindTypical,
ID: "redis",
Service: "redis",
Port: 6379,
Address: "198.18.1.60",
},
Checks: structs.HealthChecks{
&structs.HealthCheck{
Node: "zip",
CheckID: "zip:redis",
Name: "redis-liveness",
Status: api.HealthPassing,
ServiceID: "redis",
ServiceName: "redis",
},
},
WriteRequest: structs.WriteRequest{Token: token},
},
"Service redis-proxy on zip": {
Datacenter: "dc1",
Node: "zip",
SkipNodeUpdate: true,
Service: &structs.NodeService{
Kind: structs.ServiceKindConnectProxy,
ID: "redis-proxy",
Service: "redis-proxy",
Port: 8443,
Address: "198.18.1.60",
Proxy: structs.ConnectProxyConfig{
DestinationServiceName: "redis",
},
},
Checks: structs.HealthChecks{
&structs.HealthCheck{
Node: "zip",
CheckID: "zip:redis-proxy",
Name: "redis proxy listening",
Status: api.HealthCritical,
ServiceID: "redis-proxy",
ServiceName: "redis-proxy",
},
},
WriteRequest: structs.WriteRequest{Token: token},
},
}
registerTestCatalogEntriesMap(t, codec, registrations)
}

View File

@ -144,6 +144,45 @@ func (m *Internal) ServiceDump(args *structs.ServiceDumpRequest, reply *structs.
})
}
func (m *Internal) ServiceTopology(args *structs.ServiceSpecificRequest, reply *structs.IndexedServiceTopology) error {
if done, err := m.srv.ForwardRPC("Internal.ServiceTopology", args, args, reply); done {
return err
}
if args.ServiceName == "" {
return fmt.Errorf("Must provide a service name")
}
var authzContext acl.AuthorizerContext
authz, err := m.srv.ResolveTokenAndDefaultMeta(args.Token, &args.EnterpriseMeta, &authzContext)
if err != nil {
return err
}
if err := m.srv.validateEnterpriseRequest(&args.EnterpriseMeta, false); err != nil {
return err
}
if authz != nil && authz.ServiceRead(args.ServiceName, &authzContext) != acl.Allow {
return acl.ErrPermissionDenied
}
return m.srv.blockingQuery(
&args.QueryOptions,
&reply.QueryMeta,
func(ws memdb.WatchSet, state *state.Store) error {
index, topology, err := state.ServiceTopology(ws, args.Datacenter, args.ServiceName, &args.EnterpriseMeta)
if err != nil {
return err
}
reply.Index = index
reply.ServiceTopology = topology
if err := m.srv.filterACL(args.Token, reply); err != nil {
return err
}
return nil
})
}
// GatewayServiceNodes returns all the nodes for services associated with a gateway along with their gateway config
func (m *Internal) GatewayServiceDump(args *structs.ServiceSpecificRequest, reply *structs.IndexedServiceDump) error {
if done, err := m.srv.ForwardRPC("Internal.GatewayServiceDump", args, args, reply); done {

View File

@ -1605,3 +1605,145 @@ service_prefix "terminating-gateway" { policy = "read" }
}
assert.ElementsMatch(t, expected, actual)
}
func TestInternal_ServiceTopology(t *testing.T) {
t.Parallel()
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
testrpc.WaitForLeader(t, s1.RPC, "dc1")
codec := rpcClient(t, s1)
defer codec.Close()
// api and api-proxy on node foo - upstream: web
// web and web-proxy on node bar - upstream: redis
// web and web-proxy on node baz - upstream: redis
// redis and redis-proxy on node zip
registerTestTopologyEntries(t, codec, "")
t.Run("api", func(t *testing.T) {
args := structs.ServiceSpecificRequest{
Datacenter: "dc1",
ServiceName: "api",
}
var out structs.IndexedServiceTopology
require.NoError(t, msgpackrpc.CallWithCodec(codec, "Internal.ServiceTopology", &args, &out))
require.False(t, out.FilteredByACLs)
// bar/web, bar/web-proxy, baz/web, baz/web-proxy
require.Len(t, out.ServiceTopology.Upstreams, 4)
require.Len(t, out.ServiceTopology.Downstreams, 0)
})
t.Run("web", func(t *testing.T) {
args := structs.ServiceSpecificRequest{
Datacenter: "dc1",
ServiceName: "web",
}
var out structs.IndexedServiceTopology
require.NoError(t, msgpackrpc.CallWithCodec(codec, "Internal.ServiceTopology", &args, &out))
require.False(t, out.FilteredByACLs)
// foo/api, foo/api-proxy
require.Len(t, out.ServiceTopology.Upstreams, 2)
// zip/redis, zip/redis-proxy
require.Len(t, out.ServiceTopology.Downstreams, 2)
})
t.Run("redis", func(t *testing.T) {
args := structs.ServiceSpecificRequest{
Datacenter: "dc1",
ServiceName: "redis",
}
var out structs.IndexedServiceTopology
require.NoError(t, msgpackrpc.CallWithCodec(codec, "Internal.ServiceTopology", &args, &out))
require.False(t, out.FilteredByACLs)
require.Len(t, out.ServiceTopology.Upstreams, 0)
// bar/web, bar/web-proxy, baz/web, baz/web-proxy
require.Len(t, out.ServiceTopology.Downstreams, 4)
})
}
func TestInternal_ServiceTopology_ACL(t *testing.T) {
t.Parallel()
dir1, s1 := testServerWithConfig(t, func(c *Config) {
c.ACLDatacenter = "dc1"
c.ACLsEnabled = true
c.ACLMasterToken = TestDefaultMasterToken
c.ACLDefaultPolicy = "deny"
})
defer os.RemoveAll(dir1)
defer s1.Shutdown()
testrpc.WaitForLeader(t, s1.RPC, "dc1")
codec := rpcClient(t, s1)
defer codec.Close()
// api and api-proxy on node foo - upstream: web
// web and web-proxy on node bar - upstream: redis
// web and web-proxy on node baz - upstream: redis
// redis and redis-proxy on node zip
registerTestTopologyEntries(t, codec, TestDefaultMasterToken)
// Token grants read to: foo/api, foo/api-proxy, bar/web, baz/web
userToken, err := upsertTestTokenWithPolicyRules(codec, TestDefaultMasterToken, "dc1", `
node_prefix "" { policy = "read" }
service_prefix "api" { policy = "read" }
service "web" { policy = "read" }
`)
require.NoError(t, err)
t.Run("api can't read web", func(t *testing.T) {
args := structs.ServiceSpecificRequest{
Datacenter: "dc1",
ServiceName: "api",
QueryOptions: structs.QueryOptions{Token: userToken.SecretID},
}
var out structs.IndexedServiceTopology
require.NoError(t, msgpackrpc.CallWithCodec(codec, "Internal.ServiceTopology", &args, &out))
require.True(t, out.FilteredByACLs)
// The web-proxy upstream gets filtered out from both bar and baz
require.Len(t, out.ServiceTopology.Upstreams, 2)
require.Equal(t, "web", out.ServiceTopology.Upstreams[0].Service.Service)
require.Equal(t, "web", out.ServiceTopology.Upstreams[1].Service.Service)
require.Len(t, out.ServiceTopology.Downstreams, 0)
})
t.Run("web can't read redis", func(t *testing.T) {
args := structs.ServiceSpecificRequest{
Datacenter: "dc1",
ServiceName: "web",
QueryOptions: structs.QueryOptions{Token: userToken.SecretID},
}
var out structs.IndexedServiceTopology
require.NoError(t, msgpackrpc.CallWithCodec(codec, "Internal.ServiceTopology", &args, &out))
require.True(t, out.FilteredByACLs)
// The redis upstream gets filtered out but the api and proxy downstream are returned
require.Len(t, out.ServiceTopology.Upstreams, 0)
require.Len(t, out.ServiceTopology.Downstreams, 2)
})
t.Run("redis can't read self", func(t *testing.T) {
args := structs.ServiceSpecificRequest{
Datacenter: "dc1",
ServiceName: "redis",
QueryOptions: structs.QueryOptions{Token: userToken.SecretID},
}
var out structs.IndexedServiceTopology
err := msgpackrpc.CallWithCodec(codec, "Internal.ServiceTopology", &args, &out)
// Can't read self, fails fast
require.True(t, acl.IsErrPermissionDenied(err))
})
}

View File

@ -12,11 +12,13 @@ import (
"github.com/hashicorp/consul/types"
memdb "github.com/hashicorp/go-memdb"
"github.com/hashicorp/go-uuid"
"github.com/mitchellh/copystructure"
)
const (
servicesTableName = "services"
gatewayServicesTableName = "gateway-services"
topologyTableName = "mesh-topology"
// serviceLastExtinctionIndexName keeps track of the last raft index when the last instance
// of any service was unregistered. This is used by blocking queries on missing services.
@ -103,6 +105,47 @@ func gatewayServicesTableNameSchema() *memdb.TableSchema {
}
}
// topologyTableNameSchema returns a new table schema used to store information
// relating upstream and downstream services
func topologyTableNameSchema() *memdb.TableSchema {
return &memdb.TableSchema{
Name: topologyTableName,
Indexes: map[string]*memdb.IndexSchema{
"id": {
Name: "id",
AllowMissing: false,
Unique: true,
Indexer: &memdb.CompoundIndex{
Indexes: []memdb.Indexer{
&ServiceNameIndex{
Field: "Upstream",
},
&ServiceNameIndex{
Field: "Downstream",
},
},
},
},
"upstream": {
Name: "upstream",
AllowMissing: true,
Unique: false,
Indexer: &ServiceNameIndex{
Field: "Upstream",
},
},
"downstream": {
Name: "downstream",
AllowMissing: false,
Unique: false,
Indexer: &ServiceNameIndex{
Field: "Downstream",
},
},
},
}
}
type ServiceNameIndex struct {
Field string
}
@ -164,6 +207,7 @@ func init() {
registerSchema(servicesTableSchema)
registerSchema(checksTableSchema)
registerSchema(gatewayServicesTableNameSchema)
registerSchema(topologyTableNameSchema)
}
const (
@ -782,10 +826,15 @@ func ensureServiceTxn(tx *txn, idx uint64, node string, preserveIndexes bool, sv
}
// Check if this service is covered by a gateway's wildcard specifier
err = checkGatewayWildcardsAndUpdate(tx, idx, svc)
if err != nil {
if err = checkGatewayWildcardsAndUpdate(tx, idx, svc); err != nil {
return fmt.Errorf("failed updating gateway mapping: %s", err)
}
// Update upstream/downstream mappings if it's a connect service
if svc.Kind == structs.ServiceKindConnectProxy {
if err = updateMeshTopology(tx, idx, node, svc, existing); err != nil {
return fmt.Errorf("failed updating upstream/downstream association")
}
}
// Create the service node entry and populate the indexes. Note that
// conversion doesn't populate any of the node-specific information.
@ -1485,9 +1534,14 @@ func (s *Store) deleteServiceTxn(tx *txn, idx uint64, nodeName, serviceID string
}
svc := service.(*structs.ServiceNode)
name := svc.CompoundServiceName()
if err := catalogUpdateServiceKindIndexes(tx, svc.ServiceKind, idx, &svc.EnterpriseMeta); err != nil {
return err
}
if err := cleanupMeshTopology(tx, idx, svc); err != nil {
return fmt.Errorf("failed to clean up mesh-topology associations for %q: %v", name.String(), err)
}
if _, remainingService, err := firstWatchWithTxn(tx, "services", "service", svc.ServiceName, entMeta); err == nil {
if remainingService != nil {
@ -1508,26 +1562,8 @@ func (s *Store) deleteServiceTxn(tx *txn, idx uint64, nodeName, serviceID string
if err := catalogUpdateServiceExtinctionIndex(tx, idx, entMeta); err != nil {
return err
}
// Clean up association between service name and gateways if needed
gateways, err := serviceGateways(tx, svc.ServiceName, &svc.EnterpriseMeta)
if err != nil {
return fmt.Errorf("failed gateway lookup for %q: %s", svc.ServiceName, err)
}
for mapping := gateways.Next(); mapping != nil; mapping = gateways.Next() {
if gs, ok := mapping.(*structs.GatewayService); ok && gs != nil {
// Only delete if association was created by a wildcard specifier.
// Otherwise the service was specified in the config entry, and the association should be maintained
// for when the service is re-registered
if gs.FromWildcard {
if err := tx.Delete(gatewayServicesTableName, gs); err != nil {
return fmt.Errorf("failed to truncate gateway services table: %v", err)
}
if err := indexUpdateMaxTxn(tx, idx, gatewayServicesTableName); err != nil {
return fmt.Errorf("failed updating gateway-services index: %v", err)
}
}
}
if err := cleanupGatewayWildcards(tx, idx, svc); err != nil {
return fmt.Errorf("failed to clean up gateway-service associations for %q: %v", name.String(), err)
}
}
} else {
@ -1980,6 +2016,33 @@ func (s *Store) deleteCheckTxn(tx *txn, idx uint64, node string, checkID types.C
return nil
}
// CombinedCheckServiceNodes is used to query all nodes and checks for both typical and Connect endpoints of a service
func (s *Store) CombinedCheckServiceNodes(ws memdb.WatchSet, service structs.ServiceName) (uint64, structs.CheckServiceNodes, error) {
var (
resp structs.CheckServiceNodes
maxIdx uint64
)
idx, csn, err := s.CheckServiceNodes(ws, service.Name, &service.EnterpriseMeta)
if err != nil {
return 0, nil, fmt.Errorf("failed to get downstream nodes for %q: %v", service, err)
}
if idx > maxIdx {
maxIdx = idx
}
resp = append(resp, csn...)
idx, csn, err = s.CheckConnectServiceNodes(ws, service.Name, &service.EnterpriseMeta)
if err != nil {
return 0, nil, fmt.Errorf("failed to get downstream connect nodes for %q: %v", service, err)
}
if idx > maxIdx {
maxIdx = idx
}
resp = append(resp, csn...)
return maxIdx, resp, nil
}
// CheckServiceNodes is used to query all nodes and checks for a given service.
func (s *Store) CheckServiceNodes(ws memdb.WatchSet, serviceName string, entMeta *structs.EnterpriseMeta) (uint64, structs.CheckServiceNodes, error) {
return s.checkServiceNodes(ws, serviceName, false, entMeta)
@ -2702,6 +2765,30 @@ func checkGatewayWildcardsAndUpdate(tx *txn, idx uint64, svc *structs.NodeServic
return nil
}
func cleanupGatewayWildcards(tx *txn, idx uint64, svc *structs.ServiceNode) error {
// Clean up association between service name and gateways if needed
gateways, err := serviceGateways(tx, svc.ServiceName, &svc.EnterpriseMeta)
if err != nil {
return fmt.Errorf("failed gateway lookup for %q: %s", svc.ServiceName, err)
}
for mapping := gateways.Next(); mapping != nil; mapping = gateways.Next() {
if gs, ok := mapping.(*structs.GatewayService); ok && gs != nil {
// Only delete if association was created by a wildcard specifier.
// Otherwise the service was specified in the config entry, and the association should be maintained
// for when the service is re-registered
if gs.FromWildcard {
if err := tx.Delete(gatewayServicesTableName, gs); err != nil {
return fmt.Errorf("failed to truncate gateway services table: %v", err)
}
if err := indexUpdateMaxTxn(tx, idx, gatewayServicesTableName); err != nil {
return fmt.Errorf("failed updating gateway-services index: %v", err)
}
}
}
}
return nil
}
// serviceGateways returns all GatewayService entries with the given service name. This effectively looks up
// all the gateways mapped to this service.
func serviceGateways(tx *txn, name string, entMeta *structs.EnterpriseMeta) (memdb.ResultIterator, error) {
@ -2820,3 +2907,289 @@ func checkProtocolMatch(tx ReadTxn, ws memdb.WatchSet, svc *structs.GatewayServi
return idx, svc.Protocol == protocol, nil
}
func (s *Store) ServiceTopology(
ws memdb.WatchSet,
dc, service string,
entMeta *structs.EnterpriseMeta,
) (uint64, *structs.ServiceTopology, error) {
tx := s.db.ReadTxn()
defer tx.Abort()
var (
maxIdx uint64
sn = structs.NewServiceName(service, entMeta)
)
idx, upstreamNames, err := upstreamsFromRegistrationTxn(tx, ws, sn)
if err != nil {
return 0, nil, err
}
if idx > maxIdx {
maxIdx = idx
}
idx, upstreams, err := s.combinedServiceNodesTxn(tx, ws, upstreamNames)
if err != nil {
return 0, nil, fmt.Errorf("failed to get upstreams for %q: %v", sn.String(), err)
}
if idx > maxIdx {
maxIdx = idx
}
idx, downstreamNames, err := s.downstreamsForServiceTxn(tx, ws, dc, sn)
if err != nil {
return 0, nil, err
}
if idx > maxIdx {
maxIdx = idx
}
idx, downstreams, err := s.combinedServiceNodesTxn(tx, ws, downstreamNames)
if err != nil {
return 0, nil, fmt.Errorf("failed to get downstreams for %q: %v", sn.String(), err)
}
if idx > maxIdx {
maxIdx = idx
}
resp := &structs.ServiceTopology{
Upstreams: upstreams,
Downstreams: downstreams,
}
return maxIdx, resp, nil
}
// combinedServiceNodesTxn returns typical and connect endpoints for a list of services.
// This enabled aggregating checks statuses across both.
func (s *Store) combinedServiceNodesTxn(tx *txn, ws memdb.WatchSet, names []structs.ServiceName) (uint64, structs.CheckServiceNodes, error) {
var (
maxIdx uint64
resp structs.CheckServiceNodes
)
for _, u := range names {
// Collect typical then connect instances
idx, csn, err := checkServiceNodesTxn(tx, ws, u.Name, false, &u.EnterpriseMeta)
if err != nil {
return 0, nil, err
}
if idx > maxIdx {
maxIdx = idx
}
resp = append(resp, csn...)
idx, csn, err = checkServiceNodesTxn(tx, ws, u.Name, true, &u.EnterpriseMeta)
if err != nil {
return 0, nil, err
}
if idx > maxIdx {
maxIdx = idx
}
resp = append(resp, csn...)
}
return maxIdx, resp, nil
}
// downstreamsForServiceTxn will find all downstream services that could route traffic to the input service.
// There are two factors at play. Upstreams defined in a proxy registration, and the discovery chain for those upstreams.
// TODO (freddy): Account for ingress gateways
func (s *Store) downstreamsForServiceTxn(tx ReadTxn, ws memdb.WatchSet, dc string, service structs.ServiceName) (uint64, []structs.ServiceName, error) {
// First fetch services that have discovery chains that eventually route to the target service
idx, sources, err := s.discoveryChainSourcesTxn(tx, ws, dc, service)
if err != nil {
return 0, nil, fmt.Errorf("failed to get sources for discovery chain target %q: %v", service.String(), err)
}
var maxIdx uint64
if idx > maxIdx {
maxIdx = idx
}
var (
resp []structs.ServiceName
seen = make(map[structs.ServiceName]bool)
)
for _, s := range sources {
// We then follow these sources one level down to the services defining them as an upstream.
idx, downstreams, err := downstreamsFromRegistrationTxn(tx, ws, s)
if err != nil {
return 0, nil, fmt.Errorf("failed to get registration downstreams for %q: %v", s.String(), err)
}
if idx > maxIdx {
maxIdx = idx
}
for _, d := range downstreams {
if !seen[d] {
resp = append(resp, d)
seen[d] = true
}
}
}
return maxIdx, resp, nil
}
// upstreamsFromRegistrationTxn returns the ServiceNames of the upstreams defined across instances of the input
func upstreamsFromRegistrationTxn(tx ReadTxn, ws memdb.WatchSet, sn structs.ServiceName) (uint64, []structs.ServiceName, error) {
return linkedFromRegistrationTxn(tx, ws, sn, false)
}
// downstreamsFromRegistrationTxn returns the ServiceNames of downstream services based on registrations across instances of the input
func downstreamsFromRegistrationTxn(tx ReadTxn, ws memdb.WatchSet, sn structs.ServiceName) (uint64, []structs.ServiceName, error) {
return linkedFromRegistrationTxn(tx, ws, sn, true)
}
func linkedFromRegistrationTxn(tx ReadTxn, ws memdb.WatchSet, service structs.ServiceName, downstreams bool) (uint64, []structs.ServiceName, error) {
// To fetch upstreams we query services that have the input listed as a downstream
// To fetch downstreams we query services that have the input listed as an upstream
index := "downstream"
if downstreams {
index = "upstream"
}
iter, err := tx.Get(topologyTableName, index, service)
if err != nil {
return 0, nil, fmt.Errorf("%q lookup failed: %v", topologyTableName, err)
}
ws.Add(iter.WatchCh())
var (
idx uint64
resp []structs.ServiceName
)
for raw := iter.Next(); raw != nil; raw = iter.Next() {
entry := raw.(*structs.UpstreamDownstream)
if entry.ModifyIndex > idx {
idx = entry.ModifyIndex
}
linked := entry.Upstream
if downstreams {
linked = entry.Downstream
}
resp = append(resp, linked)
}
// TODO (freddy) This needs a tombstone to avoid the index sliding back on mapping deletion
// Using the table index here means that blocking queries will wake up more often than they should
tableIdx := maxIndexTxn(tx, topologyTableName)
if tableIdx > idx {
idx = tableIdx
}
return idx, resp, nil
}
// updateMeshTopology creates associations between the input service and its upstreams in the topology table
func updateMeshTopology(tx *txn, idx uint64, node string, svc *structs.NodeService, existing interface{}) error {
oldUpstreams := make(map[structs.ServiceName]bool)
if e, ok := existing.(*structs.ServiceNode); ok {
for _, u := range e.ServiceProxy.Upstreams {
upstreamMeta := structs.EnterpriseMetaInitializer(u.DestinationNamespace)
sn := structs.NewServiceName(u.DestinationName, &upstreamMeta)
oldUpstreams[sn] = true
}
}
// Despite the name "destination", this service name is downstream of the proxy
downstream := structs.NewServiceName(svc.Proxy.DestinationServiceName, &svc.EnterpriseMeta)
inserted := make(map[structs.ServiceName]bool)
for _, u := range svc.Proxy.Upstreams {
if u.DestinationType == structs.UpstreamDestTypePreparedQuery {
continue
}
// TODO (freddy): Account for upstream datacenter
upstreamMeta := structs.EnterpriseMetaInitializer(u.DestinationNamespace)
upstream := structs.NewServiceName(u.DestinationName, &upstreamMeta)
obj, err := tx.First(topologyTableName, "id", upstream, downstream)
if err != nil {
return fmt.Errorf("%q lookup failed: %v", topologyTableName, err)
}
sid := svc.CompoundServiceID()
uid := structs.UniqueID(node, sid.String())
var mapping *structs.UpstreamDownstream
if existing, ok := obj.(*structs.UpstreamDownstream); ok {
rawCopy, err := copystructure.Copy(existing)
if err != nil {
return fmt.Errorf("failed to copy existing topology mapping: %v", err)
}
mapping, ok = rawCopy.(*structs.UpstreamDownstream)
if !ok {
return fmt.Errorf("unexpected topology type %T", rawCopy)
}
mapping.Refs[uid] = struct{}{}
mapping.ModifyIndex = idx
inserted[upstream] = true
}
if mapping == nil {
mapping = &structs.UpstreamDownstream{
Upstream: upstream,
Downstream: downstream,
Refs: map[string]struct{}{uid: {}},
RaftIndex: structs.RaftIndex{
CreateIndex: idx,
ModifyIndex: idx,
},
}
}
if err := tx.Insert(topologyTableName, mapping); err != nil {
return fmt.Errorf("failed inserting %s mapping: %s", topologyTableName, err)
}
if err := indexUpdateMaxTxn(tx, idx, topologyTableName); err != nil {
return fmt.Errorf("failed updating %s index: %v", topologyTableName, err)
}
inserted[upstream] = true
}
for u := range oldUpstreams {
if !inserted[u] {
if _, err := tx.DeleteAll(topologyTableName, "id", u, downstream); err != nil {
return fmt.Errorf("failed to truncate %s table: %v", topologyTableName, err)
}
if err := indexUpdateMaxTxn(tx, idx, topologyTableName); err != nil {
return fmt.Errorf("failed updating %s index: %v", topologyTableName, err)
}
}
}
return nil
}
// cleanupMeshTopology removes a service from the mesh topology table
// This is only safe to call when there are no more known instances of this proxy
func cleanupMeshTopology(tx *txn, idx uint64, service *structs.ServiceNode) error {
if service.ServiceKind != structs.ServiceKindConnectProxy {
return nil
}
sn := structs.NewServiceName(service.ServiceProxy.DestinationServiceName, &service.EnterpriseMeta)
sid := service.CompoundServiceID()
uid := structs.UniqueID(service.Node, sid.String())
iter, err := tx.Get(topologyTableName, "downstream", sn)
if err != nil {
return fmt.Errorf("%q lookup failed: %v", topologyTableName, err)
}
for raw := iter.Next(); raw != nil; raw = iter.Next() {
entry := raw.(*structs.UpstreamDownstream)
rawCopy, err := copystructure.Copy(entry)
if err != nil {
return fmt.Errorf("failed to copy existing topology mapping: %v", err)
}
copy, ok := rawCopy.(*structs.UpstreamDownstream)
if !ok {
return fmt.Errorf("unexpected topology type %T", rawCopy)
}
delete(copy.Refs, uid)
if len(copy.Refs) == 0 {
if err := tx.Delete(topologyTableName, entry); err != nil {
return fmt.Errorf("failed to truncate %s table: %v", topologyTableName, err)
}
if err := indexUpdateMaxTxn(tx, idx, topologyTableName); err != nil {
return fmt.Errorf("failed updating %s index: %v", topologyTableName, err)
}
}
}
return nil
}

View File

@ -6114,3 +6114,859 @@ func TestStateStore_DumpGatewayServices(t *testing.T) {
assert.Len(t, out, 0)
})
}
func TestCatalog_catalogDownstreams_Watches(t *testing.T) {
type expect struct {
idx uint64
names []structs.ServiceName
}
s := testStateStore(t)
require.NoError(t, s.EnsureNode(0, &structs.Node{
ID: "c73b8fdf-4ef8-4e43-9aa2-59e85cc6a70c",
Node: "foo",
}))
defaultMeta := structs.DefaultEnterpriseMeta()
admin := structs.NewServiceName("admin", defaultMeta)
cache := structs.NewServiceName("cache", defaultMeta)
// Watch should fire since the admin <-> web-proxy pairing was inserted into the topology table
ws := memdb.NewWatchSet()
tx := s.db.ReadTxn()
idx, names, err := downstreamsFromRegistrationTxn(tx, ws, admin)
require.NoError(t, err)
assert.Zero(t, idx)
assert.Len(t, names, 0)
svc := structs.NodeService{
Kind: structs.ServiceKindConnectProxy,
ID: "web-proxy",
Service: "web-proxy",
Address: "127.0.0.2",
Port: 443,
Proxy: structs.ConnectProxyConfig{
DestinationServiceName: "web",
Upstreams: structs.Upstreams{
structs.Upstream{
DestinationName: "db",
},
structs.Upstream{
DestinationName: "admin",
},
},
},
EnterpriseMeta: *defaultMeta,
}
require.NoError(t, s.EnsureService(1, "foo", &svc))
assert.True(t, watchFired(ws))
ws = memdb.NewWatchSet()
tx = s.db.ReadTxn()
idx, names, err = downstreamsFromRegistrationTxn(tx, ws, admin)
require.NoError(t, err)
exp := expect{
idx: 1,
names: []structs.ServiceName{
{Name: "web", EnterpriseMeta: *defaultMeta},
},
}
require.Equal(t, exp.idx, idx)
require.ElementsMatch(t, exp.names, names)
// Now replace the admin upstream to verify watch fires and mapping is removed
svc.Proxy.Upstreams = structs.Upstreams{
structs.Upstream{
DestinationName: "db",
},
structs.Upstream{
DestinationName: "not-admin",
},
structs.Upstream{
DestinationName: "cache",
},
}
require.NoError(t, s.EnsureService(2, "foo", &svc))
assert.True(t, watchFired(ws))
ws = memdb.NewWatchSet()
tx = s.db.ReadTxn()
idx, _, err = downstreamsFromRegistrationTxn(tx, ws, admin)
require.NoError(t, err)
exp = expect{
// Expect index where the upstream was replaced
idx: 2,
}
require.Equal(t, exp.idx, idx)
require.Empty(t, exp.names)
// Should still be able to get downstream for one of the other upstreams
ws = memdb.NewWatchSet()
tx = s.db.ReadTxn()
idx, names, err = downstreamsFromRegistrationTxn(tx, ws, cache)
require.NoError(t, err)
exp = expect{
idx: 2,
names: []structs.ServiceName{
{Name: "web", EnterpriseMeta: *defaultMeta},
},
}
require.Equal(t, exp.idx, idx)
require.ElementsMatch(t, exp.names, names)
// Now delete the web-proxy service and the result should be empty
require.NoError(t, s.DeleteService(3, "foo", "web-proxy", defaultMeta))
assert.True(t, watchFired(ws))
ws = memdb.NewWatchSet()
tx = s.db.ReadTxn()
idx, _, err = downstreamsFromRegistrationTxn(tx, ws, cache)
require.NoError(t, err)
exp = expect{
// Expect deletion index
idx: 3,
}
require.Equal(t, exp.idx, idx)
require.Empty(t, exp.names)
}
func TestCatalog_catalogDownstreams(t *testing.T) {
defaultMeta := structs.DefaultEnterpriseMeta()
type expect struct {
idx uint64
names []structs.ServiceName
}
tt := []struct {
name string
services []*structs.NodeService
expect expect
}{
{
name: "single proxy with multiple upstreams",
services: []*structs.NodeService{
{
Kind: structs.ServiceKindConnectProxy,
ID: "api-proxy",
Service: "api-proxy",
Address: "127.0.0.1",
Port: 443,
Proxy: structs.ConnectProxyConfig{
DestinationServiceName: "api",
Upstreams: structs.Upstreams{
structs.Upstream{
DestinationName: "cache",
},
structs.Upstream{
DestinationName: "db",
},
structs.Upstream{
DestinationName: "admin",
},
},
},
EnterpriseMeta: *defaultMeta,
},
},
expect: expect{
idx: 1,
names: []structs.ServiceName{
{Name: "api", EnterpriseMeta: *defaultMeta},
},
},
},
{
name: "multiple proxies with multiple upstreams",
services: []*structs.NodeService{
{
Kind: structs.ServiceKindConnectProxy,
ID: "api-proxy",
Service: "api-proxy",
Address: "127.0.0.1",
Port: 443,
Proxy: structs.ConnectProxyConfig{
DestinationServiceName: "api",
Upstreams: structs.Upstreams{
structs.Upstream{
DestinationName: "cache",
},
structs.Upstream{
DestinationName: "db",
},
structs.Upstream{
DestinationName: "admin",
},
},
},
EnterpriseMeta: *defaultMeta,
},
{
Kind: structs.ServiceKindConnectProxy,
ID: "web-proxy",
Service: "web-proxy",
Address: "127.0.0.2",
Port: 443,
Proxy: structs.ConnectProxyConfig{
DestinationServiceName: "web",
Upstreams: structs.Upstreams{
structs.Upstream{
DestinationName: "db",
},
structs.Upstream{
DestinationName: "admin",
},
},
},
EnterpriseMeta: *defaultMeta,
},
},
expect: expect{
idx: 2,
names: []structs.ServiceName{
{Name: "api", EnterpriseMeta: *defaultMeta},
{Name: "web", EnterpriseMeta: *defaultMeta},
},
},
},
}
for _, tc := range tt {
t.Run(tc.name, func(t *testing.T) {
s := testStateStore(t)
ws := memdb.NewWatchSet()
require.NoError(t, s.EnsureNode(0, &structs.Node{
ID: "c73b8fdf-4ef8-4e43-9aa2-59e85cc6a70c",
Node: "foo",
}))
var i uint64 = 1
for _, svc := range tc.services {
require.NoError(t, s.EnsureService(i, "foo", svc))
i++
}
tx := s.db.ReadTxn()
idx, names, err := downstreamsFromRegistrationTxn(tx, ws, structs.NewServiceName("admin", structs.DefaultEnterpriseMeta()))
require.NoError(t, err)
require.Equal(t, tc.expect.idx, idx)
require.ElementsMatch(t, tc.expect.names, names)
})
}
}
func TestCatalog_upstreamsFromRegistration(t *testing.T) {
defaultMeta := structs.DefaultEnterpriseMeta()
type expect struct {
idx uint64
names []structs.ServiceName
}
tt := []struct {
name string
services []*structs.NodeService
expect expect
}{
{
name: "single proxy with multiple upstreams",
services: []*structs.NodeService{
{
Kind: structs.ServiceKindConnectProxy,
ID: "api-proxy",
Service: "api-proxy",
Address: "127.0.0.1",
Port: 443,
Proxy: structs.ConnectProxyConfig{
DestinationServiceName: "api",
Upstreams: structs.Upstreams{
structs.Upstream{
DestinationName: "cache",
},
structs.Upstream{
DestinationName: "db",
},
structs.Upstream{
DestinationName: "admin",
},
},
},
EnterpriseMeta: *defaultMeta,
},
},
expect: expect{
idx: 1,
names: []structs.ServiceName{
{Name: "cache", EnterpriseMeta: *defaultMeta},
{Name: "db", EnterpriseMeta: *defaultMeta},
{Name: "admin", EnterpriseMeta: *defaultMeta},
},
},
},
{
name: "multiple proxies with multiple upstreams",
services: []*structs.NodeService{
{
Kind: structs.ServiceKindConnectProxy,
ID: "api-proxy",
Service: "api-proxy",
Address: "127.0.0.1",
Port: 443,
Proxy: structs.ConnectProxyConfig{
DestinationServiceName: "api",
Upstreams: structs.Upstreams{
structs.Upstream{
DestinationName: "cache",
},
structs.Upstream{
DestinationName: "db",
},
structs.Upstream{
DestinationName: "admin",
},
},
},
EnterpriseMeta: *defaultMeta,
},
{
Kind: structs.ServiceKindConnectProxy,
ID: "api-proxy-2",
Service: "api-proxy",
Address: "127.0.0.2",
Port: 443,
Proxy: structs.ConnectProxyConfig{
DestinationServiceName: "api",
Upstreams: structs.Upstreams{
structs.Upstream{
DestinationName: "cache",
},
structs.Upstream{
DestinationName: "db",
},
structs.Upstream{
DestinationName: "new-admin",
},
},
},
EnterpriseMeta: *defaultMeta,
},
{
Kind: structs.ServiceKindConnectProxy,
ID: "different-api-proxy",
Service: "different-api-proxy",
Address: "127.0.0.4",
Port: 443,
Proxy: structs.ConnectProxyConfig{
DestinationServiceName: "api",
Upstreams: structs.Upstreams{
structs.Upstream{
DestinationName: "elasticache",
},
structs.Upstream{
DestinationName: "db",
},
structs.Upstream{
DestinationName: "admin",
},
},
},
EnterpriseMeta: *defaultMeta,
},
{
Kind: structs.ServiceKindConnectProxy,
ID: "web-proxy",
Service: "web-proxy",
Address: "127.0.0.3",
Port: 80,
Proxy: structs.ConnectProxyConfig{
DestinationServiceName: "web",
Upstreams: structs.Upstreams{
structs.Upstream{
DestinationName: "db",
},
structs.Upstream{
DestinationName: "billing",
},
},
},
EnterpriseMeta: *defaultMeta,
},
},
expect: expect{
idx: 4,
names: []structs.ServiceName{
{Name: "cache", EnterpriseMeta: *defaultMeta},
{Name: "db", EnterpriseMeta: *defaultMeta},
{Name: "admin", EnterpriseMeta: *defaultMeta},
{Name: "new-admin", EnterpriseMeta: *defaultMeta},
{Name: "elasticache", EnterpriseMeta: *defaultMeta},
},
},
},
}
for _, tc := range tt {
t.Run(tc.name, func(t *testing.T) {
s := testStateStore(t)
ws := memdb.NewWatchSet()
require.NoError(t, s.EnsureNode(0, &structs.Node{
ID: "c73b8fdf-4ef8-4e43-9aa2-59e85cc6a70c",
Node: "foo",
}))
var i uint64 = 1
for _, svc := range tc.services {
require.NoError(t, s.EnsureService(i, "foo", svc))
i++
}
tx := s.db.ReadTxn()
idx, names, err := upstreamsFromRegistrationTxn(tx, ws, structs.NewServiceName("api", structs.DefaultEnterpriseMeta()))
require.NoError(t, err)
require.Equal(t, tc.expect.idx, idx)
require.ElementsMatch(t, tc.expect.names, names)
})
}
}
func TestCatalog_upstreamsFromRegistration_Watches(t *testing.T) {
type expect struct {
idx uint64
names []structs.ServiceName
}
s := testStateStore(t)
require.NoError(t, s.EnsureNode(0, &structs.Node{
ID: "c73b8fdf-4ef8-4e43-9aa2-59e85cc6a70c",
Node: "foo",
}))
defaultMeta := structs.DefaultEnterpriseMeta()
web := structs.NewServiceName("web", defaultMeta)
ws := memdb.NewWatchSet()
tx := s.db.ReadTxn()
idx, names, err := upstreamsFromRegistrationTxn(tx, ws, web)
require.NoError(t, err)
assert.Zero(t, idx)
assert.Len(t, names, 0)
// Watch should fire since the admin <-> web pairing was inserted into the topology table
svc := structs.NodeService{
Kind: structs.ServiceKindConnectProxy,
ID: "web-proxy",
Service: "web-proxy",
Address: "127.0.0.2",
Port: 443,
Proxy: structs.ConnectProxyConfig{
DestinationServiceName: "web",
Upstreams: structs.Upstreams{
structs.Upstream{
DestinationName: "db",
},
structs.Upstream{
DestinationName: "admin",
},
},
},
EnterpriseMeta: *defaultMeta,
}
require.NoError(t, s.EnsureService(1, "foo", &svc))
assert.True(t, watchFired(ws))
ws = memdb.NewWatchSet()
tx = s.db.ReadTxn()
idx, names, err = upstreamsFromRegistrationTxn(tx, ws, web)
require.NoError(t, err)
exp := expect{
idx: 1,
names: []structs.ServiceName{
{Name: "db", EnterpriseMeta: *defaultMeta},
{Name: "admin", EnterpriseMeta: *defaultMeta},
},
}
require.Equal(t, exp.idx, idx)
require.ElementsMatch(t, exp.names, names)
// Now edit the upstreams list to verify watch fires and mapping is removed
svc.Proxy.Upstreams = structs.Upstreams{
structs.Upstream{
DestinationName: "db",
},
structs.Upstream{
DestinationName: "not-admin",
},
}
require.NoError(t, s.EnsureService(2, "foo", &svc))
assert.True(t, watchFired(ws))
ws = memdb.NewWatchSet()
tx = s.db.ReadTxn()
idx, names, err = upstreamsFromRegistrationTxn(tx, ws, web)
require.NoError(t, err)
exp = expect{
// Expect index where the upstream was replaced
idx: 2,
names: []structs.ServiceName{
{Name: "db", EnterpriseMeta: *defaultMeta},
{Name: "not-admin", EnterpriseMeta: *defaultMeta},
},
}
require.Equal(t, exp.idx, idx)
require.ElementsMatch(t, exp.names, names)
// Adding a new instance with distinct upstreams should result in a list that joins both
svc = structs.NodeService{
Kind: structs.ServiceKindConnectProxy,
ID: "web-proxy-2",
Service: "web-proxy",
Address: "127.0.0.3",
Port: 443,
Proxy: structs.ConnectProxyConfig{
DestinationServiceName: "web",
Upstreams: structs.Upstreams{
structs.Upstream{
DestinationName: "db",
},
structs.Upstream{
DestinationName: "also-not-admin",
},
structs.Upstream{
DestinationName: "cache",
},
},
},
EnterpriseMeta: *defaultMeta,
}
require.NoError(t, s.EnsureService(3, "foo", &svc))
assert.True(t, watchFired(ws))
ws = memdb.NewWatchSet()
tx = s.db.ReadTxn()
idx, names, err = upstreamsFromRegistrationTxn(tx, ws, web)
require.NoError(t, err)
exp = expect{
idx: 3,
names: []structs.ServiceName{
{Name: "db", EnterpriseMeta: *defaultMeta},
{Name: "not-admin", EnterpriseMeta: *defaultMeta},
{Name: "also-not-admin", EnterpriseMeta: *defaultMeta},
{Name: "cache", EnterpriseMeta: *defaultMeta},
},
}
require.Equal(t, exp.idx, idx)
require.ElementsMatch(t, exp.names, names)
// Now delete the web-proxy service and the result should mirror the one of the remaining instance
require.NoError(t, s.DeleteService(4, "foo", "web-proxy", defaultMeta))
assert.True(t, watchFired(ws))
ws = memdb.NewWatchSet()
tx = s.db.ReadTxn()
idx, names, err = upstreamsFromRegistrationTxn(tx, ws, web)
require.NoError(t, err)
exp = expect{
idx: 4,
names: []structs.ServiceName{
{Name: "db", EnterpriseMeta: *defaultMeta},
{Name: "also-not-admin", EnterpriseMeta: *defaultMeta},
{Name: "cache", EnterpriseMeta: *defaultMeta},
},
}
require.Equal(t, exp.idx, idx)
require.ElementsMatch(t, exp.names, names)
// Now delete the last web-proxy instance and the mappings should be cleared
require.NoError(t, s.DeleteService(5, "foo", "web-proxy-2", defaultMeta))
assert.True(t, watchFired(ws))
ws = memdb.NewWatchSet()
tx = s.db.ReadTxn()
idx, _, err = upstreamsFromRegistrationTxn(tx, ws, web)
require.NoError(t, err)
exp = expect{
// Expect deletion index
idx: 5,
}
require.Equal(t, exp.idx, idx)
require.Empty(t, exp.names)
}
func TestCatalog_DownstreamsForService(t *testing.T) {
defaultMeta := structs.DefaultEnterpriseMeta()
type expect struct {
idx uint64
names []structs.ServiceName
}
tt := []struct {
name string
services []*structs.NodeService
entries []structs.ConfigEntry
expect expect
}{
{
name: "kitchen sink",
services: []*structs.NodeService{
{
Kind: structs.ServiceKindConnectProxy,
ID: "api-proxy",
Service: "api-proxy",
Address: "127.0.0.1",
Port: 443,
Proxy: structs.ConnectProxyConfig{
DestinationServiceName: "api",
Upstreams: structs.Upstreams{
structs.Upstream{
DestinationName: "cache",
},
structs.Upstream{
DestinationName: "db",
},
structs.Upstream{
DestinationName: "old-admin",
},
},
},
EnterpriseMeta: *defaultMeta,
},
{
Kind: structs.ServiceKindConnectProxy,
ID: "web-proxy",
Service: "web-proxy",
Address: "127.0.0.2",
Port: 443,
Proxy: structs.ConnectProxyConfig{
DestinationServiceName: "web",
Upstreams: structs.Upstreams{
structs.Upstream{
DestinationName: "db",
},
structs.Upstream{
DestinationName: "admin",
},
},
},
EnterpriseMeta: *defaultMeta,
},
},
entries: []structs.ConfigEntry{
&structs.ProxyConfigEntry{
Kind: structs.ProxyDefaults,
Name: structs.ProxyConfigGlobal,
Config: map[string]interface{}{
"protocol": "http",
},
},
&structs.ServiceRouterConfigEntry{
Kind: structs.ServiceRouter,
Name: "old-admin",
Routes: []structs.ServiceRoute{
{
Match: &structs.ServiceRouteMatch{
HTTP: &structs.ServiceRouteHTTPMatch{
PathExact: "/v2",
},
},
Destination: &structs.ServiceRouteDestination{
Service: "admin",
},
},
},
},
},
expect: expect{
idx: 4,
names: []structs.ServiceName{
// get web from listing admin directly as an upstream
{Name: "web", EnterpriseMeta: *defaultMeta},
// get api from old-admin routing to admin and web listing old-admin as an upstream
{Name: "api", EnterpriseMeta: *defaultMeta},
},
},
},
}
for _, tc := range tt {
t.Run(tc.name, func(t *testing.T) {
s := testStateStore(t)
require.NoError(t, s.EnsureNode(0, &structs.Node{
ID: "c73b8fdf-4ef8-4e43-9aa2-59e85cc6a70c",
Node: "foo",
}))
var i uint64 = 1
for _, svc := range tc.services {
require.NoError(t, s.EnsureService(i, "foo", svc))
i++
}
ca := &structs.CAConfiguration{
Provider: "consul",
}
err := s.CASetConfig(0, ca)
require.NoError(t, err)
for _, entry := range tc.entries {
require.NoError(t, entry.Normalize())
require.NoError(t, s.EnsureConfigEntry(i, entry, nil))
i++
}
tx := s.db.ReadTxn()
defer tx.Abort()
ws := memdb.NewWatchSet()
sn := structs.NewServiceName("admin", structs.DefaultEnterpriseMeta())
idx, names, err := s.downstreamsForServiceTxn(tx, ws, "dc1", sn)
require.NoError(t, err)
require.Equal(t, tc.expect.idx, idx)
require.ElementsMatch(t, tc.expect.names, names)
})
}
}
func TestCatalog_DownstreamsForService_Updates(t *testing.T) {
var (
defaultMeta = structs.DefaultEnterpriseMeta()
target = structs.NewServiceName("admin", defaultMeta)
)
s := testStateStore(t)
ca := &structs.CAConfiguration{
Provider: "consul",
}
err := s.CASetConfig(1, ca)
require.NoError(t, err)
require.NoError(t, s.EnsureNode(2, &structs.Node{
ID: "c73b8fdf-4ef8-4e43-9aa2-59e85cc6a70c",
Node: "foo",
}))
// Register a service with our target as an upstream, and it should show up as a downstream
web := structs.NodeService{
Kind: structs.ServiceKindConnectProxy,
ID: "web-proxy",
Service: "web-proxy",
Address: "127.0.0.2",
Port: 443,
Proxy: structs.ConnectProxyConfig{
DestinationServiceName: "web",
Upstreams: structs.Upstreams{
structs.Upstream{
DestinationName: "db",
},
structs.Upstream{
DestinationName: "admin",
},
},
},
EnterpriseMeta: *defaultMeta,
}
require.NoError(t, s.EnsureService(3, "foo", &web))
ws := memdb.NewWatchSet()
tx := s.db.ReadTxn()
idx, names, err := s.downstreamsForServiceTxn(tx, ws, "dc1", target)
require.NoError(t, err)
tx.Abort()
expect := []structs.ServiceName{
{Name: "web", EnterpriseMeta: *defaultMeta},
}
require.Equal(t, uint64(3), idx)
require.ElementsMatch(t, expect, names)
// Register a service WITHOUT our target as an upstream, and the watch should not fire
api := structs.NodeService{
Kind: structs.ServiceKindConnectProxy,
ID: "api-proxy",
Service: "api-proxy",
Address: "127.0.0.1",
Port: 443,
Proxy: structs.ConnectProxyConfig{
DestinationServiceName: "api",
Upstreams: structs.Upstreams{
structs.Upstream{
DestinationName: "cache",
},
structs.Upstream{
DestinationName: "db",
},
structs.Upstream{
DestinationName: "old-admin",
},
},
},
EnterpriseMeta: *defaultMeta,
}
require.NoError(t, s.EnsureService(4, "foo", &api))
require.False(t, watchFired(ws))
// Update the routing so that api's upstream routes to our target and watches should fire
defaults := structs.ProxyConfigEntry{
Kind: structs.ProxyDefaults,
Name: structs.ProxyConfigGlobal,
Config: map[string]interface{}{
"protocol": "http",
},
}
require.NoError(t, defaults.Normalize())
require.NoError(t, s.EnsureConfigEntry(5, &defaults, nil))
router := structs.ServiceRouterConfigEntry{
Kind: structs.ServiceRouter,
Name: "old-admin",
Routes: []structs.ServiceRoute{
{
Match: &structs.ServiceRouteMatch{
HTTP: &structs.ServiceRouteHTTPMatch{
PathExact: "/v2",
},
},
Destination: &structs.ServiceRouteDestination{
Service: "admin",
},
},
},
}
require.NoError(t, router.Normalize())
require.NoError(t, s.EnsureConfigEntry(6, &router, nil))
// We updated a relevant config entry
require.True(t, watchFired(ws))
ws = memdb.NewWatchSet()
tx = s.db.ReadTxn()
idx, names, err = s.downstreamsForServiceTxn(tx, ws, "dc1", target)
require.NoError(t, err)
tx.Abort()
expect = []structs.ServiceName{
// get web from listing admin directly as an upstream
{Name: "web", EnterpriseMeta: *defaultMeta},
// get api from old-admin routing to admin and web listing old-admin as an upstream
{Name: "api", EnterpriseMeta: *defaultMeta},
}
require.Equal(t, uint64(6), idx)
require.ElementsMatch(t, expect, names)
}

View File

@ -1,8 +1,10 @@
package state
import (
"errors"
"fmt"
"github.com/hashicorp/consul/agent/connect"
"github.com/hashicorp/consul/agent/consul/discoverychain"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/lib"
@ -371,6 +373,96 @@ var serviceGraphKinds = []string{
structs.ServiceResolver,
}
// discoveryChainTargets will return a list of services listed as a target for the input's discovery chain
func (s *Store) discoveryChainTargetsTxn(tx ReadTxn, ws memdb.WatchSet, dc, service string, entMeta *structs.EnterpriseMeta) (uint64, []structs.ServiceName, error) {
source := structs.NewServiceName(service, entMeta)
req := discoverychain.CompileRequest{
ServiceName: source.Name,
EvaluateInNamespace: source.NamespaceOrDefault(),
EvaluateInDatacenter: dc,
UseInDatacenter: dc,
}
idx, chain, err := s.serviceDiscoveryChainTxn(tx, ws, source.Name, entMeta, req)
if err != nil {
return 0, nil, fmt.Errorf("failed to fetch discovery chain for %q: %v", source.String(), err)
}
var resp []structs.ServiceName
for _, t := range chain.Targets {
em := structs.EnterpriseMetaInitializer(t.Namespace)
target := structs.NewServiceName(t.Service, &em)
// TODO (freddy): Allow upstream DC and encode in response
if t.Datacenter == dc {
resp = append(resp, target)
}
}
return idx, resp, nil
}
// discoveryChainSourcesTxn will return a list of services whose discovery chains have the given service as a target
func (s *Store) discoveryChainSourcesTxn(tx ReadTxn, ws memdb.WatchSet, dc string, destination structs.ServiceName) (uint64, []structs.ServiceName, error) {
seenLink := map[structs.ServiceName]bool{destination: true}
queue := []structs.ServiceName{destination}
for len(queue) > 0 {
// The "link" index returns config entries that reference a service
iter, err := tx.Get(configTableName, "link", queue[0].ToServiceID())
if err != nil {
return 0, nil, err
}
ws.Add(iter.WatchCh())
for raw := iter.Next(); raw != nil; raw = iter.Next() {
entry := raw.(structs.ConfigEntry)
sn := structs.NewServiceName(entry.GetName(), entry.GetEnterpriseMeta())
if !seenLink[sn] {
seenLink[sn] = true
queue = append(queue, sn)
}
}
queue = queue[1:]
}
var (
maxIdx uint64 = 1
resp []structs.ServiceName
)
// Only return the services that target the destination anywhere in their discovery chains.
seenSource := make(map[structs.ServiceName]bool)
for sn := range seenLink {
req := discoverychain.CompileRequest{
ServiceName: sn.Name,
EvaluateInNamespace: sn.NamespaceOrDefault(),
EvaluateInDatacenter: dc,
UseInDatacenter: dc,
}
idx, chain, err := s.serviceDiscoveryChainTxn(tx, ws, sn.Name, &sn.EnterpriseMeta, req)
if err != nil {
return 0, nil, fmt.Errorf("failed to fetch discovery chain for %q: %v", sn.String(), err)
}
for _, t := range chain.Targets {
em := structs.EnterpriseMetaInitializer(t.Namespace)
candidate := structs.NewServiceName(t.Service, &em)
if !candidate.Matches(&destination) {
continue
}
if idx > maxIdx {
maxIdx = idx
}
if !seenSource[sn] {
seenSource[sn] = true
resp = append(resp, sn)
}
}
}
return maxIdx, resp, nil
}
func validateProposedConfigEntryInServiceGraph(
tx ReadTxn,
kind, name string,
@ -555,6 +647,57 @@ func testCompileDiscoveryChain(
return chain.Protocol, chain.Nodes[chain.StartNode], nil
}
func (s *Store) ServiceDiscoveryChain(
ws memdb.WatchSet,
serviceName string,
entMeta *structs.EnterpriseMeta,
req discoverychain.CompileRequest,
) (uint64, *structs.CompiledDiscoveryChain, error) {
tx := s.db.ReadTxn()
defer tx.Abort()
return s.serviceDiscoveryChainTxn(tx, ws, serviceName, entMeta, req)
}
func (s *Store) serviceDiscoveryChainTxn(
tx ReadTxn,
ws memdb.WatchSet,
serviceName string,
entMeta *structs.EnterpriseMeta,
req discoverychain.CompileRequest,
) (uint64, *structs.CompiledDiscoveryChain, error) {
index, entries, err := readDiscoveryChainConfigEntriesTxn(tx, ws, serviceName, nil, entMeta)
if err != nil {
return 0, nil, err
}
req.Entries = entries
_, config, err := s.CAConfig(ws)
if err != nil {
return 0, nil, err
} else if config == nil {
return 0, nil, errors.New("no cluster ca config setup")
}
// Build TrustDomain based on the ClusterID stored.
signingID := connect.SpiffeIDSigningForCluster(config)
if signingID == nil {
// If CA is bootstrapped at all then this should never happen but be
// defensive.
return 0, nil, errors.New("no cluster trust domain setup")
}
req.EvaluateInTrustDomain = signingID.Host()
// Then we compile it into something useful.
chain, err := discoverychain.Compile(req)
if err != nil {
return 0, nil, fmt.Errorf("failed to compile discovery chain: %v", err)
}
return index, chain, nil
}
// ReadDiscoveryChainConfigEntries will query for the full discovery chain for
// the provided service name. All relevant config entries will be recursively
// fetched and included in the result.

View File

@ -1246,7 +1246,7 @@ func TestStore_ReadDiscoveryChainConfigEntries_SubsetSplit(t *testing.T) {
require.NoError(t, s.EnsureConfigEntry(0, entry, nil))
}
_, entrySet, err := s.ReadDiscoveryChainConfigEntries(nil, "main", nil)
_, entrySet, err := s.readDiscoveryChainConfigEntries(nil, "main", nil, nil)
require.NoError(t, err)
require.Len(t, entrySet.Routers, 0)
@ -1452,3 +1452,490 @@ func TestStore_ValidateIngressGatewayErrorOnMismatchedProtocols(t *testing.T) {
require.NoError(t, s.DeleteConfigEntry(5, structs.IngressGateway, "gateway", nil))
})
}
func TestSourcesForTarget(t *testing.T) {
defaultMeta := *structs.DefaultEnterpriseMeta()
type expect struct {
idx uint64
names []structs.ServiceName
}
tt := []struct {
name string
entries []structs.ConfigEntry
expect expect
}{
{
name: "no relevant config entries",
entries: []structs.ConfigEntry{},
expect: expect{
idx: 1,
names: []structs.ServiceName{
{Name: "sink", EnterpriseMeta: defaultMeta},
},
},
},
{
name: "from route match",
entries: []structs.ConfigEntry{
&structs.ProxyConfigEntry{
Kind: structs.ProxyDefaults,
Name: structs.ProxyConfigGlobal,
Config: map[string]interface{}{
"protocol": "http",
},
},
&structs.ServiceRouterConfigEntry{
Kind: structs.ServiceRouter,
Name: "web",
Routes: []structs.ServiceRoute{
{
Match: &structs.ServiceRouteMatch{
HTTP: &structs.ServiceRouteHTTPMatch{
PathExact: "/sink",
},
},
Destination: &structs.ServiceRouteDestination{
Service: "sink",
},
},
},
},
},
expect: expect{
idx: 2,
names: []structs.ServiceName{
{Name: "web", EnterpriseMeta: defaultMeta},
{Name: "sink", EnterpriseMeta: defaultMeta},
},
},
},
{
name: "from redirect",
entries: []structs.ConfigEntry{
&structs.ProxyConfigEntry{
Kind: structs.ProxyDefaults,
Name: structs.ProxyConfigGlobal,
Config: map[string]interface{}{
"protocol": "http",
},
},
&structs.ServiceResolverConfigEntry{
Kind: structs.ServiceResolver,
Name: "web",
Redirect: &structs.ServiceResolverRedirect{
Service: "sink",
},
},
},
expect: expect{
idx: 2,
names: []structs.ServiceName{
{Name: "web", EnterpriseMeta: defaultMeta},
{Name: "sink", EnterpriseMeta: defaultMeta},
},
},
},
{
name: "from failover",
entries: []structs.ConfigEntry{
&structs.ProxyConfigEntry{
Kind: structs.ProxyDefaults,
Name: structs.ProxyConfigGlobal,
Config: map[string]interface{}{
"protocol": "http",
},
},
&structs.ServiceResolverConfigEntry{
Kind: structs.ServiceResolver,
Name: "web",
Failover: map[string]structs.ServiceResolverFailover{
"*": {
Service: "sink",
Datacenters: []string{"dc2", "dc3"},
},
},
},
},
expect: expect{
idx: 2,
names: []structs.ServiceName{
{Name: "web", EnterpriseMeta: defaultMeta},
{Name: "sink", EnterpriseMeta: defaultMeta},
},
},
},
{
name: "from splitter",
entries: []structs.ConfigEntry{
&structs.ProxyConfigEntry{
Kind: structs.ProxyDefaults,
Name: structs.ProxyConfigGlobal,
Config: map[string]interface{}{
"protocol": "http",
},
},
&structs.ServiceSplitterConfigEntry{
Kind: structs.ServiceSplitter,
Name: "web",
Splits: []structs.ServiceSplit{
{Weight: 90, Service: "web"},
{Weight: 10, Service: "sink"},
},
},
},
expect: expect{
idx: 2,
names: []structs.ServiceName{
{Name: "web", EnterpriseMeta: defaultMeta},
{Name: "sink", EnterpriseMeta: defaultMeta},
},
},
},
{
name: "chained route redirect",
entries: []structs.ConfigEntry{
&structs.ProxyConfigEntry{
Kind: structs.ProxyDefaults,
Name: structs.ProxyConfigGlobal,
Config: map[string]interface{}{
"protocol": "http",
},
},
&structs.ServiceRouterConfigEntry{
Kind: structs.ServiceRouter,
Name: "source",
Routes: []structs.ServiceRoute{
{
Match: &structs.ServiceRouteMatch{
HTTP: &structs.ServiceRouteHTTPMatch{
PathExact: "/route",
},
},
Destination: &structs.ServiceRouteDestination{
Service: "routed",
},
},
},
},
&structs.ServiceResolverConfigEntry{
Kind: structs.ServiceResolver,
Name: "routed",
Redirect: &structs.ServiceResolverRedirect{
Service: "sink",
},
},
},
expect: expect{
idx: 3,
names: []structs.ServiceName{
{Name: "source", EnterpriseMeta: defaultMeta},
{Name: "routed", EnterpriseMeta: defaultMeta},
{Name: "sink", EnterpriseMeta: defaultMeta},
},
},
},
{
name: "kitchen sink with multiple services referencing sink directly",
entries: []structs.ConfigEntry{
&structs.ProxyConfigEntry{
Kind: structs.ProxyDefaults,
Name: structs.ProxyConfigGlobal,
Config: map[string]interface{}{
"protocol": "http",
},
},
&structs.ServiceRouterConfigEntry{
Kind: structs.ServiceRouter,
Name: "routed",
Routes: []structs.ServiceRoute{
{
Match: &structs.ServiceRouteMatch{
HTTP: &structs.ServiceRouteHTTPMatch{
PathExact: "/sink",
},
},
Destination: &structs.ServiceRouteDestination{
Service: "sink",
},
},
},
},
&structs.ServiceResolverConfigEntry{
Kind: structs.ServiceResolver,
Name: "redirected",
Redirect: &structs.ServiceResolverRedirect{
Service: "sink",
},
},
&structs.ServiceResolverConfigEntry{
Kind: structs.ServiceResolver,
Name: "failed-over",
Failover: map[string]structs.ServiceResolverFailover{
"*": {
Service: "sink",
Datacenters: []string{"dc2", "dc3"},
},
},
},
&structs.ServiceSplitterConfigEntry{
Kind: structs.ServiceSplitter,
Name: "split",
Splits: []structs.ServiceSplit{
{Weight: 90, Service: "no-op"},
{Weight: 10, Service: "sink"},
},
},
&structs.ServiceSplitterConfigEntry{
Kind: structs.ServiceSplitter,
Name: "unrelated",
Splits: []structs.ServiceSplit{
{Weight: 90, Service: "zip"},
{Weight: 10, Service: "zop"},
},
},
},
expect: expect{
idx: 6,
names: []structs.ServiceName{
{Name: "split", EnterpriseMeta: defaultMeta},
{Name: "failed-over", EnterpriseMeta: defaultMeta},
{Name: "redirected", EnterpriseMeta: defaultMeta},
{Name: "routed", EnterpriseMeta: defaultMeta},
{Name: "sink", EnterpriseMeta: defaultMeta},
},
},
},
}
for _, tc := range tt {
t.Run(tc.name, func(t *testing.T) {
s := testStateStore(t)
ws := memdb.NewWatchSet()
ca := &structs.CAConfiguration{
Provider: "consul",
}
err := s.CASetConfig(0, ca)
require.NoError(t, err)
var i uint64 = 1
for _, entry := range tc.entries {
require.NoError(t, entry.Normalize())
require.NoError(t, s.EnsureConfigEntry(i, entry, nil))
i++
}
tx := s.db.ReadTxn()
defer tx.Abort()
sn := structs.NewServiceName("sink", structs.DefaultEnterpriseMeta())
idx, names, err := s.discoveryChainSourcesTxn(tx, ws, "dc1", sn)
require.NoError(t, err)
require.Equal(t, tc.expect.idx, idx)
require.ElementsMatch(t, tc.expect.names, names)
})
}
}
func TestTargetsForSource(t *testing.T) {
defaultMeta := *structs.DefaultEnterpriseMeta()
type expect struct {
idx uint64
ids []structs.ServiceName
}
tt := []struct {
name string
entries []structs.ConfigEntry
expect expect
}{
{
name: "from route match",
entries: []structs.ConfigEntry{
&structs.ProxyConfigEntry{
Kind: structs.ProxyDefaults,
Name: structs.ProxyConfigGlobal,
Config: map[string]interface{}{
"protocol": "http",
},
},
&structs.ServiceRouterConfigEntry{
Kind: structs.ServiceRouter,
Name: "web",
Routes: []structs.ServiceRoute{
{
Match: &structs.ServiceRouteMatch{
HTTP: &structs.ServiceRouteHTTPMatch{
PathExact: "/sink",
},
},
Destination: &structs.ServiceRouteDestination{
Service: "sink",
},
},
},
},
},
expect: expect{
idx: 2,
ids: []structs.ServiceName{
{Name: "web", EnterpriseMeta: defaultMeta},
{Name: "sink", EnterpriseMeta: defaultMeta},
},
},
},
{
name: "from redirect",
entries: []structs.ConfigEntry{
&structs.ProxyConfigEntry{
Kind: structs.ProxyDefaults,
Name: structs.ProxyConfigGlobal,
Config: map[string]interface{}{
"protocol": "http",
},
},
&structs.ServiceResolverConfigEntry{
Kind: structs.ServiceResolver,
Name: "web",
Redirect: &structs.ServiceResolverRedirect{
Service: "sink",
},
},
},
expect: expect{
idx: 2,
ids: []structs.ServiceName{
{Name: "sink", EnterpriseMeta: defaultMeta},
},
},
},
{
name: "from failover",
entries: []structs.ConfigEntry{
&structs.ProxyConfigEntry{
Kind: structs.ProxyDefaults,
Name: structs.ProxyConfigGlobal,
Config: map[string]interface{}{
"protocol": "http",
},
},
&structs.ServiceResolverConfigEntry{
Kind: structs.ServiceResolver,
Name: "web",
Failover: map[string]structs.ServiceResolverFailover{
"*": {
Service: "remote-web",
Datacenters: []string{"dc2", "dc3"},
},
},
},
},
expect: expect{
idx: 2,
ids: []structs.ServiceName{
{Name: "web", EnterpriseMeta: defaultMeta},
},
},
},
{
name: "from splitter",
entries: []structs.ConfigEntry{
&structs.ProxyConfigEntry{
Kind: structs.ProxyDefaults,
Name: structs.ProxyConfigGlobal,
Config: map[string]interface{}{
"protocol": "http",
},
},
&structs.ServiceSplitterConfigEntry{
Kind: structs.ServiceSplitter,
Name: "web",
Splits: []structs.ServiceSplit{
{Weight: 90, Service: "web"},
{Weight: 10, Service: "sink"},
},
},
},
expect: expect{
idx: 2,
ids: []structs.ServiceName{
{Name: "web", EnterpriseMeta: defaultMeta},
{Name: "sink", EnterpriseMeta: defaultMeta},
},
},
},
{
name: "chained route redirect",
entries: []structs.ConfigEntry{
&structs.ProxyConfigEntry{
Kind: structs.ProxyDefaults,
Name: structs.ProxyConfigGlobal,
Config: map[string]interface{}{
"protocol": "http",
},
},
&structs.ServiceRouterConfigEntry{
Kind: structs.ServiceRouter,
Name: "web",
Routes: []structs.ServiceRoute{
{
Match: &structs.ServiceRouteMatch{
HTTP: &structs.ServiceRouteHTTPMatch{
PathExact: "/route",
},
},
Destination: &structs.ServiceRouteDestination{
Service: "routed",
},
},
},
},
&structs.ServiceResolverConfigEntry{
Kind: structs.ServiceResolver,
Name: "routed",
Redirect: &structs.ServiceResolverRedirect{
Service: "sink",
},
},
},
expect: expect{
idx: 3,
ids: []structs.ServiceName{
{Name: "web", EnterpriseMeta: defaultMeta},
{Name: "sink", EnterpriseMeta: defaultMeta},
},
},
},
}
for _, tc := range tt {
t.Run(tc.name, func(t *testing.T) {
s := testStateStore(t)
ws := memdb.NewWatchSet()
ca := &structs.CAConfiguration{
Provider: "consul",
}
err := s.CASetConfig(0, ca)
require.NoError(t, err)
var i uint64 = 1
for _, entry := range tc.entries {
require.NoError(t, entry.Normalize())
require.NoError(t, s.EnsureConfigEntry(i, entry, nil))
i++
}
tx := s.db.ReadTxn()
defer tx.Abort()
idx, ids, err := s.discoveryChainTargetsTxn(tx, ws, "dc1", "web", nil)
require.NoError(t, err)
require.Equal(t, tc.expect.idx, idx)
require.ElementsMatch(t, tc.expect.ids, ids)
})
}
}

View File

@ -99,6 +99,7 @@ func init() {
registerEndpoint("/v1/internal/ui/services", []string{"GET"}, (*HTTPHandlers).UIServices)
registerEndpoint("/v1/internal/ui/gateway-services-nodes/", []string{"GET"}, (*HTTPHandlers).UIGatewayServicesNodes)
registerEndpoint("/v1/internal/ui/gateway-intentions/", []string{"GET"}, (*HTTPHandlers).UIGatewayIntentions)
registerEndpoint("/v1/internal/ui/service-topology/", []string{"GET"}, (*HTTPHandlers).UIServiceTopology)
registerEndpoint("/v1/internal/acl/authorize", []string{"POST"}, (*HTTPHandlers).ACLAuthorize)
registerEndpoint("/v1/kv/", []string{"GET", "PUT", "DELETE"}, (*HTTPHandlers).KVSEndpoint)
registerEndpoint("/v1/operator/raft/configuration", []string{"GET"}, (*HTTPHandlers).OperatorRaftConfiguration)

View File

@ -1031,6 +1031,14 @@ func (ns *NodeService) CompoundServiceName() ServiceName {
}
}
// UniqueID is a unique identifier for a service instance within a datacenter by encoding:
// node/namespace/service_id
//
// Note: We do not have strict character restrictions in all node names, so this should NOT be split on / to retrieve components.
func UniqueID(node string, compoundID string) string {
return fmt.Sprintf("%s/%s", node, compoundID)
}
// ServiceConnect are the shared Connect settings between all service
// definitions from the agent to the state store.
type ServiceConnect struct {
@ -1849,6 +1857,17 @@ type IndexedGatewayServices struct {
QueryMeta
}
type IndexedServiceTopology struct {
ServiceTopology *ServiceTopology
FilteredByACLs bool
QueryMeta
}
type ServiceTopology struct {
Upstreams CheckServiceNodes
Downstreams CheckServiceNodes
}
// IndexedConfigEntries has its own encoding logic which differs from
// ConfigEntryRequest as it has to send a slice of ConfigEntry.
type IndexedConfigEntries struct {
@ -2391,3 +2410,18 @@ func (r *KeyringResponses) Add(v interface{}) {
func (r *KeyringResponses) New() interface{} {
return new(KeyringResponses)
}
// UpstreamDownstream pairs come from individual proxy registrations, which can be updated independently.
type UpstreamDownstream struct {
Upstream ServiceName
Downstream ServiceName
// Refs stores the registrations that contain this pairing.
// When there are no remaining Refs, the UpstreamDownstream can be deleted.
//
// Note: This map must be treated as immutable when accessed in MemDB.
// The entire UpstreamDownstream structure must be deep copied on updates.
Refs map[string]struct{}
RaftIndex
}

View File

@ -16,30 +16,51 @@ import (
// to extract this.
const metaExternalSource = "external-source"
// ServiceSummary is used to summarize a service
type ServiceSummary struct {
Kind structs.ServiceKind `json:",omitempty"`
Name string
Datacenter string
Tags []string
Nodes []string
ExternalSources []string
externalSourceSet map[string]struct{} // internal to track uniqueness
checks map[string]*structs.HealthCheck
InstanceCount int
ChecksPassing int
ChecksWarning int
ChecksCritical int
GatewayConfig GatewayConfig
structs.EnterpriseMeta
}
func (s *ServiceSummary) LessThan(other *ServiceSummary) bool {
if s.EnterpriseMeta.LessThan(&other.EnterpriseMeta) {
return true
}
return s.Name < other.Name
}
type ServiceListingSummary struct {
ServiceSummary
ConnectedWithProxy bool
ConnectedWithGateway bool
}
type GatewayConfig struct {
AssociatedServiceCount int `json:",omitempty"`
Addresses []string `json:",omitempty"`
// internal to track uniqueness
addressesSet map[string]struct{}
}
// ServiceSummary is used to summarize a service
type ServiceSummary struct {
Kind structs.ServiceKind `json:",omitempty"`
Name string
Tags []string
Nodes []string
InstanceCount int
ChecksPassing int
ChecksWarning int
ChecksCritical int
ExternalSources []string
externalSourceSet map[string]struct{} // internal to track uniqueness
GatewayConfig GatewayConfig `json:",omitempty"`
ConnectedWithProxy bool
ConnectedWithGateway bool
structs.EnterpriseMeta
type ServiceTopology struct {
Upstreams []*ServiceSummary
Downstreams []*ServiceSummary
FilteredByACLs bool
}
// UINodes is used to list the nodes in a given datacenter. We return a
@ -163,9 +184,39 @@ RPC:
return nil, err
}
// Generate the summary
// TODO (gateways) (freddy) Have Internal.ServiceDump return ServiceDump instead. Need to add bexpr filtering for type.
return summarizeServices(out.Nodes.ToServiceDump(), out.Gateways, s.agent.config, args.Datacenter), nil
// Store the names of the gateways associated with each service
var (
serviceGateways = make(map[structs.ServiceName][]structs.ServiceName)
numLinkedServices = make(map[structs.ServiceName]int)
)
for _, gs := range out.Gateways {
serviceGateways[gs.Service] = append(serviceGateways[gs.Service], gs.Gateway)
numLinkedServices[gs.Gateway] += 1
}
summaries, hasProxy := summarizeServices(out.Nodes.ToServiceDump(), nil, "")
sorted := prepSummaryOutput(summaries, false)
var result []*ServiceListingSummary
for _, svc := range sorted {
sum := ServiceListingSummary{ServiceSummary: *svc}
sn := structs.NewServiceName(svc.Name, &svc.EnterpriseMeta)
if hasProxy[sn] {
sum.ConnectedWithProxy = true
}
// Verify that at least one of the gateways linked by config entry has an instance registered in the catalog
for _, gw := range serviceGateways[sn] {
if s := summaries[gw]; s != nil && sum.InstanceCount > 0 {
sum.ConnectedWithGateway = true
}
}
sum.GatewayConfig.AssociatedServiceCount = numLinkedServices[sn]
result = append(result, &sum)
}
return result, nil
}
// UIGatewayServices is used to query all the nodes for services associated with a gateway along with their gateway config
@ -200,17 +251,59 @@ RPC:
return nil, err
}
return summarizeServices(out.Dump, nil, s.agent.config, args.Datacenter), nil
summaries, _ := summarizeServices(out.Dump, s.agent.config, args.Datacenter)
return prepSummaryOutput(summaries, false), nil
}
// TODO (freddy): Refactor to split up for the two use cases
func summarizeServices(dump structs.ServiceDump, gateways structs.GatewayServices, cfg *config.RuntimeConfig, dc string) []*ServiceSummary {
// Collect the summary information
var services []structs.ServiceName
summary := make(map[structs.ServiceName]*ServiceSummary)
// UIServiceTopology returns the list of upstreams and downstreams for a Connect enabled service.
// - Downstreams are services that list the given service as an upstream
// - Upstreams are the upstreams defined in the given service's proxy registrations
func (s *HTTPHandlers) UIServiceTopology(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
// Parse arguments
args := structs.ServiceSpecificRequest{}
if done := s.parse(resp, req, &args.Datacenter, &args.QueryOptions); done {
return nil, nil
}
if err := s.parseEntMeta(req, &args.EnterpriseMeta); err != nil {
return nil, err
}
linkedGateways := make(map[structs.ServiceName][]structs.ServiceName)
hasProxy := make(map[structs.ServiceName]bool)
args.ServiceName = strings.TrimPrefix(req.URL.Path, "/v1/internal/ui/service-topology/")
if args.ServiceName == "" {
resp.WriteHeader(http.StatusBadRequest)
fmt.Fprint(resp, "Missing service name")
return nil, nil
}
// Make the RPC request
var out structs.IndexedServiceTopology
defer setMeta(resp, &out.QueryMeta)
RPC:
if err := s.agent.RPC("Internal.ServiceTopology", &args, &out); err != nil {
// Retry the request allowing stale data if no leader
if strings.Contains(err.Error(), structs.ErrNoLeader.Error()) && !args.AllowStale {
args.AllowStale = true
goto RPC
}
return nil, err
}
upstreams, _ := summarizeServices(out.ServiceTopology.Upstreams.ToServiceDump(), nil, "")
downstreams, _ := summarizeServices(out.ServiceTopology.Downstreams.ToServiceDump(), nil, "")
sum := ServiceTopology{
Upstreams: prepSummaryOutput(upstreams, true),
Downstreams: prepSummaryOutput(downstreams, true),
FilteredByACLs: out.FilteredByACLs,
}
return sum, nil
}
func summarizeServices(dump structs.ServiceDump, cfg *config.RuntimeConfig, dc string) (map[structs.ServiceName]*ServiceSummary, map[structs.ServiceName]bool) {
var (
summary = make(map[structs.ServiceName]*ServiceSummary)
hasProxy = make(map[structs.ServiceName]bool)
)
getService := func(service structs.ServiceName) *ServiceSummary {
serv, ok := summary[service]
@ -223,22 +316,12 @@ func summarizeServices(dump structs.ServiceDump, gateways structs.GatewayService
InstanceCount: 0,
}
summary[service] = serv
services = append(services, service)
}
return serv
}
// Collect the list of services linked to each gateway up front
// THis also allows tracking whether a service name is associated with a gateway
gsCount := make(map[structs.ServiceName]int)
for _, gs := range gateways {
gsCount[gs.Gateway] += 1
linkedGateways[gs.Service] = append(linkedGateways[gs.Service], gs.Gateway)
}
for _, csn := range dump {
if csn.GatewayService != nil {
if cfg != nil && csn.GatewayService != nil {
gwsvc := csn.GatewayService
sum := getService(gwsvc.Service)
modifySummaryForGatewayService(cfg, dc, sum, gwsvc)
@ -248,15 +331,27 @@ func summarizeServices(dump structs.ServiceDump, gateways structs.GatewayService
if csn.Service == nil {
continue
}
sid := structs.NewServiceName(csn.Service.Service, &csn.Service.EnterpriseMeta)
sum := getService(sid)
sn := structs.NewServiceName(csn.Service.Service, &csn.Service.EnterpriseMeta)
sum := getService(sn)
svc := csn.Service
sum.Nodes = append(sum.Nodes, csn.Node.Node)
sum.Kind = svc.Kind
sum.Datacenter = csn.Node.Datacenter
sum.InstanceCount += 1
if svc.Kind == structs.ServiceKindConnectProxy {
hasProxy[structs.NewServiceName(svc.Proxy.DestinationServiceName, &svc.EnterpriseMeta)] = true
sn := structs.NewServiceName(svc.Proxy.DestinationServiceName, &svc.EnterpriseMeta)
hasProxy[sn] = true
destination := getService(sn)
for _, check := range csn.Checks {
cid := structs.NewCheckID(check.CheckID, &check.EnterpriseMeta)
uid := structs.UniqueID(csn.Node.Node, cid.String())
if destination.checks == nil {
destination.checks = make(map[string]*structs.HealthCheck)
}
destination.checks[uid] = check
}
}
for _, tag := range svc.Tags {
found := false
@ -266,7 +361,6 @@ func summarizeServices(dump structs.ServiceDump, gateways structs.GatewayService
break
}
}
if !found {
sum.Tags = append(sum.Tags, tag)
}
@ -288,7 +382,28 @@ func summarizeServices(dump structs.ServiceDump, gateways structs.GatewayService
}
for _, check := range csn.Checks {
switch check.Status {
cid := structs.NewCheckID(check.CheckID, &check.EnterpriseMeta)
uid := structs.UniqueID(csn.Node.Node, cid.String())
if sum.checks == nil {
sum.checks = make(map[string]*structs.HealthCheck)
}
sum.checks[uid] = check
}
}
return summary, hasProxy
}
func prepSummaryOutput(summaries map[structs.ServiceName]*ServiceSummary, excludeSidecars bool) []*ServiceSummary {
var resp []*ServiceSummary
// Collect and sort resp for display
for _, sum := range summaries {
sort.Strings(sum.Nodes)
sort.Strings(sum.Tags)
for _, chk := range sum.checks {
switch chk.Status {
case api.HealthPassing:
sum.ChecksPassing++
case api.HealthWarning:
@ -297,34 +412,15 @@ func summarizeServices(dump structs.ServiceDump, gateways structs.GatewayService
sum.ChecksCritical++
}
}
if excludeSidecars && sum.Kind != structs.ServiceKindTypical {
continue
}
resp = append(resp, sum)
}
// Return the services in sorted order
sort.Slice(services, func(i, j int) bool {
return services[i].LessThan(&services[j])
sort.Slice(resp, func(i, j int) bool {
return resp[i].LessThan(resp[j])
})
output := make([]*ServiceSummary, len(summary))
for idx, service := range services {
sum := summary[service]
if hasProxy[service] {
sum.ConnectedWithProxy = true
}
// Verify that at least one of the gateways linked by config entry has an instance registered in the catalog
for _, gw := range linkedGateways[service] {
if s := summary[gw]; s != nil && s.InstanceCount > 0 {
sum.ConnectedWithGateway = true
}
}
sum.GatewayConfig.AssociatedServiceCount = gsCount[service]
// Sort the nodes and tags
sort.Strings(sum.Nodes)
sort.Strings(sum.Tags)
output[idx] = sum
}
return output
return resp
}
func modifySummaryForGatewayService(

View File

@ -223,6 +223,7 @@ func TestUiServices(t *testing.T) {
Service: &structs.NodeService{
Kind: structs.ServiceKindTypical,
Service: "api",
ID: "api-1",
Tags: []string{"tag1", "tag2"},
},
Checks: structs.HealthChecks{
@ -230,18 +231,20 @@ func TestUiServices(t *testing.T) {
Node: "foo",
Name: "api svc check",
ServiceName: "api",
ServiceID: "api-1",
Status: api.HealthWarning,
},
},
},
// register web svc on node foo
// register api-proxy svc on node foo
{
Datacenter: "dc1",
Node: "foo",
SkipNodeUpdate: true,
Service: &structs.NodeService{
Kind: structs.ServiceKindConnectProxy,
Service: "web",
Service: "api-proxy",
ID: "api-proxy-1",
Tags: []string{},
Meta: map[string]string{metaExternalSource: "k8s"},
Port: 1234,
@ -252,8 +255,9 @@ func TestUiServices(t *testing.T) {
Checks: structs.HealthChecks{
&structs.HealthCheck{
Node: "foo",
Name: "web svc check",
ServiceName: "web",
Name: "api proxy listening",
ServiceName: "api-proxy",
ServiceID: "api-proxy-1",
Status: api.HealthPassing,
},
},
@ -264,14 +268,12 @@ func TestUiServices(t *testing.T) {
Node: "bar",
Address: "127.0.0.2",
Service: &structs.NodeService{
Kind: structs.ServiceKindConnectProxy,
Kind: structs.ServiceKindTypical,
Service: "web",
ID: "web-1",
Tags: []string{},
Meta: map[string]string{metaExternalSource: "k8s"},
Port: 1234,
Proxy: structs.ConnectProxyConfig{
DestinationServiceName: "api",
},
},
Checks: []*structs.HealthCheck{
{
@ -279,6 +281,7 @@ func TestUiServices(t *testing.T) {
Name: "web svc check",
Status: api.HealthCritical,
ServiceName: "web",
ServiceID: "web-1",
},
},
},
@ -366,76 +369,107 @@ func TestUiServices(t *testing.T) {
assertIndex(t, resp)
// Should be 2 nodes, and all the empty lists should be non-nil
summary := obj.([]*ServiceSummary)
require.Len(t, summary, 5)
summary := obj.([]*ServiceListingSummary)
require.Len(t, summary, 6)
// internal accounting that users don't see can be blown away
for _, sum := range summary {
sum.externalSourceSet = nil
sum.checks = nil
}
expected := []*ServiceSummary{
expected := []*ServiceListingSummary{
{
Kind: structs.ServiceKindTypical,
Name: "api",
Tags: []string{"tag1", "tag2"},
Nodes: []string{"foo"},
InstanceCount: 1,
ChecksPassing: 2,
ChecksWarning: 1,
ChecksCritical: 0,
ServiceSummary: ServiceSummary{
Kind: structs.ServiceKindTypical,
Name: "api",
Datacenter: "dc1",
Tags: []string{"tag1", "tag2"},
Nodes: []string{"foo"},
InstanceCount: 1,
ChecksPassing: 2,
ChecksWarning: 1,
ChecksCritical: 0,
EnterpriseMeta: *structs.DefaultEnterpriseMeta(),
},
ConnectedWithProxy: true,
ConnectedWithGateway: true,
EnterpriseMeta: *structs.DefaultEnterpriseMeta(),
},
{
Kind: structs.ServiceKindTypical,
Name: "cache",
Tags: nil,
Nodes: []string{"zip"},
InstanceCount: 1,
ChecksPassing: 0,
ChecksWarning: 0,
ChecksCritical: 0,
ServiceSummary: ServiceSummary{
Kind: structs.ServiceKindConnectProxy,
Name: "api-proxy",
Datacenter: "dc1",
Tags: nil,
Nodes: []string{"foo"},
InstanceCount: 1,
ChecksPassing: 2,
ChecksWarning: 0,
ChecksCritical: 0,
ExternalSources: []string{"k8s"},
EnterpriseMeta: *structs.DefaultEnterpriseMeta(),
},
},
{
ServiceSummary: ServiceSummary{
Kind: structs.ServiceKindTypical,
Name: "cache",
Datacenter: "dc1",
Tags: nil,
Nodes: []string{"zip"},
InstanceCount: 1,
ChecksPassing: 0,
ChecksWarning: 0,
ChecksCritical: 0,
EnterpriseMeta: *structs.DefaultEnterpriseMeta(),
},
ConnectedWithGateway: true,
EnterpriseMeta: *structs.DefaultEnterpriseMeta(),
},
{
Kind: structs.ServiceKindConnectProxy,
Name: "web",
Tags: nil,
Nodes: []string{"bar", "foo"},
InstanceCount: 2,
ChecksPassing: 2,
ChecksWarning: 1,
ChecksCritical: 1,
ExternalSources: []string{"k8s"},
EnterpriseMeta: *structs.DefaultEnterpriseMeta(),
ServiceSummary: ServiceSummary{
Kind: structs.ServiceKindTypical,
Name: "consul",
Datacenter: "dc1",
Tags: nil,
Nodes: []string{a.Config.NodeName},
InstanceCount: 1,
ChecksPassing: 1,
ChecksWarning: 0,
ChecksCritical: 0,
EnterpriseMeta: *structs.DefaultEnterpriseMeta(),
},
},
{
Kind: structs.ServiceKindTypical,
Name: "consul",
Tags: nil,
Nodes: []string{a.Config.NodeName},
InstanceCount: 1,
ChecksPassing: 1,
ChecksWarning: 0,
ChecksCritical: 0,
EnterpriseMeta: *structs.DefaultEnterpriseMeta(),
ServiceSummary: ServiceSummary{
Kind: structs.ServiceKindTerminatingGateway,
Name: "terminating-gateway",
Datacenter: "dc1",
Tags: nil,
Nodes: []string{"foo"},
InstanceCount: 1,
ChecksPassing: 1,
ChecksWarning: 0,
ChecksCritical: 0,
GatewayConfig: GatewayConfig{AssociatedServiceCount: 2},
EnterpriseMeta: *structs.DefaultEnterpriseMeta(),
},
},
{
Kind: structs.ServiceKindTerminatingGateway,
Name: "terminating-gateway",
Tags: nil,
Nodes: []string{"foo"},
InstanceCount: 1,
ChecksPassing: 2,
ChecksWarning: 1,
GatewayConfig: GatewayConfig{AssociatedServiceCount: 2},
EnterpriseMeta: *structs.DefaultEnterpriseMeta(),
ServiceSummary: ServiceSummary{
Kind: structs.ServiceKindTypical,
Name: "web",
Datacenter: "dc1",
Tags: nil,
Nodes: []string{"bar"},
InstanceCount: 1,
ChecksPassing: 0,
ChecksWarning: 0,
ChecksCritical: 1,
ExternalSources: []string{"k8s"},
EnterpriseMeta: *structs.DefaultEnterpriseMeta(),
},
},
}
require.ElementsMatch(t, expected, summary)
})
@ -448,39 +482,46 @@ func TestUiServices(t *testing.T) {
assertIndex(t, resp)
// Should be 2 nodes, and all the empty lists should be non-nil
summary := obj.([]*ServiceSummary)
summary := obj.([]*ServiceListingSummary)
require.Len(t, summary, 2)
// internal accounting that users don't see can be blown away
for _, sum := range summary {
sum.externalSourceSet = nil
sum.checks = nil
}
expected := []*ServiceSummary{
expected := []*ServiceListingSummary{
{
Kind: structs.ServiceKindTypical,
Name: "api",
Tags: []string{"tag1", "tag2"},
Nodes: []string{"foo"},
InstanceCount: 1,
ChecksPassing: 2,
ChecksWarning: 1,
ChecksCritical: 0,
ConnectedWithProxy: true,
ServiceSummary: ServiceSummary{
Kind: structs.ServiceKindTypical,
Name: "api",
Datacenter: "dc1",
Tags: []string{"tag1", "tag2"},
Nodes: []string{"foo"},
InstanceCount: 1,
ChecksPassing: 1,
ChecksWarning: 1,
ChecksCritical: 0,
EnterpriseMeta: *structs.DefaultEnterpriseMeta(),
},
ConnectedWithProxy: false,
ConnectedWithGateway: false,
EnterpriseMeta: *structs.DefaultEnterpriseMeta(),
},
{
Kind: structs.ServiceKindConnectProxy,
Name: "web",
Tags: nil,
Nodes: []string{"bar", "foo"},
InstanceCount: 2,
ChecksPassing: 2,
ChecksWarning: 1,
ChecksCritical: 1,
ExternalSources: []string{"k8s"},
EnterpriseMeta: *structs.DefaultEnterpriseMeta(),
ServiceSummary: ServiceSummary{
Kind: structs.ServiceKindTypical,
Name: "web",
Datacenter: "dc1",
Tags: nil,
Nodes: []string{"bar"},
InstanceCount: 1,
ChecksPassing: 0,
ChecksWarning: 0,
ChecksCritical: 1,
ExternalSources: []string{"k8s"},
EnterpriseMeta: *structs.DefaultEnterpriseMeta(),
},
},
}
require.ElementsMatch(t, expected, summary)
@ -582,7 +623,14 @@ func TestUIGatewayServiceNodes_Terminating(t *testing.T) {
assert.Nil(t, err)
assertIndex(t, resp)
dump := obj.([]*ServiceSummary)
summary := obj.([]*ServiceSummary)
// internal accounting that users don't see can be blown away
for _, sum := range summary {
sum.externalSourceSet = nil
sum.checks = nil
}
expect := []*ServiceSummary{
{
Name: "redis",
@ -590,6 +638,7 @@ func TestUIGatewayServiceNodes_Terminating(t *testing.T) {
},
{
Name: "db",
Datacenter: "dc1",
Tags: []string{"backup", "primary"},
Nodes: []string{"bar", "baz"},
InstanceCount: 2,
@ -599,7 +648,7 @@ func TestUIGatewayServiceNodes_Terminating(t *testing.T) {
EnterpriseMeta: *structs.DefaultEnterpriseMeta(),
},
}
assert.ElementsMatch(t, expect, dump)
assert.ElementsMatch(t, expect, summary)
}
func TestUIGatewayServiceNodes_Ingress(t *testing.T) {
@ -748,6 +797,7 @@ func TestUIGatewayServiceNodes_Ingress(t *testing.T) {
},
{
Name: "db",
Datacenter: "dc1",
Tags: []string{"backup", "primary"},
Nodes: []string{"bar", "baz"},
InstanceCount: 2,
@ -767,6 +817,7 @@ func TestUIGatewayServiceNodes_Ingress(t *testing.T) {
// internal accounting that users don't see can be blown away
for _, sum := range dump {
sum.GatewayConfig.addressesSet = nil
sum.checks = nil
}
assert.ElementsMatch(t, expect, dump)
}
@ -878,3 +929,386 @@ func TestUIEndpoint_modifySummaryForGatewayService_UseRequestedDCInsteadOfConfig
expected := serviceCanonicalDNSName("test", "ingress", "dc2", "consul", nil) + ":42"
require.Equal(t, expected, sum.GatewayConfig.Addresses[0])
}
func TestUIServiceTopology(t *testing.T) {
t.Parallel()
a := NewTestAgent(t, "")
defer a.Shutdown()
// Register terminating gateway and config entry linking it to postgres + redis
{
registrations := map[string]*structs.RegisterRequest{
"Node foo": {
Datacenter: "dc1",
Node: "foo",
Address: "127.0.0.2",
Checks: structs.HealthChecks{
&structs.HealthCheck{
Node: "foo",
CheckID: "foo:alive",
Name: "foo-liveness",
Status: api.HealthPassing,
},
},
},
"Service api on foo": {
Datacenter: "dc1",
Node: "foo",
SkipNodeUpdate: true,
Service: &structs.NodeService{
Kind: structs.ServiceKindTypical,
ID: "api",
Service: "api",
Port: 9090,
Address: "198.18.1.2",
},
Checks: structs.HealthChecks{
&structs.HealthCheck{
Node: "foo",
CheckID: "foo:api",
Name: "api-liveness",
Status: api.HealthPassing,
ServiceID: "api",
ServiceName: "api",
},
},
},
"Service api-proxy": {
Datacenter: "dc1",
Node: "foo",
SkipNodeUpdate: true,
Service: &structs.NodeService{
Kind: structs.ServiceKindConnectProxy,
ID: "api-proxy",
Service: "api-proxy",
Port: 8443,
Address: "198.18.1.2",
Proxy: structs.ConnectProxyConfig{
DestinationServiceName: "api",
Upstreams: structs.Upstreams{
{
DestinationName: "web",
LocalBindPort: 8080,
},
},
},
},
Checks: structs.HealthChecks{
&structs.HealthCheck{
Node: "foo",
CheckID: "foo:api-proxy",
Name: "api proxy listening",
Status: api.HealthPassing,
ServiceID: "api-proxy",
ServiceName: "api-proxy",
},
},
},
"Node bar": {
Datacenter: "dc1",
Node: "bar",
Address: "127.0.0.3",
Checks: structs.HealthChecks{
&structs.HealthCheck{
Node: "bar",
CheckID: "bar:alive",
Name: "bar-liveness",
Status: api.HealthPassing,
},
},
},
"Service web on bar": {
Datacenter: "dc1",
Node: "bar",
SkipNodeUpdate: true,
Service: &structs.NodeService{
Kind: structs.ServiceKindTypical,
ID: "web",
Service: "web",
Port: 80,
Address: "198.18.1.20",
},
Checks: structs.HealthChecks{
&structs.HealthCheck{
Node: "bar",
CheckID: "bar:web",
Name: "web-liveness",
Status: api.HealthWarning,
ServiceID: "web",
ServiceName: "web",
},
},
},
"Service web-proxy on bar": {
Datacenter: "dc1",
Node: "bar",
SkipNodeUpdate: true,
Service: &structs.NodeService{
Kind: structs.ServiceKindConnectProxy,
ID: "web-proxy",
Service: "web-proxy",
Port: 8443,
Address: "198.18.1.20",
Proxy: structs.ConnectProxyConfig{
DestinationServiceName: "web",
Upstreams: structs.Upstreams{
{
DestinationName: "redis",
LocalBindPort: 123,
},
},
},
},
Checks: structs.HealthChecks{
&structs.HealthCheck{
Node: "bar",
CheckID: "bar:web-proxy",
Name: "web proxy listening",
Status: api.HealthCritical,
ServiceID: "web-proxy",
ServiceName: "web-proxy",
},
},
},
"Node baz": {
Datacenter: "dc1",
Node: "baz",
Address: "127.0.0.4",
Checks: structs.HealthChecks{
&structs.HealthCheck{
Node: "baz",
CheckID: "baz:alive",
Name: "baz-liveness",
Status: api.HealthPassing,
},
},
},
"Service web on baz": {
Datacenter: "dc1",
Node: "baz",
SkipNodeUpdate: true,
Service: &structs.NodeService{
Kind: structs.ServiceKindTypical,
ID: "web",
Service: "web",
Port: 80,
Address: "198.18.1.40",
},
Checks: structs.HealthChecks{
&structs.HealthCheck{
Node: "baz",
CheckID: "baz:web",
Name: "web-liveness",
Status: api.HealthPassing,
ServiceID: "web",
ServiceName: "web",
},
},
},
"Service web-proxy on baz": {
Datacenter: "dc1",
Node: "baz",
SkipNodeUpdate: true,
Service: &structs.NodeService{
Kind: structs.ServiceKindConnectProxy,
ID: "web-proxy",
Service: "web-proxy",
Port: 8443,
Address: "198.18.1.40",
Proxy: structs.ConnectProxyConfig{
DestinationServiceName: "web",
Upstreams: structs.Upstreams{
{
DestinationName: "redis",
LocalBindPort: 123,
},
},
},
},
Checks: structs.HealthChecks{
&structs.HealthCheck{
Node: "baz",
CheckID: "baz:web-proxy",
Name: "web proxy listening",
Status: api.HealthCritical,
ServiceID: "web-proxy",
ServiceName: "web-proxy",
},
},
},
"Node zip": {
Datacenter: "dc1",
Node: "zip",
Address: "127.0.0.5",
Checks: structs.HealthChecks{
&structs.HealthCheck{
Node: "zip",
CheckID: "zip:alive",
Name: "zip-liveness",
Status: api.HealthPassing,
},
},
},
"Service redis on zip": {
Datacenter: "dc1",
Node: "zip",
SkipNodeUpdate: true,
Service: &structs.NodeService{
Kind: structs.ServiceKindTypical,
ID: "redis",
Service: "redis",
Port: 6379,
Address: "198.18.1.60",
},
Checks: structs.HealthChecks{
&structs.HealthCheck{
Node: "zip",
CheckID: "zip:redis",
Name: "redis-liveness",
Status: api.HealthPassing,
ServiceID: "redis",
ServiceName: "redis",
},
},
},
"Service redis-proxy on zip": {
Datacenter: "dc1",
Node: "zip",
SkipNodeUpdate: true,
Service: &structs.NodeService{
Kind: structs.ServiceKindConnectProxy,
ID: "redis-proxy",
Service: "redis-proxy",
Port: 8443,
Address: "198.18.1.60",
Proxy: structs.ConnectProxyConfig{
DestinationServiceName: "redis",
},
},
Checks: structs.HealthChecks{
&structs.HealthCheck{
Node: "zip",
CheckID: "zip:redis-proxy",
Name: "redis proxy listening",
Status: api.HealthCritical,
ServiceID: "redis-proxy",
ServiceName: "redis-proxy",
},
},
},
}
for _, args := range registrations {
var out struct{}
require.NoError(t, a.RPC("Catalog.Register", args, &out))
}
}
t.Run("api", func(t *testing.T) {
// Request topology for api
req, _ := http.NewRequest("GET", "/v1/internal/ui/service-topology/api", nil)
resp := httptest.NewRecorder()
obj, err := a.srv.UIServiceTopology(resp, req)
assert.Nil(t, err)
assertIndex(t, resp)
expect := ServiceTopology{
Upstreams: []*ServiceSummary{
{
Name: "web",
Datacenter: "dc1",
Nodes: []string{"bar", "baz"},
InstanceCount: 2,
ChecksPassing: 3,
ChecksWarning: 1,
ChecksCritical: 2,
},
},
FilteredByACLs: false,
}
result := obj.(ServiceTopology)
// Internal accounting that is not returned in JSON response
for _, u := range result.Upstreams {
u.externalSourceSet = nil
u.checks = nil
}
require.Equal(t, expect, result)
})
t.Run("web", func(t *testing.T) {
// Request topology for web
req, _ := http.NewRequest("GET", "/v1/internal/ui/service-topology/web", nil)
resp := httptest.NewRecorder()
obj, err := a.srv.UIServiceTopology(resp, req)
assert.Nil(t, err)
assertIndex(t, resp)
expect := ServiceTopology{
Upstreams: []*ServiceSummary{
{
Name: "redis",
Datacenter: "dc1",
Nodes: []string{"zip"},
InstanceCount: 1,
ChecksPassing: 2,
ChecksCritical: 1,
},
},
Downstreams: []*ServiceSummary{
{
Name: "api",
Datacenter: "dc1",
Nodes: []string{"foo"},
InstanceCount: 1,
ChecksPassing: 3,
},
},
FilteredByACLs: false,
}
result := obj.(ServiceTopology)
// Internal accounting that is not returned in JSON response
for _, u := range result.Upstreams {
u.externalSourceSet = nil
u.checks = nil
}
for _, d := range result.Downstreams {
d.externalSourceSet = nil
d.checks = nil
}
require.Equal(t, expect, result)
})
t.Run("redis", func(t *testing.T) {
// Request topology for redis
req, _ := http.NewRequest("GET", "/v1/internal/ui/service-topology/redis", nil)
resp := httptest.NewRecorder()
obj, err := a.srv.UIServiceTopology(resp, req)
assert.Nil(t, err)
assertIndex(t, resp)
expect := ServiceTopology{
Downstreams: []*ServiceSummary{
{
Name: "web",
Datacenter: "dc1",
Nodes: []string{"bar", "baz"},
InstanceCount: 2,
ChecksPassing: 3,
ChecksWarning: 1,
ChecksCritical: 2,
},
},
FilteredByACLs: false,
}
result := obj.(ServiceTopology)
// Internal accounting that is not returned in JSON response
for _, d := range result.Downstreams {
d.externalSourceSet = nil
d.checks = nil
}
require.Equal(t, expect, result)
})
}