Update xds generation for peering over mesh gws

This commit adds the xDS resources needed for INBOUND traffic from peer
clusters:

- 1 filter chain for all inbound peering requests.
- 1 cluster for all inbound peering requests.
- 1 endpoint per voting server with the gRPC TLS port configured.

There is one filter chain and cluster because unlike with WAN
federation, peer clusters will not attempt to dial individual servers.
Peer clusters will only dial the local mesh gateway addresses.
This commit is contained in:
freddygv 2022-09-22 21:14:25 -06:00
parent 520507232f
commit 0d61aa5d37
13 changed files with 387 additions and 4 deletions

View File

@ -494,7 +494,7 @@ func (s *handlerMeshGateway) handleUpdate(ctx context.Context, u UpdateEvent, sn
return nil
}
if meshConf.Peering == nil || !meshConf.Peering.PeerThroughMeshGateways {
if !meshConf.PeerThroughMeshGateways() {
snap.MeshGateway.WatchedConsulServers.CancelWatch(structs.ConsulServiceName)
return nil
}

View File

@ -697,7 +697,7 @@ func (s *ConfigSnapshot) Valid() bool {
if s.ServiceMeta[structs.MetaWANFederationKey] == "1" {
return false
}
if cfg := s.MeshConfig(); cfg != nil && cfg.Peering != nil && cfg.Peering.PeerThroughMeshGateways {
if cfg := s.MeshConfig(); cfg.PeerThroughMeshGateways() {
return false
}
}

View File

@ -476,6 +476,106 @@ func TestConfigSnapshotPeeredMeshGateway(t testing.T, variant string, nsFn func(
)
switch variant {
case "control-plane":
extraUpdates = append(extraUpdates,
UpdateEvent{
CorrelationID: meshConfigEntryID,
Result: &structs.ConfigEntryResponse{
Entry: &structs.MeshConfigEntry{
Peering: &structs.PeeringMeshConfig{
PeerThroughMeshGateways: true,
},
},
},
},
UpdateEvent{
CorrelationID: consulServerListWatchID,
Result: &structs.IndexedCheckServiceNodes{
Nodes: structs.CheckServiceNodes{
{
Node: &structs.Node{
Datacenter: "dc1",
Node: "replica",
Address: "127.0.0.10",
},
Service: &structs.NodeService{
ID: structs.ConsulServiceID,
Service: structs.ConsulServiceName,
// Read replicas cannot handle peering requests.
Meta: map[string]string{"read_replica": "true"},
},
},
{
Node: &structs.Node{
Datacenter: "dc1",
Node: "node1",
Address: "127.0.0.1",
},
Service: &structs.NodeService{
ID: structs.ConsulServiceID,
Service: structs.ConsulServiceName,
Meta: map[string]string{
"grpc_port": "8502",
"grpc_tls_port": "8503",
},
},
},
{
Node: &structs.Node{
Datacenter: "dc1",
Node: "node2",
Address: "127.0.0.2",
},
Service: &structs.NodeService{
ID: structs.ConsulServiceID,
Service: structs.ConsulServiceName,
Meta: map[string]string{
"grpc_port": "8502",
"grpc_tls_port": "8503",
},
TaggedAddresses: map[string]structs.ServiceAddress{
// WAN address is not considered for traffic from local gateway to local servers.
structs.TaggedAddressWAN: {
Address: "consul.server.dc1.my-domain",
Port: 10101,
},
},
},
},
{
Node: &structs.Node{
Datacenter: "dc1",
Node: "node3",
Address: "127.0.0.3",
},
Service: &structs.NodeService{
ID: structs.ConsulServiceID,
Service: structs.ConsulServiceName,
Meta: map[string]string{
// Peering is not allowed over deprecated non-TLS gRPC port.
"grpc_port": "8502",
},
},
},
{
Node: &structs.Node{
Datacenter: "dc1",
Node: "node4",
Address: "127.0.0.4",
},
Service: &structs.NodeService{
ID: structs.ConsulServiceID,
Service: structs.ConsulServiceName,
Meta: map[string]string{
// Must have valid gRPC port.
"grpc_tls_port": "bad",
},
},
},
},
},
},
)
case "default-services-http":
proxyDefaults := &structs.ProxyConfigEntry{
Config: map[string]interface{}{

View File

@ -155,6 +155,13 @@ func (e *MeshConfigEntry) MarshalJSON() ([]byte, error) {
return json.Marshal(source)
}
func (e *MeshConfigEntry) PeerThroughMeshGateways() bool {
if e == nil || e.Peering == nil {
return false
}
return e.Peering.PeerThroughMeshGateways
}
func validateMeshDirectionalTLSConfig(cfg *MeshDirectionalTLSConfig) error {
if cfg == nil {
return nil

View File

@ -0,0 +1,46 @@
package structs
import (
"testing"
"github.com/stretchr/testify/assert"
)
func TestMeshConfigEntry_PeerThroughMeshGateways(t *testing.T) {
tests := map[string]struct {
input *MeshConfigEntry
want bool
}{
"nil entry": {
input: nil,
want: false,
},
"nil peering config": {
input: &MeshConfigEntry{
Peering: nil,
},
want: false,
},
"not peering through gateways": {
input: &MeshConfigEntry{
Peering: &PeeringMeshConfig{
PeerThroughMeshGateways: false,
},
},
want: false,
},
"peering through gateways": {
input: &MeshConfigEntry{
Peering: &PeeringMeshConfig{
PeerThroughMeshGateways: true,
},
},
want: true,
},
}
for name, tc := range tests {
t.Run(name, func(t *testing.T) {
assert.Equalf(t, tc.want, tc.input.PeerThroughMeshGateways(), "PeerThroughMeshGateways()")
})
}
}

View File

@ -396,6 +396,21 @@ func (s *ResourceGenerator) clustersFromSnapshotMeshGateway(cfgSnap *proxycfg.Co
}
}
// Create a single cluster for local servers to be dialed by peers.
// When peering through gateways we load balance across the local servers. They cannot be addressed individually.
if cfg := cfgSnap.MeshConfig(); cfg.PeerThroughMeshGateways() {
servers, _ := cfgSnap.MeshGateway.WatchedConsulServers.Get(structs.ConsulServiceName)
// Peering control-plane traffic can only ever be handled by the local leader.
// We avoid routing to read replicas since they will never be Raft voters.
if haveVoters(servers) {
cluster := s.makeGatewayCluster(cfgSnap, clusterOpts{
name: connect.PeeringServerSAN(cfgSnap.Datacenter, cfgSnap.Roots.TrustDomain),
})
clusters = append(clusters, cluster)
}
}
// generate the per-service/subset clusters
c, err := s.makeGatewayServiceClusters(cfgSnap, cfgSnap.MeshGateway.ServiceGroups, cfgSnap.MeshGateway.ServiceResolvers)
if err != nil {
@ -413,6 +428,16 @@ func (s *ResourceGenerator) clustersFromSnapshotMeshGateway(cfgSnap *proxycfg.Co
return clusters, nil
}
func haveVoters(servers structs.CheckServiceNodes) bool {
for _, srv := range servers {
if isReplica := srv.Service.Meta["read_replica"]; isReplica == "true" {
continue
}
return true
}
return false
}
// clustersFromSnapshotTerminatingGateway returns the xDS API representation of the "clusters"
// for a terminating gateway. This will include 1 cluster per Destination associated with this terminating gateway.
func (s *ResourceGenerator) clustersFromSnapshotTerminatingGateway(cfgSnap *proxycfg.ConfigSnapshot) ([]proto.Message, error) {

View File

@ -3,11 +3,11 @@ package xds
import (
"errors"
"fmt"
"strconv"
envoy_cluster_v3 "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"
envoy_core_v3 "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
envoy_endpoint_v3 "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3"
"github.com/golang/protobuf/proto"
bexpr "github.com/hashicorp/go-bexpr"
@ -285,6 +285,50 @@ func (s *ResourceGenerator) endpointsFromSnapshotMeshGateway(cfgSnap *proxycfg.C
})
}
// Create endpoints for the cluster where local servers will be dialed by peers.
// When peering through gateways we load balance across the local servers. They cannot be addressed individually.
if cfg := cfgSnap.MeshConfig(); cfg.PeerThroughMeshGateways() {
var serverEndpoints []*envoy_endpoint_v3.LbEndpoint
servers, _ := cfgSnap.MeshGateway.WatchedConsulServers.Get(structs.ConsulServiceName)
for _, srv := range servers {
if isReplica := srv.Service.Meta["read_replica"]; isReplica == "true" {
// Peering control-plane traffic can only ever be handled by the local leader.
// We avoid routing to read replicas since they will never be Raft voters.
continue
}
_, addr, _ := srv.BestAddress(false)
portStr, ok := srv.Service.Meta["grpc_tls_port"]
if !ok {
s.Logger.Warn("peering is enabled but local server %q does not have the required gRPC TLS port configured",
"server", srv.Node.Node)
continue
}
port, err := strconv.Atoi(portStr)
if err != nil {
s.Logger.Error("peering is enabled but local server has invalid gRPC TLS port",
"server", srv.Node.Node, "port", portStr, "error", err)
continue
}
serverEndpoints = append(serverEndpoints, &envoy_endpoint_v3.LbEndpoint{
HostIdentifier: &envoy_endpoint_v3.LbEndpoint_Endpoint{
Endpoint: &envoy_endpoint_v3.Endpoint{
Address: makeAddress(addr, port),
},
},
})
}
resources = append(resources, &envoy_endpoint_v3.ClusterLoadAssignment{
ClusterName: connect.PeeringServerSAN(cfgSnap.Datacenter, cfgSnap.Roots.TrustDomain),
Endpoints: []*envoy_endpoint_v3.LocalityLbEndpoints{{
LbEndpoints: serverEndpoints,
}},
})
}
// Generate the endpoints for each service and its subsets
e, err := s.endpointsFromServicesAndResolvers(cfgSnap, cfgSnap.MeshGateway.ServiceGroups, cfgSnap.MeshGateway.ServiceResolvers)
if err != nil {

View File

@ -1751,7 +1751,7 @@ func (s *ResourceGenerator) makeMeshGatewayListener(name, addr string, port int,
l.FilterChains = append(l.FilterChains, &envoy_listener_v3.FilterChain{
FilterChainMatch: &envoy_listener_v3.FilterChainMatch{
ServerNames: []string{fmt.Sprintf("%s", clusterName)},
ServerNames: []string{clusterName},
},
Filters: []*envoy_listener_v3.Filter{
dcTCPProxy,
@ -1760,6 +1760,33 @@ func (s *ResourceGenerator) makeMeshGatewayListener(name, addr string, port int,
}
}
// Create a single cluster for local servers to be dialed by peers.
// When peering through gateways we load balance across the local servers. They cannot be addressed individually.
if cfg := cfgSnap.MeshConfig(); cfg.PeerThroughMeshGateways() {
servers, _ := cfgSnap.MeshGateway.WatchedConsulServers.Get(structs.ConsulServiceName)
// Peering control-plane traffic can only ever be handled by the local leader.
// We avoid routing to read replicas since they will never be Raft voters.
if haveVoters(servers) {
clusterName := connect.PeeringServerSAN(cfgSnap.Datacenter, cfgSnap.Roots.TrustDomain)
filterName := fmt.Sprintf("%s.%s", name, cfgSnap.Datacenter)
filter, err := makeTCPProxyFilter(filterName, clusterName, "mesh_gateway_local_peering_server.")
if err != nil {
return nil, err
}
l.FilterChains = append(l.FilterChains, &envoy_listener_v3.FilterChain{
FilterChainMatch: &envoy_listener_v3.FilterChainMatch{
ServerNames: []string{clusterName},
},
Filters: []*envoy_listener_v3.Filter{
filter,
},
})
}
}
// This needs to get tacked on at the end as it has no
// matching and will act as a catch all
l.FilterChains = append(l.FilterChains, sniClusterChain)

View File

@ -215,6 +215,12 @@ func getMeshGatewayPeeringGoldenTestCases() []goldenTestCase {
return proxycfg.TestConfigSnapshotPeeredMeshGateway(t, "chain-and-l7-stuff", nil, nil)
},
},
{
name: "mesh-gateway-peering-control-plane",
create: func(t testinf.T) *proxycfg.ConfigSnapshot {
return proxycfg.TestConfigSnapshotPeeredMeshGateway(t, "control-plane", nil, nil)
},
},
}
}

View File

@ -0,0 +1,24 @@
{
"versionInfo": "00000001",
"resources": [
{
"@type": "type.googleapis.com/envoy.config.cluster.v3.Cluster",
"name": "server.dc1.peering.11111111-2222-3333-4444-555555555555.consul",
"type": "EDS",
"edsClusterConfig": {
"edsConfig": {
"ads": {
},
"resourceApiVersion": "V3"
}
},
"connectTimeout": "5s",
"outlierDetection": {
}
}
],
"typeUrl": "type.googleapis.com/envoy.config.cluster.v3.Cluster",
"nonce": "00000001"
}

View File

@ -0,0 +1,37 @@
{
"versionInfo": "00000001",
"resources": [
{
"@type": "type.googleapis.com/envoy.config.endpoint.v3.ClusterLoadAssignment",
"clusterName": "server.dc1.peering.11111111-2222-3333-4444-555555555555.consul",
"endpoints": [
{
"lbEndpoints": [
{
"endpoint": {
"address": {
"socketAddress": {
"address": "127.0.0.1",
"portValue": 8503
}
}
}
},
{
"endpoint": {
"address": {
"socketAddress": {
"address": "127.0.0.2",
"portValue": 8503
}
}
}
}
]
}
]
}
],
"typeUrl": "type.googleapis.com/envoy.config.endpoint.v3.ClusterLoadAssignment",
"nonce": "00000001"
}

View File

@ -0,0 +1,62 @@
{
"versionInfo": "00000001",
"resources": [
{
"@type": "type.googleapis.com/envoy.config.listener.v3.Listener",
"name": "default:1.2.3.4:8443",
"address": {
"socketAddress": {
"address": "1.2.3.4",
"portValue": 8443
}
},
"filterChains": [
{
"filterChainMatch": {
"serverNames": [
"server.dc1.peering.11111111-2222-3333-4444-555555555555.consul"
]
},
"filters": [
{
"name": "envoy.filters.network.tcp_proxy",
"typedConfig": {
"@type": "type.googleapis.com/envoy.extensions.filters.network.tcp_proxy.v3.TcpProxy",
"statPrefix": "mesh_gateway_local_peering_server.default.dc1",
"cluster": "server.dc1.peering.11111111-2222-3333-4444-555555555555.consul"
}
}
]
},
{
"filters": [
{
"name": "envoy.filters.network.sni_cluster",
"typedConfig": {
"@type": "type.googleapis.com/envoy.extensions.filters.network.sni_cluster.v3.SniCluster"
}
},
{
"name": "envoy.filters.network.tcp_proxy",
"typedConfig": {
"@type": "type.googleapis.com/envoy.extensions.filters.network.tcp_proxy.v3.TcpProxy",
"statPrefix": "mesh_gateway_local.default",
"cluster": ""
}
}
]
}
],
"listenerFilters": [
{
"name": "envoy.filters.listener.tls_inspector",
"typedConfig": {
"@type": "type.googleapis.com/envoy.extensions.filters.listener.tls_inspector.v3.TlsInspector"
}
}
]
}
],
"typeUrl": "type.googleapis.com/envoy.config.listener.v3.Listener",
"nonce": "00000001"
}

View File

@ -0,0 +1,5 @@
{
"versionInfo": "00000001",
"typeUrl": "type.googleapis.com/envoy.config.route.v3.RouteConfiguration",
"nonce": "00000001"
}