package xds import ( "context" "errors" "fmt" "sync/atomic" "time" "github.com/hashicorp/consul/logging" envoy_config_core_v3 "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" envoy_discovery_v2 "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v2" envoy_discovery_v3 "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" "github.com/golang/protobuf/proto" "github.com/hashicorp/go-hclog" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials" "google.golang.org/grpc/metadata" "google.golang.org/grpc/status" "github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/agent/proxycfg" "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/tlsutil" ) // ADSStream is a shorter way of referring to this thing... type ADSStream = envoy_discovery_v3.AggregatedDiscoveryService_StreamAggregatedResourcesServer type ADSStream_v2 = envoy_discovery_v2.AggregatedDiscoveryService_StreamAggregatedResourcesServer const ( // Resource types in xDS v3. These are copied from // envoyproxy/go-control-plane/pkg/resource/v3/resource.go since we don't need any of // the rest of that package. apiTypePrefix = "type.googleapis.com/" // EndpointType is the TypeURL for Endpoint discovery responses. EndpointType = apiTypePrefix + "envoy.config.endpoint.v3.ClusterLoadAssignment" // ClusterType is the TypeURL for Cluster discovery responses. ClusterType = apiTypePrefix + "envoy.config.cluster.v3.Cluster" // RouteType is the TypeURL for Route discovery responses. RouteType = apiTypePrefix + "envoy.config.route.v3.RouteConfiguration" // ListenerType is the TypeURL for Listener discovery responses. ListenerType = apiTypePrefix + "envoy.config.listener.v3.Listener" // PublicListenerName is the name we give the public listener in Envoy config. PublicListenerName = "public_listener" // OutboundListenerName is the name we give the outbound Envoy listener when TransparentProxy mode is enabled. OutboundListenerName = "outbound_listener" // LocalAppClusterName is the name we give the local application "cluster" in // Envoy config. Note that all cluster names may collide with service names // since we want cluster names and service names to match to enable nice // metrics correlation without massaging prefixes on cluster names. // // We should probably make this more unlikely to collide however changing it // potentially breaks upgrade compatibility without restarting all Envoy's as // it will no longer match their existing cluster name. Changing this will // affect metrics output so could break dashboards (for local app traffic). // // We should probably just make it configurable if anyone actually has // services named "local_app" in the future. LocalAppClusterName = "local_app" // LocalAgentClusterName is the name we give the local agent "cluster" in // Envoy config. Note that all cluster names may collide with service names // since we want cluster names and service names to match to enable nice // metrics correlation without massaging prefixes on cluster names. // // We should probably make this more unlikely to collied however changing it // potentially breaks upgrade compatibility without restarting all Envoy's as // it will no longer match their existing cluster name. Changing this will // affect metrics output so could break dashboards (for local agent traffic). // // We should probably just make it configurable if anyone actually has // services named "local_agent" in the future. LocalAgentClusterName = "local_agent" // OriginalDestinationClusterName is the name we give to the passthrough // cluster which redirects transparently-proxied requests to their original // destination. This cluster prevents Consul from blocking connections to // destinations outside of the catalog when in TransparentProxy mode. OriginalDestinationClusterName = "original-destination" // DefaultAuthCheckFrequency is the default value for // Server.AuthCheckFrequency to use when the zero value is provided. DefaultAuthCheckFrequency = 5 * time.Minute ) // ACLResolverFunc is a shim to resolve ACLs. Since ACL enforcement is so far // entirely agent-local and all uses private methods this allows a simple shim // to be written in the agent package to allow resolving without tightly // coupling this to the agent. type ACLResolverFunc func(id string) (acl.Authorizer, error) // ServiceChecks is the interface the agent needs to expose // for the xDS server to fetch a service's HTTP check definitions type HTTPCheckFetcher interface { ServiceHTTPBasedChecks(serviceID structs.ServiceID) []structs.CheckType } // ConfigFetcher is the interface the agent needs to expose // for the xDS server to fetch agent config, currently only one field is fetched type ConfigFetcher interface { AdvertiseAddrLAN() string } // ConfigManager is the interface xds.Server requires to consume proxy config // updates. It's satisfied normally by the agent's proxycfg.Manager, but allows // easier testing without several layers of mocked cache, local state and // proxycfg.Manager. type ConfigManager interface { Watch(proxyID structs.ServiceID) (<-chan *proxycfg.ConfigSnapshot, proxycfg.CancelFunc) } // Server represents a gRPC server that can handle xDS requests from Envoy. All // of it's public members must be set before the gRPC server is started. // // A full description of the XDS protocol can be found at // https://www.envoyproxy.io/docs/envoy/latest/api-docs/xds_protocol type Server struct { Logger hclog.Logger CfgMgr ConfigManager ResolveToken ACLResolverFunc // AuthCheckFrequency is how often we should re-check the credentials used // during a long-lived gRPC Stream after it has been initially established. // This is only used during idle periods of stream interactions (i.e. when // there has been no recent DiscoveryRequest). AuthCheckFrequency time.Duration CheckFetcher HTTPCheckFetcher CfgFetcher ConfigFetcher DisableV2Protocol bool } // StreamAggregatedResources implements // envoy_discovery_v3.AggregatedDiscoveryServiceServer. This is the ADS endpoint which is // the only xDS API we directly support for now. func (s *Server) StreamAggregatedResources(stream ADSStream) error { // a channel for receiving incoming requests reqCh := make(chan *envoy_discovery_v3.DiscoveryRequest) reqStop := int32(0) go func() { for { req, err := stream.Recv() if atomic.LoadInt32(&reqStop) != 0 { return } if err != nil { close(reqCh) return } reqCh <- req } }() err := s.process(stream, reqCh) if err != nil { s.Logger.Error("Error handling ADS stream", "xdsVersion", "v3", "error", err) } // prevents writing to a closed channel if send failed on blocked recv atomic.StoreInt32(&reqStop, 1) return err } const ( stateInit int = iota statePendingInitialConfig stateRunning ) func (s *Server) process(stream ADSStream, reqCh <-chan *envoy_discovery_v3.DiscoveryRequest) error { logger := s.Logger.Named(logging.XDS) // xDS requires a unique nonce to correlate response/request pairs var nonce uint64 // xDS works with versions of configs. Internally we don't have a consistent // version. We could hash the config since versions don't have to be // ordered as far as I can tell, but it is cheaper to increment a counter // every time we observe a new config since the upstream proxycfg package only // delivers updates when there are actual changes. var configVersion uint64 // Loop state var ( cfgSnap *proxycfg.ConfigSnapshot req *envoy_discovery_v3.DiscoveryRequest node *envoy_config_core_v3.Node proxyFeatures supportedProxyFeatures ok bool stateCh <-chan *proxycfg.ConfigSnapshot watchCancel func() proxyID structs.ServiceID ) // need to run a small state machine to get through initial authentication. var state = stateInit // Configure handlers for each type of request handlers := map[string]*xDSType{ EndpointType: { typeURL: EndpointType, resources: s.endpointsFromSnapshot, stream: stream, }, ClusterType: { typeURL: ClusterType, resources: s.clustersFromSnapshot, stream: stream, allowEmptyFn: func(cfgSnap *proxycfg.ConfigSnapshot) bool { // Mesh, Ingress, and Terminating gateways are allowed to inform CDS of // no clusters. return cfgSnap.Kind == structs.ServiceKindMeshGateway || cfgSnap.Kind == structs.ServiceKindTerminatingGateway || cfgSnap.Kind == structs.ServiceKindIngressGateway }, }, RouteType: { typeURL: RouteType, resources: s.routesFromSnapshot, stream: stream, allowEmptyFn: func(cfgSnap *proxycfg.ConfigSnapshot) bool { return cfgSnap.Kind == structs.ServiceKindIngressGateway }, }, ListenerType: { typeURL: ListenerType, resources: s.listenersFromSnapshot, stream: stream, allowEmptyFn: func(cfgSnap *proxycfg.ConfigSnapshot) bool { return cfgSnap.Kind == structs.ServiceKindIngressGateway }, }, } var authTimer <-chan time.Time extendAuthTimer := func() { authTimer = time.After(s.AuthCheckFrequency) } checkStreamACLs := func(cfgSnap *proxycfg.ConfigSnapshot) error { if cfgSnap == nil { return status.Errorf(codes.Unauthenticated, "unauthenticated: no config snapshot") } rule, err := s.ResolveToken(tokenFromContext(stream.Context())) if acl.IsErrNotFound(err) { return status.Errorf(codes.Unauthenticated, "unauthenticated: %v", err) } else if acl.IsErrPermissionDenied(err) { return status.Errorf(codes.PermissionDenied, "permission denied: %v", err) } else if err != nil { return err } var authzContext acl.AuthorizerContext switch cfgSnap.Kind { case structs.ServiceKindConnectProxy: cfgSnap.ProxyID.EnterpriseMeta.FillAuthzContext(&authzContext) if rule != nil && rule.ServiceWrite(cfgSnap.Proxy.DestinationServiceName, &authzContext) != acl.Allow { return status.Errorf(codes.PermissionDenied, "permission denied") } case structs.ServiceKindMeshGateway, structs.ServiceKindTerminatingGateway, structs.ServiceKindIngressGateway: cfgSnap.ProxyID.EnterpriseMeta.FillAuthzContext(&authzContext) if rule != nil && rule.ServiceWrite(cfgSnap.Service, &authzContext) != acl.Allow { return status.Errorf(codes.PermissionDenied, "permission denied") } default: return status.Errorf(codes.Internal, "Invalid service kind") } // Authed OK! return nil } for { select { case <-authTimer: // It's been too long since a Discovery{Request,Response} so recheck ACLs. if err := checkStreamACLs(cfgSnap); err != nil { return err } extendAuthTimer() case req, ok = <-reqCh: if !ok { // reqCh is closed when stream.Recv errors which is how we detect client // going away. AFAICT the stream.Context() is only canceled once the // RPC method returns which it can't until we return from this one so // there's no point in blocking on that. return nil } if req.TypeUrl == "" { return status.Errorf(codes.InvalidArgument, "type URL is required for ADS") } if node == nil && req.Node != nil { node = req.Node var err error proxyFeatures, err = determineSupportedProxyFeatures(req.Node) if err != nil { return status.Errorf(codes.InvalidArgument, err.Error()) } } if handler, ok := handlers[req.TypeUrl]; ok { handler.Recv(req, node, proxyFeatures) } case cfgSnap = <-stateCh: // We got a new config, update the version counter configVersion++ } // Trigger state machine switch state { case stateInit: if req == nil { // This can't happen (tm) since stateCh is nil until after the first req // is received but lets not panic about it. continue } // Start authentication process, we need the proxyID proxyID = structs.NewServiceID(req.Node.Id, parseEnterpriseMeta(req.Node)) // Start watching config for that proxy stateCh, watchCancel = s.CfgMgr.Watch(proxyID) // Note that in this case we _intend_ the defer to only be triggered when // this whole process method ends (i.e. when streaming RPC aborts) not at // the end of the current loop iteration. We have to do it in the loop // here since we can't start watching until we get to this state in the // state machine. defer watchCancel() logger.Trace("watching proxy, pending initial proxycfg snapshot", "service_id", proxyID.String()) // Now wait for the config so we can check ACL state = statePendingInitialConfig case statePendingInitialConfig: if cfgSnap == nil { // Nothing we can do until we get the initial config continue } // Got config, try to authenticate next. state = stateRunning logger.Trace("Got initial config snapshot", "service_id", cfgSnap.ProxyID.String()) // Lets actually process the config we just got or we'll mis responding fallthrough case stateRunning: // Check ACLs on every Discovery{Request,Response}. if err := checkStreamACLs(cfgSnap); err != nil { return err } // For the first time through the state machine, this is when the // timer is first started. extendAuthTimer() logger.Trace("Invoking all xDS resource handlers and sending new data if there is any", "service_id", cfgSnap.ProxyID.String()) // See if any handlers need to have the current (possibly new) config // sent. Note the order here is actually significant so we can't just // range the map which has no determined order. It's important because: // // 1. Envoy needs to see a consistent snapshot to avoid potentially // dropping traffic due to inconsistencies. This is the // main win of ADS after all - we get to control this order. // 2. Non-determinsic order of complex protobuf responses which are // compared for non-exact JSON equivalence makes the tests uber-messy // to handle for _, typeURL := range []string{ClusterType, EndpointType, RouteType, ListenerType} { handler := handlers[typeURL] if err := handler.SendIfNew(cfgSnap, configVersion, &nonce); err != nil { return err } } } } } type xDSType struct { typeURL string stream ADSStream req *envoy_discovery_v3.DiscoveryRequest node *envoy_config_core_v3.Node proxyFeatures supportedProxyFeatures lastNonce string // lastVersion is the version that was last sent to the proxy. It is needed // because we don't want to send the same version more than once. // req.VersionInfo may be an older version than the most recent once sent in // two cases: 1) if the ACK wasn't received yet and `req` still points to the // previous request we already responded to and 2) if the proxy rejected the // last version we sent with a Nack then req.VersionInfo will be the older // version it's hanging on to. lastVersion uint64 resources func(cInfo connectionInfo, cfgSnap *proxycfg.ConfigSnapshot) ([]proto.Message, error) allowEmptyFn func(cfgSnap *proxycfg.ConfigSnapshot) bool } // connectionInfo represents details specific to this connection type connectionInfo struct { Token string ProxyFeatures supportedProxyFeatures } func (t *xDSType) Recv(req *envoy_discovery_v3.DiscoveryRequest, node *envoy_config_core_v3.Node, proxyFeatures supportedProxyFeatures) { if t.lastNonce == "" || t.lastNonce == req.GetResponseNonce() { t.req = req t.node = node t.proxyFeatures = proxyFeatures } } func (t *xDSType) SendIfNew(cfgSnap *proxycfg.ConfigSnapshot, version uint64, nonce *uint64) error { if t.req == nil { return nil } if t.lastVersion >= version { // Already sent this version return nil } cInfo := connectionInfo{ Token: tokenFromContext(t.stream.Context()), ProxyFeatures: t.proxyFeatures, } resources, err := t.resources(cInfo, cfgSnap) if err != nil { return err } allowEmpty := t.allowEmptyFn != nil && t.allowEmptyFn(cfgSnap) // Zero length resource responses should be ignored and are the result of no // data yet. Notice that this caused a bug originally where we had zero // healthy endpoints for an upstream that would cause Envoy to hang waiting // for the EDS response. This is fixed though by ensuring we send an explicit // empty LoadAssignment resource for the cluster rather than allowing junky // empty resources. if len(resources) == 0 && !allowEmpty { // Nothing to send yet return nil } // Note we only increment nonce when we actually send - not important for // correctness but makes tests much simpler when we skip a type like Routes // with nothing to send. *nonce++ nonceStr := fmt.Sprintf("%08x", *nonce) versionStr := fmt.Sprintf("%08x", version) resp, err := createResponse(t.typeURL, versionStr, nonceStr, resources) if err != nil { return err } err = t.stream.Send(resp) if err != nil { return err } t.lastVersion = version t.lastNonce = nonceStr return nil } func tokenFromContext(ctx context.Context) string { md, ok := metadata.FromIncomingContext(ctx) if !ok { return "" } toks, ok := md["x-consul-token"] if ok && len(toks) > 0 { return toks[0] } return "" } // DeltaAggregatedResources implements envoy_discovery_v3.AggregatedDiscoveryServiceServer func (s *Server) DeltaAggregatedResources(_ envoy_discovery_v3.AggregatedDiscoveryService_DeltaAggregatedResourcesServer) error { return errors.New("not implemented") } // GRPCServer returns a server instance that can handle xDS requests. func (s *Server) GRPCServer(tlsConfigurator *tlsutil.Configurator) (*grpc.Server, error) { opts := []grpc.ServerOption{ grpc.MaxConcurrentStreams(2048), } if tlsConfigurator != nil { if tlsConfigurator.Cert() != nil { creds := credentials.NewTLS(tlsConfigurator.IncomingGRPCConfig()) opts = append(opts, grpc.Creds(creds)) } } srv := grpc.NewServer(opts...) envoy_discovery_v3.RegisterAggregatedDiscoveryServiceServer(srv, s) if !s.DisableV2Protocol { envoy_discovery_v2.RegisterAggregatedDiscoveryServiceServer(srv, &adsServerV2Shim{srv: s}) } return srv, nil }