diff --git a/.changelog/15690.txt b/.changelog/15690.txt new file mode 100644 index 000000000..75e08b229 --- /dev/null +++ b/.changelog/15690.txt @@ -0,0 +1,3 @@ +```release-note:bug +connect: Fix peering failovers ignoring local mesh gateway configuration. +``` diff --git a/agent/proxycfg/connect_proxy.go b/agent/proxycfg/connect_proxy.go index 416e321ae..f544ef701 100644 --- a/agent/proxycfg/connect_proxy.go +++ b/agent/proxycfg/connect_proxy.go @@ -275,23 +275,8 @@ func (s *handlerConnectProxy) setupWatchesForPeeredUpstream( // If a peered upstream is set to local mesh gw mode, // set up a watch for them. if mgwMode == structs.MeshGatewayModeLocal { - gk := GatewayKey{ - Partition: s.source.NodePartitionOrDefault(), - Datacenter: s.source.Datacenter, - } - if !snapConnectProxy.WatchedLocalGWEndpoints.IsWatched(gk.String()) { - opts := gatewayWatchOpts{ - internalServiceDump: s.dataSources.InternalServiceDump, - notifyCh: s.ch, - source: *s.source, - token: s.token, - key: gk, - } - if err := watchMeshGateway(ctx, opts); err != nil { - return fmt.Errorf("error while watching for local mesh gateway: %w", err) - } - snapConnectProxy.WatchedLocalGWEndpoints.InitWatch(gk.String(), nil) - } + up := &handlerUpstreams{handlerState: s.handlerState} + up.setupWatchForLocalGWEndpoints(ctx, &snapConnectProxy.ConfigSnapshotUpstreams) } else if mgwMode == structs.MeshGatewayModeNone { s.logger.Warn(fmt.Sprintf("invalid mesh gateway mode 'none', defaulting to 'remote' for %q", uid)) } diff --git a/agent/proxycfg/state_test.go b/agent/proxycfg/state_test.go index b072eb630..85cf0228e 100644 --- a/agent/proxycfg/state_test.go +++ b/agent/proxycfg/state_test.go @@ -766,6 +766,12 @@ func TestState_WatchesAndUpdates(t *testing.T) { require.True(t, snap.ConnectProxy.IntentionsSet) require.Equal(t, ixnMatch, snap.ConnectProxy.Intentions) require.True(t, snap.ConnectProxy.MeshConfigSet) + + if meshGatewayProxyConfigValue == structs.MeshGatewayModeLocal { + require.True(t, snap.ConnectProxy.WatchedLocalGWEndpoints.IsWatched("dc1")) + _, ok := snap.ConnectProxy.WatchedLocalGWEndpoints.Get("dc1") + require.False(t, ok) + } }, } @@ -799,6 +805,12 @@ func TestState_WatchesAndUpdates(t *testing.T) { require.True(t, snap.ConnectProxy.IntentionsSet) require.Equal(t, ixnMatch, snap.ConnectProxy.Intentions) + + if meshGatewayProxyConfigValue == structs.MeshGatewayModeLocal { + require.True(t, snap.ConnectProxy.WatchedLocalGWEndpoints.IsWatched("dc1")) + _, ok := snap.ConnectProxy.WatchedLocalGWEndpoints.Get("dc1") + require.False(t, ok) + } }, } diff --git a/agent/proxycfg/upstreams.go b/agent/proxycfg/upstreams.go index a2dc38b9d..4ebbabb65 100644 --- a/agent/proxycfg/upstreams.go +++ b/agent/proxycfg/upstreams.go @@ -326,6 +326,10 @@ func (s *handlerUpstreams) resetWatchesFromChain( if s.source.Datacenter != target.Datacenter || s.proxyID.PartitionOrDefault() != target.Partition { needGateways[gk.String()] = struct{}{} } + // Register a local gateway watch if any targets are pointing to a peer and require a mode of local. + if target.Peer != "" && target.MeshGateway.Mode == structs.MeshGatewayModeLocal { + s.setupWatchForLocalGWEndpoints(ctx, snap) + } } // If the discovery chain's targets do not lead to watching all endpoints @@ -548,3 +552,30 @@ func parseReducedUpstreamConfig(m map[string]interface{}) (reducedUpstreamConfig err := mapstructure.WeakDecode(m, &cfg) return cfg, err } + +func (s *handlerUpstreams) setupWatchForLocalGWEndpoints( + ctx context.Context, + upstreams *ConfigSnapshotUpstreams, +) error { + gk := GatewayKey{ + Partition: s.proxyID.PartitionOrDefault(), + Datacenter: s.source.Datacenter, + } + // If the watch is already initialized, do nothing. + if upstreams.WatchedLocalGWEndpoints.IsWatched(gk.String()) { + return nil + } + + opts := gatewayWatchOpts{ + internalServiceDump: s.dataSources.InternalServiceDump, + notifyCh: s.ch, + source: *s.source, + token: s.token, + key: gk, + } + if err := watchMeshGateway(ctx, opts); err != nil { + return fmt.Errorf("error while watching for local mesh gateway: %w", err) + } + upstreams.WatchedLocalGWEndpoints.InitWatch(gk.String(), nil) + return nil +} diff --git a/agent/xds/endpoints.go b/agent/xds/endpoints.go index cc085cd71..4251dd019 100644 --- a/agent/xds/endpoints.go +++ b/agent/xds/endpoints.go @@ -92,7 +92,7 @@ func (s *ResourceGenerator) endpointsFromSnapshotConnectProxy(cfgSnap *proxycfg. // upstream in clusters.go so that the sets of endpoints generated matches // the sets of clusters. for _, uid := range cfgSnap.ConnectProxy.PeeredUpstreamIDs() { - _, skip := getUpstream(uid) + upstream, skip := getUpstream(uid) if skip { // Discovery chain is not associated with a known explicit or implicit upstream so it is skipped. continue @@ -107,7 +107,11 @@ func (s *ResourceGenerator) endpointsFromSnapshotConnectProxy(cfgSnap *proxycfg. clusterName := generatePeeredClusterName(uid, tbs) - loadAssignment, err := s.makeUpstreamLoadAssignmentForPeerService(cfgSnap, clusterName, uid) + mgwMode := structs.MeshGatewayModeDefault + if upstream != nil { + mgwMode = upstream.MeshGateway.Mode + } + loadAssignment, err := s.makeUpstreamLoadAssignmentForPeerService(cfgSnap, clusterName, uid, mgwMode) if err != nil { return nil, err } @@ -538,7 +542,12 @@ func makePipeEndpoint(path string) *envoy_endpoint_v3.LbEndpoint { } } -func (s *ResourceGenerator) makeUpstreamLoadAssignmentForPeerService(cfgSnap *proxycfg.ConfigSnapshot, clusterName string, uid proxycfg.UpstreamID) (*envoy_endpoint_v3.ClusterLoadAssignment, error) { +func (s *ResourceGenerator) makeUpstreamLoadAssignmentForPeerService( + cfgSnap *proxycfg.ConfigSnapshot, + clusterName string, + uid proxycfg.UpstreamID, + upstreamGatewayMode structs.MeshGatewayMode, +) (*envoy_endpoint_v3.ClusterLoadAssignment, error) { var la *envoy_endpoint_v3.ClusterLoadAssignment upstreamsSnapshot, err := cfgSnap.ToConfigSnapshotUpstreams() @@ -546,11 +555,9 @@ func (s *ResourceGenerator) makeUpstreamLoadAssignmentForPeerService(cfgSnap *pr return la, err } - upstream := cfgSnap.ConnectProxy.UpstreamConfig[uid] - // If an upstream is configured with local mesh gw mode, we make a load assignment // from the gateway endpoints instead of those of the upstreams. - if upstream != nil && upstream.MeshGateway.Mode == structs.MeshGatewayModeLocal { + if upstreamGatewayMode == structs.MeshGatewayModeLocal { localGw, ok := cfgSnap.ConnectProxy.WatchedLocalGWEndpoints.Get(cfgSnap.Locality.String()) if !ok { // local GW is not ready; return early @@ -643,6 +650,11 @@ func (s *ResourceGenerator) endpointsFromDiscoveryChain( } } + mgwMode := structs.MeshGatewayModeDefault + if upstream := cfgSnap.ConnectProxy.UpstreamConfig[uid]; upstream != nil { + mgwMode = upstream.MeshGateway.Mode + } + // Find all resolver nodes. for _, node := range chain.Nodes { if node.Type != structs.DiscoveryGraphNodeTypeResolver { @@ -682,7 +694,7 @@ func (s *ResourceGenerator) endpointsFromDiscoveryChain( s.Logger.Debug("generating endpoints for", "cluster", targetOpt.clusterName) targetUID := proxycfg.NewUpstreamIDFromTargetID(targetOpt.targetID) if targetUID.Peer != "" { - loadAssignment, err := s.makeUpstreamLoadAssignmentForPeerService(cfgSnap, targetOpt.clusterName, targetUID) + loadAssignment, err := s.makeUpstreamLoadAssignmentForPeerService(cfgSnap, targetOpt.clusterName, targetUID, mgwMode) if err != nil { return nil, err }