proxycfg-glue: server-local implementation of ResolvedServiceConfig

This is the OSS portion of enterprise PR 2460.

Introduces a server-local implementation of the proxycfg.ResolvedServiceConfig
interface that sources data from a blocking query against the server's state
store.

It moves the service config resolution logic into the agent/configentry package
so that it can be used in both the RPC handler and data source.

I've also done a little re-arranging and adding comments to call out data
sources for which there is to be no server-local equivalent.
This commit is contained in:
Daniel Upton 2022-09-01 15:45:07 +01:00 committed by Dan Upton
parent 3664ac54a5
commit 8cd6c9f95e
11 changed files with 532 additions and 310 deletions

View file

@ -4269,6 +4269,7 @@ func (a *Agent) proxyDataSources() proxycfg.DataSources {
sources.Intentions = proxycfgglue.ServerIntentions(deps)
sources.IntentionUpstreams = proxycfgglue.ServerIntentionUpstreams(deps)
sources.PeeredUpstreams = proxycfgglue.ServerPeeredUpstreams(deps)
sources.ResolvedServiceConfig = proxycfgglue.ServerResolvedServiceConfig(deps, proxycfgglue.CacheResolvedServiceConfig(a.cache))
sources.ServiceList = proxycfgglue.ServerServiceList(deps, proxycfgglue.CacheServiceList(a.cache))
sources.TrustBundle = proxycfgglue.ServerTrustBundle(deps)
sources.TrustBundleList = proxycfgglue.ServerTrustBundleList(deps)

View file

@ -0,0 +1,229 @@
package configentry
import (
"fmt"
"github.com/hashicorp/go-hclog"
"github.com/mitchellh/copystructure"
"github.com/hashicorp/consul/agent/structs"
)
func ComputeResolvedServiceConfig(
args *structs.ServiceConfigRequest,
upstreamIDs []structs.ServiceID,
legacyUpstreams bool,
entries *ResolvedServiceConfigSet,
logger hclog.Logger,
) (*structs.ServiceConfigResponse, error) {
var thisReply structs.ServiceConfigResponse
thisReply.MeshGateway.Mode = structs.MeshGatewayModeDefault
// TODO(freddy) Refactor this into smaller set of state store functions
// Pass the WatchSet to both the service and proxy config lookups. If either is updated during the
// blocking query, this function will be rerun and these state store lookups will both be current.
// We use the default enterprise meta to look up the global proxy defaults because they are not namespaced.
var proxyConfGlobalProtocol string
proxyConf := entries.GetProxyDefaults(args.PartitionOrDefault())
if proxyConf != nil {
// Apply the proxy defaults to the sidecar's proxy config
mapCopy, err := copystructure.Copy(proxyConf.Config)
if err != nil {
return nil, fmt.Errorf("failed to copy global proxy-defaults: %v", err)
}
thisReply.ProxyConfig = mapCopy.(map[string]interface{})
thisReply.Mode = proxyConf.Mode
thisReply.TransparentProxy = proxyConf.TransparentProxy
thisReply.MeshGateway = proxyConf.MeshGateway
thisReply.Expose = proxyConf.Expose
// Extract the global protocol from proxyConf for upstream configs.
rawProtocol := proxyConf.Config["protocol"]
if rawProtocol != nil {
var ok bool
proxyConfGlobalProtocol, ok = rawProtocol.(string)
if !ok {
return nil, fmt.Errorf("invalid protocol type %T", rawProtocol)
}
}
}
serviceConf := entries.GetServiceDefaults(
structs.NewServiceID(args.Name, &args.EnterpriseMeta),
)
if serviceConf != nil {
if serviceConf.Expose.Checks {
thisReply.Expose.Checks = true
}
if len(serviceConf.Expose.Paths) >= 1 {
thisReply.Expose.Paths = serviceConf.Expose.Paths
}
if serviceConf.MeshGateway.Mode != structs.MeshGatewayModeDefault {
thisReply.MeshGateway.Mode = serviceConf.MeshGateway.Mode
}
if serviceConf.Protocol != "" {
if thisReply.ProxyConfig == nil {
thisReply.ProxyConfig = make(map[string]interface{})
}
thisReply.ProxyConfig["protocol"] = serviceConf.Protocol
}
if serviceConf.TransparentProxy.OutboundListenerPort != 0 {
thisReply.TransparentProxy.OutboundListenerPort = serviceConf.TransparentProxy.OutboundListenerPort
}
if serviceConf.TransparentProxy.DialedDirectly {
thisReply.TransparentProxy.DialedDirectly = serviceConf.TransparentProxy.DialedDirectly
}
if serviceConf.Mode != structs.ProxyModeDefault {
thisReply.Mode = serviceConf.Mode
}
if serviceConf.Destination != nil {
thisReply.Destination = *serviceConf.Destination
}
if serviceConf.MaxInboundConnections > 0 {
if thisReply.ProxyConfig == nil {
thisReply.ProxyConfig = map[string]interface{}{}
}
thisReply.ProxyConfig["max_inbound_connections"] = serviceConf.MaxInboundConnections
}
thisReply.Meta = serviceConf.Meta
}
// First collect all upstreams into a set of seen upstreams.
// Upstreams can come from:
// - Explicitly from proxy registrations, and therefore as an argument to this RPC endpoint
// - Implicitly from centralized upstream config in service-defaults
seenUpstreams := map[structs.ServiceID]struct{}{}
var (
noUpstreamArgs = len(upstreamIDs) == 0 && len(args.Upstreams) == 0
// Check the args and the resolved value. If it was exclusively set via a config entry, then args.Mode
// will never be transparent because the service config request does not use the resolved value.
tproxy = args.Mode == structs.ProxyModeTransparent || thisReply.Mode == structs.ProxyModeTransparent
)
// The upstreams passed as arguments to this endpoint are the upstreams explicitly defined in a proxy registration.
// If no upstreams were passed, then we should only return the resolved config if the proxy is in transparent mode.
// Otherwise we would return a resolved upstream config to a proxy with no configured upstreams.
if noUpstreamArgs && !tproxy {
return &thisReply, nil
}
// First store all upstreams that were provided in the request
for _, sid := range upstreamIDs {
if _, ok := seenUpstreams[sid]; !ok {
seenUpstreams[sid] = struct{}{}
}
}
// Then store upstreams inferred from service-defaults and mapify the overrides.
var (
upstreamConfigs = make(map[structs.ServiceID]*structs.UpstreamConfig)
upstreamDefaults *structs.UpstreamConfig
// usConfigs stores the opaque config map for each upstream and is keyed on the upstream's ID.
usConfigs = make(map[structs.ServiceID]map[string]interface{})
)
if serviceConf != nil && serviceConf.UpstreamConfig != nil {
for i, override := range serviceConf.UpstreamConfig.Overrides {
if override.Name == "" {
logger.Warn(
"Skipping UpstreamConfig.Overrides entry without a required name field",
"entryIndex", i,
"kind", serviceConf.GetKind(),
"name", serviceConf.GetName(),
"namespace", serviceConf.GetEnterpriseMeta().NamespaceOrEmpty(),
)
continue // skip this impossible condition
}
seenUpstreams[override.ServiceID()] = struct{}{}
upstreamConfigs[override.ServiceID()] = override
}
if serviceConf.UpstreamConfig.Defaults != nil {
upstreamDefaults = serviceConf.UpstreamConfig.Defaults
// Store the upstream defaults under a wildcard key so that they can be applied to
// upstreams that are inferred from intentions and do not have explicit upstream configuration.
cfgMap := make(map[string]interface{})
upstreamDefaults.MergeInto(cfgMap)
wildcard := structs.NewServiceID(structs.WildcardSpecifier, args.WithWildcardNamespace())
usConfigs[wildcard] = cfgMap
}
}
for upstream := range seenUpstreams {
resolvedCfg := make(map[string]interface{})
// The protocol of an upstream is resolved in this order:
// 1. Default protocol from proxy-defaults (how all services should be addressed)
// 2. Protocol for upstream service defined in its service-defaults (how the upstream wants to be addressed)
// 3. Protocol defined for the upstream in the service-defaults.(upstream_config.defaults|upstream_config.overrides) of the downstream
// (how the downstream wants to address it)
protocol := proxyConfGlobalProtocol
upstreamSvcDefaults := entries.GetServiceDefaults(
structs.NewServiceID(upstream.ID, &upstream.EnterpriseMeta),
)
if upstreamSvcDefaults != nil {
if upstreamSvcDefaults.Protocol != "" {
protocol = upstreamSvcDefaults.Protocol
}
}
if protocol != "" {
resolvedCfg["protocol"] = protocol
}
// Merge centralized defaults for all upstreams before configuration for specific upstreams
if upstreamDefaults != nil {
upstreamDefaults.MergeInto(resolvedCfg)
}
// The MeshGateway value from the proxy registration overrides the one from upstream_defaults
// because it is specific to the proxy instance.
//
// The goal is to flatten the mesh gateway mode in this order:
// 0. Value from centralized upstream_defaults
// 1. Value from local proxy registration
// 2. Value from centralized upstream_config
// 3. Value from local upstream definition. This last step is done in the client's service manager.
if !args.MeshGateway.IsZero() {
resolvedCfg["mesh_gateway"] = args.MeshGateway
}
if upstreamConfigs[upstream] != nil {
upstreamConfigs[upstream].MergeInto(resolvedCfg)
}
if len(resolvedCfg) > 0 {
usConfigs[upstream] = resolvedCfg
}
}
// don't allocate the slices just to not fill them
if len(usConfigs) == 0 {
return &thisReply, nil
}
if legacyUpstreams {
// For legacy upstreams we return a map that is only keyed on the string ID, since they precede namespaces
thisReply.UpstreamConfigs = make(map[string]map[string]interface{})
for us, conf := range usConfigs {
thisReply.UpstreamConfigs[us.ID] = conf
}
} else {
thisReply.UpstreamIDConfigs = make(structs.OpaqueUpstreamConfigs, 0, len(usConfigs))
for us, conf := range usConfigs {
thisReply.UpstreamIDConfigs = append(thisReply.UpstreamIDConfigs,
structs.OpaqueUpstreamConfig{Upstream: us, Config: conf})
}
}
return &thisReply, nil
}

View file

@ -0,0 +1,56 @@
package configentry
import (
"testing"
"github.com/stretchr/testify/require"
"github.com/hashicorp/consul/agent/structs"
)
func Test_ComputeResolvedServiceConfig(t *testing.T) {
type args struct {
scReq *structs.ServiceConfigRequest
upstreamIDs []structs.ServiceID
entries *ResolvedServiceConfigSet
}
sid := structs.ServiceID{
ID: "sid",
EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition(),
}
tests := []struct {
name string
args args
want *structs.ServiceConfigResponse
}{
{
name: "proxy with maxinboundsconnections",
args: args{
scReq: &structs.ServiceConfigRequest{
Name: "sid",
},
entries: &ResolvedServiceConfigSet{
ServiceDefaults: map[structs.ServiceID]*structs.ServiceConfigEntry{
sid: {
MaxInboundConnections: 20,
},
},
},
},
want: &structs.ServiceConfigResponse{
ProxyConfig: map[string]interface{}{
"max_inbound_connections": 20,
},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := ComputeResolvedServiceConfig(tt.args.scReq, tt.args.upstreamIDs,
false, tt.args.entries, nil)
require.NoError(t, err)
require.Equal(t, tt.want, got)
})
}
}

View file

@ -12,6 +12,7 @@ import (
hashstructure_v2 "github.com/mitchellh/hashstructure/v2"
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/configentry"
"github.com/hashicorp/consul/agent/consul/state"
"github.com/hashicorp/consul/agent/structs"
)
@ -510,7 +511,7 @@ func (c *ConfigEntry) ResolveServiceConfig(args *structs.ServiceConfigRequest, r
ranOnce = true
}
thisReply, err := computeResolvedServiceConfig(
thisReply, err := configentry.ComputeResolvedServiceConfig(
args,
upstreamIDs,
legacyUpstreams,

View file

@ -3,13 +3,14 @@ package consul
import (
"fmt"
"github.com/hashicorp/consul/agent/configentry"
"github.com/hashicorp/consul/agent/consul/state"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/go-hclog"
memdb "github.com/hashicorp/go-memdb"
"github.com/imdario/mergo"
"github.com/mitchellh/copystructure"
"github.com/hashicorp/consul/agent/configentry"
"github.com/hashicorp/consul/agent/consul/state"
"github.com/hashicorp/consul/agent/structs"
)
// mergeNodeServiceWithCentralConfig merges a service instance (NodeService) with the
@ -66,7 +67,7 @@ func mergeNodeServiceWithCentralConfig(
ns.ID, err)
}
defaults, err := computeResolvedServiceConfig(
defaults, err := configentry.ComputeResolvedServiceConfig(
configReq,
upstreams,
false,
@ -87,225 +88,6 @@ func mergeNodeServiceWithCentralConfig(
return cfgIndex, mergedns, nil
}
func computeResolvedServiceConfig(
args *structs.ServiceConfigRequest,
upstreamIDs []structs.ServiceID,
legacyUpstreams bool,
entries *configentry.ResolvedServiceConfigSet,
logger hclog.Logger,
) (*structs.ServiceConfigResponse, error) {
var thisReply structs.ServiceConfigResponse
thisReply.MeshGateway.Mode = structs.MeshGatewayModeDefault
// TODO(freddy) Refactor this into smaller set of state store functions
// Pass the WatchSet to both the service and proxy config lookups. If either is updated during the
// blocking query, this function will be rerun and these state store lookups will both be current.
// We use the default enterprise meta to look up the global proxy defaults because they are not namespaced.
var proxyConfGlobalProtocol string
proxyConf := entries.GetProxyDefaults(args.PartitionOrDefault())
if proxyConf != nil {
// Apply the proxy defaults to the sidecar's proxy config
mapCopy, err := copystructure.Copy(proxyConf.Config)
if err != nil {
return nil, fmt.Errorf("failed to copy global proxy-defaults: %v", err)
}
thisReply.ProxyConfig = mapCopy.(map[string]interface{})
thisReply.Mode = proxyConf.Mode
thisReply.TransparentProxy = proxyConf.TransparentProxy
thisReply.MeshGateway = proxyConf.MeshGateway
thisReply.Expose = proxyConf.Expose
// Extract the global protocol from proxyConf for upstream configs.
rawProtocol := proxyConf.Config["protocol"]
if rawProtocol != nil {
var ok bool
proxyConfGlobalProtocol, ok = rawProtocol.(string)
if !ok {
return nil, fmt.Errorf("invalid protocol type %T", rawProtocol)
}
}
}
serviceConf := entries.GetServiceDefaults(
structs.NewServiceID(args.Name, &args.EnterpriseMeta),
)
if serviceConf != nil {
if serviceConf.Expose.Checks {
thisReply.Expose.Checks = true
}
if len(serviceConf.Expose.Paths) >= 1 {
thisReply.Expose.Paths = serviceConf.Expose.Paths
}
if serviceConf.MeshGateway.Mode != structs.MeshGatewayModeDefault {
thisReply.MeshGateway.Mode = serviceConf.MeshGateway.Mode
}
if serviceConf.Protocol != "" {
if thisReply.ProxyConfig == nil {
thisReply.ProxyConfig = make(map[string]interface{})
}
thisReply.ProxyConfig["protocol"] = serviceConf.Protocol
}
if serviceConf.TransparentProxy.OutboundListenerPort != 0 {
thisReply.TransparentProxy.OutboundListenerPort = serviceConf.TransparentProxy.OutboundListenerPort
}
if serviceConf.TransparentProxy.DialedDirectly {
thisReply.TransparentProxy.DialedDirectly = serviceConf.TransparentProxy.DialedDirectly
}
if serviceConf.Mode != structs.ProxyModeDefault {
thisReply.Mode = serviceConf.Mode
}
if serviceConf.Destination != nil {
thisReply.Destination = *serviceConf.Destination
}
if serviceConf.MaxInboundConnections > 0 {
if thisReply.ProxyConfig == nil {
thisReply.ProxyConfig = map[string]interface{}{}
}
thisReply.ProxyConfig["max_inbound_connections"] = serviceConf.MaxInboundConnections
}
thisReply.Meta = serviceConf.Meta
}
// First collect all upstreams into a set of seen upstreams.
// Upstreams can come from:
// - Explicitly from proxy registrations, and therefore as an argument to this RPC endpoint
// - Implicitly from centralized upstream config in service-defaults
seenUpstreams := map[structs.ServiceID]struct{}{}
var (
noUpstreamArgs = len(upstreamIDs) == 0 && len(args.Upstreams) == 0
// Check the args and the resolved value. If it was exclusively set via a config entry, then args.Mode
// will never be transparent because the service config request does not use the resolved value.
tproxy = args.Mode == structs.ProxyModeTransparent || thisReply.Mode == structs.ProxyModeTransparent
)
// The upstreams passed as arguments to this endpoint are the upstreams explicitly defined in a proxy registration.
// If no upstreams were passed, then we should only return the resolved config if the proxy is in transparent mode.
// Otherwise we would return a resolved upstream config to a proxy with no configured upstreams.
if noUpstreamArgs && !tproxy {
return &thisReply, nil
}
// First store all upstreams that were provided in the request
for _, sid := range upstreamIDs {
if _, ok := seenUpstreams[sid]; !ok {
seenUpstreams[sid] = struct{}{}
}
}
// Then store upstreams inferred from service-defaults and mapify the overrides.
var (
upstreamConfigs = make(map[structs.ServiceID]*structs.UpstreamConfig)
upstreamDefaults *structs.UpstreamConfig
// usConfigs stores the opaque config map for each upstream and is keyed on the upstream's ID.
usConfigs = make(map[structs.ServiceID]map[string]interface{})
)
if serviceConf != nil && serviceConf.UpstreamConfig != nil {
for i, override := range serviceConf.UpstreamConfig.Overrides {
if override.Name == "" {
logger.Warn(
"Skipping UpstreamConfig.Overrides entry without a required name field",
"entryIndex", i,
"kind", serviceConf.GetKind(),
"name", serviceConf.GetName(),
"namespace", serviceConf.GetEnterpriseMeta().NamespaceOrEmpty(),
)
continue // skip this impossible condition
}
seenUpstreams[override.ServiceID()] = struct{}{}
upstreamConfigs[override.ServiceID()] = override
}
if serviceConf.UpstreamConfig.Defaults != nil {
upstreamDefaults = serviceConf.UpstreamConfig.Defaults
// Store the upstream defaults under a wildcard key so that they can be applied to
// upstreams that are inferred from intentions and do not have explicit upstream configuration.
cfgMap := make(map[string]interface{})
upstreamDefaults.MergeInto(cfgMap)
wildcard := structs.NewServiceID(structs.WildcardSpecifier, args.WithWildcardNamespace())
usConfigs[wildcard] = cfgMap
}
}
for upstream := range seenUpstreams {
resolvedCfg := make(map[string]interface{})
// The protocol of an upstream is resolved in this order:
// 1. Default protocol from proxy-defaults (how all services should be addressed)
// 2. Protocol for upstream service defined in its service-defaults (how the upstream wants to be addressed)
// 3. Protocol defined for the upstream in the service-defaults.(upstream_config.defaults|upstream_config.overrides) of the downstream
// (how the downstream wants to address it)
protocol := proxyConfGlobalProtocol
upstreamSvcDefaults := entries.GetServiceDefaults(
structs.NewServiceID(upstream.ID, &upstream.EnterpriseMeta),
)
if upstreamSvcDefaults != nil {
if upstreamSvcDefaults.Protocol != "" {
protocol = upstreamSvcDefaults.Protocol
}
}
if protocol != "" {
resolvedCfg["protocol"] = protocol
}
// Merge centralized defaults for all upstreams before configuration for specific upstreams
if upstreamDefaults != nil {
upstreamDefaults.MergeInto(resolvedCfg)
}
// The MeshGateway value from the proxy registration overrides the one from upstream_defaults
// because it is specific to the proxy instance.
//
// The goal is to flatten the mesh gateway mode in this order:
// 0. Value from centralized upstream_defaults
// 1. Value from local proxy registration
// 2. Value from centralized upstream_config
// 3. Value from local upstream definition. This last step is done in the client's service manager.
if !args.MeshGateway.IsZero() {
resolvedCfg["mesh_gateway"] = args.MeshGateway
}
if upstreamConfigs[upstream] != nil {
upstreamConfigs[upstream].MergeInto(resolvedCfg)
}
if len(resolvedCfg) > 0 {
usConfigs[upstream] = resolvedCfg
}
}
// don't allocate the slices just to not fill them
if len(usConfigs) == 0 {
return &thisReply, nil
}
if legacyUpstreams {
// For legacy upstreams we return a map that is only keyed on the string ID, since they precede namespaces
thisReply.UpstreamConfigs = make(map[string]map[string]interface{})
for us, conf := range usConfigs {
thisReply.UpstreamConfigs[us.ID] = conf
}
} else {
thisReply.UpstreamIDConfigs = make(structs.OpaqueUpstreamConfigs, 0, len(usConfigs))
for us, conf := range usConfigs {
thisReply.UpstreamIDConfigs = append(thisReply.UpstreamIDConfigs,
structs.OpaqueUpstreamConfig{Upstream: us, Config: conf})
}
}
return &thisReply, nil
}
// MergeServiceConfig merges the service into defaults to produce the final effective
// config for the specified service.
func MergeServiceConfig(defaults *structs.ServiceConfigResponse, service *structs.NodeService) (*structs.NodeService, error) {

View file

@ -3,60 +3,13 @@ package consul
import (
"testing"
"github.com/hashicorp/consul/agent/configentry"
"github.com/hashicorp/consul/agent/structs"
"github.com/mitchellh/copystructure"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/hashicorp/consul/agent/structs"
)
func Test_ComputeResolvedServiceConfig(t *testing.T) {
type args struct {
scReq *structs.ServiceConfigRequest
upstreamIDs []structs.ServiceID
entries *configentry.ResolvedServiceConfigSet
}
sid := structs.ServiceID{
ID: "sid",
EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition(),
}
tests := []struct {
name string
args args
want *structs.ServiceConfigResponse
}{
{
name: "proxy with maxinboundsconnections",
args: args{
scReq: &structs.ServiceConfigRequest{
Name: "sid",
},
entries: &configentry.ResolvedServiceConfigSet{
ServiceDefaults: map[structs.ServiceID]*structs.ServiceConfigEntry{
sid: {
MaxInboundConnections: 20,
},
},
},
},
want: &structs.ServiceConfigResponse{
ProxyConfig: map[string]interface{}{
"max_inbound_connections": 20,
},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := computeResolvedServiceConfig(tt.args.scReq, tt.args.upstreamIDs,
false, tt.args.entries, nil)
require.NoError(t, err)
assert.Equal(t, tt.want, got)
})
}
}
func Test_MergeServiceConfig_TransparentProxy(t *testing.T) {
type args struct {
defaults *structs.ServiceConfigResponse

View file

@ -4,11 +4,9 @@ import (
"context"
"fmt"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/agent/consul/stream"
cachetype "github.com/hashicorp/consul/agent/cache-types"
"github.com/hashicorp/consul/agent/proxycfg"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/agent/submatview"
@ -17,15 +15,16 @@ import (
"github.com/hashicorp/consul/proto/pbsubscribe"
)
// ServerDataSourceDeps contains the dependencies needed for sourcing data from
// server-local sources (e.g. materialized views).
type ServerDataSourceDeps struct {
Datacenter string
ViewStore *submatview.Store
EventPublisher *stream.EventPublisher
Logger hclog.Logger
ACLResolver submatview.ACLResolver
GetStore func() Store
// CacheConfigEntry satisfies the proxycfg.ConfigEntry interface by sourcing
// data from the agent cache.
func CacheConfigEntry(c *cache.Cache) proxycfg.ConfigEntry {
return &cacheProxyDataSource[*structs.ConfigEntryQuery]{c, cachetype.ConfigEntryName}
}
// CacheConfigEntryList satisfies the proxycfg.ConfigEntryList interface by
// sourcing data from the agent cache.
func CacheConfigEntryList(c *cache.Cache) proxycfg.ConfigEntryList {
return &cacheProxyDataSource[*structs.ConfigEntryQuery]{c, cachetype.ConfigEntryListName}
}
// ServerConfigEntry satisfies the proxycfg.ConfigEntry interface by sourcing

View file

@ -3,20 +3,35 @@ package proxycfgglue
import (
"context"
"github.com/hashicorp/consul/proto/pbpeering"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-memdb"
"github.com/hashicorp/consul/proto/pbpeering"
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/cache"
cachetype "github.com/hashicorp/consul/agent/cache-types"
"github.com/hashicorp/consul/agent/configentry"
"github.com/hashicorp/consul/agent/consul/discoverychain"
"github.com/hashicorp/consul/agent/consul/state"
"github.com/hashicorp/consul/agent/consul/stream"
"github.com/hashicorp/consul/agent/consul/watch"
"github.com/hashicorp/consul/agent/proxycfg"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/agent/submatview"
)
// ServerDataSourceDeps contains the dependencies needed for sourcing data from
// server-local sources (e.g. materialized views).
type ServerDataSourceDeps struct {
Datacenter string
ViewStore *submatview.Store
EventPublisher *stream.EventPublisher
Logger hclog.Logger
ACLResolver submatview.ACLResolver
GetStore func() Store
}
// Store is the state store interface required for server-local data sources.
type Store interface {
watch.StateStore
@ -25,6 +40,7 @@ type Store interface {
FederationStateList(ws memdb.WatchSet) (uint64, []*structs.FederationState, error)
GatewayServices(ws memdb.WatchSet, gateway string, entMeta *acl.EnterpriseMeta) (uint64, structs.GatewayServices, error)
IntentionTopology(ws memdb.WatchSet, target structs.ServiceName, downstreams bool, defaultDecision acl.EnforcementDecision, intentionTarget structs.IntentionTargetType) (uint64, structs.ServiceList, error)
ReadResolvedServiceConfigEntries(ws memdb.WatchSet, serviceName string, entMeta *acl.EnterpriseMeta, upstreamIDs []structs.ServiceID, proxyMode structs.ProxyMode) (uint64, *configentry.ResolvedServiceConfigSet, error)
ServiceDiscoveryChain(ws memdb.WatchSet, serviceName string, entMeta *acl.EnterpriseMeta, req discoverychain.CompileRequest) (uint64, *structs.CompiledDiscoveryChain, *configentry.DiscoveryChainSet, error)
PeeringTrustBundleRead(ws memdb.WatchSet, q state.Query) (uint64, *pbpeering.PeeringTrustBundle, error)
PeeringTrustBundleList(ws memdb.WatchSet, entMeta acl.EnterpriseMeta) (uint64, []*pbpeering.PeeringTrustBundle, error)
@ -34,24 +50,18 @@ type Store interface {
// CacheCARoots satisfies the proxycfg.CARoots interface by sourcing data from
// the agent cache.
//
// Note: there isn't a server-local equivalent of this data source because
// "agentless" proxies obtain certificates via SDS served by consul-dataplane.
func CacheCARoots(c *cache.Cache) proxycfg.CARoots {
return &cacheProxyDataSource[*structs.DCSpecificRequest]{c, cachetype.ConnectCARootName}
}
// CacheConfigEntry satisfies the proxycfg.ConfigEntry interface by sourcing
// data from the agent cache.
func CacheConfigEntry(c *cache.Cache) proxycfg.ConfigEntry {
return &cacheProxyDataSource[*structs.ConfigEntryQuery]{c, cachetype.ConfigEntryName}
}
// CacheConfigEntryList satisfies the proxycfg.ConfigEntryList interface by
// sourcing data from the agent cache.
func CacheConfigEntryList(c *cache.Cache) proxycfg.ConfigEntryList {
return &cacheProxyDataSource[*structs.ConfigEntryQuery]{c, cachetype.ConfigEntryListName}
}
// CacheDatacenters satisfies the proxycfg.Datacenters interface by sourcing
// data from the agent cache.
//
// Note: there isn't a server-local equivalent of this data source because it
// relies on polling (so a more efficient method isn't available).
func CacheDatacenters(c *cache.Cache) proxycfg.Datacenters {
return &cacheProxyDataSource[*structs.DatacentersRequest]{c, cachetype.CatalogDatacentersName}
}
@ -64,16 +74,13 @@ func CacheServiceGateways(c *cache.Cache) proxycfg.GatewayServices {
// CacheHTTPChecks satisifies the proxycfg.HTTPChecks interface by sourcing
// data from the agent cache.
//
// Note: there isn't a server-local equivalent of this data source because only
// services registered to the local agent can be health checked by it.
func CacheHTTPChecks(c *cache.Cache) proxycfg.HTTPChecks {
return &cacheProxyDataSource[*cachetype.ServiceHTTPChecksRequest]{c, cachetype.ServiceHTTPChecksName}
}
// CacheIntentionUpstreams satisfies the proxycfg.IntentionUpstreams interface
// by sourcing data from the agent cache.
func CacheIntentionUpstreams(c *cache.Cache) proxycfg.IntentionUpstreams {
return &cacheProxyDataSource[*structs.ServiceSpecificRequest]{c, cachetype.IntentionUpstreamsName}
}
// CacheIntentionUpstreamsDestination satisfies the proxycfg.IntentionUpstreamsDestination interface
// by sourcing data from the agent cache.
func CacheIntentionUpstreamsDestination(c *cache.Cache) proxycfg.IntentionUpstreams {
@ -88,22 +95,22 @@ func CacheInternalServiceDump(c *cache.Cache) proxycfg.InternalServiceDump {
// CacheLeafCertificate satisifies the proxycfg.LeafCertificate interface by
// sourcing data from the agent cache.
//
// Note: there isn't a server-local equivalent of this data source because
// "agentless" proxies obtain certificates via SDS served by consul-dataplane.
func CacheLeafCertificate(c *cache.Cache) proxycfg.LeafCertificate {
return &cacheProxyDataSource[*cachetype.ConnectCALeafRequest]{c, cachetype.ConnectCALeafName}
}
// CachePrepraredQuery satisfies the proxycfg.PreparedQuery interface by
// sourcing data from the agent cache.
//
// Note: there isn't a server-local equivalent of this data source because it
// relies on polling (so a more efficient method isn't available).
func CachePrepraredQuery(c *cache.Cache) proxycfg.PreparedQuery {
return &cacheProxyDataSource[*structs.PreparedQueryExecuteRequest]{c, cachetype.PreparedQueryName}
}
// CacheResolvedServiceConfig satisfies the proxycfg.ResolvedServiceConfig
// interface by sourcing data from the agent cache.
func CacheResolvedServiceConfig(c *cache.Cache) proxycfg.ResolvedServiceConfig {
return &cacheProxyDataSource[*structs.ServiceConfigRequest]{c, cachetype.ResolvedServiceConfigName}
}
// cacheProxyDataSource implements a generic wrapper around the agent cache to
// provide data to the proxycfg.Manager.
type cacheProxyDataSource[ReqType cache.Request] struct {

View file

@ -5,12 +5,20 @@ import (
"github.com/hashicorp/go-memdb"
"github.com/hashicorp/consul/agent/cache"
cachetype "github.com/hashicorp/consul/agent/cache-types"
"github.com/hashicorp/consul/agent/consul/watch"
"github.com/hashicorp/consul/agent/proxycfg"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/agent/structs/aclfilter"
)
// CacheIntentionUpstreams satisfies the proxycfg.IntentionUpstreams interface
// by sourcing data from the agent cache.
func CacheIntentionUpstreams(c *cache.Cache) proxycfg.IntentionUpstreams {
return &cacheProxyDataSource[*structs.ServiceSpecificRequest]{c, cachetype.IntentionUpstreamsName}
}
// ServerIntentionUpstreams satisfies the proxycfg.IntentionUpstreams interface
// by sourcing data from a blocking query against the server's state store.
func ServerIntentionUpstreams(deps ServerDataSourceDeps) proxycfg.IntentionUpstreams {

View file

@ -0,0 +1,70 @@
package proxycfgglue
import (
"context"
"errors"
"github.com/hashicorp/go-memdb"
"github.com/hashicorp/consul/agent/cache"
cachetype "github.com/hashicorp/consul/agent/cache-types"
"github.com/hashicorp/consul/agent/configentry"
"github.com/hashicorp/consul/agent/consul/watch"
"github.com/hashicorp/consul/agent/proxycfg"
"github.com/hashicorp/consul/agent/structs"
)
// CacheResolvedServiceConfig satisfies the proxycfg.ResolvedServiceConfig
// interface by sourcing data from the agent cache.
func CacheResolvedServiceConfig(c *cache.Cache) proxycfg.ResolvedServiceConfig {
return &cacheProxyDataSource[*structs.ServiceConfigRequest]{c, cachetype.ResolvedServiceConfigName}
}
// ServerResolvedServiceConfig satisfies the proxycfg.ResolvedServiceConfig
// interface by sourcing data from a blocking query against the server's state
// store.
func ServerResolvedServiceConfig(deps ServerDataSourceDeps, remoteSource proxycfg.ResolvedServiceConfig) proxycfg.ResolvedServiceConfig {
return &serverResolvedServiceConfig{deps, remoteSource}
}
type serverResolvedServiceConfig struct {
deps ServerDataSourceDeps
remoteSource proxycfg.ResolvedServiceConfig
}
func (s *serverResolvedServiceConfig) Notify(ctx context.Context, req *structs.ServiceConfigRequest, correlationID string, ch chan<- proxycfg.UpdateEvent) error {
if req.Datacenter != s.deps.Datacenter {
return s.remoteSource.Notify(ctx, req, correlationID, ch)
}
if len(req.Upstreams) != 0 {
return errors.New("ServerResolvedServiceConfig does not support the legacy Upstreams parameter")
}
return watch.ServerLocalNotify(ctx, correlationID, s.deps.GetStore,
func(ws memdb.WatchSet, store Store) (uint64, *structs.ServiceConfigResponse, error) {
authz, err := s.deps.ACLResolver.ResolveTokenAndDefaultMeta(req.Token, &req.EnterpriseMeta, nil)
if err != nil {
return 0, nil, err
}
if err := authz.ToAllowAuthorizer().ServiceReadAllowed(req.Name, nil); err != nil {
return 0, nil, err
}
idx, entries, err := store.ReadResolvedServiceConfigEntries(ws, req.Name, &req.EnterpriseMeta, req.UpstreamIDs, req.Mode)
if err != nil {
return 0, nil, err
}
reply, err := configentry.ComputeResolvedServiceConfig(req, req.UpstreamIDs, false, entries, s.deps.Logger)
if err != nil {
return 0, nil, err
}
reply.Index = idx
return idx, reply, nil
},
dispatchBlockingQueryUpdate[*structs.ServiceConfigResponse](ch),
)
}

View file

@ -0,0 +1,116 @@
package proxycfgglue
import (
"context"
"errors"
"fmt"
"testing"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/consul/state"
"github.com/hashicorp/consul/agent/proxycfg"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/sdk/testutil"
)
func TestServerResolvedServiceConfig(t *testing.T) {
t.Run("remote queries are delegated to the remote source", func(t *testing.T) {
var (
ctx = context.Background()
req = &structs.ServiceConfigRequest{Datacenter: "dc2"}
correlationID = "correlation-id"
ch = make(chan<- proxycfg.UpdateEvent)
result = errors.New("KABOOM")
)
remoteSource := newMockResolvedServiceConfig(t)
remoteSource.On("Notify", ctx, req, correlationID, ch).Return(result)
dataSource := ServerResolvedServiceConfig(ServerDataSourceDeps{Datacenter: "dc1"}, remoteSource)
err := dataSource.Notify(ctx, req, correlationID, ch)
require.Equal(t, result, err)
})
t.Run("local queries are served from the state store", func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)
const (
serviceName = "web"
datacenter = "dc1"
)
store := state.NewStateStore(nil)
nextIndex := indexGenerator()
require.NoError(t, store.EnsureConfigEntry(nextIndex(), &structs.ServiceConfigEntry{
Name: serviceName,
Protocol: "http",
}))
authz := newStaticResolver(
policyAuthorizer(t, fmt.Sprintf(`service "%s" { policy = "read" }`, serviceName)),
)
dataSource := ServerResolvedServiceConfig(ServerDataSourceDeps{
Datacenter: datacenter,
ACLResolver: authz,
GetStore: func() Store { return store },
}, nil)
eventCh := make(chan proxycfg.UpdateEvent)
require.NoError(t, dataSource.Notify(ctx, &structs.ServiceConfigRequest{Datacenter: datacenter, Name: serviceName}, "", eventCh))
testutil.RunStep(t, "initial state", func(t *testing.T) {
result := getEventResult[*structs.ServiceConfigResponse](t, eventCh)
require.Equal(t, map[string]any{"protocol": "http"}, result.ProxyConfig)
})
testutil.RunStep(t, "write proxy defaults", func(t *testing.T) {
require.NoError(t, store.EnsureConfigEntry(nextIndex(), &structs.ProxyConfigEntry{
Name: structs.ProxyConfigGlobal,
Mode: structs.ProxyModeDirect,
}))
result := getEventResult[*structs.ServiceConfigResponse](t, eventCh)
require.Equal(t, structs.ProxyModeDirect, result.Mode)
})
testutil.RunStep(t, "delete service config", func(t *testing.T) {
require.NoError(t, store.DeleteConfigEntry(nextIndex(), structs.ServiceDefaults, serviceName, nil))
result := getEventResult[*structs.ServiceConfigResponse](t, eventCh)
require.Empty(t, result.ProxyConfig)
})
testutil.RunStep(t, "revoke access", func(t *testing.T) {
authz.SwapAuthorizer(acl.DenyAll())
require.NoError(t, store.EnsureConfigEntry(nextIndex(), &structs.ServiceConfigEntry{
Name: serviceName,
Protocol: "http",
}))
expectNoEvent(t, eventCh)
})
})
}
func newMockResolvedServiceConfig(t *testing.T) *mockResolvedServiceConfig {
mock := &mockResolvedServiceConfig{}
mock.Mock.Test(t)
t.Cleanup(func() { mock.AssertExpectations(t) })
return mock
}
type mockResolvedServiceConfig struct {
mock.Mock
}
func (m *mockResolvedServiceConfig) Notify(ctx context.Context, req *structs.ServiceConfigRequest, correlationID string, ch chan<- proxycfg.UpdateEvent) error {
return m.Called(ctx, req, correlationID, ch).Error(0)
}