375 lines
12 KiB
Go
375 lines
12 KiB
Go
package xds
|
|
|
|
import (
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"time"
|
|
|
|
envoy "github.com/envoyproxy/go-control-plane/envoy/api/v2"
|
|
envoyauth "github.com/envoyproxy/go-control-plane/envoy/api/v2/auth"
|
|
envoycluster "github.com/envoyproxy/go-control-plane/envoy/api/v2/cluster"
|
|
envoycore "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
|
|
envoyendpoint "github.com/envoyproxy/go-control-plane/envoy/api/v2/endpoint"
|
|
envoytype "github.com/envoyproxy/go-control-plane/envoy/type"
|
|
"github.com/gogo/protobuf/jsonpb"
|
|
"github.com/gogo/protobuf/proto"
|
|
"github.com/gogo/protobuf/types"
|
|
|
|
"github.com/hashicorp/consul/agent/proxycfg"
|
|
"github.com/hashicorp/consul/agent/structs"
|
|
)
|
|
|
|
// clustersFromSnapshot returns the xDS API representation of the "clusters" in the snapshot.
|
|
func (s *Server) clustersFromSnapshot(cfgSnap *proxycfg.ConfigSnapshot, token string) ([]proto.Message, error) {
|
|
if cfgSnap == nil {
|
|
return nil, errors.New("nil config given")
|
|
}
|
|
|
|
switch cfgSnap.Kind {
|
|
case structs.ServiceKindConnectProxy:
|
|
return s.clustersFromSnapshotConnectProxy(cfgSnap, token)
|
|
case structs.ServiceKindMeshGateway:
|
|
return s.clustersFromSnapshotMeshGateway(cfgSnap, token)
|
|
default:
|
|
return nil, fmt.Errorf("Invalid service kind: %v", cfgSnap.Kind)
|
|
}
|
|
}
|
|
|
|
// clustersFromSnapshot returns the xDS API representation of the "clusters"
|
|
// (upstreams) in the snapshot.
|
|
func (s *Server) clustersFromSnapshotConnectProxy(cfgSnap *proxycfg.ConfigSnapshot, token string) ([]proto.Message, error) {
|
|
// TODO(rb): this sizing is a low bound.
|
|
clusters := make([]proto.Message, 0, len(cfgSnap.Proxy.Upstreams)+1)
|
|
|
|
// Include the "app" cluster for the public listener
|
|
appCluster, err := s.makeAppCluster(cfgSnap)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
clusters = append(clusters, appCluster)
|
|
|
|
for _, u := range cfgSnap.Proxy.Upstreams {
|
|
id := u.Identifier()
|
|
var chain *structs.CompiledDiscoveryChain
|
|
if u.DestinationType != structs.UpstreamDestTypePreparedQuery {
|
|
chain = cfgSnap.ConnectProxy.DiscoveryChain[id]
|
|
}
|
|
|
|
if chain == nil {
|
|
// Either old-school upstream or prepared query.
|
|
upstreamCluster, err := s.makeUpstreamCluster(u, cfgSnap)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
clusters = append(clusters, upstreamCluster)
|
|
|
|
} else {
|
|
upstreamClusters, err := s.makeUpstreamClustersForDiscoveryChain(u, chain, cfgSnap)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
for _, cluster := range upstreamClusters {
|
|
clusters = append(clusters, cluster)
|
|
}
|
|
}
|
|
}
|
|
|
|
return clusters, nil
|
|
}
|
|
|
|
// clustersFromSnapshotMeshGateway returns the xDS API representation of the "clusters"
|
|
// for a mesh gateway. This will include 1 cluster per remote datacenter as well as
|
|
// 1 cluster for each service subset.
|
|
func (s *Server) clustersFromSnapshotMeshGateway(cfgSnap *proxycfg.ConfigSnapshot, token string) ([]proto.Message, error) {
|
|
// 1 cluster per remote dc + 1 cluster per local service (this is a lower bound - all subset specific clusters will be appended)
|
|
clusters := make([]proto.Message, 0, len(cfgSnap.MeshGateway.GatewayGroups)+len(cfgSnap.MeshGateway.ServiceGroups))
|
|
|
|
// generate the remote dc clusters
|
|
for dc, _ := range cfgSnap.MeshGateway.GatewayGroups {
|
|
clusterName := DatacenterSNI(dc, cfgSnap)
|
|
|
|
cluster, err := s.makeMeshGatewayCluster(clusterName, cfgSnap)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
clusters = append(clusters, cluster)
|
|
}
|
|
|
|
// generate the per-service clusters
|
|
for svc, _ := range cfgSnap.MeshGateway.ServiceGroups {
|
|
clusterName := ServiceSNI(svc, "", "default", cfgSnap.Datacenter, cfgSnap)
|
|
|
|
cluster, err := s.makeMeshGatewayCluster(clusterName, cfgSnap)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
clusters = append(clusters, cluster)
|
|
}
|
|
|
|
// generate the service subset clusters
|
|
for svc, resolver := range cfgSnap.MeshGateway.ServiceResolvers {
|
|
for subsetName, _ := range resolver.Subsets {
|
|
clusterName := ServiceSNI(svc, subsetName, "default", cfgSnap.Datacenter, cfgSnap)
|
|
|
|
cluster, err := s.makeMeshGatewayCluster(clusterName, cfgSnap)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
clusters = append(clusters, cluster)
|
|
}
|
|
}
|
|
|
|
return clusters, nil
|
|
}
|
|
|
|
func (s *Server) makeAppCluster(cfgSnap *proxycfg.ConfigSnapshot) (*envoy.Cluster, error) {
|
|
var c *envoy.Cluster
|
|
var err error
|
|
|
|
cfg, err := ParseProxyConfig(cfgSnap.Proxy.Config)
|
|
if err != nil {
|
|
// Don't hard fail on a config typo, just warn. The parse func returns
|
|
// default config if there is an error so it's safe to continue.
|
|
s.Logger.Printf("[WARN] envoy: failed to parse Connect.Proxy.Config: %s", err)
|
|
}
|
|
|
|
// If we have overridden local cluster config try to parse it into an Envoy cluster
|
|
if cfg.LocalClusterJSON != "" {
|
|
return makeClusterFromUserConfig(cfg.LocalClusterJSON)
|
|
}
|
|
|
|
addr := cfgSnap.Proxy.LocalServiceAddress
|
|
if addr == "" {
|
|
addr = "127.0.0.1"
|
|
}
|
|
c = &envoy.Cluster{
|
|
Name: LocalAppClusterName,
|
|
ConnectTimeout: time.Duration(cfg.LocalConnectTimeoutMs) * time.Millisecond,
|
|
ClusterDiscoveryType: &envoy.Cluster_Type{Type: envoy.Cluster_STATIC},
|
|
LoadAssignment: &envoy.ClusterLoadAssignment{
|
|
ClusterName: LocalAppClusterName,
|
|
Endpoints: []envoyendpoint.LocalityLbEndpoints{
|
|
{
|
|
LbEndpoints: []envoyendpoint.LbEndpoint{
|
|
makeEndpoint(LocalAppClusterName,
|
|
addr,
|
|
cfgSnap.Proxy.LocalServicePort),
|
|
},
|
|
},
|
|
},
|
|
},
|
|
}
|
|
if cfg.Protocol == "http2" || cfg.Protocol == "grpc" {
|
|
c.Http2ProtocolOptions = &envoycore.Http2ProtocolOptions{}
|
|
}
|
|
|
|
return c, err
|
|
}
|
|
|
|
func (s *Server) makeUpstreamCluster(upstream structs.Upstream, cfgSnap *proxycfg.ConfigSnapshot) (*envoy.Cluster, error) {
|
|
var c *envoy.Cluster
|
|
var err error
|
|
|
|
ns := "default"
|
|
if upstream.DestinationNamespace != "" {
|
|
ns = upstream.DestinationNamespace
|
|
}
|
|
dc := cfgSnap.Datacenter
|
|
if upstream.Datacenter != "" {
|
|
dc = upstream.Datacenter
|
|
}
|
|
|
|
sni := ServiceSNI(upstream.DestinationName, "", ns, dc, cfgSnap)
|
|
if upstream.DestinationType == "prepared_query" {
|
|
sni = QuerySNI(upstream.DestinationName, dc, cfgSnap)
|
|
}
|
|
cfg, err := ParseUpstreamConfig(upstream.Config)
|
|
if err != nil {
|
|
// Don't hard fail on a config typo, just warn. The parse func returns
|
|
// default config if there is an error so it's safe to continue.
|
|
s.Logger.Printf("[WARN] envoy: failed to parse Upstream[%s].Config: %s",
|
|
upstream.Identifier(), err)
|
|
}
|
|
if cfg.ClusterJSON != "" {
|
|
c, err = makeClusterFromUserConfig(cfg.ClusterJSON)
|
|
if err != nil {
|
|
return c, err
|
|
}
|
|
// In the happy path don't return yet as we need to inject TLS config still.
|
|
}
|
|
|
|
if c == nil {
|
|
c = &envoy.Cluster{
|
|
Name: sni,
|
|
ConnectTimeout: time.Duration(cfg.ConnectTimeoutMs) * time.Millisecond,
|
|
ClusterDiscoveryType: &envoy.Cluster_Type{Type: envoy.Cluster_EDS},
|
|
EdsClusterConfig: &envoy.Cluster_EdsClusterConfig{
|
|
EdsConfig: &envoycore.ConfigSource{
|
|
ConfigSourceSpecifier: &envoycore.ConfigSource_Ads{
|
|
Ads: &envoycore.AggregatedConfigSource{},
|
|
},
|
|
},
|
|
},
|
|
// Having an empty config enables outlier detection with default config.
|
|
OutlierDetection: &envoycluster.OutlierDetection{},
|
|
}
|
|
if cfg.Protocol == "http2" || cfg.Protocol == "grpc" {
|
|
c.Http2ProtocolOptions = &envoycore.Http2ProtocolOptions{}
|
|
}
|
|
}
|
|
|
|
// Enable TLS upstream with the configured client certificate.
|
|
c.TlsContext = &envoyauth.UpstreamTlsContext{
|
|
CommonTlsContext: makeCommonTLSContext(cfgSnap),
|
|
Sni: sni,
|
|
}
|
|
|
|
return c, nil
|
|
}
|
|
|
|
func (s *Server) makeUpstreamClustersForDiscoveryChain(
|
|
upstream structs.Upstream,
|
|
chain *structs.CompiledDiscoveryChain,
|
|
cfgSnap *proxycfg.ConfigSnapshot,
|
|
) ([]*envoy.Cluster, error) {
|
|
|
|
cfg, err := ParseUpstreamConfigNoDefaults(upstream.Config)
|
|
if err != nil {
|
|
// Don't hard fail on a config typo, just warn. The parse func returns
|
|
// default config if there is an error so it's safe to continue.
|
|
s.Logger.Printf("[WARN] envoy: failed to parse Upstream[%s].Config: %s",
|
|
upstream.Identifier(), err)
|
|
}
|
|
|
|
if chain == nil {
|
|
panic("chain must be provided")
|
|
}
|
|
|
|
// TODO(rb): make escape hatches work with chains
|
|
|
|
var out []*envoy.Cluster
|
|
for target, node := range chain.GroupResolverNodes {
|
|
groupResolver := node.GroupResolver
|
|
|
|
sni := TargetSNI(target, cfgSnap)
|
|
s.Logger.Printf("[DEBUG] xds.clusters - generating cluster for %s", sni)
|
|
c := &envoy.Cluster{
|
|
Name: sni,
|
|
AltStatName: sni, // TODO(rb): change this?
|
|
ConnectTimeout: groupResolver.ConnectTimeout,
|
|
ClusterDiscoveryType: &envoy.Cluster_Type{Type: envoy.Cluster_EDS},
|
|
CommonLbConfig: &envoy.Cluster_CommonLbConfig{
|
|
HealthyPanicThreshold: &envoytype.Percent{
|
|
Value: 0, // disable panic threshold
|
|
},
|
|
},
|
|
// TODO(rb): adjust load assignment
|
|
EdsClusterConfig: &envoy.Cluster_EdsClusterConfig{
|
|
EdsConfig: &envoycore.ConfigSource{
|
|
ConfigSourceSpecifier: &envoycore.ConfigSource_Ads{
|
|
Ads: &envoycore.AggregatedConfigSource{},
|
|
},
|
|
},
|
|
},
|
|
// Having an empty config enables outlier detection with default config.
|
|
OutlierDetection: &envoycluster.OutlierDetection{},
|
|
}
|
|
|
|
proto := cfg.Protocol
|
|
if proto == "" {
|
|
proto = chain.Protocol
|
|
}
|
|
|
|
if proto == "" {
|
|
proto = "tcp"
|
|
}
|
|
|
|
if proto == "http2" || proto == "grpc" {
|
|
c.Http2ProtocolOptions = &envoycore.Http2ProtocolOptions{}
|
|
}
|
|
|
|
// Enable TLS upstream with the configured client certificate.
|
|
c.TlsContext = &envoyauth.UpstreamTlsContext{
|
|
CommonTlsContext: makeCommonTLSContext(cfgSnap),
|
|
Sni: sni,
|
|
}
|
|
|
|
out = append(out, c)
|
|
}
|
|
|
|
return out, nil
|
|
}
|
|
|
|
// makeClusterFromUserConfig returns the listener config decoded from an
|
|
// arbitrary proto3 json format string or an error if it's invalid.
|
|
//
|
|
// For now we only support embedding in JSON strings because of the hcl parsing
|
|
// pain (see config.go comment above call to PatchSliceOfMaps). Until we
|
|
// refactor config parser a _lot_ user's opaque config that contains arrays will
|
|
// be mangled. We could actually fix that up in mapstructure which knows the
|
|
// type of the target so could resolve the slices to singletons unambiguously
|
|
// and it would work for us here... but we still have the problem that the
|
|
// config would render incorrectly in general in our HTTP API responses so we
|
|
// really need to fix it "properly".
|
|
//
|
|
// When we do that we can support just nesting the config directly into the
|
|
// JSON/hcl naturally but this is a stop-gap that gets us an escape hatch
|
|
// immediately. It's also probably not a bad thing to support long-term since
|
|
// any config generated by other systems will likely be in canonical protobuf
|
|
// from rather than our slight variant in JSON/hcl.
|
|
func makeClusterFromUserConfig(configJSON string) (*envoy.Cluster, error) {
|
|
var jsonFields map[string]*json.RawMessage
|
|
if err := json.Unmarshal([]byte(configJSON), &jsonFields); err != nil {
|
|
fmt.Println("Custom error", err, configJSON)
|
|
return nil, err
|
|
}
|
|
|
|
var c envoy.Cluster
|
|
|
|
if _, ok := jsonFields["@type"]; ok {
|
|
// Type field is present so decode it as a types.Any
|
|
var any types.Any
|
|
err := jsonpb.UnmarshalString(configJSON, &any)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
// And then unmarshal the listener again...
|
|
err = proto.Unmarshal(any.Value, &c)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return &c, err
|
|
}
|
|
|
|
// No @type so try decoding as a straight listener.
|
|
err := jsonpb.UnmarshalString(configJSON, &c)
|
|
return &c, err
|
|
}
|
|
|
|
func (s *Server) makeMeshGatewayCluster(clusterName string, cfgSnap *proxycfg.ConfigSnapshot) (*envoy.Cluster, error) {
|
|
cfg, err := ParseMeshGatewayConfig(cfgSnap.Proxy.Config)
|
|
if err != nil {
|
|
// Don't hard fail on a config typo, just warn. The parse func returns
|
|
// default config if there is an error so it's safe to continue.
|
|
s.Logger.Printf("[WARN] envoy: failed to parse mesh gateway config: %s", err)
|
|
}
|
|
|
|
return &envoy.Cluster{
|
|
Name: clusterName,
|
|
ConnectTimeout: time.Duration(cfg.ConnectTimeoutMs) * time.Millisecond,
|
|
ClusterDiscoveryType: &envoy.Cluster_Type{Type: envoy.Cluster_EDS},
|
|
EdsClusterConfig: &envoy.Cluster_EdsClusterConfig{
|
|
EdsConfig: &envoycore.ConfigSource{
|
|
ConfigSourceSpecifier: &envoycore.ConfigSource_Ads{
|
|
Ads: &envoycore.AggregatedConfigSource{},
|
|
},
|
|
},
|
|
},
|
|
// Having an empty config enables outlier detection with default config.
|
|
OutlierDetection: &envoycluster.OutlierDetection{},
|
|
}, nil
|
|
}
|