package xds import ( "errors" "fmt" envoy_api_v2 "github.com/envoyproxy/go-control-plane/envoy/api/v2" envoy_tls_v2 "github.com/envoyproxy/go-control-plane/envoy/api/v2/auth" envoy_core_v2 "github.com/envoyproxy/go-control-plane/envoy/api/v2/core" envoy_listener_v2 "github.com/envoyproxy/go-control-plane/envoy/api/v2/listener" envoy_route_v2 "github.com/envoyproxy/go-control-plane/envoy/api/v2/route" envoy_cluster_v3 "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3" envoy_endpoint_v3 "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3" envoy_http_rbac_v2 "github.com/envoyproxy/go-control-plane/envoy/config/filter/http/rbac/v2" envoy_tls_inspector_v2 "github.com/envoyproxy/go-control-plane/envoy/config/filter/listener/tls_inspector/v2" envoy_http_v2 "github.com/envoyproxy/go-control-plane/envoy/config/filter/network/http_connection_manager/v2" envoy_network_rbac_v2 "github.com/envoyproxy/go-control-plane/envoy/config/filter/network/rbac/v2" envoy_sni_v2 "github.com/envoyproxy/go-control-plane/envoy/config/filter/network/sni_cluster/v2" envoy_tcp_proxy_v2 "github.com/envoyproxy/go-control-plane/envoy/config/filter/network/tcp_proxy/v2" envoy_listener_v3 "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3" envoy_metrics_v2 "github.com/envoyproxy/go-control-plane/envoy/config/metrics/v2" envoy_metrics_v3 "github.com/envoyproxy/go-control-plane/envoy/config/metrics/v3" envoy_route_v3 "github.com/envoyproxy/go-control-plane/envoy/config/route/v3" envoy_trace_v2 "github.com/envoyproxy/go-control-plane/envoy/config/trace/v2" envoy_trace_v3 "github.com/envoyproxy/go-control-plane/envoy/config/trace/v3" envoy_http_rbac_v3 "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/http/rbac/v3" envoy_tls_inspector_v3 "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/listener/tls_inspector/v3" envoy_http_v3 "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/network/http_connection_manager/v3" envoy_network_rbac_v3 "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/network/rbac/v3" envoy_sni_v3 "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/network/sni_cluster/v3" envoy_tcp_proxy_v3 "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/network/tcp_proxy/v3" envoy_tls_v3 "github.com/envoyproxy/go-control-plane/envoy/extensions/transport_sockets/tls/v3" envoy_discovery_v2 "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v2" envoy_discovery_v3 "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" "github.com/golang/protobuf/proto" "github.com/golang/protobuf/ptypes" "github.com/golang/protobuf/ptypes/any" "google.golang.org/grpc" ) // The plumbing in this file supports converting xDS v3 requests and responses // to and from xDS v2 representations. This is primarily of use for envoy // sidecars configured and launched by Consul versions prior to 1.10. // // Once the servers and client agents in a datacenter have been upgraded to // 1.10+, and all of the sidecars have been restarted with a fresh bootstrap // config file generated by Consul 1.10+ then none of these abstractions are // used. // // At most we only need to retain this logic until our envoy support matrix // looks like: // // - 1.20.x (v2 deleted) // - 1.19.x (v2 deleted) // - 1.18.x (v2 deleted) // - 1.17.x (v2 opt-in) type adsServerV2Shim struct { srv *Server } // StreamAggregatedResources implements // envoy_discovery_v2.AggregatedDiscoveryServiceServer. This is the ADS endpoint which is // the only xDS API we directly support for now. func (s *adsServerV2Shim) StreamAggregatedResources(stream ADSStream_v2) error { shim := &adsStreamV3Shim{ stream: stream, ServerStream: stream, } return s.srv.StreamAggregatedResources(shim) } // DeltaAggregatedResources implements envoy_discovery_v2.AggregatedDiscoveryServiceServer func (s *adsServerV2Shim) DeltaAggregatedResources(_ envoy_discovery_v2.AggregatedDiscoveryService_DeltaAggregatedResourcesServer) error { return errors.New("not implemented") } type adsStreamV3Shim struct { stream ADSStream_v2 grpc.ServerStream } var _ ADSStream = (*adsStreamV3Shim)(nil) func (s *adsStreamV3Shim) Send(resp *envoy_discovery_v3.DiscoveryResponse) error { respv2, err := convertDiscoveryResponseToV2(resp) if err != nil { return fmt.Errorf("Error converting a v3 DiscoveryResponse to v2: %w", err) } return s.stream.Send(respv2) } func (s *adsStreamV3Shim) Recv() (*envoy_discovery_v3.DiscoveryRequest, error) { req, err := s.stream.Recv() if err != nil { return nil, err } reqv3, err := convertDiscoveryRequestToV3(req) if err != nil { return nil, fmt.Errorf("Error converting a v2 DiscoveryRequest to v3: %w", err) } return reqv3, nil } func convertDiscoveryRequestToV3(req *envoy_api_v2.DiscoveryRequest) (*envoy_discovery_v3.DiscoveryRequest, error) { var pbuf proto.Buffer if err := pbuf.Marshal(req); err != nil { return nil, err } var reqV3 envoy_discovery_v3.DiscoveryRequest if err := pbuf.Unmarshal(&reqV3); err != nil { return nil, err } // only one field to munge if err := convertTypeUrlsToV3(&reqV3.TypeUrl); err != nil { return nil, err } return &reqV3, nil } func convertDiscoveryResponseToV2(resp *envoy_discovery_v3.DiscoveryResponse) (*envoy_api_v2.DiscoveryResponse, error) { var pbuf proto.Buffer if err := pbuf.Marshal(resp); err != nil { return nil, err } var respV2 envoy_api_v2.DiscoveryResponse if err := pbuf.Unmarshal(&respV2); err != nil { return nil, err } if err := convertTypedConfigsToV2(&respV2); err != nil { return nil, err } return &respV2, nil } // convertNetFilterToV2 is only used in tests. func convertNetFilterToV2(filter *envoy_listener_v3.Filter) (*envoy_listener_v2.Filter, error) { var pbuf proto.Buffer if err := pbuf.Marshal(filter); err != nil { return nil, err } var filterV2 envoy_listener_v2.Filter if err := pbuf.Unmarshal(&filterV2); err != nil { return nil, err } if err := convertTypedConfigsToV2(&filterV2); err != nil { return nil, err } return &filterV2, nil } // convertHttpFilterToV2 is only used in tests. func convertHttpFilterToV2(filter *envoy_http_v3.HttpFilter) (*envoy_http_v2.HttpFilter, error) { var pbuf proto.Buffer if err := pbuf.Marshal(filter); err != nil { return nil, err } var filterV2 envoy_http_v2.HttpFilter if err := pbuf.Unmarshal(&filterV2); err != nil { return nil, err } if err := convertTypedConfigsToV2(&filterV2); err != nil { return nil, err } return &filterV2, nil } // Responses func convertTypedConfigsToV2(pb proto.Message) error { switch x := pb.(type) { case *envoy_api_v2.DiscoveryResponse: if err := convertTypeUrlsToV2(&x.TypeUrl); err != nil { return fmt.Errorf("%T: %w", x, err) } for _, res := range x.Resources { if err := convertTypedConfigsToV2(res); err != nil { return fmt.Errorf("%T: %w", x, err) } } return nil case *any.Any: // first flip the any.Any to v2 if err := convertTypeUrlsToV2(&x.TypeUrl); err != nil { return fmt.Errorf("%T(%s) convert type urls in envelope: %w", x, x.TypeUrl, err) } // now decode into a v2 type var dynAny ptypes.DynamicAny if err := ptypes.UnmarshalAny(x, &dynAny); err != nil { return fmt.Errorf("%T(%s) dynamic unmarshal: %w", x, x.TypeUrl, err) } // handle the contents and then put them back in the any.Any // handle contents first if err := convertTypedConfigsToV2(dynAny.Message); err != nil { return fmt.Errorf("%T(%s) convert type urls in body: %w", x, x.TypeUrl, err) } anyFixed, err := ptypes.MarshalAny(dynAny.Message) if err != nil { return fmt.Errorf("%T(%s) dynamic re-marshal: %w", x, x.TypeUrl, err) } x.Value = anyFixed.Value return nil case *envoy_api_v2.Listener: for _, chain := range x.FilterChains { if err := convertTypedConfigsToV2(chain); err != nil { return fmt.Errorf("%T: %w", x, err) } } for _, filter := range x.ListenerFilters { // We only ever plumb up the tls_inspector listener filter. if err := convertTypedConfigsToV2(filter); err != nil { return fmt.Errorf("%T: %w", x, err) } } return nil case *envoy_listener_v2.ListenerFilter: // This is really only here for when the tls inspector is for some // random reason plumbed using the @type instead of the name. if x.ConfigType != nil { tc, ok := x.ConfigType.(*envoy_listener_v2.ListenerFilter_TypedConfig) if !ok { return fmt.Errorf("%T: ConfigType type %T not handled", x, x.ConfigType) } if tc.TypedConfig != nil { if err := convertTypedConfigsToV2(tc.TypedConfig); err != nil { return fmt.Errorf("%T: %w", x, err) } } } return nil case *envoy_listener_v2.FilterChain: for _, filter := range x.Filters { if err := convertTypedConfigsToV2(filter); err != nil { return fmt.Errorf("%T: %w", x, err) } } if x.TransportSocket != nil { if err := convertTypedConfigsToV2(x.TransportSocket); err != nil { return fmt.Errorf("%T: %w", x, err) } } return nil case *envoy_listener_v2.Filter: // "envoy.filters.network.tcp_proxy" // "envoy.filters.network.http_connection_manager" // "envoy.filters.network.rbac" // "envoy.filters.network.sni_cluster" if x.ConfigType != nil { tc, ok := x.ConfigType.(*envoy_listener_v2.Filter_TypedConfig) if !ok { return fmt.Errorf("%T: ConfigType type %T not handled", x, x.ConfigType) } if tc.TypedConfig != nil { if err := convertTypedConfigsToV2(tc.TypedConfig); err != nil { return fmt.Errorf("%T: %w", x, err) } } } return nil case *envoy_core_v2.TransportSocket: if x.ConfigType != nil { tc, ok := x.ConfigType.(*envoy_core_v2.TransportSocket_TypedConfig) if !ok { return fmt.Errorf("%T: ConfigType type %T not handled", x, x.ConfigType) } if tc.TypedConfig != nil { if err := convertTypedConfigsToV2(tc.TypedConfig); err != nil { return fmt.Errorf("%T: %w", x, err) } } } return nil case *envoy_api_v2.ClusterLoadAssignment: return nil case *envoy_api_v2.Cluster: if x.TransportSocket != nil { if err := convertTypedConfigsToV2(x.TransportSocket); err != nil { return fmt.Errorf("%T: %w", x, err) } } for _, tsm := range x.TransportSocketMatches { if tsm.TransportSocket != nil { if err := convertTypedConfigsToV2(tsm.TransportSocket); err != nil { return fmt.Errorf("%T: %w", x, err) } } } if x.EdsClusterConfig != nil { if x.EdsClusterConfig.EdsConfig != nil { if err := convertTypedConfigsToV2(x.EdsClusterConfig.EdsConfig); err != nil { return fmt.Errorf("%T: %w", x, err) } } } return nil case *envoy_api_v2.RouteConfiguration: for _, vhost := range x.VirtualHosts { if err := convertTypedConfigsToV2(vhost); err != nil { return fmt.Errorf("%T: %w", x, err) } } if x.Vhds != nil && x.Vhds.ConfigSource != nil { if err := convertTypedConfigsToV2(x.Vhds.ConfigSource); err != nil { return fmt.Errorf("%T: %w", x, err) } } return nil case *envoy_route_v2.VirtualHost: if x.RetryPolicy != nil { if err := convertTypedConfigsToV2(x.RetryPolicy); err != nil { return fmt.Errorf("%T: %w", x, err) } } return nil case *envoy_route_v2.RetryPolicy: return nil case *envoy_http_v2.HttpFilter: if x.ConfigType != nil { tc, ok := x.ConfigType.(*envoy_http_v2.HttpFilter_TypedConfig) if !ok { return fmt.Errorf("%T: ConfigType type %T not handled", x, x.ConfigType) } if tc.TypedConfig != nil { if err := convertTypedConfigsToV2(tc.TypedConfig); err != nil { return fmt.Errorf("%T: %w", x, err) } } } return nil case *envoy_core_v2.ConfigSource: if x.ConfigSourceSpecifier != nil { if _, ok := x.ConfigSourceSpecifier.(*envoy_core_v2.ConfigSource_Ads); !ok { return fmt.Errorf("%T: ConfigSourceSpecifier type %T not handled", x, x.ConfigSourceSpecifier) } } x.ResourceApiVersion = envoy_core_v2.ApiVersion_V2 return nil case *envoy_http_v2.HttpConnectionManager: // "envoy.filters.network.http_connection_manager" if x.RouteSpecifier != nil { switch spec := x.RouteSpecifier.(type) { case *envoy_http_v2.HttpConnectionManager_Rds: if spec.Rds != nil && spec.Rds.ConfigSource != nil { if err := convertTypedConfigsToV2(spec.Rds.ConfigSource); err != nil { return fmt.Errorf("%T: %w", x, err) } } case *envoy_http_v2.HttpConnectionManager_RouteConfig: if spec.RouteConfig != nil { if err := convertTypedConfigsToV2(spec.RouteConfig); err != nil { return fmt.Errorf("%T: %w", x, err) } } default: return fmt.Errorf("%T: RouteSpecifier type %T not handled", x, spec) } } for _, filter := range x.HttpFilters { if err := convertTypedConfigsToV2(filter); err != nil { return fmt.Errorf("%T: %w", x, err) } } if x.Tracing != nil && x.Tracing.Provider != nil && x.Tracing.Provider.ConfigType != nil { tc, ok := x.Tracing.Provider.ConfigType.(*envoy_trace_v2.Tracing_Http_TypedConfig) if !ok { return fmt.Errorf("%T: Tracing.Provider.ConfigType type %T not handled", x, x.Tracing.Provider.ConfigType) } if err := convertTypedConfigsToV2(tc.TypedConfig); err != nil { return fmt.Errorf("%T: %w", x, err) } } return nil case *envoy_tls_inspector_v2.TlsInspector: // "envoy.filters.listener.tls_inspector" return nil case *envoy_tcp_proxy_v2.TcpProxy: // "envoy.filters.network.tcp_proxy" return nil case *envoy_network_rbac_v2.RBAC: // "envoy.filters.network.rbac" return nil case *envoy_sni_v2.SniCluster: // "envoy.filters.network.sni_cluster" return nil case *envoy_http_rbac_v2.RBAC: return nil case *envoy_tls_v2.UpstreamTlsContext: return nil case *envoy_tls_v2.DownstreamTlsContext: return nil default: return fmt.Errorf("could not convert unexpected type to v2: %T", pb) } } func convertTypeUrlsToV2(typeUrl *string) error { if _, ok := typeConvert2to3[*typeUrl]; ok { return nil // already happened } converted, ok := typeConvert3to2[*typeUrl] if !ok { return fmt.Errorf("could not convert type url to v2: %s", *typeUrl) } *typeUrl = converted return nil } func convertTypeUrlsToV3(typeUrl *string) error { if _, ok := typeConvert3to2[*typeUrl]; ok { return nil // already happened } converted, ok := typeConvert2to3[*typeUrl] if !ok { return fmt.Errorf("could not convert type url to v3: %s", *typeUrl) } *typeUrl = converted return nil } var ( typeConvert2to3 map[string]string typeConvert3to2 map[string]string ) func init() { typeConvert2to3 = make(map[string]string) typeConvert3to2 = make(map[string]string) reg := func(type2, type3 string) { if type2 == "" { panic("v2 type is empty") } if type3 == "" { panic("v3 type is empty") } typeConvert2to3[type2] = type3 typeConvert3to2[type3] = type2 } reg2 := func(pb2, pb3 proto.Message) { any2, err := ptypes.MarshalAny(pb2) if err != nil { panic(err) } any3, err := ptypes.MarshalAny(pb3) if err != nil { panic(err) } reg(any2.TypeUrl, any3.TypeUrl) } // primary resources reg2(&envoy_api_v2.Listener{}, &envoy_listener_v3.Listener{}) // LDS reg2(&envoy_api_v2.Cluster{}, &envoy_cluster_v3.Cluster{}) // CDS reg2(&envoy_api_v2.RouteConfiguration{}, &envoy_route_v3.RouteConfiguration{}) // RDS reg2(&envoy_api_v2.ClusterLoadAssignment{}, &envoy_endpoint_v3.ClusterLoadAssignment{}) // EDS // filters reg2(&envoy_http_v2.HttpConnectionManager{}, &envoy_http_v3.HttpConnectionManager{}) // "envoy.filters.network.http_connection_manager" reg2(&envoy_tcp_proxy_v2.TcpProxy{}, &envoy_tcp_proxy_v3.TcpProxy{}) // "envoy.filters.network.tcp_proxy" reg2(&envoy_network_rbac_v2.RBAC{}, &envoy_network_rbac_v3.RBAC{}) // "envoy.filters.network.rbac" reg2(&envoy_http_rbac_v2.RBAC{}, &envoy_http_rbac_v3.RBAC{}) // "envoy.filters.http.rbac reg2(&envoy_tls_inspector_v2.TlsInspector{}, &envoy_tls_inspector_v3.TlsInspector{}) // "envoy.filters.listener.tls_inspector" reg2(&envoy_sni_v2.SniCluster{}, &envoy_sni_v3.SniCluster{}) // "envoy.filters.network.sni_cluster" // cluster tls reg2(&envoy_tls_v2.UpstreamTlsContext{}, &envoy_tls_v3.UpstreamTlsContext{}) reg2(&envoy_tls_v2.DownstreamTlsContext{}, &envoy_tls_v3.DownstreamTlsContext{}) // extension elements reg2(&envoy_metrics_v2.DogStatsdSink{}, &envoy_metrics_v3.DogStatsdSink{}) reg2(&envoy_metrics_v2.StatsdSink{}, &envoy_metrics_v3.StatsdSink{}) reg2(&envoy_trace_v2.ZipkinConfig{}, &envoy_trace_v3.ZipkinConfig{}) }