From 2af14c00847871093da2010ef55365e6919249bd Mon Sep 17 00:00:00 2001 From: Derek Menteer <105233703+hashi-derek@users.noreply.github.com> Date: Tue, 3 Jan 2023 10:44:08 -0600 Subject: [PATCH] Fix issue with incorrect proxycfg watch on upstream peer-targets. (#15865) This fixes an issue where the incorrect partition was given to the upstream target watch, which meant that failover logic would not work correctly. --- .changelog/15865.txt | 3 +++ agent/proxycfg/snapshot.go | 8 +++---- agent/proxycfg/upstreams.go | 48 ++++++++++++++++++++++++++----------- agent/xds/clusters.go | 31 +++++++++++++----------- agent/xds/listeners.go | 5 +++- 5 files changed, 62 insertions(+), 33 deletions(-) create mode 100644 .changelog/15865.txt diff --git a/.changelog/15865.txt b/.changelog/15865.txt new file mode 100644 index 000000000..6e34813a8 --- /dev/null +++ b/.changelog/15865.txt @@ -0,0 +1,3 @@ +```release-note:bug +connect: Fix issue where watches on upstream failover peer targets did not always query the correct data. +``` diff --git a/agent/proxycfg/snapshot.go b/agent/proxycfg/snapshot.go index be9fb251e..57be8bf7f 100644 --- a/agent/proxycfg/snapshot.go +++ b/agent/proxycfg/snapshot.go @@ -873,10 +873,10 @@ func (s *ConfigSnapshot) ToConfigSnapshotUpstreams() (*ConfigSnapshotUpstreams, } } -func (u *ConfigSnapshotUpstreams) UpstreamPeerMeta(uid UpstreamID) structs.PeeringServiceMeta { +func (u *ConfigSnapshotUpstreams) UpstreamPeerMeta(uid UpstreamID) (structs.PeeringServiceMeta, bool) { nodes, _ := u.PeerUpstreamEndpoints.Get(uid) if len(nodes) == 0 { - return structs.PeeringServiceMeta{} + return structs.PeeringServiceMeta{}, false } // In agent/rpc/peering/subscription_manager.go we denormalize the @@ -892,9 +892,9 @@ func (u *ConfigSnapshotUpstreams) UpstreamPeerMeta(uid UpstreamID) structs.Peeri // catalog to avoid this weird construction. csn := nodes[0] if csn.Service == nil { - return structs.PeeringServiceMeta{} + return structs.PeeringServiceMeta{}, false } - return *csn.Service.Connect.PeerMeta + return *csn.Service.Connect.PeerMeta, true } // PeeredUpstreamIDs returns a slice of peered UpstreamIDs from explicit config entries diff --git a/agent/proxycfg/upstreams.go b/agent/proxycfg/upstreams.go index 31a51c661..71131117e 100644 --- a/agent/proxycfg/upstreams.go +++ b/agent/proxycfg/upstreams.go @@ -98,7 +98,6 @@ func (s *handlerUpstreams) handleUpdateUpstreams(ctx context.Context, u UpdateEv return fmt.Errorf("invalid type for response: %T", u.Result) } uidString := strings.TrimPrefix(u.CorrelationID, upstreamPeerWatchIDPrefix) - uid := UpstreamIDFromString(uidString) s.setPeerEndpoints(upstreamsSnapshot, uid, resp.Nodes) @@ -306,15 +305,9 @@ func (s *handlerUpstreams) resetWatchesFromChain( watchedChainEndpoints = true } - opts := targetWatchOpts{ - upstreamID: uid, - chainID: target.ID, - service: target.Service, - filter: target.Subset.Filter, - datacenter: target.Datacenter, - peer: target.Peer, - entMeta: target.GetEnterpriseMetadata(), - } + opts := targetWatchOpts{upstreamID: uid} + opts.fromChainTarget(chain, target) + err := s.watchUpstreamTarget(ctx, snap, opts) if err != nil { return fmt.Errorf("failed to watch target %q for upstream %q", target.ID, uid) @@ -431,6 +424,32 @@ type targetWatchOpts struct { entMeta *acl.EnterpriseMeta } +func (o *targetWatchOpts) fromChainTarget(c *structs.CompiledDiscoveryChain, t *structs.DiscoveryTarget) { + o.chainID = t.ID + o.service = t.Service + o.filter = t.Subset.Filter + o.datacenter = t.Datacenter + o.peer = t.Peer + o.entMeta = t.GetEnterpriseMetadata() + + // The peer-targets in a discovery chain intentionally clear out + // the partition field, since we don't know the remote service's partition. + // Therefore, we must query with the chain's local partition / DC, or else + // the services will not be found. + // + // Note that the namespace is not swapped out, because it should + // always match the value in the remote datacenter (and shouldn't + // have been changed anywhere). + if o.peer != "" { + o.datacenter = "" + // Clone the enterprise meta so it's not modified when we swap the partition. + var em acl.EnterpriseMeta + em.Merge(o.entMeta) + em.OverridePartition(c.Partition) + o.entMeta = &em + } +} + func (s *handlerUpstreams) watchUpstreamTarget(ctx context.Context, snap *ConfigSnapshotUpstreams, opts targetWatchOpts) error { s.logger.Trace("initializing watch of target", "upstream", opts.upstreamID, @@ -438,9 +457,6 @@ func (s *handlerUpstreams) watchUpstreamTarget(ctx context.Context, snap *Config "target", opts.chainID, ) - var finalMeta acl.EnterpriseMeta - finalMeta.Merge(opts.entMeta) - uid := opts.upstreamID correlationID := "upstream-target:" + opts.chainID + ":" + uid.String() @@ -449,6 +465,10 @@ func (s *handlerUpstreams) watchUpstreamTarget(ctx context.Context, snap *Config correlationID = upstreamPeerWatchIDPrefix + uid.String() } + // Perform this merge so that a nil EntMeta isn't possible. + var entMeta acl.EnterpriseMeta + entMeta.Merge(opts.entMeta) + ctx, cancel := context.WithCancel(ctx) err := s.dataSources.Health.Notify(ctx, &structs.ServiceSpecificRequest{ PeerName: opts.peer, @@ -463,7 +483,7 @@ func (s *handlerUpstreams) watchUpstreamTarget(ctx context.Context, snap *Config // the default and makes metrics and other things much cleaner. It's // simpler for us if we have the type to make things unambiguous. Source: *s.source, - EnterpriseMeta: finalMeta, + EnterpriseMeta: entMeta, }, correlationID, s.ch) if err != nil { diff --git a/agent/xds/clusters.go b/agent/xds/clusters.go index a75ef6129..15dea732c 100644 --- a/agent/xds/clusters.go +++ b/agent/xds/clusters.go @@ -3,7 +3,6 @@ package xds import ( "errors" "fmt" - "sort" "strings" "time" @@ -129,7 +128,10 @@ func (s *ResourceGenerator) clustersFromSnapshotConnectProxy(cfgSnap *proxycfg.C continue } - peerMeta := cfgSnap.ConnectProxy.UpstreamPeerMeta(uid) + peerMeta, found := cfgSnap.ConnectProxy.UpstreamPeerMeta(uid) + if !found { + s.Logger.Warn("failed to fetch upstream peering metadata for cluster", "uid", uid) + } cfg := s.getAndModifyUpstreamConfigForPeeredListener(uid, upstream, peerMeta) upstreamCluster, err := s.makeUpstreamClusterForPeerService(uid, cfg, peerMeta, cfgSnap) @@ -1296,18 +1298,13 @@ func (s *ResourceGenerator) makeUpstreamClustersForDiscoveryChain( for _, targetData := range targetClustersData { target := chain.Targets[targetData.targetID] sni := target.SNI - var additionalSpiffeIDs []string - targetSpiffeID := connect.SpiffeIDService{ - Host: cfgSnap.Roots.TrustDomain, - Namespace: target.Namespace, - Partition: target.Partition, - Datacenter: target.Datacenter, - Service: target.Service, - }.URI().String() targetUID := proxycfg.NewUpstreamIDFromTargetID(targetData.targetID) if targetUID.Peer != "" { - peerMeta := upstreamsSnapshot.UpstreamPeerMeta(targetUID) + peerMeta, found := upstreamsSnapshot.UpstreamPeerMeta(targetUID) + if !found { + s.Logger.Warn("failed to fetch upstream peering metadata for cluster", "target", targetUID) + } upstreamCluster, err := s.makeUpstreamClusterForPeerService(targetUID, upstreamConfig, peerMeta, cfgSnap) if err != nil { continue @@ -1318,6 +1315,14 @@ func (s *ResourceGenerator) makeUpstreamClustersForDiscoveryChain( continue } + targetSpiffeID := connect.SpiffeIDService{ + Host: cfgSnap.Roots.TrustDomain, + Namespace: target.Namespace, + Partition: target.Partition, + Datacenter: target.Datacenter, + Service: target.Service, + }.URI().String() + s.Logger.Debug("generating cluster for", "cluster", targetData.clusterName) c := &envoy_cluster_v3.Cluster{ Name: targetData.clusterName, @@ -1371,9 +1376,7 @@ func (s *ResourceGenerator) makeUpstreamClustersForDiscoveryChain( makeTLSParametersFromProxyTLSConfig(cfgSnap.MeshConfigTLSOutgoing()), ) - spiffeIDs := append([]string{targetSpiffeID}, additionalSpiffeIDs...) - sort.Strings(spiffeIDs) - err = injectSANMatcher(commonTLSContext, spiffeIDs...) + err = injectSANMatcher(commonTLSContext, targetSpiffeID) if err != nil { return nil, fmt.Errorf("failed to inject SAN matcher rules for cluster %q: %v", sni, err) } diff --git a/agent/xds/listeners.go b/agent/xds/listeners.go index 784dc656d..607c3814b 100644 --- a/agent/xds/listeners.go +++ b/agent/xds/listeners.go @@ -363,7 +363,10 @@ func (s *ResourceGenerator) listenersFromSnapshotConnectProxy(cfgSnap *proxycfg. continue } - peerMeta := cfgSnap.ConnectProxy.UpstreamPeerMeta(uid) + peerMeta, found := cfgSnap.ConnectProxy.UpstreamPeerMeta(uid) + if !found { + s.Logger.Warn("failed to fetch upstream peering metadata for listener", "uid", uid) + } cfg := s.getAndModifyUpstreamConfigForPeeredListener(uid, upstreamCfg, peerMeta) // If escape hatch is present, create a listener from it and move on to the next