Use new maps for proxycfg peered data
This commit is contained in:
parent
34c0093d44
commit
5d890cdbb2
|
@ -4113,6 +4113,8 @@ func (a *Agent) registerCache() {
|
|||
|
||||
a.cache.RegisterType(cachetype.TrustBundleListName, &cachetype.TrustBundles{Client: a.rpcClientPeering})
|
||||
|
||||
a.cache.RegisterType(cachetype.PeeredUpstreamsName, &cachetype.PeeredUpstreams{RPC: a})
|
||||
|
||||
a.registerEntCache()
|
||||
}
|
||||
|
||||
|
|
|
@ -6,6 +6,7 @@ import (
|
|||
"strings"
|
||||
|
||||
cachetype "github.com/hashicorp/consul/agent/cache-types"
|
||||
"github.com/hashicorp/consul/agent/proxycfg/internal/watch"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
"github.com/hashicorp/consul/proto/pbpeering"
|
||||
)
|
||||
|
@ -22,8 +23,7 @@ func (s *handlerConnectProxy) initialize(ctx context.Context) (ConfigSnapshot, e
|
|||
snap.ConnectProxy.WatchedDiscoveryChains = make(map[UpstreamID]context.CancelFunc)
|
||||
snap.ConnectProxy.WatchedUpstreams = make(map[UpstreamID]map[string]context.CancelFunc)
|
||||
snap.ConnectProxy.WatchedUpstreamEndpoints = make(map[UpstreamID]map[string]structs.CheckServiceNodes)
|
||||
snap.ConnectProxy.WatchedUpstreamPeerTrustBundles = make(map[string]context.CancelFunc)
|
||||
snap.ConnectProxy.UpstreamPeerTrustBundles = make(map[string]*pbpeering.PeeringTrustBundle)
|
||||
snap.ConnectProxy.UpstreamPeerTrustBundles = watch.NewMap[string, *pbpeering.PeeringTrustBundle]()
|
||||
snap.ConnectProxy.WatchedGateways = make(map[UpstreamID]map[string]context.CancelFunc)
|
||||
snap.ConnectProxy.WatchedGatewayEndpoints = make(map[UpstreamID]map[string]structs.CheckServiceNodes)
|
||||
snap.ConnectProxy.WatchedServiceChecks = make(map[structs.ServiceID][]structs.CheckType)
|
||||
|
@ -31,7 +31,7 @@ func (s *handlerConnectProxy) initialize(ctx context.Context) (ConfigSnapshot, e
|
|||
snap.ConnectProxy.UpstreamConfig = make(map[UpstreamID]*structs.Upstream)
|
||||
snap.ConnectProxy.PassthroughUpstreams = make(map[UpstreamID]map[string]map[string]struct{})
|
||||
snap.ConnectProxy.PassthroughIndices = make(map[string]indexedTarget)
|
||||
snap.ConnectProxy.PeerUpstreamEndpoints = make(map[UpstreamID]structs.CheckServiceNodes)
|
||||
snap.ConnectProxy.PeerUpstreamEndpoints = watch.NewMap[UpstreamID, structs.CheckServiceNodes]()
|
||||
snap.ConnectProxy.PeerUpstreamEndpointsUseHostnames = make(map[UpstreamID]struct{})
|
||||
|
||||
// Watch for root changes
|
||||
|
@ -108,6 +108,14 @@ func (s *handlerConnectProxy) initialize(ctx context.Context) (ConfigSnapshot, e
|
|||
if err != nil {
|
||||
return snap, err
|
||||
}
|
||||
err = s.dataSources.PeeredUpstreams.Notify(ctx, &structs.PartitionSpecificRequest{
|
||||
QueryOptions: structs.QueryOptions{Token: s.token},
|
||||
Datacenter: s.source.Datacenter,
|
||||
EnterpriseMeta: s.proxyID.EnterpriseMeta,
|
||||
}, peeredUpstreamsID, s.ch)
|
||||
if err != nil {
|
||||
return snap, err
|
||||
}
|
||||
}
|
||||
|
||||
// Watch for updates to service endpoints for all upstreams
|
||||
|
@ -134,7 +142,8 @@ func (s *handlerConnectProxy) initialize(ctx context.Context) (ConfigSnapshot, e
|
|||
dc = u.Datacenter
|
||||
}
|
||||
if s.proxyCfg.Mode == structs.ProxyModeTransparent && (dc == "" || dc == s.source.Datacenter) {
|
||||
// In transparent proxy mode, watches for upstreams in the local DC are handled by the IntentionUpstreams watch.
|
||||
// In transparent proxy mode, watches for upstreams in the local DC
|
||||
// are handled by the IntentionUpstreams and PeeredUpstreams watch.
|
||||
continue
|
||||
}
|
||||
|
||||
|
@ -183,8 +192,7 @@ func (s *handlerConnectProxy) initialize(ctx context.Context) (ConfigSnapshot, e
|
|||
|
||||
s.logger.Trace("initializing watch of peered upstream", "upstream", uid)
|
||||
|
||||
// TODO(peering): We'll need to track a CancelFunc for this
|
||||
// once the tproxy support lands.
|
||||
snap.ConnectProxy.PeerUpstreamEndpoints.InitWatch(uid, nil)
|
||||
err := s.dataSources.Health.Notify(ctx, &structs.ServiceSpecificRequest{
|
||||
PeerName: uid.Peer,
|
||||
Datacenter: dc,
|
||||
|
@ -204,7 +212,7 @@ func (s *handlerConnectProxy) initialize(ctx context.Context) (ConfigSnapshot, e
|
|||
}
|
||||
|
||||
// Check whether a watch for this peer exists to avoid duplicates.
|
||||
if _, ok := snap.ConnectProxy.WatchedUpstreamPeerTrustBundles[uid.Peer]; !ok {
|
||||
if _, ok := snap.ConnectProxy.UpstreamPeerTrustBundles.Get(uid.Peer); !ok {
|
||||
peerCtx, cancel := context.WithCancel(ctx)
|
||||
if err := s.dataSources.TrustBundle.Notify(peerCtx, &pbpeering.TrustBundleReadRequest{
|
||||
Name: uid.Peer,
|
||||
|
@ -214,7 +222,7 @@ func (s *handlerConnectProxy) initialize(ctx context.Context) (ConfigSnapshot, e
|
|||
return snap, fmt.Errorf("error while watching trust bundle for peer %q: %w", uid.Peer, err)
|
||||
}
|
||||
|
||||
snap.ConnectProxy.WatchedUpstreamPeerTrustBundles[uid.Peer] = cancel
|
||||
snap.ConnectProxy.UpstreamPeerTrustBundles.InitWatch(uid.Peer, cancel)
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
@ -262,7 +270,7 @@ func (s *handlerConnectProxy) handleUpdate(ctx context.Context, u UpdateEvent, s
|
|||
}
|
||||
peer := strings.TrimPrefix(u.CorrelationID, peerTrustBundleIDPrefix)
|
||||
if resp.Bundle != nil {
|
||||
snap.ConnectProxy.UpstreamPeerTrustBundles[peer] = resp.Bundle
|
||||
snap.ConnectProxy.UpstreamPeerTrustBundles.Set(peer, resp.Bundle)
|
||||
}
|
||||
|
||||
case u.CorrelationID == peeringTrustBundlesWatchID:
|
||||
|
@ -283,6 +291,90 @@ func (s *handlerConnectProxy) handleUpdate(ctx context.Context, u UpdateEvent, s
|
|||
snap.ConnectProxy.Intentions = resp
|
||||
snap.ConnectProxy.IntentionsSet = true
|
||||
|
||||
case u.CorrelationID == peeredUpstreamsID:
|
||||
resp, ok := u.Result.(*structs.IndexedPeeredServiceList)
|
||||
if !ok {
|
||||
return fmt.Errorf("invalid type for response %T", u.Result)
|
||||
}
|
||||
|
||||
seenUpstreams := make(map[UpstreamID]struct{})
|
||||
for _, psn := range resp.Services {
|
||||
uid := NewUpstreamIDFromPeeredServiceName(psn)
|
||||
|
||||
if _, ok := seenUpstreams[uid]; ok {
|
||||
continue
|
||||
}
|
||||
seenUpstreams[uid] = struct{}{}
|
||||
|
||||
s.logger.Trace("initializing watch of peered upstream", "upstream", uid)
|
||||
|
||||
hctx, hcancel := context.WithCancel(ctx)
|
||||
err := s.dataSources.Health.Notify(hctx, &structs.ServiceSpecificRequest{
|
||||
PeerName: uid.Peer,
|
||||
Datacenter: s.source.Datacenter,
|
||||
QueryOptions: structs.QueryOptions{
|
||||
Token: s.token,
|
||||
},
|
||||
ServiceName: psn.ServiceName.Name,
|
||||
Connect: true,
|
||||
// Note that Identifier doesn't type-prefix for service any more as it's
|
||||
// the default and makes metrics and other things much cleaner. It's
|
||||
// simpler for us if we have the type to make things unambiguous.
|
||||
Source: *s.source,
|
||||
EnterpriseMeta: uid.EnterpriseMeta,
|
||||
}, upstreamPeerWatchIDPrefix+uid.String(), s.ch)
|
||||
if err != nil {
|
||||
hcancel()
|
||||
return fmt.Errorf("failed to watch health for %s: %v", uid, err)
|
||||
}
|
||||
snap.ConnectProxy.PeerUpstreamEndpoints.InitWatch(uid, hcancel)
|
||||
|
||||
// Check whether a watch for this peer exists to avoid duplicates.
|
||||
if _, ok := snap.ConnectProxy.UpstreamPeerTrustBundles.Get(uid.Peer); !ok {
|
||||
peerCtx, cancel := context.WithCancel(ctx)
|
||||
if err := s.dataSources.TrustBundle.Notify(peerCtx, &pbpeering.TrustBundleReadRequest{
|
||||
Name: uid.Peer,
|
||||
Partition: uid.PartitionOrDefault(),
|
||||
}, peerTrustBundleIDPrefix+uid.Peer, s.ch); err != nil {
|
||||
cancel()
|
||||
return fmt.Errorf("error while watching trust bundle for peer %q: %w", uid.Peer, err)
|
||||
}
|
||||
|
||||
snap.ConnectProxy.UpstreamPeerTrustBundles.InitWatch(uid.Peer, cancel)
|
||||
}
|
||||
}
|
||||
snap.ConnectProxy.PeeredUpstreams = seenUpstreams
|
||||
|
||||
//
|
||||
// Clean up data
|
||||
//
|
||||
|
||||
validPeerNames := make(map[string]struct{})
|
||||
|
||||
// Iterate through all known endpoints and remove references to upstream IDs that weren't in the update
|
||||
snap.ConnectProxy.PeerUpstreamEndpoints.ForEachKey(func(uid UpstreamID) bool {
|
||||
// Peered upstream is explicitly defined in upstream config
|
||||
if _, ok := snap.ConnectProxy.UpstreamConfig[uid]; ok {
|
||||
validPeerNames[uid.Peer] = struct{}{}
|
||||
return true
|
||||
}
|
||||
// Peered upstream came from dynamic source of imported services
|
||||
if _, ok := seenUpstreams[uid]; ok {
|
||||
validPeerNames[uid.Peer] = struct{}{}
|
||||
return true
|
||||
}
|
||||
snap.ConnectProxy.PeerUpstreamEndpoints.CancelWatch(uid)
|
||||
return true
|
||||
})
|
||||
|
||||
// Iterate through all known trust bundles and remove references to any unseen peer names
|
||||
snap.ConnectProxy.UpstreamPeerTrustBundles.ForEachKey(func(peerName PeerName) bool {
|
||||
if _, ok := validPeerNames[peerName]; !ok {
|
||||
snap.ConnectProxy.UpstreamPeerTrustBundles.CancelWatch(peerName)
|
||||
}
|
||||
return true
|
||||
})
|
||||
|
||||
case u.CorrelationID == intentionUpstreamsID:
|
||||
resp, ok := u.Result.(*structs.IndexedServiceList)
|
||||
if !ok {
|
||||
|
|
|
@ -69,6 +69,8 @@ type DataSources struct {
|
|||
// notification channel.
|
||||
LeafCertificate LeafCertificate
|
||||
|
||||
// PeeredUpstreams provides imported-service upstream updates on a
|
||||
// notification channel.
|
||||
PeeredUpstreams PeeredUpstreams
|
||||
|
||||
// PreparedQuery provides updates about the results of a prepared query.
|
||||
|
|
|
@ -11,6 +11,7 @@ import (
|
|||
cachetype "github.com/hashicorp/consul/agent/cache-types"
|
||||
"github.com/hashicorp/consul/agent/connect"
|
||||
"github.com/hashicorp/consul/agent/consul/discoverychain"
|
||||
"github.com/hashicorp/consul/agent/proxycfg/internal/watch"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
"github.com/hashicorp/consul/api"
|
||||
"github.com/hashicorp/consul/proto/pbpeering"
|
||||
|
@ -230,8 +231,8 @@ func TestManager_BasicLifecycle(t *testing.T) {
|
|||
},
|
||||
PassthroughUpstreams: map[UpstreamID]map[string]map[string]struct{}{},
|
||||
PassthroughIndices: map[string]indexedTarget{},
|
||||
UpstreamPeerTrustBundles: map[string]*pbpeering.PeeringTrustBundle{},
|
||||
PeerUpstreamEndpoints: map[UpstreamID]structs.CheckServiceNodes{},
|
||||
UpstreamPeerTrustBundles: watch.NewMap[PeerName, *pbpeering.PeeringTrustBundle](),
|
||||
PeerUpstreamEndpoints: watch.NewMap[UpstreamID, structs.CheckServiceNodes](),
|
||||
PeerUpstreamEndpointsUseHostnames: map[UpstreamID]struct{}{},
|
||||
},
|
||||
PreparedQueryEndpoints: map[UpstreamID]structs.CheckServiceNodes{},
|
||||
|
@ -291,8 +292,8 @@ func TestManager_BasicLifecycle(t *testing.T) {
|
|||
},
|
||||
PassthroughUpstreams: map[UpstreamID]map[string]map[string]struct{}{},
|
||||
PassthroughIndices: map[string]indexedTarget{},
|
||||
UpstreamPeerTrustBundles: map[string]*pbpeering.PeeringTrustBundle{},
|
||||
PeerUpstreamEndpoints: map[UpstreamID]structs.CheckServiceNodes{},
|
||||
UpstreamPeerTrustBundles: watch.NewMap[PeerName, *pbpeering.PeeringTrustBundle](),
|
||||
PeerUpstreamEndpoints: watch.NewMap[UpstreamID, structs.CheckServiceNodes](),
|
||||
PeerUpstreamEndpointsUseHostnames: map[UpstreamID]struct{}{},
|
||||
},
|
||||
PreparedQueryEndpoints: map[UpstreamID]structs.CheckServiceNodes{},
|
||||
|
|
|
@ -7,6 +7,8 @@ import (
|
|||
"github.com/hashicorp/consul/agent/structs"
|
||||
)
|
||||
|
||||
type PeerName = string
|
||||
|
||||
type UpstreamID struct {
|
||||
Type string
|
||||
Name string
|
||||
|
@ -32,7 +34,16 @@ func NewUpstreamID(u *structs.Upstream) UpstreamID {
|
|||
return id
|
||||
}
|
||||
|
||||
// TODO(peering): confirm we don't need peername here
|
||||
func NewUpstreamIDFromPeeredServiceName(psn structs.PeeredServiceName) UpstreamID {
|
||||
id := UpstreamID{
|
||||
Name: psn.ServiceName.Name,
|
||||
EnterpriseMeta: psn.ServiceName.EnterpriseMeta,
|
||||
Peer: psn.Peer,
|
||||
}
|
||||
id.normalize()
|
||||
return id
|
||||
}
|
||||
|
||||
func NewUpstreamIDFromServiceName(sn structs.ServiceName) UpstreamID {
|
||||
id := UpstreamID{
|
||||
Name: sn.Name,
|
||||
|
|
|
@ -9,6 +9,7 @@ import (
|
|||
"github.com/mitchellh/copystructure"
|
||||
|
||||
"github.com/hashicorp/consul/acl"
|
||||
"github.com/hashicorp/consul/agent/proxycfg/internal/watch"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
"github.com/hashicorp/consul/lib"
|
||||
"github.com/hashicorp/consul/proto/pbpeering"
|
||||
|
@ -44,13 +45,9 @@ type ConfigSnapshotUpstreams struct {
|
|||
// endpoints of an upstream.
|
||||
WatchedUpstreamEndpoints map[UpstreamID]map[string]structs.CheckServiceNodes
|
||||
|
||||
// WatchedUpstreamPeerTrustBundles is a map of (PeerName -> CancelFunc) in order to cancel
|
||||
// watches for peer trust bundles any time the list of upstream peers changes.
|
||||
WatchedUpstreamPeerTrustBundles map[string]context.CancelFunc
|
||||
|
||||
// UpstreamPeerTrustBundles is a map of (PeerName -> PeeringTrustBundle).
|
||||
// It is used to store trust bundles for upstream TLS transport sockets.
|
||||
UpstreamPeerTrustBundles map[string]*pbpeering.PeeringTrustBundle
|
||||
UpstreamPeerTrustBundles watch.Map[PeerName, *pbpeering.PeeringTrustBundle]
|
||||
|
||||
// WatchedGateways is a map of UpstreamID -> (map of GatewayKey.String() ->
|
||||
// CancelFunc) in order to cancel watches for mesh gateways
|
||||
|
@ -80,10 +77,16 @@ type ConfigSnapshotUpstreams struct {
|
|||
// This list only applies to proxies registered in 'transparent' mode.
|
||||
IntentionUpstreams map[UpstreamID]struct{}
|
||||
|
||||
// PeeredUpstreams is a set of all upstream targets in a local partition.
|
||||
//
|
||||
// This list only applies to proxies registered in 'transparent' mode.
|
||||
PeeredUpstreams map[UpstreamID]struct{}
|
||||
|
||||
// PeerUpstreamEndpoints is a map of UpstreamID -> (set of IP addresses)
|
||||
// and used to determine the backing endpoints of an upstream in another
|
||||
// peer.
|
||||
PeerUpstreamEndpoints map[UpstreamID]structs.CheckServiceNodes
|
||||
PeerUpstreamEndpoints watch.Map[UpstreamID, structs.CheckServiceNodes]
|
||||
|
||||
PeerUpstreamEndpointsUseHostnames map[UpstreamID]struct{}
|
||||
}
|
||||
|
||||
|
@ -152,8 +155,7 @@ func (c *configSnapshotConnectProxy) isEmpty() bool {
|
|||
len(c.WatchedDiscoveryChains) == 0 &&
|
||||
len(c.WatchedUpstreams) == 0 &&
|
||||
len(c.WatchedUpstreamEndpoints) == 0 &&
|
||||
len(c.WatchedUpstreamPeerTrustBundles) == 0 &&
|
||||
len(c.UpstreamPeerTrustBundles) == 0 &&
|
||||
c.UpstreamPeerTrustBundles.Len() == 0 &&
|
||||
len(c.WatchedGateways) == 0 &&
|
||||
len(c.WatchedGatewayEndpoints) == 0 &&
|
||||
len(c.WatchedServiceChecks) == 0 &&
|
||||
|
@ -161,9 +163,10 @@ func (c *configSnapshotConnectProxy) isEmpty() bool {
|
|||
len(c.UpstreamConfig) == 0 &&
|
||||
len(c.PassthroughUpstreams) == 0 &&
|
||||
len(c.IntentionUpstreams) == 0 &&
|
||||
len(c.PeeredUpstreams) == 0 &&
|
||||
!c.InboundPeerTrustBundlesSet &&
|
||||
!c.MeshConfigSet &&
|
||||
len(c.PeerUpstreamEndpoints) == 0 &&
|
||||
c.PeerUpstreamEndpoints.Len() == 0 &&
|
||||
len(c.PeerUpstreamEndpointsUseHostnames) == 0
|
||||
}
|
||||
|
||||
|
@ -715,7 +718,6 @@ func (s *ConfigSnapshot) Clone() (*ConfigSnapshot, error) {
|
|||
snap.ConnectProxy.WatchedUpstreams = nil
|
||||
snap.ConnectProxy.WatchedGateways = nil
|
||||
snap.ConnectProxy.WatchedDiscoveryChains = nil
|
||||
snap.ConnectProxy.WatchedUpstreamPeerTrustBundles = nil
|
||||
case structs.ServiceKindTerminatingGateway:
|
||||
snap.TerminatingGateway.WatchedServices = nil
|
||||
snap.TerminatingGateway.WatchedIntentions = nil
|
||||
|
@ -730,7 +732,6 @@ func (s *ConfigSnapshot) Clone() (*ConfigSnapshot, error) {
|
|||
snap.IngressGateway.WatchedUpstreams = nil
|
||||
snap.IngressGateway.WatchedGateways = nil
|
||||
snap.IngressGateway.WatchedDiscoveryChains = nil
|
||||
snap.IngressGateway.WatchedUpstreamPeerTrustBundles = nil
|
||||
// only ingress-gateway
|
||||
snap.IngressGateway.LeafCertWatchCancel = nil
|
||||
}
|
||||
|
@ -803,7 +804,7 @@ func (s *ConfigSnapshot) MeshConfigTLSOutgoing() *structs.MeshDirectionalTLSConf
|
|||
}
|
||||
|
||||
func (u *ConfigSnapshotUpstreams) UpstreamPeerMeta(uid UpstreamID) structs.PeeringServiceMeta {
|
||||
nodes := u.PeerUpstreamEndpoints[uid]
|
||||
nodes, _ := u.PeerUpstreamEndpoints.Get(uid)
|
||||
if len(nodes) == 0 {
|
||||
return structs.PeeringServiceMeta{}
|
||||
}
|
||||
|
@ -833,7 +834,7 @@ func (u *ConfigSnapshotUpstreams) PeeredUpstreamIDs() []UpstreamID {
|
|||
continue
|
||||
}
|
||||
|
||||
if _, ok := u.UpstreamPeerTrustBundles[uid.Peer]; uid.Peer != "" && !ok {
|
||||
if _, ok := u.UpstreamPeerTrustBundles.Get(uid.Peer); !ok {
|
||||
// The trust bundle for this upstream is not available yet, skip for now.
|
||||
continue
|
||||
}
|
||||
|
|
|
@ -36,6 +36,7 @@ const (
|
|||
serviceResolverIDPrefix = "service-resolver:"
|
||||
serviceIntentionsIDPrefix = "service-intentions:"
|
||||
intentionUpstreamsID = "intention-upstreams"
|
||||
peeredUpstreamsID = "peered-upstreams"
|
||||
upstreamPeerWatchIDPrefix = "upstream-peer:"
|
||||
exportedServiceListWatchID = "exported-service-list"
|
||||
meshConfigEntryID = "mesh"
|
||||
|
|
|
@ -131,6 +131,7 @@ func recordWatches(sc *stateConfig) *watchRecorder {
|
|||
IntentionUpstreams: typedWatchRecorder[*structs.ServiceSpecificRequest]{wr},
|
||||
InternalServiceDump: typedWatchRecorder[*structs.ServiceDumpRequest]{wr},
|
||||
LeafCertificate: typedWatchRecorder[*cachetype.ConnectCALeafRequest]{wr},
|
||||
PeeredUpstreams: typedWatchRecorder[*structs.PartitionSpecificRequest]{wr},
|
||||
PreparedQuery: typedWatchRecorder[*structs.PreparedQueryExecuteRequest]{wr},
|
||||
ResolvedServiceConfig: typedWatchRecorder[*structs.ServiceConfigRequest]{wr},
|
||||
ServiceList: typedWatchRecorder[*structs.DCSpecificRequest]{wr},
|
||||
|
@ -321,6 +322,15 @@ func genVerifyServiceSpecificPeeredRequest(expectedService, expectedFilter, expe
|
|||
}
|
||||
}
|
||||
|
||||
func genVerifyPartitionSpecificRequest(expectedPartition, expectedDatacenter string) verifyWatchRequest {
|
||||
return func(t testing.TB, request any) {
|
||||
reqReal, ok := request.(*structs.PartitionSpecificRequest)
|
||||
require.True(t, ok)
|
||||
require.Equal(t, expectedDatacenter, reqReal.Datacenter)
|
||||
require.Equal(t, expectedPartition, reqReal.PartitionOrDefault())
|
||||
}
|
||||
}
|
||||
|
||||
func genVerifyGatewayServiceWatch(expectedService, expectedDatacenter string) verifyWatchRequest {
|
||||
return genVerifyServiceSpecificRequest(expectedService, "", expectedDatacenter, false)
|
||||
}
|
||||
|
@ -404,9 +414,11 @@ func TestState_WatchesAndUpdates(t *testing.T) {
|
|||
dbUID = NewUpstreamIDFromServiceName(db)
|
||||
pqUID = UpstreamIDFromString("prepared_query:query")
|
||||
extApiUID = NewUpstreamIDFromServiceName(apiA)
|
||||
extDBUID = NewUpstreamIDFromServiceName(db)
|
||||
)
|
||||
// TODO(peering): NewUpstreamIDFromServiceName should take a PeerName
|
||||
extApiUID.Peer = "peer-a"
|
||||
extDBUID.Peer = "peer-a"
|
||||
|
||||
const peerTrustDomain = "1c053652-8512-4373-90cf-5a7f6263a994.consul"
|
||||
|
||||
|
@ -2509,6 +2521,253 @@ func TestState_WatchesAndUpdates(t *testing.T) {
|
|||
},
|
||||
},
|
||||
},
|
||||
"transparent-proxy-initial-with-peers": {
|
||||
ns: structs.NodeService{
|
||||
Kind: structs.ServiceKindConnectProxy,
|
||||
ID: "api-proxy",
|
||||
Service: "api-proxy",
|
||||
Address: "10.0.1.1",
|
||||
Proxy: structs.ConnectProxyConfig{
|
||||
DestinationServiceName: "api",
|
||||
Mode: structs.ProxyModeTransparent,
|
||||
Upstreams: structs.Upstreams{
|
||||
{
|
||||
DestinationName: "api-a",
|
||||
DestinationPeer: "peer-a",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
sourceDC: "dc1",
|
||||
stages: []verificationStage{
|
||||
{
|
||||
requiredWatches: map[string]verifyWatchRequest{
|
||||
peeringTrustBundlesWatchID: genVerifyTrustBundleListWatch("api"),
|
||||
peeredUpstreamsID: genVerifyPartitionSpecificRequest(acl.DefaultEnterpriseMeta().PartitionOrDefault(), "dc1"),
|
||||
meshConfigEntryID: genVerifyMeshConfigWatch("dc1"),
|
||||
rootsWatchID: genVerifyDCSpecificWatch("dc1"),
|
||||
leafWatchID: genVerifyLeafWatch("api", "dc1"),
|
||||
},
|
||||
verifySnapshot: func(t testing.TB, snap *ConfigSnapshot) {
|
||||
require.False(t, snap.Valid(), "proxy without roots/leaf/intentions is not valid")
|
||||
require.True(t, snap.MeshGateway.isEmpty())
|
||||
require.True(t, snap.IngressGateway.isEmpty())
|
||||
require.True(t, snap.TerminatingGateway.isEmpty())
|
||||
|
||||
require.False(t, snap.ConnectProxy.isEmpty())
|
||||
|
||||
// This is explicitly defined from proxy config
|
||||
expectUpstreams := map[UpstreamID]*structs.Upstream{
|
||||
extApiUID: {
|
||||
DestinationName: "api-a",
|
||||
DestinationNamespace: structs.IntentionDefaultNamespace,
|
||||
DestinationPartition: structs.IntentionDefaultNamespace,
|
||||
DestinationPeer: "peer-a",
|
||||
},
|
||||
}
|
||||
require.Equal(t, expectUpstreams, snap.ConnectProxy.UpstreamConfig)
|
||||
},
|
||||
},
|
||||
{
|
||||
// Initial events
|
||||
events: []UpdateEvent{
|
||||
rootWatchEvent(),
|
||||
{
|
||||
CorrelationID: leafWatchID,
|
||||
Result: issuedCert,
|
||||
Err: nil,
|
||||
},
|
||||
{
|
||||
CorrelationID: intentionsWatchID,
|
||||
Result: TestIntentions(),
|
||||
Err: nil,
|
||||
},
|
||||
{
|
||||
CorrelationID: peeringTrustBundlesWatchID,
|
||||
Result: peerTrustBundles,
|
||||
},
|
||||
{
|
||||
CorrelationID: peeredUpstreamsID,
|
||||
Result: &structs.IndexedPeeredServiceList{
|
||||
Services: []structs.PeeredServiceName{
|
||||
{
|
||||
ServiceName: apiA,
|
||||
Peer: "peer-a",
|
||||
},
|
||||
{
|
||||
// This service is dynamic (not from static config)
|
||||
ServiceName: db,
|
||||
Peer: "peer-a",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
CorrelationID: meshConfigEntryID,
|
||||
Result: &structs.ConfigEntryResponse{
|
||||
Entry: nil, // no explicit config
|
||||
},
|
||||
Err: nil,
|
||||
},
|
||||
},
|
||||
verifySnapshot: func(t testing.TB, snap *ConfigSnapshot) {
|
||||
require.True(t, snap.Valid(), "proxy with roots/leaf/intentions is valid")
|
||||
require.Equal(t, indexedRoots, snap.Roots)
|
||||
require.Equal(t, issuedCert, snap.Leaf())
|
||||
require.True(t, snap.MeshGateway.isEmpty())
|
||||
require.True(t, snap.IngressGateway.isEmpty())
|
||||
require.True(t, snap.TerminatingGateway.isEmpty())
|
||||
require.True(t, snap.ConnectProxy.MeshConfigSet)
|
||||
require.Nil(t, snap.ConnectProxy.MeshConfig)
|
||||
|
||||
// Check PeeredUpstream is populated
|
||||
expect := map[UpstreamID]struct{}{
|
||||
extDBUID: {},
|
||||
extApiUID: {},
|
||||
}
|
||||
require.Equal(t, expect, snap.ConnectProxy.PeeredUpstreams)
|
||||
|
||||
require.True(t, snap.ConnectProxy.PeerUpstreamEndpoints.IsWatched(extApiUID))
|
||||
_, ok := snap.ConnectProxy.PeerUpstreamEndpoints.Get(extApiUID)
|
||||
require.False(t, ok, "expected initialized but empty PeerUpstreamEndpoint")
|
||||
|
||||
require.True(t, snap.ConnectProxy.PeerUpstreamEndpoints.IsWatched(extDBUID))
|
||||
_, ok = snap.ConnectProxy.PeerUpstreamEndpoints.Get(extDBUID)
|
||||
require.False(t, ok, "expected initialized but empty PeerUpstreamEndpoint")
|
||||
|
||||
require.True(t, snap.ConnectProxy.UpstreamPeerTrustBundles.IsWatched("peer-a"))
|
||||
_, ok = snap.ConnectProxy.UpstreamPeerTrustBundles.Get("peer-a")
|
||||
require.False(t, ok, "expected initialized but empty PeerTrustBundle")
|
||||
},
|
||||
},
|
||||
{
|
||||
// Peered upstream will have set up 3 more watches
|
||||
requiredWatches: map[string]verifyWatchRequest{
|
||||
upstreamPeerWatchIDPrefix + extApiUID.String(): genVerifyServiceSpecificPeeredRequest("api-a", "", "dc1", "peer-a", true),
|
||||
upstreamPeerWatchIDPrefix + extDBUID.String(): genVerifyServiceSpecificPeeredRequest("db", "", "dc1", "peer-a", true),
|
||||
peerTrustBundleIDPrefix + "peer-a": genVerifyTrustBundleReadWatch("peer-a"),
|
||||
},
|
||||
events: []UpdateEvent{
|
||||
{
|
||||
CorrelationID: peerTrustBundleIDPrefix + "peer-a",
|
||||
Result: &pbpeering.TrustBundleReadResponse{
|
||||
Bundle: peerTrustBundles.Bundles[0],
|
||||
},
|
||||
},
|
||||
},
|
||||
verifySnapshot: func(t testing.TB, snap *ConfigSnapshot) {
|
||||
require.True(t, snap.Valid(), "proxy with roots/leaf/intentions is valid")
|
||||
require.Equal(t, indexedRoots, snap.Roots)
|
||||
require.Equal(t, issuedCert, snap.Leaf())
|
||||
require.True(t, snap.MeshGateway.isEmpty())
|
||||
require.True(t, snap.IngressGateway.isEmpty())
|
||||
require.True(t, snap.TerminatingGateway.isEmpty())
|
||||
require.True(t, snap.ConnectProxy.MeshConfigSet)
|
||||
require.Nil(t, snap.ConnectProxy.MeshConfig)
|
||||
|
||||
// Check PeeredUpstream is populated
|
||||
expect := map[UpstreamID]struct{}{
|
||||
extDBUID: {},
|
||||
extApiUID: {},
|
||||
}
|
||||
require.Equal(t, expect, snap.ConnectProxy.PeeredUpstreams)
|
||||
|
||||
// Expect two entries (DB and api-a)
|
||||
require.Equal(t, 2, snap.ConnectProxy.PeerUpstreamEndpoints.Len())
|
||||
|
||||
// db does not have endpoints yet
|
||||
ep, _ := snap.ConnectProxy.PeerUpstreamEndpoints.Get(extDBUID)
|
||||
require.Nil(t, ep)
|
||||
|
||||
// Expect a trust bundle
|
||||
ptb, ok := snap.ConnectProxy.UpstreamPeerTrustBundles.Get("peer-a")
|
||||
require.True(t, ok)
|
||||
prototest.AssertDeepEqual(t, peerTrustBundles.Bundles[0], ptb)
|
||||
|
||||
// Sanity check that local upstream maps are not populated
|
||||
require.Empty(t, snap.ConnectProxy.WatchedUpstreamEndpoints[extDBUID])
|
||||
require.Empty(t, snap.ConnectProxy.PassthroughUpstreams[extDBUID])
|
||||
require.Empty(t, snap.ConnectProxy.PassthroughIndices)
|
||||
},
|
||||
},
|
||||
{
|
||||
// Add another instance of "api-a" service
|
||||
events: []UpdateEvent{
|
||||
{
|
||||
CorrelationID: upstreamPeerWatchIDPrefix + extDBUID.String(),
|
||||
Result: &structs.IndexedCheckServiceNodes{
|
||||
Nodes: structs.CheckServiceNodes{
|
||||
{
|
||||
Node: &structs.Node{
|
||||
Node: "node1",
|
||||
Address: "127.0.0.1",
|
||||
PeerName: "peer-a",
|
||||
},
|
||||
Service: &structs.NodeService{
|
||||
ID: "db",
|
||||
Service: "db",
|
||||
PeerName: "peer-a",
|
||||
Connect: structs.ServiceConnect{},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
verifySnapshot: func(t testing.TB, snap *ConfigSnapshot) {
|
||||
require.True(t, snap.Valid(), "proxy with roots/leaf/intentions is valid")
|
||||
|
||||
// Check PeeredUpstream is populated
|
||||
expect := map[UpstreamID]struct{}{
|
||||
extApiUID: {},
|
||||
extDBUID: {},
|
||||
}
|
||||
require.Equal(t, expect, snap.ConnectProxy.PeeredUpstreams)
|
||||
|
||||
// Expect two entries (api-a, db)
|
||||
require.Equal(t, 2, snap.ConnectProxy.PeerUpstreamEndpoints.Len())
|
||||
|
||||
// db has an endpoint now
|
||||
ep, _ := snap.ConnectProxy.PeerUpstreamEndpoints.Get(extDBUID)
|
||||
require.NotNil(t, ep)
|
||||
require.Len(t, ep, 1)
|
||||
|
||||
// Expect a trust bundle
|
||||
ptb, ok := snap.ConnectProxy.UpstreamPeerTrustBundles.Get("peer-a")
|
||||
require.True(t, ok)
|
||||
prototest.AssertDeepEqual(t, peerTrustBundles.Bundles[0], ptb)
|
||||
|
||||
// Sanity check that local upstream maps are not populated
|
||||
require.Empty(t, snap.ConnectProxy.WatchedUpstreamEndpoints[extDBUID])
|
||||
require.Empty(t, snap.ConnectProxy.PassthroughUpstreams[extDBUID])
|
||||
require.Empty(t, snap.ConnectProxy.PassthroughIndices)
|
||||
},
|
||||
},
|
||||
{
|
||||
// Empty list of peered upstreams should clean up map keys
|
||||
events: []UpdateEvent{
|
||||
{
|
||||
CorrelationID: peeredUpstreamsID,
|
||||
Result: &structs.IndexedPeeredServiceList{
|
||||
Services: []structs.PeeredServiceName{},
|
||||
},
|
||||
},
|
||||
},
|
||||
verifySnapshot: func(t testing.TB, snap *ConfigSnapshot) {
|
||||
require.True(t, snap.Valid(), "proxy with roots/leaf/intentions is valid")
|
||||
|
||||
require.Empty(t, snap.ConnectProxy.PeeredUpstreams)
|
||||
|
||||
// db endpoint should have been cleaned up
|
||||
require.False(t, snap.ConnectProxy.PeerUpstreamEndpoints.IsWatched(extDBUID))
|
||||
|
||||
// Expect only api-a endpoint
|
||||
require.Equal(t, 1, snap.ConnectProxy.PeerUpstreamEndpoints.Len())
|
||||
require.Equal(t, 1, snap.ConnectProxy.UpstreamPeerTrustBundles.Len())
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
"connect-proxy": newConnectProxyCase(structs.MeshGatewayModeDefault),
|
||||
"connect-proxy-mesh-gateway-local": newConnectProxyCase(structs.MeshGatewayModeLocal),
|
||||
"connect-proxy-with-peers": {
|
||||
|
@ -2564,10 +2823,15 @@ func TestState_WatchesAndUpdates(t *testing.T) {
|
|||
require.Len(t, snap.ConnectProxy.WatchedGateways, 0, "%+v", snap.ConnectProxy.WatchedGateways)
|
||||
require.Len(t, snap.ConnectProxy.WatchedGatewayEndpoints, 0, "%+v", snap.ConnectProxy.WatchedGatewayEndpoints)
|
||||
|
||||
require.Contains(t, snap.ConnectProxy.WatchedUpstreamPeerTrustBundles, "peer-a", "%+v", snap.ConnectProxy.WatchedUpstreamPeerTrustBundles)
|
||||
require.Len(t, snap.ConnectProxy.UpstreamPeerTrustBundles, 0, "%+v", snap.ConnectProxy.UpstreamPeerTrustBundles)
|
||||
// watch initialized
|
||||
require.True(t, snap.ConnectProxy.UpstreamPeerTrustBundles.IsWatched("peer-a"))
|
||||
_, ok := snap.ConnectProxy.UpstreamPeerTrustBundles.Get("peer-a")
|
||||
require.False(t, ok) // but no data
|
||||
|
||||
require.Len(t, snap.ConnectProxy.PeerUpstreamEndpoints, 0, "%+v", snap.ConnectProxy.PeerUpstreamEndpoints)
|
||||
// watch initialized
|
||||
require.True(t, snap.ConnectProxy.PeerUpstreamEndpoints.IsWatched(extApiUID))
|
||||
_, ok = snap.ConnectProxy.PeerUpstreamEndpoints.Get(extApiUID)
|
||||
require.False(t, ok) // but no data
|
||||
|
||||
require.Len(t, snap.ConnectProxy.WatchedServiceChecks, 0, "%+v", snap.ConnectProxy.WatchedServiceChecks)
|
||||
require.Len(t, snap.ConnectProxy.PreparedQueryEndpoints, 0, "%+v", snap.ConnectProxy.PreparedQueryEndpoints)
|
||||
|
@ -2655,11 +2919,13 @@ func TestState_WatchesAndUpdates(t *testing.T) {
|
|||
require.Len(t, snap.ConnectProxy.WatchedGateways, 1, "%+v", snap.ConnectProxy.WatchedGateways)
|
||||
require.Len(t, snap.ConnectProxy.WatchedGatewayEndpoints, 1, "%+v", snap.ConnectProxy.WatchedGatewayEndpoints)
|
||||
|
||||
require.Contains(t, snap.ConnectProxy.WatchedUpstreamPeerTrustBundles, "peer-a", "%+v", snap.ConnectProxy.WatchedUpstreamPeerTrustBundles)
|
||||
require.Equal(t, peerTrustBundles.Bundles[0], snap.ConnectProxy.UpstreamPeerTrustBundles["peer-a"], "%+v", snap.ConnectProxy.WatchedUpstreamPeerTrustBundles)
|
||||
tb, ok := snap.ConnectProxy.UpstreamPeerTrustBundles.Get("peer-a")
|
||||
require.True(t, ok)
|
||||
prototest.AssertDeepEqual(t, peerTrustBundles.Bundles[0], tb)
|
||||
|
||||
require.Len(t, snap.ConnectProxy.PeerUpstreamEndpoints, 1, "%+v", snap.ConnectProxy.PeerUpstreamEndpoints)
|
||||
require.NotNil(t, snap.ConnectProxy.PeerUpstreamEndpoints[extApiUID])
|
||||
require.Equal(t, 1, snap.ConnectProxy.PeerUpstreamEndpoints.Len())
|
||||
ep, _ := snap.ConnectProxy.PeerUpstreamEndpoints.Get(extApiUID)
|
||||
require.NotNil(t, ep)
|
||||
|
||||
require.Len(t, snap.ConnectProxy.WatchedServiceChecks, 0, "%+v", snap.ConnectProxy.WatchedServiceChecks)
|
||||
require.Len(t, snap.ConnectProxy.PreparedQueryEndpoints, 0, "%+v", snap.ConnectProxy.PreparedQueryEndpoints)
|
||||
|
|
|
@ -745,6 +745,7 @@ func testConfigSnapshotFixture(
|
|||
IntentionUpstreams: &noopDataSource[*structs.ServiceSpecificRequest]{},
|
||||
InternalServiceDump: &noopDataSource[*structs.ServiceDumpRequest]{},
|
||||
LeafCertificate: &noopDataSource[*cachetype.ConnectCALeafRequest]{},
|
||||
PeeredUpstreams: &noopDataSource[*structs.PartitionSpecificRequest]{},
|
||||
PreparedQuery: &noopDataSource[*structs.PreparedQueryExecuteRequest]{},
|
||||
ResolvedServiceConfig: &noopDataSource[*structs.ServiceConfigRequest]{},
|
||||
ServiceList: &noopDataSource[*structs.DCSpecificRequest]{},
|
||||
|
@ -971,6 +972,7 @@ type TestDataSources struct {
|
|||
IntentionUpstreams *TestDataSource[*structs.ServiceSpecificRequest, *structs.IndexedServiceList]
|
||||
InternalServiceDump *TestDataSource[*structs.ServiceDumpRequest, *structs.IndexedNodesWithGateways]
|
||||
LeafCertificate *TestDataSource[*cachetype.ConnectCALeafRequest, *structs.IssuedCert]
|
||||
PeeredUpstreams *TestDataSource[*structs.PartitionSpecificRequest, *structs.IndexedPeeredServiceList]
|
||||
PreparedQuery *TestDataSource[*structs.PreparedQueryExecuteRequest, *structs.PreparedQueryExecuteResponse]
|
||||
ResolvedServiceConfig *TestDataSource[*structs.ServiceConfigRequest, *structs.ServiceConfigResponse]
|
||||
ServiceList *TestDataSource[*structs.DCSpecificRequest, *structs.IndexedServiceList]
|
||||
|
@ -994,6 +996,7 @@ func (t *TestDataSources) ToDataSources() DataSources {
|
|||
IntentionUpstreams: t.IntentionUpstreams,
|
||||
InternalServiceDump: t.InternalServiceDump,
|
||||
LeafCertificate: t.LeafCertificate,
|
||||
PeeredUpstreams: t.PeeredUpstreams,
|
||||
PreparedQuery: t.PreparedQuery,
|
||||
ResolvedServiceConfig: t.ResolvedServiceConfig,
|
||||
ServiceList: t.ServiceList,
|
||||
|
|
|
@ -103,19 +103,15 @@ func (s *handlerUpstreams) handleUpdateUpstreams(ctx context.Context, u UpdateEv
|
|||
resp.Nodes,
|
||||
)
|
||||
if len(filteredNodes) > 0 {
|
||||
upstreamsSnapshot.PeerUpstreamEndpoints[uid] = filteredNodes
|
||||
upstreamsSnapshot.PeerUpstreamEndpointsUseHostnames[uid] = struct{}{}
|
||||
if set := upstreamsSnapshot.PeerUpstreamEndpoints.Set(uid, filteredNodes); set {
|
||||
upstreamsSnapshot.PeerUpstreamEndpointsUseHostnames[uid] = struct{}{}
|
||||
}
|
||||
} else {
|
||||
upstreamsSnapshot.PeerUpstreamEndpoints[uid] = resp.Nodes
|
||||
delete(upstreamsSnapshot.PeerUpstreamEndpointsUseHostnames, uid)
|
||||
if set := upstreamsSnapshot.PeerUpstreamEndpoints.Set(uid, resp.Nodes); set {
|
||||
delete(upstreamsSnapshot.PeerUpstreamEndpointsUseHostnames, uid)
|
||||
}
|
||||
}
|
||||
|
||||
if s.kind != structs.ServiceKindConnectProxy || s.proxyCfg.Mode != structs.ProxyModeTransparent {
|
||||
return nil
|
||||
}
|
||||
|
||||
s.logger.Warn("skipping transparent proxy update for peered upstream")
|
||||
|
||||
case strings.HasPrefix(u.CorrelationID, "upstream-target:"):
|
||||
resp, ok := u.Result.(*structs.IndexedCheckServiceNodes)
|
||||
if !ok {
|
||||
|
@ -157,6 +153,10 @@ func (s *handlerUpstreams) handleUpdateUpstreams(ctx context.Context, u UpdateEv
|
|||
|
||||
// Make sure to use an external address when crossing partition or DC boundaries.
|
||||
isRemote := !snap.Locality.Matches(node.Node.Datacenter, node.Node.PartitionOrDefault())
|
||||
// If node is peered it must be remote
|
||||
if node.Node.PeerOrEmpty() != "" {
|
||||
isRemote = true
|
||||
}
|
||||
csnIdx, addr, _ := node.BestAddress(isRemote)
|
||||
|
||||
existing := upstreamsSnapshot.PassthroughIndices[addr]
|
||||
|
|
|
@ -729,11 +729,12 @@ func (s *ResourceGenerator) makeUpstreamClusterForPeerService(
|
|||
},
|
||||
}
|
||||
} else {
|
||||
ep, _ := cfgSnap.ConnectProxy.PeerUpstreamEndpoints.Get(uid)
|
||||
configureClusterWithHostnames(
|
||||
s.Logger,
|
||||
c,
|
||||
"", /*TODO:make configurable?*/
|
||||
cfgSnap.ConnectProxy.PeerUpstreamEndpoints[uid],
|
||||
ep,
|
||||
true, /*isRemote*/
|
||||
false, /*onlyPassing*/
|
||||
)
|
||||
|
@ -743,7 +744,8 @@ func (s *ResourceGenerator) makeUpstreamClusterForPeerService(
|
|||
|
||||
rootPEMs := cfgSnap.RootPEMs()
|
||||
if uid.Peer != "" {
|
||||
rootPEMs = cfgSnap.ConnectProxy.UpstreamPeerTrustBundles[uid.Peer].ConcatenatedRootPEMs()
|
||||
tbs, _ := cfgSnap.ConnectProxy.UpstreamPeerTrustBundles.Get(uid.Peer)
|
||||
rootPEMs = tbs.ConcatenatedRootPEMs()
|
||||
}
|
||||
|
||||
// Enable TLS upstream with the configured client certificate.
|
||||
|
@ -1077,13 +1079,9 @@ func (s *ResourceGenerator) makeUpstreamClustersForDiscoveryChain(
|
|||
}
|
||||
|
||||
if configureTLS {
|
||||
rootPEMs := cfgSnap.RootPEMs()
|
||||
if uid.Peer != "" {
|
||||
rootPEMs = cfgSnap.ConnectProxy.UpstreamPeerTrustBundles[uid.Peer].ConcatenatedRootPEMs()
|
||||
}
|
||||
commonTLSContext := makeCommonTLSContext(
|
||||
cfgSnap.Leaf(),
|
||||
rootPEMs,
|
||||
cfgSnap.RootPEMs(),
|
||||
makeTLSParametersFromProxyTLSConfig(cfgSnap.MeshConfigTLSOutgoing()),
|
||||
)
|
||||
|
||||
|
|
|
@ -47,7 +47,7 @@ func (s *ResourceGenerator) endpointsFromSnapshotConnectProxy(cfgSnap *proxycfg.
|
|||
// TODO: this estimate is wrong
|
||||
resources := make([]proto.Message, 0,
|
||||
len(cfgSnap.ConnectProxy.PreparedQueryEndpoints)+
|
||||
len(cfgSnap.ConnectProxy.PeerUpstreamEndpoints)+
|
||||
cfgSnap.ConnectProxy.PeerUpstreamEndpoints.Len()+
|
||||
len(cfgSnap.ConnectProxy.WatchedUpstreamEndpoints))
|
||||
|
||||
// NOTE: Any time we skip a chain below we MUST also skip that discovery chain in clusters.go
|
||||
|
@ -110,7 +110,7 @@ func (s *ResourceGenerator) endpointsFromSnapshotConnectProxy(cfgSnap *proxycfg.
|
|||
continue
|
||||
}
|
||||
|
||||
endpoints, ok := cfgSnap.ConnectProxy.PeerUpstreamEndpoints[uid]
|
||||
endpoints, ok := cfgSnap.ConnectProxy.PeerUpstreamEndpoints.Get(uid)
|
||||
if ok {
|
||||
la := makeLoadAssignment(
|
||||
clusterName,
|
||||
|
|
Loading…
Reference in a new issue