diff --git a/agent/agent.go b/agent/agent.go index 43ec86877..8c3b30d13 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -3713,6 +3713,8 @@ func (a *Agent) registerCache() { a.cache.RegisterType(cachetype.IntentionMatchName, &cachetype.IntentionMatch{RPC: a}) + a.cache.RegisterType(cachetype.IntentionUpstreamsName, &cachetype.IntentionUpstreams{RPC: a}) + a.cache.RegisterType(cachetype.CatalogServicesName, &cachetype.CatalogServices{RPC: a}) a.cache.RegisterType(cachetype.HealthServicesName, &cachetype.HealthServices{RPC: a}) diff --git a/agent/proxycfg/manager_test.go b/agent/proxycfg/manager_test.go index 01b896d29..b51100fc1 100644 --- a/agent/proxycfg/manager_test.go +++ b/agent/proxycfg/manager_test.go @@ -1,6 +1,7 @@ package proxycfg import ( + "context" "path" "testing" "time" @@ -105,6 +106,11 @@ func TestManager_BasicLifecycle(t *testing.T) { }, ) } + + upstreams := structs.TestUpstreams(t) + for i := range upstreams { + upstreams[i].DestinationNamespace = structs.IntentionDefaultNamespace + } webProxy := &structs.NodeService{ Kind: structs.ServiceKindConnectProxy, ID: "web-sidecar-proxy", @@ -119,7 +125,7 @@ func TestManager_BasicLifecycle(t *testing.T) { Config: map[string]interface{}{ "foo": "bar", }, - Upstreams: structs.TestUpstreams(t), + Upstreams: upstreams, }, } @@ -212,7 +218,8 @@ func TestManager_BasicLifecycle(t *testing.T) { DiscoveryChain: map[string]*structs.CompiledDiscoveryChain{ "db": dbDefaultChain(), }, - WatchedUpstreams: nil, // Clone() clears this out + WatchedDiscoveryChains: map[string]context.CancelFunc{}, + WatchedUpstreams: nil, // Clone() clears this out WatchedUpstreamEndpoints: map[string]map[string]structs.CheckServiceNodes{ "db": { "db.default.dc1": TestUpstreamNodes(t), @@ -222,6 +229,10 @@ func TestManager_BasicLifecycle(t *testing.T) { WatchedGatewayEndpoints: map[string]map[string]structs.CheckServiceNodes{ "db": {}, }, + UpstreamConfig: map[string]*structs.Upstream{ + upstreams[0].Identifier(): &upstreams[0], + upstreams[1].Identifier(): &upstreams[1], + }, }, PreparedQueryEndpoints: map[string]structs.CheckServiceNodes{}, WatchedServiceChecks: map[structs.ServiceID][]structs.CheckType{}, @@ -261,7 +272,8 @@ func TestManager_BasicLifecycle(t *testing.T) { DiscoveryChain: map[string]*structs.CompiledDiscoveryChain{ "db": dbSplitChain(), }, - WatchedUpstreams: nil, // Clone() clears this out + WatchedDiscoveryChains: map[string]context.CancelFunc{}, + WatchedUpstreams: nil, // Clone() clears this out WatchedUpstreamEndpoints: map[string]map[string]structs.CheckServiceNodes{ "db": { "v1.db.default.dc1": TestUpstreamNodes(t), @@ -272,6 +284,10 @@ func TestManager_BasicLifecycle(t *testing.T) { WatchedGatewayEndpoints: map[string]map[string]structs.CheckServiceNodes{ "db": {}, }, + UpstreamConfig: map[string]*structs.Upstream{ + upstreams[0].Identifier(): &upstreams[0], + upstreams[1].Identifier(): &upstreams[1], + }, }, PreparedQueryEndpoints: map[string]structs.CheckServiceNodes{}, WatchedServiceChecks: map[structs.ServiceID][]structs.CheckType{}, diff --git a/agent/proxycfg/snapshot.go b/agent/proxycfg/snapshot.go index 08cb91da5..2974dcd20 100644 --- a/agent/proxycfg/snapshot.go +++ b/agent/proxycfg/snapshot.go @@ -18,6 +18,13 @@ type ConfigSnapshotUpstreams struct { // targeted by this upstream. We then instantiate watches for those targets. DiscoveryChain map[string]*structs.CompiledDiscoveryChain + // WatchedDiscoveryChains is a map of upstream.Identifier() -> CancelFunc's + // in order to cancel any watches when the proxy's configuration is + // changed. Ingress gateways and transparent proxies need this because + // discovery chain watches are added and removed through the lifecycle + // of a single proxycfg state instance. + WatchedDiscoveryChains map[string]context.CancelFunc + // WatchedUpstreams is a map of upstream.Identifier() -> (map of TargetID -> // CancelFunc's) in order to cancel any watches when the configuration is // changed. @@ -36,6 +43,9 @@ type ConfigSnapshotUpstreams struct { // TargetID -> CheckServiceNodes) and is used to determine the backing // endpoints of a mesh gateway. WatchedGatewayEndpoints map[string]map[string]structs.CheckServiceNodes + + // UpstreamConfig is a map to an upstream's configuration. + UpstreamConfig map[string]*structs.Upstream } type configSnapshotConnectProxy struct { @@ -58,12 +68,14 @@ func (c *configSnapshotConnectProxy) IsEmpty() bool { return c.Leaf == nil && !c.IntentionsSet && len(c.DiscoveryChain) == 0 && + len(c.WatchedDiscoveryChains) == 0 && len(c.WatchedUpstreams) == 0 && len(c.WatchedUpstreamEndpoints) == 0 && len(c.WatchedGateways) == 0 && len(c.WatchedGatewayEndpoints) == 0 && len(c.WatchedServiceChecks) == 0 && - len(c.PreparedQueryEndpoints) == 0 + len(c.PreparedQueryEndpoints) == 0 && + len(c.UpstreamConfig) == 0 } type configSnapshotTerminatingGateway struct { @@ -287,12 +299,6 @@ type configSnapshotIngressGateway struct { // to. This is constructed from the ingress-gateway config entry, and uses // the GatewayServices RPC to retrieve them. Upstreams map[IngressListenerKey]structs.Upstreams - - // WatchedDiscoveryChains is a map of upstream.Identifier() -> CancelFunc's - // in order to cancel any watches when the ingress gateway configuration is - // changed. Ingress gateways need this because discovery chain watches are - // added and removed through the lifecycle of single proxycfg.state instance. - WatchedDiscoveryChains map[string]context.CancelFunc } func (c *configSnapshotIngressGateway) IsEmpty() bool { @@ -301,7 +307,6 @@ func (c *configSnapshotIngressGateway) IsEmpty() bool { } return len(c.Upstreams) == 0 && len(c.DiscoveryChain) == 0 && - len(c.WatchedDiscoveryChains) == 0 && len(c.WatchedUpstreams) == 0 && len(c.WatchedUpstreamEndpoints) == 0 } diff --git a/agent/proxycfg/state.go b/agent/proxycfg/state.go index 59d11a2f1..43652dcc3 100644 --- a/agent/proxycfg/state.go +++ b/agent/proxycfg/state.go @@ -40,6 +40,7 @@ const ( serviceConfigIDPrefix = "service-config:" serviceResolverIDPrefix = "service-resolver:" serviceIntentionsIDPrefix = "service-intentions:" + intentionUpstreamsID = "intention-upstreams" svcChecksWatchIDPrefix = cachetype.ServiceHTTPChecksName + ":" serviceIDPrefix = string(structs.UpstreamDestTypeService) + ":" preparedQueryIDPrefix = string(structs.UpstreamDestTypePreparedQuery) + ":" @@ -175,13 +176,14 @@ func newState(ns *structs.NodeService, token string) (*state, error) { func (s *state) Watch() (<-chan ConfigSnapshot, error) { s.ctx, s.cancel = context.WithCancel(context.Background()) - err := s.initWatches() + snap := s.initialConfigSnapshot() + err := s.initWatches(&snap) if err != nil { s.cancel() return nil, err } - go s.run() + go s.run(&snap) return s.snapCh, nil } @@ -195,10 +197,10 @@ func (s *state) Close() error { } // initWatches sets up the watches needed for the particular service -func (s *state) initWatches() error { +func (s *state) initWatches(snap *ConfigSnapshot) error { switch s.kind { case structs.ServiceKindConnectProxy: - return s.initWatchesConnectProxy() + return s.initWatchesConnectProxy(snap) case structs.ServiceKindTerminatingGateway: return s.initWatchesTerminatingGateway() case structs.ServiceKindMeshGateway: @@ -243,7 +245,7 @@ func (s *state) watchConnectProxyService(ctx context.Context, correlationId stri // initWatchesConnectProxy sets up the watches needed based on current proxy registration // state. -func (s *state) initWatchesConnectProxy() error { +func (s *state) initWatchesConnectProxy(snap *ConfigSnapshot) error { // Watch for root changes err := s.cache.Notify(s.ctx, cachetype.ConnectCARootName, &structs.DCSpecificRequest{ Datacenter: s.source.Datacenter, @@ -295,12 +297,36 @@ func (s *state) initWatchesConnectProxy() error { // default the namespace to the namespace of this proxy service currentNamespace := s.proxyID.NamespaceOrDefault() + if s.proxyCfg.TransparentProxy { + // When in transparent proxy we will infer upstreams from intentions with this source + err := s.cache.Notify(s.ctx, cachetype.IntentionUpstreamsName, &structs.ServiceSpecificRequest{ + Datacenter: s.source.Datacenter, + QueryOptions: structs.QueryOptions{Token: s.token}, + ServiceName: s.proxyCfg.DestinationServiceName, + EnterpriseMeta: structs.NewEnterpriseMeta(s.proxyID.NamespaceOrEmpty()), + }, intentionUpstreamsID, s.ch) + if err != nil { + return err + } + } + // Watch for updates to service endpoints for all upstreams for _, u := range s.proxyCfg.Upstreams { + // This can be true if the upstream is a synthetic entry populated from centralized upstream config. + // Watches should not be created for them. + if u.CentrallyConfigured { + continue + } + snap.ConnectProxy.UpstreamConfig[u.Identifier()] = &u + dc := s.source.Datacenter if u.Datacenter != "" { dc = u.Datacenter } + if s.proxyCfg.TransparentProxy && (dc == "" || dc == s.source.Datacenter) { + // In TransparentProxy mode, watches for upstreams in the local DC are handled by the IntentionUpstreams watch. + continue + } ns := currentNamespace if u.DestinationNamespace != "" { @@ -541,12 +567,14 @@ func (s *state) initialConfigSnapshot() ConfigSnapshot { switch s.kind { case structs.ServiceKindConnectProxy: snap.ConnectProxy.DiscoveryChain = make(map[string]*structs.CompiledDiscoveryChain) + snap.ConnectProxy.WatchedDiscoveryChains = make(map[string]context.CancelFunc) snap.ConnectProxy.WatchedUpstreams = make(map[string]map[string]context.CancelFunc) snap.ConnectProxy.WatchedUpstreamEndpoints = make(map[string]map[string]structs.CheckServiceNodes) snap.ConnectProxy.WatchedGateways = make(map[string]map[string]context.CancelFunc) snap.ConnectProxy.WatchedGatewayEndpoints = make(map[string]map[string]structs.CheckServiceNodes) snap.ConnectProxy.WatchedServiceChecks = make(map[structs.ServiceID][]structs.CheckType) snap.ConnectProxy.PreparedQueryEndpoints = make(map[string]structs.CheckServiceNodes) + snap.ConnectProxy.UpstreamConfig = make(map[string]*structs.Upstream) case structs.ServiceKindTerminatingGateway: snap.TerminatingGateway.WatchedServices = make(map[structs.ServiceName]context.CancelFunc) snap.TerminatingGateway.WatchedIntentions = make(map[structs.ServiceName]context.CancelFunc) @@ -582,15 +610,13 @@ func (s *state) initialConfigSnapshot() ConfigSnapshot { return snap } -func (s *state) run() { +func (s *state) run(snap *ConfigSnapshot) { // Close the channel we return from Watch when we stop so consumers can stop // watching and clean up their goroutines. It's important we do this here and // not in Close since this routine sends on this chan and so might panic if it // gets closed from another goroutine. defer close(s.snapCh) - snap := s.initialConfigSnapshot() - // This turns out to be really fiddly/painful by just using time.Timer.C // directly in the code below since you can't detect when a timer is stopped // vs waiting in order to know to reset it. So just use a chan to send @@ -605,7 +631,7 @@ func (s *state) run() { case u := <-s.ch: s.logger.Trace("A blocking query returned; handling snapshot update") - if err := s.handleUpdate(u, &snap); err != nil { + if err := s.handleUpdate(u, snap); err != nil { s.logger.Error("Failed to handle update from watch", "id", u.CorrelationID, "error", err, ) @@ -734,6 +760,68 @@ func (s *state) handleUpdateConnectProxy(u cache.UpdateEvent, snap *ConfigSnapsh } snap.ConnectProxy.IntentionsSet = true + case u.CorrelationID == intentionUpstreamsID: + resp, ok := u.Result.(*structs.IndexedServiceList) + if !ok { + return fmt.Errorf("invalid type for response %T", u.Result) + } + + seenServices := make(map[string]struct{}) + for _, svc := range resp.Services { + seenServices[svc.String()] = struct{}{} + + cfgMap := make(map[string]interface{}) + u, ok := snap.ConnectProxy.UpstreamConfig[svc.String()] + if ok { + cfgMap = u.Config + } + + cfg, err := parseReducedUpstreamConfig(cfgMap) + if err != nil { + // Don't hard fail on a config typo, just warn. We'll fall back on + // the plain discovery chain if there is an error so it's safe to + // continue. + s.logger.Warn("failed to parse upstream config", + "upstream", u.Identifier(), + "error", err, + ) + } + + err = s.watchDiscoveryChain(snap, cfg, svc.String(), svc.Name, svc.NamespaceOrDefault()) + if err != nil { + return fmt.Errorf("failed to watch discovery chain for %s: %v", svc.String(), err) + } + } + + // Clean up data from services that were not in the update + for sn := range snap.ConnectProxy.WatchedUpstreams { + if _, ok := seenServices[sn]; !ok { + delete(snap.ConnectProxy.WatchedUpstreams, sn) + } + } + for sn := range snap.ConnectProxy.WatchedUpstreamEndpoints { + if _, ok := seenServices[sn]; !ok { + delete(snap.ConnectProxy.WatchedUpstreamEndpoints, sn) + } + } + for sn := range snap.ConnectProxy.WatchedGateways { + if _, ok := seenServices[sn]; !ok { + delete(snap.ConnectProxy.WatchedGateways, sn) + } + } + for sn := range snap.ConnectProxy.WatchedGatewayEndpoints { + if _, ok := seenServices[sn]; !ok { + delete(snap.ConnectProxy.WatchedGatewayEndpoints, sn) + } + } + for sn, cancelFn := range snap.ConnectProxy.WatchedDiscoveryChains { + if _, ok := seenServices[sn]; !ok { + cancelFn() + delete(snap.ConnectProxy.WatchedDiscoveryChains, sn) + delete(snap.ConnectProxy.DiscoveryChain, sn) + } + } + case strings.HasPrefix(u.CorrelationID, "upstream:"+preparedQueryIDPrefix): resp, ok := u.Result.(*structs.PreparedQueryExecuteResponse) if !ok { @@ -1465,9 +1553,9 @@ func (s *state) handleUpdateIngressGateway(u cache.UpdateEvent, snap *ConfigSnap for _, service := range services.Services { u := makeUpstream(service) - err := s.watchIngressDiscoveryChain(snap, u) + err := s.watchDiscoveryChain(snap, reducedUpstreamConfig{}, u.Identifier(), u.DestinationName, u.DestinationNamespace) if err != nil { - return err + return fmt.Errorf("failed to watch discovery chain for %s: %v", u.Identifier(), err) } watchedSvcs[u.Identifier()] = struct{}{} @@ -1515,25 +1603,35 @@ func makeUpstream(g *structs.GatewayService) structs.Upstream { return upstream } -func (s *state) watchIngressDiscoveryChain(snap *ConfigSnapshot, u structs.Upstream) error { - if _, ok := snap.IngressGateway.WatchedDiscoveryChains[u.Identifier()]; ok { +func (s *state) watchDiscoveryChain(snap *ConfigSnapshot, cfg reducedUpstreamConfig, id, name, namespace string) error { + if _, ok := snap.ConnectProxy.WatchedDiscoveryChains[id]; ok { return nil } ctx, cancel := context.WithCancel(s.ctx) err := s.cache.Notify(ctx, cachetype.CompiledDiscoveryChainName, &structs.DiscoveryChainRequest{ - Datacenter: s.source.Datacenter, - QueryOptions: structs.QueryOptions{Token: s.token}, - Name: u.DestinationName, - EvaluateInDatacenter: s.source.Datacenter, - EvaluateInNamespace: u.DestinationNamespace, - }, "discovery-chain:"+u.Identifier(), s.ch) + Datacenter: s.source.Datacenter, + QueryOptions: structs.QueryOptions{Token: s.token}, + Name: name, + EvaluateInDatacenter: s.source.Datacenter, + EvaluateInNamespace: namespace, + OverrideProtocol: cfg.Protocol, + OverrideConnectTimeout: cfg.ConnectTimeout(), + }, "discovery-chain:"+id, s.ch) if err != nil { cancel() return err } - snap.IngressGateway.WatchedDiscoveryChains[u.Identifier()] = cancel + switch s.kind { + case structs.ServiceKindIngressGateway: + snap.IngressGateway.WatchedDiscoveryChains[id] = cancel + case structs.ServiceKindConnectProxy: + snap.ConnectProxy.WatchedDiscoveryChains[id] = cancel + default: + return fmt.Errorf("unsupported kind %s", s.kind) + } + return nil } diff --git a/agent/proxycfg/state_test.go b/agent/proxycfg/state_test.go index 19a0ed22b..b2f1295b4 100644 --- a/agent/proxycfg/state_test.go +++ b/agent/proxycfg/state_test.go @@ -249,6 +249,17 @@ func genVerifyIntentionWatch(expectedService string, expectedDatacenter string) } } +func genVerifyIntentionUpstreamsWatch(expectedService string, expectedDatacenter string) verifyWatchRequest { + return func(t testing.TB, cacheType string, request cache.Request) { + require.Equal(t, cachetype.IntentionUpstreamsName, cacheType) + + reqReal, ok := request.(*structs.ServiceSpecificRequest) + require.True(t, ok) + require.Equal(t, expectedDatacenter, reqReal.Datacenter) + require.Equal(t, expectedService, reqReal.ServiceName) + } +} + func genVerifyPreparedQueryWatch(expectedName string, expectedDatacenter string) verifyWatchRequest { return func(t testing.TB, cacheType string, request cache.Request) { require.Equal(t, cachetype.PreparedQueryName, cacheType) @@ -1503,6 +1514,247 @@ func TestState_WatchesAndUpdates(t *testing.T) { }, }, }, + "transparent-proxy-initial": { + ns: structs.NodeService{ + Kind: structs.ServiceKindConnectProxy, + ID: "api-proxy", + Service: "api-proxy", + Address: "10.0.1.1", + Proxy: structs.ConnectProxyConfig{ + DestinationServiceName: "api", + TransparentProxy: true, + }, + }, + sourceDC: "dc1", + stages: []verificationStage{ + { + requiredWatches: map[string]verifyWatchRequest{ + rootsWatchID: genVerifyRootsWatch("dc1"), + intentionUpstreamsID: genVerifyServiceSpecificRequest(intentionUpstreamsID, + "api", "", "dc1", false), + leafWatchID: genVerifyLeafWatch("api", "dc1"), + intentionsWatchID: genVerifyIntentionWatch("api", "dc1"), + }, + verifySnapshot: func(t testing.TB, snap *ConfigSnapshot) { + require.False(t, snap.Valid(), "proxy without roots/leaf/intentions is not valid") + require.True(t, snap.ConnectProxy.IsEmpty()) + require.True(t, snap.MeshGateway.IsEmpty()) + require.True(t, snap.IngressGateway.IsEmpty()) + require.True(t, snap.TerminatingGateway.IsEmpty()) + }, + }, + { + events: []cache.UpdateEvent{ + rootWatchEvent(), + { + CorrelationID: leafWatchID, + Result: issuedCert, + Err: nil, + }, + { + CorrelationID: intentionsWatchID, + Result: TestIntentions(), + Err: nil, + }, + }, + verifySnapshot: func(t testing.TB, snap *ConfigSnapshot) { + require.True(t, snap.Valid(), "proxy with roots/leaf/intentions is valid") + require.Equal(t, indexedRoots, snap.Roots) + require.Equal(t, issuedCert, snap.Leaf()) + require.Equal(t, TestIntentions().Matches[0], snap.ConnectProxy.Intentions) + require.True(t, snap.MeshGateway.IsEmpty()) + require.True(t, snap.IngressGateway.IsEmpty()) + require.True(t, snap.TerminatingGateway.IsEmpty()) + }, + }, + }, + }, + "transparent-proxy-handle-update": { + ns: structs.NodeService{ + Kind: structs.ServiceKindConnectProxy, + ID: "api-proxy", + Service: "api-proxy", + Address: "10.0.1.1", + Proxy: structs.ConnectProxyConfig{ + DestinationServiceName: "api", + TransparentProxy: true, + }, + }, + sourceDC: "dc1", + stages: []verificationStage{ + // Empty on initialization + { + requiredWatches: map[string]verifyWatchRequest{ + rootsWatchID: genVerifyRootsWatch("dc1"), + intentionUpstreamsID: genVerifyServiceSpecificRequest(intentionUpstreamsID, + "api", "", "dc1", false), + leafWatchID: genVerifyLeafWatch("api", "dc1"), + intentionsWatchID: genVerifyIntentionWatch("api", "dc1"), + }, + verifySnapshot: func(t testing.TB, snap *ConfigSnapshot) { + require.False(t, snap.Valid(), "proxy without roots/leaf/intentions is not valid") + require.True(t, snap.ConnectProxy.IsEmpty()) + require.True(t, snap.MeshGateway.IsEmpty()) + require.True(t, snap.IngressGateway.IsEmpty()) + require.True(t, snap.TerminatingGateway.IsEmpty()) + }, + }, + // Valid snapshot after roots, leaf, and intentions + { + events: []cache.UpdateEvent{ + rootWatchEvent(), + { + CorrelationID: leafWatchID, + Result: issuedCert, + Err: nil, + }, + { + CorrelationID: intentionsWatchID, + Result: TestIntentions(), + Err: nil, + }, + }, + verifySnapshot: func(t testing.TB, snap *ConfigSnapshot) { + require.True(t, snap.Valid(), "proxy with roots/leaf/intentions is valid") + require.Equal(t, indexedRoots, snap.Roots) + require.Equal(t, issuedCert, snap.Leaf()) + require.Equal(t, TestIntentions().Matches[0], snap.ConnectProxy.Intentions) + require.True(t, snap.MeshGateway.IsEmpty()) + require.True(t, snap.IngressGateway.IsEmpty()) + require.True(t, snap.TerminatingGateway.IsEmpty()) + }, + }, + // Receiving an intention should lead to spinning up a discovery chain watch + { + requiredWatches: map[string]verifyWatchRequest{ + rootsWatchID: genVerifyRootsWatch("dc1"), + intentionUpstreamsID: genVerifyServiceSpecificRequest(intentionUpstreamsID, + "api", "", "dc1", false), + leafWatchID: genVerifyLeafWatch("api", "dc1"), + intentionsWatchID: genVerifyIntentionWatch("api", "dc1"), + }, + events: []cache.UpdateEvent{ + { + CorrelationID: intentionUpstreamsID, + Result: &structs.IndexedServiceList{ + Services: structs.ServiceList{ + db, + }, + }, + Err: nil, + }, + }, + verifySnapshot: func(t testing.TB, snap *ConfigSnapshot) { + require.True(t, snap.Valid(), "should still be valid") + + // Should start watch for db's chain + require.Contains(t, snap.ConnectProxy.WatchedDiscoveryChains, dbStr) + + // Should not have results yet + require.Empty(t, snap.ConnectProxy.DiscoveryChain) + }, + }, + // Discovery chain updates should be stored + { + requiredWatches: map[string]verifyWatchRequest{ + "discovery-chain:" + dbStr: genVerifyDiscoveryChainWatch(&structs.DiscoveryChainRequest{ + Name: dbStr, + EvaluateInDatacenter: "dc1", + EvaluateInNamespace: "default", + Datacenter: "dc1", + }), + }, + events: []cache.UpdateEvent{ + { + CorrelationID: "discovery-chain:" + dbStr, + Result: &structs.DiscoveryChainResponse{ + Chain: discoverychain.TestCompileConfigEntries(t, "db", "default", "dc1", "trustdomain.consul", "dc1", nil), + }, + Err: nil, + }, + }, + verifySnapshot: func(t testing.TB, snap *ConfigSnapshot) { + require.Len(t, snap.ConnectProxy.WatchedUpstreams, 1) + require.Len(t, snap.ConnectProxy.WatchedUpstreams[dbStr], 1) + }, + }, + { + requiredWatches: map[string]verifyWatchRequest{ + "upstream-target:db.default.dc1:db": genVerifyServiceWatch("db", "", "dc1", true), + }, + events: []cache.UpdateEvent{ + { + CorrelationID: "upstream-target:db.default.dc1:db", + Result: &structs.IndexedCheckServiceNodes{ + Nodes: structs.CheckServiceNodes{ + { + Node: &structs.Node{ + Node: "node1", + Address: "127.0.0.1", + }, + Service: &structs.NodeService{ + ID: "db1", + Service: "db", + }, + }, + }, + }, + Err: nil, + }, + }, + verifySnapshot: func(t testing.TB, snap *ConfigSnapshot) { + require.Len(t, snap.ConnectProxy.WatchedUpstreamEndpoints, 1) + require.Contains(t, snap.ConnectProxy.WatchedUpstreamEndpoints, dbStr) + require.Len(t, snap.ConnectProxy.WatchedUpstreamEndpoints[dbStr], 1) + require.Contains(t, snap.ConnectProxy.WatchedUpstreamEndpoints[dbStr], "db.default.dc1") + require.Equal(t, snap.ConnectProxy.WatchedUpstreamEndpoints[dbStr]["db.default.dc1"], + structs.CheckServiceNodes{ + { + Node: &structs.Node{ + Node: "node1", + Address: "127.0.0.1", + }, + Service: &structs.NodeService{ + ID: "db1", + Service: "db", + }, + }, + }, + ) + }, + }, + // Empty list of upstreams should clean everything up + { + requiredWatches: map[string]verifyWatchRequest{ + rootsWatchID: genVerifyRootsWatch("dc1"), + intentionUpstreamsID: genVerifyServiceSpecificRequest(intentionUpstreamsID, + "api", "", "dc1", false), + leafWatchID: genVerifyLeafWatch("api", "dc1"), + intentionsWatchID: genVerifyIntentionWatch("api", "dc1"), + }, + events: []cache.UpdateEvent{ + { + CorrelationID: intentionUpstreamsID, + Result: &structs.IndexedServiceList{ + Services: structs.ServiceList{}, + }, + Err: nil, + }, + }, + verifySnapshot: func(t testing.TB, snap *ConfigSnapshot) { + require.True(t, snap.Valid(), "should still be valid") + + // Empty intention upstreams leads to cancelling all associated watches + require.Empty(t, snap.ConnectProxy.WatchedDiscoveryChains) + require.Empty(t, snap.ConnectProxy.WatchedUpstreams) + require.Empty(t, snap.ConnectProxy.WatchedUpstreamEndpoints) + require.Empty(t, snap.ConnectProxy.WatchedGateways) + require.Empty(t, snap.ConnectProxy.WatchedGatewayEndpoints) + require.Empty(t, snap.ConnectProxy.DiscoveryChain) + }, + }, + }, + }, "connect-proxy": newConnectProxyCase(structs.MeshGatewayModeDefault), "connect-proxy-mesh-gateway-local": newConnectProxyCase(structs.MeshGatewayModeLocal), } @@ -1535,12 +1787,12 @@ func TestState_WatchesAndUpdates(t *testing.T) { // setup the ctx as initWatches expects this to be there state.ctx, state.cancel = context.WithCancel(context.Background()) - // ensure the initial watch setup did not error - require.NoError(t, state.initWatches()) - // get the initial configuration snapshot snap := state.initialConfigSnapshot() + // ensure the initial watch setup did not error + require.NoError(t, state.initWatches(&snap)) + //-------------------------------------------------------------------- // // All the nested subtests here are to make failures easier to diff --git a/agent/proxycfg/testing.go b/agent/proxycfg/testing.go index 1c0174493..66546f7fc 100644 --- a/agent/proxycfg/testing.go +++ b/agent/proxycfg/testing.go @@ -653,6 +653,8 @@ func TestConfigSnapshot(t testing.T) *ConfigSnapshot { t, "db", "default", "dc1", connect.TestClusterID+".consul", "dc1", nil) + upstreams := structs.TestUpstreams(t) + return &ConfigSnapshot{ Kind: structs.ServiceKindConnectProxy, Service: "web-sidecar-proxy", @@ -667,12 +669,13 @@ func TestConfigSnapshot(t testing.T) *ConfigSnapshot { Config: map[string]interface{}{ "foo": "bar", }, - Upstreams: structs.TestUpstreams(t), + Upstreams: upstreams, }, Roots: roots, ConnectProxy: configSnapshotConnectProxy{ ConfigSnapshotUpstreams: ConfigSnapshotUpstreams{ - Leaf: leaf, + Leaf: leaf, + UpstreamConfig: upstreams.ToMap(), DiscoveryChain: map[string]*structs.CompiledDiscoveryChain{ "db": dbChain, }, @@ -1315,6 +1318,7 @@ func setupTestVariationConfigEntriesAndSnapshot( dbChain := discoverychain.TestCompileConfigEntries(t, "db", "default", "dc1", connect.TestClusterID+".consul", "dc1", compileSetup, entries...) + upstreams := structs.TestUpstreams(t) snap := ConfigSnapshotUpstreams{ Leaf: leaf, DiscoveryChain: map[string]*structs.CompiledDiscoveryChain{ @@ -1325,6 +1329,7 @@ func setupTestVariationConfigEntriesAndSnapshot( "db.default.dc1": TestUpstreamNodes(t), }, }, + UpstreamConfig: upstreams.ToMap(), } switch variation { diff --git a/agent/structs/connect_proxy_config.go b/agent/structs/connect_proxy_config.go index b8708fab1..4e0af9cab 100644 --- a/agent/structs/connect_proxy_config.go +++ b/agent/structs/connect_proxy_config.go @@ -215,6 +215,15 @@ func (us Upstreams) ToAPI() []api.Upstream { return a } +func (us Upstreams) ToMap() map[string]*Upstream { + upstreamMap := make(map[string]*Upstream) + + for i := range us { + upstreamMap[us[i].Identifier()] = &us[i] + } + return upstreamMap +} + // UpstreamsFromAPI is a helper for converting api.Upstream to Upstream. func UpstreamsFromAPI(us []api.Upstream) Upstreams { a := make([]Upstream, len(us))