package proxycfg

import (
	"context"
	"fmt"
	"strings"

	"github.com/hashicorp/consul/agent/cache"
	cachetype "github.com/hashicorp/consul/agent/cache-types"
	"github.com/hashicorp/consul/agent/structs"
)

// handlerConnectProxy wraps handlerState and provides the initialize and
// handleUpdate steps that populate the ConnectProxy portion of a
// ConfigSnapshot.
type handlerConnectProxy struct {
	handlerState
}

// initialize sets up the watches needed based on current proxy registration
// state.
//
// It builds a fresh snapshot, allocates the ConnectProxy maps, and registers
// cache watches for the CA roots, the leaf certificate, intentions matching
// the destination service and the destination service's HTTP checks. In
// transparent proxy mode it additionally watches the intention-derived
// upstreams and the mesh config entry. Finally, every explicitly configured
// upstream gets either a prepared query watch or a discovery chain watch.
//
// The snapshot built so far is returned together with the first error
// reported by the cache, or with an error for an unknown upstream destination
// type.
func (s *handlerConnectProxy) initialize(ctx context.Context) (ConfigSnapshot, error) {
	snap := newConfigSnapshotFromServiceInstance(s.serviceInstance, s.stateConfig)
	snap.ConnectProxy.DiscoveryChain = make(map[string]*structs.CompiledDiscoveryChain)
	snap.ConnectProxy.WatchedDiscoveryChains = make(map[string]context.CancelFunc)
	snap.ConnectProxy.WatchedUpstreams = make(map[string]map[string]context.CancelFunc)
	snap.ConnectProxy.WatchedUpstreamEndpoints = make(map[string]map[string]structs.CheckServiceNodes)
	snap.ConnectProxy.WatchedGateways = make(map[string]map[string]context.CancelFunc)
	snap.ConnectProxy.WatchedGatewayEndpoints = make(map[string]map[string]structs.CheckServiceNodes)
	snap.ConnectProxy.WatchedServiceChecks = make(map[structs.ServiceID][]structs.CheckType)
	snap.ConnectProxy.PreparedQueryEndpoints = make(map[string]structs.CheckServiceNodes)
	snap.ConnectProxy.UpstreamConfig = make(map[string]*structs.Upstream)
	snap.ConnectProxy.PassthroughUpstreams = make(map[string]ServicePassthroughAddrs)

	// Watch for root changes
	err := s.cache.Notify(ctx, cachetype.ConnectCARootName, &structs.DCSpecificRequest{
		Datacenter:   s.source.Datacenter,
		QueryOptions: structs.QueryOptions{Token: s.token},
		Source:       *s.source,
	}, rootsWatchID, s.ch)
	if err != nil {
		return snap, err
	}

	// Watch the leaf cert
	err = s.cache.Notify(ctx, cachetype.ConnectCALeafName, &cachetype.ConnectCALeafRequest{
		Datacenter:     s.source.Datacenter,
		Token:          s.token,
		Service:        s.proxyCfg.DestinationServiceName,
		EnterpriseMeta: s.proxyID.EnterpriseMeta,
	}, leafWatchID, s.ch)
	if err != nil {
		return snap, err
	}

	// Watch for intention updates
	err = s.cache.Notify(ctx, cachetype.IntentionMatchName, &structs.IntentionQueryRequest{
		Datacenter:   s.source.Datacenter,
		QueryOptions: structs.QueryOptions{Token: s.token},
		Match: &structs.IntentionQueryMatch{
			Type: structs.IntentionMatchDestination,
			Entries: []structs.IntentionMatchEntry{
				{
					Namespace: s.proxyID.NamespaceOrDefault(),
					Name:      s.proxyCfg.DestinationServiceName,
				},
			},
		},
	}, intentionsWatchID, s.ch)
	if err != nil {
		return snap, err
	}

	// Watch for service check updates
	err = s.cache.Notify(ctx, cachetype.ServiceHTTPChecksName, &cachetype.ServiceHTTPChecksRequest{
		ServiceID:      s.proxyCfg.DestinationServiceID,
		EnterpriseMeta: s.proxyID.EnterpriseMeta,
	}, svcChecksWatchIDPrefix+structs.ServiceIDString(s.proxyCfg.DestinationServiceID, &s.proxyID.EnterpriseMeta), s.ch)
	if err != nil {
		return snap, err
	}

	// default the namespace to the namespace of this proxy service
	currentNamespace := s.proxyID.NamespaceOrDefault()

	if s.proxyCfg.Mode == structs.ProxyModeTransparent {
		// When in transparent proxy we will infer upstreams from intentions with this source
		err = s.cache.Notify(ctx, cachetype.IntentionUpstreamsName, &structs.ServiceSpecificRequest{
			Datacenter:     s.source.Datacenter,
			QueryOptions:   structs.QueryOptions{Token: s.token},
			ServiceName:    s.proxyCfg.DestinationServiceName,
			EnterpriseMeta: s.proxyID.EnterpriseMeta,
		}, intentionUpstreamsID, s.ch)
		if err != nil {
			return snap, err
		}

		err = s.cache.Notify(ctx, cachetype.ConfigEntryName, &structs.ConfigEntryQuery{
			Kind:           structs.MeshConfig,
			Name:           structs.MeshConfigMesh,
			Datacenter:     s.source.Datacenter,
			QueryOptions:   structs.QueryOptions{Token: s.token},
			EnterpriseMeta: *s.proxyID.DefaultEnterpriseMetaForPartition(),
		}, meshConfigEntryID, s.ch)
		if err != nil {
			return snap, err
		}
	}

	// Watch for updates to service endpoints for all upstreams
	for i := range s.proxyCfg.Upstreams {
		// u is a per-iteration copy, so the &u stored in UpstreamConfig below
		// does not alias the other upstreams.
		u := s.proxyCfg.Upstreams[i]

		// Store defaults keyed under wildcard so they can be applied to centrally configured upstreams
		if u.DestinationName == structs.WildcardSpecifier {
			snap.ConnectProxy.UpstreamConfig[u.DestinationID().String()] = &u
			continue
		}

		// This can be true if the upstream is a synthetic entry populated from centralized upstream config.
		// Watches should not be created for them.
		if u.CentrallyConfigured {
			continue
		}
		snap.ConnectProxy.UpstreamConfig[u.Identifier()] = &u

		dc := s.source.Datacenter
		if u.Datacenter != "" {
			dc = u.Datacenter
		}
		if s.proxyCfg.Mode == structs.ProxyModeTransparent && (dc == "" || dc == s.source.Datacenter) {
			// In transparent proxy mode, watches for upstreams in the local DC are handled by the IntentionUpstreams watch.
			continue
		}

		ns := currentNamespace
		if u.DestinationNamespace != "" {
			ns = u.DestinationNamespace
		}

		cfg, err := parseReducedUpstreamConfig(u.Config)
		if err != nil {
			// Don't hard fail on a config typo, just warn. We'll fall back on
			// the plain discovery chain if there is an error so it's safe to
			// continue.
			s.logger.Warn("failed to parse upstream config",
				"upstream", u.Identifier(),
				"error", err,
			)
		}

		switch u.DestinationType {
		case structs.UpstreamDestTypePreparedQuery:
			err = s.cache.Notify(ctx, cachetype.PreparedQueryName, &structs.PreparedQueryExecuteRequest{
				Datacenter:    dc,
				QueryOptions:  structs.QueryOptions{Token: s.token, MaxAge: defaultPreparedQueryPollInterval},
				QueryIDOrName: u.DestinationName,
				Connect:       true,
				Source:        *s.source,
			}, "upstream:"+u.Identifier(), s.ch)
			if err != nil {
				return snap, err
			}

		case structs.UpstreamDestTypeService:
			fallthrough

		// TODO (partition): pass Partition to DiscoveryChainRequest?
		case "": // Treat unset as the default Service type
			err = s.cache.Notify(ctx, cachetype.CompiledDiscoveryChainName, &structs.DiscoveryChainRequest{
				Datacenter:             s.source.Datacenter,
				QueryOptions:           structs.QueryOptions{Token: s.token},
				Name:                   u.DestinationName,
				EvaluateInDatacenter:   dc,
				EvaluateInNamespace:    ns,
				OverrideMeshGateway:    s.proxyCfg.MeshGateway.OverlayWith(u.MeshGateway),
				OverrideProtocol:       cfg.Protocol,
				OverrideConnectTimeout: cfg.ConnectTimeout(),
			}, "discovery-chain:"+u.Identifier(), s.ch)
			if err != nil {
				return snap, fmt.Errorf("failed to watch discovery chain for %s: %v", u.Identifier(), err)
			}

		default:
			return snap, fmt.Errorf("unknown upstream type: %q", u.DestinationType)
		}
	}

	return snap, nil
}

// handleUpdate applies a single cache update event to snap, dispatching on the
// event's correlation ID.
//
// It returns an error when the event itself carries an error, when the result
// has an unexpected type for its correlation ID, or when a discovery chain
// watch for an intention-derived upstream cannot be started. Correlation IDs
// not handled here are delegated to handlerUpstreams.handleUpdateUpstreams.
func (s *handlerConnectProxy) handleUpdate(ctx context.Context, u cache.UpdateEvent, snap *ConfigSnapshot) error {
	if u.Err != nil {
		return fmt.Errorf("error filling agent cache: %v", u.Err)
	}

	switch {
	case u.CorrelationID == rootsWatchID:
		roots, ok := u.Result.(*structs.IndexedCARoots)
		if !ok {
			return fmt.Errorf("invalid type for response: %T", u.Result)
		}
		snap.Roots = roots

	case u.CorrelationID == intentionsWatchID:
		resp, ok := u.Result.(*structs.IndexedIntentionMatches)
		if !ok {
			return fmt.Errorf("invalid type for response: %T", u.Result)
		}
		if len(resp.Matches) > 0 {
			// RPC supports matching multiple services at once but we only ever
			// query with the one service we represent currently so just pick
			// the one result set up.
			snap.ConnectProxy.Intentions = resp.Matches[0]
		}
		snap.ConnectProxy.IntentionsSet = true

	case u.CorrelationID == intentionUpstreamsID:
		resp, ok := u.Result.(*structs.IndexedServiceList)
		if !ok {
			return fmt.Errorf("invalid type for response %T", u.Result)
		}

		seenServices := make(map[string]struct{})
		for _, svc := range resp.Services {
			sid := svc.String()
			seenServices[sid] = struct{}{}

			cfgMap := make(map[string]interface{})
			// upstreamCfg stays nil when neither a specific nor a wildcard
			// configuration exists for this service.
			upstreamCfg, ok := snap.ConnectProxy.UpstreamConfig[sid]
			if ok {
				cfgMap = upstreamCfg.Config
			} else {
				// Use the centralized upstream defaults if they exist and there isn't specific configuration for this upstream
				// This is only relevant to upstreams from intentions because for explicit upstreams the defaulting is handled
				// by the ResolveServiceConfig endpoint.
				wildcardSID := structs.NewServiceID(structs.WildcardSpecifier, s.proxyID.WildcardEnterpriseMetaForPartition())
				if defaults, ok := snap.ConnectProxy.UpstreamConfig[wildcardSID.String()]; ok {
					upstreamCfg = defaults
					cfgMap = defaults.Config
					snap.ConnectProxy.UpstreamConfig[sid] = defaults
				}
			}

			cfg, err := parseReducedUpstreamConfig(cfgMap)
			if err != nil {
				// Don't hard fail on a config typo, just warn. We'll fall back on
				// the plain discovery chain if there is an error so it's safe to
				// continue.
				//
				// Log the service from the update rather than the upstream
				// config: upstreamCfg may be nil here, and when it holds the
				// wildcard defaults it does not name the affected service.
				s.logger.Warn("failed to parse upstream config",
					"upstream", sid,
					"error", err,
				)
			}

			meshGateway := s.proxyCfg.MeshGateway
			if upstreamCfg != nil {
				meshGateway = meshGateway.OverlayWith(upstreamCfg.MeshGateway)
			}
			watchOpts := discoveryChainWatchOpts{
				id:          sid,
				name:        svc.Name,
				namespace:   svc.NamespaceOrDefault(),
				datacenter:  s.source.Datacenter,
				cfg:         cfg,
				meshGateway: meshGateway,
			}
			up := &handlerUpstreams{handlerState: s.handlerState}
			err = up.watchDiscoveryChain(ctx, snap, watchOpts)
			if err != nil {
				return fmt.Errorf("failed to watch discovery chain for %s: %v", sid, err)
			}
		}

		// stale reports whether data tracked under id should be dropped: the
		// id was not part of this update and does not belong to an upstream
		// explicitly configured for a different datacenter.
		stale := func(id string) bool {
			if upstream, ok := snap.ConnectProxy.UpstreamConfig[id]; ok && upstream.Datacenter != "" && upstream.Datacenter != s.source.Datacenter {
				return false
			}
			_, seen := seenServices[id]
			return !seen
		}

		// Clean up data from services that were not in the update
		for sn, cancels := range snap.ConnectProxy.WatchedUpstreams {
			if !stale(sn) {
				continue
			}
			// The values are cancel functions; invoke them before dropping
			// the entry, otherwise whatever they guard keeps running with no
			// remaining handle to stop it.
			for _, cancelFn := range cancels {
				cancelFn()
			}
			delete(snap.ConnectProxy.WatchedUpstreams, sn)
		}
		for sn := range snap.ConnectProxy.WatchedUpstreamEndpoints {
			if stale(sn) {
				delete(snap.ConnectProxy.WatchedUpstreamEndpoints, sn)
			}
		}
		for sn, cancels := range snap.ConnectProxy.WatchedGateways {
			if !stale(sn) {
				continue
			}
			// Same as for WatchedUpstreams: cancel before forgetting.
			for _, cancelFn := range cancels {
				cancelFn()
			}
			delete(snap.ConnectProxy.WatchedGateways, sn)
		}
		for sn := range snap.ConnectProxy.WatchedGatewayEndpoints {
			if stale(sn) {
				delete(snap.ConnectProxy.WatchedGatewayEndpoints, sn)
			}
		}
		for sn, cancelFn := range snap.ConnectProxy.WatchedDiscoveryChains {
			if !stale(sn) {
				continue
			}
			cancelFn()
			delete(snap.ConnectProxy.WatchedDiscoveryChains, sn)
			delete(snap.ConnectProxy.DiscoveryChain, sn)
		}

	case strings.HasPrefix(u.CorrelationID, "upstream:"+preparedQueryIDPrefix):
		resp, ok := u.Result.(*structs.PreparedQueryExecuteResponse)
		if !ok {
			return fmt.Errorf("invalid type for response: %T", u.Result)
		}
		// Endpoints are keyed by the correlation ID without its "upstream:" prefix.
		pq := strings.TrimPrefix(u.CorrelationID, "upstream:")
		snap.ConnectProxy.PreparedQueryEndpoints[pq] = resp.Nodes

	case strings.HasPrefix(u.CorrelationID, svcChecksWatchIDPrefix):
		resp, ok := u.Result.([]structs.CheckType)
		if !ok {
			return fmt.Errorf("invalid type for service checks response: %T, want: []structs.CheckType", u.Result)
		}
		svcID := structs.ServiceIDFromString(strings.TrimPrefix(u.CorrelationID, svcChecksWatchIDPrefix))
		snap.ConnectProxy.WatchedServiceChecks[svcID] = resp

	case u.CorrelationID == meshConfigEntryID:
		resp, ok := u.Result.(*structs.ConfigEntryResponse)
		if !ok {
			return fmt.Errorf("invalid type for response: %T", u.Result)
		}

		if resp.Entry != nil {
			meshConf, ok := resp.Entry.(*structs.MeshConfigEntry)
			if !ok {
				return fmt.Errorf("invalid type for config entry: %T", resp.Entry)
			}
			snap.ConnectProxy.MeshConfig = meshConf
		} else {
			snap.ConnectProxy.MeshConfig = nil
		}
		// Record that a response arrived even when no entry exists.
		snap.ConnectProxy.MeshConfigSet = true

	default:
		return (*handlerUpstreams)(s).handleUpdateUpstreams(ctx, u, snap)
	}
	return nil
}