diff --git a/.changelog/10391.txt b/.changelog/10391.txt new file mode 100644 index 000000000..3570a542d --- /dev/null +++ b/.changelog/10391.txt @@ -0,0 +1,3 @@ +```release-note:bug +proxycfg: Ensure that endpoints for explicit upstreams in other datacenters are watched in transparent mode. +``` diff --git a/agent/proxycfg/state.go b/agent/proxycfg/state.go index 63269378f..64bfc424e 100644 --- a/agent/proxycfg/state.go +++ b/agent/proxycfg/state.go @@ -380,7 +380,7 @@ func (s *state) initWatchesConnectProxy(snap *ConfigSnapshot) error { OverrideConnectTimeout: cfg.ConnectTimeout(), }, "discovery-chain:"+u.Identifier(), s.ch) if err != nil { - return err + return fmt.Errorf("failed to watch discovery chain for %s: %v", u.Identifier(), err) } default: @@ -815,6 +815,7 @@ func (s *state) handleUpdateConnectProxy(u cache.UpdateEvent, snap *ConfigSnapsh id: svc.String(), name: svc.Name, namespace: svc.NamespaceOrDefault(), + datacenter: s.source.Datacenter, cfg: cfg, meshGateway: meshGateway, } @@ -826,26 +827,46 @@ func (s *state) handleUpdateConnectProxy(u cache.UpdateEvent, snap *ConfigSnapsh // Clean up data from services that were not in the update for sn := range snap.ConnectProxy.WatchedUpstreams { + upstream := snap.ConnectProxy.UpstreamConfig[sn] + if upstream.Datacenter != "" && upstream.Datacenter != s.source.Datacenter { + continue + } if _, ok := seenServices[sn]; !ok { delete(snap.ConnectProxy.WatchedUpstreams, sn) } } for sn := range snap.ConnectProxy.WatchedUpstreamEndpoints { + upstream := snap.ConnectProxy.UpstreamConfig[sn] + if upstream.Datacenter != "" && upstream.Datacenter != s.source.Datacenter { + continue + } if _, ok := seenServices[sn]; !ok { delete(snap.ConnectProxy.WatchedUpstreamEndpoints, sn) } } for sn := range snap.ConnectProxy.WatchedGateways { + upstream := snap.ConnectProxy.UpstreamConfig[sn] + if upstream.Datacenter != "" && upstream.Datacenter != s.source.Datacenter { + continue + } if _, ok := seenServices[sn]; !ok { delete(snap.ConnectProxy.WatchedGateways, sn) } } for sn := range snap.ConnectProxy.WatchedGatewayEndpoints { + upstream := snap.ConnectProxy.UpstreamConfig[sn] + if upstream.Datacenter != "" && upstream.Datacenter != s.source.Datacenter { + continue + } if _, ok := seenServices[sn]; !ok { delete(snap.ConnectProxy.WatchedGatewayEndpoints, sn) } } for sn, cancelFn := range snap.ConnectProxy.WatchedDiscoveryChains { + upstream := snap.ConnectProxy.UpstreamConfig[sn] + if upstream.Datacenter != "" && upstream.Datacenter != s.source.Datacenter { + continue + } if _, ok := seenServices[sn]; !ok { cancelFn() delete(snap.ConnectProxy.WatchedDiscoveryChains, sn) @@ -1721,9 +1742,10 @@ func (s *state) handleUpdateIngressGateway(u cache.UpdateEvent, snap *ConfigSnap u := makeUpstream(service) watchOpts := discoveryChainWatchOpts{ - id: u.Identifier(), - name: u.DestinationName, - namespace: u.DestinationNamespace, + id: u.Identifier(), + name: u.DestinationName, + namespace: u.DestinationNamespace, + datacenter: s.source.Datacenter, } err := s.watchDiscoveryChain(snap, watchOpts) if err != nil { @@ -1781,6 +1803,7 @@ type discoveryChainWatchOpts struct { id string name string namespace string + datacenter string cfg reducedUpstreamConfig meshGateway structs.MeshGatewayConfig } @@ -1795,7 +1818,7 @@ func (s *state) watchDiscoveryChain(snap *ConfigSnapshot, opts discoveryChainWat Datacenter: s.source.Datacenter, QueryOptions: structs.QueryOptions{Token: s.token}, Name: opts.name, - EvaluateInDatacenter: s.source.Datacenter, + EvaluateInDatacenter: opts.datacenter, EvaluateInNamespace: opts.namespace, OverrideProtocol: opts.cfg.Protocol, OverrideConnectTimeout: opts.cfg.ConnectTimeout(), diff --git a/agent/proxycfg/state_test.go b/agent/proxycfg/state_test.go index 22e99477e..37835d54a 100644 --- a/agent/proxycfg/state_test.go +++ b/agent/proxycfg/state_test.go @@ -3,11 +3,12 @@ package proxycfg import ( "context" "fmt" - "github.com/hashicorp/consul/agent/connect" "sync" "testing" "time" + "github.com/hashicorp/consul/agent/connect" + "github.com/stretchr/testify/require" "github.com/hashicorp/consul/agent/cache" @@ -362,6 +363,10 @@ func ingressConfigWatchEvent(tlsEnabled bool) cache.UpdateEvent { } } +func upstreamIDForDC2(name string) string { + return fmt.Sprintf("%s?dc=dc2", name) +} + // This test is meant to exercise the various parts of the cache watching done by the state as // well as its management of the ConfigSnapshot // @@ -1942,6 +1947,179 @@ func TestState_WatchesAndUpdates(t *testing.T) { }, }, }, + // Receiving an empty upstreams from Intentions list shouldn't delete explicit upstream watches + "transparent-proxy-handle-update-explicit-cross-dc": { + ns: structs.NodeService{ + Kind: structs.ServiceKindConnectProxy, + ID: "api-proxy", + Service: "api-proxy", + Address: "10.0.1.1", + Proxy: structs.ConnectProxyConfig{ + DestinationServiceName: "api", + Mode: structs.ProxyModeTransparent, + Upstreams: structs.Upstreams{ + { + CentrallyConfigured: true, + DestinationName: structs.WildcardSpecifier, + DestinationNamespace: structs.WildcardSpecifier, + Config: map[string]interface{}{ + "connect_timeout_ms": 6000, + }, + MeshGateway: structs.MeshGatewayConfig{Mode: structs.MeshGatewayModeRemote}, + }, + { + DestinationName: db.Name, + DestinationNamespace: db.NamespaceOrDefault(), + Datacenter: "dc2", + MeshGateway: structs.MeshGatewayConfig{Mode: structs.MeshGatewayModeLocal}, + }, + }, + }, + }, + sourceDC: "dc1", + stages: []verificationStage{ + // Empty on initialization + { + requiredWatches: map[string]verifyWatchRequest{ + rootsWatchID: genVerifyRootsWatch("dc1"), + intentionUpstreamsID: genVerifyServiceSpecificRequest(intentionUpstreamsID, + "api", "", "dc1", false), + leafWatchID: genVerifyLeafWatch("api", "dc1"), + intentionsWatchID: genVerifyIntentionWatch("api", "dc1"), + meshConfigEntryID: genVerifyMeshConfigWatch("dc1"), + "discovery-chain:" + upstreamIDForDC2(db.String()): genVerifyDiscoveryChainWatch(&structs.DiscoveryChainRequest{ + Name: "db", + EvaluateInDatacenter: "dc2", + EvaluateInNamespace: "default", + Datacenter: "dc1", + OverrideMeshGateway: structs.MeshGatewayConfig{Mode: structs.MeshGatewayModeLocal}, + }), + }, + verifySnapshot: func(t testing.TB, snap *ConfigSnapshot) { + require.False(t, snap.Valid(), "proxy without roots/leaf/intentions is not valid") + require.True(t, snap.MeshGateway.IsEmpty()) + require.True(t, snap.IngressGateway.IsEmpty()) + require.True(t, snap.TerminatingGateway.IsEmpty()) + + // Centrally configured upstream defaults should be stored so that upstreams from intentions can inherit them + require.Len(t, snap.ConnectProxy.UpstreamConfig, 2) + + wc := structs.NewServiceName(structs.WildcardSpecifier, structs.WildcardEnterpriseMeta()) + require.Contains(t, snap.ConnectProxy.UpstreamConfig, wc.String()) + require.Contains(t, snap.ConnectProxy.UpstreamConfig, upstreamIDForDC2(db.String())) + }, + }, + // Valid snapshot after roots, leaf, and intentions + { + events: []cache.UpdateEvent{ + rootWatchEvent(), + { + CorrelationID: leafWatchID, + Result: issuedCert, + Err: nil, + }, + { + CorrelationID: intentionsWatchID, + Result: TestIntentions(), + Err: nil, + }, + { + CorrelationID: meshConfigEntryID, + Result: &structs.ConfigEntryResponse{ + Entry: &structs.MeshConfigEntry{ + TransparentProxy: structs.TransparentProxyMeshConfig{}, + }, + }, + Err: nil, + }, + }, + verifySnapshot: func(t testing.TB, snap *ConfigSnapshot) { + require.True(t, snap.Valid(), "proxy with roots/leaf/intentions is valid") + require.Equal(t, indexedRoots, snap.Roots) + require.Equal(t, issuedCert, snap.Leaf()) + require.Equal(t, TestIntentions().Matches[0], snap.ConnectProxy.Intentions) + require.True(t, snap.MeshGateway.IsEmpty()) + require.True(t, snap.IngressGateway.IsEmpty()) + require.True(t, snap.TerminatingGateway.IsEmpty()) + require.True(t, snap.ConnectProxy.MeshConfigSet) + require.NotNil(t, snap.ConnectProxy.MeshConfig) + }, + }, + // Discovery chain updates should be stored + { + requiredWatches: map[string]verifyWatchRequest{ + "discovery-chain:" + upstreamIDForDC2(db.String()): genVerifyDiscoveryChainWatch(&structs.DiscoveryChainRequest{ + Name: "db", + EvaluateInDatacenter: "dc2", + EvaluateInNamespace: "default", + Datacenter: "dc1", + OverrideMeshGateway: structs.MeshGatewayConfig{Mode: structs.MeshGatewayModeLocal}, + }), + }, + events: []cache.UpdateEvent{ + { + CorrelationID: "discovery-chain:" + upstreamIDForDC2(db.String()), + Result: &structs.DiscoveryChainResponse{ + Chain: discoverychain.TestCompileConfigEntries(t, "db", "default", "dc2", "trustdomain.consul", "dc1", + func(req *discoverychain.CompileRequest) { + req.OverrideMeshGateway.Mode = structs.MeshGatewayModeLocal + }), + }, + Err: nil, + }, + }, + verifySnapshot: func(t testing.TB, snap *ConfigSnapshot) { + require.Len(t, snap.ConnectProxy.WatchedGateways, 1) + require.Len(t, snap.ConnectProxy.WatchedGateways[upstreamIDForDC2(db.String())], 1) + require.Len(t, snap.ConnectProxy.WatchedUpstreams, 1) + require.Len(t, snap.ConnectProxy.WatchedUpstreams[upstreamIDForDC2(db.String())], 1) + }, + }, + // Empty list of upstreams should only clean up implicit upstreams. The explicit upstream db should not + // be deleted from the snapshot. + { + requiredWatches: map[string]verifyWatchRequest{ + rootsWatchID: genVerifyRootsWatch("dc1"), + intentionUpstreamsID: genVerifyServiceSpecificRequest(intentionUpstreamsID, + "api", "", "dc1", false), + leafWatchID: genVerifyLeafWatch("api", "dc1"), + intentionsWatchID: genVerifyIntentionWatch("api", "dc1"), + "discovery-chain:" + upstreamIDForDC2(db.String()): genVerifyDiscoveryChainWatch(&structs.DiscoveryChainRequest{ + Name: "db", + EvaluateInDatacenter: "dc2", + EvaluateInNamespace: "default", + Datacenter: "dc1", + OverrideMeshGateway: structs.MeshGatewayConfig{Mode: structs.MeshGatewayModeLocal}, + }), + }, + events: []cache.UpdateEvent{ + { + CorrelationID: intentionUpstreamsID, + Result: &structs.IndexedServiceList{ + Services: structs.ServiceList{}, + }, + Err: nil, + }, + }, + verifySnapshot: func(t testing.TB, snap *ConfigSnapshot) { + require.True(t, snap.Valid(), "should still be valid") + + // Explicit upstream discovery chain watches don't get stored in these maps because they don't + // get canceled unless the proxy registration is modified. + require.Empty(t, snap.ConnectProxy.WatchedDiscoveryChains) + + // Explicit upstreams should not be deleted when the empty update event happens since that is + // for intention upstreams. + require.Len(t, snap.ConnectProxy.DiscoveryChain, 1) + require.Contains(t, snap.ConnectProxy.DiscoveryChain, upstreamIDForDC2(db.String())) + require.Len(t, snap.ConnectProxy.WatchedGateways, 1) + require.Len(t, snap.ConnectProxy.WatchedGateways[upstreamIDForDC2(db.String())], 1) + require.Len(t, snap.ConnectProxy.WatchedUpstreams, 1) + require.Len(t, snap.ConnectProxy.WatchedUpstreams[upstreamIDForDC2(db.String())], 1) + }, + }, + }, + }, "connect-proxy": newConnectProxyCase(structs.MeshGatewayModeDefault), "connect-proxy-mesh-gateway-local": newConnectProxyCase(structs.MeshGatewayModeLocal), }