finish moving UpstreamConfig and related fields to structs pkg

This commit is contained in:
freddygv 2021-03-10 21:04:13 -07:00
parent 4bbd495b54
commit 68148a1dae
7 changed files with 406 additions and 181 deletions

View File

@ -136,9 +136,12 @@ func (e *ServiceConfigEntry) Normalize() error {
func (e *ServiceConfigEntry) Validate() error { func (e *ServiceConfigEntry) Validate() error {
validationErr := validateConfigEntryMeta(e.Meta) validationErr := validateConfigEntryMeta(e.Meta)
if err := e.Connect.Validate(); err != nil { if e.Connect != nil {
err := e.Connect.Validate()
if err != nil {
validationErr = multierror.Append(validationErr, err) validationErr = multierror.Append(validationErr, err)
} }
}
return validationErr return validationErr
} }
@ -663,6 +666,41 @@ type UpstreamConfig struct {
MeshGateway MeshGatewayConfig `json:",omitempty" alias:"mesh_gateway" ` MeshGateway MeshGatewayConfig `json:",omitempty" alias:"mesh_gateway" `
} }
func (cfg UpstreamConfig) MergeInto(dst map[string]interface{}, legacy bool) {
var (
listenerKey = "listener_json"
clusterKey = "cluster_json"
)
// Starting in Consul 1.10, the "envoy_" prefix was removed from these flags
if legacy {
listenerKey = fmt.Sprintf("envoy_%s", listenerKey)
clusterKey = fmt.Sprintf("envoy_%s", clusterKey)
}
// Avoid storing empty values in the map, since these can act as overrides
if cfg.ListenerJSON != "" {
dst[listenerKey] = cfg.ListenerJSON
}
if cfg.ClusterJSON != "" {
dst[clusterKey] = cfg.ClusterJSON
}
if cfg.Protocol != "" {
dst["protocol"] = cfg.Protocol
}
if cfg.ConnectTimeoutMs != 0 {
dst["connect_timeout_ms"] = cfg.ConnectTimeoutMs
}
if !cfg.MeshGateway.IsZero() {
dst["mesh_gateway"] = cfg.MeshGateway
}
if !cfg.Limits.IsZero() {
dst["limits"] = cfg.Limits
}
if !cfg.PassiveHealthCheck.IsZero() {
dst["passive_health_check"] = cfg.PassiveHealthCheck
}
}
func (cfg *UpstreamConfig) Normalize() { func (cfg *UpstreamConfig) Normalize() {
if cfg.Protocol == "" { if cfg.Protocol == "" {
cfg.Protocol = "tcp" cfg.Protocol = "tcp"
@ -688,6 +726,39 @@ func (cfg UpstreamConfig) Validate() error {
return validationErr return validationErr
} }
func ParseUpstreamConfigNoDefaults(m map[string]interface{}) (UpstreamConfig, error) {
var cfg UpstreamConfig
config := &mapstructure.DecoderConfig{
DecodeHook: mapstructure.ComposeDecodeHookFunc(
decode.HookWeakDecodeFromSlice,
decode.HookTranslateKeys,
mapstructure.StringToTimeDurationHookFunc(),
),
Result: &cfg,
WeaklyTypedInput: true,
}
decoder, err := mapstructure.NewDecoder(config)
if err != nil {
return cfg, err
}
err = decoder.Decode(m)
return cfg, err
}
// ParseUpstreamConfig returns the UpstreamConfig parsed from an opaque map.
// If an error occurs during parsing it is returned along with the default
// config this allows caller to choose whether and how to report the error.
func ParseUpstreamConfig(m map[string]interface{}) (UpstreamConfig, error) {
cfg, err := ParseUpstreamConfigNoDefaults(m)
// Set defaults (even if error is returned)
cfg.Normalize()
return cfg, err
}
type PassiveHealthCheck struct { type PassiveHealthCheck struct {
// Interval between health check analysis sweeps. Each sweep may remove // Interval between health check analysis sweeps. Each sweep may remove
// hosts or return hosts to the pool. // hosts or return hosts to the pool.
@ -698,6 +769,11 @@ type PassiveHealthCheck struct {
MaxFailures uint32 `alias:"max_failures"` MaxFailures uint32 `alias:"max_failures"`
} }
func (chk *PassiveHealthCheck) IsZero() bool {
zeroVal := PassiveHealthCheck{}
return *chk == zeroVal
}
func (chk PassiveHealthCheck) Validate() error { func (chk PassiveHealthCheck) Validate() error {
if chk.Interval <= 0*time.Second { if chk.Interval <= 0*time.Second {
return fmt.Errorf("passive health check interval must be greater than 0s") return fmt.Errorf("passive health check interval must be greater than 0s")
@ -724,6 +800,11 @@ type UpstreamLimits struct {
MaxConcurrentRequests *int `alias:"max_concurrent_requests"` MaxConcurrentRequests *int `alias:"max_concurrent_requests"`
} }
func (ul *UpstreamLimits) IsZero() bool {
zeroVal := UpstreamLimits{}
return *ul == zeroVal
}
func (ul UpstreamLimits) Validate() error { func (ul UpstreamLimits) Validate() error {
if ul.MaxConnections != nil && *ul.MaxConnections <= 0 { if ul.MaxConnections != nil && *ul.MaxConnections <= 0 {
return fmt.Errorf("max connections must be at least 0") return fmt.Errorf("max connections must be at least 0")

View File

@ -8,6 +8,7 @@ import (
"github.com/hashicorp/go-msgpack/codec" "github.com/hashicorp/go-msgpack/codec"
"github.com/hashicorp/hcl" "github.com/hashicorp/hcl"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
@ -1596,6 +1597,322 @@ func TestServiceConfigEntry_Normalize(t *testing.T) {
} }
} }
func TestUpstreamConfig_MergeInto(t *testing.T) {
tt := []struct {
name string
source UpstreamConfig
destination map[string]interface{}
legacy bool
want map[string]interface{}
}{
{
name: "kitchen sink",
legacy: false,
source: UpstreamConfig{
ListenerJSON: "foo",
ClusterJSON: "bar",
ConnectTimeoutMs: 5,
Protocol: "http",
Limits: UpstreamLimits{
MaxConnections: intPointer(3),
MaxPendingRequests: intPointer(4),
MaxConcurrentRequests: intPointer(5),
},
PassiveHealthCheck: PassiveHealthCheck{
MaxFailures: 3,
Interval: 2 * time.Second,
},
MeshGateway: MeshGatewayConfig{Mode: MeshGatewayModeRemote},
},
destination: make(map[string]interface{}),
want: map[string]interface{}{
"listener_json": "foo",
"cluster_json": "bar",
"connect_timeout_ms": 5,
"protocol": "http",
"limits": UpstreamLimits{
MaxConnections: intPointer(3),
MaxPendingRequests: intPointer(4),
MaxConcurrentRequests: intPointer(5),
},
"passive_health_check": PassiveHealthCheck{
MaxFailures: 3,
Interval: 2 * time.Second,
},
"mesh_gateway": MeshGatewayConfig{Mode: MeshGatewayModeRemote},
},
},
{
name: "kitchen sink override of destination",
legacy: false,
source: UpstreamConfig{
ListenerJSON: "foo",
ClusterJSON: "bar",
ConnectTimeoutMs: 5,
Protocol: "http",
Limits: UpstreamLimits{
MaxConnections: intPointer(3),
MaxPendingRequests: intPointer(4),
MaxConcurrentRequests: intPointer(5),
},
PassiveHealthCheck: PassiveHealthCheck{
MaxFailures: 3,
Interval: 2 * time.Second,
},
MeshGateway: MeshGatewayConfig{Mode: MeshGatewayModeRemote},
},
destination: map[string]interface{}{
"listener_json": "zip",
"cluster_json": "zap",
"connect_timeout_ms": 10,
"protocol": "grpc",
"limits": UpstreamLimits{
MaxConnections: intPointer(10),
MaxPendingRequests: intPointer(11),
MaxConcurrentRequests: intPointer(12),
},
"passive_health_check": PassiveHealthCheck{
MaxFailures: 13,
Interval: 14 * time.Second,
},
"mesh_gateway": MeshGatewayConfig{Mode: MeshGatewayModeLocal},
},
want: map[string]interface{}{
"listener_json": "foo",
"cluster_json": "bar",
"connect_timeout_ms": 5,
"protocol": "http",
"limits": UpstreamLimits{
MaxConnections: intPointer(3),
MaxPendingRequests: intPointer(4),
MaxConcurrentRequests: intPointer(5),
},
"passive_health_check": PassiveHealthCheck{
MaxFailures: 3,
Interval: 2 * time.Second,
},
"mesh_gateway": MeshGatewayConfig{Mode: MeshGatewayModeRemote},
},
},
{
name: "legacy flag adds envoy prefix",
legacy: true,
source: UpstreamConfig{
ListenerJSON: "foo",
ClusterJSON: "bar",
},
destination: make(map[string]interface{}),
want: map[string]interface{}{
"envoy_listener_json": "foo",
"envoy_cluster_json": "bar",
},
},
{
name: "empty source leaves destination intact",
legacy: true,
source: UpstreamConfig{},
destination: map[string]interface{}{
"listener_json": "zip",
"cluster_json": "zap",
"connect_timeout_ms": 10,
"protocol": "grpc",
"limits": UpstreamLimits{
MaxConnections: intPointer(10),
MaxPendingRequests: intPointer(11),
MaxConcurrentRequests: intPointer(12),
},
"passive_health_check": PassiveHealthCheck{
MaxFailures: 13,
Interval: 14 * time.Second,
},
"mesh_gateway": MeshGatewayConfig{Mode: MeshGatewayModeLocal},
},
want: map[string]interface{}{
"listener_json": "zip",
"cluster_json": "zap",
"connect_timeout_ms": 10,
"protocol": "grpc",
"limits": UpstreamLimits{
MaxConnections: intPointer(10),
MaxPendingRequests: intPointer(11),
MaxConcurrentRequests: intPointer(12),
},
"passive_health_check": PassiveHealthCheck{
MaxFailures: 13,
Interval: 14 * time.Second,
},
"mesh_gateway": MeshGatewayConfig{Mode: MeshGatewayModeLocal},
},
},
{
name: "empty source and destination is a noop",
legacy: true,
source: UpstreamConfig{},
destination: make(map[string]interface{}),
want: map[string]interface{}{},
},
}
for _, tc := range tt {
t.Run(tc.name, func(t *testing.T) {
tc.source.MergeInto(tc.destination, tc.legacy)
assert.Equal(t, tc.want, tc.destination)
})
}
}
func TestParseUpstreamConfig(t *testing.T) {
tests := []struct {
name string
input map[string]interface{}
want UpstreamConfig
}{
{
name: "defaults - nil",
input: nil,
want: UpstreamConfig{
ConnectTimeoutMs: 5000,
Protocol: "tcp",
},
},
{
name: "defaults - empty",
input: map[string]interface{}{},
want: UpstreamConfig{
ConnectTimeoutMs: 5000,
Protocol: "tcp",
},
},
{
name: "defaults - other stuff",
input: map[string]interface{}{
"foo": "bar",
"envoy_foo": "envoy_bar",
},
want: UpstreamConfig{
ConnectTimeoutMs: 5000,
Protocol: "tcp",
},
},
{
name: "protocol override",
input: map[string]interface{}{
"protocol": "http",
},
want: UpstreamConfig{
ConnectTimeoutMs: 5000,
Protocol: "http",
},
},
{
name: "connect timeout override, string",
input: map[string]interface{}{
"connect_timeout_ms": "1000",
},
want: UpstreamConfig{
ConnectTimeoutMs: 1000,
Protocol: "tcp",
},
},
{
name: "connect timeout override, float ",
input: map[string]interface{}{
"connect_timeout_ms": float64(1000.0),
},
want: UpstreamConfig{
ConnectTimeoutMs: 1000,
Protocol: "tcp",
},
},
{
name: "connect timeout override, int ",
input: map[string]interface{}{
"connect_timeout_ms": 1000,
},
want: UpstreamConfig{
ConnectTimeoutMs: 1000,
Protocol: "tcp",
},
},
{
name: "connect limits map",
input: map[string]interface{}{
"limits": map[string]interface{}{
"max_connections": 50,
"max_pending_requests": 60,
"max_concurrent_requests": 70,
},
},
want: UpstreamConfig{
ConnectTimeoutMs: 5000,
Protocol: "tcp",
Limits: UpstreamLimits{
MaxConnections: intPointer(50),
MaxPendingRequests: intPointer(60),
MaxConcurrentRequests: intPointer(70),
},
},
},
{
name: "connect limits map zero",
input: map[string]interface{}{
"limits": map[string]interface{}{
"max_connections": 0,
"max_pending_requests": 0,
"max_concurrent_requests": 0,
},
},
want: UpstreamConfig{
ConnectTimeoutMs: 5000,
Protocol: "tcp",
Limits: UpstreamLimits{
MaxConnections: intPointer(0),
MaxPendingRequests: intPointer(0),
MaxConcurrentRequests: intPointer(0),
},
},
},
{
name: "passive health check map",
input: map[string]interface{}{
"passive_health_check": map[string]interface{}{
"interval": "22s",
"max_failures": 7,
},
},
want: UpstreamConfig{
ConnectTimeoutMs: 5000,
Protocol: "tcp",
PassiveHealthCheck: PassiveHealthCheck{
Interval: 22 * time.Second,
MaxFailures: 7,
},
},
},
{
name: "mesh gateway map",
input: map[string]interface{}{
"mesh_gateway": map[string]interface{}{
"Mode": "remote",
},
},
want: UpstreamConfig{
ConnectTimeoutMs: 5000,
Protocol: "tcp",
MeshGateway: MeshGatewayConfig{
Mode: MeshGatewayModeRemote,
},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := ParseUpstreamConfig(tt.input)
require.NoError(t, err)
require.Equal(t, tt.want, got)
})
}
}
func requireContainsLower(t *testing.T, haystack, needle string) { func requireContainsLower(t *testing.T, haystack, needle string) {
t.Helper() t.Helper()
require.Contains(t, strings.ToLower(haystack), strings.ToLower(needle)) require.Contains(t, strings.ToLower(haystack), strings.ToLower(needle))

View File

@ -386,7 +386,7 @@ func (s *Server) makeUpstreamClusterForPreparedQuery(upstream structs.Upstream,
} }
sni := connect.UpstreamSNI(&upstream, "", dc, cfgSnap.Roots.TrustDomain) sni := connect.UpstreamSNI(&upstream, "", dc, cfgSnap.Roots.TrustDomain)
cfg, err := ParseUpstreamConfig(upstream.Config) cfg, err := structs.ParseUpstreamConfig(upstream.Config)
if err != nil { if err != nil {
// Don't hard fail on a config typo, just warn. The parse func returns // Don't hard fail on a config typo, just warn. The parse func returns
// default config if there is an error so it's safe to continue. // default config if there is an error so it's safe to continue.
@ -448,7 +448,7 @@ func (s *Server) makeUpstreamClustersForDiscoveryChain(
return nil, fmt.Errorf("cannot create upstream cluster without discovery chain for %s", upstream.Identifier()) return nil, fmt.Errorf("cannot create upstream cluster without discovery chain for %s", upstream.Identifier())
} }
cfg, err := ParseUpstreamConfigNoDefaults(upstream.Config) cfg, err := structs.ParseUpstreamConfigNoDefaults(upstream.Config)
if err != nil { if err != nil {
// Don't hard fail on a config typo, just warn. The parse func returns // Don't hard fail on a config typo, just warn. The parse func returns
// default config if there is an error so it's safe to continue. // default config if there is an error so it's safe to continue.

View File

@ -161,36 +161,3 @@ func ToOutlierDetection(p structs.PassiveHealthCheck) *envoy_cluster_v3.OutlierD
} }
return od return od
} }
func ParseUpstreamConfigNoDefaults(m map[string]interface{}) (structs.UpstreamConfig, error) {
var cfg structs.UpstreamConfig
config := &mapstructure.DecoderConfig{
DecodeHook: mapstructure.ComposeDecodeHookFunc(
decode.HookWeakDecodeFromSlice,
decode.HookTranslateKeys,
mapstructure.StringToTimeDurationHookFunc(),
),
Result: &cfg,
WeaklyTypedInput: true,
}
decoder, err := mapstructure.NewDecoder(config)
if err != nil {
return cfg, err
}
err = decoder.Decode(m)
return cfg, err
}
// ParseUpstreamConfig returns the UpstreamConfig parsed from an opaque map.
// If an error occurs during parsing it is returned along with the default
// config this allows caller to choose whether and how to report the error.
func ParseUpstreamConfig(m map[string]interface{}) (structs.UpstreamConfig, error) {
cfg, err := ParseUpstreamConfigNoDefaults(m)
// Set defaults (even if error is returned)
cfg.Normalize()
return cfg, err
}

View File

@ -2,11 +2,9 @@ package xds
import ( import (
"testing" "testing"
"time"
"github.com/stretchr/testify/require"
"github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/agent/structs"
"github.com/stretchr/testify/require"
) )
func TestParseProxyConfig(t *testing.T) { func TestParseProxyConfig(t *testing.T) {
@ -168,144 +166,6 @@ func TestParseProxyConfig(t *testing.T) {
} }
} }
func TestParseUpstreamConfig(t *testing.T) {
tests := []struct {
name string
input map[string]interface{}
want structs.UpstreamConfig
}{
{
name: "defaults - nil",
input: nil,
want: structs.UpstreamConfig{
ConnectTimeoutMs: 5000,
Protocol: "tcp",
},
},
{
name: "defaults - empty",
input: map[string]interface{}{},
want: structs.UpstreamConfig{
ConnectTimeoutMs: 5000,
Protocol: "tcp",
},
},
{
name: "defaults - other stuff",
input: map[string]interface{}{
"foo": "bar",
"envoy_foo": "envoy_bar",
},
want: structs.UpstreamConfig{
ConnectTimeoutMs: 5000,
Protocol: "tcp",
},
},
{
name: "protocol override",
input: map[string]interface{}{
"protocol": "http",
},
want: structs.UpstreamConfig{
ConnectTimeoutMs: 5000,
Protocol: "http",
},
},
{
name: "connect timeout override, string",
input: map[string]interface{}{
"connect_timeout_ms": "1000",
},
want: structs.UpstreamConfig{
ConnectTimeoutMs: 1000,
Protocol: "tcp",
},
},
{
name: "connect timeout override, float ",
input: map[string]interface{}{
"connect_timeout_ms": float64(1000.0),
},
want: structs.UpstreamConfig{
ConnectTimeoutMs: 1000,
Protocol: "tcp",
},
},
{
name: "connect timeout override, int ",
input: map[string]interface{}{
"connect_timeout_ms": 1000,
},
want: structs.UpstreamConfig{
ConnectTimeoutMs: 1000,
Protocol: "tcp",
},
},
{
name: "connect limits map",
input: map[string]interface{}{
"limits": map[string]interface{}{
"max_connections": 50,
"max_pending_requests": 60,
"max_concurrent_requests": 70,
},
},
want: structs.UpstreamConfig{
ConnectTimeoutMs: 5000,
Protocol: "tcp",
Limits: structs.UpstreamLimits{
MaxConnections: intPointer(50),
MaxPendingRequests: intPointer(60),
MaxConcurrentRequests: intPointer(70),
},
},
},
{
name: "connect limits map zero",
input: map[string]interface{}{
"limits": map[string]interface{}{
"max_connections": 0,
"max_pending_requests": 0,
"max_concurrent_requests": 0,
},
},
want: structs.UpstreamConfig{
ConnectTimeoutMs: 5000,
Protocol: "tcp",
Limits: structs.UpstreamLimits{
MaxConnections: intPointer(0),
MaxPendingRequests: intPointer(0),
MaxConcurrentRequests: intPointer(0),
},
},
},
{
name: "passive health check map",
input: map[string]interface{}{
"passive_health_check": map[string]interface{}{
"interval": "22s",
"max_failures": 7,
},
},
want: structs.UpstreamConfig{
ConnectTimeoutMs: 5000,
Protocol: "tcp",
PassiveHealthCheck: structs.PassiveHealthCheck{
Interval: 22 * time.Second,
MaxFailures: 7,
},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := ParseUpstreamConfig(tt.input)
require.NoError(t, err)
require.Equal(t, tt.want, got)
})
}
}
func TestParseGatewayConfig(t *testing.T) { func TestParseGatewayConfig(t *testing.T) {
tests := []struct { tests := []struct {
name string name string

View File

@ -312,7 +312,7 @@ func (s *Server) endpointsFromDiscoveryChain(
return resources return resources
} }
cfg, err := ParseUpstreamConfigNoDefaults(upstream.Config) cfg, err := structs.ParseUpstreamConfigNoDefaults(upstream.Config)
if err != nil { if err != nil {
// Don't hard fail on a config typo, just warn. The parse func returns // Don't hard fail on a config typo, just warn. The parse func returns
// default config if there is an error so it's safe to continue. // default config if there is an error so it's safe to continue.

View File

@ -1078,7 +1078,7 @@ func getAndModifyUpstreamConfigForListener(logger hclog.Logger, u *structs.Upstr
) )
if chain == nil || chain.IsDefault() { if chain == nil || chain.IsDefault() {
cfg, err = ParseUpstreamConfig(u.Config) cfg, err = structs.ParseUpstreamConfig(u.Config)
if err != nil { if err != nil {
// Don't hard fail on a config typo, just warn. The parse func returns // Don't hard fail on a config typo, just warn. The parse func returns
// default config if there is an error so it's safe to continue. // default config if there is an error so it's safe to continue.
@ -1087,7 +1087,7 @@ func getAndModifyUpstreamConfigForListener(logger hclog.Logger, u *structs.Upstr
} else { } else {
// Use NoDefaults here so that we can set the protocol to the chain // Use NoDefaults here so that we can set the protocol to the chain
// protocol if necessary // protocol if necessary
cfg, err = ParseUpstreamConfigNoDefaults(u.Config) cfg, err = structs.ParseUpstreamConfigNoDefaults(u.Config)
if err != nil { if err != nil {
// Don't hard fail on a config typo, just warn. The parse func returns // Don't hard fail on a config typo, just warn. The parse func returns
// default config if there is an error so it's safe to continue. // default config if there is an error so it's safe to continue.