package proxycfg

import (
	"context"
	"fmt"
	"strings"

	"github.com/hashicorp/consul/agent/cache"
	cachetype "github.com/hashicorp/consul/agent/cache-types"
	"github.com/hashicorp/consul/agent/structs"
)
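
// handlerConnectProxy drives proxycfg state for connect-proxy kind services:
// initialize registers the cache watches implied by the proxy registration,
// and handleUpdate folds watch results into the ConfigSnapshot.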
type handlerConnectProxy struct {
	handlerState
}

// initialize sets up the watches needed based on current proxy registration
// state.
func (s *handlerConnectProxy) initialize(ctx context.Context) (ConfigSnapshot, error) {
	snap := newConfigSnapshotFromServiceInstance(s.serviceInstance, s.stateConfig)
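
	// Pre-allocate the snapshot maps that the watches below populate, so
	// update handling can write into them without nil checks.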
	snap.ConnectProxy.DiscoveryChain = make(map[string]*structs.CompiledDiscoveryChain)
	snap.ConnectProxy.WatchedDiscoveryChains = make(map[string]context.CancelFunc)
	snap.ConnectProxy.WatchedUpstreams = make(map[string]map[string]context.CancelFunc)
	snap.ConnectProxy.WatchedUpstreamEndpoints = make(map[string]map[string]structs.CheckServiceNodes)
	snap.ConnectProxy.WatchedGateways = make(map[string]map[string]context.CancelFunc)
	snap.ConnectProxy.WatchedGatewayEndpoints = make(map[string]map[string]structs.CheckServiceNodes)
	snap.ConnectProxy.WatchedServiceChecks = make(map[structs.ServiceID][]structs.CheckType)
	snap.ConnectProxy.PreparedQueryEndpoints = make(map[string]structs.CheckServiceNodes)
	snap.ConnectProxy.UpstreamConfig = make(map[string]*structs.Upstream)
	snap.ConnectProxy.PassthroughUpstreams = make(map[string]ServicePassthroughAddrs)

	// Watch for root changes
	err := s.cache.Notify(ctx, cachetype.ConnectCARootName, &structs.DCSpecificRequest{
		Datacenter:   s.source.Datacenter,
		QueryOptions: structs.QueryOptions{Token: s.token},
		Source:       *s.source,
	}, rootsWatchID, s.ch)
	if err != nil {
		return snap, err
	}

	// Watch the leaf cert
	err = s.cache.Notify(ctx, cachetype.ConnectCALeafName, &cachetype.ConnectCALeafRequest{
		Datacenter:     s.source.Datacenter,
		Token:          s.token,
		Service:        s.proxyCfg.DestinationServiceName,
		EnterpriseMeta: s.proxyID.EnterpriseMeta,
	}, leafWatchID, s.ch)
	if err != nil {
		return snap, err
	}

	// Watch for intention updates
	err = s.cache.Notify(ctx, cachetype.IntentionMatchName, &structs.IntentionQueryRequest{
		Datacenter:   s.source.Datacenter,
		QueryOptions: structs.QueryOptions{Token: s.token},
		Match: &structs.IntentionQueryMatch{
			Type: structs.IntentionMatchDestination,
			Entries: []structs.IntentionMatchEntry{
				{
					Namespace: s.proxyID.NamespaceOrDefault(),
					Name:      s.proxyCfg.DestinationServiceName,
				},
			},
		},
	}, intentionsWatchID, s.ch)
	if err != nil {
		return snap, err
	}

	// Watch for service check updates
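	// The correlation ID embeds the destination service ID so that
	// handleUpdate can route results back to the right service.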
	err = s.cache.Notify(ctx, cachetype.ServiceHTTPChecksName, &cachetype.ServiceHTTPChecksRequest{
		ServiceID:      s.proxyCfg.DestinationServiceID,
		EnterpriseMeta: s.proxyID.EnterpriseMeta,
	}, svcChecksWatchIDPrefix+structs.ServiceIDString(s.proxyCfg.DestinationServiceID, &s.proxyID.EnterpriseMeta), s.ch)
	if err != nil {
		return snap, err
	}

	// Default the namespace to the namespace of this proxy service.
	currentNamespace := s.proxyID.NamespaceOrDefault()

	if s.proxyCfg.Mode == structs.ProxyModeTransparent {
		// In transparent proxy mode we infer upstreams from intentions with this source.
		err := s.cache.Notify(ctx, cachetype.IntentionUpstreamsName, &structs.ServiceSpecificRequest{
			Datacenter:     s.source.Datacenter,
			QueryOptions:   structs.QueryOptions{Token: s.token},
			ServiceName:    s.proxyCfg.DestinationServiceName,
			EnterpriseMeta: structs.NewEnterpriseMeta(s.proxyID.NamespaceOrEmpty()),
		}, intentionUpstreamsID, s.ch)
		if err != nil {
			return snap, err
		}
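
		// Watch the mesh config entry, which is always requested with the
		// default enterprise meta since it is defined mesh-wide.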
		err = s.cache.Notify(ctx, cachetype.ConfigEntryName, &structs.ConfigEntryQuery{
			Kind:           structs.MeshConfig,
			Name:           structs.MeshConfigMesh,
			Datacenter:     s.source.Datacenter,
			QueryOptions:   structs.QueryOptions{Token: s.token},
			EnterpriseMeta: *structs.DefaultEnterpriseMeta(),
		}, meshConfigEntryID, s.ch)
		if err != nil {
			return snap, err
		}
	}

	// Watch for updates to service endpoints for all upstreams
	for i := range s.proxyCfg.Upstreams {
		u := s.proxyCfg.Upstreams[i]
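
		// NOTE: u is re-declared as a fresh copy on each iteration, so taking
		// its address below is safe and does not alias across iterations.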

		// Store defaults keyed under wildcard so they can be applied to centrally configured upstreams.
		if u.DestinationName == structs.WildcardSpecifier {
			snap.ConnectProxy.UpstreamConfig[u.DestinationID().String()] = &u
			continue
		}

		// This can be true if the upstream is a synthetic entry populated from
		// centralized upstream config. Watches should not be created for such entries.
		if u.CentrallyConfigured {
			continue
		}
		snap.ConnectProxy.UpstreamConfig[u.Identifier()] = &u

		dc := s.source.Datacenter
		if u.Datacenter != "" {
			dc = u.Datacenter
		}
		if s.proxyCfg.Mode == structs.ProxyModeTransparent && (dc == "" || dc == s.source.Datacenter) {
			// In transparent proxy mode, watches for upstreams in the local DC
			// are handled by the IntentionUpstreams watch.
			continue
		}

		ns := currentNamespace
		if u.DestinationNamespace != "" {
			ns = u.DestinationNamespace
		}

		cfg, err := parseReducedUpstreamConfig(u.Config)
		if err != nil {
			// Don't hard fail on a config typo, just warn. We'll fall back on
			// the plain discovery chain if there is an error so it's safe to
			// continue.
			s.logger.Warn("failed to parse upstream config",
				"upstream", u.Identifier(),
				"error", err,
			)
		}

		switch u.DestinationType {
		case structs.UpstreamDestTypePreparedQuery:
			err = s.cache.Notify(ctx, cachetype.PreparedQueryName, &structs.PreparedQueryExecuteRequest{
				Datacenter:    dc,
				QueryOptions:  structs.QueryOptions{Token: s.token, MaxAge: defaultPreparedQueryPollInterval},
				QueryIDOrName: u.DestinationName,
				Connect:       true,
				Source:        *s.source,
			}, "upstream:"+u.Identifier(), s.ch)
			if err != nil {
				return snap, err
			}

		case structs.UpstreamDestTypeService:
			fallthrough

		case "": // Treat unset as the default Service type
			err = s.cache.Notify(ctx, cachetype.CompiledDiscoveryChainName, &structs.DiscoveryChainRequest{
				Datacenter:             s.source.Datacenter,
				QueryOptions:           structs.QueryOptions{Token: s.token},
				Name:                   u.DestinationName,
				EvaluateInDatacenter:   dc,
				EvaluateInNamespace:    ns,
				OverrideMeshGateway:    s.proxyCfg.MeshGateway.OverlayWith(u.MeshGateway),
				OverrideProtocol:       cfg.Protocol,
				OverrideConnectTimeout: cfg.ConnectTimeout(),
			}, "discovery-chain:"+u.Identifier(), s.ch)
			if err != nil {
				return snap, fmt.Errorf("failed to watch discovery chain for %s: %v", u.Identifier(), err)
			}

		default:
			return snap, fmt.Errorf("unknown upstream type: %q", u.DestinationType)
		}
	}

	return snap, nil
}
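
// handleUpdate incorporates a single watch event into the config snapshot,
// dispatching on the event's correlation ID. Correlation IDs not handled
// here are delegated to the shared upstreams handler.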
func (s *handlerConnectProxy) handleUpdate(ctx context.Context, u cache.UpdateEvent, snap *ConfigSnapshot) error {
	if u.Err != nil {
		return fmt.Errorf("error filling agent cache: %v", u.Err)
	}

	switch {
	case u.CorrelationID == rootsWatchID:
		roots, ok := u.Result.(*structs.IndexedCARoots)
		if !ok {
			return fmt.Errorf("invalid type for response: %T", u.Result)
		}
		snap.Roots = roots

	case u.CorrelationID == intentionsWatchID:
		resp, ok := u.Result.(*structs.IndexedIntentionMatches)
		if !ok {
			return fmt.Errorf("invalid type for response: %T", u.Result)
		}
		if len(resp.Matches) > 0 {
			// The RPC supports matching multiple services at once, but we only
			// ever query for the single service this proxy represents, so take
			// the first (and only) result set.
			snap.ConnectProxy.Intentions = resp.Matches[0]
		}
		snap.ConnectProxy.IntentionsSet = true

	case u.CorrelationID == intentionUpstreamsID:
		resp, ok := u.Result.(*structs.IndexedServiceList)
		if !ok {
			return fmt.Errorf("invalid type for response: %T", u.Result)
		}

		seenServices := make(map[string]struct{})
		for _, svc := range resp.Services {
			seenServices[svc.String()] = struct{}{}

			cfgMap := make(map[string]interface{})
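			// NOTE: this u shadows the cache.UpdateEvent parameter for the rest
			// of the loop body; it refers to this service's upstream config
			// entry and may stay nil if neither explicit nor wildcard config
			// exists.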
			u, ok := snap.ConnectProxy.UpstreamConfig[svc.String()]
			if ok {
				cfgMap = u.Config
			} else {
				// Use the centralized upstream defaults if they exist and there
				// isn't specific configuration for this upstream. This is only
				// relevant to upstreams inferred from intentions, because for
				// explicit upstreams the defaulting is handled by the
				// ResolveServiceConfig endpoint.
				wildcardSID := structs.NewServiceID(structs.WildcardSpecifier, structs.WildcardEnterpriseMeta())
				defaults, ok := snap.ConnectProxy.UpstreamConfig[wildcardSID.String()]
				if ok {
					u = defaults
					cfgMap = defaults.Config
					snap.ConnectProxy.UpstreamConfig[svc.String()] = defaults
				}
			}

			cfg, err := parseReducedUpstreamConfig(cfgMap)
			if err != nil {
				// Don't hard fail on a config typo, just warn. We'll fall back on
				// the plain discovery chain if there is an error so it's safe to
				// continue. Log svc rather than u.Identifier() since u may be nil
				// here; the two are equivalent as upstream identifiers.
				s.logger.Warn("failed to parse upstream config",
					"upstream", svc.String(),
					"error", err,
				)
			}

			meshGateway := s.proxyCfg.MeshGateway
			if u != nil {
				meshGateway = meshGateway.OverlayWith(u.MeshGateway)
			}
			watchOpts := discoveryChainWatchOpts{
				id:          svc.String(),
				name:        svc.Name,
				namespace:   svc.NamespaceOrDefault(),
				datacenter:  s.source.Datacenter,
				cfg:         cfg,
				meshGateway: meshGateway,
			}
			up := &handlerUpstreams{handlerState: s.handlerState}
			err = up.watchDiscoveryChain(ctx, snap, watchOpts)
			if err != nil {
				return fmt.Errorf("failed to watch discovery chain for %s: %v", svc.String(), err)
			}
		}

		// Clean up data from services that were not in the update.
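		// Upstreams that target another datacenter have explicit watches set up
		// in initialize and are not returned by the local intention-upstreams
		// list, so they are exempt from this cleanup.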
		for sn := range snap.ConnectProxy.WatchedUpstreams {
			if upstream, ok := snap.ConnectProxy.UpstreamConfig[sn]; ok && upstream.Datacenter != "" && upstream.Datacenter != s.source.Datacenter {
				continue
			}
			if _, ok := seenServices[sn]; !ok {
				delete(snap.ConnectProxy.WatchedUpstreams, sn)
			}
		}
		for sn := range snap.ConnectProxy.WatchedUpstreamEndpoints {
			if upstream, ok := snap.ConnectProxy.UpstreamConfig[sn]; ok && upstream.Datacenter != "" && upstream.Datacenter != s.source.Datacenter {
				continue
			}
			if _, ok := seenServices[sn]; !ok {
				delete(snap.ConnectProxy.WatchedUpstreamEndpoints, sn)
			}
		}
		for sn := range snap.ConnectProxy.WatchedGateways {
			if upstream, ok := snap.ConnectProxy.UpstreamConfig[sn]; ok && upstream.Datacenter != "" && upstream.Datacenter != s.source.Datacenter {
				continue
			}
			if _, ok := seenServices[sn]; !ok {
				delete(snap.ConnectProxy.WatchedGateways, sn)
			}
		}
		for sn := range snap.ConnectProxy.WatchedGatewayEndpoints {
			if upstream, ok := snap.ConnectProxy.UpstreamConfig[sn]; ok && upstream.Datacenter != "" && upstream.Datacenter != s.source.Datacenter {
				continue
			}
			if _, ok := seenServices[sn]; !ok {
				delete(snap.ConnectProxy.WatchedGatewayEndpoints, sn)
			}
		}
		for sn, cancelFn := range snap.ConnectProxy.WatchedDiscoveryChains {
			if upstream, ok := snap.ConnectProxy.UpstreamConfig[sn]; ok && upstream.Datacenter != "" && upstream.Datacenter != s.source.Datacenter {
				continue
			}
			if _, ok := seenServices[sn]; !ok {
				cancelFn()
				delete(snap.ConnectProxy.WatchedDiscoveryChains, sn)
				delete(snap.ConnectProxy.DiscoveryChain, sn)
			}
		}
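
	// Store prepared query results keyed by upstream identifier, i.e. the
	// correlation ID with the "upstream:" prefix trimmed.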
	case strings.HasPrefix(u.CorrelationID, "upstream:"+preparedQueryIDPrefix):
		resp, ok := u.Result.(*structs.PreparedQueryExecuteResponse)
		if !ok {
			return fmt.Errorf("invalid type for response: %T", u.Result)
		}
		pq := strings.TrimPrefix(u.CorrelationID, "upstream:")
		snap.ConnectProxy.PreparedQueryEndpoints[pq] = resp.Nodes

	case strings.HasPrefix(u.CorrelationID, svcChecksWatchIDPrefix):
		resp, ok := u.Result.([]structs.CheckType)
		if !ok {
			return fmt.Errorf("invalid type for service checks response: %T, want: []structs.CheckType", u.Result)
		}
		svcID := structs.ServiceIDFromString(strings.TrimPrefix(u.CorrelationID, svcChecksWatchIDPrefix))
		snap.ConnectProxy.WatchedServiceChecks[svcID] = resp

	case u.CorrelationID == meshConfigEntryID:
		resp, ok := u.Result.(*structs.ConfigEntryResponse)
		if !ok {
			return fmt.Errorf("invalid type for response: %T", u.Result)
		}

		if resp.Entry != nil {
			meshConf, ok := resp.Entry.(*structs.MeshConfigEntry)
			if !ok {
				return fmt.Errorf("invalid type for config entry: %T", resp.Entry)
			}
			snap.ConnectProxy.MeshConfig = meshConf
		} else {
			snap.ConnectProxy.MeshConfig = nil
		}
		snap.ConnectProxy.MeshConfigSet = true
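
	// Everything else (e.g. discovery chain and upstream endpoint updates for
	// the watches registered above) is delegated to the shared upstreams
	// handler.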
	default:
		return (*handlerUpstreams)(s).handleUpdateUpstreams(ctx, u, snap)
	}
	return nil
}