Add method for downstreams from disco chain

This commit is contained in:
freddygv 2020-09-22 16:05:09 -06:00
parent df893e14c1
commit 3653045cb0
3 changed files with 379 additions and 29 deletions

View File

@ -1,13 +1,11 @@
package consul package consul
import ( import (
"errors"
"fmt" "fmt"
"time" "time"
metrics "github.com/armon/go-metrics" metrics "github.com/armon/go-metrics"
"github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/connect"
"github.com/hashicorp/consul/agent/consul/discoverychain" "github.com/hashicorp/consul/agent/consul/discoverychain"
"github.com/hashicorp/consul/agent/consul/state" "github.com/hashicorp/consul/agent/consul/state"
"github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/agent/structs"
@ -53,39 +51,16 @@ func (c *DiscoveryChain) Get(args *structs.DiscoveryChainRequest, reply *structs
&args.QueryOptions, &args.QueryOptions,
&reply.QueryMeta, &reply.QueryMeta,
func(ws memdb.WatchSet, state *state.Store) error { func(ws memdb.WatchSet, state *state.Store) error {
index, entries, err := state.ReadDiscoveryChainConfigEntries(ws, args.Name, entMeta) req := discoverychain.CompileRequest{
if err != nil {
return err
}
_, config, err := state.CAConfig(ws)
if err != nil {
return err
} else if config == nil {
return errors.New("no cluster ca config setup")
}
// Build TrustDomain based on the ClusterID stored.
signingID := connect.SpiffeIDSigningForCluster(config)
if signingID == nil {
// If CA is bootstrapped at all then this should never happen but be
// defensive.
return errors.New("no cluster trust domain setup")
}
currentTrustDomain := signingID.Host()
// Then we compile it into something useful.
chain, err := discoverychain.Compile(discoverychain.CompileRequest{
ServiceName: args.Name, ServiceName: args.Name,
EvaluateInNamespace: entMeta.NamespaceOrDefault(), EvaluateInNamespace: entMeta.NamespaceOrDefault(),
EvaluateInDatacenter: evalDC, EvaluateInDatacenter: evalDC,
EvaluateInTrustDomain: currentTrustDomain,
UseInDatacenter: c.srv.config.Datacenter, UseInDatacenter: c.srv.config.Datacenter,
OverrideMeshGateway: args.OverrideMeshGateway, OverrideMeshGateway: args.OverrideMeshGateway,
OverrideProtocol: args.OverrideProtocol, OverrideProtocol: args.OverrideProtocol,
OverrideConnectTimeout: args.OverrideConnectTimeout, OverrideConnectTimeout: args.OverrideConnectTimeout,
Entries: entries, }
}) index, chain, err := state.ServiceDiscoveryChain(ws, args.Name, entMeta, req)
if err != nil { if err != nil {
return err return err
} }

View File

@ -1,8 +1,10 @@
package state package state
import ( import (
"errors"
"fmt" "fmt"
"github.com/hashicorp/consul/agent/connect"
"github.com/hashicorp/consul/agent/consul/discoverychain" "github.com/hashicorp/consul/agent/consul/discoverychain"
"github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/lib" "github.com/hashicorp/consul/lib"
@ -371,6 +373,72 @@ var serviceGraphKinds = []string{
structs.ServiceResolver, structs.ServiceResolver,
} }
// sourcesForTarget will return a list of services whose discovery chains have the input service as a target
func (s *Store) sourcesForTarget(ws memdb.WatchSet, tx ReadTxn, dc, service string, entMeta *structs.EnterpriseMeta) (uint64, []structs.ServiceName, error) {
destination := structs.NewServiceName(service, entMeta)
queue := []structs.ServiceName{destination}
seenLink := make(map[structs.ServiceName]bool)
for len(queue) > 0 {
// The "link" index returns config entries that reference a service
iter, err := tx.Get(configTableName, "link", queue[0].ToServiceID())
if err != nil {
return 0, nil, err
}
ws.Add(iter.WatchCh())
for raw := iter.Next(); raw != nil; raw = iter.Next() {
entry := raw.(structs.ConfigEntry)
sn := structs.NewServiceName(entry.GetName(), entry.GetEnterpriseMeta())
if !seenLink[sn] {
seenLink[sn] = true
queue = append(queue, sn)
}
}
queue = queue[1:]
}
var (
maxIdx uint64
resp []structs.ServiceName
)
// Only return the services that directly target the destination
seenSource := make(map[structs.ServiceName]bool)
for sn, _ := range seenLink {
req := discoverychain.CompileRequest{
ServiceName: sn.Name,
EvaluateInNamespace: sn.NamespaceOrDefault(),
// TODO(freddy) : Should these be anything other than the known DC?
EvaluateInDatacenter: dc,
UseInDatacenter: dc,
}
idx, chain, err := s.ServiceDiscoveryChain(ws, sn.Name, entMeta, req)
if err != nil {
return 0, nil, fmt.Errorf("failed to fetch discovery chain for %q: %v", sn.String(), err)
}
for _, t := range chain.Targets {
em := structs.EnterpriseMetaInitializer(t.Namespace)
candidate := structs.NewServiceName(t.Service, &em)
if !candidate.Matches(&destination) {
continue
}
if idx > maxIdx {
maxIdx = idx
}
if !seenSource[sn] {
seenSource[sn] = true
resp = append(resp, sn)
}
}
}
return maxIdx, resp, nil
}
func validateProposedConfigEntryInServiceGraph( func validateProposedConfigEntryInServiceGraph(
tx ReadTxn, tx ReadTxn,
kind, name string, kind, name string,
@ -555,6 +623,44 @@ func testCompileDiscoveryChain(
return chain.Protocol, chain.Nodes[chain.StartNode], nil return chain.Protocol, chain.Nodes[chain.StartNode], nil
} }
func (s *Store) ServiceDiscoveryChain(
ws memdb.WatchSet,
serviceName string,
entMeta *structs.EnterpriseMeta,
req discoverychain.CompileRequest,
) (uint64, *structs.CompiledDiscoveryChain, error) {
index, entries, err := s.readDiscoveryChainConfigEntries(ws, serviceName, nil, entMeta)
if err != nil {
return 0, nil, err
}
req.Entries = entries
_, config, err := s.CAConfig(ws)
if err != nil {
return 0, nil, err
} else if config == nil {
return 0, nil, errors.New("no cluster ca config setup")
}
// Build TrustDomain based on the ClusterID stored.
signingID := connect.SpiffeIDSigningForCluster(config)
if signingID == nil {
// If CA is bootstrapped at all then this should never happen but be
// defensive.
return 0, nil, errors.New("no cluster trust domain setup")
}
req.EvaluateInTrustDomain = signingID.Host()
// Then we compile it into something useful.
chain, err := discoverychain.Compile(req)
if err != nil {
return 0, nil, fmt.Errorf("failed to compile discovery chain: %v", err)
}
return index, chain, nil
}
// ReadDiscoveryChainConfigEntries will query for the full discovery chain for // ReadDiscoveryChainConfigEntries will query for the full discovery chain for
// the provided service name. All relevant config entries will be recursively // the provided service name. All relevant config entries will be recursively
// fetched and included in the result. // fetched and included in the result.

View File

@ -1246,7 +1246,7 @@ func TestStore_ReadDiscoveryChainConfigEntries_SubsetSplit(t *testing.T) {
require.NoError(t, s.EnsureConfigEntry(0, entry, nil)) require.NoError(t, s.EnsureConfigEntry(0, entry, nil))
} }
_, entrySet, err := s.ReadDiscoveryChainConfigEntries(nil, "main", nil) _, entrySet, err := s.readDiscoveryChainConfigEntries(nil, "main", nil, nil)
require.NoError(t, err) require.NoError(t, err)
require.Len(t, entrySet.Routers, 0) require.Len(t, entrySet.Routers, 0)
@ -1452,3 +1452,272 @@ func TestStore_ValidateIngressGatewayErrorOnMismatchedProtocols(t *testing.T) {
require.NoError(t, s.DeleteConfigEntry(5, structs.IngressGateway, "gateway", nil)) require.NoError(t, s.DeleteConfigEntry(5, structs.IngressGateway, "gateway", nil))
}) })
} }
func TestSourcesForTarget(t *testing.T) {
defaultMeta := *structs.DefaultEnterpriseMeta()
type expect struct {
idx uint64
ids []structs.ServiceName
}
tt := []struct {
name string
entries []structs.ConfigEntry
expect expect
}{
{
name: "from route match",
entries: []structs.ConfigEntry{
&structs.ProxyConfigEntry{
Kind: structs.ProxyDefaults,
Name: structs.ProxyConfigGlobal,
Config: map[string]interface{}{
"protocol": "http",
},
},
&structs.ServiceRouterConfigEntry{
Kind: structs.ServiceRouter,
Name: "web",
Routes: []structs.ServiceRoute{
{
Match: &structs.ServiceRouteMatch{
HTTP: &structs.ServiceRouteHTTPMatch{
PathExact: "/sink",
},
},
Destination: &structs.ServiceRouteDestination{
Service: "sink",
},
},
},
},
},
expect: expect{
idx: 2,
ids: []structs.ServiceName{
{Name: "web", EnterpriseMeta: defaultMeta},
},
},
},
{
name: "from redirect",
entries: []structs.ConfigEntry{
&structs.ProxyConfigEntry{
Kind: structs.ProxyDefaults,
Name: structs.ProxyConfigGlobal,
Config: map[string]interface{}{
"protocol": "http",
},
},
&structs.ServiceResolverConfigEntry{
Kind: structs.ServiceResolver,
Name: "web",
Redirect: &structs.ServiceResolverRedirect{
Service: "sink",
},
},
},
expect: expect{
idx: 2,
ids: []structs.ServiceName{
{Name: "web", EnterpriseMeta: defaultMeta},
},
},
},
{
name: "from failover",
entries: []structs.ConfigEntry{
&structs.ProxyConfigEntry{
Kind: structs.ProxyDefaults,
Name: structs.ProxyConfigGlobal,
Config: map[string]interface{}{
"protocol": "http",
},
},
&structs.ServiceResolverConfigEntry{
Kind: structs.ServiceResolver,
Name: "web",
Failover: map[string]structs.ServiceResolverFailover{
"*": {
Service: "sink",
Datacenters: []string{"dc2", "dc3"},
},
},
},
},
expect: expect{
idx: 2,
ids: []structs.ServiceName{
{Name: "web", EnterpriseMeta: defaultMeta},
},
},
},
{
name: "from splitter",
entries: []structs.ConfigEntry{
&structs.ProxyConfigEntry{
Kind: structs.ProxyDefaults,
Name: structs.ProxyConfigGlobal,
Config: map[string]interface{}{
"protocol": "http",
},
},
&structs.ServiceSplitterConfigEntry{
Kind: structs.ServiceSplitter,
Name: "web",
Splits: []structs.ServiceSplit{
{Weight: 90, Service: "web"},
{Weight: 10, Service: "sink"},
},
},
},
expect: expect{
idx: 2,
ids: []structs.ServiceName{
{Name: "web", EnterpriseMeta: defaultMeta},
},
},
},
{
name: "chained route redirect",
entries: []structs.ConfigEntry{
&structs.ProxyConfigEntry{
Kind: structs.ProxyDefaults,
Name: structs.ProxyConfigGlobal,
Config: map[string]interface{}{
"protocol": "http",
},
},
&structs.ServiceRouterConfigEntry{
Kind: structs.ServiceRouter,
Name: "source",
Routes: []structs.ServiceRoute{
{
Match: &structs.ServiceRouteMatch{
HTTP: &structs.ServiceRouteHTTPMatch{
PathExact: "/route",
},
},
Destination: &structs.ServiceRouteDestination{
Service: "routed",
},
},
},
},
&structs.ServiceResolverConfigEntry{
Kind: structs.ServiceResolver,
Name: "routed",
Redirect: &structs.ServiceResolverRedirect{
Service: "sink",
},
},
},
expect: expect{
idx: 3,
ids: []structs.ServiceName{
{Name: "source", EnterpriseMeta: defaultMeta},
{Name: "routed", EnterpriseMeta: defaultMeta},
},
},
},
{
name: "kitchen sink with multiple services referencing sink directly",
entries: []structs.ConfigEntry{
&structs.ProxyConfigEntry{
Kind: structs.ProxyDefaults,
Name: structs.ProxyConfigGlobal,
Config: map[string]interface{}{
"protocol": "http",
},
},
&structs.ServiceRouterConfigEntry{
Kind: structs.ServiceRouter,
Name: "routed",
Routes: []structs.ServiceRoute{
{
Match: &structs.ServiceRouteMatch{
HTTP: &structs.ServiceRouteHTTPMatch{
PathExact: "/sink",
},
},
Destination: &structs.ServiceRouteDestination{
Service: "sink",
},
},
},
},
&structs.ServiceResolverConfigEntry{
Kind: structs.ServiceResolver,
Name: "redirected",
Redirect: &structs.ServiceResolverRedirect{
Service: "sink",
},
},
&structs.ServiceResolverConfigEntry{
Kind: structs.ServiceResolver,
Name: "failed-over",
Failover: map[string]structs.ServiceResolverFailover{
"*": {
Service: "sink",
Datacenters: []string{"dc2", "dc3"},
},
},
},
&structs.ServiceSplitterConfigEntry{
Kind: structs.ServiceSplitter,
Name: "split",
Splits: []structs.ServiceSplit{
{Weight: 90, Service: "no-op"},
{Weight: 10, Service: "sink"},
},
},
&structs.ServiceSplitterConfigEntry{
Kind: structs.ServiceSplitter,
Name: "unrelated",
Splits: []structs.ServiceSplit{
{Weight: 90, Service: "zip"},
{Weight: 10, Service: "zop"},
},
},
},
expect: expect{
idx: 6,
ids: []structs.ServiceName{
{Name: "split", EnterpriseMeta: defaultMeta},
{Name: "failed-over", EnterpriseMeta: defaultMeta},
{Name: "redirected", EnterpriseMeta: defaultMeta},
{Name: "routed", EnterpriseMeta: defaultMeta},
},
},
},
}
for _, tc := range tt {
t.Run(tc.name, func(t *testing.T) {
s := testStateStore(t)
ws := memdb.NewWatchSet()
ca := &structs.CAConfiguration{
Provider: "consul",
}
err := s.CASetConfig(0, ca)
require.NoError(t, err)
var i uint64 = 1
for _, entry := range tc.entries {
require.NoError(t, entry.Normalize())
require.NoError(t, s.EnsureConfigEntry(i, entry, nil))
i++
}
tx := s.db.ReadTxn()
defer tx.Abort()
idx, ids, err := s.sourcesForTarget(ws, tx, "dc1", "sink", nil)
require.NoError(t, err)
require.Equal(t, tc.expect.idx, idx)
require.ElementsMatch(t, tc.expect.ids, ids)
})
}
}