Merge pull request #9976 from hashicorp/centralized-upstream-fixups
This commit is contained in:
commit
920ba3db39
|
@ -410,13 +410,29 @@ func (c *ConfigEntry) ResolveServiceConfig(args *structs.ServiceConfigRequest, r
|
||||||
upstreamIDs := args.UpstreamIDs
|
upstreamIDs := args.UpstreamIDs
|
||||||
legacyUpstreams := false
|
legacyUpstreams := false
|
||||||
|
|
||||||
// Before Consul namespaces were released, the Upstreams provided to the endpoint did not contain the namespace.
|
var (
|
||||||
// Because of this we attach the enterprise meta of the request, which will just be the default namespace.
|
noUpstreamArgs = len(upstreamIDs) == 0 && len(args.Upstreams) == 0
|
||||||
if len(upstreamIDs) == 0 {
|
|
||||||
|
// Check the args and the resolved value. If it was exclusively set via a config entry, then args.TransparentProxy
|
||||||
|
// will never be true because the service config request does not use the resolved value.
|
||||||
|
tproxy = args.TransparentProxy || reply.TransparentProxy
|
||||||
|
)
|
||||||
|
|
||||||
|
// The upstreams passed as arguments to this endpoint are the upstreams explicitly defined in a proxy registration.
|
||||||
|
// If no upstreams were passed, then we should only returned the resolved config if the proxy has TransparentProxy mode enabled.
|
||||||
|
// Otherwise we would return a resolved upstream config to a proxy with no configured upstreams.
|
||||||
|
if noUpstreamArgs && !tproxy {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// The request is considered legacy if the deprecated args.Upstream was used
|
||||||
|
if len(upstreamIDs) == 0 && len(args.Upstreams) > 0 {
|
||||||
legacyUpstreams = true
|
legacyUpstreams = true
|
||||||
|
|
||||||
upstreamIDs = make([]structs.ServiceID, 0)
|
upstreamIDs = make([]structs.ServiceID, 0)
|
||||||
for _, upstream := range args.Upstreams {
|
for _, upstream := range args.Upstreams {
|
||||||
|
// Before Consul namespaces were released, the Upstreams provided to the endpoint did not contain the namespace.
|
||||||
|
// Because of this we attach the enterprise meta of the request, which will just be the default namespace.
|
||||||
sid := structs.NewServiceID(upstream, &args.EnterpriseMeta)
|
sid := structs.NewServiceID(upstream, &args.EnterpriseMeta)
|
||||||
upstreamIDs = append(upstreamIDs, sid)
|
upstreamIDs = append(upstreamIDs, sid)
|
||||||
}
|
}
|
||||||
|
@ -436,6 +452,9 @@ func (c *ConfigEntry) ResolveServiceConfig(args *structs.ServiceConfigRequest, r
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// usConfigs stores the opaque config map for each upstream and is keyed on the upstream's ID.
|
||||||
|
usConfigs := make(map[structs.ServiceID]map[string]interface{})
|
||||||
|
|
||||||
var (
|
var (
|
||||||
upstreamDefaults *structs.UpstreamConfig
|
upstreamDefaults *structs.UpstreamConfig
|
||||||
upstreamConfigs map[string]*structs.UpstreamConfig
|
upstreamConfigs map[string]*structs.UpstreamConfig
|
||||||
|
@ -443,15 +462,20 @@ func (c *ConfigEntry) ResolveServiceConfig(args *structs.ServiceConfigRequest, r
|
||||||
if serviceConf != nil && serviceConf.Connect != nil {
|
if serviceConf != nil && serviceConf.Connect != nil {
|
||||||
if serviceConf.Connect.UpstreamDefaults != nil {
|
if serviceConf.Connect.UpstreamDefaults != nil {
|
||||||
upstreamDefaults = serviceConf.Connect.UpstreamDefaults
|
upstreamDefaults = serviceConf.Connect.UpstreamDefaults
|
||||||
|
|
||||||
|
// Store the upstream defaults under a wildcard key so that they can be applied to
|
||||||
|
// upstreams that are inferred from intentions and do not have explicit upstream configuration.
|
||||||
|
cfgMap := make(map[string]interface{})
|
||||||
|
upstreamDefaults.MergeInto(cfgMap)
|
||||||
|
|
||||||
|
wildcard := structs.NewServiceID(structs.WildcardSpecifier, structs.WildcardEnterpriseMeta())
|
||||||
|
usConfigs[wildcard] = cfgMap
|
||||||
}
|
}
|
||||||
if serviceConf.Connect.UpstreamConfigs != nil {
|
if serviceConf.Connect.UpstreamConfigs != nil {
|
||||||
upstreamConfigs = serviceConf.Connect.UpstreamConfigs
|
upstreamConfigs = serviceConf.Connect.UpstreamConfigs
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// usConfigs stores the opaque config map for each upstream and is keyed on the upstream's ID.
|
|
||||||
usConfigs := make(map[structs.ServiceID]map[string]interface{})
|
|
||||||
|
|
||||||
for upstream := range seenUpstreams {
|
for upstream := range seenUpstreams {
|
||||||
resolvedCfg := make(map[string]interface{})
|
resolvedCfg := make(map[string]interface{})
|
||||||
|
|
||||||
|
|
|
@ -1000,6 +1000,7 @@ func TestConfigEntry_ResolveServiceConfig_Upstreams(t *testing.T) {
|
||||||
|
|
||||||
mysql := structs.NewServiceID("mysql", structs.DefaultEnterpriseMeta())
|
mysql := structs.NewServiceID("mysql", structs.DefaultEnterpriseMeta())
|
||||||
cache := structs.NewServiceID("cache", structs.DefaultEnterpriseMeta())
|
cache := structs.NewServiceID("cache", structs.DefaultEnterpriseMeta())
|
||||||
|
wildcard := structs.NewServiceID(structs.WildcardSpecifier, structs.WildcardEnterpriseMeta())
|
||||||
|
|
||||||
tt := []struct {
|
tt := []struct {
|
||||||
name string
|
name string
|
||||||
|
@ -1126,6 +1127,14 @@ func TestConfigEntry_ResolveServiceConfig_Upstreams(t *testing.T) {
|
||||||
},
|
},
|
||||||
expect: structs.ServiceConfigResponse{
|
expect: structs.ServiceConfigResponse{
|
||||||
UpstreamIDConfigs: structs.OpaqueUpstreamConfigs{
|
UpstreamIDConfigs: structs.OpaqueUpstreamConfigs{
|
||||||
|
{
|
||||||
|
Upstream: wildcard,
|
||||||
|
Config: map[string]interface{}{
|
||||||
|
"mesh_gateway": map[string]interface{}{
|
||||||
|
"Mode": "remote",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
{
|
{
|
||||||
Upstream: mysql,
|
Upstream: mysql,
|
||||||
Config: map[string]interface{}{
|
Config: map[string]interface{}{
|
||||||
|
@ -1188,6 +1197,19 @@ func TestConfigEntry_ResolveServiceConfig_Upstreams(t *testing.T) {
|
||||||
"protocol": "udp",
|
"protocol": "udp",
|
||||||
},
|
},
|
||||||
UpstreamIDConfigs: structs.OpaqueUpstreamConfigs{
|
UpstreamIDConfigs: structs.OpaqueUpstreamConfigs{
|
||||||
|
{
|
||||||
|
Upstream: wildcard,
|
||||||
|
Config: map[string]interface{}{
|
||||||
|
"passive_health_check": map[string]interface{}{
|
||||||
|
"Interval": int64(10),
|
||||||
|
"MaxFailures": int64(2),
|
||||||
|
},
|
||||||
|
"mesh_gateway": map[string]interface{}{
|
||||||
|
"Mode": "remote",
|
||||||
|
},
|
||||||
|
"protocol": "http",
|
||||||
|
},
|
||||||
|
},
|
||||||
{
|
{
|
||||||
Upstream: mysql,
|
Upstream: mysql,
|
||||||
Config: map[string]interface{}{
|
Config: map[string]interface{}{
|
||||||
|
@ -1204,6 +1226,130 @@ func TestConfigEntry_ResolveServiceConfig_Upstreams(t *testing.T) {
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
name: "without upstream args we should return centralized config with tproxy arg",
|
||||||
|
entries: []structs.ConfigEntry{
|
||||||
|
&structs.ServiceConfigEntry{
|
||||||
|
Kind: structs.ServiceDefaults,
|
||||||
|
Name: "api",
|
||||||
|
Connect: &structs.ConnectConfiguration{
|
||||||
|
UpstreamDefaults: &structs.UpstreamConfig{
|
||||||
|
MeshGateway: structs.MeshGatewayConfig{Mode: structs.MeshGatewayModeRemote},
|
||||||
|
},
|
||||||
|
UpstreamConfigs: map[string]*structs.UpstreamConfig{
|
||||||
|
mysql.String(): {
|
||||||
|
Protocol: "grpc",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
request: structs.ServiceConfigRequest{
|
||||||
|
Name: "api",
|
||||||
|
Datacenter: "dc1",
|
||||||
|
TransparentProxy: true,
|
||||||
|
|
||||||
|
// Empty Upstreams/UpstreamIDs
|
||||||
|
},
|
||||||
|
expect: structs.ServiceConfigResponse{
|
||||||
|
UpstreamIDConfigs: structs.OpaqueUpstreamConfigs{
|
||||||
|
{
|
||||||
|
Upstream: wildcard,
|
||||||
|
Config: map[string]interface{}{
|
||||||
|
"mesh_gateway": map[string]interface{}{
|
||||||
|
"Mode": "remote",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Upstream: mysql,
|
||||||
|
Config: map[string]interface{}{
|
||||||
|
"protocol": "grpc",
|
||||||
|
"mesh_gateway": map[string]interface{}{
|
||||||
|
"Mode": "remote",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "without upstream args we should return centralized config with tproxy default",
|
||||||
|
entries: []structs.ConfigEntry{
|
||||||
|
&structs.ServiceConfigEntry{
|
||||||
|
Kind: structs.ServiceDefaults,
|
||||||
|
Name: "api",
|
||||||
|
Connect: &structs.ConnectConfiguration{
|
||||||
|
UpstreamDefaults: &structs.UpstreamConfig{
|
||||||
|
MeshGateway: structs.MeshGatewayConfig{Mode: structs.MeshGatewayModeRemote},
|
||||||
|
},
|
||||||
|
UpstreamConfigs: map[string]*structs.UpstreamConfig{
|
||||||
|
mysql.String(): {
|
||||||
|
Protocol: "grpc",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
|
||||||
|
// TransparentProxy on the config entry but not the config request
|
||||||
|
TransparentProxy: true,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
request: structs.ServiceConfigRequest{
|
||||||
|
Name: "api",
|
||||||
|
Datacenter: "dc1",
|
||||||
|
|
||||||
|
// Empty Upstreams/UpstreamIDs
|
||||||
|
},
|
||||||
|
expect: structs.ServiceConfigResponse{
|
||||||
|
TransparentProxy: true,
|
||||||
|
UpstreamIDConfigs: structs.OpaqueUpstreamConfigs{
|
||||||
|
{
|
||||||
|
Upstream: wildcard,
|
||||||
|
Config: map[string]interface{}{
|
||||||
|
"mesh_gateway": map[string]interface{}{
|
||||||
|
"Mode": "remote",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Upstream: mysql,
|
||||||
|
Config: map[string]interface{}{
|
||||||
|
"protocol": "grpc",
|
||||||
|
"mesh_gateway": map[string]interface{}{
|
||||||
|
"Mode": "remote",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "without upstream args we should NOT return centralized config outside tproxy mode",
|
||||||
|
entries: []structs.ConfigEntry{
|
||||||
|
&structs.ServiceConfigEntry{
|
||||||
|
Kind: structs.ServiceDefaults,
|
||||||
|
Name: "api",
|
||||||
|
Connect: &structs.ConnectConfiguration{
|
||||||
|
UpstreamDefaults: &structs.UpstreamConfig{
|
||||||
|
MeshGateway: structs.MeshGatewayConfig{Mode: structs.MeshGatewayModeRemote},
|
||||||
|
},
|
||||||
|
UpstreamConfigs: map[string]*structs.UpstreamConfig{
|
||||||
|
mysql.String(): {
|
||||||
|
Protocol: "grpc",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
request: structs.ServiceConfigRequest{
|
||||||
|
Name: "api",
|
||||||
|
Datacenter: "dc1",
|
||||||
|
TransparentProxy: false,
|
||||||
|
|
||||||
|
// Empty Upstreams/UpstreamIDs
|
||||||
|
},
|
||||||
|
expect: structs.ServiceConfigResponse{},
|
||||||
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, tc := range tt {
|
for _, tc := range tt {
|
||||||
|
|
|
@ -333,6 +333,12 @@ func (s *state) initWatchesConnectProxy(snap *ConfigSnapshot) error {
|
||||||
for i := range s.proxyCfg.Upstreams {
|
for i := range s.proxyCfg.Upstreams {
|
||||||
u := s.proxyCfg.Upstreams[i]
|
u := s.proxyCfg.Upstreams[i]
|
||||||
|
|
||||||
|
// Store defaults keyed under wildcard so they can be applied to centrally configured upstreams
|
||||||
|
if u.DestinationName == structs.WildcardSpecifier {
|
||||||
|
snap.ConnectProxy.UpstreamConfig[u.DestinationID().String()] = &u
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
// This can be true if the upstream is a synthetic entry populated from centralized upstream config.
|
// This can be true if the upstream is a synthetic entry populated from centralized upstream config.
|
||||||
// Watches should not be created for them.
|
// Watches should not be created for them.
|
||||||
if u.CentrallyConfigured {
|
if u.CentrallyConfigured {
|
||||||
|
@ -795,6 +801,17 @@ func (s *state) handleUpdateConnectProxy(u cache.UpdateEvent, snap *ConfigSnapsh
|
||||||
u, ok := snap.ConnectProxy.UpstreamConfig[svc.String()]
|
u, ok := snap.ConnectProxy.UpstreamConfig[svc.String()]
|
||||||
if ok {
|
if ok {
|
||||||
cfgMap = u.Config
|
cfgMap = u.Config
|
||||||
|
} else {
|
||||||
|
// Use the centralized upstream defaults if they exist and there isn't specific configuration for this upstream
|
||||||
|
// This is only relevant to upstreams from intentions because for explicit upstreams the defaulting is handled
|
||||||
|
// by the ResolveServiceConfig endpoint.
|
||||||
|
wildcardSID := structs.NewServiceID(structs.WildcardSpecifier, structs.WildcardEnterpriseMeta())
|
||||||
|
defaults, ok := snap.ConnectProxy.UpstreamConfig[wildcardSID.String()]
|
||||||
|
if ok {
|
||||||
|
u = defaults
|
||||||
|
cfgMap = defaults.Config
|
||||||
|
snap.ConnectProxy.UpstreamConfig[svc.String()] = defaults
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
cfg, err := parseReducedUpstreamConfig(cfgMap)
|
cfg, err := parseReducedUpstreamConfig(cfgMap)
|
||||||
|
@ -808,7 +825,18 @@ func (s *state) handleUpdateConnectProxy(u cache.UpdateEvent, snap *ConfigSnapsh
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
err = s.watchDiscoveryChain(snap, cfg, svc.String(), svc.Name, svc.NamespaceOrDefault())
|
meshGateway := s.proxyCfg.MeshGateway
|
||||||
|
if u != nil {
|
||||||
|
meshGateway = meshGateway.OverlayWith(u.MeshGateway)
|
||||||
|
}
|
||||||
|
watchOpts := discoveryChainWatchOpts{
|
||||||
|
id: svc.String(),
|
||||||
|
name: svc.Name,
|
||||||
|
namespace: svc.NamespaceOrDefault(),
|
||||||
|
cfg: cfg,
|
||||||
|
meshGateway: meshGateway,
|
||||||
|
}
|
||||||
|
err = s.watchDiscoveryChain(snap, watchOpts)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed to watch discovery chain for %s: %v", svc.String(), err)
|
return fmt.Errorf("failed to watch discovery chain for %s: %v", svc.String(), err)
|
||||||
}
|
}
|
||||||
|
@ -1592,7 +1620,12 @@ func (s *state) handleUpdateIngressGateway(u cache.UpdateEvent, snap *ConfigSnap
|
||||||
for _, service := range services.Services {
|
for _, service := range services.Services {
|
||||||
u := makeUpstream(service)
|
u := makeUpstream(service)
|
||||||
|
|
||||||
err := s.watchDiscoveryChain(snap, reducedUpstreamConfig{}, u.Identifier(), u.DestinationName, u.DestinationNamespace)
|
watchOpts := discoveryChainWatchOpts{
|
||||||
|
id: u.Identifier(),
|
||||||
|
name: u.DestinationName,
|
||||||
|
namespace: u.DestinationNamespace,
|
||||||
|
}
|
||||||
|
err := s.watchDiscoveryChain(snap, watchOpts)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed to watch discovery chain for %s: %v", u.Identifier(), err)
|
return fmt.Errorf("failed to watch discovery chain for %s: %v", u.Identifier(), err)
|
||||||
}
|
}
|
||||||
|
@ -1642,8 +1675,16 @@ func makeUpstream(g *structs.GatewayService) structs.Upstream {
|
||||||
return upstream
|
return upstream
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *state) watchDiscoveryChain(snap *ConfigSnapshot, cfg reducedUpstreamConfig, id, name, namespace string) error {
|
type discoveryChainWatchOpts struct {
|
||||||
if _, ok := snap.ConnectProxy.WatchedDiscoveryChains[id]; ok {
|
id string
|
||||||
|
name string
|
||||||
|
namespace string
|
||||||
|
cfg reducedUpstreamConfig
|
||||||
|
meshGateway structs.MeshGatewayConfig
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *state) watchDiscoveryChain(snap *ConfigSnapshot, opts discoveryChainWatchOpts) error {
|
||||||
|
if _, ok := snap.ConnectProxy.WatchedDiscoveryChains[opts.id]; ok {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1651,12 +1692,13 @@ func (s *state) watchDiscoveryChain(snap *ConfigSnapshot, cfg reducedUpstreamCon
|
||||||
err := s.cache.Notify(ctx, cachetype.CompiledDiscoveryChainName, &structs.DiscoveryChainRequest{
|
err := s.cache.Notify(ctx, cachetype.CompiledDiscoveryChainName, &structs.DiscoveryChainRequest{
|
||||||
Datacenter: s.source.Datacenter,
|
Datacenter: s.source.Datacenter,
|
||||||
QueryOptions: structs.QueryOptions{Token: s.token},
|
QueryOptions: structs.QueryOptions{Token: s.token},
|
||||||
Name: name,
|
Name: opts.name,
|
||||||
EvaluateInDatacenter: s.source.Datacenter,
|
EvaluateInDatacenter: s.source.Datacenter,
|
||||||
EvaluateInNamespace: namespace,
|
EvaluateInNamespace: opts.namespace,
|
||||||
OverrideProtocol: cfg.Protocol,
|
OverrideProtocol: opts.cfg.Protocol,
|
||||||
OverrideConnectTimeout: cfg.ConnectTimeout(),
|
OverrideConnectTimeout: opts.cfg.ConnectTimeout(),
|
||||||
}, "discovery-chain:"+id, s.ch)
|
OverrideMeshGateway: opts.meshGateway,
|
||||||
|
}, "discovery-chain:"+opts.id, s.ch)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
cancel()
|
cancel()
|
||||||
return err
|
return err
|
||||||
|
@ -1664,9 +1706,9 @@ func (s *state) watchDiscoveryChain(snap *ConfigSnapshot, cfg reducedUpstreamCon
|
||||||
|
|
||||||
switch s.kind {
|
switch s.kind {
|
||||||
case structs.ServiceKindIngressGateway:
|
case structs.ServiceKindIngressGateway:
|
||||||
snap.IngressGateway.WatchedDiscoveryChains[id] = cancel
|
snap.IngressGateway.WatchedDiscoveryChains[opts.id] = cancel
|
||||||
case structs.ServiceKindConnectProxy:
|
case structs.ServiceKindConnectProxy:
|
||||||
snap.ConnectProxy.WatchedDiscoveryChains[id] = cancel
|
snap.ConnectProxy.WatchedDiscoveryChains[opts.id] = cancel
|
||||||
default:
|
default:
|
||||||
cancel()
|
cancel()
|
||||||
return fmt.Errorf("unsupported kind %s", s.kind)
|
return fmt.Errorf("unsupported kind %s", s.kind)
|
||||||
|
|
|
@ -5,6 +5,7 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"sync"
|
"sync"
|
||||||
"testing"
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
|
|
||||||
|
@ -1606,6 +1607,17 @@ func TestState_WatchesAndUpdates(t *testing.T) {
|
||||||
Proxy: structs.ConnectProxyConfig{
|
Proxy: structs.ConnectProxyConfig{
|
||||||
DestinationServiceName: "api",
|
DestinationServiceName: "api",
|
||||||
TransparentProxy: true,
|
TransparentProxy: true,
|
||||||
|
Upstreams: structs.Upstreams{
|
||||||
|
{
|
||||||
|
CentrallyConfigured: true,
|
||||||
|
DestinationName: structs.WildcardSpecifier,
|
||||||
|
DestinationNamespace: structs.WildcardSpecifier,
|
||||||
|
Config: map[string]interface{}{
|
||||||
|
"connect_timeout_ms": 6000,
|
||||||
|
},
|
||||||
|
MeshGateway: structs.MeshGatewayConfig{Mode: structs.MeshGatewayModeRemote},
|
||||||
|
},
|
||||||
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
sourceDC: "dc1",
|
sourceDC: "dc1",
|
||||||
|
@ -1622,10 +1634,13 @@ func TestState_WatchesAndUpdates(t *testing.T) {
|
||||||
},
|
},
|
||||||
verifySnapshot: func(t testing.TB, snap *ConfigSnapshot) {
|
verifySnapshot: func(t testing.TB, snap *ConfigSnapshot) {
|
||||||
require.False(t, snap.Valid(), "proxy without roots/leaf/intentions is not valid")
|
require.False(t, snap.Valid(), "proxy without roots/leaf/intentions is not valid")
|
||||||
require.True(t, snap.ConnectProxy.IsEmpty())
|
|
||||||
require.True(t, snap.MeshGateway.IsEmpty())
|
require.True(t, snap.MeshGateway.IsEmpty())
|
||||||
require.True(t, snap.IngressGateway.IsEmpty())
|
require.True(t, snap.IngressGateway.IsEmpty())
|
||||||
require.True(t, snap.TerminatingGateway.IsEmpty())
|
require.True(t, snap.TerminatingGateway.IsEmpty())
|
||||||
|
|
||||||
|
// Centrally configured upstream defaults should be stored so that upstreams from intentions can inherit them
|
||||||
|
require.Len(t, snap.ConnectProxy.UpstreamConfig, 1)
|
||||||
|
require.Contains(t, snap.ConnectProxy.UpstreamConfig, "*")
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
// Valid snapshot after roots, leaf, and intentions
|
// Valid snapshot after roots, leaf, and intentions
|
||||||
|
@ -1694,6 +1709,13 @@ func TestState_WatchesAndUpdates(t *testing.T) {
|
||||||
|
|
||||||
// Should not have results yet
|
// Should not have results yet
|
||||||
require.Empty(t, snap.ConnectProxy.DiscoveryChain)
|
require.Empty(t, snap.ConnectProxy.DiscoveryChain)
|
||||||
|
|
||||||
|
require.Len(t, snap.ConnectProxy.UpstreamConfig, 2)
|
||||||
|
cfg, ok := snap.ConnectProxy.UpstreamConfig[db.String()]
|
||||||
|
require.True(t, ok)
|
||||||
|
|
||||||
|
// Upstream config should have been inherited from defaults under wildcard key
|
||||||
|
require.Equal(t, cfg.Config["connect_timeout_ms"], 6000)
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
// Discovery chain updates should be stored
|
// Discovery chain updates should be stored
|
||||||
|
@ -1704,6 +1726,8 @@ func TestState_WatchesAndUpdates(t *testing.T) {
|
||||||
EvaluateInDatacenter: "dc1",
|
EvaluateInDatacenter: "dc1",
|
||||||
EvaluateInNamespace: "default",
|
EvaluateInNamespace: "default",
|
||||||
Datacenter: "dc1",
|
Datacenter: "dc1",
|
||||||
|
OverrideConnectTimeout: 6 * time.Second,
|
||||||
|
OverrideMeshGateway: structs.MeshGatewayConfig{Mode: structs.MeshGatewayModeRemote},
|
||||||
}),
|
}),
|
||||||
},
|
},
|
||||||
events: []cache.UpdateEvent{
|
events: []cache.UpdateEvent{
|
||||||
|
|
|
@ -250,7 +250,7 @@ func (w *serviceConfigWatch) runWatch(ctx context.Context, wg *sync.WaitGroup, u
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// handleUpdate receives an update event the global config defaults, updates
|
// handleUpdate receives an update event from the global config defaults, updates
|
||||||
// the local state and re-registers the service with the newly merged config.
|
// the local state and re-registers the service with the newly merged config.
|
||||||
//
|
//
|
||||||
// NOTE: the caller must NOT hold the Agent.stateLock!
|
// NOTE: the caller must NOT hold the Agent.stateLock!
|
||||||
|
@ -339,6 +339,7 @@ func makeConfigRequest(bd BaseDeps, addReq AddServiceRequest) *structs.ServiceCo
|
||||||
Datacenter: bd.RuntimeConfig.Datacenter,
|
Datacenter: bd.RuntimeConfig.Datacenter,
|
||||||
QueryOptions: structs.QueryOptions{Token: addReq.token},
|
QueryOptions: structs.QueryOptions{Token: addReq.token},
|
||||||
MeshGateway: ns.Proxy.MeshGateway,
|
MeshGateway: ns.Proxy.MeshGateway,
|
||||||
|
TransparentProxy: ns.Proxy.TransparentProxy,
|
||||||
UpstreamIDs: upstreams,
|
UpstreamIDs: upstreams,
|
||||||
EnterpriseMeta: ns.EnterpriseMeta,
|
EnterpriseMeta: ns.EnterpriseMeta,
|
||||||
}
|
}
|
||||||
|
@ -436,8 +437,8 @@ func mergeServiceConfig(defaults *structs.ServiceConfigResponse, service *struct
|
||||||
}
|
}
|
||||||
|
|
||||||
// Ensure upstreams present in central config are represented in the local configuration.
|
// Ensure upstreams present in central config are represented in the local configuration.
|
||||||
// This does not apply outside of TransparentProxy mode because in that situation every upstream needs to be defined
|
// This does not apply outside of TransparentProxy mode because in that situation every possible upstream already exists
|
||||||
// explicitly and locally with a local bind port.
|
// inside of ns.Proxy.Upstreams.
|
||||||
if ns.Proxy.TransparentProxy {
|
if ns.Proxy.TransparentProxy {
|
||||||
for id, remote := range remoteUpstreams {
|
for id, remote := range remoteUpstreams {
|
||||||
if _, ok := localUpstreams[id]; ok {
|
if _, ok := localUpstreams[id]; ok {
|
||||||
|
|
|
@ -194,7 +194,9 @@ func (cfg *ConnectConfiguration) Normalize() {
|
||||||
v.Normalize()
|
v.Normalize()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if cfg.UpstreamDefaults != nil {
|
||||||
cfg.UpstreamDefaults.Normalize()
|
cfg.UpstreamDefaults.Normalize()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cfg ConnectConfiguration) Validate() error {
|
func (cfg ConnectConfiguration) Validate() error {
|
||||||
|
@ -206,9 +208,12 @@ func (cfg ConnectConfiguration) Validate() error {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := cfg.UpstreamDefaults.Validate(); err != nil {
|
if cfg.UpstreamDefaults != nil {
|
||||||
|
err := cfg.UpstreamDefaults.Validate()
|
||||||
|
if err != nil {
|
||||||
validationErr = multierror.Append(validationErr, fmt.Errorf("error in upstream defaults %v", err))
|
validationErr = multierror.Append(validationErr, fmt.Errorf("error in upstream defaults %v", err))
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return validationErr
|
return validationErr
|
||||||
}
|
}
|
||||||
|
@ -590,6 +595,9 @@ type ServiceConfigRequest struct {
|
||||||
// MeshGateway contains the mesh gateway configuration from the requesting proxy's registration
|
// MeshGateway contains the mesh gateway configuration from the requesting proxy's registration
|
||||||
MeshGateway MeshGatewayConfig
|
MeshGateway MeshGatewayConfig
|
||||||
|
|
||||||
|
// TransparentProxy indicates whether the requesting proxy is in transparent proxy mode
|
||||||
|
TransparentProxy bool
|
||||||
|
|
||||||
UpstreamIDs []ServiceID
|
UpstreamIDs []ServiceID
|
||||||
|
|
||||||
// DEPRECATED
|
// DEPRECATED
|
||||||
|
|
|
@ -333,6 +333,9 @@ func (u *Upstream) Validate() error {
|
||||||
if u.DestinationName == "" {
|
if u.DestinationName == "" {
|
||||||
return fmt.Errorf("upstream destination name cannot be empty")
|
return fmt.Errorf("upstream destination name cannot be empty")
|
||||||
}
|
}
|
||||||
|
if u.DestinationName == WildcardSpecifier && !u.CentrallyConfigured {
|
||||||
|
return fmt.Errorf("upstream destination name cannot be a wildcard")
|
||||||
|
}
|
||||||
|
|
||||||
if u.LocalBindPort == 0 && !u.CentrallyConfigured {
|
if u.LocalBindPort == 0 && !u.CentrallyConfigured {
|
||||||
return fmt.Errorf("upstream local bind port cannot be zero")
|
return fmt.Errorf("upstream local bind port cannot be zero")
|
||||||
|
|
|
@ -1153,6 +1153,11 @@ func (s *NodeService) Validate() error {
|
||||||
"Proxy.DestinationServiceName must be non-empty for Connect proxy "+
|
"Proxy.DestinationServiceName must be non-empty for Connect proxy "+
|
||||||
"services"))
|
"services"))
|
||||||
}
|
}
|
||||||
|
if s.Proxy.DestinationServiceName == WildcardSpecifier {
|
||||||
|
result = multierror.Append(result, fmt.Errorf(
|
||||||
|
"Proxy.DestinationServiceName must not be a wildcard for Connect proxy "+
|
||||||
|
"services"))
|
||||||
|
}
|
||||||
|
|
||||||
if s.Port == 0 {
|
if s.Port == 0 {
|
||||||
result = multierror.Append(result, fmt.Errorf(
|
result = multierror.Append(result, fmt.Errorf(
|
||||||
|
@ -1189,7 +1194,9 @@ func (s *NodeService) Validate() error {
|
||||||
}
|
}
|
||||||
addr = net.JoinHostPort(addr, fmt.Sprintf("%d", u.LocalBindPort))
|
addr = net.JoinHostPort(addr, fmt.Sprintf("%d", u.LocalBindPort))
|
||||||
|
|
||||||
if _, ok := bindAddrs[addr]; ok {
|
// Centrally configured upstreams will fail this check if there are multiple because they do not have an address/port.
|
||||||
|
// Only consider non-centrally configured upstreams in this check since those are the ones we create listeners for.
|
||||||
|
if _, ok := bindAddrs[addr]; ok && !u.CentrallyConfigured {
|
||||||
result = multierror.Append(result, fmt.Errorf(
|
result = multierror.Append(result, fmt.Errorf(
|
||||||
"upstreams cannot contain duplicates by local bind address and port; %q is specified twice", addr))
|
"upstreams cannot contain duplicates by local bind address and port; %q is specified twice", addr))
|
||||||
continue
|
continue
|
||||||
|
|
|
@ -648,6 +648,12 @@ func TestStructs_NodeService_ValidateConnectProxy(t *testing.T) {
|
||||||
"Proxy.DestinationServiceName must be",
|
"Proxy.DestinationServiceName must be",
|
||||||
},
|
},
|
||||||
|
|
||||||
|
{
|
||||||
|
"connect-proxy: wildcard Proxy.DestinationServiceName",
|
||||||
|
func(x *NodeService) { x.Proxy.DestinationServiceName = "*" },
|
||||||
|
"Proxy.DestinationServiceName must not be",
|
||||||
|
},
|
||||||
|
|
||||||
{
|
{
|
||||||
"connect-proxy: valid Proxy.DestinationServiceName",
|
"connect-proxy: valid Proxy.DestinationServiceName",
|
||||||
func(x *NodeService) { x.Proxy.DestinationServiceName = "hello" },
|
func(x *NodeService) { x.Proxy.DestinationServiceName = "hello" },
|
||||||
|
@ -697,6 +703,28 @@ func TestStructs_NodeService_ValidateConnectProxy(t *testing.T) {
|
||||||
},
|
},
|
||||||
"upstream destination name cannot be empty",
|
"upstream destination name cannot be empty",
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
"connect-proxy: upstream wildcard name",
|
||||||
|
func(x *NodeService) {
|
||||||
|
x.Proxy.Upstreams = Upstreams{{
|
||||||
|
DestinationType: UpstreamDestTypeService,
|
||||||
|
DestinationName: WildcardSpecifier,
|
||||||
|
LocalBindPort: 5000,
|
||||||
|
}}
|
||||||
|
},
|
||||||
|
"upstream destination name cannot be a wildcard",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"connect-proxy: upstream can have wildcard name when centrally configured",
|
||||||
|
func(x *NodeService) {
|
||||||
|
x.Proxy.Upstreams = Upstreams{{
|
||||||
|
DestinationType: UpstreamDestTypeService,
|
||||||
|
DestinationName: WildcardSpecifier,
|
||||||
|
CentrallyConfigured: true,
|
||||||
|
}}
|
||||||
|
},
|
||||||
|
"",
|
||||||
|
},
|
||||||
{
|
{
|
||||||
"connect-proxy: upstream empty bind port",
|
"connect-proxy: upstream empty bind port",
|
||||||
func(x *NodeService) {
|
func(x *NodeService) {
|
||||||
|
@ -768,6 +796,24 @@ func TestStructs_NodeService_ValidateConnectProxy(t *testing.T) {
|
||||||
},
|
},
|
||||||
"upstreams cannot contain duplicates",
|
"upstreams cannot contain duplicates",
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
"connect-proxy: Centrally configured upstreams can have duplicate ip/port",
|
||||||
|
func(x *NodeService) {
|
||||||
|
x.Proxy.Upstreams = Upstreams{
|
||||||
|
{
|
||||||
|
DestinationType: UpstreamDestTypeService,
|
||||||
|
DestinationName: "foo",
|
||||||
|
CentrallyConfigured: true,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
DestinationType: UpstreamDestTypeService,
|
||||||
|
DestinationName: "bar",
|
||||||
|
CentrallyConfigured: true,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"",
|
||||||
|
},
|
||||||
{
|
{
|
||||||
"connect-proxy: Upstreams duplicated by ip and port",
|
"connect-proxy: Upstreams duplicated by ip and port",
|
||||||
func(x *NodeService) {
|
func(x *NodeService) {
|
||||||
|
|
|
@ -98,7 +98,7 @@ type ConnectConfiguration struct {
|
||||||
UpstreamConfigs map[string]UpstreamConfig `json:",omitempty" alias:"upstream_configs"`
|
UpstreamConfigs map[string]UpstreamConfig `json:",omitempty" alias:"upstream_configs"`
|
||||||
|
|
||||||
// UpstreamDefaults contains default configuration for all upstreams of a given service
|
// UpstreamDefaults contains default configuration for all upstreams of a given service
|
||||||
UpstreamDefaults UpstreamConfig `json:",omitempty" alias:"upstream_defaults"`
|
UpstreamDefaults *UpstreamConfig `json:",omitempty" alias:"upstream_defaults"`
|
||||||
}
|
}
|
||||||
|
|
||||||
type UpstreamConfig struct {
|
type UpstreamConfig struct {
|
||||||
|
|
|
@ -393,7 +393,7 @@ func TestDecodeConfigEntry(t *testing.T) {
|
||||||
MeshGateway: MeshGatewayConfig{Mode: "remote"},
|
MeshGateway: MeshGatewayConfig{Mode: "remote"},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
UpstreamDefaults: UpstreamConfig{
|
UpstreamDefaults: &UpstreamConfig{
|
||||||
EnvoyClusterJSON: "zip",
|
EnvoyClusterJSON: "zip",
|
||||||
EnvoyListenerJSON: "zop",
|
EnvoyListenerJSON: "zop",
|
||||||
Protocol: "http",
|
Protocol: "http",
|
||||||
|
|
|
@ -638,7 +638,7 @@ func TestParseConfigEntry(t *testing.T) {
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
UpstreamDefaults: api.UpstreamConfig{
|
UpstreamDefaults: &api.UpstreamConfig{
|
||||||
EnvoyClusterJSON: "zip",
|
EnvoyClusterJSON: "zip",
|
||||||
EnvoyListenerJSON: "zop",
|
EnvoyListenerJSON: "zop",
|
||||||
Protocol: "http",
|
Protocol: "http",
|
||||||
|
|
Loading…
Reference in New Issue