Fix configuration merging for implicit tproxy upstreams. (#16000)
Fix configuration merging for implicit tproxy upstreams. Change the merging logic so that the wildcard upstream has correct proxy-defaults and service-defaults values combined into it. It did not previously merge all fields, and the wildcard upstream did not exist unless service-defaults existed (it ignored proxy-defaults, essentially). Change the way we fetch upstream configuration in the xDS layer so that it falls back to the wildcard when no matching upstream is found. This is what allows implicit peer upstreams to have the correct "merged" config. Change proxycfg to always watch local mesh gateway endpoints whenever a peer upstream is found. This simplifies the logic so that we do not have to inspect the "merged" configuration on peer upstreams to extract the mesh gateway mode.
This commit is contained in:
parent
618deae657
commit
9b0984e5a6
|
@ -0,0 +1,3 @@
|
||||||
|
```release-note:breaking-change
|
||||||
|
connect: Fix configuration merging for transparent proxy upstreams. Proxy-defaults and service-defaults config entries were not correctly merged for implicit upstreams in transparent proxy mode and would result in some configuration not being applied. To avoid issues when upgrading, ensure that any proxy-defaults or service-defaults have correct configuration for upstreams, since all fields will now be properly used to configure proxies.
|
||||||
|
```
|
|
@ -4,6 +4,7 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
"github.com/hashicorp/go-hclog"
|
"github.com/hashicorp/go-hclog"
|
||||||
|
"github.com/imdario/mergo"
|
||||||
"github.com/mitchellh/copystructure"
|
"github.com/mitchellh/copystructure"
|
||||||
|
|
||||||
"github.com/hashicorp/consul/agent/structs"
|
"github.com/hashicorp/consul/agent/structs"
|
||||||
|
@ -20,10 +21,18 @@ func ComputeResolvedServiceConfig(
|
||||||
|
|
||||||
thisReply.MeshGateway.Mode = structs.MeshGatewayModeDefault
|
thisReply.MeshGateway.Mode = structs.MeshGatewayModeDefault
|
||||||
|
|
||||||
|
// Store the upstream defaults under a wildcard key so that they can be applied to
|
||||||
|
// upstreams that are inferred from intentions and do not have explicit upstream configuration.
|
||||||
|
wildcard := structs.NewServiceID(structs.WildcardSpecifier, args.WithWildcardNamespace())
|
||||||
|
wildcardUpstreamDefaults := make(map[string]interface{})
|
||||||
|
// resolvedConfigs stores the opaque config map for each upstream and is keyed on the upstream's ID.
|
||||||
|
resolvedConfigs := make(map[structs.ServiceID]map[string]interface{})
|
||||||
|
|
||||||
// TODO(freddy) Refactor this into smaller set of state store functions
|
// TODO(freddy) Refactor this into smaller set of state store functions
|
||||||
// Pass the WatchSet to both the service and proxy config lookups. If either is updated during the
|
// Pass the WatchSet to both the service and proxy config lookups. If either is updated during the
|
||||||
// blocking query, this function will be rerun and these state store lookups will both be current.
|
// blocking query, this function will be rerun and these state store lookups will both be current.
|
||||||
// We use the default enterprise meta to look up the global proxy defaults because they are not namespaced.
|
// We use the default enterprise meta to look up the global proxy defaults because they are not namespaced.
|
||||||
|
|
||||||
var proxyConfGlobalProtocol string
|
var proxyConfGlobalProtocol string
|
||||||
proxyConf := entries.GetProxyDefaults(args.PartitionOrDefault())
|
proxyConf := entries.GetProxyDefaults(args.PartitionOrDefault())
|
||||||
if proxyConf != nil {
|
if proxyConf != nil {
|
||||||
|
@ -32,6 +41,7 @@ func ComputeResolvedServiceConfig(
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("failed to copy global proxy-defaults: %v", err)
|
return nil, fmt.Errorf("failed to copy global proxy-defaults: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
thisReply.ProxyConfig = mapCopy.(map[string]interface{})
|
thisReply.ProxyConfig = mapCopy.(map[string]interface{})
|
||||||
thisReply.Mode = proxyConf.Mode
|
thisReply.Mode = proxyConf.Mode
|
||||||
thisReply.TransparentProxy = proxyConf.TransparentProxy
|
thisReply.TransparentProxy = proxyConf.TransparentProxy
|
||||||
|
@ -40,14 +50,20 @@ func ComputeResolvedServiceConfig(
|
||||||
thisReply.EnvoyExtensions = proxyConf.EnvoyExtensions
|
thisReply.EnvoyExtensions = proxyConf.EnvoyExtensions
|
||||||
thisReply.AccessLogs = proxyConf.AccessLogs
|
thisReply.AccessLogs = proxyConf.AccessLogs
|
||||||
|
|
||||||
// Extract the global protocol from proxyConf for upstream configs.
|
parsed, err := structs.ParseUpstreamConfigNoDefaults(mapCopy.(map[string]interface{}))
|
||||||
rawProtocol := proxyConf.Config["protocol"]
|
if err != nil {
|
||||||
if rawProtocol != nil {
|
return nil, fmt.Errorf("failed to parse upstream config map for proxy-defaults: %v", err)
|
||||||
var ok bool
|
|
||||||
proxyConfGlobalProtocol, ok = rawProtocol.(string)
|
|
||||||
if !ok {
|
|
||||||
return nil, fmt.Errorf("invalid protocol type %T", rawProtocol)
|
|
||||||
}
|
}
|
||||||
|
proxyConfGlobalProtocol = parsed.Protocol
|
||||||
|
|
||||||
|
// MeshGateway is strange. It's marshaled into UpstreamConfigs via the arbitrary map, but it
|
||||||
|
// uses concrete fields everywhere else. We always take the explicit definition here for
|
||||||
|
// wildcard upstreams and discard the user setting it via arbitrary map in proxy-defaults.
|
||||||
|
if err := mergo.MergeWithOverwrite(&wildcardUpstreamDefaults, mapCopy); err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to merge upstream config map for proxy-defaults: %v", err)
|
||||||
|
}
|
||||||
|
if !proxyConf.MeshGateway.IsZero() {
|
||||||
|
wildcardUpstreamDefaults["mesh_gateway"] = proxyConf.MeshGateway
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -64,6 +80,7 @@ func ComputeResolvedServiceConfig(
|
||||||
}
|
}
|
||||||
if serviceConf.MeshGateway.Mode != structs.MeshGatewayModeDefault {
|
if serviceConf.MeshGateway.Mode != structs.MeshGatewayModeDefault {
|
||||||
thisReply.MeshGateway.Mode = serviceConf.MeshGateway.Mode
|
thisReply.MeshGateway.Mode = serviceConf.MeshGateway.Mode
|
||||||
|
wildcardUpstreamDefaults["mesh_gateway"] = serviceConf.MeshGateway
|
||||||
}
|
}
|
||||||
if serviceConf.TransparentProxy.OutboundListenerPort != 0 {
|
if serviceConf.TransparentProxy.OutboundListenerPort != 0 {
|
||||||
thisReply.TransparentProxy.OutboundListenerPort = serviceConf.TransparentProxy.OutboundListenerPort
|
thisReply.TransparentProxy.OutboundListenerPort = serviceConf.TransparentProxy.OutboundListenerPort
|
||||||
|
@ -139,10 +156,8 @@ func ComputeResolvedServiceConfig(
|
||||||
|
|
||||||
// Then store upstreams inferred from service-defaults and mapify the overrides.
|
// Then store upstreams inferred from service-defaults and mapify the overrides.
|
||||||
var (
|
var (
|
||||||
upstreamOverrides = make(map[structs.ServiceID]*structs.UpstreamConfig)
|
|
||||||
upstreamDefaults *structs.UpstreamConfig
|
upstreamDefaults *structs.UpstreamConfig
|
||||||
// resolvedConfigs stores the opaque config map for each upstream and is keyed on the upstream's ID.
|
upstreamOverrides = make(map[structs.ServiceID]*structs.UpstreamConfig)
|
||||||
resolvedConfigs = make(map[structs.ServiceID]map[string]interface{})
|
|
||||||
)
|
)
|
||||||
if serviceConf != nil && serviceConf.UpstreamConfig != nil {
|
if serviceConf != nil && serviceConf.UpstreamConfig != nil {
|
||||||
for i, override := range serviceConf.UpstreamConfig.Overrides {
|
for i, override := range serviceConf.UpstreamConfig.Overrides {
|
||||||
|
@ -161,23 +176,24 @@ func ComputeResolvedServiceConfig(
|
||||||
}
|
}
|
||||||
if serviceConf.UpstreamConfig.Defaults != nil {
|
if serviceConf.UpstreamConfig.Defaults != nil {
|
||||||
upstreamDefaults = serviceConf.UpstreamConfig.Defaults
|
upstreamDefaults = serviceConf.UpstreamConfig.Defaults
|
||||||
|
|
||||||
if upstreamDefaults.MeshGateway.Mode == structs.MeshGatewayModeDefault {
|
if upstreamDefaults.MeshGateway.Mode == structs.MeshGatewayModeDefault {
|
||||||
upstreamDefaults.MeshGateway.Mode = thisReply.MeshGateway.Mode
|
upstreamDefaults.MeshGateway.Mode = thisReply.MeshGateway.Mode
|
||||||
}
|
}
|
||||||
|
upstreamDefaults.MergeInto(wildcardUpstreamDefaults)
|
||||||
// Store the upstream defaults under a wildcard key so that they can be applied to
|
// Always add the wildcard upstream if a service-defaults default-upstream was configured.
|
||||||
// upstreams that are inferred from intentions and do not have explicit upstream configuration.
|
resolvedConfigs[wildcard] = wildcardUpstreamDefaults
|
||||||
cfgMap := make(map[string]interface{})
|
}
|
||||||
upstreamDefaults.MergeInto(cfgMap)
|
}
|
||||||
|
|
||||||
if !args.MeshGateway.IsZero() {
|
if !args.MeshGateway.IsZero() {
|
||||||
cfgMap["mesh_gateway"] = args.MeshGateway
|
wildcardUpstreamDefaults["mesh_gateway"] = args.MeshGateway
|
||||||
}
|
}
|
||||||
|
|
||||||
wildcard := structs.NewServiceID(structs.WildcardSpecifier, args.WithWildcardNamespace())
|
// Add the wildcard upstream if any fields were populated (it may have been already
|
||||||
resolvedConfigs[wildcard] = cfgMap
|
// added if a service-defaults exists). We likely could always add it without issues,
|
||||||
}
|
// but this has been existing behavior, and many unit tests would break.
|
||||||
|
if len(wildcardUpstreamDefaults) > 0 {
|
||||||
|
resolvedConfigs[wildcard] = wildcardUpstreamDefaults
|
||||||
}
|
}
|
||||||
|
|
||||||
for upstream := range seenUpstreams {
|
for upstream := range seenUpstreams {
|
||||||
|
|
|
@ -120,6 +120,14 @@ func Test_ComputeResolvedServiceConfig(t *testing.T) {
|
||||||
want: &structs.ServiceConfigResponse{
|
want: &structs.ServiceConfigResponse{
|
||||||
MeshGateway: remoteMeshGW,
|
MeshGateway: remoteMeshGW,
|
||||||
UpstreamIDConfigs: structs.OpaqueUpstreamConfigs{
|
UpstreamIDConfigs: structs.OpaqueUpstreamConfigs{
|
||||||
|
{
|
||||||
|
Upstream: wildcard,
|
||||||
|
Config: map[string]interface{}{
|
||||||
|
"mesh_gateway": structs.MeshGatewayConfig{
|
||||||
|
Mode: structs.MeshGatewayModeRemote,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
{
|
{
|
||||||
Upstream: uid,
|
Upstream: uid,
|
||||||
Config: map[string]interface{}{
|
Config: map[string]interface{}{
|
||||||
|
@ -183,6 +191,15 @@ func Test_ComputeResolvedServiceConfig(t *testing.T) {
|
||||||
Path: "/tmp/accesslog.txt",
|
Path: "/tmp/accesslog.txt",
|
||||||
JSONFormat: "{ \"custom_start_time\": \"%START_TIME%\" }",
|
JSONFormat: "{ \"custom_start_time\": \"%START_TIME%\" }",
|
||||||
},
|
},
|
||||||
|
UpstreamIDConfigs: structs.OpaqueUpstreamConfigs{
|
||||||
|
{
|
||||||
|
Upstream: wildcard,
|
||||||
|
Config: map[string]interface{}{
|
||||||
|
"foo": "bar",
|
||||||
|
"mesh_gateway": remoteMeshGW,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
|
@ -209,6 +226,12 @@ func Test_ComputeResolvedServiceConfig(t *testing.T) {
|
||||||
want: &structs.ServiceConfigResponse{
|
want: &structs.ServiceConfigResponse{
|
||||||
MeshGateway: noneMeshGW, // service-defaults has a higher precedence.
|
MeshGateway: noneMeshGW, // service-defaults has a higher precedence.
|
||||||
UpstreamIDConfigs: structs.OpaqueUpstreamConfigs{
|
UpstreamIDConfigs: structs.OpaqueUpstreamConfigs{
|
||||||
|
{
|
||||||
|
Upstream: wildcard,
|
||||||
|
Config: map[string]interface{}{
|
||||||
|
"mesh_gateway": noneMeshGW,
|
||||||
|
},
|
||||||
|
},
|
||||||
{
|
{
|
||||||
Upstream: uid,
|
Upstream: uid,
|
||||||
Config: map[string]interface{}{
|
Config: map[string]interface{}{
|
||||||
|
|
|
@ -1057,6 +1057,9 @@ func TestConfigEntry_ResolveServiceConfig(t *testing.T) {
|
||||||
"protocol": "http",
|
"protocol": "http",
|
||||||
},
|
},
|
||||||
UpstreamConfigs: map[string]map[string]interface{}{
|
UpstreamConfigs: map[string]map[string]interface{}{
|
||||||
|
"*": {
|
||||||
|
"foo": int64(1),
|
||||||
|
},
|
||||||
"bar": {
|
"bar": {
|
||||||
"protocol": "grpc",
|
"protocol": "grpc",
|
||||||
},
|
},
|
||||||
|
@ -1270,6 +1273,9 @@ func TestConfigEntry_ResolveServiceConfig_Upstreams(t *testing.T) {
|
||||||
"protocol": "grpc",
|
"protocol": "grpc",
|
||||||
},
|
},
|
||||||
UpstreamConfigs: map[string]map[string]interface{}{
|
UpstreamConfigs: map[string]map[string]interface{}{
|
||||||
|
"*": {
|
||||||
|
"protocol": "grpc",
|
||||||
|
},
|
||||||
"mysql": {
|
"mysql": {
|
||||||
"protocol": "http",
|
"protocol": "http",
|
||||||
},
|
},
|
||||||
|
@ -1314,6 +1320,12 @@ func TestConfigEntry_ResolveServiceConfig_Upstreams(t *testing.T) {
|
||||||
"protocol": "grpc",
|
"protocol": "grpc",
|
||||||
},
|
},
|
||||||
UpstreamIDConfigs: structs.OpaqueUpstreamConfigs{
|
UpstreamIDConfigs: structs.OpaqueUpstreamConfigs{
|
||||||
|
{
|
||||||
|
Upstream: wildcard,
|
||||||
|
Config: map[string]interface{}{
|
||||||
|
"protocol": "grpc",
|
||||||
|
},
|
||||||
|
},
|
||||||
{
|
{
|
||||||
Upstream: cache,
|
Upstream: cache,
|
||||||
Config: map[string]interface{}{
|
Config: map[string]interface{}{
|
||||||
|
@ -2052,6 +2064,9 @@ func TestConfigEntry_ResolveServiceConfig_UpstreamProxyDefaultsProtocol(t *testi
|
||||||
"protocol": "http",
|
"protocol": "http",
|
||||||
},
|
},
|
||||||
UpstreamConfigs: map[string]map[string]interface{}{
|
UpstreamConfigs: map[string]map[string]interface{}{
|
||||||
|
"*": {
|
||||||
|
"protocol": "http",
|
||||||
|
},
|
||||||
"bar": {
|
"bar": {
|
||||||
"protocol": "http",
|
"protocol": "http",
|
||||||
},
|
},
|
||||||
|
@ -2107,6 +2122,9 @@ func TestConfigEntry_ResolveServiceConfig_ProxyDefaultsProtocol_UsedForAllUpstre
|
||||||
"protocol": "http",
|
"protocol": "http",
|
||||||
},
|
},
|
||||||
UpstreamConfigs: map[string]map[string]interface{}{
|
UpstreamConfigs: map[string]map[string]interface{}{
|
||||||
|
"*": {
|
||||||
|
"protocol": "http",
|
||||||
|
},
|
||||||
"bar": {
|
"bar": {
|
||||||
"protocol": "http",
|
"protocol": "http",
|
||||||
},
|
},
|
||||||
|
|
|
@ -197,7 +197,7 @@ func (s *handlerConnectProxy) initialize(ctx context.Context) (ConfigSnapshot, e
|
||||||
|
|
||||||
case "":
|
case "":
|
||||||
if u.DestinationPeer != "" {
|
if u.DestinationPeer != "" {
|
||||||
err := s.setupWatchesForPeeredUpstream(ctx, snap.ConnectProxy, NewUpstreamID(&u), u.MeshGateway.Mode, dc)
|
err := s.setupWatchesForPeeredUpstream(ctx, snap.ConnectProxy, NewUpstreamID(&u), dc)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return snap, fmt.Errorf("failed to setup watches for peered upstream %q: %w", uid.String(), err)
|
return snap, fmt.Errorf("failed to setup watches for peered upstream %q: %w", uid.String(), err)
|
||||||
}
|
}
|
||||||
|
@ -231,7 +231,6 @@ func (s *handlerConnectProxy) setupWatchesForPeeredUpstream(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
snapConnectProxy configSnapshotConnectProxy,
|
snapConnectProxy configSnapshotConnectProxy,
|
||||||
uid UpstreamID,
|
uid UpstreamID,
|
||||||
mgwMode structs.MeshGatewayMode,
|
|
||||||
dc string,
|
dc string,
|
||||||
) error {
|
) error {
|
||||||
s.logger.Trace("initializing watch of peered upstream", "upstream", uid)
|
s.logger.Trace("initializing watch of peered upstream", "upstream", uid)
|
||||||
|
@ -272,14 +271,10 @@ func (s *handlerConnectProxy) setupWatchesForPeeredUpstream(
|
||||||
snapConnectProxy.UpstreamPeerTrustBundles.InitWatch(uid.Peer, cancel)
|
snapConnectProxy.UpstreamPeerTrustBundles.InitWatch(uid.Peer, cancel)
|
||||||
}
|
}
|
||||||
|
|
||||||
// If a peered upstream is set to local mesh gw mode,
|
// Always watch local GW endpoints for peer upstreams so that we don't have to worry about
|
||||||
// set up a watch for them.
|
// the timing on whether the wildcard upstream config was fetched yet.
|
||||||
if mgwMode == structs.MeshGatewayModeLocal {
|
|
||||||
up := &handlerUpstreams{handlerState: s.handlerState}
|
up := &handlerUpstreams{handlerState: s.handlerState}
|
||||||
up.setupWatchForLocalGWEndpoints(ctx, &snapConnectProxy.ConfigSnapshotUpstreams)
|
up.setupWatchForLocalGWEndpoints(ctx, &snapConnectProxy.ConfigSnapshotUpstreams)
|
||||||
} else if mgwMode == structs.MeshGatewayModeNone {
|
|
||||||
s.logger.Warn(fmt.Sprintf("invalid mesh gateway mode 'none', defaulting to 'remote' for %q", uid))
|
|
||||||
}
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -329,7 +324,7 @@ func (s *handlerConnectProxy) handleUpdate(ctx context.Context, u UpdateEvent, s
|
||||||
}
|
}
|
||||||
seenUpstreams[uid] = struct{}{}
|
seenUpstreams[uid] = struct{}{}
|
||||||
|
|
||||||
err := s.setupWatchesForPeeredUpstream(ctx, snap.ConnectProxy, uid, s.proxyCfg.MeshGateway.Mode, s.source.Datacenter)
|
err := s.setupWatchesForPeeredUpstream(ctx, snap.ConnectProxy, uid, s.source.Datacenter)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed to setup watches for peered upstream %q: %w", uid.String(), err)
|
return fmt.Errorf("failed to setup watches for peered upstream %q: %w", uid.String(), err)
|
||||||
}
|
}
|
||||||
|
@ -402,8 +397,7 @@ func (s *handlerConnectProxy) handleUpdate(ctx context.Context, u UpdateEvent, s
|
||||||
// Use the centralized upstream defaults if they exist and there isn't specific configuration for this upstream
|
// Use the centralized upstream defaults if they exist and there isn't specific configuration for this upstream
|
||||||
// This is only relevant to upstreams from intentions because for explicit upstreams the defaulting is handled
|
// This is only relevant to upstreams from intentions because for explicit upstreams the defaulting is handled
|
||||||
// by the ResolveServiceConfig endpoint.
|
// by the ResolveServiceConfig endpoint.
|
||||||
wildcardSID := structs.NewServiceID(structs.WildcardSpecifier, s.proxyID.WithWildcardNamespace())
|
wildcardUID := NewWildcardUID(&s.proxyID.EnterpriseMeta)
|
||||||
wildcardUID := NewUpstreamIDFromServiceID(wildcardSID)
|
|
||||||
defaults, ok := snap.ConnectProxy.UpstreamConfig[wildcardUID]
|
defaults, ok := snap.ConnectProxy.UpstreamConfig[wildcardUID]
|
||||||
if ok {
|
if ok {
|
||||||
u = defaults
|
u = defaults
|
||||||
|
|
|
@ -180,3 +180,11 @@ func UpstreamsToMap(us structs.Upstreams) map[UpstreamID]*structs.Upstream {
|
||||||
}
|
}
|
||||||
return upstreamMap
|
return upstreamMap
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func NewWildcardUID(entMeta *acl.EnterpriseMeta) UpstreamID {
|
||||||
|
wildcardSID := structs.NewServiceID(
|
||||||
|
structs.WildcardSpecifier,
|
||||||
|
entMeta.WithWildcardNamespace(),
|
||||||
|
)
|
||||||
|
return NewUpstreamIDFromServiceID(wildcardSID)
|
||||||
|
}
|
||||||
|
|
|
@ -189,6 +189,20 @@ func (c *configSnapshotConnectProxy) IsImplicitUpstream(uid UpstreamID) bool {
|
||||||
return intentionImplicit || peeringImplicit
|
return intentionImplicit || peeringImplicit
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *configSnapshotConnectProxy) GetUpstream(uid UpstreamID, entMeta *acl.EnterpriseMeta) (*structs.Upstream, bool) {
|
||||||
|
upstream, found := c.UpstreamConfig[uid]
|
||||||
|
// We should fallback to the wildcard defaults generated from service-defaults + proxy-defaults
|
||||||
|
// whenever we don't find the upstream config.
|
||||||
|
if !found {
|
||||||
|
wildcardUID := NewWildcardUID(entMeta)
|
||||||
|
upstream = c.UpstreamConfig[wildcardUID]
|
||||||
|
}
|
||||||
|
|
||||||
|
explicit := upstream != nil && upstream.HasLocalPortOrSocket()
|
||||||
|
implicit := c.IsImplicitUpstream(uid)
|
||||||
|
return upstream, !implicit && !explicit
|
||||||
|
}
|
||||||
|
|
||||||
type configSnapshotTerminatingGateway struct {
|
type configSnapshotTerminatingGateway struct {
|
||||||
MeshConfig *structs.MeshConfigEntry
|
MeshConfig *structs.MeshConfigEntry
|
||||||
MeshConfigSet bool
|
MeshConfigSet bool
|
||||||
|
|
|
@ -6,10 +6,12 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
|
"sort"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
|
|
||||||
|
"github.com/hashicorp/consul/acl"
|
||||||
"github.com/hashicorp/consul/agent/structs"
|
"github.com/hashicorp/consul/agent/structs"
|
||||||
"github.com/hashicorp/consul/sdk/testutil/retry"
|
"github.com/hashicorp/consul/sdk/testutil/retry"
|
||||||
"github.com/hashicorp/consul/testrpc"
|
"github.com/hashicorp/consul/testrpc"
|
||||||
|
@ -419,7 +421,13 @@ func TestServiceManager_PersistService_API(t *testing.T) {
|
||||||
"protocol": "http",
|
"protocol": "http",
|
||||||
},
|
},
|
||||||
UpstreamIDConfigs: structs.OpaqueUpstreamConfigs{
|
UpstreamIDConfigs: structs.OpaqueUpstreamConfigs{
|
||||||
structs.OpaqueUpstreamConfig{
|
{
|
||||||
|
Upstream: structs.NewServiceID(structs.WildcardSpecifier, acl.DefaultEnterpriseMeta().WithWildcardNamespace()),
|
||||||
|
Config: map[string]interface{}{
|
||||||
|
"foo": int64(1),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
Upstream: structs.NewServiceID("redis", nil),
|
Upstream: structs.NewServiceID("redis", nil),
|
||||||
Config: map[string]interface{}{
|
Config: map[string]interface{}{
|
||||||
"protocol": "tcp",
|
"protocol": "tcp",
|
||||||
|
@ -429,7 +437,7 @@ func TestServiceManager_PersistService_API(t *testing.T) {
|
||||||
},
|
},
|
||||||
EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition(),
|
EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition(),
|
||||||
}
|
}
|
||||||
expectJSONFile(t, configFile, pcfg, resetDefaultsQueryMeta)
|
expectJSONFile(t, configFile, pcfg, fixPersistedServiceConfigForTest)
|
||||||
|
|
||||||
// Verify in memory state.
|
// Verify in memory state.
|
||||||
{
|
{
|
||||||
|
@ -467,7 +475,13 @@ func TestServiceManager_PersistService_API(t *testing.T) {
|
||||||
"protocol": "http",
|
"protocol": "http",
|
||||||
},
|
},
|
||||||
UpstreamIDConfigs: structs.OpaqueUpstreamConfigs{
|
UpstreamIDConfigs: structs.OpaqueUpstreamConfigs{
|
||||||
structs.OpaqueUpstreamConfig{
|
{
|
||||||
|
Upstream: structs.NewServiceID(structs.WildcardSpecifier, acl.DefaultEnterpriseMeta().WithWildcardNamespace()),
|
||||||
|
Config: map[string]interface{}{
|
||||||
|
"foo": int64(1),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
Upstream: structs.NewServiceID("redis", nil),
|
Upstream: structs.NewServiceID("redis", nil),
|
||||||
Config: map[string]interface{}{
|
Config: map[string]interface{}{
|
||||||
"protocol": "tcp",
|
"protocol": "tcp",
|
||||||
|
@ -477,7 +491,7 @@ func TestServiceManager_PersistService_API(t *testing.T) {
|
||||||
},
|
},
|
||||||
EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition(),
|
EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition(),
|
||||||
}
|
}
|
||||||
expectJSONFile(t, configFile, pcfg, resetDefaultsQueryMeta)
|
expectJSONFile(t, configFile, pcfg, fixPersistedServiceConfigForTest)
|
||||||
|
|
||||||
// Verify in memory state.
|
// Verify in memory state.
|
||||||
expectState.Proxy.LocalServicePort = 8001
|
expectState.Proxy.LocalServicePort = 8001
|
||||||
|
@ -646,7 +660,13 @@ func TestServiceManager_PersistService_ConfigFiles(t *testing.T) {
|
||||||
"protocol": "http",
|
"protocol": "http",
|
||||||
},
|
},
|
||||||
UpstreamIDConfigs: structs.OpaqueUpstreamConfigs{
|
UpstreamIDConfigs: structs.OpaqueUpstreamConfigs{
|
||||||
structs.OpaqueUpstreamConfig{
|
{
|
||||||
|
Upstream: structs.NewServiceID(structs.WildcardSpecifier, acl.DefaultEnterpriseMeta().WithWildcardNamespace()),
|
||||||
|
Config: map[string]interface{}{
|
||||||
|
"foo": int64(1),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
Upstream: structs.NewServiceID("redis", nil),
|
Upstream: structs.NewServiceID("redis", nil),
|
||||||
Config: map[string]interface{}{
|
Config: map[string]interface{}{
|
||||||
"protocol": "tcp",
|
"protocol": "tcp",
|
||||||
|
@ -655,7 +675,7 @@ func TestServiceManager_PersistService_ConfigFiles(t *testing.T) {
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition(),
|
EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition(),
|
||||||
}, resetDefaultsQueryMeta)
|
}, fixPersistedServiceConfigForTest)
|
||||||
|
|
||||||
// Verify in memory state.
|
// Verify in memory state.
|
||||||
{
|
{
|
||||||
|
@ -813,6 +833,23 @@ func expectJSONFile(t *testing.T, file string, expect interface{}, fixupContentB
|
||||||
require.JSONEq(t, string(expected), string(content))
|
require.JSONEq(t, string(expected), string(content))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func fixPersistedServiceConfigForTest(content []byte) ([]byte, error) {
|
||||||
|
var parsed persistedServiceConfig
|
||||||
|
if err := json.Unmarshal(content, &parsed); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
// Sort the output, since it's randomized and causes flaky tests otherwise.
|
||||||
|
sort.Slice(parsed.Defaults.UpstreamIDConfigs, func(i, j int) bool {
|
||||||
|
return parsed.Defaults.UpstreamIDConfigs[i].Upstream.ID < parsed.Defaults.UpstreamIDConfigs[j].Upstream.ID
|
||||||
|
})
|
||||||
|
out, err := json.Marshal(parsed)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
// Clean the query meta
|
||||||
|
return resetDefaultsQueryMeta(out)
|
||||||
|
}
|
||||||
|
|
||||||
// resetDefaultsQueryMeta will reset the embedded fields from structs.QueryMeta
|
// resetDefaultsQueryMeta will reset the embedded fields from structs.QueryMeta
|
||||||
// to their zero values in the json object keyed under 'Defaults'.
|
// to their zero values in the json object keyed under 'Defaults'.
|
||||||
func resetDefaultsQueryMeta(content []byte) ([]byte, error) {
|
func resetDefaultsQueryMeta(content []byte) ([]byte, error) {
|
||||||
|
|
|
@ -87,18 +87,10 @@ func (s *ResourceGenerator) clustersFromSnapshotConnectProxy(cfgSnap *proxycfg.C
|
||||||
clusters = append(clusters, passthroughs...)
|
clusters = append(clusters, passthroughs...)
|
||||||
}
|
}
|
||||||
|
|
||||||
getUpstream := func(uid proxycfg.UpstreamID) (*structs.Upstream, bool) {
|
|
||||||
upstream := cfgSnap.ConnectProxy.UpstreamConfig[uid]
|
|
||||||
|
|
||||||
explicit := upstream.HasLocalPortOrSocket()
|
|
||||||
implicit := cfgSnap.ConnectProxy.IsImplicitUpstream(uid)
|
|
||||||
return upstream, !implicit && !explicit
|
|
||||||
}
|
|
||||||
|
|
||||||
// NOTE: Any time we skip a chain below we MUST also skip that discovery chain in endpoints.go
|
// NOTE: Any time we skip a chain below we MUST also skip that discovery chain in endpoints.go
|
||||||
// so that the sets of endpoints generated matches the sets of clusters.
|
// so that the sets of endpoints generated matches the sets of clusters.
|
||||||
for uid, chain := range cfgSnap.ConnectProxy.DiscoveryChain {
|
for uid, chain := range cfgSnap.ConnectProxy.DiscoveryChain {
|
||||||
upstream, skip := getUpstream(uid)
|
upstream, skip := cfgSnap.ConnectProxy.GetUpstream(uid, &cfgSnap.ProxyID.EnterpriseMeta)
|
||||||
if skip {
|
if skip {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
@ -123,7 +115,7 @@ func (s *ResourceGenerator) clustersFromSnapshotConnectProxy(cfgSnap *proxycfg.C
|
||||||
// upstream in endpoints.go so that the sets of endpoints generated matches
|
// upstream in endpoints.go so that the sets of endpoints generated matches
|
||||||
// the sets of clusters.
|
// the sets of clusters.
|
||||||
for _, uid := range cfgSnap.ConnectProxy.PeeredUpstreamIDs() {
|
for _, uid := range cfgSnap.ConnectProxy.PeeredUpstreamIDs() {
|
||||||
upstream, skip := getUpstream(uid)
|
upstream, skip := cfgSnap.ConnectProxy.GetUpstream(uid, &cfgSnap.ProxyID.EnterpriseMeta)
|
||||||
if skip {
|
if skip {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
|
@ -50,18 +50,10 @@ func (s *ResourceGenerator) endpointsFromSnapshotConnectProxy(cfgSnap *proxycfg.
|
||||||
cfgSnap.ConnectProxy.PeerUpstreamEndpoints.Len()+
|
cfgSnap.ConnectProxy.PeerUpstreamEndpoints.Len()+
|
||||||
len(cfgSnap.ConnectProxy.WatchedUpstreamEndpoints))
|
len(cfgSnap.ConnectProxy.WatchedUpstreamEndpoints))
|
||||||
|
|
||||||
getUpstream := func(uid proxycfg.UpstreamID) (*structs.Upstream, bool) {
|
|
||||||
upstream := cfgSnap.ConnectProxy.UpstreamConfig[uid]
|
|
||||||
|
|
||||||
explicit := upstream.HasLocalPortOrSocket()
|
|
||||||
implicit := cfgSnap.ConnectProxy.IsImplicitUpstream(uid)
|
|
||||||
return upstream, !implicit && !explicit
|
|
||||||
}
|
|
||||||
|
|
||||||
// NOTE: Any time we skip a chain below we MUST also skip that discovery chain in clusters.go
|
// NOTE: Any time we skip a chain below we MUST also skip that discovery chain in clusters.go
|
||||||
// so that the sets of endpoints generated matches the sets of clusters.
|
// so that the sets of endpoints generated matches the sets of clusters.
|
||||||
for uid, chain := range cfgSnap.ConnectProxy.DiscoveryChain {
|
for uid, chain := range cfgSnap.ConnectProxy.DiscoveryChain {
|
||||||
upstream, skip := getUpstream(uid)
|
upstream, skip := cfgSnap.ConnectProxy.GetUpstream(uid, &cfgSnap.ProxyID.EnterpriseMeta)
|
||||||
if skip {
|
if skip {
|
||||||
// Discovery chain is not associated with a known explicit or implicit upstream so it is skipped.
|
// Discovery chain is not associated with a known explicit or implicit upstream so it is skipped.
|
||||||
continue
|
continue
|
||||||
|
@ -92,7 +84,7 @@ func (s *ResourceGenerator) endpointsFromSnapshotConnectProxy(cfgSnap *proxycfg.
|
||||||
// upstream in clusters.go so that the sets of endpoints generated matches
|
// upstream in clusters.go so that the sets of endpoints generated matches
|
||||||
// the sets of clusters.
|
// the sets of clusters.
|
||||||
for _, uid := range cfgSnap.ConnectProxy.PeeredUpstreamIDs() {
|
for _, uid := range cfgSnap.ConnectProxy.PeeredUpstreamIDs() {
|
||||||
upstream, skip := getUpstream(uid)
|
upstream, skip := cfgSnap.ConnectProxy.GetUpstream(uid, &cfgSnap.ProxyID.EnterpriseMeta)
|
||||||
if skip {
|
if skip {
|
||||||
// Discovery chain is not associated with a known explicit or implicit upstream so it is skipped.
|
// Discovery chain is not associated with a known explicit or implicit upstream so it is skipped.
|
||||||
continue
|
continue
|
||||||
|
@ -555,6 +547,10 @@ func (s *ResourceGenerator) makeUpstreamLoadAssignmentForPeerService(
|
||||||
return la, err
|
return la, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if upstreamGatewayMode == structs.MeshGatewayModeNone {
|
||||||
|
s.Logger.Warn(fmt.Sprintf("invalid mesh gateway mode 'none', defaulting to 'remote' for %q", uid))
|
||||||
|
}
|
||||||
|
|
||||||
// If an upstream is configured with local mesh gw mode, we make a load assignment
|
// If an upstream is configured with local mesh gw mode, we make a load assignment
|
||||||
// from the gateway endpoints instead of those of the upstreams.
|
// from the gateway endpoints instead of those of the upstreams.
|
||||||
if upstreamGatewayMode == structs.MeshGatewayModeLocal {
|
if upstreamGatewayMode == structs.MeshGatewayModeLocal {
|
||||||
|
@ -651,7 +647,7 @@ func (s *ResourceGenerator) endpointsFromDiscoveryChain(
|
||||||
}
|
}
|
||||||
|
|
||||||
mgwMode := structs.MeshGatewayModeDefault
|
mgwMode := structs.MeshGatewayModeDefault
|
||||||
if upstream := cfgSnap.ConnectProxy.UpstreamConfig[uid]; upstream != nil {
|
if upstream, _ := cfgSnap.ConnectProxy.GetUpstream(uid, &cfgSnap.ProxyID.EnterpriseMeta); upstream != nil {
|
||||||
mgwMode = upstream.MeshGateway.Mode
|
mgwMode = upstream.MeshGateway.Mode
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -134,17 +134,8 @@ func (s *ResourceGenerator) listenersFromSnapshotConnectProxy(cfgSnap *proxycfg.
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
getUpstream := func(uid proxycfg.UpstreamID) (*structs.Upstream, bool) {
|
|
||||||
upstream := cfgSnap.ConnectProxy.UpstreamConfig[uid]
|
|
||||||
|
|
||||||
explicit := upstream.HasLocalPortOrSocket()
|
|
||||||
implicit := cfgSnap.ConnectProxy.IsImplicitUpstream(uid)
|
|
||||||
return upstream, !implicit && !explicit
|
|
||||||
}
|
|
||||||
|
|
||||||
for uid, chain := range cfgSnap.ConnectProxy.DiscoveryChain {
|
for uid, chain := range cfgSnap.ConnectProxy.DiscoveryChain {
|
||||||
upstreamCfg, skip := getUpstream(uid)
|
upstreamCfg, skip := cfgSnap.ConnectProxy.GetUpstream(uid, &cfgSnap.ProxyID.EnterpriseMeta)
|
||||||
|
|
||||||
if skip {
|
if skip {
|
||||||
// Discovery chain is not associated with a known explicit or implicit upstream so it is skipped.
|
// Discovery chain is not associated with a known explicit or implicit upstream so it is skipped.
|
||||||
continue
|
continue
|
||||||
|
@ -355,8 +346,7 @@ func (s *ResourceGenerator) listenersFromSnapshotConnectProxy(cfgSnap *proxycfg.
|
||||||
// Looping over explicit and implicit upstreams is only needed for cross-peer
|
// Looping over explicit and implicit upstreams is only needed for cross-peer
|
||||||
// because they do not have discovery chains.
|
// because they do not have discovery chains.
|
||||||
for _, uid := range cfgSnap.ConnectProxy.PeeredUpstreamIDs() {
|
for _, uid := range cfgSnap.ConnectProxy.PeeredUpstreamIDs() {
|
||||||
upstreamCfg, skip := getUpstream(uid)
|
upstreamCfg, skip := cfgSnap.ConnectProxy.GetUpstream(uid, &cfgSnap.ProxyID.EnterpriseMeta)
|
||||||
|
|
||||||
if skip {
|
if skip {
|
||||||
// Not associated with a known explicit or implicit upstream so it is skipped.
|
// Not associated with a known explicit or implicit upstream so it is skipped.
|
||||||
continue
|
continue
|
||||||
|
@ -2260,10 +2250,9 @@ func (s *ResourceGenerator) getAndModifyUpstreamConfigForPeeredListener(
|
||||||
s.Logger.Warn("failed to parse", "upstream", uid, "error", err)
|
s.Logger.Warn("failed to parse", "upstream", uid, "error", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
protocol := cfg.Protocol
|
// Ignore the configured protocol for peer upstreams, since it is defined by the remote
|
||||||
if protocol == "" {
|
// cluster, which we cannot control.
|
||||||
protocol = peerMeta.Protocol
|
protocol := peerMeta.Protocol
|
||||||
}
|
|
||||||
if protocol == "" {
|
if protocol == "" {
|
||||||
protocol = "tcp"
|
protocol = "tcp"
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue