Update proxycfg for transparent proxy

This commit is contained in:
freddygv 2021-03-17 13:40:39 -06:00
parent d19a5830dd
commit 3c97e5a777
7 changed files with 423 additions and 36 deletions

View File

@ -3713,6 +3713,8 @@ func (a *Agent) registerCache() {
a.cache.RegisterType(cachetype.IntentionMatchName, &cachetype.IntentionMatch{RPC: a})
a.cache.RegisterType(cachetype.IntentionUpstreamsName, &cachetype.IntentionUpstreams{RPC: a})
a.cache.RegisterType(cachetype.CatalogServicesName, &cachetype.CatalogServices{RPC: a})
a.cache.RegisterType(cachetype.HealthServicesName, &cachetype.HealthServices{RPC: a})

View File

@ -1,6 +1,7 @@
package proxycfg
import (
"context"
"path"
"testing"
"time"
@ -105,6 +106,11 @@ func TestManager_BasicLifecycle(t *testing.T) {
},
)
}
upstreams := structs.TestUpstreams(t)
for i := range upstreams {
upstreams[i].DestinationNamespace = structs.IntentionDefaultNamespace
}
webProxy := &structs.NodeService{
Kind: structs.ServiceKindConnectProxy,
ID: "web-sidecar-proxy",
@ -119,7 +125,7 @@ func TestManager_BasicLifecycle(t *testing.T) {
Config: map[string]interface{}{
"foo": "bar",
},
Upstreams: structs.TestUpstreams(t),
Upstreams: upstreams,
},
}
@ -212,6 +218,7 @@ func TestManager_BasicLifecycle(t *testing.T) {
DiscoveryChain: map[string]*structs.CompiledDiscoveryChain{
"db": dbDefaultChain(),
},
WatchedDiscoveryChains: map[string]context.CancelFunc{},
WatchedUpstreams: nil, // Clone() clears this out
WatchedUpstreamEndpoints: map[string]map[string]structs.CheckServiceNodes{
"db": {
@ -222,6 +229,10 @@ func TestManager_BasicLifecycle(t *testing.T) {
WatchedGatewayEndpoints: map[string]map[string]structs.CheckServiceNodes{
"db": {},
},
UpstreamConfig: map[string]*structs.Upstream{
upstreams[0].Identifier(): &upstreams[0],
upstreams[1].Identifier(): &upstreams[1],
},
},
PreparedQueryEndpoints: map[string]structs.CheckServiceNodes{},
WatchedServiceChecks: map[structs.ServiceID][]structs.CheckType{},
@ -261,6 +272,7 @@ func TestManager_BasicLifecycle(t *testing.T) {
DiscoveryChain: map[string]*structs.CompiledDiscoveryChain{
"db": dbSplitChain(),
},
WatchedDiscoveryChains: map[string]context.CancelFunc{},
WatchedUpstreams: nil, // Clone() clears this out
WatchedUpstreamEndpoints: map[string]map[string]structs.CheckServiceNodes{
"db": {
@ -272,6 +284,10 @@ func TestManager_BasicLifecycle(t *testing.T) {
WatchedGatewayEndpoints: map[string]map[string]structs.CheckServiceNodes{
"db": {},
},
UpstreamConfig: map[string]*structs.Upstream{
upstreams[0].Identifier(): &upstreams[0],
upstreams[1].Identifier(): &upstreams[1],
},
},
PreparedQueryEndpoints: map[string]structs.CheckServiceNodes{},
WatchedServiceChecks: map[structs.ServiceID][]structs.CheckType{},

View File

@ -18,6 +18,13 @@ type ConfigSnapshotUpstreams struct {
// targeted by this upstream. We then instantiate watches for those targets.
DiscoveryChain map[string]*structs.CompiledDiscoveryChain
// WatchedDiscoveryChains is a map of upstream.Identifier() -> CancelFunc's
// in order to cancel any watches when the proxy's configuration is
// changed. Ingress gateways and transparent proxies need this because
// discovery chain watches are added and removed through the lifecycle
// of a single proxycfg state instance.
WatchedDiscoveryChains map[string]context.CancelFunc
// WatchedUpstreams is a map of upstream.Identifier() -> (map of TargetID ->
// CancelFunc's) in order to cancel any watches when the configuration is
// changed.
@ -36,6 +43,9 @@ type ConfigSnapshotUpstreams struct {
// TargetID -> CheckServiceNodes) and is used to determine the backing
// endpoints of a mesh gateway.
WatchedGatewayEndpoints map[string]map[string]structs.CheckServiceNodes
// UpstreamConfig is a map to an upstream's configuration.
UpstreamConfig map[string]*structs.Upstream
}
type configSnapshotConnectProxy struct {
@ -58,12 +68,14 @@ func (c *configSnapshotConnectProxy) IsEmpty() bool {
return c.Leaf == nil &&
!c.IntentionsSet &&
len(c.DiscoveryChain) == 0 &&
len(c.WatchedDiscoveryChains) == 0 &&
len(c.WatchedUpstreams) == 0 &&
len(c.WatchedUpstreamEndpoints) == 0 &&
len(c.WatchedGateways) == 0 &&
len(c.WatchedGatewayEndpoints) == 0 &&
len(c.WatchedServiceChecks) == 0 &&
len(c.PreparedQueryEndpoints) == 0
len(c.PreparedQueryEndpoints) == 0 &&
len(c.UpstreamConfig) == 0
}
type configSnapshotTerminatingGateway struct {
@ -287,12 +299,6 @@ type configSnapshotIngressGateway struct {
// to. This is constructed from the ingress-gateway config entry, and uses
// the GatewayServices RPC to retrieve them.
Upstreams map[IngressListenerKey]structs.Upstreams
// WatchedDiscoveryChains is a map of upstream.Identifier() -> CancelFunc's
// in order to cancel any watches when the ingress gateway configuration is
// changed. Ingress gateways need this because discovery chain watches are
// added and removed through the lifecycle of single proxycfg.state instance.
WatchedDiscoveryChains map[string]context.CancelFunc
}
func (c *configSnapshotIngressGateway) IsEmpty() bool {
@ -301,7 +307,6 @@ func (c *configSnapshotIngressGateway) IsEmpty() bool {
}
return len(c.Upstreams) == 0 &&
len(c.DiscoveryChain) == 0 &&
len(c.WatchedDiscoveryChains) == 0 &&
len(c.WatchedUpstreams) == 0 &&
len(c.WatchedUpstreamEndpoints) == 0
}

View File

@ -40,6 +40,7 @@ const (
serviceConfigIDPrefix = "service-config:"
serviceResolverIDPrefix = "service-resolver:"
serviceIntentionsIDPrefix = "service-intentions:"
intentionUpstreamsID = "intention-upstreams"
svcChecksWatchIDPrefix = cachetype.ServiceHTTPChecksName + ":"
serviceIDPrefix = string(structs.UpstreamDestTypeService) + ":"
preparedQueryIDPrefix = string(structs.UpstreamDestTypePreparedQuery) + ":"
@ -175,13 +176,14 @@ func newState(ns *structs.NodeService, token string) (*state, error) {
func (s *state) Watch() (<-chan ConfigSnapshot, error) {
s.ctx, s.cancel = context.WithCancel(context.Background())
err := s.initWatches()
snap := s.initialConfigSnapshot()
err := s.initWatches(&snap)
if err != nil {
s.cancel()
return nil, err
}
go s.run()
go s.run(&snap)
return s.snapCh, nil
}
@ -195,10 +197,10 @@ func (s *state) Close() error {
}
// initWatches sets up the watches needed for the particular service
func (s *state) initWatches() error {
func (s *state) initWatches(snap *ConfigSnapshot) error {
switch s.kind {
case structs.ServiceKindConnectProxy:
return s.initWatchesConnectProxy()
return s.initWatchesConnectProxy(snap)
case structs.ServiceKindTerminatingGateway:
return s.initWatchesTerminatingGateway()
case structs.ServiceKindMeshGateway:
@ -243,7 +245,7 @@ func (s *state) watchConnectProxyService(ctx context.Context, correlationId stri
// initWatchesConnectProxy sets up the watches needed based on current proxy registration
// state.
func (s *state) initWatchesConnectProxy() error {
func (s *state) initWatchesConnectProxy(snap *ConfigSnapshot) error {
// Watch for root changes
err := s.cache.Notify(s.ctx, cachetype.ConnectCARootName, &structs.DCSpecificRequest{
Datacenter: s.source.Datacenter,
@ -295,12 +297,36 @@ func (s *state) initWatchesConnectProxy() error {
// default the namespace to the namespace of this proxy service
currentNamespace := s.proxyID.NamespaceOrDefault()
if s.proxyCfg.TransparentProxy {
// When in transparent proxy we will infer upstreams from intentions with this source
err := s.cache.Notify(s.ctx, cachetype.IntentionUpstreamsName, &structs.ServiceSpecificRequest{
Datacenter: s.source.Datacenter,
QueryOptions: structs.QueryOptions{Token: s.token},
ServiceName: s.proxyCfg.DestinationServiceName,
EnterpriseMeta: structs.NewEnterpriseMeta(s.proxyID.NamespaceOrEmpty()),
}, intentionUpstreamsID, s.ch)
if err != nil {
return err
}
}
// Watch for updates to service endpoints for all upstreams
for _, u := range s.proxyCfg.Upstreams {
// This can be true if the upstream is a synthetic entry populated from centralized upstream config.
// Watches should not be created for them.
if u.CentrallyConfigured {
continue
}
snap.ConnectProxy.UpstreamConfig[u.Identifier()] = &u
dc := s.source.Datacenter
if u.Datacenter != "" {
dc = u.Datacenter
}
if s.proxyCfg.TransparentProxy && (dc == "" || dc == s.source.Datacenter) {
// In TransparentProxy mode, watches for upstreams in the local DC are handled by the IntentionUpstreams watch.
continue
}
ns := currentNamespace
if u.DestinationNamespace != "" {
@ -541,12 +567,14 @@ func (s *state) initialConfigSnapshot() ConfigSnapshot {
switch s.kind {
case structs.ServiceKindConnectProxy:
snap.ConnectProxy.DiscoveryChain = make(map[string]*structs.CompiledDiscoveryChain)
snap.ConnectProxy.WatchedDiscoveryChains = make(map[string]context.CancelFunc)
snap.ConnectProxy.WatchedUpstreams = make(map[string]map[string]context.CancelFunc)
snap.ConnectProxy.WatchedUpstreamEndpoints = make(map[string]map[string]structs.CheckServiceNodes)
snap.ConnectProxy.WatchedGateways = make(map[string]map[string]context.CancelFunc)
snap.ConnectProxy.WatchedGatewayEndpoints = make(map[string]map[string]structs.CheckServiceNodes)
snap.ConnectProxy.WatchedServiceChecks = make(map[structs.ServiceID][]structs.CheckType)
snap.ConnectProxy.PreparedQueryEndpoints = make(map[string]structs.CheckServiceNodes)
snap.ConnectProxy.UpstreamConfig = make(map[string]*structs.Upstream)
case structs.ServiceKindTerminatingGateway:
snap.TerminatingGateway.WatchedServices = make(map[structs.ServiceName]context.CancelFunc)
snap.TerminatingGateway.WatchedIntentions = make(map[structs.ServiceName]context.CancelFunc)
@ -582,15 +610,13 @@ func (s *state) initialConfigSnapshot() ConfigSnapshot {
return snap
}
func (s *state) run() {
func (s *state) run(snap *ConfigSnapshot) {
// Close the channel we return from Watch when we stop so consumers can stop
// watching and clean up their goroutines. It's important we do this here and
// not in Close since this routine sends on this chan and so might panic if it
// gets closed from another goroutine.
defer close(s.snapCh)
snap := s.initialConfigSnapshot()
// This turns out to be really fiddly/painful by just using time.Timer.C
// directly in the code below since you can't detect when a timer is stopped
// vs waiting in order to know to reset it. So just use a chan to send
@ -605,7 +631,7 @@ func (s *state) run() {
case u := <-s.ch:
s.logger.Trace("A blocking query returned; handling snapshot update")
if err := s.handleUpdate(u, &snap); err != nil {
if err := s.handleUpdate(u, snap); err != nil {
s.logger.Error("Failed to handle update from watch",
"id", u.CorrelationID, "error", err,
)
@ -734,6 +760,68 @@ func (s *state) handleUpdateConnectProxy(u cache.UpdateEvent, snap *ConfigSnapsh
}
snap.ConnectProxy.IntentionsSet = true
case u.CorrelationID == intentionUpstreamsID:
resp, ok := u.Result.(*structs.IndexedServiceList)
if !ok {
return fmt.Errorf("invalid type for response %T", u.Result)
}
seenServices := make(map[string]struct{})
for _, svc := range resp.Services {
seenServices[svc.String()] = struct{}{}
cfgMap := make(map[string]interface{})
u, ok := snap.ConnectProxy.UpstreamConfig[svc.String()]
if ok {
cfgMap = u.Config
}
cfg, err := parseReducedUpstreamConfig(cfgMap)
if err != nil {
// Don't hard fail on a config typo, just warn. We'll fall back on
// the plain discovery chain if there is an error so it's safe to
// continue.
s.logger.Warn("failed to parse upstream config",
"upstream", u.Identifier(),
"error", err,
)
}
err = s.watchDiscoveryChain(snap, cfg, svc.String(), svc.Name, svc.NamespaceOrDefault())
if err != nil {
return fmt.Errorf("failed to watch discovery chain for %s: %v", svc.String(), err)
}
}
// Clean up data from services that were not in the update
for sn := range snap.ConnectProxy.WatchedUpstreams {
if _, ok := seenServices[sn]; !ok {
delete(snap.ConnectProxy.WatchedUpstreams, sn)
}
}
for sn := range snap.ConnectProxy.WatchedUpstreamEndpoints {
if _, ok := seenServices[sn]; !ok {
delete(snap.ConnectProxy.WatchedUpstreamEndpoints, sn)
}
}
for sn := range snap.ConnectProxy.WatchedGateways {
if _, ok := seenServices[sn]; !ok {
delete(snap.ConnectProxy.WatchedGateways, sn)
}
}
for sn := range snap.ConnectProxy.WatchedGatewayEndpoints {
if _, ok := seenServices[sn]; !ok {
delete(snap.ConnectProxy.WatchedGatewayEndpoints, sn)
}
}
for sn, cancelFn := range snap.ConnectProxy.WatchedDiscoveryChains {
if _, ok := seenServices[sn]; !ok {
cancelFn()
delete(snap.ConnectProxy.WatchedDiscoveryChains, sn)
delete(snap.ConnectProxy.DiscoveryChain, sn)
}
}
case strings.HasPrefix(u.CorrelationID, "upstream:"+preparedQueryIDPrefix):
resp, ok := u.Result.(*structs.PreparedQueryExecuteResponse)
if !ok {
@ -1465,9 +1553,9 @@ func (s *state) handleUpdateIngressGateway(u cache.UpdateEvent, snap *ConfigSnap
for _, service := range services.Services {
u := makeUpstream(service)
err := s.watchIngressDiscoveryChain(snap, u)
err := s.watchDiscoveryChain(snap, reducedUpstreamConfig{}, u.Identifier(), u.DestinationName, u.DestinationNamespace)
if err != nil {
return err
return fmt.Errorf("failed to watch discovery chain for %s: %v", u.Identifier(), err)
}
watchedSvcs[u.Identifier()] = struct{}{}
@ -1515,8 +1603,8 @@ func makeUpstream(g *structs.GatewayService) structs.Upstream {
return upstream
}
func (s *state) watchIngressDiscoveryChain(snap *ConfigSnapshot, u structs.Upstream) error {
if _, ok := snap.IngressGateway.WatchedDiscoveryChains[u.Identifier()]; ok {
func (s *state) watchDiscoveryChain(snap *ConfigSnapshot, cfg reducedUpstreamConfig, id, name, namespace string) error {
if _, ok := snap.ConnectProxy.WatchedDiscoveryChains[id]; ok {
return nil
}
@ -1524,16 +1612,26 @@ func (s *state) watchIngressDiscoveryChain(snap *ConfigSnapshot, u structs.Upstr
err := s.cache.Notify(ctx, cachetype.CompiledDiscoveryChainName, &structs.DiscoveryChainRequest{
Datacenter: s.source.Datacenter,
QueryOptions: structs.QueryOptions{Token: s.token},
Name: u.DestinationName,
Name: name,
EvaluateInDatacenter: s.source.Datacenter,
EvaluateInNamespace: u.DestinationNamespace,
}, "discovery-chain:"+u.Identifier(), s.ch)
EvaluateInNamespace: namespace,
OverrideProtocol: cfg.Protocol,
OverrideConnectTimeout: cfg.ConnectTimeout(),
}, "discovery-chain:"+id, s.ch)
if err != nil {
cancel()
return err
}
snap.IngressGateway.WatchedDiscoveryChains[u.Identifier()] = cancel
switch s.kind {
case structs.ServiceKindIngressGateway:
snap.IngressGateway.WatchedDiscoveryChains[id] = cancel
case structs.ServiceKindConnectProxy:
snap.ConnectProxy.WatchedDiscoveryChains[id] = cancel
default:
return fmt.Errorf("unsupported kind %s", s.kind)
}
return nil
}

View File

@ -249,6 +249,17 @@ func genVerifyIntentionWatch(expectedService string, expectedDatacenter string)
}
}
func genVerifyIntentionUpstreamsWatch(expectedService string, expectedDatacenter string) verifyWatchRequest {
return func(t testing.TB, cacheType string, request cache.Request) {
require.Equal(t, cachetype.IntentionUpstreamsName, cacheType)
reqReal, ok := request.(*structs.ServiceSpecificRequest)
require.True(t, ok)
require.Equal(t, expectedDatacenter, reqReal.Datacenter)
require.Equal(t, expectedService, reqReal.ServiceName)
}
}
func genVerifyPreparedQueryWatch(expectedName string, expectedDatacenter string) verifyWatchRequest {
return func(t testing.TB, cacheType string, request cache.Request) {
require.Equal(t, cachetype.PreparedQueryName, cacheType)
@ -1503,6 +1514,247 @@ func TestState_WatchesAndUpdates(t *testing.T) {
},
},
},
"transparent-proxy-initial": {
ns: structs.NodeService{
Kind: structs.ServiceKindConnectProxy,
ID: "api-proxy",
Service: "api-proxy",
Address: "10.0.1.1",
Proxy: structs.ConnectProxyConfig{
DestinationServiceName: "api",
TransparentProxy: true,
},
},
sourceDC: "dc1",
stages: []verificationStage{
{
requiredWatches: map[string]verifyWatchRequest{
rootsWatchID: genVerifyRootsWatch("dc1"),
intentionUpstreamsID: genVerifyServiceSpecificRequest(intentionUpstreamsID,
"api", "", "dc1", false),
leafWatchID: genVerifyLeafWatch("api", "dc1"),
intentionsWatchID: genVerifyIntentionWatch("api", "dc1"),
},
verifySnapshot: func(t testing.TB, snap *ConfigSnapshot) {
require.False(t, snap.Valid(), "proxy without roots/leaf/intentions is not valid")
require.True(t, snap.ConnectProxy.IsEmpty())
require.True(t, snap.MeshGateway.IsEmpty())
require.True(t, snap.IngressGateway.IsEmpty())
require.True(t, snap.TerminatingGateway.IsEmpty())
},
},
{
events: []cache.UpdateEvent{
rootWatchEvent(),
{
CorrelationID: leafWatchID,
Result: issuedCert,
Err: nil,
},
{
CorrelationID: intentionsWatchID,
Result: TestIntentions(),
Err: nil,
},
},
verifySnapshot: func(t testing.TB, snap *ConfigSnapshot) {
require.True(t, snap.Valid(), "proxy with roots/leaf/intentions is valid")
require.Equal(t, indexedRoots, snap.Roots)
require.Equal(t, issuedCert, snap.Leaf())
require.Equal(t, TestIntentions().Matches[0], snap.ConnectProxy.Intentions)
require.True(t, snap.MeshGateway.IsEmpty())
require.True(t, snap.IngressGateway.IsEmpty())
require.True(t, snap.TerminatingGateway.IsEmpty())
},
},
},
},
"transparent-proxy-handle-update": {
ns: structs.NodeService{
Kind: structs.ServiceKindConnectProxy,
ID: "api-proxy",
Service: "api-proxy",
Address: "10.0.1.1",
Proxy: structs.ConnectProxyConfig{
DestinationServiceName: "api",
TransparentProxy: true,
},
},
sourceDC: "dc1",
stages: []verificationStage{
// Empty on initialization
{
requiredWatches: map[string]verifyWatchRequest{
rootsWatchID: genVerifyRootsWatch("dc1"),
intentionUpstreamsID: genVerifyServiceSpecificRequest(intentionUpstreamsID,
"api", "", "dc1", false),
leafWatchID: genVerifyLeafWatch("api", "dc1"),
intentionsWatchID: genVerifyIntentionWatch("api", "dc1"),
},
verifySnapshot: func(t testing.TB, snap *ConfigSnapshot) {
require.False(t, snap.Valid(), "proxy without roots/leaf/intentions is not valid")
require.True(t, snap.ConnectProxy.IsEmpty())
require.True(t, snap.MeshGateway.IsEmpty())
require.True(t, snap.IngressGateway.IsEmpty())
require.True(t, snap.TerminatingGateway.IsEmpty())
},
},
// Valid snapshot after roots, leaf, and intentions
{
events: []cache.UpdateEvent{
rootWatchEvent(),
{
CorrelationID: leafWatchID,
Result: issuedCert,
Err: nil,
},
{
CorrelationID: intentionsWatchID,
Result: TestIntentions(),
Err: nil,
},
},
verifySnapshot: func(t testing.TB, snap *ConfigSnapshot) {
require.True(t, snap.Valid(), "proxy with roots/leaf/intentions is valid")
require.Equal(t, indexedRoots, snap.Roots)
require.Equal(t, issuedCert, snap.Leaf())
require.Equal(t, TestIntentions().Matches[0], snap.ConnectProxy.Intentions)
require.True(t, snap.MeshGateway.IsEmpty())
require.True(t, snap.IngressGateway.IsEmpty())
require.True(t, snap.TerminatingGateway.IsEmpty())
},
},
// Receiving an intention should lead to spinning up a discovery chain watch
{
requiredWatches: map[string]verifyWatchRequest{
rootsWatchID: genVerifyRootsWatch("dc1"),
intentionUpstreamsID: genVerifyServiceSpecificRequest(intentionUpstreamsID,
"api", "", "dc1", false),
leafWatchID: genVerifyLeafWatch("api", "dc1"),
intentionsWatchID: genVerifyIntentionWatch("api", "dc1"),
},
events: []cache.UpdateEvent{
{
CorrelationID: intentionUpstreamsID,
Result: &structs.IndexedServiceList{
Services: structs.ServiceList{
db,
},
},
Err: nil,
},
},
verifySnapshot: func(t testing.TB, snap *ConfigSnapshot) {
require.True(t, snap.Valid(), "should still be valid")
// Should start watch for db's chain
require.Contains(t, snap.ConnectProxy.WatchedDiscoveryChains, dbStr)
// Should not have results yet
require.Empty(t, snap.ConnectProxy.DiscoveryChain)
},
},
// Discovery chain updates should be stored
{
requiredWatches: map[string]verifyWatchRequest{
"discovery-chain:" + dbStr: genVerifyDiscoveryChainWatch(&structs.DiscoveryChainRequest{
Name: dbStr,
EvaluateInDatacenter: "dc1",
EvaluateInNamespace: "default",
Datacenter: "dc1",
}),
},
events: []cache.UpdateEvent{
{
CorrelationID: "discovery-chain:" + dbStr,
Result: &structs.DiscoveryChainResponse{
Chain: discoverychain.TestCompileConfigEntries(t, "db", "default", "dc1", "trustdomain.consul", "dc1", nil),
},
Err: nil,
},
},
verifySnapshot: func(t testing.TB, snap *ConfigSnapshot) {
require.Len(t, snap.ConnectProxy.WatchedUpstreams, 1)
require.Len(t, snap.ConnectProxy.WatchedUpstreams[dbStr], 1)
},
},
{
requiredWatches: map[string]verifyWatchRequest{
"upstream-target:db.default.dc1:db": genVerifyServiceWatch("db", "", "dc1", true),
},
events: []cache.UpdateEvent{
{
CorrelationID: "upstream-target:db.default.dc1:db",
Result: &structs.IndexedCheckServiceNodes{
Nodes: structs.CheckServiceNodes{
{
Node: &structs.Node{
Node: "node1",
Address: "127.0.0.1",
},
Service: &structs.NodeService{
ID: "db1",
Service: "db",
},
},
},
},
Err: nil,
},
},
verifySnapshot: func(t testing.TB, snap *ConfigSnapshot) {
require.Len(t, snap.ConnectProxy.WatchedUpstreamEndpoints, 1)
require.Contains(t, snap.ConnectProxy.WatchedUpstreamEndpoints, dbStr)
require.Len(t, snap.ConnectProxy.WatchedUpstreamEndpoints[dbStr], 1)
require.Contains(t, snap.ConnectProxy.WatchedUpstreamEndpoints[dbStr], "db.default.dc1")
require.Equal(t, snap.ConnectProxy.WatchedUpstreamEndpoints[dbStr]["db.default.dc1"],
structs.CheckServiceNodes{
{
Node: &structs.Node{
Node: "node1",
Address: "127.0.0.1",
},
Service: &structs.NodeService{
ID: "db1",
Service: "db",
},
},
},
)
},
},
// Empty list of upstreams should clean everything up
{
requiredWatches: map[string]verifyWatchRequest{
rootsWatchID: genVerifyRootsWatch("dc1"),
intentionUpstreamsID: genVerifyServiceSpecificRequest(intentionUpstreamsID,
"api", "", "dc1", false),
leafWatchID: genVerifyLeafWatch("api", "dc1"),
intentionsWatchID: genVerifyIntentionWatch("api", "dc1"),
},
events: []cache.UpdateEvent{
{
CorrelationID: intentionUpstreamsID,
Result: &structs.IndexedServiceList{
Services: structs.ServiceList{},
},
Err: nil,
},
},
verifySnapshot: func(t testing.TB, snap *ConfigSnapshot) {
require.True(t, snap.Valid(), "should still be valid")
// Empty intention upstreams leads to cancelling all associated watches
require.Empty(t, snap.ConnectProxy.WatchedDiscoveryChains)
require.Empty(t, snap.ConnectProxy.WatchedUpstreams)
require.Empty(t, snap.ConnectProxy.WatchedUpstreamEndpoints)
require.Empty(t, snap.ConnectProxy.WatchedGateways)
require.Empty(t, snap.ConnectProxy.WatchedGatewayEndpoints)
require.Empty(t, snap.ConnectProxy.DiscoveryChain)
},
},
},
},
"connect-proxy": newConnectProxyCase(structs.MeshGatewayModeDefault),
"connect-proxy-mesh-gateway-local": newConnectProxyCase(structs.MeshGatewayModeLocal),
}
@ -1535,12 +1787,12 @@ func TestState_WatchesAndUpdates(t *testing.T) {
// setup the ctx as initWatches expects this to be there
state.ctx, state.cancel = context.WithCancel(context.Background())
// ensure the initial watch setup did not error
require.NoError(t, state.initWatches())
// get the initial configuration snapshot
snap := state.initialConfigSnapshot()
// ensure the initial watch setup did not error
require.NoError(t, state.initWatches(&snap))
//--------------------------------------------------------------------
//
// All the nested subtests here are to make failures easier to

View File

@ -653,6 +653,8 @@ func TestConfigSnapshot(t testing.T) *ConfigSnapshot {
t, "db", "default", "dc1",
connect.TestClusterID+".consul", "dc1", nil)
upstreams := structs.TestUpstreams(t)
return &ConfigSnapshot{
Kind: structs.ServiceKindConnectProxy,
Service: "web-sidecar-proxy",
@ -667,12 +669,13 @@ func TestConfigSnapshot(t testing.T) *ConfigSnapshot {
Config: map[string]interface{}{
"foo": "bar",
},
Upstreams: structs.TestUpstreams(t),
Upstreams: upstreams,
},
Roots: roots,
ConnectProxy: configSnapshotConnectProxy{
ConfigSnapshotUpstreams: ConfigSnapshotUpstreams{
Leaf: leaf,
UpstreamConfig: upstreams.ToMap(),
DiscoveryChain: map[string]*structs.CompiledDiscoveryChain{
"db": dbChain,
},
@ -1315,6 +1318,7 @@ func setupTestVariationConfigEntriesAndSnapshot(
dbChain := discoverychain.TestCompileConfigEntries(t, "db", "default", "dc1", connect.TestClusterID+".consul", "dc1", compileSetup, entries...)
upstreams := structs.TestUpstreams(t)
snap := ConfigSnapshotUpstreams{
Leaf: leaf,
DiscoveryChain: map[string]*structs.CompiledDiscoveryChain{
@ -1325,6 +1329,7 @@ func setupTestVariationConfigEntriesAndSnapshot(
"db.default.dc1": TestUpstreamNodes(t),
},
},
UpstreamConfig: upstreams.ToMap(),
}
switch variation {

View File

@ -215,6 +215,15 @@ func (us Upstreams) ToAPI() []api.Upstream {
return a
}
func (us Upstreams) ToMap() map[string]*Upstream {
upstreamMap := make(map[string]*Upstream)
for i := range us {
upstreamMap[us[i].Identifier()] = &us[i]
}
return upstreamMap
}
// UpstreamsFromAPI is a helper for converting api.Upstream to Upstream.
func UpstreamsFromAPI(us []api.Upstream) Upstreams {
a := make([]Upstream, len(us))