From f99593a0546a265df16a62c83105d2b43e150d6c Mon Sep 17 00:00:00 2001 From: Chris Thain <32781396+cthain@users.noreply.github.com> Date: Fri, 12 May 2023 09:52:50 -0700 Subject: [PATCH] Add Network Filter Support for Envoy Extensions (#17325) --- agent/xds/extensionruntime/runtime_config.go | 11 + .../extensioncommon/list_envoy_extender.go | 268 ++++++++++++++++++ .../extensioncommon/runtime_config.go | 3 + 3 files changed, 282 insertions(+) create mode 100644 envoyextensions/extensioncommon/list_envoy_extender.go diff --git a/agent/xds/extensionruntime/runtime_config.go b/agent/xds/extensionruntime/runtime_config.go index 31f2ce642..455c32001 100644 --- a/agent/xds/extensionruntime/runtime_config.go +++ b/agent/xds/extensionruntime/runtime_config.go @@ -92,6 +92,7 @@ func GetRuntimeConfigurations(cfgSnap *proxycfg.ConfigSnapshot) map[api.Compound Upstreams: nil, LocalUpstreams: upstreamMap, Kind: kind, + Protocol: proxyConfigProtocol(cfgSnap.Proxy.Config), } extensionConfigurationsMap[localSvc] = append(extensionConfigurationsMap[localSvc], extCfg) } @@ -131,6 +132,7 @@ func GetRuntimeConfigurations(cfgSnap *proxycfg.ConfigSnapshot) map[api.Compound Kind: kind, ServiceName: svc, Upstreams: upstreamMap, + Protocol: proxyConfigProtocol(cfgSnap.Proxy.Config), } extensionConfigurationsMap[svc] = append(extensionConfigurationsMap[svc], extCfg) } @@ -158,3 +160,12 @@ func upstreamIDToCompoundServiceName(uid proxycfg.UpstreamID) api.CompoundServic func convertEnvoyExtensions(structExtensions structs.EnvoyExtensions) []api.EnvoyExtension { return structExtensions.ToAPI() } + +func proxyConfigProtocol(cfg map[string]any) string { + if p, exists := cfg["protocol"]; exists { + if protocol, ok := p.(string); ok { + return protocol + } + } + return "" +} diff --git a/envoyextensions/extensioncommon/list_envoy_extender.go b/envoyextensions/extensioncommon/list_envoy_extender.go new file mode 100644 index 000000000..6c491a5d0 --- /dev/null +++ b/envoyextensions/extensioncommon/list_envoy_extender.go @@ -0,0 +1,268 @@ +// Copyright (c) HashiCorp, Inc. +// SPDX-License-Identifier: MPL-2.0 + +package extensioncommon + +import ( + "fmt" + "strings" + + envoy_cluster_v3 "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3" + envoy_listener_v3 "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3" + envoy_route_v3 "github.com/envoyproxy/go-control-plane/envoy/config/route/v3" + "github.com/hashicorp/go-multierror" + + "github.com/hashicorp/consul/api" + "github.com/hashicorp/consul/envoyextensions/xdscommon" +) + +type ClusterMap map[string]*envoy_cluster_v3.Cluster +type ListenerMap map[string]*envoy_listener_v3.Listener +type RouteMap map[string]*envoy_route_v3.RouteConfiguration + +// ListExtension is the interface that each user of ListEnvoyExtender must implement. It +// is responsible for modifying the xDS structures based on only the state of the extension. +type ListExtension interface { + // CanApply determines if the extension can mutate resources for the given runtime configuration. + CanApply(*RuntimeConfig) bool + + // PatchRoutes patches routes to include the custom Envoy configuration + // required to integrate with the built in extension template. + PatchRoutes(*RuntimeConfig, RouteMap) (RouteMap, error) + + // PatchClusters patches clusters to include the custom Envoy configuration + // required to integrate with the built in extension template. + PatchClusters(*RuntimeConfig, ClusterMap) (ClusterMap, error) + + // PatchFilters patches Envoy filters to include the custom Envoy + // configuration required to integrate with the built in extension template. + PatchFilters(*RuntimeConfig, []*envoy_listener_v3.Filter) ([]*envoy_listener_v3.Filter, error) +} + +var _ EnvoyExtender = (*ListEnvoyExtender)(nil) + +// ListEnvoyExtender provides convenience functions for iterating and applying modifications +// to lists of Envoy resources. +type ListEnvoyExtender struct { + Extension ListExtension +} + +func (*ListEnvoyExtender) Validate(config *RuntimeConfig) error { + return nil +} + +func (e *ListEnvoyExtender) Extend(resources *xdscommon.IndexedResources, config *RuntimeConfig) (*xdscommon.IndexedResources, error) { + var resultErr error + + switch config.Kind { + case api.ServiceKindTerminatingGateway, api.ServiceKindConnectProxy: + default: + return resources, nil + } + + if !e.Extension.CanApply(config) { + return resources, nil + } + + clusters := make(ClusterMap) + routes := make(RouteMap) + listeners := make(ListenerMap) + isUpstream := config.IsUpstream() + + for _, indexType := range []string{ + xdscommon.ListenerType, + xdscommon.RouteType, + xdscommon.ClusterType, + } { + for nameOrSNI, msg := range resources.Index[indexType] { + switch resource := msg.(type) { + case *envoy_cluster_v3.Cluster: + // If the Envoy extension configuration is for an upstream service, the Cluster's + // name must match the upstream service's SNI. + if isUpstream && !config.MatchesUpstreamServiceSNI(nameOrSNI) { + continue + } + + // If the extension's config is for an an inbound listener, the Cluster's name + // must be xdscommon.LocalAppClusterName. + if !isUpstream && nameOrSNI == xdscommon.LocalAppClusterName { + continue + } + + clusters[nameOrSNI] = resource + + case *envoy_listener_v3.Listener: + listeners[nameOrSNI] = resource + + case *envoy_route_v3.RouteConfiguration: + // If the Envoy extension configuration is for an upstream service, the route's + // name must match the upstream service's Envoy ID. + matchesEnvoyID := config.EnvoyID() == nameOrSNI + if isUpstream && !config.MatchesUpstreamServiceSNI(nameOrSNI) && !matchesEnvoyID { + continue + } + + // There aren't routes for inbound services. + if !isUpstream { + continue + } + + routes[nameOrSNI] = resource + + default: + resultErr = multierror.Append(resultErr, fmt.Errorf("unsupported type was skipped: %T", resource)) + } + } + } + + patchedClusters, err := e.Extension.PatchClusters(config, clusters) + if err == nil { + for nameOrSNI, cluster := range patchedClusters { + resources.Index[xdscommon.ClusterType][nameOrSNI] = cluster + } + } else { + resultErr = multierror.Append(resultErr, fmt.Errorf("error patching clusters: %w", err)) + } + + patchedListeners, err := e.patchListeners(config, listeners) + if err == nil { + for nameOrSNI, listener := range patchedListeners { + resources.Index[xdscommon.ListenerType][nameOrSNI] = listener + } + } else { + resultErr = multierror.Append(resultErr, fmt.Errorf("error patching listeners: %w", err)) + } + + patchedRoutes, err := e.Extension.PatchRoutes(config, routes) + if err == nil { + for nameOrSNI, route := range patchedRoutes { + resources.Index[xdscommon.RouteType][nameOrSNI] = route + } + } else { + resultErr = multierror.Append(resultErr, fmt.Errorf("error patching routes: %w", err)) + } + + return resources, resultErr +} + +func (e ListEnvoyExtender) patchListeners(config *RuntimeConfig, m ListenerMap) (ListenerMap, error) { + switch config.Kind { + case api.ServiceKindTerminatingGateway: + return e.patchTerminatingGatewayListeners(config, m) + case api.ServiceKindConnectProxy: + return e.patchConnectProxyListeners(config, m) + } + return m, nil +} + +func (e ListEnvoyExtender) patchTerminatingGatewayListeners(config *RuntimeConfig, l ListenerMap) (ListenerMap, error) { + // We don't support directly targeting terminating gateways with extensions. + if !config.IsUpstream() { + return l, nil + } + + var resultErr error + for _, listener := range l { + for _, filterChain := range listener.FilterChains { + sni := getSNI(filterChain) + + if sni == "" { + continue + } + + // The filter chain's SNI must match the upstream service's SNI. + if !config.MatchesUpstreamServiceSNI(sni) { + continue + } + + patchedFilters, err := e.Extension.PatchFilters(config, filterChain.Filters) + if err == nil { + filterChain.Filters = patchedFilters + } else { + resultErr = multierror.Append(resultErr, fmt.Errorf("error patching listener filters for %q: %w", sni, err)) + continue + } + } + } + + return l, resultErr +} + +func (e ListEnvoyExtender) patchConnectProxyListeners(config *RuntimeConfig, l ListenerMap) (ListenerMap, error) { + var resultErr error + + isUpstream := config.IsUpstream() + for nameOrSNI, listener := range l { + envoyID := "" + if id, _, found := strings.Cut(listener.Name, ":"); found { + envoyID = id + } + + if isUpstream && envoyID == xdscommon.OutboundListenerName { + patchedListener, err := e.patchTProxyListener(config, listener) + if err == nil { + l[nameOrSNI] = patchedListener + } else { + resultErr = multierror.Append(resultErr, fmt.Errorf("error patching TProxy listener %q: %w", nameOrSNI, err)) + } + continue + } + + // If the Envoy extension configuration is for an upstream service, the listener's + // name must match the upstream service's EnvoyID or be the outbound listener. + if isUpstream && envoyID != config.EnvoyID() { + continue + } + + // If the Envoy extension configuration is for inbound resources, the + // listener must be named xdscommon.PublicListenerName. + if config.IsLocal() && envoyID != xdscommon.PublicListenerName { + continue + } + + patchedListener, err := e.patchConnectProxyListener(config, listener) + if err == nil { + l[nameOrSNI] = patchedListener + } else { + resultErr = multierror.Append(resultErr, fmt.Errorf("error patching connect proxy listener %q: %w", nameOrSNI, err)) + } + } + + return l, resultErr +} + +func (e ListEnvoyExtender) patchConnectProxyListener(config *RuntimeConfig, l *envoy_listener_v3.Listener) (*envoy_listener_v3.Listener, error) { + var resultErr error + + for _, filterChain := range l.FilterChains { + patchedFilters, err := e.Extension.PatchFilters(config, filterChain.Filters) + if err == nil { + filterChain.Filters = patchedFilters + } else { + resultErr = multierror.Append(resultErr, fmt.Errorf("error patching filters: %w", err)) + } + } + return l, resultErr +} + +func (e ListEnvoyExtender) patchTProxyListener(config *RuntimeConfig, l *envoy_listener_v3.Listener) (*envoy_listener_v3.Listener, error) { + var resultErr error + + vip := config.Upstreams[config.ServiceName].VIP + + for _, filterChain := range l.FilterChains { + match := filterChainTProxyMatch(vip, filterChain) + if !match { + continue + } + + patchedFilters, err := e.Extension.PatchFilters(config, filterChain.Filters) + if err == nil { + filterChain.Filters = patchedFilters + } else { + resultErr = multierror.Append(resultErr, fmt.Errorf("error patching filters: %w", err)) + } + } + + return l, resultErr +} diff --git a/envoyextensions/extensioncommon/runtime_config.go b/envoyextensions/extensioncommon/runtime_config.go index d36f389ac..b9ab1b38c 100644 --- a/envoyextensions/extensioncommon/runtime_config.go +++ b/envoyextensions/extensioncommon/runtime_config.go @@ -44,6 +44,9 @@ type RuntimeConfig struct { // Kind is mode the local Envoy proxy is running in. For now, only connect proxy and // terminating gateways are supported. Kind api.ServiceKind + + // Protocol is the protocol configured for the local service. It may be empty which implies tcp. + Protocol string } // IsLocal indicates if the extension configuration is for the proxy's local service.