Update mesh gateway proxy watches for partitions
This commit updates mesh gateway watches for cross-partitions communication. * Mesh gateways are keyed by partition and datacenter. * Mesh gateways will now watch gateways in partitions that export services to their partition. * Mesh gateways in non-default partitions will not have cross-datacenter watches. They are not involved in traditional WAN federation.
This commit is contained in:
parent
12e57ad0f7
commit
f3f15640a9
|
@ -6,6 +6,7 @@ import (
|
|||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/consul/acl"
|
||||
"github.com/hashicorp/consul/agent/cache"
|
||||
cachetype "github.com/hashicorp/consul/agent/cache-types"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
|
@ -31,7 +32,10 @@ func (s *handlerMeshGateway) initialize(ctx context.Context) (ConfigSnapshot, er
|
|||
|
||||
wildcardEntMeta := s.proxyID.WithWildcardNamespace()
|
||||
|
||||
// Watch for all services
|
||||
// Watch for all services.
|
||||
// Eventually we will have to watch connect enable instances for each service as well as the
|
||||
// destination services themselves but those notifications will be setup later.
|
||||
// We cannot setup those watches until we know what the services are.
|
||||
err = s.cache.Notify(ctx, cachetype.CatalogServiceListName, &structs.DCSpecificRequest{
|
||||
Datacenter: s.source.Datacenter,
|
||||
QueryOptions: structs.QueryOptions{Token: s.token},
|
||||
|
@ -43,45 +47,6 @@ func (s *handlerMeshGateway) initialize(ctx context.Context) (ConfigSnapshot, er
|
|||
return snap, err
|
||||
}
|
||||
|
||||
if s.meta[structs.MetaWANFederationKey] == "1" {
|
||||
// Conveniently we can just use this service meta attribute in one
|
||||
// place here to set the machinery in motion and leave the conditional
|
||||
// behavior out of the rest of the package.
|
||||
err = s.cache.Notify(ctx, cachetype.FederationStateListMeshGatewaysName, &structs.DCSpecificRequest{
|
||||
Datacenter: s.source.Datacenter,
|
||||
QueryOptions: structs.QueryOptions{Token: s.token},
|
||||
Source: *s.source,
|
||||
}, federationStateListGatewaysWatchID, s.ch)
|
||||
if err != nil {
|
||||
return snap, err
|
||||
}
|
||||
|
||||
err = s.health.Notify(ctx, structs.ServiceSpecificRequest{
|
||||
Datacenter: s.source.Datacenter,
|
||||
QueryOptions: structs.QueryOptions{Token: s.token},
|
||||
ServiceName: structs.ConsulServiceName,
|
||||
}, consulServerListWatchID, s.ch)
|
||||
if err != nil {
|
||||
return snap, err
|
||||
}
|
||||
}
|
||||
|
||||
// Eventually we will have to watch connect enable instances for each service as well as the
|
||||
// destination services themselves but those notifications will be setup later. However we
|
||||
// cannot setup those watches until we know what the services are. from the service list
|
||||
// watch above
|
||||
|
||||
err = s.cache.Notify(ctx, cachetype.CatalogDatacentersName, &structs.DatacentersRequest{
|
||||
QueryOptions: structs.QueryOptions{Token: s.token, MaxAge: 30 * time.Second},
|
||||
}, datacentersWatchID, s.ch)
|
||||
if err != nil {
|
||||
return snap, err
|
||||
}
|
||||
|
||||
// Once we start getting notified about the datacenters we will setup watches on the
|
||||
// gateways within those other datacenters. We cannot do that here because we don't
|
||||
// know what they are yet.
|
||||
|
||||
// Watch service-resolvers so we can setup service subset clusters
|
||||
err = s.cache.Notify(ctx, cachetype.ConfigEntriesName, &structs.ConfigEntryQuery{
|
||||
Datacenter: s.source.Datacenter,
|
||||
|
@ -95,17 +60,66 @@ func (s *handlerMeshGateway) initialize(ctx context.Context) (ConfigSnapshot, er
|
|||
return snap, err
|
||||
}
|
||||
|
||||
if s.proxyID.PartitionOrEmpty() == acl.DefaultPartitionName {
|
||||
if err := s.initializeCrossDCWatches(ctx); err != nil {
|
||||
return snap, err
|
||||
}
|
||||
}
|
||||
|
||||
if err := s.initializeEntWatches(ctx); err != nil {
|
||||
return snap, err
|
||||
}
|
||||
|
||||
snap.MeshGateway.WatchedServices = make(map[structs.ServiceName]context.CancelFunc)
|
||||
snap.MeshGateway.WatchedDatacenters = make(map[string]context.CancelFunc)
|
||||
snap.MeshGateway.WatchedGateways = make(map[string]context.CancelFunc)
|
||||
snap.MeshGateway.ServiceGroups = make(map[structs.ServiceName]structs.CheckServiceNodes)
|
||||
snap.MeshGateway.GatewayGroups = make(map[string]structs.CheckServiceNodes)
|
||||
snap.MeshGateway.ServiceResolvers = make(map[structs.ServiceName]*structs.ServiceResolverConfigEntry)
|
||||
snap.MeshGateway.HostnameDatacenters = make(map[string]structs.CheckServiceNodes)
|
||||
|
||||
// there is no need to initialize the map of service resolvers as we
|
||||
// fully rebuild it every time we get updates
|
||||
return snap, err
|
||||
}
|
||||
|
||||
func (s *handlerMeshGateway) initializeCrossDCWatches(ctx context.Context) error {
|
||||
if s.meta[structs.MetaWANFederationKey] == "1" {
|
||||
// Conveniently we can just use this service meta attribute in one
|
||||
// place here to set the machinery in motion and leave the conditional
|
||||
// behavior out of the rest of the package.
|
||||
err := s.cache.Notify(ctx, cachetype.FederationStateListMeshGatewaysName, &structs.DCSpecificRequest{
|
||||
Datacenter: s.source.Datacenter,
|
||||
QueryOptions: structs.QueryOptions{Token: s.token},
|
||||
Source: *s.source,
|
||||
}, federationStateListGatewaysWatchID, s.ch)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = s.health.Notify(ctx, structs.ServiceSpecificRequest{
|
||||
Datacenter: s.source.Datacenter,
|
||||
QueryOptions: structs.QueryOptions{Token: s.token},
|
||||
ServiceName: structs.ConsulServiceName,
|
||||
}, consulServerListWatchID, s.ch)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
err := s.cache.Notify(ctx, cachetype.CatalogDatacentersName, &structs.DatacentersRequest{
|
||||
QueryOptions: structs.QueryOptions{Token: s.token, MaxAge: 30 * time.Second},
|
||||
}, datacentersWatchID, s.ch)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Once we start getting notified about the datacenters we will setup watches on the
|
||||
// gateways within those other datacenters. We cannot do that here because we don't
|
||||
// know what they are yet.
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *handlerMeshGateway) handleUpdate(ctx context.Context, u cache.UpdateEvent, snap *ConfigSnapshot) error {
|
||||
if u.Err != nil {
|
||||
return fmt.Errorf("error filling agent cache: %v", u.Err)
|
||||
|
@ -120,6 +134,7 @@ func (s *handlerMeshGateway) handleUpdate(ctx context.Context, u cache.UpdateEve
|
|||
return fmt.Errorf("invalid type for response: %T", u.Result)
|
||||
}
|
||||
snap.Roots = roots
|
||||
|
||||
case federationStateListGatewaysWatchID:
|
||||
dcIndexedNodes, ok := u.Result.(*structs.DatacenterIndexedCheckServiceNodes)
|
||||
if !ok {
|
||||
|
@ -181,8 +196,8 @@ func (s *handlerMeshGateway) handleUpdate(ctx context.Context, u cache.UpdateEve
|
|||
cancelFn()
|
||||
}
|
||||
}
|
||||
|
||||
snap.MeshGateway.WatchedServicesSet = true
|
||||
|
||||
case datacentersWatchID:
|
||||
datacentersRaw, ok := u.Result.(*[]string)
|
||||
if !ok {
|
||||
|
@ -199,7 +214,10 @@ func (s *handlerMeshGateway) handleUpdate(ctx context.Context, u cache.UpdateEve
|
|||
continue
|
||||
}
|
||||
|
||||
if _, ok := snap.MeshGateway.WatchedDatacenters[dc]; !ok {
|
||||
entMeta := structs.DefaultEnterpriseMetaInDefaultPartition()
|
||||
gk := GatewayKey{Datacenter: dc, Partition: entMeta.PartitionOrDefault()}
|
||||
|
||||
if _, ok := snap.MeshGateway.WatchedGateways[gk.String()]; !ok {
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
err := s.cache.Notify(ctx, cachetype.InternalServiceDumpName, &structs.ServiceDumpRequest{
|
||||
Datacenter: dc,
|
||||
|
@ -207,36 +225,42 @@ func (s *handlerMeshGateway) handleUpdate(ctx context.Context, u cache.UpdateEve
|
|||
ServiceKind: structs.ServiceKindMeshGateway,
|
||||
UseServiceKind: true,
|
||||
Source: *s.source,
|
||||
EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition(),
|
||||
}, fmt.Sprintf("mesh-gateway:%s", dc), s.ch)
|
||||
EnterpriseMeta: *entMeta,
|
||||
}, fmt.Sprintf("mesh-gateway:%s", gk.String()), s.ch)
|
||||
|
||||
if err != nil {
|
||||
meshLogger.Error("failed to register watch for mesh-gateway",
|
||||
"datacenter", dc,
|
||||
"partition", entMeta.PartitionOrDefault(),
|
||||
"error", err,
|
||||
)
|
||||
cancel()
|
||||
return err
|
||||
}
|
||||
|
||||
snap.MeshGateway.WatchedDatacenters[dc] = cancel
|
||||
snap.MeshGateway.WatchedGateways[gk.String()] = cancel
|
||||
}
|
||||
}
|
||||
|
||||
for dc, cancelFn := range snap.MeshGateway.WatchedDatacenters {
|
||||
for key, cancelFn := range snap.MeshGateway.WatchedGateways {
|
||||
gk := gatewayKeyFromString(key)
|
||||
if gk.Datacenter == s.source.Datacenter {
|
||||
// Only cross-DC watches are managed by the datacenters watch.
|
||||
continue
|
||||
}
|
||||
|
||||
found := false
|
||||
for _, dcCurrent := range datacenters {
|
||||
if dcCurrent == dc {
|
||||
if dcCurrent == gk.Datacenter {
|
||||
found = true
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if !found {
|
||||
delete(snap.MeshGateway.WatchedDatacenters, dc)
|
||||
delete(snap.MeshGateway.WatchedGateways, key)
|
||||
cancelFn()
|
||||
}
|
||||
}
|
||||
|
||||
case serviceResolversWatchID:
|
||||
configEntries, ok := u.Result.(*structs.IndexedConfigEntries)
|
||||
if !ok {
|
||||
|
@ -286,23 +310,27 @@ func (s *handlerMeshGateway) handleUpdate(ctx context.Context, u cache.UpdateEve
|
|||
} else if _, ok := snap.MeshGateway.ServiceGroups[sn]; ok {
|
||||
delete(snap.MeshGateway.ServiceGroups, sn)
|
||||
}
|
||||
|
||||
case strings.HasPrefix(u.CorrelationID, "mesh-gateway:"):
|
||||
resp, ok := u.Result.(*structs.IndexedNodesWithGateways)
|
||||
if !ok {
|
||||
return fmt.Errorf("invalid type for response: %T", u.Result)
|
||||
}
|
||||
|
||||
dc := strings.TrimPrefix(u.CorrelationID, "mesh-gateway:")
|
||||
delete(snap.MeshGateway.GatewayGroups, dc)
|
||||
delete(snap.MeshGateway.HostnameDatacenters, dc)
|
||||
key := strings.TrimPrefix(u.CorrelationID, "mesh-gateway:")
|
||||
delete(snap.MeshGateway.GatewayGroups, key)
|
||||
delete(snap.MeshGateway.HostnameDatacenters, key)
|
||||
|
||||
if len(resp.Nodes) > 0 {
|
||||
snap.MeshGateway.GatewayGroups[dc] = resp.Nodes
|
||||
snap.MeshGateway.HostnameDatacenters[dc] = hostnameEndpoints(
|
||||
snap.MeshGateway.GatewayGroups[key] = resp.Nodes
|
||||
snap.MeshGateway.HostnameDatacenters[key] = hostnameEndpoints(
|
||||
s.logger.Named(logging.MeshGateway), snap.Datacenter, resp.Nodes)
|
||||
}
|
||||
|
||||
default:
|
||||
// do nothing for now
|
||||
if err := s.handleEntUpdate(meshLogger, ctx, u, snap); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,19 @@
|
|||
// +build !consulent
|
||||
|
||||
package proxycfg
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/hashicorp/go-hclog"
|
||||
|
||||
"github.com/hashicorp/consul/agent/cache"
|
||||
)
|
||||
|
||||
func (s *handlerMeshGateway) initializeEntWatches(_ context.Context) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *handlerMeshGateway) handleEntUpdate(_ hclog.Logger, _ context.Context, _ cache.UpdateEvent, _ *ConfigSnapshot) error {
|
||||
return nil
|
||||
}
|
|
@ -256,10 +256,10 @@ type configSnapshotMeshGateway struct {
|
|||
// health check to pass.
|
||||
WatchedServicesSet bool
|
||||
|
||||
// WatchedDatacenters is a map of datacenter name to a cancel function.
|
||||
// WatchedGateways is a map of GatewayKeys to a cancel function.
|
||||
// This cancel function is tied to the watch of mesh-gateway services in
|
||||
// that datacenter.
|
||||
WatchedDatacenters map[string]context.CancelFunc
|
||||
// that datacenter/partition.
|
||||
WatchedGateways map[string]context.CancelFunc
|
||||
|
||||
// ServiceGroups is a map of service name to the service instances of that
|
||||
// service in the local datacenter.
|
||||
|
@ -315,7 +315,7 @@ func (c *configSnapshotMeshGateway) IsEmpty() bool {
|
|||
}
|
||||
return len(c.WatchedServices) == 0 &&
|
||||
!c.WatchedServicesSet &&
|
||||
len(c.WatchedDatacenters) == 0 &&
|
||||
len(c.WatchedGateways) == 0 &&
|
||||
len(c.ServiceGroups) == 0 &&
|
||||
len(c.ServiceResolvers) == 0 &&
|
||||
len(c.GatewayGroups) == 0 &&
|
||||
|
@ -466,7 +466,7 @@ func (s *ConfigSnapshot) Clone() (*ConfigSnapshot, error) {
|
|||
snap.TerminatingGateway.WatchedConfigs = nil
|
||||
snap.TerminatingGateway.WatchedResolvers = nil
|
||||
case structs.ServiceKindMeshGateway:
|
||||
snap.MeshGateway.WatchedDatacenters = nil
|
||||
snap.MeshGateway.WatchedGateways = nil
|
||||
snap.MeshGateway.WatchedServices = nil
|
||||
case structs.ServiceKindIngressGateway:
|
||||
snap.IngressGateway.WatchedUpstreams = nil
|
||||
|
|
|
@ -37,6 +37,7 @@ const (
|
|||
datacentersWatchID = "datacenters"
|
||||
serviceResolversWatchID = "service-resolvers"
|
||||
gatewayServicesWatchID = "gateway-services"
|
||||
exportingPartitionsWatchID = "exporting-partitions"
|
||||
gatewayConfigWatchID = "gateway-config"
|
||||
externalServiceIDPrefix = "external-service:"
|
||||
serviceLeafIDPrefix = "service-leaf:"
|
||||
|
|
|
@ -750,7 +750,7 @@ func TestState_WatchesAndUpdates(t *testing.T) {
|
|||
require.Equal(t, indexedRoots, snap.Roots)
|
||||
require.Empty(t, snap.MeshGateway.WatchedServices)
|
||||
require.False(t, snap.MeshGateway.WatchedServicesSet)
|
||||
require.Empty(t, snap.MeshGateway.WatchedDatacenters)
|
||||
require.Empty(t, snap.MeshGateway.WatchedGateways)
|
||||
require.Empty(t, snap.MeshGateway.ServiceGroups)
|
||||
require.Empty(t, snap.MeshGateway.ServiceResolvers)
|
||||
require.Empty(t, snap.MeshGateway.GatewayGroups)
|
||||
|
@ -772,7 +772,7 @@ func TestState_WatchesAndUpdates(t *testing.T) {
|
|||
require.Equal(t, indexedRoots, snap.Roots)
|
||||
require.Empty(t, snap.MeshGateway.WatchedServices)
|
||||
require.True(t, snap.MeshGateway.WatchedServicesSet)
|
||||
require.Empty(t, snap.MeshGateway.WatchedDatacenters)
|
||||
require.Empty(t, snap.MeshGateway.WatchedGateways)
|
||||
require.Empty(t, snap.MeshGateway.ServiceGroups)
|
||||
require.Empty(t, snap.MeshGateway.ServiceResolvers)
|
||||
require.Empty(t, snap.MeshGateway.GatewayGroups)
|
||||
|
|
|
@ -1542,8 +1542,8 @@ func testConfigSnapshotMeshGateway(t testing.T, populateServices bool, useFedera
|
|||
structs.NewServiceName("bar", nil): nil,
|
||||
},
|
||||
WatchedServicesSet: true,
|
||||
WatchedDatacenters: map[string]context.CancelFunc{
|
||||
"dc2": nil,
|
||||
WatchedGateways: map[string]context.CancelFunc{
|
||||
"default.dc2": nil,
|
||||
},
|
||||
ServiceGroups: map[structs.ServiceName]structs.CheckServiceNodes{
|
||||
structs.NewServiceName("foo", nil): TestGatewayServiceGroupFooDC1(t),
|
||||
|
|
Loading…
Reference in New Issue