peering: allow mesh gateways to proxy L4 peered traffic (#13339)
Mesh gateways now accept TCP connections whose SNI names include peering information, so that those connections may be proxied. Note: this does not yet change callers to use these mesh gateways.
This commit is contained in:
parent
f155ff347c
commit
0681f3571d
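For reference, the peered SNI form that the new filter chains match on is composed by connect.PeeredServiceSNI from the service name, namespace, partition, peer name, and trust domain. A minimal sketch (not part of the diff); the expected output is copied from this commit's golden test data rather than guessed:

package main

import (
    "fmt"

    "github.com/hashicorp/consul/agent/connect"
)

func main() {
    // Same arguments the listener code below passes: service, namespace, partition, peer, trust domain.
    sni := connect.PeeredServiceSNI("foo", "default", "default", "peer1",
        "11111111-2222-3333-4444-555555555555.consul")
    fmt.Println(sni)
    // Per the golden file at the end of this diff:
    // foo.default.default.peer1.external.11111111-2222-3333-4444-555555555555.consul
}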
@@ -651,6 +651,7 @@ func (a *Agent) Start(ctx context.Context) error {
        ServiceList:            proxycfgglue.CacheServiceList(a.cache),
        TrustBundle:            proxycfgglue.CacheTrustBundle(a.cache),
        TrustBundleList:        proxycfgglue.CacheTrustBundleList(a.cache),
        ExportedPeeredServices: proxycfgglue.CacheExportedPeeredServices(a.cache),
    }
    a.fillEnterpriseProxyDataSources(&proxyDataSources)
    a.proxyConfig, err = proxycfg.NewManager(proxycfg.ManagerConfig{

@@ -4136,6 +4137,8 @@ func (a *Agent) registerCache() {

    a.cache.RegisterType(cachetype.TrustBundleReadName, &cachetype.TrustBundle{Client: a.rpcClientPeering})

    a.cache.RegisterType(cachetype.ExportedPeeredServicesName, &cachetype.ExportedPeeredServices{RPC: a})

    a.cache.RegisterType(cachetype.FederationStateListMeshGatewaysName,
        &cachetype.FederationStateListMeshGateways{RPC: a})
@@ -0,0 +1,51 @@
package cachetype

import (
    "fmt"

    "github.com/hashicorp/consul/agent/cache"
    "github.com/hashicorp/consul/agent/structs"
)

// Recommended name for registration.
const ExportedPeeredServicesName = "exported-peered-services"

type ExportedPeeredServices struct {
    RegisterOptionsBlockingRefresh
    RPC RPC
}

func (c *ExportedPeeredServices) Fetch(opts cache.FetchOptions, req cache.Request) (cache.FetchResult, error) {
    var result cache.FetchResult

    // The request should be a DCSpecificRequest.
    reqReal, ok := req.(*structs.DCSpecificRequest)
    if !ok {
        return result, fmt.Errorf(
            "Internal cache failure: request wrong type: %T", req)
    }

    // Lightweight copy this object so that manipulating QueryOptions doesn't race.
    dup := *reqReal
    reqReal = &dup

    // Set the minimum query index to our current index, so we block.
    reqReal.QueryOptions.MinQueryIndex = opts.MinIndex
    reqReal.QueryOptions.MaxQueryTime = opts.Timeout

    // Always allow stale - there's no point in hitting leader if the request is
    // going to be served from cache and end up arbitrarily stale anyway. This
    // allows cached service-discovery to automatically read scale across all
    // servers too.
    reqReal.AllowStale = true

    // Fetch
    var reply structs.IndexedExportedServiceList
    if err := c.RPC.RPC("Internal.ExportedPeeredServices", reqReal, &reply); err != nil {
        return result, err
    }

    result.Value = &reply
    result.Index = reply.QueryMeta.Index
    return result, nil
}
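As a usage sketch (not part of this commit), an agent-side caller could read the registered cache type through the agent cache's Get method; the request must be a *structs.DCSpecificRequest to satisfy the type assertion in Fetch above. The datacenter value and helper name here are assumptions for illustration:

package cachetype

import (
    "context"
    "fmt"

    "github.com/hashicorp/consul/agent/cache"
    "github.com/hashicorp/consul/agent/structs"
)

// readExportedPeeredServices is a hypothetical helper showing how the cache type could be consumed.
func readExportedPeeredServices(ctx context.Context, c *cache.Cache) (*structs.IndexedExportedServiceList, error) {
    raw, _, err := c.Get(ctx, ExportedPeeredServicesName, &structs.DCSpecificRequest{
        Datacenter: "dc1", // assumption: the local datacenter
    })
    if err != nil {
        return nil, err
    }
    reply, ok := raw.(*structs.IndexedExportedServiceList)
    if !ok {
        return nil, fmt.Errorf("unexpected cache result type: %T", raw)
    }
    return reply, nil
}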
@@ -0,0 +1,69 @@
package cachetype

import (
    "testing"
    "time"

    "github.com/stretchr/testify/mock"
    "github.com/stretchr/testify/require"

    "github.com/hashicorp/consul/agent/cache"
    "github.com/hashicorp/consul/agent/structs"
)

func TestExportedPeeredServices(t *testing.T) {
    rpc := TestRPC(t)
    typ := &ExportedPeeredServices{RPC: rpc}

    // Expect the proper RPC call. This also sets the expected value
    // since that is return-by-pointer in the arguments.
    var resp *structs.IndexedExportedServiceList
    rpc.On("RPC", "Internal.ExportedPeeredServices", mock.Anything, mock.Anything).Return(nil).
        Run(func(args mock.Arguments) {
            req := args.Get(1).(*structs.DCSpecificRequest)
            require.Equal(t, uint64(24), req.QueryOptions.MinQueryIndex)
            require.Equal(t, 1*time.Second, req.QueryOptions.MaxQueryTime)
            require.True(t, req.AllowStale)

            reply := args.Get(2).(*structs.IndexedExportedServiceList)
            reply.Services = map[string]structs.ServiceList{
                "my-peer": {
                    structs.ServiceName{
                        Name: "foo",
                    },
                    structs.ServiceName{
                        Name: "bar",
                    },
                },
            }
            reply.QueryMeta.Index = 48
            resp = reply
        })

    // Fetch
    resultA, err := typ.Fetch(cache.FetchOptions{
        MinIndex: 24,
        Timeout:  1 * time.Second,
    }, &structs.DCSpecificRequest{
        Datacenter: "dc1",
    })
    require.NoError(t, err)
    require.Equal(t, cache.FetchResult{
        Value: resp,
        Index: 48,
    }, resultA)

    rpc.AssertExpectations(t)
}

func TestExportedPeeredServices_badReqType(t *testing.T) {
    rpc := TestRPC(t)
    typ := &ExportedPeeredServices{RPC: rpc}

    // Fetch
    _, err := typ.Fetch(cache.FetchOptions{}, cache.TestRequest(
        t, cache.RequestInfo{Key: "foo", MinIndex: 64}))
    require.Error(t, err)
    require.Contains(t, err.Error(), "wrong type")
    rpc.AssertExpectations(t)
}
@@ -1940,6 +1940,16 @@ func filterACLWithAuthorizer(logger hclog.Logger, authorizer acl.Authorizer, sub
    case *structs.IndexedServiceList:
        v.QueryMeta.ResultsFilteredByACLs = filt.filterServiceList(&v.Services)

    case *structs.IndexedExportedServiceList:
        for peer, peerServices := range v.Services {
            v.QueryMeta.ResultsFilteredByACLs = filt.filterServiceList(&peerServices)
            if len(peerServices) == 0 {
                delete(v.Services, peer)
            } else {
                v.Services[peer] = peerServices
            }
        }

    case *structs.IndexedGatewayServices:
        v.QueryMeta.ResultsFilteredByACLs = filt.filterGatewayServices(&v.Services)
@@ -435,6 +435,39 @@ func (m *Internal) GatewayIntentions(args *structs.IntentionQueryRequest, reply
    )
}

// ExportedPeeredServices is used to query the exported services for peers.
// Returns services as a map of ServiceNames by peer.
func (m *Internal) ExportedPeeredServices(args *structs.DCSpecificRequest, reply *structs.IndexedExportedServiceList) error {
    if done, err := m.srv.ForwardRPC("Internal.ExportedPeeredServices", args, reply); done {
        return err
    }

    authz, err := m.srv.ResolveTokenAndDefaultMeta(args.Token, &args.EnterpriseMeta, nil)
    if err != nil {
        return err
    }

    if err := m.srv.validateEnterpriseRequest(&args.EnterpriseMeta, false); err != nil {
        return err
    }

    // TODO(peering): acls: mesh gateway needs appropriate wildcard service:read

    return m.srv.blockingQuery(
        &args.QueryOptions,
        &reply.QueryMeta,
        func(ws memdb.WatchSet, state *state.Store) error {
            index, serviceMap, err := state.ExportedServicesForAllPeersByName(ws, args.EnterpriseMeta)
            if err != nil {
                return err
            }

            reply.Index, reply.Services = index, serviceMap
            m.srv.filterACLWithAuthorizer(authz, reply)
            return nil
        })
}

// EventFire is a bit of an odd endpoint, but it allows for a cross-DC RPC
// call to fire an event. The primary use case is to enable user events being
// triggered in a remote DC.
@@ -310,6 +310,33 @@ func (s *Store) ExportedServicesForPeer(ws memdb.WatchSet, peerID string) (uint6
    return s.exportedServicesForPeerTxn(ws, tx, peering)
}

func (s *Store) ExportedServicesForAllPeersByName(ws memdb.WatchSet, entMeta acl.EnterpriseMeta) (uint64, map[string]structs.ServiceList, error) {
    tx := s.db.ReadTxn()
    defer tx.Abort()

    maxIdx, peerings, err := s.peeringListTxn(ws, tx, entMeta)
    if err != nil {
        return 0, nil, fmt.Errorf("failed to list peerings: %w", err)
    }

    out := make(map[string]structs.ServiceList)
    for _, peering := range peerings {
        idx, list, err := s.exportedServicesForPeerTxn(ws, tx, peering)
        if err != nil {
            return 0, nil, fmt.Errorf("failed to list exported services for peer %q: %w", peering.ID, err)
        }
        if idx > maxIdx {
            maxIdx = idx
        }
        m := list.ListAllDiscoveryChains()
        if len(m) > 0 {
            out[peering.Name] = maps.SliceOfKeys(m)
        }
    }

    return maxIdx, out, nil
}

func (s *Store) exportedServicesForPeerTxn(ws memdb.WatchSet, tx ReadTxn, peering *pbpeering.Peering) (uint64, *structs.ExportedServiceList, error) {
    maxIdx := peering.ModifyIndex
@@ -113,6 +113,12 @@ func CacheTrustBundleList(c *cache.Cache) proxycfg.TrustBundleList {
    return &cacheProxyDataSource[*pbpeering.TrustBundleListByServiceRequest]{c, cachetype.TrustBundleListName}
}

// CacheExportedPeeredServices satisfies the proxycfg.ExportedPeeredServices
// interface by sourcing data from the agent cache.
func CacheExportedPeeredServices(c *cache.Cache) proxycfg.ExportedPeeredServices {
    return &cacheProxyDataSource[*structs.DCSpecificRequest]{c, cachetype.ExportedPeeredServicesName}
}

// cacheProxyDataSource implements a generic wrapper around the agent cache to
// provide data to the proxycfg.Manager.
type cacheProxyDataSource[ReqType cache.Request] struct {
@@ -86,6 +86,10 @@ type DataSources struct {
    // peered clusters that the given proxy is exported to.
    TrustBundleList TrustBundleList

    // ExportedPeeredServices provides updates about the list of all exported
    // services in a datacenter on a notification channel.
    ExportedPeeredServices ExportedPeeredServices

    DataSourcesEnterprise
}

@@ -195,3 +199,9 @@ type TrustBundle interface {
type TrustBundleList interface {
    Notify(ctx context.Context, req *pbpeering.TrustBundleListByServiceRequest, correlationID string, ch chan<- UpdateEvent) error
}

// ExportedPeeredServices is the interface used to consume updates about the
// list of all services exported to peers in a datacenter.
type ExportedPeeredServices interface {
    Notify(ctx context.Context, req *structs.DCSpecificRequest, correlationID string, ch chan<- UpdateEvent) error
}
@@ -3,10 +3,12 @@ package proxycfg
import (
    "context"
    "fmt"
    "sort"
    "strings"
    "time"

    "github.com/hashicorp/consul/agent/structs"
    "github.com/hashicorp/consul/lib/maps"
    "github.com/hashicorp/consul/logging"
)

@@ -67,12 +69,26 @@ func (s *handlerMeshGateway) initialize(ctx context.Context) (ConfigSnapshot, er
        return snap, err
    }

    // Watch for all exported services from this mesh gateway's partition in any peering.
    err = s.dataSources.ExportedPeeredServices.Notify(ctx, &structs.DCSpecificRequest{
        Datacenter:     s.source.Datacenter,
        QueryOptions:   structs.QueryOptions{Token: s.token},
        Source:         *s.source,
        EnterpriseMeta: s.proxyID.EnterpriseMeta,
    }, exportedServiceListWatchID, s.ch)
    if err != nil {
        return snap, err
    }

    snap.MeshGateway.WatchedServices = make(map[structs.ServiceName]context.CancelFunc)
    snap.MeshGateway.WatchedGateways = make(map[string]context.CancelFunc)
    snap.MeshGateway.ServiceGroups = make(map[structs.ServiceName]structs.CheckServiceNodes)
    snap.MeshGateway.GatewayGroups = make(map[string]structs.CheckServiceNodes)
    snap.MeshGateway.ServiceResolvers = make(map[structs.ServiceName]*structs.ServiceResolverConfigEntry)
    snap.MeshGateway.HostnameDatacenters = make(map[string]structs.CheckServiceNodes)
    snap.MeshGateway.ExportedServicesWithPeers = make(map[structs.ServiceName][]string)
    snap.MeshGateway.DiscoveryChain = make(map[structs.ServiceName]*structs.CompiledDiscoveryChain)
    snap.MeshGateway.WatchedDiscoveryChains = make(map[structs.ServiceName]context.CancelFunc)

    // there is no need to initialize the map of service resolvers as we
    // fully rebuild it every time we get updates
@@ -295,6 +311,80 @@ func (s *handlerMeshGateway) handleUpdate(ctx context.Context, u UpdateEvent, sn

        snap.MeshGateway.ConsulServers = resp.Nodes

    case exportedServiceListWatchID:
        exportedServices, ok := u.Result.(*structs.IndexedExportedServiceList)
        if !ok {
            return fmt.Errorf("invalid type for response: %T", u.Result)
        }

        seenServices := make(map[structs.ServiceName][]string) // svc -> peername slice
        for peerName, services := range exportedServices.Services {
            for _, svc := range services {
                seenServices[svc] = append(seenServices[svc], peerName)
            }
        }
        // Sort the peer names so ultimately xDS has a stable output.
        for svc := range seenServices {
            sort.Strings(seenServices[svc])
        }
        peeredServiceList := maps.SliceOfKeys(seenServices)
        structs.ServiceList(peeredServiceList).Sort()

        snap.MeshGateway.ExportedServicesSlice = peeredServiceList
        snap.MeshGateway.ExportedServicesWithPeers = seenServices
        snap.MeshGateway.WatchedExportedServices = exportedServices.Services
        snap.MeshGateway.WatchedExportedServicesSet = true

        // For each service that we should be exposing, also watch disco chains
        // in the same manner as an ingress gateway would.

        for _, svc := range snap.MeshGateway.ExportedServicesSlice {
            if _, ok := snap.MeshGateway.WatchedDiscoveryChains[svc]; ok {
                continue
            }

            ctx, cancel := context.WithCancel(ctx)
            err := s.dataSources.CompiledDiscoveryChain.Notify(ctx, &structs.DiscoveryChainRequest{
                Datacenter:           s.source.Datacenter,
                QueryOptions:         structs.QueryOptions{Token: s.token},
                Name:                 svc.Name,
                EvaluateInDatacenter: s.source.Datacenter,
                EvaluateInNamespace:  svc.NamespaceOrDefault(),
                EvaluateInPartition:  svc.PartitionOrDefault(),
            }, "discovery-chain:"+svc.String(), s.ch)
            if err != nil {
                meshLogger.Error("failed to register watch for discovery chain",
                    "service", svc.String(),
                    "error", err,
                )
                cancel()
                return err
            }

            snap.MeshGateway.WatchedDiscoveryChains[svc] = cancel
        }

        // Clean up data from services that were not in the update

        for svc, cancelFn := range snap.MeshGateway.WatchedDiscoveryChains {
            if _, ok := seenServices[svc]; !ok {
                cancelFn()
                delete(snap.MeshGateway.WatchedDiscoveryChains, svc)
            }
        }

        // These entries are intentionally handled separately from the
        // WatchedDiscoveryChains above. There have been situations where a
        // discovery watch was cancelled, then fired. That update event then
        // re-populated the DiscoveryChain map entry, which wouldn't get
        // cleaned up since there was no known watch for it.

        for svc := range snap.MeshGateway.DiscoveryChain {
            if _, ok := seenServices[svc]; !ok {
                delete(snap.MeshGateway.DiscoveryChain, svc)
            }
        }

    default:
        switch {
        case strings.HasPrefix(u.CorrelationID, "connect-service:"):
@@ -330,6 +420,24 @@ func (s *handlerMeshGateway) handleUpdate(ctx context.Context, u UpdateEvent, sn
                )
            }

        case strings.HasPrefix(u.CorrelationID, "discovery-chain:"):
            resp, ok := u.Result.(*structs.DiscoveryChainResponse)
            if !ok {
                return fmt.Errorf("invalid type for response: %T", u.Result)
            }
            svcString := strings.TrimPrefix(u.CorrelationID, "discovery-chain:")
            svc := structs.ServiceNameFromString(svcString)

            if !snap.MeshGateway.IsServiceExported(svc) {
                delete(snap.MeshGateway.DiscoveryChain, svc)
                s.logger.Trace("discovery-chain watch fired for unknown service", "service", svc)
                return nil
            }

            snap.MeshGateway.DiscoveryChain[svc] = resp.Chain

            // TODO(peering): we need to do this if we are going to setup a cross-partition or cross-datacenter target

        default:
            if err := s.handleEntUpdate(meshLogger, ctx, u, snap); err != nil {
                return err
@@ -331,6 +331,33 @@ type configSnapshotMeshGateway struct {
    // HostnameDatacenters is a map of datacenters to mesh gateway instances with a hostname as the address.
    // If hostnames are configured they must be provided to Envoy via CDS not EDS.
    HostnameDatacenters map[string]structs.CheckServiceNodes

    // TODO(peering):
    ExportedServicesSlice []structs.ServiceName

    // TODO(peering): svc -> peername slice
    ExportedServicesWithPeers map[structs.ServiceName][]string

    // TODO(peering): discard this maybe
    WatchedExportedServices map[string]structs.ServiceList

    // TODO(peering):
    WatchedExportedServicesSet bool

    // TODO(peering):
    DiscoveryChain map[structs.ServiceName]*structs.CompiledDiscoveryChain

    // TODO(peering):
    WatchedDiscoveryChains map[structs.ServiceName]context.CancelFunc
}

func (c *configSnapshotMeshGateway) IsServiceExported(svc structs.ServiceName) bool {
    if c == nil || len(c.ExportedServicesWithPeers) == 0 {
        return false
    }

    _, ok := c.ExportedServicesWithPeers[svc]
    return ok
}

func (c *configSnapshotMeshGateway) GatewayKeys() []GatewayKey {

@@ -376,7 +403,22 @@ func (c *configSnapshotMeshGateway) isEmpty() bool {
        len(c.GatewayGroups) == 0 &&
        len(c.FedStateGateways) == 0 &&
        len(c.ConsulServers) == 0 &&
        len(c.HostnameDatacenters) == 0
        len(c.HostnameDatacenters) == 0 &&
        c.isEmptyPeering()
}

// isEmptyPeering is a test helper
func (c *configSnapshotMeshGateway) isEmptyPeering() bool {
    if c == nil {
        return true
    }

    return len(c.ExportedServicesSlice) == 0 &&
        len(c.ExportedServicesWithPeers) == 0 &&
        len(c.WatchedExportedServices) == 0 &&
        !c.WatchedExportedServicesSet &&
        len(c.DiscoveryChain) == 0 &&
        len(c.WatchedDiscoveryChains) == 0
}

type configSnapshotIngressGateway struct {

@@ -496,7 +538,8 @@ func (s *ConfigSnapshot) Valid() bool {
            }
        }
        return s.Roots != nil &&
            (s.MeshGateway.WatchedServicesSet || len(s.MeshGateway.ServiceGroups) > 0)
            (s.MeshGateway.WatchedServicesSet || len(s.MeshGateway.ServiceGroups) > 0) &&
            s.MeshGateway.WatchedExportedServicesSet

    case structs.ServiceKindIngressGateway:
        return s.Roots != nil &&
@@ -37,6 +37,7 @@ const (
    serviceIntentionsIDPrefix  = "service-intentions:"
    intentionUpstreamsID       = "intention-upstreams"
    upstreamPeerWatchIDPrefix  = "upstream-peer:"
    exportedServiceListWatchID = "exported-service-list"
    meshConfigEntryID          = "mesh"
    svcChecksWatchIDPrefix     = cachetype.ServiceHTTPChecksName + ":"
    preparedQueryIDPrefix      = string(structs.UpstreamDestTypePreparedQuery) + ":"
@@ -136,6 +136,7 @@ func recordWatches(sc *stateConfig) *watchRecorder {
        ServiceList:            typedWatchRecorder[*structs.DCSpecificRequest]{wr},
        TrustBundle:            typedWatchRecorder[*pbpeering.TrustBundleReadRequest]{wr},
        TrustBundleList:        typedWatchRecorder[*pbpeering.TrustBundleListByServiceRequest]{wr},
        ExportedPeeredServices: typedWatchRecorder[*structs.DCSpecificRequest]{wr},
    }
    recordWatchesEnterprise(sc, wr)
@@ -725,9 +726,10 @@ func TestState_WatchesAndUpdates(t *testing.T) {
            stages: []verificationStage{
                {
                    requiredWatches: map[string]verifyWatchRequest{
                        datacentersWatchID: verifyDatacentersWatch,
                        serviceListWatchID: genVerifyDCSpecificWatch("dc1"),
                        rootsWatchID:       genVerifyDCSpecificWatch("dc1"),
                        datacentersWatchID:         verifyDatacentersWatch,
                        serviceListWatchID:         genVerifyDCSpecificWatch("dc1"),
                        rootsWatchID:               genVerifyDCSpecificWatch("dc1"),
                        exportedServiceListWatchID: genVerifyDCSpecificWatch("dc1"),
                    },
                    verifySnapshot: func(t testing.TB, snap *ConfigSnapshot) {
                        require.False(t, snap.Valid(), "gateway without root is not valid")
@@ -737,6 +739,12 @@ func TestState_WatchesAndUpdates(t *testing.T) {
                {
                    events: []UpdateEvent{
                        rootWatchEvent(),
                        {
                            CorrelationID: exportedServiceListWatchID,
                            Result: &structs.IndexedExportedServiceList{
                                Services: nil,
                            },
                        },
                    },
                    verifySnapshot: func(t testing.TB, snap *ConfigSnapshot) {
                        require.False(t, snap.Valid(), "gateway without services is valid")
@@ -786,12 +794,19 @@ func TestState_WatchesAndUpdates(t *testing.T) {
            stages: []verificationStage{
                {
                    requiredWatches: map[string]verifyWatchRequest{
                        datacentersWatchID: verifyDatacentersWatch,
                        serviceListWatchID: genVerifyDCSpecificWatch("dc1"),
                        rootsWatchID:       genVerifyDCSpecificWatch("dc1"),
                        datacentersWatchID:         verifyDatacentersWatch,
                        serviceListWatchID:         genVerifyDCSpecificWatch("dc1"),
                        rootsWatchID:               genVerifyDCSpecificWatch("dc1"),
                        exportedServiceListWatchID: genVerifyDCSpecificWatch("dc1"),
                    },
                    events: []UpdateEvent{
                        rootWatchEvent(),
                        {
                            CorrelationID: exportedServiceListWatchID,
                            Result: &structs.IndexedExportedServiceList{
                                Services: nil,
                            },
                        },
                        {
                            CorrelationID: serviceListWatchID,
                            Result: &structs.IndexedServiceList{
@@ -721,6 +721,7 @@ func testConfigSnapshotFixture(
            ServiceList:            &noopDataSource[*structs.DCSpecificRequest]{},
            TrustBundle:            &noopDataSource[*pbpeering.TrustBundleReadRequest]{},
            TrustBundleList:        &noopDataSource[*pbpeering.TrustBundleListByServiceRequest]{},
            ExportedPeeredServices: &noopDataSource[*structs.DCSpecificRequest]{},
        },
        dnsConfig: DNSConfig{ // TODO: make configurable
            Domain: "consul",
@@ -5,7 +5,10 @@ import (
    "time"

    "github.com/mitchellh/go-testing-interface"
    "github.com/stretchr/testify/assert"

    "github.com/hashicorp/consul/agent/connect"
    "github.com/hashicorp/consul/agent/consul/discoverychain"
    "github.com/hashicorp/consul/agent/structs"
)
@@ -20,6 +23,50 @@ func TestConfigSnapshotMeshGateway(t testing.T, variant string, nsFn func(ns *st

    switch variant {
    case "default":
    case "peered-services":
        var (
            fooSN = structs.NewServiceName("foo", nil)
            barSN = structs.NewServiceName("bar", nil)
            girSN = structs.NewServiceName("gir", nil)

            fooChain = discoverychain.TestCompileConfigEntries(t, "foo", "default", "default", "dc1", connect.TestClusterID+".consul", nil)
            barChain = discoverychain.TestCompileConfigEntries(t, "bar", "default", "default", "dc1", connect.TestClusterID+".consul", nil)
            girChain = discoverychain.TestCompileConfigEntries(t, "gir", "default", "default", "dc1", connect.TestClusterID+".consul", nil)
        )

        assert.True(t, fooChain.Default)
        assert.True(t, barChain.Default)
        assert.True(t, girChain.Default)

        extraUpdates = append(extraUpdates,
            UpdateEvent{
                CorrelationID: exportedServiceListWatchID,
                Result: &structs.IndexedExportedServiceList{
                    Services: map[string]structs.ServiceList{
                        "peer1": []structs.ServiceName{fooSN, barSN},
                        "peer2": []structs.ServiceName{girSN},
                    },
                },
            },
            UpdateEvent{
                CorrelationID: "discovery-chain:" + fooSN.String(),
                Result: &structs.DiscoveryChainResponse{
                    Chain: fooChain,
                },
            },
            UpdateEvent{
                CorrelationID: "discovery-chain:" + barSN.String(),
                Result: &structs.DiscoveryChainResponse{
                    Chain: barChain,
                },
            },
            UpdateEvent{
                CorrelationID: "discovery-chain:" + girSN.String(),
                Result: &structs.DiscoveryChainResponse{
                    Chain: girChain,
                },
            },
        )
    case "federation-states":
        populateServices = true
        useFederationStates = true
@@ -257,6 +304,12 @@ func TestConfigSnapshotMeshGateway(t testing.T, variant string, nsFn func(ns *st
            CorrelationID: rootsWatchID,
            Result:        roots,
        },
        {
            CorrelationID: exportedServiceListWatchID,
            Result: &structs.IndexedExportedServiceList{
                Services: nil,
            },
        },
        {
            CorrelationID: serviceListWatchID,
            Result: &structs.IndexedServiceList{
@@ -181,6 +181,13 @@ func (m *subscriptionManager) handleEvent(ctx context.Context, state *subscripti
        // skip checks since we just generated one from scratch
    }

    // Scrub raft indexes
    for _, instance := range csn.Nodes {
        instance.Node.RaftIndex = nil
        instance.Service.RaftIndex = nil
        // skip checks since we just generated one from scratch
    }

    id := servicePayloadIDPrefix + strings.TrimPrefix(u.CorrelationID, subExportedService)

    // Just ferry this one directly along to the destination.
@@ -8,6 +8,11 @@ type PeeringToken struct {
    PeerID string
}

type IndexedExportedServiceList struct {
    Services map[string]ServiceList
    QueryMeta
}

// NOTE: this is not serialized via msgpack so it can be changed without concern.
type ExportedServiceList struct {
    // Services is a list of exported services that apply to both standard
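For reference, IndexedExportedServiceList maps each peer name to the services exported to it, with QueryMeta carrying the blocking-query index. A minimal sketch of a populated value (not part of the diff), reusing the peer and service names from this commit's test fixtures:

var exampleExportedPeeredServices = structs.IndexedExportedServiceList{
    Services: map[string]structs.ServiceList{
        "peer1": {structs.NewServiceName("foo", nil), structs.NewServiceName("bar", nil)},
        "peer2": {structs.NewServiceName("gir", nil)},
    },
}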
@@ -1371,6 +1371,59 @@ func (s *ResourceGenerator) makeMeshGatewayListener(name, addr string, port int,
    l := makePortListener(name, addr, port, envoy_core_v3.TrafficDirection_UNSPECIFIED)
    l.ListenerFilters = []*envoy_listener_v3.ListenerFilter{tlsInspector}

    // Add in TCP filter chains for plain peered passthrough.
    //
    // TODO(peering): make this work for L7 as well
    // TODO(peering): make failover work
    for _, svc := range cfgSnap.MeshGateway.ExportedServicesSlice {
        peerNames, ok := cfgSnap.MeshGateway.ExportedServicesWithPeers[svc]
        if !ok {
            continue // not possible
        }
        chain, ok := cfgSnap.MeshGateway.DiscoveryChain[svc]
        if !ok {
            continue // ignore; not ready
        }

        if structs.IsProtocolHTTPLike(chain.Protocol) {
            continue // temporary skip
        }

        target, err := simpleChainTarget(chain)
        if err != nil {
            return nil, err
        }
        clusterName := CustomizeClusterName(target.Name, chain)

        filterName := fmt.Sprintf("%s.%s.%s.%s", chain.ServiceName, chain.Namespace, chain.Partition, chain.Datacenter)

        dcTCPProxy, err := makeTCPProxyFilter(filterName, clusterName, "mesh_gateway_local_peered.")
        if err != nil {
            return nil, err
        }

        var peeredServerNames []string
        for _, peerName := range peerNames {
            peeredSNI := connect.PeeredServiceSNI(
                svc.Name,
                svc.NamespaceOrDefault(),
                svc.PartitionOrDefault(),
                peerName,
                cfgSnap.Roots.TrustDomain,
            )
            peeredServerNames = append(peeredServerNames, peeredSNI)
        }

        l.FilterChains = append(l.FilterChains, &envoy_listener_v3.FilterChain{
            FilterChainMatch: &envoy_listener_v3.FilterChainMatch{
                ServerNames: peeredServerNames,
            },
            Filters: []*envoy_listener_v3.Filter{
                dcTCPProxy,
            },
        })
    }

    // We need 1 Filter Chain per remote cluster
    keys := cfgSnap.MeshGateway.GatewayKeys()
    for _, key := range keys {
@@ -476,6 +476,12 @@ func TestListenersFromSnapshot(t *testing.T) {
                return proxycfg.TestConfigSnapshotMeshGateway(t, "no-services", nil, nil)
            },
        },
        {
            name: "mesh-gateway-with-exported-peered-services",
            create: func(t testinf.T) *proxycfg.ConfigSnapshot {
                return proxycfg.TestConfigSnapshotMeshGateway(t, "peered-services", nil, nil)
            },
        },
        {
            name: "mesh-gateway-tagged-addresses",
            create: func(t testinf.T) *proxycfg.ConfigSnapshot {
agent/xds/testdata/listeners/mesh-gateway-with-exported-peered-services.latest.golden (new file, 147 lines, vendored)
@@ -0,0 +1,147 @@
{
  "versionInfo": "00000001",
  "resources": [
    {
      "@type": "type.googleapis.com/envoy.config.listener.v3.Listener",
      "name": "default:1.2.3.4:8443",
      "address": {
        "socketAddress": {
          "address": "1.2.3.4",
          "portValue": 8443
        }
      },
      "filterChains": [
        {
          "filterChainMatch": {
            "serverNames": [
              "bar.default.default.peer1.external.11111111-2222-3333-4444-555555555555.consul"
            ]
          },
          "filters": [
            {
              "name": "envoy.filters.network.tcp_proxy",
              "typedConfig": {
                "@type": "type.googleapis.com/envoy.extensions.filters.network.tcp_proxy.v3.TcpProxy",
                "statPrefix": "mesh_gateway_local_peered.bar.default.default.dc1",
                "cluster": "bar.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul"
              }
            }
          ]
        },
        {
          "filterChainMatch": {
            "serverNames": [
              "foo.default.default.peer1.external.11111111-2222-3333-4444-555555555555.consul"
            ]
          },
          "filters": [
            {
              "name": "envoy.filters.network.tcp_proxy",
              "typedConfig": {
                "@type": "type.googleapis.com/envoy.extensions.filters.network.tcp_proxy.v3.TcpProxy",
                "statPrefix": "mesh_gateway_local_peered.foo.default.default.dc1",
                "cluster": "foo.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul"
              }
            }
          ]
        },
        {
          "filterChainMatch": {
            "serverNames": [
              "gir.default.default.peer2.external.11111111-2222-3333-4444-555555555555.consul"
            ]
          },
          "filters": [
            {
              "name": "envoy.filters.network.tcp_proxy",
              "typedConfig": {
                "@type": "type.googleapis.com/envoy.extensions.filters.network.tcp_proxy.v3.TcpProxy",
                "statPrefix": "mesh_gateway_local_peered.gir.default.default.dc1",
                "cluster": "gir.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul"
              }
            }
          ]
        },
        {
          "filterChainMatch": {
            "serverNames": [
              "*.dc2.internal.11111111-2222-3333-4444-555555555555.consul"
            ]
          },
          "filters": [
            {
              "name": "envoy.filters.network.tcp_proxy",
              "typedConfig": {
                "@type": "type.googleapis.com/envoy.extensions.filters.network.tcp_proxy.v3.TcpProxy",
                "statPrefix": "mesh_gateway_remote.default.dc2",
                "cluster": "dc2.internal.11111111-2222-3333-4444-555555555555.consul"
              }
            }
          ]
        },
        {
          "filterChainMatch": {
            "serverNames": [
              "*.dc4.internal.11111111-2222-3333-4444-555555555555.consul"
            ]
          },
          "filters": [
            {
              "name": "envoy.filters.network.tcp_proxy",
              "typedConfig": {
                "@type": "type.googleapis.com/envoy.extensions.filters.network.tcp_proxy.v3.TcpProxy",
                "statPrefix": "mesh_gateway_remote.default.dc4",
                "cluster": "dc4.internal.11111111-2222-3333-4444-555555555555.consul"
              }
            }
          ]
        },
        {
          "filterChainMatch": {
            "serverNames": [
              "*.dc6.internal.11111111-2222-3333-4444-555555555555.consul"
            ]
          },
          "filters": [
            {
              "name": "envoy.filters.network.tcp_proxy",
              "typedConfig": {
                "@type": "type.googleapis.com/envoy.extensions.filters.network.tcp_proxy.v3.TcpProxy",
                "statPrefix": "mesh_gateway_remote.default.dc6",
                "cluster": "dc6.internal.11111111-2222-3333-4444-555555555555.consul"
              }
            }
          ]
        },
        {
          "filters": [
            {
              "name": "envoy.filters.network.sni_cluster",
              "typedConfig": {
                "@type": "type.googleapis.com/envoy.extensions.filters.network.sni_cluster.v3.SniCluster"
              }
            },
            {
              "name": "envoy.filters.network.tcp_proxy",
              "typedConfig": {
                "@type": "type.googleapis.com/envoy.extensions.filters.network.tcp_proxy.v3.TcpProxy",
                "statPrefix": "mesh_gateway_local.default",
                "cluster": ""
              }
            }
          ]
        }
      ],
      "listenerFilters": [
        {
          "name": "envoy.filters.listener.tls_inspector",
          "typedConfig": {
            "@type": "type.googleapis.com/envoy.extensions.filters.listener.tls_inspector.v3.TlsInspector"
          }
        }
      ]
    }
  ],
  "typeUrl": "type.googleapis.com/envoy.config.listener.v3.Listener",
  "nonce": "00000001"
}