Manage local server watches depending on mesh cfg

Routing peering control plane traffic through mesh gateways can be
enabled or disabled at runtime with the mesh config entry.

This commit updates proxycfg to add or cancel watches for local servers
depending on this central config.

Note that WAN federation over mesh gateways is determined by a service
metadata flag, and any updates to the gateway service registration will
force the creation of a new snapshot. If enabled, WAN-fed over mesh
gateways will trigger a local server watch on initialize().

Because of this we will only add/remove server watches if WAN federation
over mesh gateways is disabled.
This commit is contained in:
freddygv 2022-09-22 19:24:13 -06:00
parent 3b89b26992
commit 520507232f
6 changed files with 254 additions and 20 deletions

View file

@ -8,6 +8,7 @@ import (
"time" "time"
cachetype "github.com/hashicorp/consul/agent/cache-types" cachetype "github.com/hashicorp/consul/agent/cache-types"
"github.com/hashicorp/consul/agent/proxycfg/internal/watch"
"github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/lib/maps" "github.com/hashicorp/consul/lib/maps"
"github.com/hashicorp/consul/logging" "github.com/hashicorp/consul/logging"
@ -21,6 +22,8 @@ type handlerMeshGateway struct {
// initialize sets up the watches needed based on the current mesh gateway registration // initialize sets up the watches needed based on the current mesh gateway registration
func (s *handlerMeshGateway) initialize(ctx context.Context) (ConfigSnapshot, error) { func (s *handlerMeshGateway) initialize(ctx context.Context) (ConfigSnapshot, error) {
snap := newConfigSnapshotFromServiceInstance(s.serviceInstance, s.stateConfig) snap := newConfigSnapshotFromServiceInstance(s.serviceInstance, s.stateConfig)
snap.MeshGateway.WatchedConsulServers = watch.NewMap[string, structs.CheckServiceNodes]()
// Watch for root changes // Watch for root changes
err := s.dataSources.CARoots.Notify(ctx, &structs.DCSpecificRequest{ err := s.dataSources.CARoots.Notify(ctx, &structs.DCSpecificRequest{
Datacenter: s.source.Datacenter, Datacenter: s.source.Datacenter,
@ -76,7 +79,7 @@ func (s *handlerMeshGateway) initialize(ctx context.Context) (ConfigSnapshot, er
} }
if s.proxyID.InDefaultPartition() { if s.proxyID.InDefaultPartition() {
if err := s.initializeCrossDCWatches(ctx); err != nil { if err := s.initializeCrossDCWatches(ctx, &snap); err != nil {
return snap, err return snap, err
} }
} }
@ -123,7 +126,7 @@ func (s *handlerMeshGateway) initialize(ctx context.Context) (ConfigSnapshot, er
return snap, err return snap, err
} }
func (s *handlerMeshGateway) initializeCrossDCWatches(ctx context.Context) error { func (s *handlerMeshGateway) initializeCrossDCWatches(ctx context.Context, snap *ConfigSnapshot) error {
if s.meta[structs.MetaWANFederationKey] == "1" { if s.meta[structs.MetaWANFederationKey] == "1" {
// Conveniently we can just use this service meta attribute in one // Conveniently we can just use this service meta attribute in one
// place here to set the machinery in motion and leave the conditional // place here to set the machinery in motion and leave the conditional
@ -145,6 +148,7 @@ func (s *handlerMeshGateway) initializeCrossDCWatches(ctx context.Context) error
if err != nil { if err != nil {
return err return err
} }
snap.MeshGateway.WatchedConsulServers.InitWatch(structs.ConsulServiceName, nil)
} }
err := s.dataSources.Datacenters.Notify(ctx, &structs.DatacentersRequest{ err := s.dataSources.Datacenters.Notify(ctx, &structs.DatacentersRequest{
@ -325,7 +329,6 @@ func (s *handlerMeshGateway) handleUpdate(ctx context.Context, u UpdateEvent, sn
return fmt.Errorf("invalid type for response: %T", u.Result) return fmt.Errorf("invalid type for response: %T", u.Result)
} }
// Do some initial sanity checks to avoid doing something dumb.
for _, csn := range resp.Nodes { for _, csn := range resp.Nodes {
if csn.Service.Service != structs.ConsulServiceName { if csn.Service.Service != structs.ConsulServiceName {
return fmt.Errorf("expected service name %q but got %q", return fmt.Errorf("expected service name %q but got %q",
@ -337,7 +340,7 @@ func (s *handlerMeshGateway) handleUpdate(ctx context.Context, u UpdateEvent, sn
} }
} }
snap.MeshGateway.ConsulServers = resp.Nodes snap.MeshGateway.WatchedConsulServers.Set(structs.ConsulServiceName, resp.Nodes)
case exportedServiceListWatchID: case exportedServiceListWatchID:
exportedServices, ok := u.Result.(*structs.IndexedExportedServiceList) exportedServices, ok := u.Result.(*structs.IndexedExportedServiceList)
@ -463,17 +466,55 @@ func (s *handlerMeshGateway) handleUpdate(ctx context.Context, u UpdateEvent, sn
return fmt.Errorf("invalid type for response: %T", u.Result) return fmt.Errorf("invalid type for response: %T", u.Result)
} }
if resp.Entry != nil { if resp.Entry == nil {
meshConf, ok := resp.Entry.(*structs.MeshConfigEntry)
if !ok {
return fmt.Errorf("invalid type for config entry: %T", resp.Entry)
}
snap.MeshGateway.MeshConfig = meshConf
} else {
snap.MeshGateway.MeshConfig = nil snap.MeshGateway.MeshConfig = nil
// We avoid managing server watches when WAN federation is enabled since it
// always requires server watches.
if s.meta[structs.MetaWANFederationKey] != "1" {
// If the entry was deleted we cancel watches that may have existed because of
// PeerThroughMeshGateways being set in the past.
snap.MeshGateway.WatchedConsulServers.CancelWatch(structs.ConsulServiceName)
}
snap.MeshGateway.MeshConfigSet = true
return nil
} }
meshConf, ok := resp.Entry.(*structs.MeshConfigEntry)
if !ok {
return fmt.Errorf("invalid type for config entry: %T", resp.Entry)
}
snap.MeshGateway.MeshConfig = meshConf
snap.MeshGateway.MeshConfigSet = true snap.MeshGateway.MeshConfigSet = true
// We avoid managing Consul server watches when WAN federation is enabled since it
// always requires server watches.
if s.meta[structs.MetaWANFederationKey] == "1" {
return nil
}
if meshConf.Peering == nil || !meshConf.Peering.PeerThroughMeshGateways {
snap.MeshGateway.WatchedConsulServers.CancelWatch(structs.ConsulServiceName)
return nil
}
if snap.MeshGateway.WatchedConsulServers.IsWatched(structs.ConsulServiceName) {
return nil
}
notifyCtx, cancel := context.WithCancel(ctx)
err := s.dataSources.Health.Notify(notifyCtx, &structs.ServiceSpecificRequest{
Datacenter: s.source.Datacenter,
QueryOptions: structs.QueryOptions{Token: s.token},
ServiceName: structs.ConsulServiceName,
}, consulServerListWatchID, s.ch)
if err != nil {
cancel()
return fmt.Errorf("failed to watch local consul servers: %w", err)
}
snap.MeshGateway.WatchedConsulServers.InitWatch(structs.ConsulServiceName, cancel)
default: default:
switch { switch {
case strings.HasPrefix(u.CorrelationID, "connect-service:"): case strings.HasPrefix(u.CorrelationID, "connect-service:"):

View file

@ -375,8 +375,11 @@ type configSnapshotMeshGateway struct {
// datacenter. // datacenter.
FedStateGateways map[string]structs.CheckServiceNodes FedStateGateways map[string]structs.CheckServiceNodes
// ConsulServers is the list of consul servers in this datacenter. // WatchedConsulServers is a map of (structs.ConsulServiceName -> structs.CheckServiceNodes)`
ConsulServers structs.CheckServiceNodes // Mesh gateways can spin up watches for local servers both for
// WAN federation and for peering. This map ensures we only have one
// watch at a time.
WatchedConsulServers watch.Map[string, structs.CheckServiceNodes]
// HostnameDatacenters is a map of datacenters to mesh gateway instances with a hostname as the address. // HostnameDatacenters is a map of datacenters to mesh gateway instances with a hostname as the address.
// If hostnames are configured they must be provided to Envoy via CDS not EDS. // If hostnames are configured they must be provided to Envoy via CDS not EDS.
@ -556,8 +559,8 @@ func (c *configSnapshotMeshGateway) isEmpty() bool {
len(c.ServiceResolvers) == 0 && len(c.ServiceResolvers) == 0 &&
len(c.GatewayGroups) == 0 && len(c.GatewayGroups) == 0 &&
len(c.FedStateGateways) == 0 && len(c.FedStateGateways) == 0 &&
len(c.ConsulServers) == 0 &&
len(c.HostnameDatacenters) == 0 && len(c.HostnameDatacenters) == 0 &&
c.WatchedConsulServers.Len() == 0 &&
c.isEmptyPeering() c.isEmptyPeering()
} }
@ -690,8 +693,11 @@ func (s *ConfigSnapshot) Valid() bool {
s.TerminatingGateway.MeshConfigSet s.TerminatingGateway.MeshConfigSet
case structs.ServiceKindMeshGateway: case structs.ServiceKindMeshGateway:
if s.ServiceMeta[structs.MetaWANFederationKey] == "1" { if s.MeshGateway.WatchedConsulServers.Len() == 0 {
if len(s.MeshGateway.ConsulServers) == 0 { if s.ServiceMeta[structs.MetaWANFederationKey] == "1" {
return false
}
if cfg := s.MeshConfig(); cfg != nil && cfg.Peering != nil && cfg.Peering.PeerThroughMeshGateways {
return false return false
} }
} }

View file

@ -779,6 +779,9 @@ func TestState_WatchesAndUpdates(t *testing.T) {
Service: "mesh-gateway", Service: "mesh-gateway",
Address: "10.0.1.1", Address: "10.0.1.1",
Port: 443, Port: 443,
Meta: map[string]string{
structs.MetaWANFederationKey: "1",
},
}, },
sourceDC: "dc1", sourceDC: "dc1",
stages: []verificationStage{ stages: []verificationStage{
@ -790,6 +793,7 @@ func TestState_WatchesAndUpdates(t *testing.T) {
exportedServiceListWatchID: genVerifyDCSpecificWatch("dc1"), exportedServiceListWatchID: genVerifyDCSpecificWatch("dc1"),
meshConfigEntryID: genVerifyMeshConfigWatch("dc1"), meshConfigEntryID: genVerifyMeshConfigWatch("dc1"),
peeringTrustBundlesWatchID: genVerifyTrustBundleListWatchForMeshGateway(""), peeringTrustBundlesWatchID: genVerifyTrustBundleListWatchForMeshGateway(""),
consulServerListWatchID: genVerifyServiceSpecificPeeredRequest(structs.ConsulServiceName, "", "dc1", "", false),
}, },
verifySnapshot: func(t testing.TB, snap *ConfigSnapshot) { verifySnapshot: func(t testing.TB, snap *ConfigSnapshot) {
require.False(t, snap.Valid(), "gateway without root is not valid") require.False(t, snap.Valid(), "gateway without root is not valid")
@ -1015,6 +1019,186 @@ func TestState_WatchesAndUpdates(t *testing.T) {
}, },
}, },
}, },
"mesh-gateway-peering-control-plane": {
ns: structs.NodeService{
Kind: structs.ServiceKindMeshGateway,
ID: "mesh-gateway",
Service: "mesh-gateway",
Address: "10.0.1.1",
Port: 443,
},
sourceDC: "dc1",
stages: []verificationStage{
{
requiredWatches: map[string]verifyWatchRequest{
datacentersWatchID: verifyDatacentersWatch,
serviceListWatchID: genVerifyDCSpecificWatch("dc1"),
rootsWatchID: genVerifyDCSpecificWatch("dc1"),
exportedServiceListWatchID: genVerifyDCSpecificWatch("dc1"),
meshConfigEntryID: genVerifyMeshConfigWatch("dc1"),
peeringTrustBundlesWatchID: genVerifyTrustBundleListWatchForMeshGateway(""),
},
verifySnapshot: func(t testing.TB, snap *ConfigSnapshot) {
require.False(t, snap.Valid(), "gateway without root is not valid")
},
},
{
events: []UpdateEvent{
rootWatchEvent(),
{
CorrelationID: meshConfigEntryID,
Result: &structs.ConfigEntryResponse{
Entry: &structs.MeshConfigEntry{
Peering: &structs.PeeringMeshConfig{
PeerThroughMeshGateways: true,
},
},
},
},
{
CorrelationID: exportedServiceListWatchID,
Result: &structs.IndexedExportedServiceList{
Services: nil,
},
},
{
CorrelationID: serviceListWatchID,
Result: &structs.IndexedServiceList{
Services: structs.ServiceList{},
},
},
{
CorrelationID: peeringTrustBundlesWatchID,
Result: &pbpeering.TrustBundleListByServiceResponse{
Bundles: nil,
},
},
},
verifySnapshot: func(t testing.TB, snap *ConfigSnapshot) {
require.Equal(t, indexedRoots, snap.Roots)
require.True(t, snap.MeshGateway.WatchedServicesSet)
require.True(t, snap.MeshGateway.PeeringTrustBundlesSet)
require.True(t, snap.MeshGateway.MeshConfigSet)
require.True(t, snap.Valid(), "gateway without services is valid")
require.True(t, snap.ConnectProxy.isEmpty())
},
},
{
requiredWatches: map[string]verifyWatchRequest{
consulServerListWatchID: genVerifyServiceSpecificPeeredRequest(structs.ConsulServiceName, "", "dc1", "", false),
},
events: []UpdateEvent{
{
CorrelationID: consulServerListWatchID,
Result: &structs.IndexedCheckServiceNodes{
Nodes: structs.CheckServiceNodes{
{
Node: &structs.Node{
Datacenter: "dc1",
Node: "node1",
Address: "127.0.0.1",
},
Service: &structs.NodeService{
ID: structs.ConsulServiceID,
Service: structs.ConsulServiceName,
},
},
{
Node: &structs.Node{
Datacenter: "dc1",
Node: "replica1",
Address: "127.0.0.1",
},
Service: &structs.NodeService{
ID: structs.ConsulServiceID,
Service: structs.ConsulServiceName,
Meta: map[string]string{"read_replica": "true"},
},
},
},
},
Err: nil,
},
},
verifySnapshot: func(t testing.TB, snap *ConfigSnapshot) {
require.True(t, snap.Valid())
servers, ok := snap.MeshGateway.WatchedConsulServers.Get(structs.ConsulServiceName)
require.True(t, ok)
expect := structs.CheckServiceNodes{
{
Node: &structs.Node{
Datacenter: "dc1",
Node: "node1",
Address: "127.0.0.1",
},
Service: &structs.NodeService{
ID: structs.ConsulServiceID,
Service: structs.ConsulServiceName,
},
},
{
Node: &structs.Node{
Datacenter: "dc1",
Node: "replica1",
Address: "127.0.0.1",
},
Service: &structs.NodeService{
ID: structs.ConsulServiceID,
Service: structs.ConsulServiceName,
Meta: map[string]string{"read_replica": "true"},
},
},
}
require.Equal(t, expect, servers)
},
},
{
events: []UpdateEvent{
{
CorrelationID: meshConfigEntryID,
Result: &structs.ConfigEntryResponse{
Entry: &structs.MeshConfigEntry{
Peering: &structs.PeeringMeshConfig{
PeerThroughMeshGateways: false,
},
},
},
},
},
verifySnapshot: func(t testing.TB, snap *ConfigSnapshot) {
require.True(t, snap.Valid())
require.NotNil(t, snap.MeshConfig())
require.False(t, snap.MeshGateway.WatchedConsulServers.IsWatched(structs.ConsulServiceName))
servers, ok := snap.MeshGateway.WatchedConsulServers.Get(structs.ConsulServiceName)
require.False(t, ok)
require.Empty(t, servers)
},
},
{
events: []UpdateEvent{
{
CorrelationID: meshConfigEntryID,
Result: &structs.ConfigEntryResponse{
Entry: nil,
},
},
},
verifySnapshot: func(t testing.TB, snap *ConfigSnapshot) {
require.True(t, snap.Valid())
require.Nil(t, snap.MeshConfig())
require.False(t, snap.MeshGateway.WatchedConsulServers.IsWatched(structs.ConsulServiceName))
servers, ok := snap.MeshGateway.WatchedConsulServers.Get(structs.ConsulServiceName)
require.False(t, ok)
require.Empty(t, servers)
},
},
},
},
"ingress-gateway": { "ingress-gateway": {
ns: structs.NodeService{ ns: structs.NodeService{
Kind: structs.ServiceKindIngressGateway, Kind: structs.ServiceKindIngressGateway,

View file

@ -386,7 +386,8 @@ func (s *ResourceGenerator) clustersFromSnapshotMeshGateway(cfgSnap *proxycfg.Co
} }
// And for the current datacenter, send all flavors appropriately. // And for the current datacenter, send all flavors appropriately.
for _, srv := range cfgSnap.MeshGateway.ConsulServers { servers, _ := cfgSnap.MeshGateway.WatchedConsulServers.Get(structs.ConsulServiceName)
for _, srv := range servers {
opts := clusterOpts{ opts := clusterOpts{
name: cfgSnap.ServerSNIFn(cfgSnap.Datacenter, srv.Node.Node), name: cfgSnap.ServerSNIFn(cfgSnap.Datacenter, srv.Node.Node),
} }

View file

@ -249,7 +249,8 @@ func (s *ResourceGenerator) endpointsFromSnapshotMeshGateway(cfgSnap *proxycfg.C
cfgSnap.ServerSNIFn != nil { cfgSnap.ServerSNIFn != nil {
var allServersLbEndpoints []*envoy_endpoint_v3.LbEndpoint var allServersLbEndpoints []*envoy_endpoint_v3.LbEndpoint
for _, srv := range cfgSnap.MeshGateway.ConsulServers { servers, _ := cfgSnap.MeshGateway.WatchedConsulServers.Get(structs.ConsulServiceName)
for _, srv := range servers {
clusterName := cfgSnap.ServerSNIFn(cfgSnap.Datacenter, srv.Node.Node) clusterName := cfgSnap.ServerSNIFn(cfgSnap.Datacenter, srv.Node.Node)
_, addr, port := srv.BestAddress(false /*wan*/) _, addr, port := srv.BestAddress(false /*wan*/)

View file

@ -274,7 +274,7 @@ func (s *ResourceGenerator) listenersFromSnapshotConnectProxy(cfgSnap *proxycfg.
return nil return nil
} }
configuredPorts[svcConfig.Destination.Port] = struct{}{} configuredPorts[svcConfig.Destination.Port] = struct{}{}
const name = "~http" //name used for the shared route name const name = "~http" // name used for the shared route name
routeName := clusterNameForDestination(cfgSnap, name, fmt.Sprintf("%d", svcConfig.Destination.Port), svcConfig.NamespaceOrDefault(), svcConfig.PartitionOrDefault()) routeName := clusterNameForDestination(cfgSnap, name, fmt.Sprintf("%d", svcConfig.Destination.Port), svcConfig.NamespaceOrDefault(), svcConfig.PartitionOrDefault())
filterChain, err := s.makeUpstreamFilterChain(filterChainOpts{ filterChain, err := s.makeUpstreamFilterChain(filterChainOpts{
routeName: routeName, routeName: routeName,
@ -1739,7 +1739,8 @@ func (s *ResourceGenerator) makeMeshGatewayListener(name, addr string, port int,
} }
// Wildcard all flavors to each server. // Wildcard all flavors to each server.
for _, srv := range cfgSnap.MeshGateway.ConsulServers { servers, _ := cfgSnap.MeshGateway.WatchedConsulServers.Get(structs.ConsulServiceName)
for _, srv := range servers {
clusterName := cfgSnap.ServerSNIFn(cfgSnap.Datacenter, srv.Node.Node) clusterName := cfgSnap.ServerSNIFn(cfgSnap.Datacenter, srv.Node.Node)
filterName := fmt.Sprintf("%s.%s", name, cfgSnap.Datacenter) filterName := fmt.Sprintf("%s.%s", name, cfgSnap.Datacenter)