Add PrioritizeByLocality to config entries. (#17007)

This commit adds the PrioritizeByLocality field to both proxy-config
and service-resolver config entries for locality-aware routing. The
field is currently intended for enterprise only, and will be used to
enable prioritization of service-mesh connections to services based
on geographical region / zone.
This commit is contained in:
Derek Menteer 2023-04-14 15:42:54 -05:00 committed by GitHub
parent 7db438d114
commit 7ce928a42e
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
14 changed files with 1571 additions and 1377 deletions

View file

@ -382,7 +382,6 @@ func (c *compiler) determineIfDefaultChain() bool {
} }
target := c.loadedTargets[node.Resolver.Target] target := c.loadedTargets[node.Resolver.Target]
return target.Service == c.serviceName && target.Namespace == c.evaluateInNamespace && target.Partition == c.evaluateInPartition return target.Service == c.serviceName && target.Namespace == c.evaluateInNamespace && target.Partition == c.evaluateInPartition
} }
@ -996,14 +995,21 @@ RESOLVE_AGAIN:
Type: structs.DiscoveryGraphNodeTypeResolver, Type: structs.DiscoveryGraphNodeTypeResolver,
Name: target.ID, Name: target.ID,
Resolver: &structs.DiscoveryResolver{ Resolver: &structs.DiscoveryResolver{
Default: resolver.IsDefault(), Default: resolver.IsDefault(),
Target: target.ID, Target: target.ID,
ConnectTimeout: connectTimeout, ConnectTimeout: connectTimeout,
RequestTimeout: resolver.RequestTimeout, RequestTimeout: resolver.RequestTimeout,
PrioritizeByLocality: resolver.PrioritizeByLocality.ToDiscovery(),
}, },
LoadBalancer: resolver.LoadBalancer, LoadBalancer: resolver.LoadBalancer,
} }
// Merge default values from the proxy defaults
proxyDefault := c.entries.GetProxyDefaults(targetID.PartitionOrDefault())
if proxyDefault != nil && node.Resolver.PrioritizeByLocality == nil {
node.Resolver.PrioritizeByLocality = proxyDefault.PrioritizeByLocality.ToDiscovery()
}
target.Subset = resolver.Subsets[target.ServiceSubset] target.Subset = resolver.Subsets[target.ServiceSubset]
if serviceDefault := c.entries.GetService(targetID); serviceDefault != nil && serviceDefault.ExternalSNI != "" { if serviceDefault := c.entries.GetService(targetID); serviceDefault != nil && serviceDefault.ExternalSNI != "" {
@ -1089,7 +1095,7 @@ RESOLVE_AGAIN:
// Determine which failover definitions apply. // Determine which failover definitions apply.
var failoverTargets []*structs.DiscoveryTarget var failoverTargets []*structs.DiscoveryTarget
var failoverPolicy *structs.ServiceResolverFailoverPolicy var failoverPolicy *structs.ServiceResolverFailoverPolicy
proxyDefault := c.entries.GetProxyDefaults(targetID.PartitionOrDefault())
if proxyDefault != nil { if proxyDefault != nil {
failoverPolicy = proxyDefault.FailoverPolicy failoverPolicy = proxyDefault.FailoverPolicy
} }

View file

@ -367,16 +367,17 @@ func IsIP(address string) bool {
// ProxyConfigEntry is the top-level struct for global proxy configuration defaults. // ProxyConfigEntry is the top-level struct for global proxy configuration defaults.
type ProxyConfigEntry struct { type ProxyConfigEntry struct {
Kind string Kind string
Name string Name string
Config map[string]interface{} Config map[string]interface{}
Mode ProxyMode `json:",omitempty"` Mode ProxyMode `json:",omitempty"`
TransparentProxy TransparentProxyConfig `json:",omitempty" alias:"transparent_proxy"` TransparentProxy TransparentProxyConfig `json:",omitempty" alias:"transparent_proxy"`
MeshGateway MeshGatewayConfig `json:",omitempty" alias:"mesh_gateway"` MeshGateway MeshGatewayConfig `json:",omitempty" alias:"mesh_gateway"`
Expose ExposeConfig `json:",omitempty"` Expose ExposeConfig `json:",omitempty"`
AccessLogs AccessLogsConfig `json:",omitempty" alias:"access_logs"` AccessLogs AccessLogsConfig `json:",omitempty" alias:"access_logs"`
EnvoyExtensions EnvoyExtensions `json:",omitempty" alias:"envoy_extensions"` EnvoyExtensions EnvoyExtensions `json:",omitempty" alias:"envoy_extensions"`
FailoverPolicy *ServiceResolverFailoverPolicy `json:",omitempty" alias:"failover_policy"` FailoverPolicy *ServiceResolverFailoverPolicy `json:",omitempty" alias:"failover_policy"`
PrioritizeByLocality *ServiceResolverPrioritizeByLocality `json:",omitempty" alias:"prioritize_by_locality"`
Meta map[string]string `json:",omitempty"` Meta map[string]string `json:",omitempty"`
acl.EnterpriseMeta `hcl:",squash" mapstructure:",squash"` acl.EnterpriseMeta `hcl:",squash" mapstructure:",squash"`
@ -443,8 +444,12 @@ func (e *ProxyConfigEntry) Validate() error {
return err return err
} }
if !e.FailoverPolicy.isValid() { if err := e.FailoverPolicy.validate(); err != nil {
return fmt.Errorf("Failover policy must be one of '', 'default', or 'order-by-locality'") return err
}
if err := e.PrioritizeByLocality.validate(); err != nil {
return err
} }
return e.validateEnterpriseMeta() return e.validateEnterpriseMeta()

View file

@ -857,6 +857,10 @@ type ServiceResolverConfigEntry struct {
// specified here. // specified here.
Failover map[string]ServiceResolverFailover `json:",omitempty"` Failover map[string]ServiceResolverFailover `json:",omitempty"`
// PrioritizeByLocality controls whether the locality of services within the
// local partition will be used to prioritize connectivity.
PrioritizeByLocality *ServiceResolverPrioritizeByLocality `json:",omitempty" alias:"prioritize_by_locality"`
// ConnectTimeout is the timeout for establishing new network connections // ConnectTimeout is the timeout for establishing new network connections
// to this service. // to this service.
ConnectTimeout time.Duration `json:",omitempty" alias:"connect_timeout"` ConnectTimeout time.Duration `json:",omitempty" alias:"connect_timeout"`
@ -960,7 +964,8 @@ func (e *ServiceResolverConfigEntry) IsDefault() bool {
len(e.Failover) == 0 && len(e.Failover) == 0 &&
e.ConnectTimeout == 0 && e.ConnectTimeout == 0 &&
e.RequestTimeout == 0 && e.RequestTimeout == 0 &&
e.LoadBalancer == nil e.LoadBalancer == nil &&
e.PrioritizeByLocality == nil
} }
func (e *ServiceResolverConfigEntry) GetKind() string { func (e *ServiceResolverConfigEntry) GetKind() string {
@ -1031,6 +1036,10 @@ func (e *ServiceResolverConfigEntry) Validate() error {
return fmt.Errorf("DefaultSubset %q is not a valid subset", e.DefaultSubset) return fmt.Errorf("DefaultSubset %q is not a valid subset", e.DefaultSubset)
} }
if err := e.PrioritizeByLocality.validate(); err != nil {
return err
}
if e.Redirect != nil { if e.Redirect != nil {
if !e.InDefaultPartition() && e.Redirect.Datacenter != "" { if !e.InDefaultPartition() && e.Redirect.Datacenter != "" {
return fmt.Errorf("Cross-datacenter redirect is only supported in the default partition") return fmt.Errorf("Cross-datacenter redirect is only supported in the default partition")
@ -1113,8 +1122,8 @@ func (e *ServiceResolverConfigEntry) Validate() error {
return fmt.Errorf("Bad Failover[%q]: %s", subset, err) return fmt.Errorf("Bad Failover[%q]: %s", subset, err)
} }
if !f.Policy.isValid() { if err := f.Policy.validate(); err != nil {
return fmt.Errorf("Bad Failover[%q]: Policy must be one of '', 'sequential', or 'order-by-locality'", subset) return fmt.Errorf("Bad Failover[%q]: %w", subset, err)
} }
if f.ServiceSubset != "" { if f.ServiceSubset != "" {
@ -1445,13 +1454,6 @@ type ServiceResolverFailover struct {
SamenessGroup string `json:",omitempty"` SamenessGroup string `json:",omitempty"`
} }
type ServiceResolverFailoverPolicy struct {
// Mode specifies the type of failover that will be performed. Valid values are
// "sequential", "" (equivalent to "sequential") and "order-by-locality".
Mode string `json:",omitempty"`
Regions []string `json:",omitempty"`
}
func (f *ServiceResolverFailover) ToDiscoveryTargetOpts() DiscoveryTargetOpts { func (f *ServiceResolverFailover) ToDiscoveryTargetOpts() DiscoveryTargetOpts {
return DiscoveryTargetOpts{ return DiscoveryTargetOpts{
Service: f.Service, Service: f.Service,
@ -1469,9 +1471,16 @@ func (f *ServiceResolverFailover) isEmpty() bool {
f.SamenessGroup == "" f.SamenessGroup == ""
} }
func (fp *ServiceResolverFailoverPolicy) isValid() bool { type ServiceResolverFailoverPolicy struct {
// Mode specifies the type of failover that will be performed. Valid values are
// "sequential", "" (equivalent to "sequential") and "order-by-locality".
Mode string `json:",omitempty"`
Regions []string `json:",omitempty"`
}
func (fp *ServiceResolverFailoverPolicy) validate() error {
if fp == nil { if fp == nil {
return true return nil
} }
switch fp.Mode { switch fp.Mode {
@ -1479,10 +1488,16 @@ func (fp *ServiceResolverFailoverPolicy) isValid() bool {
case "sequential": case "sequential":
case "order-by-locality": case "order-by-locality":
default: default:
return false return fmt.Errorf("Failover-policy mode must be one of '', 'sequential', or 'order-by-locality'")
} }
return nil
}
return true type ServiceResolverPrioritizeByLocality struct {
// Mode specifies the type of prioritization that will be performed
// when selecting nodes in the local partition.
// Valid values are: "" (default "none"), "none", and "failover".
Mode string `json:",omitempty"`
} }
type ServiceResolverFailoverTarget struct { type ServiceResolverFailoverTarget struct {

View file

@ -118,3 +118,11 @@ func (f *ServiceResolverFailoverPolicy) ValidateEnterprise() error {
func (e *ServiceResolverConfigEntry) RelatedSamenessGroups() []string { func (e *ServiceResolverConfigEntry) RelatedSamenessGroups() []string {
return nil return nil
} }
func (pbl *ServiceResolverPrioritizeByLocality) validate() error {
var zero ServiceResolverPrioritizeByLocality
if pbl == nil || *pbl == zero {
return nil
}
return fmt.Errorf("Prioritize-by-locality requires Consul Enterprise ")
}

View file

@ -3174,7 +3174,7 @@ func TestProxyConfigEntry(t *testing.T) {
Name: "global", Name: "global",
FailoverPolicy: &ServiceResolverFailoverPolicy{Mode: "bad"}, FailoverPolicy: &ServiceResolverFailoverPolicy{Mode: "bad"},
}, },
validateErr: `Failover policy must be one of '', 'default', or 'order-by-locality'`, validateErr: `Failover-policy mode must be one of '', 'sequential', or 'order-by-locality'`,
}, },
"proxy config with valid failover policy": { "proxy config with valid failover policy": {
entry: &ProxyConfigEntry{ entry: &ProxyConfigEntry{

View file

@ -117,11 +117,12 @@ func (s *DiscoveryGraphNode) MapKey() string {
// compiled form of ServiceResolverConfigEntry // compiled form of ServiceResolverConfigEntry
type DiscoveryResolver struct { type DiscoveryResolver struct {
Default bool `json:",omitempty"` Default bool `json:",omitempty"`
ConnectTimeout time.Duration `json:",omitempty"` ConnectTimeout time.Duration `json:",omitempty"`
RequestTimeout time.Duration `json:",omitempty"` RequestTimeout time.Duration `json:",omitempty"`
Target string `json:",omitempty"` Target string `json:",omitempty"`
Failover *DiscoveryFailover `json:",omitempty"` Failover *DiscoveryFailover `json:",omitempty"`
PrioritizeByLocality *DiscoveryPrioritizeByLocality `json:",omitempty"`
} }
func (r *DiscoveryResolver) MarshalJSON() ([]byte, error) { func (r *DiscoveryResolver) MarshalJSON() ([]byte, error) {
@ -186,6 +187,20 @@ type DiscoveryFailover struct {
Regions []string `json:",omitempty"` Regions []string `json:",omitempty"`
} }
// compiled form of ServiceResolverPrioritizeByLocality
type DiscoveryPrioritizeByLocality struct {
Mode string `json:",omitempty"`
}
func (pbl *ServiceResolverPrioritizeByLocality) ToDiscovery() *DiscoveryPrioritizeByLocality {
if pbl == nil {
return nil
}
return &DiscoveryPrioritizeByLocality{
Mode: pbl.Mode,
}
}
// DiscoveryTarget represents all of the inputs necessary to use a resolver // DiscoveryTarget represents all of the inputs necessary to use a resolver
// config entry to execute a catalog query to generate a list of service // config entry to execute a catalog query to generate a list of service
// instances during discovery. // instances during discovery.

View file

@ -232,6 +232,10 @@ func (o *DiscoveryResolver) DeepCopy() *DiscoveryResolver {
if o.Failover != nil { if o.Failover != nil {
cp.Failover = o.Failover.DeepCopy() cp.Failover = o.Failover.DeepCopy()
} }
if o.PrioritizeByLocality != nil {
cp.PrioritizeByLocality = new(DiscoveryPrioritizeByLocality)
*cp.PrioritizeByLocality = *o.PrioritizeByLocality
}
return &cp return &cp
} }
@ -909,6 +913,10 @@ func (o *ServiceResolverConfigEntry) DeepCopy() *ServiceResolverConfigEntry {
cp.Failover[k2] = cp_Failover_v2 cp.Failover[k2] = cp_Failover_v2
} }
} }
if o.PrioritizeByLocality != nil {
cp.PrioritizeByLocality = new(ServiceResolverPrioritizeByLocality)
*cp.PrioritizeByLocality = *o.PrioritizeByLocality
}
if o.LoadBalancer != nil { if o.LoadBalancer != nil {
cp.LoadBalancer = o.LoadBalancer.DeepCopy() cp.LoadBalancer = o.LoadBalancer.DeepCopy()
} }

View file

@ -314,18 +314,19 @@ func (s *ServiceConfigEntry) GetCreateIndex() uint64 { return s.CreateIndex
func (s *ServiceConfigEntry) GetModifyIndex() uint64 { return s.ModifyIndex } func (s *ServiceConfigEntry) GetModifyIndex() uint64 { return s.ModifyIndex }
type ProxyConfigEntry struct { type ProxyConfigEntry struct {
Kind string Kind string
Name string Name string
Partition string `json:",omitempty"` Partition string `json:",omitempty"`
Namespace string `json:",omitempty"` Namespace string `json:",omitempty"`
Mode ProxyMode `json:",omitempty"` Mode ProxyMode `json:",omitempty"`
TransparentProxy *TransparentProxyConfig `json:",omitempty" alias:"transparent_proxy"` TransparentProxy *TransparentProxyConfig `json:",omitempty" alias:"transparent_proxy"`
Config map[string]interface{} `json:",omitempty"` Config map[string]interface{} `json:",omitempty"`
MeshGateway MeshGatewayConfig `json:",omitempty" alias:"mesh_gateway"` MeshGateway MeshGatewayConfig `json:",omitempty" alias:"mesh_gateway"`
Expose ExposeConfig `json:",omitempty"` Expose ExposeConfig `json:",omitempty"`
AccessLogs *AccessLogsConfig `json:",omitempty" alias:"access_logs"` AccessLogs *AccessLogsConfig `json:",omitempty" alias:"access_logs"`
EnvoyExtensions []EnvoyExtension `json:",omitempty" alias:"envoy_extensions"` EnvoyExtensions []EnvoyExtension `json:",omitempty" alias:"envoy_extensions"`
FailoverPolicy *ServiceResolverFailoverPolicy `json:",omitempty" alias:"failover_policy"` FailoverPolicy *ServiceResolverFailoverPolicy `json:",omitempty" alias:"failover_policy"`
PrioritizeByLocality *ServiceResolverPrioritizeByLocality `json:",omitempty" alias:"prioritize_by_locality"`
Meta map[string]string `json:",omitempty"` Meta map[string]string `json:",omitempty"`
CreateIndex uint64 CreateIndex uint64

View file

@ -172,6 +172,10 @@ type ServiceResolverConfigEntry struct {
ConnectTimeout time.Duration `json:",omitempty" alias:"connect_timeout"` ConnectTimeout time.Duration `json:",omitempty" alias:"connect_timeout"`
RequestTimeout time.Duration `json:",omitempty" alias:"request_timeout"` RequestTimeout time.Duration `json:",omitempty" alias:"request_timeout"`
// PrioritizeByLocality controls whether the locality of services within the
// local partition will be used to prioritize connectivity.
PrioritizeByLocality *ServiceResolverPrioritizeByLocality `json:",omitempty" alias:"prioritize_by_locality"`
// LoadBalancer determines the load balancing policy and configuration for services // LoadBalancer determines the load balancing policy and configuration for services
// issuing requests to this upstream service. // issuing requests to this upstream service.
LoadBalancer *LoadBalancer `json:",omitempty" alias:"load_balancer"` LoadBalancer *LoadBalancer `json:",omitempty" alias:"load_balancer"`
@ -267,6 +271,13 @@ type ServiceResolverFailoverPolicy struct {
Regions []string `json:",omitempty"` Regions []string `json:",omitempty"`
} }
type ServiceResolverPrioritizeByLocality struct {
// Mode specifies the type of prioritization that will be performed
// when selecting nodes in the local partition.
// Valid values are: "" (default "none"), "none", and "failover".
Mode string `json:",omitempty"`
}
// LoadBalancer determines the load balancing policy and configuration for services // LoadBalancer determines the load balancing policy and configuration for services
// issuing requests to this upstream service. // issuing requests to this upstream service.
type LoadBalancer struct { type LoadBalancer struct {

View file

@ -1394,6 +1394,11 @@ func ServiceResolverToStructs(s *ServiceResolver, t *structs.ServiceResolverConf
t.Failover[k] = y t.Failover[k] = y
} }
} }
if s.PrioritizeByLocality != nil {
var x structs.ServiceResolverPrioritizeByLocality
ServiceResolverPrioritizeByLocalityToStructs(s.PrioritizeByLocality, &x)
t.PrioritizeByLocality = &x
}
t.ConnectTimeout = structs.DurationFromProto(s.ConnectTimeout) t.ConnectTimeout = structs.DurationFromProto(s.ConnectTimeout)
t.RequestTimeout = structs.DurationFromProto(s.RequestTimeout) t.RequestTimeout = structs.DurationFromProto(s.RequestTimeout)
if s.LoadBalancer != nil { if s.LoadBalancer != nil {
@ -1437,6 +1442,11 @@ func ServiceResolverFromStructs(t *structs.ServiceResolverConfigEntry, s *Servic
s.Failover[k] = y s.Failover[k] = y
} }
} }
if t.PrioritizeByLocality != nil {
var x ServiceResolverPrioritizeByLocality
ServiceResolverPrioritizeByLocalityFromStructs(t.PrioritizeByLocality, &x)
s.PrioritizeByLocality = &x
}
s.ConnectTimeout = structs.DurationToProto(t.ConnectTimeout) s.ConnectTimeout = structs.DurationToProto(t.ConnectTimeout)
s.RequestTimeout = structs.DurationToProto(t.RequestTimeout) s.RequestTimeout = structs.DurationToProto(t.RequestTimeout)
if t.LoadBalancer != nil { if t.LoadBalancer != nil {
@ -1530,6 +1540,18 @@ func ServiceResolverFailoverTargetFromStructs(t *structs.ServiceResolverFailover
s.Datacenter = t.Datacenter s.Datacenter = t.Datacenter
s.Peer = t.Peer s.Peer = t.Peer
} }
func ServiceResolverPrioritizeByLocalityToStructs(s *ServiceResolverPrioritizeByLocality, t *structs.ServiceResolverPrioritizeByLocality) {
if s == nil {
return
}
t.Mode = s.Mode
}
func ServiceResolverPrioritizeByLocalityFromStructs(t *structs.ServiceResolverPrioritizeByLocality, s *ServiceResolverPrioritizeByLocality) {
if s == nil {
return
}
s.Mode = t.Mode
}
func ServiceResolverRedirectToStructs(s *ServiceResolverRedirect, t *structs.ServiceResolverRedirect) { func ServiceResolverRedirectToStructs(s *ServiceResolverRedirect, t *structs.ServiceResolverRedirect) {
if s == nil { if s == nil {
return return

View file

@ -127,6 +127,16 @@ func (msg *ServiceResolverFailoverPolicy) UnmarshalBinary(b []byte) error {
return proto.Unmarshal(b, msg) return proto.Unmarshal(b, msg)
} }
// MarshalBinary implements encoding.BinaryMarshaler
func (msg *ServiceResolverPrioritizeByLocality) MarshalBinary() ([]byte, error) {
return proto.Marshal(msg)
}
// UnmarshalBinary implements encoding.BinaryUnmarshaler
func (msg *ServiceResolverPrioritizeByLocality) UnmarshalBinary(b []byte) error {
return proto.Unmarshal(b, msg)
}
// MarshalBinary implements encoding.BinaryMarshaler // MarshalBinary implements encoding.BinaryMarshaler
func (msg *ServiceResolverFailoverTarget) MarshalBinary() ([]byte, error) { func (msg *ServiceResolverFailoverTarget) MarshalBinary() ([]byte, error) {
return proto.Marshal(msg) return proto.Marshal(msg)

File diff suppressed because it is too large Load diff

View file

@ -128,6 +128,7 @@ message ServiceResolver {
map<string, string> Meta = 7; map<string, string> Meta = 7;
// mog: func-to=structs.DurationFromProto func-from=structs.DurationToProto // mog: func-to=structs.DurationFromProto func-from=structs.DurationToProto
google.protobuf.Duration RequestTimeout = 8; google.protobuf.Duration RequestTimeout = 8;
ServiceResolverPrioritizeByLocality PrioritizeByLocality = 9;
} }
// mog annotation: // mog annotation:
@ -180,6 +181,15 @@ message ServiceResolverFailoverPolicy {
repeated string Regions = 2; repeated string Regions = 2;
} }
// mog annotation:
//
// target=github.com/hashicorp/consul/agent/structs.ServiceResolverPrioritizeByLocality
// output=config_entry.gen.go
// name=Structs
message ServiceResolverPrioritizeByLocality {
string Mode = 1;
}
// mog annotation: // mog annotation:
// //
// target=github.com/hashicorp/consul/agent/structs.ServiceResolverFailoverTarget // target=github.com/hashicorp/consul/agent/structs.ServiceResolverFailoverTarget

View file

@ -19,15 +19,14 @@
package pbsubscribe package pbsubscribe
import ( import (
reflect "reflect"
sync "sync"
_ "github.com/hashicorp/consul/proto-public/annotations/ratelimit" _ "github.com/hashicorp/consul/proto-public/annotations/ratelimit"
pbcommon "github.com/hashicorp/consul/proto/private/pbcommon" pbcommon "github.com/hashicorp/consul/proto/private/pbcommon"
pbconfigentry "github.com/hashicorp/consul/proto/private/pbconfigentry" pbconfigentry "github.com/hashicorp/consul/proto/private/pbconfigentry"
pbservice "github.com/hashicorp/consul/proto/private/pbservice" pbservice "github.com/hashicorp/consul/proto/private/pbservice"
protoreflect "google.golang.org/protobuf/reflect/protoreflect" protoreflect "google.golang.org/protobuf/reflect/protoreflect"
protoimpl "google.golang.org/protobuf/runtime/protoimpl" protoimpl "google.golang.org/protobuf/runtime/protoimpl"
reflect "reflect"
sync "sync"
) )
const ( const (