open-consul/agent/xds/listeners.go

1122 lines
35 KiB
Go

package xds
import (
"encoding/json"
"errors"
"fmt"
"net"
"net/url"
"regexp"
"strconv"
"strings"
"github.com/hashicorp/consul/logging"
"github.com/hashicorp/go-hclog"
envoy "github.com/envoyproxy/go-control-plane/envoy/api/v2"
envoyauth "github.com/envoyproxy/go-control-plane/envoy/api/v2/auth"
envoycore "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
envoylistener "github.com/envoyproxy/go-control-plane/envoy/api/v2/listener"
envoyroute "github.com/envoyproxy/go-control-plane/envoy/api/v2/route"
extauthz "github.com/envoyproxy/go-control-plane/envoy/config/filter/network/ext_authz/v2"
envoyhttp "github.com/envoyproxy/go-control-plane/envoy/config/filter/network/http_connection_manager/v2"
envoytcp "github.com/envoyproxy/go-control-plane/envoy/config/filter/network/tcp_proxy/v2"
envoytype "github.com/envoyproxy/go-control-plane/envoy/type"
"github.com/envoyproxy/go-control-plane/pkg/util"
"github.com/gogo/protobuf/jsonpb"
"github.com/gogo/protobuf/proto"
"github.com/gogo/protobuf/types"
"github.com/hashicorp/consul/agent/connect"
"github.com/hashicorp/consul/agent/proxycfg"
"github.com/hashicorp/consul/agent/structs"
)
// listenersFromSnapshot returns the xDS API representation of the "listeners" in the snapshot.
func (s *Server) listenersFromSnapshot(cfgSnap *proxycfg.ConfigSnapshot, token string) ([]proto.Message, error) {
if cfgSnap == nil {
return nil, errors.New("nil config given")
}
switch cfgSnap.Kind {
case structs.ServiceKindConnectProxy:
return s.listenersFromSnapshotConnectProxy(cfgSnap, token)
case structs.ServiceKindTerminatingGateway:
return s.listenersFromSnapshotGateway(cfgSnap, token)
case structs.ServiceKindMeshGateway:
return s.listenersFromSnapshotGateway(cfgSnap, token)
case structs.ServiceKindIngressGateway:
return s.listenersFromSnapshotGateway(cfgSnap, token)
default:
return nil, fmt.Errorf("Invalid service kind: %v", cfgSnap.Kind)
}
}
// listenersFromSnapshotConnectProxy returns the "listeners" for a connect proxy service
func (s *Server) listenersFromSnapshotConnectProxy(cfgSnap *proxycfg.ConfigSnapshot, token string) ([]proto.Message, error) {
// One listener for each upstream plus the public one
resources := make([]proto.Message, len(cfgSnap.Proxy.Upstreams)+1)
// Configure public listener
var err error
resources[0], err = s.makePublicListener(cfgSnap, token)
if err != nil {
return nil, err
}
for i, u := range cfgSnap.Proxy.Upstreams {
id := u.Identifier()
var chain *structs.CompiledDiscoveryChain
if u.DestinationType != structs.UpstreamDestTypePreparedQuery {
chain = cfgSnap.ConnectProxy.DiscoveryChain[id]
}
var upstreamListener proto.Message
upstreamListener, err = s.makeUpstreamListenerForDiscoveryChain(
&u,
u.LocalBindAddress,
chain,
cfgSnap,
nil,
)
if err != nil {
return nil, err
}
resources[i+1] = upstreamListener
}
cfgSnap.Proxy.Expose.Finalize()
paths := cfgSnap.Proxy.Expose.Paths
// Add service health checks to the list of paths to create listeners for if needed
if cfgSnap.Proxy.Expose.Checks {
psid := structs.NewServiceID(cfgSnap.Proxy.DestinationServiceID, &cfgSnap.ProxyID.EnterpriseMeta)
for _, check := range s.CheckFetcher.ServiceHTTPBasedChecks(psid) {
p, err := parseCheckPath(check)
if err != nil {
s.Logger.Warn("failed to create listener for", "check", check.CheckID, "error", err)
continue
}
paths = append(paths, p)
}
}
// Configure additional listener for exposed check paths
for _, path := range paths {
clusterName := LocalAppClusterName
if path.LocalPathPort != cfgSnap.Proxy.LocalServicePort {
clusterName = makeExposeClusterName(path.LocalPathPort)
}
l, err := s.makeExposedCheckListener(cfgSnap, clusterName, path)
if err != nil {
return nil, err
}
resources = append(resources, l)
}
return resources, nil
}
func parseCheckPath(check structs.CheckType) (structs.ExposePath, error) {
var path structs.ExposePath
if check.HTTP != "" {
path.Protocol = "http"
// Get path and local port from original HTTP target
u, err := url.Parse(check.HTTP)
if err != nil {
return path, fmt.Errorf("failed to parse url '%s': %v", check.HTTP, err)
}
path.Path = u.Path
_, portStr, err := net.SplitHostPort(u.Host)
if err != nil {
return path, fmt.Errorf("failed to parse port from '%s': %v", check.HTTP, err)
}
path.LocalPathPort, err = strconv.Atoi(portStr)
if err != nil {
return path, fmt.Errorf("failed to parse port from '%s': %v", check.HTTP, err)
}
// Get listener port from proxied HTTP target
u, err = url.Parse(check.ProxyHTTP)
if err != nil {
return path, fmt.Errorf("failed to parse url '%s': %v", check.ProxyHTTP, err)
}
_, portStr, err = net.SplitHostPort(u.Host)
if err != nil {
return path, fmt.Errorf("failed to parse port from '%s': %v", check.ProxyHTTP, err)
}
path.ListenerPort, err = strconv.Atoi(portStr)
if err != nil {
return path, fmt.Errorf("failed to parse port from '%s': %v", check.ProxyHTTP, err)
}
}
if check.GRPC != "" {
path.Path = "/grpc.health.v1.Health/Check"
path.Protocol = "http2"
// Get local port from original GRPC target of the form: host/service
proxyServerAndService := strings.SplitN(check.GRPC, "/", 2)
_, portStr, err := net.SplitHostPort(proxyServerAndService[0])
if err != nil {
return path, fmt.Errorf("failed to split host/port from '%s': %v", check.GRPC, err)
}
path.LocalPathPort, err = strconv.Atoi(portStr)
if err != nil {
return path, fmt.Errorf("failed to parse port from '%s': %v", check.GRPC, err)
}
// Get listener port from proxied GRPC target of the form: host/service
proxyServerAndService = strings.SplitN(check.ProxyGRPC, "/", 2)
_, portStr, err = net.SplitHostPort(proxyServerAndService[0])
if err != nil {
return path, fmt.Errorf("failed to split host/port from '%s': %v", check.ProxyGRPC, err)
}
path.ListenerPort, err = strconv.Atoi(portStr)
if err != nil {
return path, fmt.Errorf("failed to parse port from '%s': %v", check.ProxyGRPC, err)
}
}
path.ParsedFromCheck = true
return path, nil
}
// listenersFromSnapshotGateway returns the "listener" for a terminating-gateway or mesh-gateway service
func (s *Server) listenersFromSnapshotGateway(cfgSnap *proxycfg.ConfigSnapshot, token string) ([]proto.Message, error) {
cfg, err := ParseGatewayConfig(cfgSnap.Proxy.Config)
if err != nil {
// Don't hard fail on a config typo, just warn. The parse func returns
// default config if there is an error so it's safe to continue.
s.Logger.Warn("failed to parse Connect.Proxy.Config", "error", err)
}
// Prevent invalid configurations of binding to the same port/addr twice
// including with the any addresses
type namedAddress struct {
name string
structs.ServiceAddress
}
seen := make(map[structs.ServiceAddress]bool)
addrs := make([]namedAddress, 0)
var resources []proto.Message
if !cfg.NoDefaultBind {
addr := cfgSnap.Address
if addr == "" {
addr = "0.0.0.0"
}
a := structs.ServiceAddress{
Address: addr,
Port: cfgSnap.Port,
}
if !seen[a] {
addrs = append(addrs, namedAddress{name: "default", ServiceAddress: a})
seen[a] = true
}
}
if cfg.BindTaggedAddresses {
for name, addrCfg := range cfgSnap.TaggedAddresses {
a := structs.ServiceAddress{
Address: addrCfg.Address,
Port: addrCfg.Port,
}
if !seen[a] {
addrs = append(addrs, namedAddress{name: name, ServiceAddress: a})
seen[a] = true
}
}
}
for name, addrCfg := range cfg.BindAddresses {
a := structs.ServiceAddress{
Address: addrCfg.Address,
Port: addrCfg.Port,
}
if !seen[a] {
addrs = append(addrs, namedAddress{name: name, ServiceAddress: a})
seen[a] = true
}
}
// Make listeners once deduplicated
for _, a := range addrs {
var l *envoy.Listener
switch cfgSnap.Kind {
case structs.ServiceKindTerminatingGateway:
l, err = s.makeTerminatingGatewayListener(a.name, a.Address, a.Port, cfgSnap, token)
if err != nil {
return nil, err
}
case structs.ServiceKindIngressGateway:
listeners, err := s.makeIngressGatewayListeners(a.Address, cfgSnap)
if err != nil {
return nil, err
}
resources = append(resources, listeners...)
case structs.ServiceKindMeshGateway:
l, err = s.makeMeshGatewayListener(a.name, a.Address, a.Port, cfgSnap)
if err != nil {
return nil, err
}
}
if l != nil {
resources = append(resources, l)
}
}
return resources, err
}
func (s *Server) makeIngressGatewayListeners(address string, cfgSnap *proxycfg.ConfigSnapshot) ([]proto.Message, error) {
var resources []proto.Message
for listenerKey, upstreams := range cfgSnap.IngressGateway.Upstreams {
var tlsContext *envoyauth.DownstreamTlsContext
if cfgSnap.IngressGateway.TLSEnabled {
tlsContext = &envoyauth.DownstreamTlsContext{
CommonTlsContext: makeCommonTLSContextFromLeaf(cfgSnap, cfgSnap.Leaf()),
RequireClientCertificate: &types.BoolValue{Value: false},
}
}
if listenerKey.Protocol == "tcp" {
// We rely on the invariant of upstreams slice always having at least 1
// member, because this key/value pair is created only when a
// GatewayService is returned in the RPC
u := upstreams[0]
id := u.Identifier()
chain := cfgSnap.IngressGateway.DiscoveryChain[id]
var upstreamListener proto.Message
upstreamListener, err := s.makeUpstreamListenerForDiscoveryChain(
&u,
address,
chain,
cfgSnap,
tlsContext,
)
if err != nil {
return nil, err
}
resources = append(resources, upstreamListener)
} else {
// If multiple upstreams share this port, make a special listener for the protocol.
listener := makeListener(listenerKey.Protocol, address, listenerKey.Port)
filter, err := makeListenerFilter(
true, listenerKey.Protocol, listenerKey.RouteName(), "", "ingress_upstream_", "", false)
if err != nil {
return nil, err
}
listener.FilterChains = []envoylistener.FilterChain{
{
Filters: []envoylistener.Filter{
filter,
},
TlsContext: tlsContext,
},
}
resources = append(resources, listener)
}
}
return resources, nil
}
// makeListener returns a listener with name and bind details set. Filters must
// be added before it's useful.
//
// Note on names: Envoy listeners attempt graceful transitions of connections
// when their config changes but that means they can't have their bind address
// or port changed in a running instance. Since our users might choose to change
// a bind address or port for the public or upstream listeners, we need to
// encode those into the unique name for the listener such that if the user
// changes them, we actually create a whole new listener on the new address and
// port. Envoy should take care of closing the old one once it sees it's no
// longer in the config.
func makeListener(name, addr string, port int) *envoy.Listener {
return &envoy.Listener{
Name: fmt.Sprintf("%s:%s:%d", name, addr, port),
Address: makeAddress(addr, port),
}
}
// makeListenerFromUserConfig returns the listener config decoded from an
// arbitrary proto3 json format string or an error if it's invalid.
//
// For now we only support embedding in JSON strings because of the hcl parsing
// pain (see Background section in the comment for decode.HookWeakDecodeFromSlice).
// This may be fixed in decode.HookWeakDecodeFromSlice in the future.
//
// When we do that we can support just nesting the config directly into the
// JSON/hcl naturally but this is a stop-gap that gets us an escape hatch
// immediately. It's also probably not a bad thing to support long-term since
// any config generated by other systems will likely be in canonical protobuf
// from rather than our slight variant in JSON/hcl.
func makeListenerFromUserConfig(configJSON string) (*envoy.Listener, error) {
// Figure out if there is an @type field. We don't require is since we know
// this will be a listener but unmarshalling into types.Any fails if it's not
// there and unmarshalling into listener directly fails if it is...
var jsonFields map[string]*json.RawMessage
if err := json.Unmarshal([]byte(configJSON), &jsonFields); err != nil {
return nil, err
}
var l envoy.Listener
if _, ok := jsonFields["@type"]; ok {
// Type field is present so decode it as a types.Any
var any types.Any
err := jsonpb.UnmarshalString(configJSON, &any)
if err != nil {
return nil, err
}
// And then unmarshal the listener again...
err = proto.Unmarshal(any.Value, &l)
if err != nil {
return nil, err
}
return &l, err
}
// No @type so try decoding as a straight listener.
err := jsonpb.UnmarshalString(configJSON, &l)
return &l, err
}
// Ensure that the first filter in each filter chain of a public listener is the
// authz filter to prevent unauthorized access and that every filter chain uses
// our TLS certs. We might allow users to work around this later if there is a
// good use case but this is actually a feature for now as it allows them to
// specify custom listener params in config but still get our certs delivered
// dynamically and intentions enforced without coming up with some complicated
// templating/merging solution.
func injectConnectFilters(cfgSnap *proxycfg.ConfigSnapshot, token string, listener *envoy.Listener, setTLS bool) error {
authFilter, err := makeExtAuthFilter(token)
if err != nil {
return err
}
for idx := range listener.FilterChains {
// Insert our authz filter before any others
listener.FilterChains[idx].Filters =
append([]envoylistener.Filter{authFilter}, listener.FilterChains[idx].Filters...)
listener.FilterChains[idx].TlsContext = &envoyauth.DownstreamTlsContext{
CommonTlsContext: makeCommonTLSContextFromLeaf(cfgSnap, cfgSnap.Leaf()),
RequireClientCertificate: &types.BoolValue{Value: true},
}
}
return nil
}
func (s *Server) makePublicListener(cfgSnap *proxycfg.ConfigSnapshot, token string) (proto.Message, error) {
var l *envoy.Listener
var err error
cfg, err := ParseProxyConfig(cfgSnap.Proxy.Config)
if err != nil {
// Don't hard fail on a config typo, just warn. The parse func returns
// default config if there is an error so it's safe to continue.
s.Logger.Warn("failed to parse Connect.Proxy.Config", "error", err)
}
if cfg.PublicListenerJSON != "" {
l, err = makeListenerFromUserConfig(cfg.PublicListenerJSON)
if err != nil {
return l, err
}
// In the happy path don't return yet as we need to inject TLS config still.
}
if l == nil {
// No user config, use default listener
addr := cfgSnap.Address
// Override with bind address if one is set, otherwise default
// to 0.0.0.0
if cfg.BindAddress != "" {
addr = cfg.BindAddress
} else if addr == "" {
addr = "0.0.0.0"
}
// Override with bind port if one is set, otherwise default to
// proxy service's address
port := cfgSnap.Port
if cfg.BindPort != 0 {
port = cfg.BindPort
}
l = makeListener(PublicListenerName, addr, port)
filter, err := makeListenerFilter(
false, cfg.Protocol, "public_listener", LocalAppClusterName, "", "", true)
if err != nil {
return nil, err
}
l.FilterChains = []envoylistener.FilterChain{
{
Filters: []envoylistener.Filter{
filter,
},
},
}
}
err = injectConnectFilters(cfgSnap, token, l, true)
return l, err
}
func (s *Server) makeExposedCheckListener(cfgSnap *proxycfg.ConfigSnapshot, cluster string, path structs.ExposePath) (proto.Message, error) {
cfg, err := ParseProxyConfig(cfgSnap.Proxy.Config)
if err != nil {
// Don't hard fail on a config typo, just warn. The parse func returns
// default config if there is an error so it's safe to continue.
s.Logger.Warn("failed to parse Connect.Proxy.Config", "error", err)
}
// No user config, use default listener
addr := cfgSnap.Address
// Override with bind address if one is set, otherwise default to 0.0.0.0
if cfg.BindAddress != "" {
addr = cfg.BindAddress
} else if addr == "" {
addr = "0.0.0.0"
}
// Strip any special characters from path to make a valid and hopefully unique name
r := regexp.MustCompile(`[^a-zA-Z0-9]+`)
strippedPath := r.ReplaceAllString(path.Path, "")
listenerName := fmt.Sprintf("exposed_path_%s", strippedPath)
l := makeListener(listenerName, addr, path.ListenerPort)
filterName := fmt.Sprintf("exposed_path_filter_%s_%d", strippedPath, path.ListenerPort)
f, err := makeListenerFilter(false, path.Protocol, filterName, cluster, "", path.Path, true)
if err != nil {
return nil, err
}
chain := envoylistener.FilterChain{
Filters: []envoylistener.Filter{f},
}
// For registered checks restrict traffic sources to localhost and Consul's advertise addr
if path.ParsedFromCheck {
// For the advertise addr we use a CidrRange that only matches one address
advertise := s.CfgFetcher.AdvertiseAddrLAN()
// Get prefix length based on whether address is ipv4 (32 bits) or ipv6 (128 bits)
advertiseLen := 32
ip := net.ParseIP(advertise)
if ip != nil && strings.Contains(advertise, ":") {
advertiseLen = 128
}
chain.FilterChainMatch = &envoylistener.FilterChainMatch{
SourcePrefixRanges: []*envoycore.CidrRange{
{AddressPrefix: "127.0.0.1", PrefixLen: &types.UInt32Value{Value: 8}},
{AddressPrefix: "::1", PrefixLen: &types.UInt32Value{Value: 128}},
{AddressPrefix: advertise, PrefixLen: &types.UInt32Value{Value: uint32(advertiseLen)}},
},
}
}
l.FilterChains = []envoylistener.FilterChain{chain}
return l, err
}
func (s *Server) makeTerminatingGatewayListener(name, addr string, port int, cfgSnap *proxycfg.ConfigSnapshot, token string) (*envoy.Listener, error) {
l := makeListener(name, addr, port)
tlsInspector, err := makeTLSInspectorListenerFilter()
if err != nil {
return nil, err
}
l.ListenerFilters = []envoylistener.ListenerFilter{tlsInspector}
// Make a FilterChain for each linked service
// Match on the cluster name,
for svc := range cfgSnap.TerminatingGateway.ServiceGroups {
clusterName := connect.ServiceSNI(svc.Name, "", svc.NamespaceOrDefault(), cfgSnap.Datacenter, cfgSnap.Roots.TrustDomain)
resolver, hasResolver := cfgSnap.TerminatingGateway.ServiceResolvers[svc]
// Skip the service if we don't have a cert to present for mTLS
if cert, ok := cfgSnap.TerminatingGateway.ServiceLeaves[svc]; !ok || cert == nil {
// TODO (gateways) (freddy) Should the error suggest that the issue may be ACLs? (need service:write on service)
s.Logger.Named(logging.TerminatingGateway).
Error("no client certificate available for linked service, skipping filter chain creation",
"service", svc.String(), "error", err)
continue
}
clusterChain, err := s.sniFilterChainTerminatingGateway(name, clusterName, token, svc, cfgSnap)
if err != nil {
return nil, fmt.Errorf("failed to make filter chain for cluster %q: %v", clusterName, err)
}
l.FilterChains = append(l.FilterChains, clusterChain)
// if there is a service-resolver for this service then also setup subset filter chains for it
if hasResolver {
// generate 1 filter chain for each service subset
for subsetName := range resolver.Subsets {
clusterName := connect.ServiceSNI(svc.Name, subsetName, svc.NamespaceOrDefault(), cfgSnap.Datacenter, cfgSnap.Roots.TrustDomain)
clusterChain, err := s.sniFilterChainTerminatingGateway(name, clusterName, token, svc, cfgSnap)
if err != nil {
return nil, fmt.Errorf("failed to make filter chain for cluster %q: %v", clusterName, err)
}
l.FilterChains = append(l.FilterChains, clusterChain)
}
}
}
// This fallback catch-all filter ensures a listener will be present for health checks to pass
// Envoy will reset these connections since known endpoints are caught by filter chain matches above
tcpProxy, err := makeTCPProxyFilter(name, "", "terminating_gateway_")
if err != nil {
return nil, err
}
fallback := envoylistener.FilterChain{
Filters: []envoylistener.Filter{
{Name: "envoy.filters.network.sni_cluster"},
tcpProxy,
},
}
l.FilterChains = append(l.FilterChains, fallback)
return l, nil
}
func (s *Server) sniFilterChainTerminatingGateway(listener, cluster, token string, service structs.ServiceName,
cfgSnap *proxycfg.ConfigSnapshot) (envoylistener.FilterChain, error) {
authFilter, err := makeExtAuthFilter(token)
if err != nil {
return envoylistener.FilterChain{}, err
}
sniCluster, err := makeSNIClusterFilter()
if err != nil {
return envoylistener.FilterChain{}, err
}
// The cluster name here doesn't matter as the sni_cluster filter will fill it in for us.
statPrefix := fmt.Sprintf("terminating_gateway_%s_%s_", service.NamespaceOrDefault(), service.Name)
tcpProxy, err := makeTCPProxyFilter(listener, "", statPrefix)
if err != nil {
return envoylistener.FilterChain{}, err
}
return envoylistener.FilterChain{
FilterChainMatch: makeSNIFilterChainMatch(cluster),
Filters: []envoylistener.Filter{
authFilter,
sniCluster,
tcpProxy,
},
TlsContext: &envoyauth.DownstreamTlsContext{
CommonTlsContext: makeCommonTLSContextFromLeaf(cfgSnap, cfgSnap.TerminatingGateway.ServiceLeaves[service]),
RequireClientCertificate: &types.BoolValue{Value: true},
},
}, err
}
func (s *Server) makeMeshGatewayListener(name, addr string, port int, cfgSnap *proxycfg.ConfigSnapshot) (*envoy.Listener, error) {
tlsInspector, err := makeTLSInspectorListenerFilter()
if err != nil {
return nil, err
}
sniCluster, err := makeSNIClusterFilter()
if err != nil {
return nil, err
}
// The cluster name here doesn't matter as the sni_cluster
// filter will fill it in for us.
tcpProxy, err := makeTCPProxyFilter(name, "", "mesh_gateway_local_")
if err != nil {
return nil, err
}
sniClusterChain := envoylistener.FilterChain{
Filters: []envoylistener.Filter{
sniCluster,
tcpProxy,
},
}
l := makeListener(name, addr, port)
l.ListenerFilters = []envoylistener.ListenerFilter{tlsInspector}
// TODO (mesh-gateway) - Do we need to create clusters for all the old trust domains as well?
// We need 1 Filter Chain per datacenter
datacenters := cfgSnap.MeshGateway.Datacenters()
for _, dc := range datacenters {
if dc == cfgSnap.Datacenter {
continue // skip local
}
clusterName := connect.DatacenterSNI(dc, cfgSnap.Roots.TrustDomain)
filterName := fmt.Sprintf("%s_%s", name, dc)
dcTCPProxy, err := makeTCPProxyFilter(filterName, clusterName, "mesh_gateway_remote_")
if err != nil {
return nil, err
}
l.FilterChains = append(l.FilterChains, envoylistener.FilterChain{
FilterChainMatch: &envoylistener.FilterChainMatch{
ServerNames: []string{fmt.Sprintf("*.%s", clusterName)},
},
Filters: []envoylistener.Filter{
dcTCPProxy,
},
})
}
if cfgSnap.ServiceMeta[structs.MetaWANFederationKey] == "1" && cfgSnap.ServerSNIFn != nil {
for _, dc := range datacenters {
if dc == cfgSnap.Datacenter {
continue // skip local
}
clusterName := cfgSnap.ServerSNIFn(dc, "")
filterName := fmt.Sprintf("%s_%s", name, dc)
dcTCPProxy, err := makeTCPProxyFilter(filterName, clusterName, "mesh_gateway_remote_")
if err != nil {
return nil, err
}
l.FilterChains = append(l.FilterChains, envoylistener.FilterChain{
FilterChainMatch: &envoylistener.FilterChainMatch{
ServerNames: []string{fmt.Sprintf("*.%s", clusterName)},
},
Filters: []envoylistener.Filter{
dcTCPProxy,
},
})
}
// Wildcard all flavors to each server.
for _, srv := range cfgSnap.MeshGateway.ConsulServers {
clusterName := cfgSnap.ServerSNIFn(cfgSnap.Datacenter, srv.Node.Node)
filterName := fmt.Sprintf("%s_%s", name, cfgSnap.Datacenter)
dcTCPProxy, err := makeTCPProxyFilter(filterName, clusterName, "mesh_gateway_local_server_")
if err != nil {
return nil, err
}
l.FilterChains = append(l.FilterChains, envoylistener.FilterChain{
FilterChainMatch: &envoylistener.FilterChainMatch{
ServerNames: []string{fmt.Sprintf("%s", clusterName)},
},
Filters: []envoylistener.Filter{
dcTCPProxy,
},
})
}
}
// This needs to get tacked on at the end as it has no
// matching and will act as a catch all
l.FilterChains = append(l.FilterChains, sniClusterChain)
return l, nil
}
func (s *Server) makeUpstreamListenerForDiscoveryChain(
u *structs.Upstream,
address string,
chain *structs.CompiledDiscoveryChain,
cfgSnap *proxycfg.ConfigSnapshot,
tlsContext *envoyauth.DownstreamTlsContext,
) (proto.Message, error) {
if address == "" {
address = "127.0.0.1"
}
upstreamID := u.Identifier()
l := makeListener(upstreamID, address, u.LocalBindPort)
cfg := getAndModifyUpstreamConfigForListener(s.Logger, u, chain)
if cfg.ListenerJSON != "" {
return makeListenerFromUserConfig(cfg.ListenerJSON)
}
useRDS := true
clusterName := ""
if chain == nil || chain.IsDefault() {
dc := u.Datacenter
if dc == "" {
dc = cfgSnap.Datacenter
}
sni := connect.UpstreamSNI(u, "", dc, cfgSnap.Roots.TrustDomain)
useRDS = false
clusterName = CustomizeClusterName(sni, chain)
} else if cfg.Protocol == "tcp" {
startNode := chain.Nodes[chain.StartNode]
if startNode == nil {
panic("missing first node in compiled discovery chain for: " + chain.ServiceName)
} else if startNode.Type != structs.DiscoveryGraphNodeTypeResolver {
panic(fmt.Sprintf("unexpected first node in discovery chain using protocol=%q: %s", cfg.Protocol, startNode.Type))
}
targetID := startNode.Resolver.Target
target := chain.Targets[targetID]
useRDS = false
clusterName = CustomizeClusterName(target.Name, chain)
}
filter, err := makeListenerFilter(
useRDS, cfg.Protocol, upstreamID, clusterName, "upstream_", "", false)
if err != nil {
return nil, err
}
l.FilterChains = []envoylistener.FilterChain{
{
Filters: []envoylistener.Filter{
filter,
},
TlsContext: tlsContext,
},
}
return l, nil
}
func getAndModifyUpstreamConfigForListener(logger hclog.Logger, u *structs.Upstream, chain *structs.CompiledDiscoveryChain) UpstreamConfig {
var (
cfg UpstreamConfig
err error
)
if chain == nil || chain.IsDefault() {
cfg, err = ParseUpstreamConfig(u.Config)
if err != nil {
// Don't hard fail on a config typo, just warn. The parse func returns
// default config if there is an error so it's safe to continue.
logger.Warn("failed to parse", "upstream", u.Identifier(), "error", err)
}
} else {
// Use NoDefaults here so that we can set the protocol to the chain
// protocol if necessary
cfg, err = ParseUpstreamConfigNoDefaults(u.Config)
if err != nil {
// Don't hard fail on a config typo, just warn. The parse func returns
// default config if there is an error so it's safe to continue.
logger.Warn("failed to parse", "upstream", u.Identifier(), "error", err)
}
if cfg.ListenerJSON != "" {
logger.Warn("ignoring escape hatch setting because already configured for",
"discovery chain", chain.ServiceName, "upstream", u.Identifier(), "config", "envoy_listener_json")
// Remove from config struct so we don't use it later on
cfg.ListenerJSON = ""
}
proto := cfg.Protocol
if proto == "" {
proto = chain.Protocol
}
if proto == "" {
proto = "tcp"
}
// set back on the config so that we can use it from return value
cfg.Protocol = proto
}
return cfg
}
func makeListenerFilter(
useRDS bool,
protocol, filterName, cluster, statPrefix, routePath string, ingress bool) (envoylistener.Filter, error) {
switch protocol {
case "grpc":
return makeHTTPFilter(useRDS, filterName, cluster, statPrefix, routePath, ingress, true, true)
case "http2":
return makeHTTPFilter(useRDS, filterName, cluster, statPrefix, routePath, ingress, false, true)
case "http":
return makeHTTPFilter(useRDS, filterName, cluster, statPrefix, routePath, ingress, false, false)
case "tcp":
fallthrough
default:
if useRDS {
return envoylistener.Filter{}, fmt.Errorf("RDS is not compatible with the tcp proxy filter")
} else if cluster == "" {
return envoylistener.Filter{}, fmt.Errorf("cluster name is required for a tcp proxy filter")
}
return makeTCPProxyFilter(filterName, cluster, statPrefix)
}
}
func makeTLSInspectorListenerFilter() (envoylistener.ListenerFilter, error) {
return envoylistener.ListenerFilter{Name: util.TlsInspector}, nil
}
func makeSNIFilterChainMatch(sniMatch string) *envoylistener.FilterChainMatch {
return &envoylistener.FilterChainMatch{
ServerNames: []string{sniMatch},
}
}
func makeSNIClusterFilter() (envoylistener.Filter, error) {
// This filter has no config which is why we are not calling make
return envoylistener.Filter{Name: "envoy.filters.network.sni_cluster"}, nil
}
func makeTCPProxyFilter(filterName, cluster, statPrefix string) (envoylistener.Filter, error) {
cfg := &envoytcp.TcpProxy{
StatPrefix: makeStatPrefix("tcp", statPrefix, filterName),
ClusterSpecifier: &envoytcp.TcpProxy_Cluster{Cluster: cluster},
}
return makeFilter("envoy.tcp_proxy", cfg)
}
func makeStatPrefix(protocol, prefix, filterName string) string {
// Replace colons here because Envoy does that in the metrics for the actual
// clusters but doesn't in the stat prefix here while dashboards assume they
// will match.
return fmt.Sprintf("%s%s_%s", prefix, strings.Replace(filterName, ":", "_", -1), protocol)
}
func makeHTTPFilter(
useRDS bool,
filterName, cluster, statPrefix, routePath string,
ingress, grpc, http2 bool,
) (envoylistener.Filter, error) {
op := envoyhttp.INGRESS
if !ingress {
op = envoyhttp.EGRESS
}
proto := "http"
if grpc {
proto = "grpc"
}
cfg := &envoyhttp.HttpConnectionManager{
StatPrefix: makeStatPrefix(proto, statPrefix, filterName),
CodecType: envoyhttp.AUTO,
HttpFilters: []*envoyhttp.HttpFilter{
{
Name: "envoy.router",
},
},
Tracing: &envoyhttp.HttpConnectionManager_Tracing{
OperationName: op,
// Don't trace any requests by default unless the client application
// explicitly propagates trace headers that indicate this should be
// sampled.
RandomSampling: &envoytype.Percent{Value: 0.0},
},
}
if useRDS {
if cluster != "" {
return envoylistener.Filter{}, fmt.Errorf("cannot specify cluster name when using RDS")
}
cfg.RouteSpecifier = &envoyhttp.HttpConnectionManager_Rds{
Rds: &envoyhttp.Rds{
RouteConfigName: filterName,
ConfigSource: envoycore.ConfigSource{
ConfigSourceSpecifier: &envoycore.ConfigSource_Ads{
Ads: &envoycore.AggregatedConfigSource{},
},
},
},
}
} else {
if cluster == "" {
return envoylistener.Filter{}, fmt.Errorf("must specify cluster name when not using RDS")
}
route := envoyroute.Route{
Match: envoyroute.RouteMatch{
PathSpecifier: &envoyroute.RouteMatch_Prefix{
Prefix: "/",
},
// TODO(banks) Envoy supports matching only valid GRPC
// requests which might be nice to add here for gRPC services
// but it's not supported in our current envoy SDK version
// although docs say it was supported by 1.8.0. Going to defer
// that until we've updated the deps.
},
Action: &envoyroute.Route_Route{
Route: &envoyroute.RouteAction{
ClusterSpecifier: &envoyroute.RouteAction_Cluster{
Cluster: cluster,
},
},
},
}
// If a path is provided, do not match on a catch-all prefix
if routePath != "" {
route.Match.PathSpecifier = &envoyroute.RouteMatch_Path{Path: routePath}
}
cfg.RouteSpecifier = &envoyhttp.HttpConnectionManager_RouteConfig{
RouteConfig: &envoy.RouteConfiguration{
Name: filterName,
VirtualHosts: []envoyroute.VirtualHost{
{
Name: filterName,
Domains: []string{"*"},
Routes: []envoyroute.Route{
route,
},
},
},
},
}
}
if http2 {
cfg.Http2ProtocolOptions = &envoycore.Http2ProtocolOptions{}
}
if grpc {
// Add grpc bridge before router
cfg.HttpFilters = append([]*envoyhttp.HttpFilter{{
Name: "envoy.grpc_http1_bridge",
ConfigType: &envoyhttp.HttpFilter_Config{Config: &types.Struct{}},
}}, cfg.HttpFilters...)
}
return makeFilter("envoy.http_connection_manager", cfg)
}
func makeExtAuthFilter(token string) (envoylistener.Filter, error) {
cfg := &extauthz.ExtAuthz{
StatPrefix: "connect_authz",
GrpcService: &envoycore.GrpcService{
// Attach token header so we can authorize the callbacks. Technically
// authorize is not really protected data but we locked down the HTTP
// implementation to need service:write and since we have the token that
// has that it's pretty reasonable to set it up here.
InitialMetadata: []*envoycore.HeaderValue{
{
Key: "x-consul-token",
Value: token,
},
},
TargetSpecifier: &envoycore.GrpcService_EnvoyGrpc_{
EnvoyGrpc: &envoycore.GrpcService_EnvoyGrpc{
ClusterName: LocalAgentClusterName,
},
},
},
FailureModeAllow: false,
}
return makeFilter("envoy.ext_authz", cfg)
}
func makeFilter(name string, cfg proto.Message) (envoylistener.Filter, error) {
// Ridiculous dance to make that pbstruct into types.Struct by... encoding it
// as JSON and decoding again!!
cfgStruct, err := util.MessageToStruct(cfg)
if err != nil {
return envoylistener.Filter{}, err
}
return envoylistener.Filter{
Name: name,
ConfigType: &envoylistener.Filter_Config{Config: cfgStruct},
}, nil
}
func makeCommonTLSContextFromLeaf(cfgSnap *proxycfg.ConfigSnapshot, leaf *structs.IssuedCert) *envoyauth.CommonTlsContext {
// Concatenate all the root PEMs into one.
// TODO(banks): verify this actually works with Envoy (docs are not clear).
rootPEMS := ""
if cfgSnap.Roots == nil {
return nil
}
for _, root := range cfgSnap.Roots.Roots {
rootPEMS += root.RootCert
}
return &envoyauth.CommonTlsContext{
TlsParams: &envoyauth.TlsParameters{},
TlsCertificates: []*envoyauth.TlsCertificate{
{
CertificateChain: &envoycore.DataSource{
Specifier: &envoycore.DataSource_InlineString{
InlineString: leaf.CertPEM,
},
},
PrivateKey: &envoycore.DataSource{
Specifier: &envoycore.DataSource_InlineString{
InlineString: leaf.PrivateKeyPEM,
},
},
},
},
ValidationContextType: &envoyauth.CommonTlsContext_ValidationContext{
ValidationContext: &envoyauth.CertificateValidationContext{
// TODO(banks): later for L7 support we may need to configure ALPN here.
TrustedCa: &envoycore.DataSource{
Specifier: &envoycore.DataSource_InlineString{
InlineString: rootPEMS,
},
},
},
},
}
}
func makeCommonTLSContextFromFiles(caFile, certFile, keyFile string) *envoyauth.CommonTlsContext {
ctx := envoyauth.CommonTlsContext{
TlsParams: &envoyauth.TlsParameters{},
}
// Verify certificate of peer if caFile is specified
if caFile != "" {
ctx.ValidationContextType = &envoyauth.CommonTlsContext_ValidationContext{
ValidationContext: &envoyauth.CertificateValidationContext{
TrustedCa: &envoycore.DataSource{
Specifier: &envoycore.DataSource_Filename{
Filename: caFile,
},
},
},
}
}
// Present certificate for mTLS if cert and key files are specified
if certFile != "" && keyFile != "" {
ctx.TlsCertificates = []*envoyauth.TlsCertificate{
{
CertificateChain: &envoycore.DataSource{
Specifier: &envoycore.DataSource_Filename{
Filename: certFile,
},
},
PrivateKey: &envoycore.DataSource{
Specifier: &envoycore.DataSource_Filename{
Filename: keyFile,
},
},
},
}
}
return &ctx
}