From f17a4f07c50f846f6e2a453b33f9fe6d9809a9f1 Mon Sep 17 00:00:00 2001 From: Derek Menteer <105233703+hashi-derek@users.noreply.github.com> Date: Wed, 7 Dec 2022 13:07:42 -0600 Subject: [PATCH] Fix local mesh gateway with peering discovery chains. (#15690) Fix local mesh gateway with peering discovery chains. Prior to this patch, discovery chains with peers would not properly honor the mesh gateway mode for two reasons. 1. An incorrect target upstream ID was used to lookup the mesh gateway mode. To fix this, the parent upstream uid is now used instead of the discovery-chain-target-uid to find the intended mesh gateway mode. 2. The watch for local mesh gateways was never initialized for discovery chains. To fix this, the discovery chains are now scanned, and a local GW watch is spawned if: the mesh gateway mode is local and the target is a peering connection. --- .changelog/15690.txt | 3 +++ agent/proxycfg/connect_proxy.go | 19 ++----------------- agent/proxycfg/state_test.go | 12 ++++++++++++ agent/proxycfg/upstreams.go | 31 +++++++++++++++++++++++++++++++ agent/xds/endpoints.go | 26 +++++++++++++++++++------- 5 files changed, 67 insertions(+), 24 deletions(-) create mode 100644 .changelog/15690.txt diff --git a/.changelog/15690.txt b/.changelog/15690.txt new file mode 100644 index 000000000..75e08b229 --- /dev/null +++ b/.changelog/15690.txt @@ -0,0 +1,3 @@ +```release-note:bug +connect: Fix peering failovers ignoring local mesh gateway configuration. +``` diff --git a/agent/proxycfg/connect_proxy.go b/agent/proxycfg/connect_proxy.go index 416e321ae..f544ef701 100644 --- a/agent/proxycfg/connect_proxy.go +++ b/agent/proxycfg/connect_proxy.go @@ -275,23 +275,8 @@ func (s *handlerConnectProxy) setupWatchesForPeeredUpstream( // If a peered upstream is set to local mesh gw mode, // set up a watch for them. if mgwMode == structs.MeshGatewayModeLocal { - gk := GatewayKey{ - Partition: s.source.NodePartitionOrDefault(), - Datacenter: s.source.Datacenter, - } - if !snapConnectProxy.WatchedLocalGWEndpoints.IsWatched(gk.String()) { - opts := gatewayWatchOpts{ - internalServiceDump: s.dataSources.InternalServiceDump, - notifyCh: s.ch, - source: *s.source, - token: s.token, - key: gk, - } - if err := watchMeshGateway(ctx, opts); err != nil { - return fmt.Errorf("error while watching for local mesh gateway: %w", err) - } - snapConnectProxy.WatchedLocalGWEndpoints.InitWatch(gk.String(), nil) - } + up := &handlerUpstreams{handlerState: s.handlerState} + up.setupWatchForLocalGWEndpoints(ctx, &snapConnectProxy.ConfigSnapshotUpstreams) } else if mgwMode == structs.MeshGatewayModeNone { s.logger.Warn(fmt.Sprintf("invalid mesh gateway mode 'none', defaulting to 'remote' for %q", uid)) } diff --git a/agent/proxycfg/state_test.go b/agent/proxycfg/state_test.go index b072eb630..85cf0228e 100644 --- a/agent/proxycfg/state_test.go +++ b/agent/proxycfg/state_test.go @@ -766,6 +766,12 @@ func TestState_WatchesAndUpdates(t *testing.T) { require.True(t, snap.ConnectProxy.IntentionsSet) require.Equal(t, ixnMatch, snap.ConnectProxy.Intentions) require.True(t, snap.ConnectProxy.MeshConfigSet) + + if meshGatewayProxyConfigValue == structs.MeshGatewayModeLocal { + require.True(t, snap.ConnectProxy.WatchedLocalGWEndpoints.IsWatched("dc1")) + _, ok := snap.ConnectProxy.WatchedLocalGWEndpoints.Get("dc1") + require.False(t, ok) + } }, } @@ -799,6 +805,12 @@ func TestState_WatchesAndUpdates(t *testing.T) { require.True(t, snap.ConnectProxy.IntentionsSet) require.Equal(t, ixnMatch, snap.ConnectProxy.Intentions) + + if meshGatewayProxyConfigValue == structs.MeshGatewayModeLocal { + require.True(t, snap.ConnectProxy.WatchedLocalGWEndpoints.IsWatched("dc1")) + _, ok := snap.ConnectProxy.WatchedLocalGWEndpoints.Get("dc1") + require.False(t, ok) + } }, } diff --git a/agent/proxycfg/upstreams.go b/agent/proxycfg/upstreams.go index a2dc38b9d..4ebbabb65 100644 --- a/agent/proxycfg/upstreams.go +++ b/agent/proxycfg/upstreams.go @@ -326,6 +326,10 @@ func (s *handlerUpstreams) resetWatchesFromChain( if s.source.Datacenter != target.Datacenter || s.proxyID.PartitionOrDefault() != target.Partition { needGateways[gk.String()] = struct{}{} } + // Register a local gateway watch if any targets are pointing to a peer and require a mode of local. + if target.Peer != "" && target.MeshGateway.Mode == structs.MeshGatewayModeLocal { + s.setupWatchForLocalGWEndpoints(ctx, snap) + } } // If the discovery chain's targets do not lead to watching all endpoints @@ -548,3 +552,30 @@ func parseReducedUpstreamConfig(m map[string]interface{}) (reducedUpstreamConfig err := mapstructure.WeakDecode(m, &cfg) return cfg, err } + +func (s *handlerUpstreams) setupWatchForLocalGWEndpoints( + ctx context.Context, + upstreams *ConfigSnapshotUpstreams, +) error { + gk := GatewayKey{ + Partition: s.proxyID.PartitionOrDefault(), + Datacenter: s.source.Datacenter, + } + // If the watch is already initialized, do nothing. + if upstreams.WatchedLocalGWEndpoints.IsWatched(gk.String()) { + return nil + } + + opts := gatewayWatchOpts{ + internalServiceDump: s.dataSources.InternalServiceDump, + notifyCh: s.ch, + source: *s.source, + token: s.token, + key: gk, + } + if err := watchMeshGateway(ctx, opts); err != nil { + return fmt.Errorf("error while watching for local mesh gateway: %w", err) + } + upstreams.WatchedLocalGWEndpoints.InitWatch(gk.String(), nil) + return nil +} diff --git a/agent/xds/endpoints.go b/agent/xds/endpoints.go index cc085cd71..4251dd019 100644 --- a/agent/xds/endpoints.go +++ b/agent/xds/endpoints.go @@ -92,7 +92,7 @@ func (s *ResourceGenerator) endpointsFromSnapshotConnectProxy(cfgSnap *proxycfg. // upstream in clusters.go so that the sets of endpoints generated matches // the sets of clusters. for _, uid := range cfgSnap.ConnectProxy.PeeredUpstreamIDs() { - _, skip := getUpstream(uid) + upstream, skip := getUpstream(uid) if skip { // Discovery chain is not associated with a known explicit or implicit upstream so it is skipped. continue @@ -107,7 +107,11 @@ func (s *ResourceGenerator) endpointsFromSnapshotConnectProxy(cfgSnap *proxycfg. clusterName := generatePeeredClusterName(uid, tbs) - loadAssignment, err := s.makeUpstreamLoadAssignmentForPeerService(cfgSnap, clusterName, uid) + mgwMode := structs.MeshGatewayModeDefault + if upstream != nil { + mgwMode = upstream.MeshGateway.Mode + } + loadAssignment, err := s.makeUpstreamLoadAssignmentForPeerService(cfgSnap, clusterName, uid, mgwMode) if err != nil { return nil, err } @@ -538,7 +542,12 @@ func makePipeEndpoint(path string) *envoy_endpoint_v3.LbEndpoint { } } -func (s *ResourceGenerator) makeUpstreamLoadAssignmentForPeerService(cfgSnap *proxycfg.ConfigSnapshot, clusterName string, uid proxycfg.UpstreamID) (*envoy_endpoint_v3.ClusterLoadAssignment, error) { +func (s *ResourceGenerator) makeUpstreamLoadAssignmentForPeerService( + cfgSnap *proxycfg.ConfigSnapshot, + clusterName string, + uid proxycfg.UpstreamID, + upstreamGatewayMode structs.MeshGatewayMode, +) (*envoy_endpoint_v3.ClusterLoadAssignment, error) { var la *envoy_endpoint_v3.ClusterLoadAssignment upstreamsSnapshot, err := cfgSnap.ToConfigSnapshotUpstreams() @@ -546,11 +555,9 @@ func (s *ResourceGenerator) makeUpstreamLoadAssignmentForPeerService(cfgSnap *pr return la, err } - upstream := cfgSnap.ConnectProxy.UpstreamConfig[uid] - // If an upstream is configured with local mesh gw mode, we make a load assignment // from the gateway endpoints instead of those of the upstreams. - if upstream != nil && upstream.MeshGateway.Mode == structs.MeshGatewayModeLocal { + if upstreamGatewayMode == structs.MeshGatewayModeLocal { localGw, ok := cfgSnap.ConnectProxy.WatchedLocalGWEndpoints.Get(cfgSnap.Locality.String()) if !ok { // local GW is not ready; return early @@ -643,6 +650,11 @@ func (s *ResourceGenerator) endpointsFromDiscoveryChain( } } + mgwMode := structs.MeshGatewayModeDefault + if upstream := cfgSnap.ConnectProxy.UpstreamConfig[uid]; upstream != nil { + mgwMode = upstream.MeshGateway.Mode + } + // Find all resolver nodes. for _, node := range chain.Nodes { if node.Type != structs.DiscoveryGraphNodeTypeResolver { @@ -682,7 +694,7 @@ func (s *ResourceGenerator) endpointsFromDiscoveryChain( s.Logger.Debug("generating endpoints for", "cluster", targetOpt.clusterName) targetUID := proxycfg.NewUpstreamIDFromTargetID(targetOpt.targetID) if targetUID.Peer != "" { - loadAssignment, err := s.makeUpstreamLoadAssignmentForPeerService(cfgSnap, targetOpt.clusterName, targetUID) + loadAssignment, err := s.makeUpstreamLoadAssignmentForPeerService(cfgSnap, targetOpt.clusterName, targetUID, mgwMode) if err != nil { return nil, err }