Merge pull request #14734 from hashicorp/NET-643-update-mesh-gateway-envoy-config-for-inbound-peering-control-plane-traffic
This commit is contained in:
commit
89141256c7
|
@ -8,6 +8,7 @@ import (
|
|||
"time"
|
||||
|
||||
cachetype "github.com/hashicorp/consul/agent/cache-types"
|
||||
"github.com/hashicorp/consul/agent/proxycfg/internal/watch"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
"github.com/hashicorp/consul/lib/maps"
|
||||
"github.com/hashicorp/consul/logging"
|
||||
|
@ -21,6 +22,8 @@ type handlerMeshGateway struct {
|
|||
// initialize sets up the watches needed based on the current mesh gateway registration
|
||||
func (s *handlerMeshGateway) initialize(ctx context.Context) (ConfigSnapshot, error) {
|
||||
snap := newConfigSnapshotFromServiceInstance(s.serviceInstance, s.stateConfig)
|
||||
snap.MeshGateway.WatchedConsulServers = watch.NewMap[string, structs.CheckServiceNodes]()
|
||||
|
||||
// Watch for root changes
|
||||
err := s.dataSources.CARoots.Notify(ctx, &structs.DCSpecificRequest{
|
||||
Datacenter: s.source.Datacenter,
|
||||
|
@ -76,7 +79,7 @@ func (s *handlerMeshGateway) initialize(ctx context.Context) (ConfigSnapshot, er
|
|||
}
|
||||
|
||||
if s.proxyID.InDefaultPartition() {
|
||||
if err := s.initializeCrossDCWatches(ctx); err != nil {
|
||||
if err := s.initializeCrossDCWatches(ctx, &snap); err != nil {
|
||||
return snap, err
|
||||
}
|
||||
}
|
||||
|
@ -123,7 +126,7 @@ func (s *handlerMeshGateway) initialize(ctx context.Context) (ConfigSnapshot, er
|
|||
return snap, err
|
||||
}
|
||||
|
||||
func (s *handlerMeshGateway) initializeCrossDCWatches(ctx context.Context) error {
|
||||
func (s *handlerMeshGateway) initializeCrossDCWatches(ctx context.Context, snap *ConfigSnapshot) error {
|
||||
if s.meta[structs.MetaWANFederationKey] == "1" {
|
||||
// Conveniently we can just use this service meta attribute in one
|
||||
// place here to set the machinery in motion and leave the conditional
|
||||
|
@ -145,6 +148,7 @@ func (s *handlerMeshGateway) initializeCrossDCWatches(ctx context.Context) error
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
snap.MeshGateway.WatchedConsulServers.InitWatch(structs.ConsulServiceName, nil)
|
||||
}
|
||||
|
||||
err := s.dataSources.Datacenters.Notify(ctx, &structs.DatacentersRequest{
|
||||
|
@ -325,7 +329,6 @@ func (s *handlerMeshGateway) handleUpdate(ctx context.Context, u UpdateEvent, sn
|
|||
return fmt.Errorf("invalid type for response: %T", u.Result)
|
||||
}
|
||||
|
||||
// Do some initial sanity checks to avoid doing something dumb.
|
||||
for _, csn := range resp.Nodes {
|
||||
if csn.Service.Service != structs.ConsulServiceName {
|
||||
return fmt.Errorf("expected service name %q but got %q",
|
||||
|
@ -337,7 +340,7 @@ func (s *handlerMeshGateway) handleUpdate(ctx context.Context, u UpdateEvent, sn
|
|||
}
|
||||
}
|
||||
|
||||
snap.MeshGateway.ConsulServers = resp.Nodes
|
||||
snap.MeshGateway.WatchedConsulServers.Set(structs.ConsulServiceName, resp.Nodes)
|
||||
|
||||
case exportedServiceListWatchID:
|
||||
exportedServices, ok := u.Result.(*structs.IndexedExportedServiceList)
|
||||
|
@ -463,17 +466,55 @@ func (s *handlerMeshGateway) handleUpdate(ctx context.Context, u UpdateEvent, sn
|
|||
return fmt.Errorf("invalid type for response: %T", u.Result)
|
||||
}
|
||||
|
||||
if resp.Entry != nil {
|
||||
meshConf, ok := resp.Entry.(*structs.MeshConfigEntry)
|
||||
if !ok {
|
||||
return fmt.Errorf("invalid type for config entry: %T", resp.Entry)
|
||||
}
|
||||
snap.MeshGateway.MeshConfig = meshConf
|
||||
} else {
|
||||
if resp.Entry == nil {
|
||||
snap.MeshGateway.MeshConfig = nil
|
||||
|
||||
// We avoid managing server watches when WAN federation is enabled since it
|
||||
// always requires server watches.
|
||||
if s.meta[structs.MetaWANFederationKey] != "1" {
|
||||
// If the entry was deleted we cancel watches that may have existed because of
|
||||
// PeerThroughMeshGateways being set in the past.
|
||||
snap.MeshGateway.WatchedConsulServers.CancelWatch(structs.ConsulServiceName)
|
||||
}
|
||||
|
||||
snap.MeshGateway.MeshConfigSet = true
|
||||
return nil
|
||||
}
|
||||
|
||||
meshConf, ok := resp.Entry.(*structs.MeshConfigEntry)
|
||||
if !ok {
|
||||
return fmt.Errorf("invalid type for config entry: %T", resp.Entry)
|
||||
}
|
||||
snap.MeshGateway.MeshConfig = meshConf
|
||||
snap.MeshGateway.MeshConfigSet = true
|
||||
|
||||
// We avoid managing Consul server watches when WAN federation is enabled since it
|
||||
// always requires server watches.
|
||||
if s.meta[structs.MetaWANFederationKey] == "1" {
|
||||
return nil
|
||||
}
|
||||
|
||||
if !meshConf.PeerThroughMeshGateways() {
|
||||
snap.MeshGateway.WatchedConsulServers.CancelWatch(structs.ConsulServiceName)
|
||||
return nil
|
||||
}
|
||||
if snap.MeshGateway.WatchedConsulServers.IsWatched(structs.ConsulServiceName) {
|
||||
return nil
|
||||
}
|
||||
|
||||
notifyCtx, cancel := context.WithCancel(ctx)
|
||||
err := s.dataSources.Health.Notify(notifyCtx, &structs.ServiceSpecificRequest{
|
||||
Datacenter: s.source.Datacenter,
|
||||
QueryOptions: structs.QueryOptions{Token: s.token},
|
||||
ServiceName: structs.ConsulServiceName,
|
||||
}, consulServerListWatchID, s.ch)
|
||||
if err != nil {
|
||||
cancel()
|
||||
return fmt.Errorf("failed to watch local consul servers: %w", err)
|
||||
}
|
||||
|
||||
snap.MeshGateway.WatchedConsulServers.InitWatch(structs.ConsulServiceName, cancel)
|
||||
|
||||
default:
|
||||
switch {
|
||||
case strings.HasPrefix(u.CorrelationID, "connect-service:"):
|
||||
|
|
|
@ -385,8 +385,11 @@ type configSnapshotMeshGateway struct {
|
|||
// datacenter.
|
||||
FedStateGateways map[string]structs.CheckServiceNodes
|
||||
|
||||
// ConsulServers is the list of consul servers in this datacenter.
|
||||
ConsulServers structs.CheckServiceNodes
|
||||
// WatchedConsulServers is a map of (structs.ConsulServiceName -> structs.CheckServiceNodes)`
|
||||
// Mesh gateways can spin up watches for local servers both for
|
||||
// WAN federation and for peering. This map ensures we only have one
|
||||
// watch at a time.
|
||||
WatchedConsulServers watch.Map[string, structs.CheckServiceNodes]
|
||||
|
||||
// HostnameDatacenters is a map of datacenters to mesh gateway instances with a hostname as the address.
|
||||
// If hostnames are configured they must be provided to Envoy via CDS not EDS.
|
||||
|
@ -566,8 +569,8 @@ func (c *configSnapshotMeshGateway) isEmpty() bool {
|
|||
len(c.ServiceResolvers) == 0 &&
|
||||
len(c.GatewayGroups) == 0 &&
|
||||
len(c.FedStateGateways) == 0 &&
|
||||
len(c.ConsulServers) == 0 &&
|
||||
len(c.HostnameDatacenters) == 0 &&
|
||||
c.WatchedConsulServers.Len() == 0 &&
|
||||
c.isEmptyPeering()
|
||||
}
|
||||
|
||||
|
@ -703,8 +706,11 @@ func (s *ConfigSnapshot) Valid() bool {
|
|||
s.TerminatingGateway.MeshConfigSet
|
||||
|
||||
case structs.ServiceKindMeshGateway:
|
||||
if s.ServiceMeta[structs.MetaWANFederationKey] == "1" {
|
||||
if len(s.MeshGateway.ConsulServers) == 0 {
|
||||
if s.MeshGateway.WatchedConsulServers.Len() == 0 {
|
||||
if s.ServiceMeta[structs.MetaWANFederationKey] == "1" {
|
||||
return false
|
||||
}
|
||||
if cfg := s.MeshConfig(); cfg.PeerThroughMeshGateways() {
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
|
|
@ -779,6 +779,9 @@ func TestState_WatchesAndUpdates(t *testing.T) {
|
|||
Service: "mesh-gateway",
|
||||
Address: "10.0.1.1",
|
||||
Port: 443,
|
||||
Meta: map[string]string{
|
||||
structs.MetaWANFederationKey: "1",
|
||||
},
|
||||
},
|
||||
sourceDC: "dc1",
|
||||
stages: []verificationStage{
|
||||
|
@ -790,6 +793,7 @@ func TestState_WatchesAndUpdates(t *testing.T) {
|
|||
exportedServiceListWatchID: genVerifyDCSpecificWatch("dc1"),
|
||||
meshConfigEntryID: genVerifyMeshConfigWatch("dc1"),
|
||||
peeringTrustBundlesWatchID: genVerifyTrustBundleListWatchForMeshGateway(""),
|
||||
consulServerListWatchID: genVerifyServiceSpecificPeeredRequest(structs.ConsulServiceName, "", "dc1", "", false),
|
||||
},
|
||||
verifySnapshot: func(t testing.TB, snap *ConfigSnapshot) {
|
||||
require.False(t, snap.Valid(), "gateway without root is not valid")
|
||||
|
@ -1015,6 +1019,186 @@ func TestState_WatchesAndUpdates(t *testing.T) {
|
|||
},
|
||||
},
|
||||
},
|
||||
"mesh-gateway-peering-control-plane": {
|
||||
ns: structs.NodeService{
|
||||
Kind: structs.ServiceKindMeshGateway,
|
||||
ID: "mesh-gateway",
|
||||
Service: "mesh-gateway",
|
||||
Address: "10.0.1.1",
|
||||
Port: 443,
|
||||
},
|
||||
sourceDC: "dc1",
|
||||
stages: []verificationStage{
|
||||
{
|
||||
requiredWatches: map[string]verifyWatchRequest{
|
||||
datacentersWatchID: verifyDatacentersWatch,
|
||||
serviceListWatchID: genVerifyDCSpecificWatch("dc1"),
|
||||
rootsWatchID: genVerifyDCSpecificWatch("dc1"),
|
||||
exportedServiceListWatchID: genVerifyDCSpecificWatch("dc1"),
|
||||
meshConfigEntryID: genVerifyMeshConfigWatch("dc1"),
|
||||
peeringTrustBundlesWatchID: genVerifyTrustBundleListWatchForMeshGateway(""),
|
||||
},
|
||||
verifySnapshot: func(t testing.TB, snap *ConfigSnapshot) {
|
||||
require.False(t, snap.Valid(), "gateway without root is not valid")
|
||||
},
|
||||
},
|
||||
{
|
||||
events: []UpdateEvent{
|
||||
rootWatchEvent(),
|
||||
{
|
||||
CorrelationID: meshConfigEntryID,
|
||||
Result: &structs.ConfigEntryResponse{
|
||||
Entry: &structs.MeshConfigEntry{
|
||||
Peering: &structs.PeeringMeshConfig{
|
||||
PeerThroughMeshGateways: true,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
CorrelationID: exportedServiceListWatchID,
|
||||
Result: &structs.IndexedExportedServiceList{
|
||||
Services: nil,
|
||||
},
|
||||
},
|
||||
{
|
||||
CorrelationID: serviceListWatchID,
|
||||
Result: &structs.IndexedServiceList{
|
||||
Services: structs.ServiceList{},
|
||||
},
|
||||
},
|
||||
{
|
||||
CorrelationID: peeringTrustBundlesWatchID,
|
||||
Result: &pbpeering.TrustBundleListByServiceResponse{
|
||||
Bundles: nil,
|
||||
},
|
||||
},
|
||||
},
|
||||
verifySnapshot: func(t testing.TB, snap *ConfigSnapshot) {
|
||||
require.Equal(t, indexedRoots, snap.Roots)
|
||||
require.True(t, snap.MeshGateway.WatchedServicesSet)
|
||||
require.True(t, snap.MeshGateway.PeeringTrustBundlesSet)
|
||||
require.True(t, snap.MeshGateway.MeshConfigSet)
|
||||
|
||||
require.True(t, snap.Valid(), "gateway without services is valid")
|
||||
require.True(t, snap.ConnectProxy.isEmpty())
|
||||
},
|
||||
},
|
||||
{
|
||||
requiredWatches: map[string]verifyWatchRequest{
|
||||
consulServerListWatchID: genVerifyServiceSpecificPeeredRequest(structs.ConsulServiceName, "", "dc1", "", false),
|
||||
},
|
||||
events: []UpdateEvent{
|
||||
{
|
||||
CorrelationID: consulServerListWatchID,
|
||||
Result: &structs.IndexedCheckServiceNodes{
|
||||
Nodes: structs.CheckServiceNodes{
|
||||
{
|
||||
Node: &structs.Node{
|
||||
Datacenter: "dc1",
|
||||
Node: "node1",
|
||||
Address: "127.0.0.1",
|
||||
},
|
||||
Service: &structs.NodeService{
|
||||
ID: structs.ConsulServiceID,
|
||||
Service: structs.ConsulServiceName,
|
||||
},
|
||||
},
|
||||
{
|
||||
Node: &structs.Node{
|
||||
Datacenter: "dc1",
|
||||
Node: "replica1",
|
||||
Address: "127.0.0.1",
|
||||
},
|
||||
Service: &structs.NodeService{
|
||||
ID: structs.ConsulServiceID,
|
||||
Service: structs.ConsulServiceName,
|
||||
Meta: map[string]string{"read_replica": "true"},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
Err: nil,
|
||||
},
|
||||
},
|
||||
verifySnapshot: func(t testing.TB, snap *ConfigSnapshot) {
|
||||
require.True(t, snap.Valid())
|
||||
|
||||
servers, ok := snap.MeshGateway.WatchedConsulServers.Get(structs.ConsulServiceName)
|
||||
require.True(t, ok)
|
||||
|
||||
expect := structs.CheckServiceNodes{
|
||||
{
|
||||
Node: &structs.Node{
|
||||
Datacenter: "dc1",
|
||||
Node: "node1",
|
||||
Address: "127.0.0.1",
|
||||
},
|
||||
Service: &structs.NodeService{
|
||||
ID: structs.ConsulServiceID,
|
||||
Service: structs.ConsulServiceName,
|
||||
},
|
||||
},
|
||||
{
|
||||
Node: &structs.Node{
|
||||
Datacenter: "dc1",
|
||||
Node: "replica1",
|
||||
Address: "127.0.0.1",
|
||||
},
|
||||
Service: &structs.NodeService{
|
||||
ID: structs.ConsulServiceID,
|
||||
Service: structs.ConsulServiceName,
|
||||
Meta: map[string]string{"read_replica": "true"},
|
||||
},
|
||||
},
|
||||
}
|
||||
require.Equal(t, expect, servers)
|
||||
},
|
||||
},
|
||||
{
|
||||
events: []UpdateEvent{
|
||||
{
|
||||
CorrelationID: meshConfigEntryID,
|
||||
Result: &structs.ConfigEntryResponse{
|
||||
Entry: &structs.MeshConfigEntry{
|
||||
Peering: &structs.PeeringMeshConfig{
|
||||
PeerThroughMeshGateways: false,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
verifySnapshot: func(t testing.TB, snap *ConfigSnapshot) {
|
||||
require.True(t, snap.Valid())
|
||||
require.NotNil(t, snap.MeshConfig())
|
||||
|
||||
require.False(t, snap.MeshGateway.WatchedConsulServers.IsWatched(structs.ConsulServiceName))
|
||||
servers, ok := snap.MeshGateway.WatchedConsulServers.Get(structs.ConsulServiceName)
|
||||
require.False(t, ok)
|
||||
require.Empty(t, servers)
|
||||
},
|
||||
},
|
||||
{
|
||||
events: []UpdateEvent{
|
||||
{
|
||||
CorrelationID: meshConfigEntryID,
|
||||
Result: &structs.ConfigEntryResponse{
|
||||
Entry: nil,
|
||||
},
|
||||
},
|
||||
},
|
||||
verifySnapshot: func(t testing.TB, snap *ConfigSnapshot) {
|
||||
require.True(t, snap.Valid())
|
||||
require.Nil(t, snap.MeshConfig())
|
||||
|
||||
require.False(t, snap.MeshGateway.WatchedConsulServers.IsWatched(structs.ConsulServiceName))
|
||||
servers, ok := snap.MeshGateway.WatchedConsulServers.Get(structs.ConsulServiceName)
|
||||
require.False(t, ok)
|
||||
require.Empty(t, servers)
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
"ingress-gateway": {
|
||||
ns: structs.NodeService{
|
||||
Kind: structs.ServiceKindIngressGateway,
|
||||
|
|
|
@ -476,6 +476,106 @@ func TestConfigSnapshotPeeredMeshGateway(t testing.T, variant string, nsFn func(
|
|||
)
|
||||
|
||||
switch variant {
|
||||
case "control-plane":
|
||||
extraUpdates = append(extraUpdates,
|
||||
UpdateEvent{
|
||||
CorrelationID: meshConfigEntryID,
|
||||
Result: &structs.ConfigEntryResponse{
|
||||
Entry: &structs.MeshConfigEntry{
|
||||
Peering: &structs.PeeringMeshConfig{
|
||||
PeerThroughMeshGateways: true,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
UpdateEvent{
|
||||
CorrelationID: consulServerListWatchID,
|
||||
Result: &structs.IndexedCheckServiceNodes{
|
||||
Nodes: structs.CheckServiceNodes{
|
||||
{
|
||||
Node: &structs.Node{
|
||||
Datacenter: "dc1",
|
||||
Node: "replica",
|
||||
Address: "127.0.0.10",
|
||||
},
|
||||
Service: &structs.NodeService{
|
||||
ID: structs.ConsulServiceID,
|
||||
Service: structs.ConsulServiceName,
|
||||
// Read replicas cannot handle peering requests.
|
||||
Meta: map[string]string{"read_replica": "true"},
|
||||
},
|
||||
},
|
||||
{
|
||||
Node: &structs.Node{
|
||||
Datacenter: "dc1",
|
||||
Node: "node1",
|
||||
Address: "127.0.0.1",
|
||||
},
|
||||
Service: &structs.NodeService{
|
||||
ID: structs.ConsulServiceID,
|
||||
Service: structs.ConsulServiceName,
|
||||
Meta: map[string]string{
|
||||
"grpc_port": "8502",
|
||||
"grpc_tls_port": "8503",
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
Node: &structs.Node{
|
||||
Datacenter: "dc1",
|
||||
Node: "node2",
|
||||
Address: "127.0.0.2",
|
||||
},
|
||||
Service: &structs.NodeService{
|
||||
ID: structs.ConsulServiceID,
|
||||
Service: structs.ConsulServiceName,
|
||||
Meta: map[string]string{
|
||||
"grpc_port": "8502",
|
||||
"grpc_tls_port": "8503",
|
||||
},
|
||||
TaggedAddresses: map[string]structs.ServiceAddress{
|
||||
// WAN address is not considered for traffic from local gateway to local servers.
|
||||
structs.TaggedAddressWAN: {
|
||||
Address: "consul.server.dc1.my-domain",
|
||||
Port: 10101,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
Node: &structs.Node{
|
||||
Datacenter: "dc1",
|
||||
Node: "node3",
|
||||
Address: "127.0.0.3",
|
||||
},
|
||||
Service: &structs.NodeService{
|
||||
ID: structs.ConsulServiceID,
|
||||
Service: structs.ConsulServiceName,
|
||||
Meta: map[string]string{
|
||||
// Peering is not allowed over deprecated non-TLS gRPC port.
|
||||
"grpc_port": "8502",
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
Node: &structs.Node{
|
||||
Datacenter: "dc1",
|
||||
Node: "node4",
|
||||
Address: "127.0.0.4",
|
||||
},
|
||||
Service: &structs.NodeService{
|
||||
ID: structs.ConsulServiceID,
|
||||
Service: structs.ConsulServiceName,
|
||||
Meta: map[string]string{
|
||||
// Must have valid gRPC port.
|
||||
"grpc_tls_port": "bad",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
)
|
||||
case "default-services-http":
|
||||
proxyDefaults := &structs.ProxyConfigEntry{
|
||||
Config: map[string]interface{}{
|
||||
|
|
|
@ -155,6 +155,13 @@ func (e *MeshConfigEntry) MarshalJSON() ([]byte, error) {
|
|||
return json.Marshal(source)
|
||||
}
|
||||
|
||||
func (e *MeshConfigEntry) PeerThroughMeshGateways() bool {
|
||||
if e == nil || e.Peering == nil {
|
||||
return false
|
||||
}
|
||||
return e.Peering.PeerThroughMeshGateways
|
||||
}
|
||||
|
||||
func validateMeshDirectionalTLSConfig(cfg *MeshDirectionalTLSConfig) error {
|
||||
if cfg == nil {
|
||||
return nil
|
||||
|
|
|
@ -0,0 +1,46 @@
|
|||
package structs
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
func TestMeshConfigEntry_PeerThroughMeshGateways(t *testing.T) {
|
||||
tests := map[string]struct {
|
||||
input *MeshConfigEntry
|
||||
want bool
|
||||
}{
|
||||
"nil entry": {
|
||||
input: nil,
|
||||
want: false,
|
||||
},
|
||||
"nil peering config": {
|
||||
input: &MeshConfigEntry{
|
||||
Peering: nil,
|
||||
},
|
||||
want: false,
|
||||
},
|
||||
"not peering through gateways": {
|
||||
input: &MeshConfigEntry{
|
||||
Peering: &PeeringMeshConfig{
|
||||
PeerThroughMeshGateways: false,
|
||||
},
|
||||
},
|
||||
want: false,
|
||||
},
|
||||
"peering through gateways": {
|
||||
input: &MeshConfigEntry{
|
||||
Peering: &PeeringMeshConfig{
|
||||
PeerThroughMeshGateways: true,
|
||||
},
|
||||
},
|
||||
want: true,
|
||||
},
|
||||
}
|
||||
for name, tc := range tests {
|
||||
t.Run(name, func(t *testing.T) {
|
||||
assert.Equalf(t, tc.want, tc.input.PeerThroughMeshGateways(), "PeerThroughMeshGateways()")
|
||||
})
|
||||
}
|
||||
}
|
|
@ -386,7 +386,8 @@ func (s *ResourceGenerator) clustersFromSnapshotMeshGateway(cfgSnap *proxycfg.Co
|
|||
}
|
||||
|
||||
// And for the current datacenter, send all flavors appropriately.
|
||||
for _, srv := range cfgSnap.MeshGateway.ConsulServers {
|
||||
servers, _ := cfgSnap.MeshGateway.WatchedConsulServers.Get(structs.ConsulServiceName)
|
||||
for _, srv := range servers {
|
||||
opts := clusterOpts{
|
||||
name: cfgSnap.ServerSNIFn(cfgSnap.Datacenter, srv.Node.Node),
|
||||
}
|
||||
|
@ -395,6 +396,21 @@ func (s *ResourceGenerator) clustersFromSnapshotMeshGateway(cfgSnap *proxycfg.Co
|
|||
}
|
||||
}
|
||||
|
||||
// Create a single cluster for local servers to be dialed by peers.
|
||||
// When peering through gateways we load balance across the local servers. They cannot be addressed individually.
|
||||
if cfg := cfgSnap.MeshConfig(); cfg.PeerThroughMeshGateways() {
|
||||
servers, _ := cfgSnap.MeshGateway.WatchedConsulServers.Get(structs.ConsulServiceName)
|
||||
|
||||
// Peering control-plane traffic can only ever be handled by the local leader.
|
||||
// We avoid routing to read replicas since they will never be Raft voters.
|
||||
if haveVoters(servers) {
|
||||
cluster := s.makeGatewayCluster(cfgSnap, clusterOpts{
|
||||
name: connect.PeeringServerSAN(cfgSnap.Datacenter, cfgSnap.Roots.TrustDomain),
|
||||
})
|
||||
clusters = append(clusters, cluster)
|
||||
}
|
||||
}
|
||||
|
||||
// generate the per-service/subset clusters
|
||||
c, err := s.makeGatewayServiceClusters(cfgSnap, cfgSnap.MeshGateway.ServiceGroups, cfgSnap.MeshGateway.ServiceResolvers)
|
||||
if err != nil {
|
||||
|
@ -412,6 +428,16 @@ func (s *ResourceGenerator) clustersFromSnapshotMeshGateway(cfgSnap *proxycfg.Co
|
|||
return clusters, nil
|
||||
}
|
||||
|
||||
func haveVoters(servers structs.CheckServiceNodes) bool {
|
||||
for _, srv := range servers {
|
||||
if isReplica := srv.Service.Meta["read_replica"]; isReplica == "true" {
|
||||
continue
|
||||
}
|
||||
return true
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// clustersFromSnapshotTerminatingGateway returns the xDS API representation of the "clusters"
|
||||
// for a terminating gateway. This will include 1 cluster per Destination associated with this terminating gateway.
|
||||
func (s *ResourceGenerator) clustersFromSnapshotTerminatingGateway(cfgSnap *proxycfg.ConfigSnapshot) ([]proto.Message, error) {
|
||||
|
|
|
@ -3,11 +3,11 @@ package xds
|
|||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"strconv"
|
||||
|
||||
envoy_cluster_v3 "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"
|
||||
envoy_core_v3 "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
|
||||
envoy_endpoint_v3 "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3"
|
||||
|
||||
"github.com/golang/protobuf/proto"
|
||||
"github.com/hashicorp/go-bexpr"
|
||||
|
||||
|
@ -248,7 +248,8 @@ func (s *ResourceGenerator) endpointsFromSnapshotMeshGateway(cfgSnap *proxycfg.C
|
|||
cfgSnap.ServerSNIFn != nil {
|
||||
var allServersLbEndpoints []*envoy_endpoint_v3.LbEndpoint
|
||||
|
||||
for _, srv := range cfgSnap.MeshGateway.ConsulServers {
|
||||
servers, _ := cfgSnap.MeshGateway.WatchedConsulServers.Get(structs.ConsulServiceName)
|
||||
for _, srv := range servers {
|
||||
clusterName := cfgSnap.ServerSNIFn(cfgSnap.Datacenter, srv.Node.Node)
|
||||
|
||||
_, addr, port := srv.BestAddress(false /*wan*/)
|
||||
|
@ -283,6 +284,50 @@ func (s *ResourceGenerator) endpointsFromSnapshotMeshGateway(cfgSnap *proxycfg.C
|
|||
})
|
||||
}
|
||||
|
||||
// Create endpoints for the cluster where local servers will be dialed by peers.
|
||||
// When peering through gateways we load balance across the local servers. They cannot be addressed individually.
|
||||
if cfg := cfgSnap.MeshConfig(); cfg.PeerThroughMeshGateways() {
|
||||
var serverEndpoints []*envoy_endpoint_v3.LbEndpoint
|
||||
|
||||
servers, _ := cfgSnap.MeshGateway.WatchedConsulServers.Get(structs.ConsulServiceName)
|
||||
for _, srv := range servers {
|
||||
if isReplica := srv.Service.Meta["read_replica"]; isReplica == "true" {
|
||||
// Peering control-plane traffic can only ever be handled by the local leader.
|
||||
// We avoid routing to read replicas since they will never be Raft voters.
|
||||
continue
|
||||
}
|
||||
|
||||
_, addr, _ := srv.BestAddress(false)
|
||||
portStr, ok := srv.Service.Meta["grpc_tls_port"]
|
||||
if !ok {
|
||||
s.Logger.Warn("peering is enabled but local server %q does not have the required gRPC TLS port configured",
|
||||
"server", srv.Node.Node)
|
||||
continue
|
||||
}
|
||||
port, err := strconv.Atoi(portStr)
|
||||
if err != nil {
|
||||
s.Logger.Error("peering is enabled but local server has invalid gRPC TLS port",
|
||||
"server", srv.Node.Node, "port", portStr, "error", err)
|
||||
continue
|
||||
}
|
||||
|
||||
serverEndpoints = append(serverEndpoints, &envoy_endpoint_v3.LbEndpoint{
|
||||
HostIdentifier: &envoy_endpoint_v3.LbEndpoint_Endpoint{
|
||||
Endpoint: &envoy_endpoint_v3.Endpoint{
|
||||
Address: makeAddress(addr, port),
|
||||
},
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
resources = append(resources, &envoy_endpoint_v3.ClusterLoadAssignment{
|
||||
ClusterName: connect.PeeringServerSAN(cfgSnap.Datacenter, cfgSnap.Roots.TrustDomain),
|
||||
Endpoints: []*envoy_endpoint_v3.LocalityLbEndpoints{{
|
||||
LbEndpoints: serverEndpoints,
|
||||
}},
|
||||
})
|
||||
}
|
||||
|
||||
// Generate the endpoints for each service and its subsets
|
||||
e, err := s.endpointsFromServicesAndResolvers(cfgSnap, cfgSnap.MeshGateway.ServiceGroups, cfgSnap.MeshGateway.ServiceResolvers)
|
||||
if err != nil {
|
||||
|
|
|
@ -1757,7 +1757,8 @@ func (s *ResourceGenerator) makeMeshGatewayListener(name, addr string, port int,
|
|||
}
|
||||
|
||||
// Wildcard all flavors to each server.
|
||||
for _, srv := range cfgSnap.MeshGateway.ConsulServers {
|
||||
servers, _ := cfgSnap.MeshGateway.WatchedConsulServers.Get(structs.ConsulServiceName)
|
||||
for _, srv := range servers {
|
||||
clusterName := cfgSnap.ServerSNIFn(cfgSnap.Datacenter, srv.Node.Node)
|
||||
|
||||
filterName := fmt.Sprintf("%s.%s", name, cfgSnap.Datacenter)
|
||||
|
@ -1768,7 +1769,7 @@ func (s *ResourceGenerator) makeMeshGatewayListener(name, addr string, port int,
|
|||
|
||||
l.FilterChains = append(l.FilterChains, &envoy_listener_v3.FilterChain{
|
||||
FilterChainMatch: &envoy_listener_v3.FilterChainMatch{
|
||||
ServerNames: []string{fmt.Sprintf("%s", clusterName)},
|
||||
ServerNames: []string{clusterName},
|
||||
},
|
||||
Filters: []*envoy_listener_v3.Filter{
|
||||
dcTCPProxy,
|
||||
|
@ -1777,6 +1778,33 @@ func (s *ResourceGenerator) makeMeshGatewayListener(name, addr string, port int,
|
|||
}
|
||||
}
|
||||
|
||||
// Create a single cluster for local servers to be dialed by peers.
|
||||
// When peering through gateways we load balance across the local servers. They cannot be addressed individually.
|
||||
if cfg := cfgSnap.MeshConfig(); cfg.PeerThroughMeshGateways() {
|
||||
servers, _ := cfgSnap.MeshGateway.WatchedConsulServers.Get(structs.ConsulServiceName)
|
||||
|
||||
// Peering control-plane traffic can only ever be handled by the local leader.
|
||||
// We avoid routing to read replicas since they will never be Raft voters.
|
||||
if haveVoters(servers) {
|
||||
clusterName := connect.PeeringServerSAN(cfgSnap.Datacenter, cfgSnap.Roots.TrustDomain)
|
||||
filterName := fmt.Sprintf("%s.%s", name, cfgSnap.Datacenter)
|
||||
|
||||
filter, err := makeTCPProxyFilter(filterName, clusterName, "mesh_gateway_local_peering_server.")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
l.FilterChains = append(l.FilterChains, &envoy_listener_v3.FilterChain{
|
||||
FilterChainMatch: &envoy_listener_v3.FilterChainMatch{
|
||||
ServerNames: []string{clusterName},
|
||||
},
|
||||
Filters: []*envoy_listener_v3.Filter{
|
||||
filter,
|
||||
},
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// This needs to get tacked on at the end as it has no
|
||||
// matching and will act as a catch all
|
||||
l.FilterChains = append(l.FilterChains, sniClusterChain)
|
||||
|
|
|
@ -219,6 +219,12 @@ func getMeshGatewayPeeringGoldenTestCases() []goldenTestCase {
|
|||
return proxycfg.TestConfigSnapshotPeeredMeshGateway(t, "chain-and-l7-stuff", nil, nil)
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "mesh-gateway-peering-control-plane",
|
||||
create: func(t testinf.T) *proxycfg.ConfigSnapshot {
|
||||
return proxycfg.TestConfigSnapshotPeeredMeshGateway(t, "control-plane", nil, nil)
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
|
|
24
agent/xds/testdata/clusters/mesh-gateway-peering-control-plane.latest.golden
vendored
Normal file
24
agent/xds/testdata/clusters/mesh-gateway-peering-control-plane.latest.golden
vendored
Normal file
|
@ -0,0 +1,24 @@
|
|||
{
|
||||
"versionInfo": "00000001",
|
||||
"resources": [
|
||||
{
|
||||
"@type": "type.googleapis.com/envoy.config.cluster.v3.Cluster",
|
||||
"name": "server.dc1.peering.11111111-2222-3333-4444-555555555555.consul",
|
||||
"type": "EDS",
|
||||
"edsClusterConfig": {
|
||||
"edsConfig": {
|
||||
"ads": {
|
||||
|
||||
},
|
||||
"resourceApiVersion": "V3"
|
||||
}
|
||||
},
|
||||
"connectTimeout": "5s",
|
||||
"outlierDetection": {
|
||||
|
||||
}
|
||||
}
|
||||
],
|
||||
"typeUrl": "type.googleapis.com/envoy.config.cluster.v3.Cluster",
|
||||
"nonce": "00000001"
|
||||
}
|
37
agent/xds/testdata/endpoints/mesh-gateway-peering-control-plane.latest.golden
vendored
Normal file
37
agent/xds/testdata/endpoints/mesh-gateway-peering-control-plane.latest.golden
vendored
Normal file
|
@ -0,0 +1,37 @@
|
|||
{
|
||||
"versionInfo": "00000001",
|
||||
"resources": [
|
||||
{
|
||||
"@type": "type.googleapis.com/envoy.config.endpoint.v3.ClusterLoadAssignment",
|
||||
"clusterName": "server.dc1.peering.11111111-2222-3333-4444-555555555555.consul",
|
||||
"endpoints": [
|
||||
{
|
||||
"lbEndpoints": [
|
||||
{
|
||||
"endpoint": {
|
||||
"address": {
|
||||
"socketAddress": {
|
||||
"address": "127.0.0.1",
|
||||
"portValue": 8503
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
{
|
||||
"endpoint": {
|
||||
"address": {
|
||||
"socketAddress": {
|
||||
"address": "127.0.0.2",
|
||||
"portValue": 8503
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
]
|
||||
}
|
||||
],
|
||||
"typeUrl": "type.googleapis.com/envoy.config.endpoint.v3.ClusterLoadAssignment",
|
||||
"nonce": "00000001"
|
||||
}
|
62
agent/xds/testdata/listeners/mesh-gateway-peering-control-plane.latest.golden
vendored
Normal file
62
agent/xds/testdata/listeners/mesh-gateway-peering-control-plane.latest.golden
vendored
Normal file
|
@ -0,0 +1,62 @@
|
|||
{
|
||||
"versionInfo": "00000001",
|
||||
"resources": [
|
||||
{
|
||||
"@type": "type.googleapis.com/envoy.config.listener.v3.Listener",
|
||||
"name": "default:1.2.3.4:8443",
|
||||
"address": {
|
||||
"socketAddress": {
|
||||
"address": "1.2.3.4",
|
||||
"portValue": 8443
|
||||
}
|
||||
},
|
||||
"filterChains": [
|
||||
{
|
||||
"filterChainMatch": {
|
||||
"serverNames": [
|
||||
"server.dc1.peering.11111111-2222-3333-4444-555555555555.consul"
|
||||
]
|
||||
},
|
||||
"filters": [
|
||||
{
|
||||
"name": "envoy.filters.network.tcp_proxy",
|
||||
"typedConfig": {
|
||||
"@type": "type.googleapis.com/envoy.extensions.filters.network.tcp_proxy.v3.TcpProxy",
|
||||
"statPrefix": "mesh_gateway_local_peering_server.default.dc1",
|
||||
"cluster": "server.dc1.peering.11111111-2222-3333-4444-555555555555.consul"
|
||||
}
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"filters": [
|
||||
{
|
||||
"name": "envoy.filters.network.sni_cluster",
|
||||
"typedConfig": {
|
||||
"@type": "type.googleapis.com/envoy.extensions.filters.network.sni_cluster.v3.SniCluster"
|
||||
}
|
||||
},
|
||||
{
|
||||
"name": "envoy.filters.network.tcp_proxy",
|
||||
"typedConfig": {
|
||||
"@type": "type.googleapis.com/envoy.extensions.filters.network.tcp_proxy.v3.TcpProxy",
|
||||
"statPrefix": "mesh_gateway_local.default",
|
||||
"cluster": ""
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
],
|
||||
"listenerFilters": [
|
||||
{
|
||||
"name": "envoy.filters.listener.tls_inspector",
|
||||
"typedConfig": {
|
||||
"@type": "type.googleapis.com/envoy.extensions.filters.listener.tls_inspector.v3.TlsInspector"
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
],
|
||||
"typeUrl": "type.googleapis.com/envoy.config.listener.v3.Listener",
|
||||
"nonce": "00000001"
|
||||
}
|
|
@ -0,0 +1,5 @@
|
|||
{
|
||||
"versionInfo": "00000001",
|
||||
"typeUrl": "type.googleapis.com/envoy.config.route.v3.RouteConfiguration",
|
||||
"nonce": "00000001"
|
||||
}
|
Loading…
Reference in New Issue