Make the mesh gateway changes to allow `local` mode for cluster peering data plane traffic (#14817)
Make the mesh gateway changes to allow `local` mode for cluster peering data plane traffic
This commit is contained in:
parent
53ff317b01
commit
2f08fab317
|
@ -0,0 +1,3 @@
|
|||
```release-note:feature
|
||||
peering: Add mesh gateway local mode support for cluster peering.
|
||||
```
|
|
@ -120,6 +120,9 @@ func (s *handlerMeshGateway) initialize(ctx context.Context) (ConfigSnapshot, er
|
|||
snap.MeshGateway.ExportedServicesWithPeers = make(map[structs.ServiceName][]string)
|
||||
snap.MeshGateway.DiscoveryChain = make(map[structs.ServiceName]*structs.CompiledDiscoveryChain)
|
||||
snap.MeshGateway.WatchedDiscoveryChains = make(map[structs.ServiceName]context.CancelFunc)
|
||||
snap.MeshGateway.WatchedPeeringServices = make(map[string]map[structs.ServiceName]context.CancelFunc)
|
||||
snap.MeshGateway.WatchedPeers = make(map[string]context.CancelFunc)
|
||||
snap.MeshGateway.PeeringServices = make(map[string]map[structs.ServiceName]PeeringServiceValue)
|
||||
|
||||
// there is no need to initialize the map of service resolvers as we
|
||||
// fully rebuild it every time we get updates
|
||||
|
@ -460,6 +463,52 @@ func (s *handlerMeshGateway) handleUpdate(ctx context.Context, u UpdateEvent, sn
|
|||
}
|
||||
snap.MeshGateway.PeeringTrustBundlesSet = true
|
||||
|
||||
wildcardEntMeta := s.proxyID.WithWildcardNamespace()
|
||||
|
||||
// For each peer, fetch the imported services to support mesh gateway local
|
||||
// mode.
|
||||
for _, tb := range resp.Bundles {
|
||||
entMeta := structs.DefaultEnterpriseMetaInDefaultPartition()
|
||||
|
||||
if _, ok := snap.MeshGateway.WatchedPeers[tb.PeerName]; !ok {
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
|
||||
err := s.dataSources.ServiceList.Notify(ctx, &structs.DCSpecificRequest{
|
||||
PeerName: tb.PeerName,
|
||||
QueryOptions: structs.QueryOptions{Token: s.token},
|
||||
Source: *s.source,
|
||||
EnterpriseMeta: *wildcardEntMeta,
|
||||
}, peeringServiceListWatchID+tb.PeerName, s.ch)
|
||||
|
||||
if err != nil {
|
||||
meshLogger.Error("failed to register watch for mesh-gateway",
|
||||
"peer", tb.PeerName,
|
||||
"partition", entMeta.PartitionOrDefault(),
|
||||
"error", err,
|
||||
)
|
||||
cancel()
|
||||
return err
|
||||
}
|
||||
snap.MeshGateway.WatchedPeers[tb.PeerName] = cancel
|
||||
}
|
||||
}
|
||||
|
||||
for peerName, cancelFn := range snap.MeshGateway.WatchedPeers {
|
||||
found := false
|
||||
for _, bundle := range resp.Bundles {
|
||||
if peerName == bundle.PeerName {
|
||||
found = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if !found {
|
||||
delete(snap.MeshGateway.PeeringServices, peerName)
|
||||
delete(snap.MeshGateway.WatchedPeers, peerName)
|
||||
delete(snap.MeshGateway.WatchedPeeringServices, peerName)
|
||||
cancelFn()
|
||||
}
|
||||
}
|
||||
|
||||
case meshConfigEntryID:
|
||||
resp, ok := u.Result.(*structs.ConfigEntryResponse)
|
||||
if !ok {
|
||||
|
@ -517,6 +566,57 @@ func (s *handlerMeshGateway) handleUpdate(ctx context.Context, u UpdateEvent, sn
|
|||
|
||||
default:
|
||||
switch {
|
||||
case strings.HasPrefix(u.CorrelationID, peeringServiceListWatchID):
|
||||
services, ok := u.Result.(*structs.IndexedServiceList)
|
||||
if !ok {
|
||||
return fmt.Errorf("invalid type for response: %T", u.Result)
|
||||
}
|
||||
|
||||
peerName := strings.TrimPrefix(u.CorrelationID, peeringServiceListWatchID)
|
||||
|
||||
svcMap := make(map[structs.ServiceName]struct{})
|
||||
|
||||
if _, ok := snap.MeshGateway.WatchedPeeringServices[peerName]; !ok {
|
||||
snap.MeshGateway.WatchedPeeringServices[peerName] = make(map[structs.ServiceName]context.CancelFunc)
|
||||
}
|
||||
|
||||
for _, svc := range services.Services {
|
||||
// Make sure to add every service to this map, we use it to cancel
|
||||
// watches below.
|
||||
svcMap[svc] = struct{}{}
|
||||
|
||||
if _, ok := snap.MeshGateway.WatchedPeeringServices[peerName][svc]; !ok {
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
err := s.dataSources.Health.Notify(ctx, &structs.ServiceSpecificRequest{
|
||||
PeerName: peerName,
|
||||
QueryOptions: structs.QueryOptions{Token: s.token},
|
||||
ServiceName: svc.Name,
|
||||
Connect: true,
|
||||
EnterpriseMeta: svc.EnterpriseMeta,
|
||||
}, fmt.Sprintf("peering-connect-service:%s:%s", peerName, svc.String()), s.ch)
|
||||
|
||||
if err != nil {
|
||||
meshLogger.Error("failed to register watch for connect-service",
|
||||
"service", svc.String(),
|
||||
"error", err,
|
||||
)
|
||||
cancel()
|
||||
return err
|
||||
}
|
||||
snap.MeshGateway.WatchedPeeringServices[peerName][svc] = cancel
|
||||
}
|
||||
}
|
||||
|
||||
watchedServices := snap.MeshGateway.WatchedPeeringServices[peerName]
|
||||
for sn, cancelFn := range watchedServices {
|
||||
if _, ok := svcMap[sn]; !ok {
|
||||
meshLogger.Debug("canceling watch for service", "service", sn.String())
|
||||
delete(snap.MeshGateway.WatchedPeeringServices[peerName], sn)
|
||||
delete(snap.MeshGateway.PeeringServices[peerName], sn)
|
||||
cancelFn()
|
||||
}
|
||||
}
|
||||
|
||||
case strings.HasPrefix(u.CorrelationID, "connect-service:"):
|
||||
resp, ok := u.Result.(*structs.IndexedCheckServiceNodes)
|
||||
if !ok {
|
||||
|
@ -530,6 +630,38 @@ func (s *handlerMeshGateway) handleUpdate(ctx context.Context, u UpdateEvent, sn
|
|||
} else if _, ok := snap.MeshGateway.ServiceGroups[sn]; ok {
|
||||
delete(snap.MeshGateway.ServiceGroups, sn)
|
||||
}
|
||||
case strings.HasPrefix(u.CorrelationID, "peering-connect-service:"):
|
||||
resp, ok := u.Result.(*structs.IndexedCheckServiceNodes)
|
||||
|
||||
if !ok {
|
||||
return fmt.Errorf("invalid type for response: %T", u.Result)
|
||||
}
|
||||
|
||||
key := strings.TrimPrefix(u.CorrelationID, "peering-connect-service:")
|
||||
peer, snString, ok := strings.Cut(key, ":")
|
||||
|
||||
if ok {
|
||||
sn := structs.ServiceNameFromString(snString)
|
||||
|
||||
if len(resp.Nodes) > 0 {
|
||||
if _, ok := snap.MeshGateway.PeeringServices[peer]; !ok {
|
||||
snap.MeshGateway.PeeringServices[peer] = make(map[structs.ServiceName]PeeringServiceValue)
|
||||
}
|
||||
|
||||
if eps := hostnameEndpoints(s.logger, GatewayKey{}, resp.Nodes); len(eps) > 0 {
|
||||
snap.MeshGateway.PeeringServices[peer][sn] = PeeringServiceValue{
|
||||
Nodes: eps,
|
||||
UseCDS: true,
|
||||
}
|
||||
} else {
|
||||
snap.MeshGateway.PeeringServices[peer][sn] = PeeringServiceValue{
|
||||
Nodes: resp.Nodes,
|
||||
}
|
||||
}
|
||||
} else if _, ok := snap.MeshGateway.PeeringServices[peer]; ok {
|
||||
delete(snap.MeshGateway.PeeringServices[peer], sn)
|
||||
}
|
||||
}
|
||||
|
||||
case strings.HasPrefix(u.CorrelationID, "mesh-gateway:"):
|
||||
resp, ok := u.Result.(*structs.IndexedCheckServiceNodes)
|
||||
|
|
|
@ -348,6 +348,11 @@ func (c *configSnapshotTerminatingGateway) isEmpty() bool {
|
|||
!c.MeshConfigSet
|
||||
}
|
||||
|
||||
type PeeringServiceValue struct {
|
||||
Nodes structs.CheckServiceNodes
|
||||
UseCDS bool
|
||||
}
|
||||
|
||||
type configSnapshotMeshGateway struct {
|
||||
// WatchedServices is a map of service name to a cancel function. This cancel
|
||||
// function is tied to the watch of connect enabled services for the given
|
||||
|
@ -373,6 +378,19 @@ type configSnapshotMeshGateway struct {
|
|||
// service in the local datacenter.
|
||||
ServiceGroups map[structs.ServiceName]structs.CheckServiceNodes
|
||||
|
||||
// PeeringServices is a map of peer name -> (map of
|
||||
// service name -> CheckServiceNodes) and is used to determine the backing
|
||||
// endpoints of a service on a peer.
|
||||
PeeringServices map[string]map[structs.ServiceName]PeeringServiceValue
|
||||
|
||||
// WatchedPeeringServices is a map of peer name -> (map of service name ->
|
||||
// cancel function) and is used to track watches on services within a peer.
|
||||
WatchedPeeringServices map[string]map[structs.ServiceName]context.CancelFunc
|
||||
|
||||
// WatchedPeers is a map of peer name -> cancel functions. It is used to
|
||||
// track watches on peers.
|
||||
WatchedPeers map[string]context.CancelFunc
|
||||
|
||||
// ServiceResolvers is a map of service name to an associated
|
||||
// service-resolver config entry for that service.
|
||||
ServiceResolvers map[structs.ServiceName]*structs.ServiceResolverConfigEntry
|
||||
|
|
|
@ -25,6 +25,7 @@ const (
|
|||
peerTrustBundleIDPrefix = "peer-trust-bundle:"
|
||||
intentionsWatchID = "intentions"
|
||||
serviceListWatchID = "service-list"
|
||||
peeringServiceListWatchID = "peering-service-list:"
|
||||
federationStateListGatewaysWatchID = "federation-state-list-mesh-gateways"
|
||||
consulServerListWatchID = "consul-server-list"
|
||||
datacentersWatchID = "datacenters"
|
||||
|
|
|
@ -830,6 +830,9 @@ func TestState_WatchesAndUpdates(t *testing.T) {
|
|||
require.Empty(t, snap.MeshGateway.ServiceGroups)
|
||||
require.Empty(t, snap.MeshGateway.ServiceResolvers)
|
||||
require.Empty(t, snap.MeshGateway.GatewayGroups)
|
||||
require.Empty(t, snap.MeshGateway.WatchedPeeringServices)
|
||||
require.Empty(t, snap.MeshGateway.WatchedPeers)
|
||||
require.Empty(t, snap.MeshGateway.PeeringServices)
|
||||
},
|
||||
},
|
||||
{
|
||||
|
@ -897,8 +900,22 @@ func TestState_WatchesAndUpdates(t *testing.T) {
|
|||
},
|
||||
{
|
||||
CorrelationID: peeringTrustBundlesWatchID,
|
||||
Result: &pbpeering.TrustBundleListByServiceResponse{
|
||||
Bundles: nil,
|
||||
Result: peerTrustBundles,
|
||||
},
|
||||
{
|
||||
CorrelationID: peeringServiceListWatchID + "peer-a",
|
||||
Result: &structs.IndexedServiceList{
|
||||
Services: structs.ServiceList{
|
||||
{Name: "service-1"},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
CorrelationID: peeringServiceListWatchID + "peer-b",
|
||||
Result: &structs.IndexedServiceList{
|
||||
Services: structs.ServiceList{
|
||||
{Name: "service-10"},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
|
@ -906,6 +923,10 @@ func TestState_WatchesAndUpdates(t *testing.T) {
|
|||
require.True(t, snap.Valid(), "gateway with service list is valid")
|
||||
require.Len(t, snap.MeshGateway.WatchedServices, 1)
|
||||
require.True(t, snap.MeshGateway.WatchedServicesSet)
|
||||
require.Len(t, snap.MeshGateway.WatchedPeers, 2)
|
||||
require.Len(t, snap.MeshGateway.WatchedPeeringServices, 2)
|
||||
require.Len(t, snap.MeshGateway.WatchedPeeringServices["peer-a"], 1)
|
||||
require.Len(t, snap.MeshGateway.WatchedPeeringServices["peer-b"], 1)
|
||||
},
|
||||
},
|
||||
{
|
||||
|
@ -920,11 +941,33 @@ func TestState_WatchesAndUpdates(t *testing.T) {
|
|||
},
|
||||
Err: nil,
|
||||
},
|
||||
{
|
||||
CorrelationID: peeringServiceListWatchID + "peer-a",
|
||||
Result: &structs.IndexedServiceList{
|
||||
Services: structs.ServiceList{
|
||||
{Name: "service-1"},
|
||||
{Name: "service-2"},
|
||||
{Name: "service-3"},
|
||||
},
|
||||
},
|
||||
Err: nil,
|
||||
},
|
||||
{
|
||||
CorrelationID: peeringServiceListWatchID + "peer-b",
|
||||
Result: &structs.IndexedServiceList{
|
||||
Services: structs.ServiceList{},
|
||||
},
|
||||
Err: nil,
|
||||
},
|
||||
},
|
||||
verifySnapshot: func(t testing.TB, snap *ConfigSnapshot) {
|
||||
require.True(t, snap.Valid(), "gateway with service list is valid")
|
||||
require.Len(t, snap.MeshGateway.WatchedServices, 2)
|
||||
require.True(t, snap.MeshGateway.WatchedServicesSet)
|
||||
require.Len(t, snap.MeshGateway.WatchedPeers, 2)
|
||||
require.Len(t, snap.MeshGateway.WatchedPeeringServices, 2)
|
||||
require.Len(t, snap.MeshGateway.WatchedPeeringServices["peer-a"], 3)
|
||||
require.Len(t, snap.MeshGateway.WatchedPeeringServices["peer-b"], 0)
|
||||
},
|
||||
},
|
||||
{
|
||||
|
|
|
@ -280,31 +280,6 @@ func TestUpstreamNodesDC2(t testing.T) structs.CheckServiceNodes {
|
|||
}
|
||||
}
|
||||
|
||||
func TestUpstreamNodesPeerCluster01(t testing.T) structs.CheckServiceNodes {
|
||||
peer := "cluster-01"
|
||||
service := structs.TestNodeServiceWithNameInPeer(t, "web", peer)
|
||||
return structs.CheckServiceNodes{
|
||||
structs.CheckServiceNode{
|
||||
Node: &structs.Node{
|
||||
ID: "test1",
|
||||
Node: "test1",
|
||||
Address: "10.40.1.1",
|
||||
PeerName: peer,
|
||||
},
|
||||
Service: service,
|
||||
},
|
||||
structs.CheckServiceNode{
|
||||
Node: &structs.Node{
|
||||
ID: "test2",
|
||||
Node: "test2",
|
||||
Address: "10.40.1.2",
|
||||
PeerName: peer,
|
||||
},
|
||||
Service: service,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func TestUpstreamNodesInStatusDC2(t testing.T, status string) structs.CheckServiceNodes {
|
||||
return structs.CheckServiceNodes{
|
||||
structs.CheckServiceNode{
|
||||
|
|
|
@ -632,6 +632,48 @@ func TestConfigSnapshotPeeredMeshGateway(t testing.T, variant string, nsFn func(
|
|||
},
|
||||
},
|
||||
)
|
||||
case "imported-services":
|
||||
peerTrustBundles := TestPeerTrustBundles(t).Bundles
|
||||
dbSN := structs.NewServiceName("db", nil)
|
||||
altSN := structs.NewServiceName("alt", nil)
|
||||
extraUpdates = append(extraUpdates,
|
||||
UpdateEvent{
|
||||
CorrelationID: peeringTrustBundlesWatchID,
|
||||
Result: &pbpeering.TrustBundleListByServiceResponse{
|
||||
Bundles: peerTrustBundles,
|
||||
},
|
||||
},
|
||||
UpdateEvent{
|
||||
CorrelationID: peeringServiceListWatchID + "peer-a",
|
||||
Result: &structs.IndexedServiceList{
|
||||
Services: []structs.ServiceName{altSN},
|
||||
},
|
||||
},
|
||||
UpdateEvent{
|
||||
CorrelationID: peeringServiceListWatchID + "peer-b",
|
||||
Result: &structs.IndexedServiceList{
|
||||
Services: []structs.ServiceName{dbSN},
|
||||
},
|
||||
},
|
||||
UpdateEvent{
|
||||
CorrelationID: "peering-connect-service:peer-a:db",
|
||||
Result: &structs.IndexedCheckServiceNodes{
|
||||
Nodes: structs.CheckServiceNodes{
|
||||
structs.TestCheckNodeServiceWithNameInPeer(t, "db", "peer-a", "10.40.1.1", false),
|
||||
structs.TestCheckNodeServiceWithNameInPeer(t, "db", "peer-a", "10.40.1.2", false),
|
||||
},
|
||||
},
|
||||
},
|
||||
UpdateEvent{
|
||||
CorrelationID: "peering-connect-service:peer-b:alt",
|
||||
Result: &structs.IndexedCheckServiceNodes{
|
||||
Nodes: structs.CheckServiceNodes{
|
||||
structs.TestCheckNodeServiceWithNameInPeer(t, "alt", "peer-b", "10.40.2.1", false),
|
||||
structs.TestCheckNodeServiceWithNameInPeer(t, "alt", "peer-b", "10.40.2.2", true),
|
||||
},
|
||||
},
|
||||
},
|
||||
)
|
||||
case "chain-and-l7-stuff":
|
||||
entries = []structs.ConfigEntry{
|
||||
&structs.ProxyConfigEntry{
|
||||
|
|
|
@ -88,7 +88,7 @@ func setupTestVariationConfigEntriesAndSnapshot(
|
|||
events = append(events, UpdateEvent{
|
||||
CorrelationID: "upstream-peer:db?peer=cluster-01",
|
||||
Result: &structs.IndexedCheckServiceNodes{
|
||||
Nodes: TestUpstreamNodesPeerCluster01(t),
|
||||
Nodes: structs.CheckServiceNodes{structs.TestCheckNodeServiceWithNameInPeer(t, "db", "cluster-01", "10.40.1.1", false)},
|
||||
},
|
||||
})
|
||||
case "redirect-to-cluster-peer":
|
||||
|
@ -106,7 +106,7 @@ func setupTestVariationConfigEntriesAndSnapshot(
|
|||
events = append(events, UpdateEvent{
|
||||
CorrelationID: "upstream-peer:db?peer=cluster-01",
|
||||
Result: &structs.IndexedCheckServiceNodes{
|
||||
Nodes: TestUpstreamNodesPeerCluster01(t),
|
||||
Nodes: structs.CheckServiceNodes{structs.TestCheckNodeServiceWithNameInPeer(t, "db", "cluster-01", "10.40.1.1", false)},
|
||||
},
|
||||
})
|
||||
case "failover-through-double-remote-gateway-triggered":
|
||||
|
|
|
@ -55,24 +55,47 @@ func TestNodeServiceWithName(t testing.T, name string) *NodeService {
|
|||
|
||||
const peerTrustDomain = "1c053652-8512-4373-90cf-5a7f6263a994.consul"
|
||||
|
||||
func TestNodeServiceWithNameInPeer(t testing.T, name string, peer string) *NodeService {
|
||||
service := "payments"
|
||||
return &NodeService{
|
||||
Kind: ServiceKindTypical,
|
||||
Service: name,
|
||||
Port: 8080,
|
||||
func TestCheckNodeServiceWithNameInPeer(t testing.T, name, peer, ip string, useHostname bool) CheckServiceNode {
|
||||
service := &NodeService{
|
||||
Kind: ServiceKindTypical,
|
||||
Service: name,
|
||||
Port: 8080,
|
||||
PeerName: peer,
|
||||
Connect: ServiceConnect{
|
||||
PeerMeta: &PeeringServiceMeta{
|
||||
SNI: []string{
|
||||
service + ".default.default." + peer + ".external." + peerTrustDomain,
|
||||
name + ".default.default." + peer + ".external." + peerTrustDomain,
|
||||
},
|
||||
SpiffeID: []string{
|
||||
"spiffe://" + peerTrustDomain + "/ns/default/dc/" + peer + "-dc/svc/" + service,
|
||||
"spiffe://" + peerTrustDomain + "/ns/default/dc/" + peer + "-dc/svc/" + name,
|
||||
},
|
||||
Protocol: "tcp",
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
if useHostname {
|
||||
service.TaggedAddresses = map[string]ServiceAddress{
|
||||
TaggedAddressLAN: {
|
||||
Address: ip,
|
||||
Port: 443,
|
||||
},
|
||||
TaggedAddressWAN: {
|
||||
Address: name + ".us-east-1.elb.notaws.com",
|
||||
Port: 8443,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
return CheckServiceNode{
|
||||
Node: &Node{
|
||||
ID: "test1",
|
||||
Node: "test1",
|
||||
Address: ip,
|
||||
Datacenter: "cloud-dc",
|
||||
},
|
||||
Service: service,
|
||||
}
|
||||
}
|
||||
|
||||
// TestNodeServiceProxy returns a *NodeService representing a valid
|
||||
|
|
|
@ -418,6 +418,13 @@ func (s *ResourceGenerator) clustersFromSnapshotMeshGateway(cfgSnap *proxycfg.Co
|
|||
}
|
||||
clusters = append(clusters, c...)
|
||||
|
||||
// generate the outgoing clusters for imported peer services.
|
||||
c, err = s.makeGatewayOutgoingClusterPeeringServiceClusters(cfgSnap)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
clusters = append(clusters, c...)
|
||||
|
||||
// Generate per-target clusters for all exported discovery chains.
|
||||
c, err = s.makeExportedUpstreamClustersForMeshGateway(cfgSnap)
|
||||
if err != nil {
|
||||
|
@ -559,6 +566,62 @@ func (s *ResourceGenerator) makeGatewayServiceClusters(
|
|||
return clusters, nil
|
||||
}
|
||||
|
||||
func (s *ResourceGenerator) makeGatewayOutgoingClusterPeeringServiceClusters(cfgSnap *proxycfg.ConfigSnapshot) ([]proto.Message, error) {
|
||||
if cfgSnap.Kind != structs.ServiceKindMeshGateway {
|
||||
return nil, fmt.Errorf("unsupported gateway kind %q", cfgSnap.Kind)
|
||||
}
|
||||
|
||||
var clusters []proto.Message
|
||||
|
||||
for _, serviceGroups := range cfgSnap.MeshGateway.PeeringServices {
|
||||
for sn, serviceGroup := range serviceGroups {
|
||||
if len(serviceGroup.Nodes) == 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
node := serviceGroup.Nodes[0]
|
||||
if node.Service == nil {
|
||||
return nil, fmt.Errorf("couldn't get SNI for peered service %s", sn.String())
|
||||
}
|
||||
// This uses the SNI in the accepting cluster peer so the remote mesh
|
||||
// gateway can distinguish between an exported service as opposed to the
|
||||
// usual mesh gateway route for a service.
|
||||
clusterName := node.Service.Connect.PeerMeta.PrimarySNI()
|
||||
|
||||
opts := clusterOpts{
|
||||
name: clusterName,
|
||||
isRemote: true,
|
||||
}
|
||||
cluster := s.makeGatewayCluster(cfgSnap, opts)
|
||||
|
||||
if serviceGroup.UseCDS {
|
||||
configureClusterWithHostnames(
|
||||
s.Logger,
|
||||
cluster,
|
||||
"", /*TODO:make configurable?*/
|
||||
serviceGroup.Nodes,
|
||||
true, /*isRemote*/
|
||||
false, /*onlyPassing*/
|
||||
)
|
||||
} else {
|
||||
cluster.ClusterDiscoveryType = &envoy_cluster_v3.Cluster_Type{Type: envoy_cluster_v3.Cluster_EDS}
|
||||
cluster.EdsClusterConfig = &envoy_cluster_v3.Cluster_EdsClusterConfig{
|
||||
EdsConfig: &envoy_core_v3.ConfigSource{
|
||||
ResourceApiVersion: envoy_core_v3.ApiVersion_V3,
|
||||
ConfigSourceSpecifier: &envoy_core_v3.ConfigSource_Ads{
|
||||
Ads: &envoy_core_v3.AggregatedConfigSource{},
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
clusters = append(clusters, cluster)
|
||||
}
|
||||
}
|
||||
|
||||
return clusters, nil
|
||||
}
|
||||
|
||||
func (s *ResourceGenerator) makeDestinationClusters(cfgSnap *proxycfg.ConfigSnapshot) ([]proto.Message, error) {
|
||||
serviceConfigs := cfgSnap.TerminatingGateway.ServiceConfigs
|
||||
|
||||
|
|
|
@ -342,6 +342,13 @@ func (s *ResourceGenerator) endpointsFromSnapshotMeshGateway(cfgSnap *proxycfg.C
|
|||
}
|
||||
resources = append(resources, e...)
|
||||
|
||||
// generate the outgoing endpoints for imported peer services.
|
||||
e, err = s.makeEndpointsForOutgoingPeeredServices(cfgSnap)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
resources = append(resources, e...)
|
||||
|
||||
return resources, nil
|
||||
}
|
||||
|
||||
|
@ -398,6 +405,41 @@ func (s *ResourceGenerator) endpointsFromServicesAndResolvers(
|
|||
return resources, nil
|
||||
}
|
||||
|
||||
func (s *ResourceGenerator) makeEndpointsForOutgoingPeeredServices(
|
||||
cfgSnap *proxycfg.ConfigSnapshot,
|
||||
) ([]proto.Message, error) {
|
||||
var resources []proto.Message
|
||||
|
||||
// generate the endpoints for the linked service groups
|
||||
for _, serviceGroups := range cfgSnap.MeshGateway.PeeringServices {
|
||||
for sn, serviceGroup := range serviceGroups {
|
||||
if serviceGroup.UseCDS || len(serviceGroup.Nodes) == 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
node := serviceGroup.Nodes[0]
|
||||
if node.Service == nil {
|
||||
return nil, fmt.Errorf("couldn't get SNI for peered service %s", sn.String())
|
||||
}
|
||||
// This uses the SNI in the accepting cluster peer so the remote mesh
|
||||
// gateway can distinguish between an exported service as opposed to the
|
||||
// usual mesh gateway route for a service.
|
||||
clusterName := node.Service.Connect.PeerMeta.PrimarySNI()
|
||||
|
||||
groups := []loadAssignmentEndpointGroup{{Endpoints: serviceGroup.Nodes, OnlyPassing: false}}
|
||||
|
||||
la := makeLoadAssignment(
|
||||
clusterName,
|
||||
groups,
|
||||
cfgSnap.Locality,
|
||||
)
|
||||
resources = append(resources, la)
|
||||
}
|
||||
}
|
||||
|
||||
return resources, nil
|
||||
}
|
||||
|
||||
func (s *ResourceGenerator) endpointsFromSnapshotIngressGateway(cfgSnap *proxycfg.ConfigSnapshot) ([]proto.Message, error) {
|
||||
var resources []proto.Message
|
||||
createdClusters := make(map[proxycfg.UpstreamID]bool)
|
||||
|
|
|
@ -225,6 +225,12 @@ func getMeshGatewayPeeringGoldenTestCases() []goldenTestCase {
|
|||
return proxycfg.TestConfigSnapshotPeeredMeshGateway(t, "control-plane", nil, nil)
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "mesh-gateway-with-imported-peered-services",
|
||||
create: func(t testinf.T) *proxycfg.ConfigSnapshot {
|
||||
return proxycfg.TestConfigSnapshotPeeredMeshGateway(t, "imported-services", nil, nil)
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -66,12 +66,12 @@
|
|||
},
|
||||
"matchSubjectAltNames": [
|
||||
{
|
||||
"exact": "spiffe://1c053652-8512-4373-90cf-5a7f6263a994.consul/ns/default/dc/cluster-01-dc/svc/payments"
|
||||
"exact": "spiffe://1c053652-8512-4373-90cf-5a7f6263a994.consul/ns/default/dc/cluster-01-dc/svc/db"
|
||||
}
|
||||
]
|
||||
}
|
||||
},
|
||||
"sni": "payments.default.default.cluster-01.external.1c053652-8512-4373-90cf-5a7f6263a994.consul"
|
||||
"sni": "db.default.default.cluster-01.external.1c053652-8512-4373-90cf-5a7f6263a994.consul"
|
||||
}
|
||||
}
|
||||
},
|
||||
|
|
|
@ -49,12 +49,12 @@
|
|||
},
|
||||
"matchSubjectAltNames": [
|
||||
{
|
||||
"exact": "spiffe://1c053652-8512-4373-90cf-5a7f6263a994.consul/ns/default/dc/cluster-01-dc/svc/payments"
|
||||
"exact": "spiffe://1c053652-8512-4373-90cf-5a7f6263a994.consul/ns/default/dc/cluster-01-dc/svc/db"
|
||||
}
|
||||
]
|
||||
}
|
||||
},
|
||||
"sni": "payments.default.default.cluster-01.external.1c053652-8512-4373-90cf-5a7f6263a994.consul"
|
||||
"sni": "db.default.default.cluster-01.external.1c053652-8512-4373-90cf-5a7f6263a994.consul"
|
||||
}
|
||||
}
|
||||
},
|
||||
|
|
|
@ -66,12 +66,12 @@
|
|||
},
|
||||
"matchSubjectAltNames": [
|
||||
{
|
||||
"exact": "spiffe://1c053652-8512-4373-90cf-5a7f6263a994.consul/ns/default/dc/cluster-01-dc/svc/payments"
|
||||
"exact": "spiffe://1c053652-8512-4373-90cf-5a7f6263a994.consul/ns/default/dc/cluster-01-dc/svc/db"
|
||||
}
|
||||
]
|
||||
}
|
||||
},
|
||||
"sni": "payments.default.default.cluster-01.external.1c053652-8512-4373-90cf-5a7f6263a994.consul"
|
||||
"sni": "db.default.default.cluster-01.external.1c053652-8512-4373-90cf-5a7f6263a994.consul"
|
||||
}
|
||||
}
|
||||
},
|
||||
|
|
|
@ -16,7 +16,7 @@
|
|||
},
|
||||
"connectTimeout": "33s",
|
||||
"circuitBreakers": {
|
||||
"thresholds":[
|
||||
"thresholds": [
|
||||
{
|
||||
"maxConnections": 2048,
|
||||
"maxPendingRequests": 512,
|
||||
|
|
|
@ -16,7 +16,7 @@
|
|||
},
|
||||
"connectTimeout": "33s",
|
||||
"circuitBreakers": {
|
||||
"thresholds":[
|
||||
"thresholds": [
|
||||
{
|
||||
"maxConnections": 4096,
|
||||
"maxPendingRequests": 2048
|
||||
|
|
|
@ -16,7 +16,7 @@
|
|||
},
|
||||
"connectTimeout": "33s",
|
||||
"circuitBreakers": {
|
||||
"thresholds":[
|
||||
"thresholds": [
|
||||
{
|
||||
"maxConnections": 4096
|
||||
}
|
||||
|
|
64
agent/xds/testdata/clusters/mesh-gateway-with-imported-peered-services.latest.golden
vendored
Normal file
64
agent/xds/testdata/clusters/mesh-gateway-with-imported-peered-services.latest.golden
vendored
Normal file
|
@ -0,0 +1,64 @@
|
|||
{
|
||||
"versionInfo": "00000001",
|
||||
"resources": [
|
||||
{
|
||||
"@type": "type.googleapis.com/envoy.config.cluster.v3.Cluster",
|
||||
"name": "alt.default.default.peer-b.external.1c053652-8512-4373-90cf-5a7f6263a994.consul",
|
||||
"type": "LOGICAL_DNS",
|
||||
"edsClusterConfig": {
|
||||
"edsConfig": {
|
||||
"ads": {
|
||||
|
||||
},
|
||||
"resourceApiVersion": "V3"
|
||||
}
|
||||
},
|
||||
"connectTimeout": "5s",
|
||||
"loadAssignment": {
|
||||
"clusterName": "alt.default.default.peer-b.external.1c053652-8512-4373-90cf-5a7f6263a994.consul",
|
||||
"endpoints": [
|
||||
{
|
||||
"lbEndpoints": [
|
||||
{
|
||||
"endpoint": {
|
||||
"address": {
|
||||
"socketAddress": {
|
||||
"address": "alt.us-east-1.elb.notaws.com",
|
||||
"portValue": 8443
|
||||
}
|
||||
}
|
||||
},
|
||||
"healthStatus": "HEALTHY",
|
||||
"loadBalancingWeight": 1
|
||||
}
|
||||
]
|
||||
}
|
||||
]
|
||||
},
|
||||
"dnsRefreshRate": "10s",
|
||||
"dnsLookupFamily": "V4_ONLY",
|
||||
"outlierDetection": {
|
||||
|
||||
}
|
||||
},
|
||||
{
|
||||
"@type": "type.googleapis.com/envoy.config.cluster.v3.Cluster",
|
||||
"name": "db.default.default.peer-a.external.1c053652-8512-4373-90cf-5a7f6263a994.consul",
|
||||
"type": "EDS",
|
||||
"edsClusterConfig": {
|
||||
"edsConfig": {
|
||||
"ads": {
|
||||
|
||||
},
|
||||
"resourceApiVersion": "V3"
|
||||
}
|
||||
},
|
||||
"connectTimeout": "5s",
|
||||
"outlierDetection": {
|
||||
|
||||
}
|
||||
}
|
||||
],
|
||||
"typeUrl": "type.googleapis.com/envoy.config.cluster.v3.Cluster",
|
||||
"nonce": "00000001"
|
||||
}
|
|
@ -18,18 +18,6 @@
|
|||
},
|
||||
"healthStatus": "HEALTHY",
|
||||
"loadBalancingWeight": 1
|
||||
},
|
||||
{
|
||||
"endpoint": {
|
||||
"address": {
|
||||
"socketAddress": {
|
||||
"address": "10.40.1.2",
|
||||
"portValue": 8080
|
||||
}
|
||||
}
|
||||
},
|
||||
"healthStatus": "HEALTHY",
|
||||
"loadBalancingWeight": 1
|
||||
}
|
||||
]
|
||||
}
|
||||
|
|
|
@ -18,18 +18,6 @@
|
|||
},
|
||||
"healthStatus": "HEALTHY",
|
||||
"loadBalancingWeight": 1
|
||||
},
|
||||
{
|
||||
"endpoint": {
|
||||
"address": {
|
||||
"socketAddress": {
|
||||
"address": "10.40.1.2",
|
||||
"portValue": 8080
|
||||
}
|
||||
}
|
||||
},
|
||||
"healthStatus": "HEALTHY",
|
||||
"loadBalancingWeight": 1
|
||||
}
|
||||
]
|
||||
}
|
||||
|
|
|
@ -18,18 +18,6 @@
|
|||
},
|
||||
"healthStatus": "HEALTHY",
|
||||
"loadBalancingWeight": 1
|
||||
},
|
||||
{
|
||||
"endpoint": {
|
||||
"address": {
|
||||
"socketAddress": {
|
||||
"address": "10.40.1.2",
|
||||
"portValue": 8080
|
||||
}
|
||||
}
|
||||
},
|
||||
"healthStatus": "HEALTHY",
|
||||
"loadBalancingWeight": 1
|
||||
}
|
||||
]
|
||||
}
|
||||
|
|
41
agent/xds/testdata/endpoints/mesh-gateway-with-imported-peered-services.latest.golden
vendored
Normal file
41
agent/xds/testdata/endpoints/mesh-gateway-with-imported-peered-services.latest.golden
vendored
Normal file
|
@ -0,0 +1,41 @@
|
|||
{
|
||||
"versionInfo": "00000001",
|
||||
"resources": [
|
||||
{
|
||||
"@type": "type.googleapis.com/envoy.config.endpoint.v3.ClusterLoadAssignment",
|
||||
"clusterName": "db.default.default.peer-a.external.1c053652-8512-4373-90cf-5a7f6263a994.consul",
|
||||
"endpoints": [
|
||||
{
|
||||
"lbEndpoints": [
|
||||
{
|
||||
"endpoint": {
|
||||
"address": {
|
||||
"socketAddress": {
|
||||
"address": "10.40.1.1",
|
||||
"portValue": 8080
|
||||
}
|
||||
}
|
||||
},
|
||||
"healthStatus": "HEALTHY",
|
||||
"loadBalancingWeight": 1
|
||||
},
|
||||
{
|
||||
"endpoint": {
|
||||
"address": {
|
||||
"socketAddress": {
|
||||
"address": "10.40.1.2",
|
||||
"portValue": 8080
|
||||
}
|
||||
}
|
||||
},
|
||||
"healthStatus": "HEALTHY",
|
||||
"loadBalancingWeight": 1
|
||||
}
|
||||
]
|
||||
}
|
||||
]
|
||||
}
|
||||
],
|
||||
"typeUrl": "type.googleapis.com/envoy.config.endpoint.v3.ClusterLoadAssignment",
|
||||
"nonce": "00000001"
|
||||
}
|
|
@ -113,7 +113,9 @@
|
|||
],
|
||||
"trafficDirection": "INBOUND",
|
||||
"connectionBalanceConfig": {
|
||||
"exactBalance": {}
|
||||
"exactBalance": {
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
],
|
||||
|
|
|
@ -26,7 +26,9 @@
|
|||
],
|
||||
"trafficDirection": "OUTBOUND",
|
||||
"connectionBalanceConfig": {
|
||||
"exactBalance": {}
|
||||
"exactBalance": {
|
||||
|
||||
}
|
||||
}
|
||||
},
|
||||
{
|
||||
|
|
45
agent/xds/testdata/listeners/mesh-gateway-with-imported-peered-services.latest.golden
vendored
Normal file
45
agent/xds/testdata/listeners/mesh-gateway-with-imported-peered-services.latest.golden
vendored
Normal file
|
@ -0,0 +1,45 @@
|
|||
{
|
||||
"versionInfo": "00000001",
|
||||
"resources": [
|
||||
{
|
||||
"@type": "type.googleapis.com/envoy.config.listener.v3.Listener",
|
||||
"name": "default:1.2.3.4:8443",
|
||||
"address": {
|
||||
"socketAddress": {
|
||||
"address": "1.2.3.4",
|
||||
"portValue": 8443
|
||||
}
|
||||
},
|
||||
"filterChains": [
|
||||
{
|
||||
"filters": [
|
||||
{
|
||||
"name": "envoy.filters.network.sni_cluster",
|
||||
"typedConfig": {
|
||||
"@type": "type.googleapis.com/envoy.extensions.filters.network.sni_cluster.v3.SniCluster"
|
||||
}
|
||||
},
|
||||
{
|
||||
"name": "envoy.filters.network.tcp_proxy",
|
||||
"typedConfig": {
|
||||
"@type": "type.googleapis.com/envoy.extensions.filters.network.tcp_proxy.v3.TcpProxy",
|
||||
"statPrefix": "mesh_gateway_local.default",
|
||||
"cluster": ""
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
],
|
||||
"listenerFilters": [
|
||||
{
|
||||
"name": "envoy.filters.listener.tls_inspector",
|
||||
"typedConfig": {
|
||||
"@type": "type.googleapis.com/envoy.extensions.filters.listener.tls_inspector.v3.TlsInspector"
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
],
|
||||
"typeUrl": "type.googleapis.com/envoy.config.listener.v3.Listener",
|
||||
"nonce": "00000001"
|
||||
}
|
5
agent/xds/testdata/routes/mesh-gateway-with-imported-peered-services.latest.golden
vendored
Normal file
5
agent/xds/testdata/routes/mesh-gateway-with-imported-peered-services.latest.golden
vendored
Normal file
|
@ -0,0 +1,5 @@
|
|||
{
|
||||
"versionInfo": "00000001",
|
||||
"typeUrl": "type.googleapis.com/envoy.config.route.v3.RouteConfiguration",
|
||||
"nonce": "00000001"
|
||||
}
|
|
@ -9,6 +9,9 @@ services {
|
|||
destination_name = "s2"
|
||||
destination_peer = "primary-to-alpha"
|
||||
local_bind_port = 5000
|
||||
mesh_gateway {
|
||||
mode = "local"
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
|
|
|
@ -55,3 +55,7 @@ load helpers
|
|||
@test "s1 upstream made 1 connection to s2" {
|
||||
assert_envoy_metric_at_least 127.0.0.1:19000 "cluster.s2.default.primary-to-alpha.external.*cx_total" 1
|
||||
}
|
||||
|
||||
@test "s1 upstream made 1 connection to s2 through the primary mesh gateway" {
|
||||
assert_envoy_metric_at_least 127.0.0.1:19001 "cluster.s2.default.default.alpha-to-primary.external.*cx_total" 1
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue