Fix mesh gateway configuration with proxy-defaults (#15186)
* Fix mesh gateway proxy-defaults not affecting upstreams.

* Clarify distinction with upstream settings

  Top-level mesh gateway mode in proxy-defaults and service-defaults gets merged into NodeService.Proxy.MeshGateway, and only gets merged with the mode attached to an upstream in proxycfg/xds.

* Fix mgw mode usage for peered upstreams

  There were a couple of issues with how mgw mode was being handled for peered upstreams.

  For starters, mesh gateway mode from proxy-defaults and the top level of service-defaults gets stored in NodeService.Proxy.MeshGateway, but the upstream watch for peered data was only considering the mesh gateway config attached in NodeService.Proxy.Upstreams[i]. This means that applying a mesh gateway mode via global proxy-defaults or service-defaults on the downstream would not have an effect.

  Separately, transparent proxy watches for peered upstreams didn't consider mesh gateway mode at all.

  This commit addresses the first issue by ensuring that we overlay the upstream config for peered upstreams as we do for non-peered. The second issue is addressed by re-using setupWatchesForPeeredUpstream when handling transparent proxy updates. Note that for transparent proxies we do not yet support mesh gateway mode per upstream, so the NodeService.Proxy.MeshGateway mode is used.

* Fix upstream mesh gateway mode handling in xds

  This commit ensures that when determining the mesh gateway mode for peered upstreams we consider the NodeService.Proxy.MeshGateway config as a baseline. In the absence of this change, setting a mesh gateway mode via proxy-defaults or the top level of service-defaults will not have an effect for peered upstreams.

* Merge service/proxy defaults in cfg resolver

  Previously the mesh gateway mode for connect proxies would be merged at three points:

  1. On servers, in ComputeResolvedServiceConfig.
  2. On clients, in MergeServiceConfig.
  3. On clients, in proxycfg/xds.

  The first merge returns a ServiceConfigResponse where there is a top-level MeshGateway config from proxy/service-defaults, along with per-upstream config. The second merge combines per-upstream config specified at the service instance with per-upstream config specified centrally. The third merge combines the NodeService.Proxy.MeshGateway config containing proxy/service-defaults data with the per-upstream mode.

  This third merge is easy to miss, which led to peered upstreams not considering the mesh gateway mode from proxy-defaults.

  This commit removes the third merge, and ensures that all mesh gateway config is available at the upstream. This way proxycfg/xds do not need to do additional overlays.

* Ensure that proxy-defaults is considered in wc

  Upstream defaults become a synthetic Upstream definition under a wildcard key "*". Now that proxycfg/xds expect Upstream definitions to have the final MeshGateway values, this commit ensures that values from proxy-defaults/service-defaults are the default for this synthetic upstream.

* Add changelog.

Co-authored-by: freddygv <freddy@hashicorp.com>
This commit is contained in:
parent 605ab84636
commit 9e76d274ec
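For reference, the flattening described in the commit message can be pictured as "later non-default value wins". The following minimal Go sketch is illustrative only; the Mode type, ModeDefault, and flatten are not the actual Consul implementation, just an assumption-labeled stand-in for the six layers listed above.

```go
package main

import "fmt"

// Mode stands in for a mesh gateway mode; the empty string plays the role of
// MeshGatewayModeDefault ("not set at this layer").
type Mode string

const ModeDefault Mode = ""

// flatten applies the layers in order; any layer that sets a non-default
// mode overrides the layers before it, so the last non-default value wins.
func flatten(layers ...Mode) Mode {
	out := ModeDefault
	for _, m := range layers {
		if m != ModeDefault {
			out = m
		}
	}
	return out
}

func main() {
	mode := flatten(
		"remote", // 1. proxy-defaults
		"none",   // 2. top level of service-defaults
		"",       // 3. ServiceDefaults.UpstreamConfig.Defaults (unset here)
		"local",  // 4. NodeService.Proxy.MeshGateway
		"",       // 5. ServiceDefaults.UpstreamConfig.Overrides (unset here)
		"",       // 6. NodeService.Proxy.Upstreams[].MeshGateway (unset here)
	)
	fmt.Println(mode) // local
}
```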
@@ -0,0 +1,3 @@
```release-note:bug
connect: Fix issue where mesh-gateway settings were not properly inherited from configuration entries.
```
@@ -147,7 +147,7 @@ func MergeServiceConfig(defaults *structs.ServiceConfigResponse, service *struct
// localUpstreams stores the upstreams seen from the local registration so that we can merge in the synthetic entries.
// In transparent proxy mode ns.Proxy.Upstreams will likely be empty because users do not need to define upstreams explicitly.
// So to store upstream-specific flags from central config, we add entries to ns.Proxy.Upstream with those values.
// So to store upstream-specific flags from central config, we add entries to ns.Proxy.Upstreams with those values.
localUpstreams := make(map[structs.ServiceID]struct{})

// Merge upstream defaults into the local registration
@@ -134,10 +134,10 @@ func ComputeResolvedServiceConfig(
// Then store upstreams inferred from service-defaults and mapify the overrides.
var (
upstreamConfigs = make(map[structs.ServiceID]*structs.UpstreamConfig)
upstreamOverrides = make(map[structs.ServiceID]*structs.UpstreamConfig)
upstreamDefaults *structs.UpstreamConfig
// usConfigs stores the opaque config map for each upstream and is keyed on the upstream's ID.
usConfigs = make(map[structs.ServiceID]map[string]interface{})
// resolvedConfigs stores the opaque config map for each upstream and is keyed on the upstream's ID.
resolvedConfigs = make(map[structs.ServiceID]map[string]interface{})
)
if serviceConf != nil && serviceConf.UpstreamConfig != nil {
for i, override := range serviceConf.UpstreamConfig.Overrides {
@@ -152,18 +152,26 @@ func ComputeResolvedServiceConfig(
continue // skip this impossible condition
}
seenUpstreams[override.ServiceID()] = struct{}{}
upstreamConfigs[override.ServiceID()] = override
upstreamOverrides[override.ServiceID()] = override
}
if serviceConf.UpstreamConfig.Defaults != nil {
upstreamDefaults = serviceConf.UpstreamConfig.Defaults

if upstreamDefaults.MeshGateway.Mode == structs.MeshGatewayModeDefault {
upstreamDefaults.MeshGateway.Mode = thisReply.MeshGateway.Mode
}

// Store the upstream defaults under a wildcard key so that they can be applied to
// upstreams that are inferred from intentions and do not have explicit upstream configuration.
cfgMap := make(map[string]interface{})
upstreamDefaults.MergeInto(cfgMap)

if !args.MeshGateway.IsZero() {
cfgMap["mesh_gateway"] = args.MeshGateway
}

wildcard := structs.NewServiceID(structs.WildcardSpecifier, args.WithWildcardNamespace())
usConfigs[wildcard] = cfgMap
resolvedConfigs[wildcard] = cfgMap
}
}
@@ -190,34 +198,49 @@ func ComputeResolvedServiceConfig(
resolvedCfg["protocol"] = protocol
}

// Merge centralized defaults for all upstreams before configuration for specific upstreams
// When dialing an upstream, the goal is to flatten the mesh gateway mode in this order
// (larger number wins):
// 1. Value from the proxy-defaults
// 2. Value from top-level of service-defaults (ServiceDefaults.MeshGateway)
// 3. Value from centralized upstream defaults (ServiceDefaults.UpstreamConfig.Defaults)
// 4. Value from local proxy registration (NodeService.Proxy.MeshGateway)
// 5. Value from centralized upstream override (ServiceDefaults.UpstreamConfig.Overrides)
// 6. Value from local upstream definition (NodeService.Proxy.Upstreams[].MeshGateway)
//
// The MeshGateway value from upstream definitions in the proxy registration override
// the one from UpstreamConfig.Defaults and UpstreamConfig.Overrides because they are
// specific to the proxy instance.
//
// Step 6 is handled by the dialer's ServiceManager in MergeServiceConfig.

// Start with the merged value from proxyConf and serviceConf. (steps 1-2)
if !thisReply.MeshGateway.IsZero() {
resolvedCfg["mesh_gateway"] = thisReply.MeshGateway
}

// Merge in the upstream defaults (step 3).
if upstreamDefaults != nil {
upstreamDefaults.MergeInto(resolvedCfg)
}

// The MeshGateway value from the proxy registration overrides the one from upstream_defaults
// because it is specific to the proxy instance.
//
// The goal is to flatten the mesh gateway mode in this order:
// 0. Value from centralized upstream_defaults
// 1. Value from local proxy registration
// 2. Value from centralized upstream_config
// 3. Value from local upstream definition. This last step is done in the client's service manager.
// Merge in the top-level mode from the proxy instance (step 4).
if !args.MeshGateway.IsZero() {
// This means each upstream inherits the value from the `NodeService.Proxy.MeshGateway` field.
resolvedCfg["mesh_gateway"] = args.MeshGateway
}

if upstreamConfigs[upstream] != nil {
upstreamConfigs[upstream].MergeInto(resolvedCfg)
// Merge in Overrides for the upstream (step 5).
if upstreamOverrides[upstream] != nil {
upstreamOverrides[upstream].MergeInto(resolvedCfg)
}

if len(resolvedCfg) > 0 {
usConfigs[upstream] = resolvedCfg
resolvedConfigs[upstream] = resolvedCfg
}
}

// don't allocate the slices just to not fill them
if len(usConfigs) == 0 {
if len(resolvedConfigs) == 0 {
return &thisReply, nil
}
@@ -225,14 +248,14 @@ func ComputeResolvedServiceConfig(
// For legacy upstreams we return a map that is only keyed on the string ID, since they precede namespaces
thisReply.UpstreamConfigs = make(map[string]map[string]interface{})

for us, conf := range usConfigs {
for us, conf := range resolvedConfigs {
thisReply.UpstreamConfigs[us.ID] = conf
}

} else {
thisReply.UpstreamIDConfigs = make(structs.OpaqueUpstreamConfigs, 0, len(usConfigs))
thisReply.UpstreamIDConfigs = make(structs.OpaqueUpstreamConfigs, 0, len(resolvedConfigs))

for us, conf := range usConfigs {
for us, conf := range resolvedConfigs {
thisReply.UpstreamIDConfigs = append(thisReply.UpstreamIDConfigs,
structs.OpaqueUpstreamConfig{Upstream: us, Config: conf})
}
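The hunks above layer opaque per-upstream config maps by repeatedly overwriting keys such as "mesh_gateway". A toy illustration of that layering follows; mergeInto here is an assumption-labeled stand-in, not the real UpstreamConfig.MergeInto, and the string values stand in for structs.MeshGatewayConfig values.

```go
package main

import "fmt"

// mergeInto is an illustrative stand-in for UpstreamConfig.MergeInto: later
// layers simply overwrite keys (such as "mesh_gateway") in the resolved map.
func mergeInto(dst, src map[string]interface{}) {
	for k, v := range src {
		dst[k] = v
	}
}

func main() {
	resolvedCfg := map[string]interface{}{}

	// Steps 1-2: merged proxy-defaults / top-level service-defaults value.
	mergeInto(resolvedCfg, map[string]interface{}{"mesh_gateway": "remote"})

	// Step 3: ServiceDefaults.UpstreamConfig.Defaults.
	mergeInto(resolvedCfg, map[string]interface{}{"mesh_gateway": "none", "protocol": "http"})

	// Step 4: NodeService.Proxy.MeshGateway from the request (args.MeshGateway).
	mergeInto(resolvedCfg, map[string]interface{}{"mesh_gateway": "local"})

	// Step 5: ServiceDefaults.UpstreamConfig.Overrides for this upstream (none here).

	fmt.Println(resolvedCfg["mesh_gateway"], resolvedCfg["protocol"]) // local http
}
```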
@@ -1,10 +1,12 @@
package configentry

import (
"sort"
"testing"

"github.com/stretchr/testify/require"

"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/structs"
)
@@ -17,8 +19,19 @@ func Test_ComputeResolvedServiceConfig(t *testing.T) {
sid := structs.ServiceID{
ID: "sid",
EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition(),
EnterpriseMeta: *acl.DefaultEnterpriseMeta(),
}
uid := structs.ServiceID{
ID: "upstream1",
EnterpriseMeta: *acl.DefaultEnterpriseMeta(),
}
uids := []structs.ServiceID{uid}
wildcard := structs.NewServiceID(structs.WildcardSpecifier, acl.WildcardEnterpriseMeta())

localMeshGW := structs.MeshGatewayConfig{Mode: structs.MeshGatewayModeLocal}
remoteMeshGW := structs.MeshGatewayConfig{Mode: structs.MeshGatewayModeRemote}
noneMeshGW := structs.MeshGatewayConfig{Mode: structs.MeshGatewayModeNone}

tests := []struct {
name string
args args
@@ -88,12 +101,248 @@ func Test_ComputeResolvedServiceConfig(t *testing.T) {
},
},
},
{
name: "proxy upstream mesh-gateway inherits proxy-defaults",
args: args{
scReq: &structs.ServiceConfigRequest{
Name: "sid",
UpstreamIDs: uids,
},
upstreamIDs: uids,
entries: &ResolvedServiceConfigSet{
ProxyDefaults: map[string]*structs.ProxyConfigEntry{
acl.DefaultEnterpriseMeta().PartitionOrDefault(): {
MeshGateway: remoteMeshGW, // applied 1st
},
},
},
},
want: &structs.ServiceConfigResponse{
MeshGateway: remoteMeshGW,
UpstreamIDConfigs: structs.OpaqueUpstreamConfigs{
{
Upstream: uid,
Config: map[string]interface{}{
"mesh_gateway": remoteMeshGW,
},
},
},
},
},
{
name: "proxy upstream mesh-gateway inherits service-defaults",
args: args{
scReq: &structs.ServiceConfigRequest{
Name: "sid",
UpstreamIDs: uids,
},
upstreamIDs: uids,
entries: &ResolvedServiceConfigSet{
ProxyDefaults: map[string]*structs.ProxyConfigEntry{
acl.DefaultEnterpriseMeta().PartitionOrDefault(): {
MeshGateway: localMeshGW, // applied 1st
},
},
ServiceDefaults: map[structs.ServiceID]*structs.ServiceConfigEntry{
sid: {
MeshGateway: noneMeshGW, // applied 2nd
},
},
},
},
want: &structs.ServiceConfigResponse{
MeshGateway: noneMeshGW, // service-defaults has a higher precedence.
UpstreamIDConfigs: structs.OpaqueUpstreamConfigs{
{
Upstream: uid,
Config: map[string]interface{}{
"mesh_gateway": noneMeshGW,
},
},
},
},
},
{
name: "proxy wildcard upstream mesh-gateway inherits proxy-defaults",
args: args{
scReq: &structs.ServiceConfigRequest{
Name: "sid",
Mode: structs.ProxyModeTransparent,
},
entries: &ResolvedServiceConfigSet{
ProxyDefaults: map[string]*structs.ProxyConfigEntry{
acl.DefaultEnterpriseMeta().PartitionOrDefault(): {
MeshGateway: localMeshGW,
},
},
ServiceDefaults: map[structs.ServiceID]*structs.ServiceConfigEntry{
sid: {
UpstreamConfig: &structs.UpstreamConfiguration{
Defaults: &structs.UpstreamConfig{
Protocol: "http",
},
},
},
},
},
},
want: &structs.ServiceConfigResponse{
MeshGateway: localMeshGW,
UpstreamIDConfigs: structs.OpaqueUpstreamConfigs{
{
Upstream: wildcard,
Config: map[string]interface{}{
"mesh_gateway": localMeshGW, // From proxy-defaults
"protocol": "http",
},
},
},
},
},
{
name: "proxy upstream mesh-gateway inherits upstream defaults",
args: args{
scReq: &structs.ServiceConfigRequest{
Name: "sid",
UpstreamIDs: uids,
},
upstreamIDs: uids,
entries: &ResolvedServiceConfigSet{
ProxyDefaults: map[string]*structs.ProxyConfigEntry{
acl.DefaultEnterpriseMeta().PartitionOrDefault(): {
MeshGateway: localMeshGW,
},
},
ServiceDefaults: map[structs.ServiceID]*structs.ServiceConfigEntry{
sid: {
MeshGateway: noneMeshGW,
UpstreamConfig: &structs.UpstreamConfiguration{
Defaults: &structs.UpstreamConfig{
MeshGateway: remoteMeshGW,
},
},
},
},
},
},
want: &structs.ServiceConfigResponse{
MeshGateway: noneMeshGW, // Merged from proxy-defaults + service-defaults
UpstreamIDConfigs: structs.OpaqueUpstreamConfigs{
{
Upstream: wildcard,
Config: map[string]interface{}{
// Wildcard stores the values from UpstreamConfig.Defaults directly
"mesh_gateway": remoteMeshGW,
},
},
{
Upstream: uid,
Config: map[string]interface{}{
// Upstream-specific config comes from UpstreamConfig.Defaults
"mesh_gateway": remoteMeshGW,
},
},
},
},
},
{
name: "proxy upstream mesh-gateway inherits value from node-service",
args: args{
scReq: &structs.ServiceConfigRequest{
Name: "sid",
UpstreamIDs: uids,

// MeshGateway from NodeService is received in the request
MeshGateway: remoteMeshGW,
},
upstreamIDs: uids,
entries: &ResolvedServiceConfigSet{
ServiceDefaults: map[structs.ServiceID]*structs.ServiceConfigEntry{
sid: {
UpstreamConfig: &structs.UpstreamConfiguration{
Defaults: &structs.UpstreamConfig{
MeshGateway: noneMeshGW,
},
},
},
},
},
},
want: &structs.ServiceConfigResponse{
UpstreamIDConfigs: structs.OpaqueUpstreamConfigs{
{
Upstream: wildcard,
Config: map[string]interface{}{
// NodeService.Proxy.MeshGateway has a higher precedence than centralized
// UpstreamConfig.Defaults, since it's specific to a service instance.
"mesh_gateway": remoteMeshGW,
},
},
{
Upstream: uid,
Config: map[string]interface{}{
"mesh_gateway": remoteMeshGW,
},
},
},
},
},
{
name: "proxy upstream mesh-gateway inherits value from service-defaults override",
args: args{
scReq: &structs.ServiceConfigRequest{
Name: "sid",
UpstreamIDs: uids,
MeshGateway: localMeshGW, // applied 2nd
},
upstreamIDs: uids,
entries: &ResolvedServiceConfigSet{
ServiceDefaults: map[structs.ServiceID]*structs.ServiceConfigEntry{
sid: {
UpstreamConfig: &structs.UpstreamConfiguration{
Defaults: &structs.UpstreamConfig{
MeshGateway: localMeshGW, // applied 1st
},
Overrides: []*structs.UpstreamConfig{
{
Name: uid.ID,
MeshGateway: remoteMeshGW, // applied 3rd
},
},
},
},
},
},
},
want: &structs.ServiceConfigResponse{
UpstreamIDConfigs: structs.OpaqueUpstreamConfigs{
{
Upstream: wildcard,
Config: map[string]interface{}{
// Wildcard stores the values from UpstreamConfig.Defaults directly
"mesh_gateway": localMeshGW,
},
},
{
Upstream: uid,
Config: map[string]interface{}{
// UpstreamConfig.Overrides has a higher precedence than UpstreamConfig.Defaults
"mesh_gateway": remoteMeshGW,
},
},
},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := ComputeResolvedServiceConfig(tt.args.scReq, tt.args.upstreamIDs,
false, tt.args.entries, nil)
require.NoError(t, err)
// This is needed because map iteration is random and determines the order of some outputs.
sort.Slice(got.UpstreamIDConfigs, func(i, j int) bool {
return got.UpstreamIDConfigs[i].Upstream.ID < got.UpstreamIDConfigs[j].Upstream.ID
})
require.Equal(t, tt.want, got)
})
}
@@ -1361,7 +1361,7 @@ func TestConfigEntry_ResolveServiceConfig_Upstreams(t *testing.T) {
Upstream: wildcard,
Config: map[string]interface{}{
"mesh_gateway": map[string]interface{}{
"Mode": "remote",
"Mode": "none",
},
},
},
@@ -1438,7 +1438,7 @@ func TestConfigEntry_ResolveServiceConfig_Upstreams(t *testing.T) {
"EnforcingConsecutive5xx": int64(60),
},
"mesh_gateway": map[string]interface{}{
"Mode": "remote",
"Mode": "none",
},
"protocol": "http",
},
@@ -197,9 +197,9 @@ func (s *handlerConnectProxy) initialize(ctx context.Context) (ConfigSnapshot, e
case "":
if u.DestinationPeer != "" {
err := s.setupWatchesForPeeredUpstream(ctx, snap.ConnectProxy, u, dc)
err := s.setupWatchesForPeeredUpstream(ctx, snap.ConnectProxy, NewUpstreamID(&u), u.MeshGateway.Mode, dc)
if err != nil {
return snap, err
return snap, fmt.Errorf("failed to setup watches for peered upstream %q: %w", uid.String(), err)
}
continue
}
@@ -211,7 +211,7 @@ func (s *handlerConnectProxy) initialize(ctx context.Context) (ConfigSnapshot, e
EvaluateInDatacenter: dc,
EvaluateInNamespace: ns,
EvaluateInPartition: partition,
OverrideMeshGateway: s.proxyCfg.MeshGateway.OverlayWith(u.MeshGateway),
OverrideMeshGateway: u.MeshGateway,
OverrideProtocol: cfg.Protocol,
OverrideConnectTimeout: cfg.ConnectTimeout(),
}, "discovery-chain:"+uid.String(), s.ch)
@@ -230,31 +230,30 @@ func (s *handlerConnectProxy) initialize(ctx context.Context) (ConfigSnapshot, e
func (s *handlerConnectProxy) setupWatchesForPeeredUpstream(
ctx context.Context,
snapConnectProxy configSnapshotConnectProxy,
u structs.Upstream,
uid UpstreamID,
mgwMode structs.MeshGatewayMode,
dc string,
) error {
uid := NewUpstreamID(&u)

s.logger.Trace("initializing watch of peered upstream", "upstream", uid)

// NOTE: An upstream that points to a peer by definition will
// only ever watch a single catalog query, so a map key of just
// "UID" is sufficient to cover the peer data watches here.
snapConnectProxy.PeerUpstreamEndpoints.InitWatch(uid, nil)
err := s.dataSources.Health.Notify(ctx, &structs.ServiceSpecificRequest{
PeerName: uid.Peer,
Datacenter: dc,
QueryOptions: structs.QueryOptions{
Token: s.token,
},
ServiceName: u.DestinationName,
ServiceName: uid.Name,
Connect: true,
Source: *s.source,
EnterpriseMeta: uid.EnterpriseMeta,
}, upstreamPeerWatchIDPrefix+uid.String(), s.ch)
if err != nil {
return err
return fmt.Errorf("failed to watch health for %s: %v", uid, err)
}
snapConnectProxy.PeerUpstreamEndpoints.InitWatch(uid, nil)

// Check whether a watch for this peer exists to avoid duplicates.
if ok := snapConnectProxy.UpstreamPeerTrustBundles.IsWatched(uid.Peer); !ok {
@@ -275,7 +274,7 @@ func (s *handlerConnectProxy) setupWatchesForPeeredUpstream(
// If a peered upstream is set to local mesh gw mode,
// set up a watch for them.
if u.MeshGateway.Mode == structs.MeshGatewayModeLocal {
if mgwMode == structs.MeshGatewayModeLocal {
gk := GatewayKey{
Partition: s.source.NodePartitionOrDefault(),
Datacenter: s.source.Datacenter,
@@ -343,44 +342,9 @@ func (s *handlerConnectProxy) handleUpdate(ctx context.Context, u UpdateEvent, s
}
seenUpstreams[uid] = struct{}{}

s.logger.Trace("initializing watch of peered upstream", "upstream", uid)

hctx, hcancel := context.WithCancel(ctx)
err := s.dataSources.Health.Notify(hctx, &structs.ServiceSpecificRequest{
PeerName: uid.Peer,
Datacenter: s.source.Datacenter,
QueryOptions: structs.QueryOptions{
Token: s.token,
},
ServiceName: psn.ServiceName.Name,
Connect: true,
// Note that Identifier doesn't type-prefix for service any more as it's
// the default and makes metrics and other things much cleaner. It's
// simpler for us if we have the type to make things unambiguous.
Source: *s.source,
EnterpriseMeta: uid.EnterpriseMeta,
}, upstreamPeerWatchIDPrefix+uid.String(), s.ch)
err := s.setupWatchesForPeeredUpstream(ctx, snap.ConnectProxy, uid, s.proxyCfg.MeshGateway.Mode, s.source.Datacenter)
if err != nil {
hcancel()
return fmt.Errorf("failed to watch health for %s: %v", uid, err)
}
snap.ConnectProxy.PeerUpstreamEndpoints.InitWatch(uid, hcancel)

// Check whether a watch for this peer exists to avoid duplicates.
if ok := snap.ConnectProxy.UpstreamPeerTrustBundles.IsWatched(uid.Peer); !ok {
peerCtx, cancel := context.WithCancel(ctx)
if err := s.dataSources.TrustBundle.Notify(peerCtx, &cachetype.TrustBundleReadRequest{
Request: &pbpeering.TrustBundleReadRequest{
Name: uid.Peer,
Partition: uid.PartitionOrDefault(),
},
QueryOptions: structs.QueryOptions{Token: s.token},
}, peerTrustBundleIDPrefix+uid.Peer, s.ch); err != nil {
cancel()
return fmt.Errorf("error while watching trust bundle for peer %q: %w", uid.Peer, err)
}

snap.ConnectProxy.UpstreamPeerTrustBundles.InitWatch(uid.Peer, cancel)
return fmt.Errorf("failed to setup watches for peered upstream %q: %w", uid.String(), err)
}
}
snap.ConnectProxy.PeeredUpstreams = seenUpstreams
@@ -474,7 +438,7 @@ func (s *handlerConnectProxy) handleUpdate(ctx context.Context, u UpdateEvent, s
meshGateway := s.proxyCfg.MeshGateway
if u != nil {
meshGateway = meshGateway.OverlayWith(u.MeshGateway)
meshGateway = u.MeshGateway
}
watchOpts := discoveryChainWatchOpts{
id: NewUpstreamIDFromServiceName(svc),
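After the proxycfg changes above, explicit upstream definitions are expected to already carry the fully merged mesh gateway mode, while peered upstreams discovered through transparent proxying fall back to the proxy-level NodeService.Proxy.MeshGateway mode. The sketch below is a simplified, assumption-labeled illustration of that selection; the helper name and Mode type are not Consul's actual API.

```go
package main

import "fmt"

// Mode stands in for structs.MeshGatewayMode in this sketch.
type Mode string

// modeForPeeredUpstream sketches the post-change behavior: explicit upstream
// definitions already carry the merged mode, while upstreams inferred by a
// transparent proxy use the proxy-level mode.
func modeForPeeredUpstream(upstreamMode, proxyMode Mode, fromTransparentProxy bool) Mode {
	if fromTransparentProxy {
		return proxyMode
	}
	return upstreamMode
}

func main() {
	fmt.Println(modeForPeeredUpstream("local", "remote", false)) // local
	fmt.Println(modeForPeeredUpstream("", "remote", true))       // remote
}
```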
@@ -556,7 +556,11 @@ func TestState_WatchesAndUpdates(t *testing.T) {
}

if meshGatewayProxyConfigValue != structs.MeshGatewayModeDefault {
ns.Proxy.MeshGateway.Mode = meshGatewayProxyConfigValue
for i := range ns.Proxy.Upstreams {
if u := &ns.Proxy.Upstreams[i]; u.MeshGateway.Mode == structs.MeshGatewayModeDefault {
u.MeshGateway.Mode = meshGatewayProxyConfigValue
}
}
}

ixnMatch := TestIntentions()
@@ -3116,6 +3120,9 @@ func TestState_WatchesAndUpdates(t *testing.T) {
Address: "10.0.1.1",
Proxy: structs.ConnectProxyConfig{
DestinationServiceName: "api",
MeshGateway: structs.MeshGatewayConfig{
Mode: structs.MeshGatewayModeLocal,
},
Mode: structs.ProxyModeTransparent,
Upstreams: structs.Upstreams{
{
@@ -3228,11 +3235,11 @@ func TestState_WatchesAndUpdates(t *testing.T) {
},
},
{
// Peered upstream will have set up 3 more watches
requiredWatches: map[string]verifyWatchRequest{
upstreamPeerWatchIDPrefix + extApiUID.String(): genVerifyServiceSpecificPeeredRequest("api-a", "", "dc1", "peer-a", true),
upstreamPeerWatchIDPrefix + extDBUID.String(): genVerifyServiceSpecificPeeredRequest("db", "", "dc1", "peer-a", true),
peerTrustBundleIDPrefix + "peer-a": genVerifyTrustBundleReadWatch("peer-a"),
"mesh-gateway:dc1": genVerifyGatewayWatch("dc1"),
},
events: []UpdateEvent{
{
@@ -3271,14 +3278,18 @@ func TestState_WatchesAndUpdates(t *testing.T) {
require.True(t, ok)
prototest.AssertDeepEqual(t, peerTrustBundles.Bundles[0], ptb)

// Sanity check that local upstream maps are not populated
// Ensure that maps for non-peering endpoints are not populated.
require.Empty(t, snap.ConnectProxy.WatchedUpstreamEndpoints[extDBUID])
require.Empty(t, snap.ConnectProxy.PassthroughUpstreams[extDBUID])
require.Empty(t, snap.ConnectProxy.PassthroughIndices)

// Local gateway is watched but there are no endpoints
require.True(t, snap.ConnectProxy.WatchedLocalGWEndpoints.IsWatched("dc1"))
_, ok = snap.ConnectProxy.WatchedLocalGWEndpoints.Get("dc1")
require.False(t, ok)
},
},
{
// Add another instance of "api-a" service
events: []UpdateEvent{
{
CorrelationID: upstreamPeerWatchIDPrefix + extDBUID.String(),
@@ -3300,6 +3311,20 @@ func TestState_WatchesAndUpdates(t *testing.T) {
},
},
},
{
CorrelationID: "mesh-gateway:dc1",
Result: &structs.IndexedCheckServiceNodes{
Nodes: structs.CheckServiceNodes{
{
Node: &structs.Node{
Node: "node1",
Address: "10.1.2.3",
},
Service: structs.TestNodeServiceMeshGateway(t),
},
},
},
},
verifySnapshot: func(t testing.TB, snap *ConfigSnapshot) {
require.True(t, snap.Valid(), "proxy with roots/leaf/intentions is valid")
@@ -3319,12 +3344,18 @@ func TestState_WatchesAndUpdates(t *testing.T) {
require.NotNil(t, ep)
require.Len(t, ep, 1)

require.Equal(t, 1, snap.ConnectProxy.WatchedLocalGWEndpoints.Len())

gwEp, _ := snap.ConnectProxy.WatchedLocalGWEndpoints.Get("dc1")
require.NotNil(t, gwEp)
require.Len(t, gwEp, 1)

// Expect a trust bundle
ptb, ok := snap.ConnectProxy.UpstreamPeerTrustBundles.Get("peer-a")
require.True(t, ok)
prototest.AssertDeepEqual(t, peerTrustBundles.Bundles[0], ptb)

// Sanity check that local upstream maps are not populated
// Ensure that maps for non-peering endpoints are not populated.
require.Empty(t, snap.ConnectProxy.WatchedUpstreamEndpoints[extDBUID])
require.Empty(t, snap.ConnectProxy.PassthroughUpstreams[extDBUID])
require.Empty(t, snap.ConnectProxy.PassthroughIndices)
@@ -3377,6 +3408,9 @@ func TestState_WatchesAndUpdates(t *testing.T) {
DestinationName: "api-a",
DestinationPeer: "peer-a",
LocalBindPort: 10001,
MeshGateway: structs.MeshGatewayConfig{
Mode: structs.MeshGatewayModeLocal,
},
},
},
},
@@ -3401,6 +3435,7 @@ func TestState_WatchesAndUpdates(t *testing.T) {
peeringTrustBundlesWatchID: genVerifyTrustBundleListWatch("web"),
peerTrustBundleIDPrefix + "peer-a": genVerifyTrustBundleReadWatch("peer-a"),
upstreamPeerWatchIDPrefix + extApiUID.String(): genVerifyServiceSpecificPeeredRequest("api-a", "", "dc1", "peer-a", true),
"mesh-gateway:dc1": genVerifyGatewayWatch("dc1"),
},
verifySnapshot: func(t testing.TB, snap *ConfigSnapshot) {
require.False(t, snap.Valid(), "should not be valid")
@@ -3423,6 +3458,11 @@ func TestState_WatchesAndUpdates(t *testing.T) {
_, ok = snap.ConnectProxy.PeerUpstreamEndpoints.Get(extApiUID)
require.False(t, ok) // but no data

// Local gateway is watched but there are no endpoints
require.True(t, snap.ConnectProxy.WatchedLocalGWEndpoints.IsWatched("dc1"))
_, ok = snap.ConnectProxy.WatchedLocalGWEndpoints.Get("dc1")
require.False(t, ok)

require.Len(t, snap.ConnectProxy.WatchedServiceChecks, 0, "%+v", snap.ConnectProxy.WatchedServiceChecks)
require.Len(t, snap.ConnectProxy.PreparedQueryEndpoints, 0, "%+v", snap.ConnectProxy.PreparedQueryEndpoints)
require.Len(t, snap.ConnectProxy.InboundPeerTrustBundles, 0, "%+v", snap.ConnectProxy.InboundPeerTrustBundles)
@@ -3494,6 +3534,20 @@ func TestState_WatchesAndUpdates(t *testing.T) {
},
},
},
{
CorrelationID: "mesh-gateway:dc1",
Result: &structs.IndexedCheckServiceNodes{
Nodes: structs.CheckServiceNodes{
{
Node: &structs.Node{
Node: "node1",
Address: "10.1.2.3",
},
Service: structs.TestNodeServiceMeshGateway(t),
},
},
},
},
verifySnapshot: func(t testing.TB, snap *ConfigSnapshot) {
require.True(t, snap.Valid())
@@ -3517,8 +3571,11 @@ func TestState_WatchesAndUpdates(t *testing.T) {
ep, _ := snap.ConnectProxy.PeerUpstreamEndpoints.Get(extApiUID)
require.NotNil(t, ep)

require.Len(t, snap.ConnectProxy.WatchedServiceChecks, 0, "%+v", snap.ConnectProxy.WatchedServiceChecks)
require.Len(t, snap.ConnectProxy.PreparedQueryEndpoints, 0, "%+v", snap.ConnectProxy.PreparedQueryEndpoints)
require.Equal(t, 1, snap.ConnectProxy.WatchedLocalGWEndpoints.Len())

gwEp, _ := snap.ConnectProxy.WatchedLocalGWEndpoints.Get("dc1")
require.NotNil(t, gwEp)
require.Len(t, gwEp, 1)
},
},
},
@@ -256,7 +256,7 @@ func TestConfigSnapshotPeeringLocalMeshGateway(t testing.T) *ConfigSnapshot {
DestinationName: "payments",
DestinationPeer: "cloud",
LocalBindPort: 9090,
MeshGateway: structs.MeshGatewayConfig{Mode: structs.MeshGatewayModeLocal},
MeshGateway: structs.MeshGatewayConfig{Mode: structs.MeshGatewayModeRemote},
}
paymentsUID = NewUpstreamID(&paymentsUpstream)
@@ -69,14 +69,6 @@ func (c *MeshGatewayConfig) IsZero() bool {
return *c == zeroVal
}

func (base *MeshGatewayConfig) OverlayWith(overlay MeshGatewayConfig) MeshGatewayConfig {
out := *base
if overlay.Mode != MeshGatewayModeDefault {
out.Mode = overlay.Mode
}
return out
}

func ValidateMeshGatewayMode(mode string) (MeshGatewayMode, error) {
switch MeshGatewayMode(mode) {
case MeshGatewayModeNone:
@@ -2,7 +2,6 @@ package structs
import (
"encoding/json"
"fmt"
"testing"

"github.com/hashicorp/consul/api"
@@ -618,47 +617,6 @@ func TestConnectProxyConfig_UnmarshalJSON(t *testing.T) {
}
}

func TestMeshGatewayConfig_OverlayWith(t *testing.T) {
var (
D = MeshGatewayConfig{Mode: MeshGatewayModeDefault}
N = MeshGatewayConfig{Mode: MeshGatewayModeNone}
R = MeshGatewayConfig{Mode: MeshGatewayModeRemote}
L = MeshGatewayConfig{Mode: MeshGatewayModeLocal}
)

type testCase struct {
base, overlay, expect MeshGatewayConfig
}
cases := []testCase{
{D, D, D},
{D, N, N},
{D, R, R},
{D, L, L},
{N, D, N},
{N, N, N},
{N, R, R},
{N, L, L},
{R, D, R},
{R, N, N},
{R, R, R},
{R, L, L},
{L, D, L},
{L, N, N},
{L, R, R},
{L, L, L},
}

for _, tc := range cases {
tc := tc

t.Run(fmt.Sprintf("%s overlaid with %s", tc.base.Mode, tc.overlay.Mode),
func(t *testing.T) {
got := tc.base.OverlayWith(tc.overlay)
require.Equal(t, tc.expect, got)
})
}
}

func TestValidateMeshGatewayMode(t *testing.T) {
for _, tc := range []struct {
modeConstant string
@@ -547,6 +547,7 @@ func (s *ResourceGenerator) makeUpstreamLoadAssignmentForPeerService(cfgSnap *pr
}

upstream := cfgSnap.ConnectProxy.UpstreamConfig[uid]

// If an upstream is configured with local mesh gw mode, we make a load assignment
// from the gateway endpoints instead of those of the upstreams.
if upstream != nil && upstream.MeshGateway.Mode == structs.MeshGatewayModeLocal {
@@ -29,19 +29,34 @@
{
"@type": "type.googleapis.com/envoy.config.cluster.v3.Cluster",
"name": "payments.default.cloud.external.1c053652-8512-4373-90cf-5a7f6263a994.consul",
"type": "EDS",
"edsClusterConfig": {
"edsConfig": {
"ads": {

},
"resourceApiVersion": "V3"
"type": "LOGICAL_DNS",
"connectTimeout": "5s",
"loadAssignment": {
"clusterName": "payments.default.cloud.external.1c053652-8512-4373-90cf-5a7f6263a994.consul",
"endpoints": [
{
"lbEndpoints": [
{
"endpoint": {
"address": {
"socketAddress": {
"address": "123.us-east-1.elb.notaws.com",
"portValue": 8443
}
}
},
"connectTimeout": "5s",
"healthStatus": "HEALTHY",
"loadBalancingWeight": 1
}
]
}
]
},
"circuitBreakers": {

},
"dnsRefreshRate": "10s",
"dnsLookupFamily": "V4_ONLY",
"outlierDetection": {
"maxEjectionPercent": 100
},
@@ -1,28 +1,6 @@
{
"versionInfo": "00000001",
"resources": [
{
"@type": "type.googleapis.com/envoy.config.endpoint.v3.ClusterLoadAssignment",
"clusterName": "payments.default.cloud.external.1c053652-8512-4373-90cf-5a7f6263a994.consul",
"endpoints": [
{
"lbEndpoints": [
{
"endpoint": {
"address": {
"socketAddress": {
"address": "10.0.0.1",
"portValue": 1234
}
}
},
"healthStatus": "HEALTHY",
"loadBalancingWeight": 1
}
]
}
]
},
{
"@type": "type.googleapis.com/envoy.config.endpoint.v3.ClusterLoadAssignment",
"clusterName": "refunds.default.cloud.external.1c053652-8512-4373-90cf-5a7f6263a994.consul",