Create new types for service-defaults upstream cfg
This commit is contained in:
parent
f0e34dfadb
commit
4bbd495b54
|
@ -418,8 +418,8 @@ func TestServiceManager_PersistService_API(t *testing.T) {
|
|||
"foo": 1,
|
||||
"protocol": "http",
|
||||
},
|
||||
UpstreamIDConfigs: structs.UpstreamConfigs{
|
||||
structs.UpstreamConfig{
|
||||
UpstreamIDConfigs: structs.OpaqueUpstreamConfigs{
|
||||
structs.OpaqueUpstreamConfig{
|
||||
Upstream: structs.NewServiceID("redis", nil),
|
||||
Config: map[string]interface{}{
|
||||
"protocol": "tcp",
|
||||
|
@ -459,8 +459,8 @@ func TestServiceManager_PersistService_API(t *testing.T) {
|
|||
"foo": 1,
|
||||
"protocol": "http",
|
||||
},
|
||||
UpstreamIDConfigs: structs.UpstreamConfigs{
|
||||
structs.UpstreamConfig{
|
||||
UpstreamIDConfigs: structs.OpaqueUpstreamConfigs{
|
||||
structs.OpaqueUpstreamConfig{
|
||||
Upstream: structs.NewServiceID("redis", nil),
|
||||
Config: map[string]interface{}{
|
||||
"protocol": "tcp",
|
||||
|
@ -634,8 +634,8 @@ func TestServiceManager_PersistService_ConfigFiles(t *testing.T) {
|
|||
"foo": 1,
|
||||
"protocol": "http",
|
||||
},
|
||||
UpstreamIDConfigs: structs.UpstreamConfigs{
|
||||
structs.UpstreamConfig{
|
||||
UpstreamIDConfigs: structs.OpaqueUpstreamConfigs{
|
||||
structs.OpaqueUpstreamConfig{
|
||||
Upstream: structs.NewServiceID("redis", nil),
|
||||
Config: map[string]interface{}{
|
||||
"protocol": "tcp",
|
||||
|
|
|
@ -87,11 +87,7 @@ type ServiceConfigEntry struct {
|
|||
|
||||
ExternalSNI string `json:",omitempty" alias:"external_sni"`
|
||||
|
||||
// TODO(banks): enable this once we have upstreams supported too. Enabling
|
||||
// sidecars actually makes no sense and adds complications when you don't
|
||||
// allow upstreams to be specified centrally too.
|
||||
//
|
||||
// Connect ConnectConfiguration
|
||||
Connect *ConnectConfiguration `json:",omitempty"`
|
||||
|
||||
Meta map[string]string `json:",omitempty"`
|
||||
EnterpriseMeta `hcl:",squash" mapstructure:",squash"`
|
||||
|
@ -131,13 +127,20 @@ func (e *ServiceConfigEntry) Normalize() error {
|
|||
e.Kind = ServiceDefaults
|
||||
e.Protocol = strings.ToLower(e.Protocol)
|
||||
|
||||
e.Connect.Normalize()
|
||||
e.EnterpriseMeta.Normalize()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (e *ServiceConfigEntry) Validate() error {
|
||||
return validateConfigEntryMeta(e.Meta)
|
||||
validationErr := validateConfigEntryMeta(e.Meta)
|
||||
|
||||
if err := e.Connect.Validate(); err != nil {
|
||||
validationErr = multierror.Append(validationErr, err)
|
||||
}
|
||||
|
||||
return validationErr
|
||||
}
|
||||
|
||||
func (e *ServiceConfigEntry) CanRead(authz acl.Authorizer) bool {
|
||||
|
@ -169,7 +172,38 @@ func (e *ServiceConfigEntry) GetEnterpriseMeta() *EnterpriseMeta {
|
|||
}
|
||||
|
||||
type ConnectConfiguration struct {
|
||||
SidecarProxy bool
|
||||
// UpstreamConfigs is a map of <namespace/>service to per-upstream configuration
|
||||
UpstreamConfigs map[string]*UpstreamConfig `json:",omitempty" alias:"upstream_configs"`
|
||||
|
||||
// UpstreamDefaults contains default configuration for all upstreams of a given service
|
||||
UpstreamDefaults *UpstreamConfig `json:",omitempty" alias:"upstream_defaults"`
|
||||
}
|
||||
|
||||
func (cfg *ConnectConfiguration) Normalize() {
|
||||
if cfg == nil {
|
||||
return
|
||||
}
|
||||
for _, v := range cfg.UpstreamConfigs {
|
||||
v.Normalize()
|
||||
}
|
||||
|
||||
cfg.UpstreamDefaults.Normalize()
|
||||
}
|
||||
|
||||
func (cfg ConnectConfiguration) Validate() error {
|
||||
var validationErr error
|
||||
|
||||
for k, v := range cfg.UpstreamConfigs {
|
||||
if err := v.Validate(); err != nil {
|
||||
validationErr = multierror.Append(validationErr, fmt.Errorf("error in upstream config for %s: %v", k, err))
|
||||
}
|
||||
}
|
||||
|
||||
if err := cfg.UpstreamDefaults.Validate(); err != nil {
|
||||
validationErr = multierror.Append(validationErr, fmt.Errorf("error in upstream defaults %v", err))
|
||||
}
|
||||
|
||||
return validationErr
|
||||
}
|
||||
|
||||
// ProxyConfigEntry is the top-level struct for global proxy configuration defaults.
|
||||
|
@ -592,13 +626,125 @@ func (r *ServiceConfigRequest) CacheInfo() cache.RequestInfo {
|
|||
}
|
||||
|
||||
type UpstreamConfig struct {
|
||||
// ListenerJSON is a complete override ("escape hatch") for the upstream's
|
||||
// listener.
|
||||
//
|
||||
// Note: This escape hatch is NOT compatible with the discovery chain and
|
||||
// will be ignored if a discovery chain is active.
|
||||
ListenerJSON string `json:",omitempty" alias:"listener_json"`
|
||||
|
||||
// ClusterJSON is a complete override ("escape hatch") for the upstream's
|
||||
// cluster. The Connect client TLS certificate and context will be injected
|
||||
// overriding any TLS settings present.
|
||||
//
|
||||
// Note: This escape hatch is NOT compatible with the discovery chain and
|
||||
// will be ignored if a discovery chain is active.
|
||||
ClusterJSON string `alias:"cluster_json"`
|
||||
|
||||
// Protocol describes the upstream's service protocol. Valid values are "tcp",
|
||||
// "http" and "grpc". Anything else is treated as tcp. The enables protocol
|
||||
// aware features like per-request metrics and connection pooling, tracing,
|
||||
// routing etc.
|
||||
Protocol string
|
||||
|
||||
// ConnectTimeoutMs is the number of milliseconds to timeout making a new
|
||||
// connection to this upstream. Defaults to 5000 (5 seconds) if not set.
|
||||
ConnectTimeoutMs int `alias:"connect_timeout_ms"`
|
||||
|
||||
// Limits are the set of limits that are applied to the proxy for a specific upstream of a
|
||||
// service instance.
|
||||
Limits UpstreamLimits
|
||||
|
||||
// PassiveHealthCheck configuration determines how upstream proxy instances will
|
||||
// be monitored for removal from the load balancing pool.
|
||||
PassiveHealthCheck PassiveHealthCheck `json:",omitempty" alias:"passive_health_check"`
|
||||
|
||||
// MeshGatewayConfig controls how Mesh Gateways are configured and used
|
||||
MeshGateway MeshGatewayConfig `json:",omitempty" alias:"mesh_gateway" `
|
||||
}
|
||||
|
||||
func (cfg *UpstreamConfig) Normalize() {
|
||||
if cfg.Protocol == "" {
|
||||
cfg.Protocol = "tcp"
|
||||
} else {
|
||||
cfg.Protocol = strings.ToLower(cfg.Protocol)
|
||||
}
|
||||
|
||||
if cfg.ConnectTimeoutMs < 1 {
|
||||
cfg.ConnectTimeoutMs = 5000
|
||||
}
|
||||
}
|
||||
|
||||
func (cfg UpstreamConfig) Validate() error {
|
||||
var validationErr error
|
||||
|
||||
if err := cfg.PassiveHealthCheck.Validate(); err != nil {
|
||||
validationErr = multierror.Append(validationErr, err)
|
||||
}
|
||||
if err := cfg.Limits.Validate(); err != nil {
|
||||
validationErr = multierror.Append(validationErr, err)
|
||||
}
|
||||
|
||||
return validationErr
|
||||
}
|
||||
|
||||
type PassiveHealthCheck struct {
|
||||
// Interval between health check analysis sweeps. Each sweep may remove
|
||||
// hosts or return hosts to the pool.
|
||||
Interval time.Duration
|
||||
|
||||
// MaxFailures is the count of consecutive failures that results in a host
|
||||
// being removed from the pool.
|
||||
MaxFailures uint32 `alias:"max_failures"`
|
||||
}
|
||||
|
||||
func (chk PassiveHealthCheck) Validate() error {
|
||||
if chk.Interval <= 0*time.Second {
|
||||
return fmt.Errorf("passive health check interval must be greater than 0s")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// UpstreamLimits describes the limits that are associated with a specific
|
||||
// upstream of a service instance.
|
||||
type UpstreamLimits struct {
|
||||
// MaxConnections is the maximum number of connections the local proxy can
|
||||
// make to the upstream service.
|
||||
MaxConnections *int `alias:"max_connections"`
|
||||
|
||||
// MaxPendingRequests is the maximum number of requests that will be queued
|
||||
// waiting for an available connection. This is mostly applicable to HTTP/1.1
|
||||
// clusters since all HTTP/2 requests are streamed over a single
|
||||
// connection.
|
||||
MaxPendingRequests *int `alias:"max_pending_requests"`
|
||||
|
||||
// MaxConcurrentRequests is the maximum number of in-flight requests that will be allowed
|
||||
// to the upstream cluster at a point in time. This is mostly applicable to HTTP/2
|
||||
// clusters since all HTTP/1.1 requests are limited by MaxConnections.
|
||||
MaxConcurrentRequests *int `alias:"max_concurrent_requests"`
|
||||
}
|
||||
|
||||
func (ul UpstreamLimits) Validate() error {
|
||||
if ul.MaxConnections != nil && *ul.MaxConnections <= 0 {
|
||||
return fmt.Errorf("max connections must be at least 0")
|
||||
}
|
||||
if ul.MaxPendingRequests != nil && *ul.MaxPendingRequests <= 0 {
|
||||
return fmt.Errorf("max pending requests must be at least 0")
|
||||
}
|
||||
if ul.MaxConcurrentRequests != nil && *ul.MaxConcurrentRequests <= 0 {
|
||||
return fmt.Errorf("max concurrent requests must be at least 0")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
type OpaqueUpstreamConfig struct {
|
||||
Upstream ServiceID
|
||||
Config map[string]interface{}
|
||||
}
|
||||
|
||||
type UpstreamConfigs []UpstreamConfig
|
||||
type OpaqueUpstreamConfigs []OpaqueUpstreamConfig
|
||||
|
||||
func (configs UpstreamConfigs) GetUpstreamConfig(sid ServiceID) (config map[string]interface{}, found bool) {
|
||||
func (configs OpaqueUpstreamConfigs) GetUpstreamConfig(sid ServiceID) (config map[string]interface{}, found bool) {
|
||||
for _, usconf := range configs {
|
||||
if usconf.Upstream.Matches(sid) {
|
||||
return usconf.Config, true
|
||||
|
@ -611,7 +757,7 @@ func (configs UpstreamConfigs) GetUpstreamConfig(sid ServiceID) (config map[stri
|
|||
type ServiceConfigResponse struct {
|
||||
ProxyConfig map[string]interface{}
|
||||
UpstreamConfigs map[string]map[string]interface{}
|
||||
UpstreamIDConfigs UpstreamConfigs
|
||||
UpstreamIDConfigs OpaqueUpstreamConfigs
|
||||
MeshGateway MeshGatewayConfig `json:",omitempty"`
|
||||
Expose ExposeConfig `json:",omitempty"`
|
||||
QueryMeta
|
||||
|
|
|
@ -112,6 +112,33 @@ func TestDecodeConfigEntry(t *testing.T) {
|
|||
mesh_gateway {
|
||||
mode = "remote"
|
||||
}
|
||||
connect {
|
||||
upstream_configs {
|
||||
redis {
|
||||
passive_health_check {
|
||||
interval = "2s"
|
||||
max_failures = 3
|
||||
}
|
||||
}
|
||||
|
||||
"finance/billing" {
|
||||
mesh_gateway {
|
||||
mode = "remote"
|
||||
}
|
||||
}
|
||||
}
|
||||
upstream_defaults {
|
||||
connect_timeout_ms = 5
|
||||
protocol = "http"
|
||||
listener_json = "foo"
|
||||
cluster_json = "bar"
|
||||
limits {
|
||||
max_connections = 3
|
||||
max_pending_requests = 4
|
||||
max_concurrent_requests = 5
|
||||
}
|
||||
}
|
||||
}
|
||||
`,
|
||||
camel: `
|
||||
Kind = "service-defaults"
|
||||
|
@ -125,6 +152,33 @@ func TestDecodeConfigEntry(t *testing.T) {
|
|||
MeshGateway {
|
||||
Mode = "remote"
|
||||
}
|
||||
Connect {
|
||||
UpstreamConfigs {
|
||||
"redis" {
|
||||
PassiveHealthCheck {
|
||||
MaxFailures = 3
|
||||
Interval = "2s"
|
||||
}
|
||||
}
|
||||
|
||||
"finance/billing" {
|
||||
MeshGateway {
|
||||
Mode = "remote"
|
||||
}
|
||||
}
|
||||
}
|
||||
UpstreamDefaults {
|
||||
ListenerJSON = "foo"
|
||||
ClusterJSON = "bar"
|
||||
ConnectTimeoutMs = 5
|
||||
Protocol = "http"
|
||||
Limits {
|
||||
MaxConnections = 3
|
||||
MaxPendingRequests = 4
|
||||
MaxConcurrentRequests = 5
|
||||
}
|
||||
}
|
||||
}
|
||||
`,
|
||||
expect: &ServiceConfigEntry{
|
||||
Kind: "service-defaults",
|
||||
|
@ -138,6 +192,30 @@ func TestDecodeConfigEntry(t *testing.T) {
|
|||
MeshGateway: MeshGatewayConfig{
|
||||
Mode: MeshGatewayModeRemote,
|
||||
},
|
||||
Connect: &ConnectConfiguration{
|
||||
UpstreamConfigs: map[string]*UpstreamConfig{
|
||||
"redis": {
|
||||
PassiveHealthCheck: PassiveHealthCheck{
|
||||
MaxFailures: 3,
|
||||
Interval: 2 * time.Second,
|
||||
},
|
||||
},
|
||||
"finance/billing": {
|
||||
MeshGateway: MeshGatewayConfig{Mode: MeshGatewayModeRemote},
|
||||
},
|
||||
},
|
||||
UpstreamDefaults: &UpstreamConfig{
|
||||
ListenerJSON: "foo",
|
||||
ClusterJSON: "bar",
|
||||
ConnectTimeoutMs: 5,
|
||||
Protocol: "http",
|
||||
Limits: UpstreamLimits{
|
||||
MaxConnections: intPointer(3),
|
||||
MaxPendingRequests: intPointer(4),
|
||||
MaxConcurrentRequests: intPointer(5),
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
|
@ -1330,7 +1408,199 @@ func TestConfigEntryResponseMarshalling(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestPassiveHealthCheck_Validate(t *testing.T) {
|
||||
tt := []struct {
|
||||
name string
|
||||
input PassiveHealthCheck
|
||||
wantErr bool
|
||||
wantMsg string
|
||||
}{
|
||||
{
|
||||
name: "valid-interval",
|
||||
input: PassiveHealthCheck{Interval: 2 * time.Second},
|
||||
wantErr: false,
|
||||
},
|
||||
{
|
||||
name: "negative-interval",
|
||||
input: PassiveHealthCheck{Interval: -1 * time.Second},
|
||||
wantErr: true,
|
||||
wantMsg: "greater than 0s",
|
||||
},
|
||||
{
|
||||
name: "zero-interval",
|
||||
input: PassiveHealthCheck{Interval: 0 * time.Second},
|
||||
wantErr: true,
|
||||
wantMsg: "greater than 0s",
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range tt {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
err := tc.input.Validate()
|
||||
if err == nil {
|
||||
require.False(t, tc.wantErr)
|
||||
return
|
||||
}
|
||||
require.Contains(t, err.Error(), tc.wantMsg)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestUpstreamLimits_Validate(t *testing.T) {
|
||||
tt := []struct {
|
||||
name string
|
||||
input UpstreamLimits
|
||||
wantErr bool
|
||||
wantMsg string
|
||||
}{
|
||||
{
|
||||
name: "valid-max-conns",
|
||||
input: UpstreamLimits{MaxConnections: intPointer(1)},
|
||||
wantErr: false,
|
||||
},
|
||||
{
|
||||
name: "zero-max-conns",
|
||||
input: UpstreamLimits{MaxConnections: intPointer(0)},
|
||||
wantErr: true,
|
||||
wantMsg: "at least 0",
|
||||
},
|
||||
{
|
||||
name: "negative-max-conns",
|
||||
input: UpstreamLimits{MaxConnections: intPointer(-1)},
|
||||
wantErr: true,
|
||||
wantMsg: "at least 0",
|
||||
},
|
||||
{
|
||||
name: "valid-max-concurrent",
|
||||
input: UpstreamLimits{MaxConcurrentRequests: intPointer(1)},
|
||||
wantErr: false,
|
||||
},
|
||||
{
|
||||
name: "zero-max-concurrent",
|
||||
input: UpstreamLimits{MaxConcurrentRequests: intPointer(0)},
|
||||
wantErr: true,
|
||||
wantMsg: "at least 0",
|
||||
},
|
||||
{
|
||||
name: "negative-max-concurrent",
|
||||
input: UpstreamLimits{MaxConcurrentRequests: intPointer(-1)},
|
||||
wantErr: true,
|
||||
wantMsg: "at least 0",
|
||||
},
|
||||
{
|
||||
name: "valid-max-pending",
|
||||
input: UpstreamLimits{MaxPendingRequests: intPointer(1)},
|
||||
wantErr: false,
|
||||
},
|
||||
{
|
||||
name: "zero-max-pending",
|
||||
input: UpstreamLimits{MaxPendingRequests: intPointer(0)},
|
||||
wantErr: true,
|
||||
wantMsg: "at least 0",
|
||||
},
|
||||
{
|
||||
name: "negative-max-pending",
|
||||
input: UpstreamLimits{MaxPendingRequests: intPointer(-1)},
|
||||
wantErr: true,
|
||||
wantMsg: "at least 0",
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range tt {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
err := tc.input.Validate()
|
||||
if err == nil {
|
||||
require.False(t, tc.wantErr)
|
||||
return
|
||||
}
|
||||
require.Contains(t, err.Error(), tc.wantMsg)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestServiceConfigEntry_Normalize(t *testing.T) {
|
||||
tt := []struct {
|
||||
name string
|
||||
input ServiceConfigEntry
|
||||
expect ServiceConfigEntry
|
||||
}{
|
||||
{
|
||||
name: "fill-in-kind",
|
||||
input: ServiceConfigEntry{
|
||||
Name: "web",
|
||||
},
|
||||
expect: ServiceConfigEntry{
|
||||
Kind: ServiceDefaults,
|
||||
Name: "web",
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "lowercase-protocol",
|
||||
input: ServiceConfigEntry{
|
||||
Kind: ServiceDefaults,
|
||||
Name: "web",
|
||||
Protocol: "PrOtoCoL",
|
||||
},
|
||||
expect: ServiceConfigEntry{
|
||||
Kind: ServiceDefaults,
|
||||
Name: "web",
|
||||
Protocol: "protocol",
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "connect-kitchen-sink",
|
||||
input: ServiceConfigEntry{
|
||||
Kind: ServiceDefaults,
|
||||
Name: "web",
|
||||
Connect: &ConnectConfiguration{
|
||||
UpstreamConfigs: map[string]*UpstreamConfig{
|
||||
"redis": {
|
||||
Protocol: "TcP",
|
||||
},
|
||||
"memcached": {
|
||||
ConnectTimeoutMs: -1,
|
||||
},
|
||||
},
|
||||
UpstreamDefaults: &UpstreamConfig{ConnectTimeoutMs: -20},
|
||||
},
|
||||
},
|
||||
expect: ServiceConfigEntry{
|
||||
Kind: ServiceDefaults,
|
||||
Name: "web",
|
||||
Connect: &ConnectConfiguration{
|
||||
UpstreamConfigs: map[string]*UpstreamConfig{
|
||||
"redis": {
|
||||
Protocol: "tcp",
|
||||
ConnectTimeoutMs: 5000,
|
||||
},
|
||||
"memcached": {
|
||||
Protocol: "tcp",
|
||||
ConnectTimeoutMs: 5000,
|
||||
},
|
||||
},
|
||||
UpstreamDefaults: &UpstreamConfig{
|
||||
Protocol: "tcp",
|
||||
ConnectTimeoutMs: 5000,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range tt {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
err := tc.input.Normalize()
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, tc.expect, tc.input)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func requireContainsLower(t *testing.T, haystack, needle string) {
|
||||
t.Helper()
|
||||
require.Contains(t, strings.ToLower(haystack), strings.ToLower(needle))
|
||||
}
|
||||
|
||||
func intPointer(i int) *int {
|
||||
return &i
|
||||
}
|
||||
|
|
|
@ -416,7 +416,7 @@ func (s *Server) makeUpstreamClusterForPreparedQuery(upstream structs.Upstream,
|
|||
CircuitBreakers: &envoy_cluster_v3.CircuitBreakers{
|
||||
Thresholds: makeThresholdsIfNeeded(cfg.Limits),
|
||||
},
|
||||
OutlierDetection: cfg.PassiveHealthCheck.AsOutlierDetection(),
|
||||
OutlierDetection: ToOutlierDetection(cfg.PassiveHealthCheck),
|
||||
}
|
||||
if cfg.Protocol == "http2" || cfg.Protocol == "grpc" {
|
||||
c.Http2ProtocolOptions = &envoy_core_v3.Http2ProtocolOptions{}
|
||||
|
@ -524,7 +524,7 @@ func (s *Server) makeUpstreamClustersForDiscoveryChain(
|
|||
CircuitBreakers: &envoy_cluster_v3.CircuitBreakers{
|
||||
Thresholds: makeThresholdsIfNeeded(cfg.Limits),
|
||||
},
|
||||
OutlierDetection: cfg.PassiveHealthCheck.AsOutlierDetection(),
|
||||
OutlierDetection: ToOutlierDetection(cfg.PassiveHealthCheck),
|
||||
}
|
||||
|
||||
var lb *structs.LoadBalancer
|
||||
|
@ -734,8 +734,8 @@ func (s *Server) makeGatewayCluster(snap *proxycfg.ConfigSnapshot, opts gatewayC
|
|||
return cluster
|
||||
}
|
||||
|
||||
func makeThresholdsIfNeeded(limits UpstreamLimits) []*envoy_cluster_v3.CircuitBreakers_Thresholds {
|
||||
var empty UpstreamLimits
|
||||
func makeThresholdsIfNeeded(limits structs.UpstreamLimits) []*envoy_cluster_v3.CircuitBreakers_Thresholds {
|
||||
var empty structs.UpstreamLimits
|
||||
// Make sure to not create any thresholds when passed the zero-value in order
|
||||
// to rely on Envoy defaults
|
||||
if limits == empty {
|
||||
|
@ -743,6 +743,7 @@ func makeThresholdsIfNeeded(limits UpstreamLimits) []*envoy_cluster_v3.CircuitBr
|
|||
}
|
||||
|
||||
threshold := &envoy_cluster_v3.CircuitBreakers_Thresholds{}
|
||||
|
||||
// Likewise, make sure to not set any threshold values on the zero-value in
|
||||
// order to rely on Envoy defaults
|
||||
if limits.MaxConnections != nil {
|
||||
|
|
|
@ -1,10 +1,8 @@
|
|||
package xds
|
||||
|
||||
import (
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
envoy_cluster_v3 "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"
|
||||
"strings"
|
||||
|
||||
"github.com/golang/protobuf/ptypes"
|
||||
"github.com/golang/protobuf/ptypes/wrappers"
|
||||
|
@ -150,74 +148,10 @@ func ParseGatewayConfig(m map[string]interface{}) (GatewayConfig, error) {
|
|||
return cfg, err
|
||||
}
|
||||
|
||||
// UpstreamLimits describes the limits that are associated with a specific
|
||||
// upstream of a service instance.
|
||||
type UpstreamLimits struct {
|
||||
// MaxConnections is the maximum number of connections the local proxy can
|
||||
// make to the upstream service.
|
||||
MaxConnections *int `mapstructure:"max_connections"`
|
||||
|
||||
// MaxPendingRequests is the maximum number of requests that will be queued
|
||||
// waiting for an available connection. This is mostly applicable to HTTP/1.1
|
||||
// clusters since all HTTP/2 requests are streamed over a single
|
||||
// connection.
|
||||
MaxPendingRequests *int `mapstructure:"max_pending_requests"`
|
||||
|
||||
// MaxConcurrentRequests is the maximum number of in-flight requests that will be allowed
|
||||
// to the upstream cluster at a point in time. This is mostly applicable to HTTP/2
|
||||
// clusters since all HTTP/1.1 requests are limited by MaxConnections.
|
||||
MaxConcurrentRequests *int `mapstructure:"max_concurrent_requests"`
|
||||
}
|
||||
|
||||
// UpstreamConfig describes the keys we understand from
|
||||
// Connect.Proxy.Upstream[*].Config.
|
||||
type UpstreamConfig struct {
|
||||
// ListenerJSON is a complete override ("escape hatch") for the upstream's
|
||||
// listener.
|
||||
//
|
||||
// Note: This escape hatch is NOT compatible with the discovery chain and
|
||||
// will be ignored if a discovery chain is active.
|
||||
ListenerJSON string `mapstructure:"envoy_listener_json"`
|
||||
|
||||
// ClusterJSON is a complete override ("escape hatch") for the upstream's
|
||||
// cluster. The Connect client TLS certificate and context will be injected
|
||||
// overriding any TLS settings present.
|
||||
//
|
||||
// Note: This escape hatch is NOT compatible with the discovery chain and
|
||||
// will be ignored if a discovery chain is active.
|
||||
ClusterJSON string `mapstructure:"envoy_cluster_json"`
|
||||
|
||||
// Protocol describes the upstream's service protocol. Valid values are "tcp",
|
||||
// "http" and "grpc". Anything else is treated as tcp. The enables protocol
|
||||
// aware features like per-request metrics and connection pooling, tracing,
|
||||
// routing etc.
|
||||
Protocol string `mapstructure:"protocol"`
|
||||
|
||||
// ConnectTimeoutMs is the number of milliseconds to timeout making a new
|
||||
// connection to this upstream. Defaults to 5000 (5 seconds) if not set.
|
||||
ConnectTimeoutMs int `mapstructure:"connect_timeout_ms"`
|
||||
|
||||
// Limits are the set of limits that are applied to the proxy for a specific upstream of a
|
||||
// service instance.
|
||||
Limits UpstreamLimits `mapstructure:"limits"`
|
||||
|
||||
// PassiveHealthCheck configuration
|
||||
PassiveHealthCheck PassiveHealthCheck `mapstructure:"passive_health_check"`
|
||||
}
|
||||
|
||||
type PassiveHealthCheck struct {
|
||||
// Interval between health check analysis sweeps. Each sweep may remove
|
||||
// hosts or return hosts to the pool.
|
||||
Interval time.Duration
|
||||
// MaxFailures is the count of consecutive failures that results in a host
|
||||
// being removed from the pool.
|
||||
MaxFailures uint32 `mapstructure:"max_failures"`
|
||||
}
|
||||
|
||||
// Return an envoy.OutlierDetection populated by the values from this struct.
|
||||
// If all values are zero a default empty OutlierDetection will be returned to
|
||||
// enable outlier detection with default values.
|
||||
func (p PassiveHealthCheck) AsOutlierDetection() *envoy_cluster_v3.OutlierDetection {
|
||||
func ToOutlierDetection(p structs.PassiveHealthCheck) *envoy_cluster_v3.OutlierDetection {
|
||||
od := &envoy_cluster_v3.OutlierDetection{}
|
||||
if p.Interval != 0 {
|
||||
od.Interval = ptypes.DurationProto(p.Interval)
|
||||
|
@ -228,8 +162,8 @@ func (p PassiveHealthCheck) AsOutlierDetection() *envoy_cluster_v3.OutlierDetect
|
|||
return od
|
||||
}
|
||||
|
||||
func ParseUpstreamConfigNoDefaults(m map[string]interface{}) (UpstreamConfig, error) {
|
||||
var cfg UpstreamConfig
|
||||
func ParseUpstreamConfigNoDefaults(m map[string]interface{}) (structs.UpstreamConfig, error) {
|
||||
var cfg structs.UpstreamConfig
|
||||
config := &mapstructure.DecoderConfig{
|
||||
DecodeHook: mapstructure.ComposeDecodeHookFunc(
|
||||
decode.HookWeakDecodeFromSlice,
|
||||
|
@ -252,16 +186,11 @@ func ParseUpstreamConfigNoDefaults(m map[string]interface{}) (UpstreamConfig, er
|
|||
// ParseUpstreamConfig returns the UpstreamConfig parsed from an opaque map.
|
||||
// If an error occurs during parsing it is returned along with the default
|
||||
// config this allows caller to choose whether and how to report the error.
|
||||
func ParseUpstreamConfig(m map[string]interface{}) (UpstreamConfig, error) {
|
||||
func ParseUpstreamConfig(m map[string]interface{}) (structs.UpstreamConfig, error) {
|
||||
cfg, err := ParseUpstreamConfigNoDefaults(m)
|
||||
|
||||
// Set defaults (even if error is returned)
|
||||
if cfg.Protocol == "" {
|
||||
cfg.Protocol = "tcp"
|
||||
} else {
|
||||
cfg.Protocol = strings.ToLower(cfg.Protocol)
|
||||
}
|
||||
if cfg.ConnectTimeoutMs < 1 {
|
||||
cfg.ConnectTimeoutMs = 5000
|
||||
}
|
||||
cfg.Normalize()
|
||||
|
||||
return cfg, err
|
||||
}
|
||||
|
|
|
@ -172,12 +172,12 @@ func TestParseUpstreamConfig(t *testing.T) {
|
|||
tests := []struct {
|
||||
name string
|
||||
input map[string]interface{}
|
||||
want UpstreamConfig
|
||||
want structs.UpstreamConfig
|
||||
}{
|
||||
{
|
||||
name: "defaults - nil",
|
||||
input: nil,
|
||||
want: UpstreamConfig{
|
||||
want: structs.UpstreamConfig{
|
||||
ConnectTimeoutMs: 5000,
|
||||
Protocol: "tcp",
|
||||
},
|
||||
|
@ -185,7 +185,7 @@ func TestParseUpstreamConfig(t *testing.T) {
|
|||
{
|
||||
name: "defaults - empty",
|
||||
input: map[string]interface{}{},
|
||||
want: UpstreamConfig{
|
||||
want: structs.UpstreamConfig{
|
||||
ConnectTimeoutMs: 5000,
|
||||
Protocol: "tcp",
|
||||
},
|
||||
|
@ -196,7 +196,7 @@ func TestParseUpstreamConfig(t *testing.T) {
|
|||
"foo": "bar",
|
||||
"envoy_foo": "envoy_bar",
|
||||
},
|
||||
want: UpstreamConfig{
|
||||
want: structs.UpstreamConfig{
|
||||
ConnectTimeoutMs: 5000,
|
||||
Protocol: "tcp",
|
||||
},
|
||||
|
@ -206,7 +206,7 @@ func TestParseUpstreamConfig(t *testing.T) {
|
|||
input: map[string]interface{}{
|
||||
"protocol": "http",
|
||||
},
|
||||
want: UpstreamConfig{
|
||||
want: structs.UpstreamConfig{
|
||||
ConnectTimeoutMs: 5000,
|
||||
Protocol: "http",
|
||||
},
|
||||
|
@ -216,7 +216,7 @@ func TestParseUpstreamConfig(t *testing.T) {
|
|||
input: map[string]interface{}{
|
||||
"connect_timeout_ms": "1000",
|
||||
},
|
||||
want: UpstreamConfig{
|
||||
want: structs.UpstreamConfig{
|
||||
ConnectTimeoutMs: 1000,
|
||||
Protocol: "tcp",
|
||||
},
|
||||
|
@ -226,7 +226,7 @@ func TestParseUpstreamConfig(t *testing.T) {
|
|||
input: map[string]interface{}{
|
||||
"connect_timeout_ms": float64(1000.0),
|
||||
},
|
||||
want: UpstreamConfig{
|
||||
want: structs.UpstreamConfig{
|
||||
ConnectTimeoutMs: 1000,
|
||||
Protocol: "tcp",
|
||||
},
|
||||
|
@ -236,7 +236,7 @@ func TestParseUpstreamConfig(t *testing.T) {
|
|||
input: map[string]interface{}{
|
||||
"connect_timeout_ms": 1000,
|
||||
},
|
||||
want: UpstreamConfig{
|
||||
want: structs.UpstreamConfig{
|
||||
ConnectTimeoutMs: 1000,
|
||||
Protocol: "tcp",
|
||||
},
|
||||
|
@ -250,10 +250,10 @@ func TestParseUpstreamConfig(t *testing.T) {
|
|||
"max_concurrent_requests": 70,
|
||||
},
|
||||
},
|
||||
want: UpstreamConfig{
|
||||
want: structs.UpstreamConfig{
|
||||
ConnectTimeoutMs: 5000,
|
||||
Protocol: "tcp",
|
||||
Limits: UpstreamLimits{
|
||||
Limits: structs.UpstreamLimits{
|
||||
MaxConnections: intPointer(50),
|
||||
MaxPendingRequests: intPointer(60),
|
||||
MaxConcurrentRequests: intPointer(70),
|
||||
|
@ -269,10 +269,10 @@ func TestParseUpstreamConfig(t *testing.T) {
|
|||
"max_concurrent_requests": 0,
|
||||
},
|
||||
},
|
||||
want: UpstreamConfig{
|
||||
want: structs.UpstreamConfig{
|
||||
ConnectTimeoutMs: 5000,
|
||||
Protocol: "tcp",
|
||||
Limits: UpstreamLimits{
|
||||
Limits: structs.UpstreamLimits{
|
||||
MaxConnections: intPointer(0),
|
||||
MaxPendingRequests: intPointer(0),
|
||||
MaxConcurrentRequests: intPointer(0),
|
||||
|
@ -287,10 +287,10 @@ func TestParseUpstreamConfig(t *testing.T) {
|
|||
"max_failures": 7,
|
||||
},
|
||||
},
|
||||
want: UpstreamConfig{
|
||||
want: structs.UpstreamConfig{
|
||||
ConnectTimeoutMs: 5000,
|
||||
Protocol: "tcp",
|
||||
PassiveHealthCheck: PassiveHealthCheck{
|
||||
PassiveHealthCheck: structs.PassiveHealthCheck{
|
||||
Interval: 22 * time.Second,
|
||||
MaxFailures: 7,
|
||||
},
|
||||
|
|
|
@ -1071,9 +1071,9 @@ func (s *Server) makeUpstreamListenerForDiscoveryChain(
|
|||
return l, nil
|
||||
}
|
||||
|
||||
func getAndModifyUpstreamConfigForListener(logger hclog.Logger, u *structs.Upstream, chain *structs.CompiledDiscoveryChain) UpstreamConfig {
|
||||
func getAndModifyUpstreamConfigForListener(logger hclog.Logger, u *structs.Upstream, chain *structs.CompiledDiscoveryChain) structs.UpstreamConfig {
|
||||
var (
|
||||
cfg UpstreamConfig
|
||||
cfg structs.UpstreamConfig
|
||||
err error
|
||||
)
|
||||
|
||||
|
|
|
@ -91,15 +91,91 @@ type ExposePath struct {
|
|||
ParsedFromCheck bool
|
||||
}
|
||||
|
||||
type ConnectConfiguration struct {
|
||||
// UpstreamConfigs is a map of <namespace/>service to per-upstream configuration
|
||||
UpstreamConfigs map[string]UpstreamConfig `json:",omitempty" alias:"upstream_configs"`
|
||||
|
||||
// UpstreamDefaults contains default configuration for all upstreams of a given service
|
||||
UpstreamDefaults UpstreamConfig `json:",omitempty" alias:"upstream_defaults"`
|
||||
}
|
||||
|
||||
type UpstreamConfig struct {
|
||||
// ListenerJSON is a complete override ("escape hatch") for the upstream's
|
||||
// listener.
|
||||
//
|
||||
// Note: This escape hatch is NOT compatible with the discovery chain and
|
||||
// will be ignored if a discovery chain is active.
|
||||
ListenerJSON string `json:",omitempty" alias:"listener_json"`
|
||||
|
||||
// ClusterJSON is a complete override ("escape hatch") for the upstream's
|
||||
// cluster. The Connect client TLS certificate and context will be injected
|
||||
// overriding any TLS settings present.
|
||||
//
|
||||
// Note: This escape hatch is NOT compatible with the discovery chain and
|
||||
// will be ignored if a discovery chain is active.
|
||||
ClusterJSON string `alias:"cluster_json"`
|
||||
|
||||
// Protocol describes the upstream's service protocol. Valid values are "tcp",
|
||||
// "http" and "grpc". Anything else is treated as tcp. The enables protocol
|
||||
// aware features like per-request metrics and connection pooling, tracing,
|
||||
// routing etc.
|
||||
Protocol string
|
||||
|
||||
// ConnectTimeoutMs is the number of milliseconds to timeout making a new
|
||||
// connection to this upstream. Defaults to 5000 (5 seconds) if not set.
|
||||
ConnectTimeoutMs int `alias:"connect_timeout_ms"`
|
||||
|
||||
// Limits are the set of limits that are applied to the proxy for a specific upstream of a
|
||||
// service instance.
|
||||
Limits UpstreamLimits
|
||||
|
||||
// PassiveHealthCheck configuration determines how upstream proxy instances will
|
||||
// be monitored for removal from the load balancing pool.
|
||||
PassiveHealthCheck PassiveHealthCheck `json:",omitempty" alias:"passive_health_check"`
|
||||
|
||||
// MeshGatewayConfig controls how Mesh Gateways are configured and used
|
||||
MeshGateway MeshGatewayConfig `json:",omitempty" alias:"mesh_gateway" `
|
||||
}
|
||||
|
||||
type PassiveHealthCheck struct {
|
||||
// Interval between health check analysis sweeps. Each sweep may remove
|
||||
// hosts or return hosts to the pool.
|
||||
Interval time.Duration
|
||||
|
||||
// MaxFailures is the count of consecutive failures that results in a host
|
||||
// being removed from the pool.
|
||||
MaxFailures uint32 `alias:"max_failures"`
|
||||
}
|
||||
|
||||
// UpstreamLimits describes the limits that are associated with a specific
|
||||
// upstream of a service instance.
|
||||
type UpstreamLimits struct {
|
||||
// MaxConnections is the maximum number of connections the local proxy can
|
||||
// make to the upstream service.
|
||||
MaxConnections int `alias:"max_connections"`
|
||||
|
||||
// MaxPendingRequests is the maximum number of requests that will be queued
|
||||
// waiting for an available connection. This is mostly applicable to HTTP/1.1
|
||||
// clusters since all HTTP/2 requests are streamed over a single
|
||||
// connection.
|
||||
MaxPendingRequests int `alias:"max_pending_requests"`
|
||||
|
||||
// MaxConcurrentRequests is the maximum number of in-flight requests that will be allowed
|
||||
// to the upstream cluster at a point in time. This is mostly applicable to HTTP/2
|
||||
// clusters since all HTTP/1.1 requests are limited by MaxConnections.
|
||||
MaxConcurrentRequests int `alias:"max_concurrent_requests"`
|
||||
}
|
||||
|
||||
type ServiceConfigEntry struct {
|
||||
Kind string
|
||||
Name string
|
||||
Namespace string `json:",omitempty"`
|
||||
Protocol string `json:",omitempty"`
|
||||
MeshGateway MeshGatewayConfig `json:",omitempty" alias:"mesh_gateway"`
|
||||
Expose ExposeConfig `json:",omitempty"`
|
||||
ExternalSNI string `json:",omitempty" alias:"external_sni"`
|
||||
Meta map[string]string `json:",omitempty"`
|
||||
Namespace string `json:",omitempty"`
|
||||
Protocol string `json:",omitempty"`
|
||||
MeshGateway MeshGatewayConfig `json:",omitempty" alias:"mesh_gateway"`
|
||||
Connect ConnectConfiguration `json:",omitempty"`
|
||||
Expose ExposeConfig `json:",omitempty"`
|
||||
ExternalSNI string `json:",omitempty" alias:"external_sni"`
|
||||
Meta map[string]string `json:",omitempty"`
|
||||
CreateIndex uint64
|
||||
ModifyIndex uint64
|
||||
}
|
||||
|
|
|
@ -332,6 +332,36 @@ func TestDecodeConfigEntry(t *testing.T) {
|
|||
"ExternalSNI": "abc-123",
|
||||
"MeshGateway": {
|
||||
"Mode": "remote"
|
||||
},
|
||||
"Connect": {
|
||||
"UpstreamConfigs": {
|
||||
"redis": {
|
||||
"PassiveHealthCheck": {
|
||||
"MaxFailures": 3,
|
||||
"Interval": "2s"
|
||||
}
|
||||
},
|
||||
"finance/billing": {
|
||||
"MeshGateway": {
|
||||
"Mode": "remote"
|
||||
}
|
||||
}
|
||||
},
|
||||
"UpstreamDefaults": {
|
||||
"ClusterJSON": "zip",
|
||||
"ListenerJSON": "zop",
|
||||
"ConnectTimeoutMs": 5000,
|
||||
"Protocol": "http",
|
||||
"Limits": {
|
||||
"MaxConnections": 3,
|
||||
"MaxPendingRequests": 4,
|
||||
"MaxConcurrentRequests": 5
|
||||
},
|
||||
"PassiveHealthCheck": {
|
||||
"MaxFailures": 5,
|
||||
"Interval": "4s"
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
`,
|
||||
|
@ -347,6 +377,34 @@ func TestDecodeConfigEntry(t *testing.T) {
|
|||
MeshGateway: MeshGatewayConfig{
|
||||
Mode: MeshGatewayModeRemote,
|
||||
},
|
||||
Connect: ConnectConfiguration{
|
||||
UpstreamConfigs: map[string]UpstreamConfig{
|
||||
"redis": {
|
||||
PassiveHealthCheck: PassiveHealthCheck{
|
||||
MaxFailures: 3,
|
||||
Interval: 2 * time.Second,
|
||||
},
|
||||
},
|
||||
"finance/billing": {
|
||||
MeshGateway: MeshGatewayConfig{Mode: "remote"},
|
||||
},
|
||||
},
|
||||
UpstreamDefaults: UpstreamConfig{
|
||||
ClusterJSON: "zip",
|
||||
ListenerJSON: "zop",
|
||||
Protocol: "http",
|
||||
ConnectTimeoutMs: 5000,
|
||||
Limits: UpstreamLimits{
|
||||
MaxConnections: 3,
|
||||
MaxPendingRequests: 4,
|
||||
MaxConcurrentRequests: 5,
|
||||
},
|
||||
PassiveHealthCheck: PassiveHealthCheck{
|
||||
MaxFailures: 5,
|
||||
Interval: 4 * time.Second,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
|
|
|
@ -423,7 +423,7 @@ func TestParseConfigEntry(t *testing.T) {
|
|||
},
|
||||
},
|
||||
{
|
||||
name: "service-defaults",
|
||||
name: "service-defaults: kitchen sink",
|
||||
snake: `
|
||||
kind = "service-defaults"
|
||||
name = "main"
|
||||
|
@ -436,6 +436,36 @@ func TestParseConfigEntry(t *testing.T) {
|
|||
mesh_gateway {
|
||||
mode = "remote"
|
||||
}
|
||||
connect {
|
||||
upstream_configs {
|
||||
"redis" {
|
||||
passive_health_check {
|
||||
max_failures = 3
|
||||
interval = "2s"
|
||||
}
|
||||
}
|
||||
"finance/billing" {
|
||||
mesh_gateway {
|
||||
mode = "remote"
|
||||
}
|
||||
}
|
||||
}
|
||||
upstream_defaults {
|
||||
cluster_json = "zip"
|
||||
listener_json = "zop"
|
||||
connect_timeout_ms = 5000
|
||||
protocol = "http"
|
||||
limits {
|
||||
max_connections = 3
|
||||
max_pending_requests = 4
|
||||
max_concurrent_requests = 5
|
||||
}
|
||||
passive_health_check {
|
||||
max_failures = 5
|
||||
interval = "4s"
|
||||
}
|
||||
}
|
||||
}
|
||||
`,
|
||||
camel: `
|
||||
Kind = "service-defaults"
|
||||
|
@ -449,6 +479,36 @@ func TestParseConfigEntry(t *testing.T) {
|
|||
MeshGateway {
|
||||
Mode = "remote"
|
||||
}
|
||||
connect = {
|
||||
upstream_configs = {
|
||||
"redis" = {
|
||||
passive_health_check = {
|
||||
max_failures = 3
|
||||
interval = "2s"
|
||||
}
|
||||
}
|
||||
"finance/billing" = {
|
||||
mesh_gateway = {
|
||||
mode = "remote"
|
||||
}
|
||||
}
|
||||
}
|
||||
upstream_defaults = {
|
||||
cluster_json = "zip"
|
||||
listener_json = "zop"
|
||||
connect_timeout_ms = 5000
|
||||
protocol = "http"
|
||||
limits = {
|
||||
max_connections = 3
|
||||
max_pending_requests = 4
|
||||
max_concurrent_requests = 5
|
||||
}
|
||||
passive_health_check = {
|
||||
max_failures = 5
|
||||
interval = "4s"
|
||||
}
|
||||
}
|
||||
}
|
||||
`,
|
||||
snakeJSON: `
|
||||
{
|
||||
|
@ -462,6 +522,36 @@ func TestParseConfigEntry(t *testing.T) {
|
|||
"external_sni": "abc-123",
|
||||
"mesh_gateway": {
|
||||
"mode": "remote"
|
||||
},
|
||||
"connect": {
|
||||
"upstream_configs": {
|
||||
"redis": {
|
||||
"passive_health_check": {
|
||||
"max_failures": 3,
|
||||
"interval": "2s"
|
||||
}
|
||||
},
|
||||
"finance/billing": {
|
||||
"mesh_gateway": {
|
||||
"mode": "remote"
|
||||
}
|
||||
}
|
||||
},
|
||||
"upstream_defaults": {
|
||||
"cluster_json": "zip",
|
||||
"listener_json": "zop",
|
||||
"connect_timeout_ms": 5000,
|
||||
"protocol": "http",
|
||||
"limits": {
|
||||
"max_connections": 3,
|
||||
"max_pending_requests": 4,
|
||||
"max_concurrent_requests": 5
|
||||
},
|
||||
"passive_health_check": {
|
||||
"max_failures": 5,
|
||||
"interval": "4s"
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
`,
|
||||
|
@ -477,6 +567,36 @@ func TestParseConfigEntry(t *testing.T) {
|
|||
"ExternalSNI": "abc-123",
|
||||
"MeshGateway": {
|
||||
"Mode": "remote"
|
||||
},
|
||||
"Connect": {
|
||||
"UpstreamConfigs": {
|
||||
"redis": {
|
||||
"PassiveHealthCheck": {
|
||||
"MaxFailures": 3,
|
||||
"Interval": "2s"
|
||||
}
|
||||
},
|
||||
"finance/billing": {
|
||||
"MeshGateway": {
|
||||
"Mode": "remote"
|
||||
}
|
||||
}
|
||||
},
|
||||
"UpstreamDefaults": {
|
||||
"ClusterJSON": "zip",
|
||||
"ListenerJSON": "zop",
|
||||
"ConnectTimeoutMs": 5000,
|
||||
"Protocol": "http",
|
||||
"Limits": {
|
||||
"MaxConnections": 3,
|
||||
"MaxPendingRequests": 4,
|
||||
"MaxConcurrentRequests": 5
|
||||
},
|
||||
"PassiveHealthCheck": {
|
||||
"MaxFailures": 5,
|
||||
"Interval": "4s"
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
`,
|
||||
|
@ -492,6 +612,36 @@ func TestParseConfigEntry(t *testing.T) {
|
|||
MeshGateway: api.MeshGatewayConfig{
|
||||
Mode: api.MeshGatewayModeRemote,
|
||||
},
|
||||
Connect: api.ConnectConfiguration{
|
||||
UpstreamConfigs: map[string]api.UpstreamConfig{
|
||||
"redis": {
|
||||
PassiveHealthCheck: api.PassiveHealthCheck{
|
||||
MaxFailures: 3,
|
||||
Interval: 2 * time.Second,
|
||||
},
|
||||
},
|
||||
"finance/billing": {
|
||||
MeshGateway: api.MeshGatewayConfig{
|
||||
Mode: "remote",
|
||||
},
|
||||
},
|
||||
},
|
||||
UpstreamDefaults: api.UpstreamConfig{
|
||||
ClusterJSON: "zip",
|
||||
ListenerJSON: "zop",
|
||||
Protocol: "http",
|
||||
ConnectTimeoutMs: 5000,
|
||||
Limits: api.UpstreamLimits{
|
||||
MaxConnections: 3,
|
||||
MaxPendingRequests: 4,
|
||||
MaxConcurrentRequests: 5,
|
||||
},
|
||||
PassiveHealthCheck: api.PassiveHealthCheck{
|
||||
MaxFailures: 5,
|
||||
Interval: 4 * time.Second,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
|
|
Loading…
Reference in New Issue