Add support for merge-central-config query param (#13001)

Adds a new query param merge-central-config for use with the below endpoints:

/catalog/service/:service
/catalog/connect/:service
/health/service/:service
/health/connect/:service

If set on the request, the response will include a fully resolved service definition which is merged with the proxy-defaults/global and service-defaults/:service config entries (on-demand style). This is useful to view the full service definition for a mesh service (connect-proxy kind or gateway kind) which might not be merged before being written into the catalog (example: in case of services in the agentless model).
This commit is contained in:
Riddhi Shah 2022-05-25 13:20:17 -07:00 committed by GitHub
parent 4f9a9bb851
commit e5f1d8dce4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 1403 additions and 785 deletions

3
.changelog/13001.txt Normal file
View File

@ -0,0 +1,3 @@
```release-note:enhancement
api: `merge-central-config` query parameter support added to some catalog and health endpoints to view a fully resolved service definition (especially when not written into the catalog that way).
```

View File

@ -356,6 +356,10 @@ func (s *HTTPHandlers) catalogServiceNodes(resp http.ResponseWriter, req *http.R
args.TagFilter = true args.TagFilter = true
} }
if _, ok := params["merge-central-config"]; ok {
args.MergeCentralConfig = true
}
// Pull out the service name // Pull out the service name
var err error var err error
args.ServiceName, err = getPathSuffixUnescaped(req.URL.Path, pathPrefix) args.ServiceName, err = getPathSuffixUnescaped(req.URL.Path, pathPrefix)

View File

@ -1052,6 +1052,236 @@ func TestCatalogServiceNodes_ConnectProxy(t *testing.T) {
assert.Equal(t, args.Service.Proxy, nodes[0].ServiceProxy) assert.Equal(t, args.Service.Proxy, nodes[0].ServiceProxy)
} }
func registerProxyDefaults(t *testing.T, a *TestAgent) (proxyGlobalEntry structs.ProxyConfigEntry) {
t.Helper()
// Register proxy-defaults
proxyGlobalEntry = structs.ProxyConfigEntry{
Kind: structs.ProxyDefaults,
Name: structs.ProxyConfigGlobal,
Mode: structs.ProxyModeDirect,
Config: map[string]interface{}{
"local_connect_timeout_ms": uint64(1000),
"handshake_timeout_ms": uint64(1000),
},
}
proxyDefaultsConfigEntryReq := &structs.ConfigEntryRequest{
Op: structs.ConfigEntryUpsert,
Datacenter: "dc1",
Entry: &proxyGlobalEntry,
}
var proxyDefaultsConfigEntryResp bool
assert.Nil(t, a.RPC("ConfigEntry.Apply", &proxyDefaultsConfigEntryReq, &proxyDefaultsConfigEntryResp))
return
}
func registerServiceDefaults(t *testing.T, a *TestAgent, serviceName string) (serviceDefaultsConfigEntry structs.ServiceConfigEntry) {
t.Helper()
limits := 512
serviceDefaultsConfigEntry = structs.ServiceConfigEntry{
Kind: structs.ServiceDefaults,
Name: serviceName,
Mode: structs.ProxyModeTransparent,
UpstreamConfig: &structs.UpstreamConfiguration{
Defaults: &structs.UpstreamConfig{
MeshGateway: structs.MeshGatewayConfig{
Mode: structs.MeshGatewayModeLocal,
},
Limits: &structs.UpstreamLimits{
MaxConnections: &limits,
},
},
},
}
serviceDefaultsConfigEntryReq := &structs.ConfigEntryRequest{
Op: structs.ConfigEntryUpsert,
Datacenter: "dc1",
Entry: &serviceDefaultsConfigEntry,
}
var serviceDefaultsConfigEntryResp bool
assert.Nil(t, a.RPC("ConfigEntry.Apply", &serviceDefaultsConfigEntryReq, &serviceDefaultsConfigEntryResp))
return
}
func validateMergeCentralConfigResponse(t *testing.T, v *structs.ServiceNode,
registerServiceReq *structs.RegisterRequest,
proxyGlobalEntry structs.ProxyConfigEntry,
serviceDefaultsConfigEntry structs.ServiceConfigEntry) {
t.Helper()
assert.Equal(t, registerServiceReq.Service.Service, v.ServiceName)
// validate proxy global defaults are resolved in the merged service config
assert.Equal(t, proxyGlobalEntry.Config, v.ServiceProxy.Config)
// validate service defaults override proxy-defaults/global
assert.NotEqual(t, proxyGlobalEntry.Mode, v.ServiceProxy.Mode)
assert.Equal(t, serviceDefaultsConfigEntry.Mode, v.ServiceProxy.Mode)
// validate service defaults are resolved in the merged service config
// expected number of upstreams = (number of upstreams defined in the register request proxy config +
// 1 default from service defaults)
assert.Equal(t, len(registerServiceReq.Service.Proxy.Upstreams)+1, len(v.ServiceProxy.Upstreams))
for _, up := range v.ServiceProxy.Upstreams {
if up.DestinationType != "" {
continue
}
assert.Contains(t, up.Config, "limits")
upstreamLimits := up.Config["limits"].(*structs.UpstreamLimits)
assert.Equal(t, serviceDefaultsConfigEntry.UpstreamConfig.Defaults.Limits.MaxConnections, upstreamLimits.MaxConnections)
assert.Equal(t, serviceDefaultsConfigEntry.UpstreamConfig.Defaults.MeshGateway.Mode, up.MeshGateway.Mode)
}
}
func TestListServiceNodes_MergeCentralConfig(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")
}
t.Parallel()
a := NewTestAgent(t, "")
defer a.Shutdown()
testrpc.WaitForLeader(t, a.RPC, "dc1")
// Register the service
registerServiceReq := structs.TestRegisterRequestProxy(t)
var out struct{}
assert.Nil(t, a.RPC("Catalog.Register", registerServiceReq, &out))
// Register proxy-defaults
proxyGlobalEntry := registerProxyDefaults(t, a)
// Register service-defaults
serviceDefaultsConfigEntry := registerServiceDefaults(t, a, registerServiceReq.Service.Proxy.DestinationServiceName)
type testCase struct {
testCaseName string
serviceName string
connect bool
}
run := func(t *testing.T, tc testCase) {
url := fmt.Sprintf("/v1/catalog/service/%s?merge-central-config", tc.serviceName)
if tc.connect {
url = fmt.Sprintf("/v1/catalog/connect/%s?merge-central-config", tc.serviceName)
}
req, _ := http.NewRequest("GET", url, nil)
resp := httptest.NewRecorder()
var obj interface{}
var err error
if tc.connect {
obj, err = a.srv.CatalogConnectServiceNodes(resp, req)
} else {
obj, err = a.srv.CatalogServiceNodes(resp, req)
}
assert.Nil(t, err)
assertIndex(t, resp)
serviceNodes := obj.(structs.ServiceNodes)
// validate response
assert.Len(t, serviceNodes, 1)
v := serviceNodes[0]
validateMergeCentralConfigResponse(t, v, registerServiceReq, proxyGlobalEntry, serviceDefaultsConfigEntry)
}
testCases := []testCase{
{
testCaseName: "List service instances with merge-central-config",
serviceName: registerServiceReq.Service.Service,
},
{
testCaseName: "List connect capable service instances with merge-central-config",
serviceName: registerServiceReq.Service.Proxy.DestinationServiceName,
connect: true,
},
}
for _, tc := range testCases {
t.Run(tc.testCaseName, func(t *testing.T) {
run(t, tc)
})
}
}
func TestCatalogServiceNodes_MergeCentralConfigBlocking(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")
}
t.Parallel()
a := NewTestAgent(t, "")
defer a.Shutdown()
testrpc.WaitForLeader(t, a.RPC, "dc1")
// Register the service
registerServiceReq := structs.TestRegisterRequestProxy(t)
var out struct{}
assert.Nil(t, a.RPC("Catalog.Register", registerServiceReq, &out))
// Register proxy-defaults
proxyGlobalEntry := registerProxyDefaults(t, a)
// Run the query
rpcReq := structs.ServiceSpecificRequest{
Datacenter: "dc1",
ServiceName: registerServiceReq.Service.Service,
MergeCentralConfig: true,
}
var rpcResp structs.IndexedServiceNodes
assert.Nil(t, a.RPC("Catalog.ServiceNodes", &rpcReq, &rpcResp))
assert.Len(t, rpcResp.ServiceNodes, 1)
serviceNode := rpcResp.ServiceNodes[0]
assert.Equal(t, registerServiceReq.Service.Service, serviceNode.ServiceName)
// validate proxy global defaults are resolved in the merged service config
assert.Equal(t, proxyGlobalEntry.Config, serviceNode.ServiceProxy.Config)
assert.Equal(t, proxyGlobalEntry.Mode, serviceNode.ServiceProxy.Mode)
// Async cause a change - register service defaults
waitIndex := rpcResp.Index
start := time.Now()
var serviceDefaultsConfigEntry structs.ServiceConfigEntry
go func() {
time.Sleep(100 * time.Millisecond)
// Register service-defaults
serviceDefaultsConfigEntry = registerServiceDefaults(t, a, registerServiceReq.Service.Proxy.DestinationServiceName)
}()
const waitDuration = 3 * time.Second
RUN_BLOCKING_QUERY:
url := fmt.Sprintf("/v1/catalog/service/%s?merge-central-config&wait=%s&index=%d",
registerServiceReq.Service.Service, waitDuration.String(), waitIndex)
req, _ := http.NewRequest("GET", url, nil)
resp := httptest.NewRecorder()
obj, err := a.srv.CatalogServiceNodes(resp, req)
assert.Nil(t, err)
assertIndex(t, resp)
elapsed := time.Since(start)
idx := getIndex(t, resp)
if idx < waitIndex {
t.Fatalf("bad index returned: %v", idx)
} else if idx == waitIndex {
if elapsed > waitDuration {
// This should prevent the loop from running longer than the waitDuration
t.Fatalf("too slow: %v", elapsed)
}
goto RUN_BLOCKING_QUERY
}
// Should block at least 100ms before getting the changed results
if elapsed < 100*time.Millisecond {
t.Fatalf("too fast: %v", elapsed)
}
serviceNodes := obj.(structs.ServiceNodes)
// validate response
assert.Len(t, serviceNodes, 1)
v := serviceNodes[0]
validateMergeCentralConfigResponse(t, v, registerServiceReq, proxyGlobalEntry, serviceDefaultsConfigEntry)
}
// Test that the Connect-compatible endpoints can be queried for a // Test that the Connect-compatible endpoints can be queried for a
// service via /v1/catalog/connect/:service. // service via /v1/catalog/connect/:service.
func TestCatalogConnectServiceNodes_good(t *testing.T) { func TestCatalogConnectServiceNodes_good(t *testing.T) {

View File

@ -12,6 +12,7 @@ import (
"github.com/hashicorp/go-hclog" "github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-memdb" "github.com/hashicorp/go-memdb"
"github.com/hashicorp/go-uuid" "github.com/hashicorp/go-uuid"
hashstructure_v2 "github.com/mitchellh/hashstructure/v2"
"github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/consul/state" "github.com/hashicorp/consul/agent/consul/state"
@ -663,6 +664,11 @@ func (c *Catalog) ServiceNodes(args *structs.ServiceSpecificRequest, reply *stru
return err return err
} }
var (
priorMergeHash uint64
ranMergeOnce bool
)
err = c.srv.blockingQuery( err = c.srv.blockingQuery(
&args.QueryOptions, &args.QueryOptions,
&reply.QueryMeta, &reply.QueryMeta,
@ -672,10 +678,53 @@ func (c *Catalog) ServiceNodes(args *structs.ServiceSpecificRequest, reply *stru
return err return err
} }
reply.Index, reply.ServiceNodes = index, services mergedServices := services
if args.MergeCentralConfig {
var mergedServiceNodes structs.ServiceNodes
for _, sn := range services {
mergedsn := sn
ns := sn.ToNodeService()
if ns.IsSidecarProxy() || ns.IsGateway() {
cfgIndex, mergedns, err := mergeNodeServiceWithCentralConfig(ws, state, args, ns, c.logger)
if err != nil {
return err
}
if cfgIndex > index {
index = cfgIndex
}
mergedsn = mergedns.ToServiceNode(sn.Node)
}
mergedServiceNodes = append(mergedServiceNodes, mergedsn)
}
if len(mergedServiceNodes) > 0 {
mergedServices = mergedServiceNodes
}
// Generate a hash of the mergedServices driving this response.
// Use it to determine if the response is identical to a prior wakeup.
newMergeHash, err := hashstructure_v2.Hash(mergedServices, hashstructure_v2.FormatV2, nil)
if err != nil {
return fmt.Errorf("error hashing reply for spurious wakeup suppression: %w", err)
}
if ranMergeOnce && priorMergeHash == newMergeHash {
// the below assignment is not required as the if condition already validates equality,
// but makes it more clear that prior value is being reset to the new hash on each run.
priorMergeHash = newMergeHash
reply.Index = index
// NOTE: the prior response is still alive inside of *reply, which is desirable
return errNotChanged
} else {
priorMergeHash = newMergeHash
ranMergeOnce = true
}
}
reply.Index, reply.ServiceNodes = index, mergedServices
if len(args.NodeMetaFilters) > 0 { if len(args.NodeMetaFilters) > 0 {
var filtered structs.ServiceNodes var filtered structs.ServiceNodes
for _, service := range services { for _, service := range mergedServices {
if structs.SatisfiesMetaFilters(service.NodeMeta, args.NodeMetaFilters) { if structs.SatisfiesMetaFilters(service.NodeMeta, args.NodeMetaFilters) {
filtered = append(filtered, service) filtered = append(filtered, service)
} }

View File

@ -9,11 +9,9 @@ import (
"github.com/armon/go-metrics/prometheus" "github.com/armon/go-metrics/prometheus"
"github.com/hashicorp/go-hclog" "github.com/hashicorp/go-hclog"
memdb "github.com/hashicorp/go-memdb" memdb "github.com/hashicorp/go-memdb"
"github.com/mitchellh/copystructure"
hashstructure_v2 "github.com/mitchellh/hashstructure/v2" hashstructure_v2 "github.com/mitchellh/hashstructure/v2"
"github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/configentry"
"github.com/hashicorp/consul/agent/consul/state" "github.com/hashicorp/consul/agent/consul/state"
"github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/agent/structs"
) )
@ -482,7 +480,6 @@ func (c *ConfigEntry) ResolveServiceConfig(args *structs.ServiceConfigRequest, r
} }
// Fetch all relevant config entries. // Fetch all relevant config entries.
index, entries, err := state.ReadResolvedServiceConfigEntries( index, entries, err := state.ReadResolvedServiceConfigEntries(
ws, ws,
args.Name, args.Name,
@ -513,11 +510,12 @@ func (c *ConfigEntry) ResolveServiceConfig(args *structs.ServiceConfigRequest, r
ranOnce = true ranOnce = true
} }
thisReply, err := c.computeResolvedServiceConfig( thisReply, err := computeResolvedServiceConfig(
args, args,
upstreamIDs, upstreamIDs,
legacyUpstreams, legacyUpstreams,
entries, entries,
c.logger,
) )
if err != nil { if err != nil {
return err return err
@ -534,214 +532,6 @@ func (c *ConfigEntry) ResolveServiceConfig(args *structs.ServiceConfigRequest, r
}) })
} }
func (c *ConfigEntry) computeResolvedServiceConfig(
args *structs.ServiceConfigRequest,
upstreamIDs []structs.ServiceID,
legacyUpstreams bool,
entries *configentry.ResolvedServiceConfigSet,
) (*structs.ServiceConfigResponse, error) {
var thisReply structs.ServiceConfigResponse
thisReply.MeshGateway.Mode = structs.MeshGatewayModeDefault
// TODO(freddy) Refactor this into smaller set of state store functions
// Pass the WatchSet to both the service and proxy config lookups. If either is updated during the
// blocking query, this function will be rerun and these state store lookups will both be current.
// We use the default enterprise meta to look up the global proxy defaults because they are not namespaced.
var proxyConfGlobalProtocol string
proxyConf := entries.GetProxyDefaults(args.PartitionOrDefault())
if proxyConf != nil {
// Apply the proxy defaults to the sidecar's proxy config
mapCopy, err := copystructure.Copy(proxyConf.Config)
if err != nil {
return nil, fmt.Errorf("failed to copy global proxy-defaults: %v", err)
}
thisReply.ProxyConfig = mapCopy.(map[string]interface{})
thisReply.Mode = proxyConf.Mode
thisReply.TransparentProxy = proxyConf.TransparentProxy
thisReply.MeshGateway = proxyConf.MeshGateway
thisReply.Expose = proxyConf.Expose
// Extract the global protocol from proxyConf for upstream configs.
rawProtocol := proxyConf.Config["protocol"]
if rawProtocol != nil {
var ok bool
proxyConfGlobalProtocol, ok = rawProtocol.(string)
if !ok {
return nil, fmt.Errorf("invalid protocol type %T", rawProtocol)
}
}
}
serviceConf := entries.GetServiceDefaults(
structs.NewServiceID(args.Name, &args.EnterpriseMeta),
)
if serviceConf != nil {
if serviceConf.Expose.Checks {
thisReply.Expose.Checks = true
}
if len(serviceConf.Expose.Paths) >= 1 {
thisReply.Expose.Paths = serviceConf.Expose.Paths
}
if serviceConf.MeshGateway.Mode != structs.MeshGatewayModeDefault {
thisReply.MeshGateway.Mode = serviceConf.MeshGateway.Mode
}
if serviceConf.Protocol != "" {
if thisReply.ProxyConfig == nil {
thisReply.ProxyConfig = make(map[string]interface{})
}
thisReply.ProxyConfig["protocol"] = serviceConf.Protocol
}
if serviceConf.TransparentProxy.OutboundListenerPort != 0 {
thisReply.TransparentProxy.OutboundListenerPort = serviceConf.TransparentProxy.OutboundListenerPort
}
if serviceConf.TransparentProxy.DialedDirectly {
thisReply.TransparentProxy.DialedDirectly = serviceConf.TransparentProxy.DialedDirectly
}
if serviceConf.Mode != structs.ProxyModeDefault {
thisReply.Mode = serviceConf.Mode
}
thisReply.Meta = serviceConf.Meta
}
// First collect all upstreams into a set of seen upstreams.
// Upstreams can come from:
// - Explicitly from proxy registrations, and therefore as an argument to this RPC endpoint
// - Implicitly from centralized upstream config in service-defaults
seenUpstreams := map[structs.ServiceID]struct{}{}
var (
noUpstreamArgs = len(upstreamIDs) == 0 && len(args.Upstreams) == 0
// Check the args and the resolved value. If it was exclusively set via a config entry, then args.Mode
// will never be transparent because the service config request does not use the resolved value.
tproxy = args.Mode == structs.ProxyModeTransparent || thisReply.Mode == structs.ProxyModeTransparent
)
// The upstreams passed as arguments to this endpoint are the upstreams explicitly defined in a proxy registration.
// If no upstreams were passed, then we should only return the resolved config if the proxy is in transparent mode.
// Otherwise we would return a resolved upstream config to a proxy with no configured upstreams.
if noUpstreamArgs && !tproxy {
return &thisReply, nil
}
// First store all upstreams that were provided in the request
for _, sid := range upstreamIDs {
if _, ok := seenUpstreams[sid]; !ok {
seenUpstreams[sid] = struct{}{}
}
}
// Then store upstreams inferred from service-defaults and mapify the overrides.
var (
upstreamConfigs = make(map[structs.ServiceID]*structs.UpstreamConfig)
upstreamDefaults *structs.UpstreamConfig
// usConfigs stores the opaque config map for each upstream and is keyed on the upstream's ID.
usConfigs = make(map[structs.ServiceID]map[string]interface{})
)
if serviceConf != nil && serviceConf.UpstreamConfig != nil {
for i, override := range serviceConf.UpstreamConfig.Overrides {
if override.Name == "" {
c.logger.Warn(
"Skipping UpstreamConfig.Overrides entry without a required name field",
"entryIndex", i,
"kind", serviceConf.GetKind(),
"name", serviceConf.GetName(),
"namespace", serviceConf.GetEnterpriseMeta().NamespaceOrEmpty(),
)
continue // skip this impossible condition
}
seenUpstreams[override.ServiceID()] = struct{}{}
upstreamConfigs[override.ServiceID()] = override
}
if serviceConf.UpstreamConfig.Defaults != nil {
upstreamDefaults = serviceConf.UpstreamConfig.Defaults
// Store the upstream defaults under a wildcard key so that they can be applied to
// upstreams that are inferred from intentions and do not have explicit upstream configuration.
cfgMap := make(map[string]interface{})
upstreamDefaults.MergeInto(cfgMap)
wildcard := structs.NewServiceID(structs.WildcardSpecifier, args.WithWildcardNamespace())
usConfigs[wildcard] = cfgMap
}
}
for upstream := range seenUpstreams {
resolvedCfg := make(map[string]interface{})
// The protocol of an upstream is resolved in this order:
// 1. Default protocol from proxy-defaults (how all services should be addressed)
// 2. Protocol for upstream service defined in its service-defaults (how the upstream wants to be addressed)
// 3. Protocol defined for the upstream in the service-defaults.(upstream_config.defaults|upstream_config.overrides) of the downstream
// (how the downstream wants to address it)
protocol := proxyConfGlobalProtocol
upstreamSvcDefaults := entries.GetServiceDefaults(
structs.NewServiceID(upstream.ID, &upstream.EnterpriseMeta),
)
if upstreamSvcDefaults != nil {
if upstreamSvcDefaults.Protocol != "" {
protocol = upstreamSvcDefaults.Protocol
}
}
if protocol != "" {
resolvedCfg["protocol"] = protocol
}
// Merge centralized defaults for all upstreams before configuration for specific upstreams
if upstreamDefaults != nil {
upstreamDefaults.MergeInto(resolvedCfg)
}
// The MeshGateway value from the proxy registration overrides the one from upstream_defaults
// because it is specific to the proxy instance.
//
// The goal is to flatten the mesh gateway mode in this order:
// 0. Value from centralized upstream_defaults
// 1. Value from local proxy registration
// 2. Value from centralized upstream_config
// 3. Value from local upstream definition. This last step is done in the client's service manager.
if !args.MeshGateway.IsZero() {
resolvedCfg["mesh_gateway"] = args.MeshGateway
}
if upstreamConfigs[upstream] != nil {
upstreamConfigs[upstream].MergeInto(resolvedCfg)
}
if len(resolvedCfg) > 0 {
usConfigs[upstream] = resolvedCfg
}
}
// don't allocate the slices just to not fill them
if len(usConfigs) == 0 {
return &thisReply, nil
}
if legacyUpstreams {
// For legacy upstreams we return a map that is only keyed on the string ID, since they precede namespaces
thisReply.UpstreamConfigs = make(map[string]map[string]interface{})
for us, conf := range usConfigs {
thisReply.UpstreamConfigs[us.ID] = conf
}
} else {
thisReply.UpstreamIDConfigs = make(structs.OpaqueUpstreamConfigs, 0, len(usConfigs))
for us, conf := range usConfigs {
thisReply.UpstreamIDConfigs = append(thisReply.UpstreamIDConfigs,
structs.OpaqueUpstreamConfig{Upstream: us, Config: conf})
}
}
return &thisReply, nil
}
func gateWriteToSecondary(targetDC, localDC, primaryDC, kind string) error { func gateWriteToSecondary(targetDC, localDC, primaryDC, kind string) error {
// ExportedServices entries are gated from interactions from secondary DCs // ExportedServices entries are gated from interactions from secondary DCs
// because non-default partitions cannot be created in secondaries // because non-default partitions cannot be created in secondaries

View File

@ -6,7 +6,9 @@ import (
"github.com/armon/go-metrics" "github.com/armon/go-metrics"
bexpr "github.com/hashicorp/go-bexpr" bexpr "github.com/hashicorp/go-bexpr"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-memdb" "github.com/hashicorp/go-memdb"
hashstructure_v2 "github.com/mitchellh/hashstructure/v2"
"github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/consul/state" "github.com/hashicorp/consul/agent/consul/state"
@ -15,7 +17,8 @@ import (
// Health endpoint is used to query the health information // Health endpoint is used to query the health information
type Health struct { type Health struct {
srv *Server srv *Server
logger hclog.Logger
} }
// ChecksInState is used to get all the checks in a given state // ChecksInState is used to get all the checks in a given state
@ -232,6 +235,11 @@ func (h *Health) ServiceNodes(args *structs.ServiceSpecificRequest, reply *struc
return err return err
} }
var (
priorMergeHash uint64
ranMergeOnce bool
)
err = h.srv.blockingQuery( err = h.srv.blockingQuery(
&args.QueryOptions, &args.QueryOptions,
&reply.QueryMeta, &reply.QueryMeta,
@ -243,7 +251,43 @@ func (h *Health) ServiceNodes(args *structs.ServiceSpecificRequest, reply *struc
return err return err
} }
thisReply.Index, thisReply.Nodes = index, nodes resolvedNodes := nodes
if args.MergeCentralConfig {
for _, node := range resolvedNodes {
ns := node.Service
if ns.IsSidecarProxy() || ns.IsGateway() {
cfgIndex, mergedns, err := mergeNodeServiceWithCentralConfig(ws, state, args, ns, h.logger)
if err != nil {
return err
}
if cfgIndex > index {
index = cfgIndex
}
*node.Service = *mergedns
}
}
// Generate a hash of the resolvedNodes driving this response.
// Use it to determine if the response is identical to a prior wakeup.
newMergeHash, err := hashstructure_v2.Hash(resolvedNodes, hashstructure_v2.FormatV2, nil)
if err != nil {
return fmt.Errorf("error hashing reply for spurious wakeup suppression: %w", err)
}
if ranMergeOnce && priorMergeHash == newMergeHash {
// the below assignment is not required as the if condition already validates equality,
// but makes it more clear that prior value is being reset to the new hash on each run.
priorMergeHash = newMergeHash
reply.Index = index
// NOTE: the prior response is still alive inside of *reply, which is desirable
return errNotChanged
} else {
priorMergeHash = newMergeHash
ranMergeOnce = true
}
}
thisReply.Index, thisReply.Nodes = index, resolvedNodes
if len(args.NodeMetaFilters) > 0 { if len(args.NodeMetaFilters) > 0 {
thisReply.Nodes = nodeMetaFilter(args.NodeMetaFilters, thisReply.Nodes) thisReply.Nodes = nodeMetaFilter(args.NodeMetaFilters, thisReply.Nodes)

View File

@ -0,0 +1,408 @@
package consul
import (
"fmt"
"github.com/hashicorp/consul/agent/configentry"
"github.com/hashicorp/consul/agent/consul/state"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/go-hclog"
memdb "github.com/hashicorp/go-memdb"
"github.com/imdario/mergo"
"github.com/mitchellh/copystructure"
)
// mergeNodeServiceWithCentralConfig merges a service instance (NodeService) with the
// proxy-defaults/global and service-defaults/:service config entries.
// This common helper is used by the blocking query function of different RPC endpoints
// that need to return a fully resolved service defintion.
func mergeNodeServiceWithCentralConfig(
ws memdb.WatchSet,
state *state.Store,
args *structs.ServiceSpecificRequest,
ns *structs.NodeService,
logger hclog.Logger) (uint64, *structs.NodeService, error) {
serviceName := ns.Service
var upstreams []structs.ServiceID
if ns.IsSidecarProxy() {
// This is a sidecar proxy, ignore the proxy service's config since we are
// managed by the target service config.
serviceName = ns.Proxy.DestinationServiceName
// Also if we have any upstreams defined, add them to the defaults lookup request
// so we can learn about their configs.
for _, us := range ns.Proxy.Upstreams {
if us.DestinationType == "" || us.DestinationType == structs.UpstreamDestTypeService {
sid := us.DestinationID()
sid.EnterpriseMeta.Merge(&ns.EnterpriseMeta)
upstreams = append(upstreams, sid)
}
}
}
configReq := &structs.ServiceConfigRequest{
Name: serviceName,
Datacenter: args.Datacenter,
QueryOptions: args.QueryOptions,
MeshGateway: ns.Proxy.MeshGateway,
Mode: ns.Proxy.Mode,
UpstreamIDs: upstreams,
EnterpriseMeta: ns.EnterpriseMeta,
}
// prefer using this vs directly calling the ConfigEntry.ResolveServiceConfig RPC
// so as to pass down the same watch set to also watch on changes to
// proxy-defaults/global and service-defaults.
cfgIndex, configEntries, err := state.ReadResolvedServiceConfigEntries(
ws,
configReq.Name,
&configReq.EnterpriseMeta,
upstreams,
configReq.Mode,
)
if err != nil {
return 0, nil, fmt.Errorf("Failure looking up service config entries for %s: %v",
ns.ID, err)
}
defaults, err := computeResolvedServiceConfig(
configReq,
upstreams,
false,
configEntries,
logger,
)
if err != nil {
return 0, nil, fmt.Errorf("Failure computing service defaults for %s: %v",
ns.ID, err)
}
mergedns, err := MergeServiceConfig(defaults, ns)
if err != nil {
return 0, nil, fmt.Errorf("Failure merging service definition with config entry defaults for %s: %v",
ns.ID, err)
}
return cfgIndex, mergedns, nil
}
func computeResolvedServiceConfig(
args *structs.ServiceConfigRequest,
upstreamIDs []structs.ServiceID,
legacyUpstreams bool,
entries *configentry.ResolvedServiceConfigSet,
logger hclog.Logger,
) (*structs.ServiceConfigResponse, error) {
var thisReply structs.ServiceConfigResponse
thisReply.MeshGateway.Mode = structs.MeshGatewayModeDefault
// TODO(freddy) Refactor this into smaller set of state store functions
// Pass the WatchSet to both the service and proxy config lookups. If either is updated during the
// blocking query, this function will be rerun and these state store lookups will both be current.
// We use the default enterprise meta to look up the global proxy defaults because they are not namespaced.
var proxyConfGlobalProtocol string
proxyConf := entries.GetProxyDefaults(args.PartitionOrDefault())
if proxyConf != nil {
// Apply the proxy defaults to the sidecar's proxy config
mapCopy, err := copystructure.Copy(proxyConf.Config)
if err != nil {
return nil, fmt.Errorf("failed to copy global proxy-defaults: %v", err)
}
thisReply.ProxyConfig = mapCopy.(map[string]interface{})
thisReply.Mode = proxyConf.Mode
thisReply.TransparentProxy = proxyConf.TransparentProxy
thisReply.MeshGateway = proxyConf.MeshGateway
thisReply.Expose = proxyConf.Expose
// Extract the global protocol from proxyConf for upstream configs.
rawProtocol := proxyConf.Config["protocol"]
if rawProtocol != nil {
var ok bool
proxyConfGlobalProtocol, ok = rawProtocol.(string)
if !ok {
return nil, fmt.Errorf("invalid protocol type %T", rawProtocol)
}
}
}
serviceConf := entries.GetServiceDefaults(
structs.NewServiceID(args.Name, &args.EnterpriseMeta),
)
if serviceConf != nil {
if serviceConf.Expose.Checks {
thisReply.Expose.Checks = true
}
if len(serviceConf.Expose.Paths) >= 1 {
thisReply.Expose.Paths = serviceConf.Expose.Paths
}
if serviceConf.MeshGateway.Mode != structs.MeshGatewayModeDefault {
thisReply.MeshGateway.Mode = serviceConf.MeshGateway.Mode
}
if serviceConf.Protocol != "" {
if thisReply.ProxyConfig == nil {
thisReply.ProxyConfig = make(map[string]interface{})
}
thisReply.ProxyConfig["protocol"] = serviceConf.Protocol
}
if serviceConf.TransparentProxy.OutboundListenerPort != 0 {
thisReply.TransparentProxy.OutboundListenerPort = serviceConf.TransparentProxy.OutboundListenerPort
}
if serviceConf.TransparentProxy.DialedDirectly {
thisReply.TransparentProxy.DialedDirectly = serviceConf.TransparentProxy.DialedDirectly
}
if serviceConf.Mode != structs.ProxyModeDefault {
thisReply.Mode = serviceConf.Mode
}
thisReply.Meta = serviceConf.Meta
}
// First collect all upstreams into a set of seen upstreams.
// Upstreams can come from:
// - Explicitly from proxy registrations, and therefore as an argument to this RPC endpoint
// - Implicitly from centralized upstream config in service-defaults
seenUpstreams := map[structs.ServiceID]struct{}{}
var (
noUpstreamArgs = len(upstreamIDs) == 0 && len(args.Upstreams) == 0
// Check the args and the resolved value. If it was exclusively set via a config entry, then args.Mode
// will never be transparent because the service config request does not use the resolved value.
tproxy = args.Mode == structs.ProxyModeTransparent || thisReply.Mode == structs.ProxyModeTransparent
)
// The upstreams passed as arguments to this endpoint are the upstreams explicitly defined in a proxy registration.
// If no upstreams were passed, then we should only return the resolved config if the proxy is in transparent mode.
// Otherwise we would return a resolved upstream config to a proxy with no configured upstreams.
if noUpstreamArgs && !tproxy {
return &thisReply, nil
}
// First store all upstreams that were provided in the request
for _, sid := range upstreamIDs {
if _, ok := seenUpstreams[sid]; !ok {
seenUpstreams[sid] = struct{}{}
}
}
// Then store upstreams inferred from service-defaults and mapify the overrides.
var (
upstreamConfigs = make(map[structs.ServiceID]*structs.UpstreamConfig)
upstreamDefaults *structs.UpstreamConfig
// usConfigs stores the opaque config map for each upstream and is keyed on the upstream's ID.
usConfigs = make(map[structs.ServiceID]map[string]interface{})
)
if serviceConf != nil && serviceConf.UpstreamConfig != nil {
for i, override := range serviceConf.UpstreamConfig.Overrides {
if override.Name == "" {
logger.Warn(
"Skipping UpstreamConfig.Overrides entry without a required name field",
"entryIndex", i,
"kind", serviceConf.GetKind(),
"name", serviceConf.GetName(),
"namespace", serviceConf.GetEnterpriseMeta().NamespaceOrEmpty(),
)
continue // skip this impossible condition
}
seenUpstreams[override.ServiceID()] = struct{}{}
upstreamConfigs[override.ServiceID()] = override
}
if serviceConf.UpstreamConfig.Defaults != nil {
upstreamDefaults = serviceConf.UpstreamConfig.Defaults
// Store the upstream defaults under a wildcard key so that they can be applied to
// upstreams that are inferred from intentions and do not have explicit upstream configuration.
cfgMap := make(map[string]interface{})
upstreamDefaults.MergeInto(cfgMap)
wildcard := structs.NewServiceID(structs.WildcardSpecifier, args.WithWildcardNamespace())
usConfigs[wildcard] = cfgMap
}
}
for upstream := range seenUpstreams {
resolvedCfg := make(map[string]interface{})
// The protocol of an upstream is resolved in this order:
// 1. Default protocol from proxy-defaults (how all services should be addressed)
// 2. Protocol for upstream service defined in its service-defaults (how the upstream wants to be addressed)
// 3. Protocol defined for the upstream in the service-defaults.(upstream_config.defaults|upstream_config.overrides) of the downstream
// (how the downstream wants to address it)
protocol := proxyConfGlobalProtocol
upstreamSvcDefaults := entries.GetServiceDefaults(
structs.NewServiceID(upstream.ID, &upstream.EnterpriseMeta),
)
if upstreamSvcDefaults != nil {
if upstreamSvcDefaults.Protocol != "" {
protocol = upstreamSvcDefaults.Protocol
}
}
if protocol != "" {
resolvedCfg["protocol"] = protocol
}
// Merge centralized defaults for all upstreams before configuration for specific upstreams
if upstreamDefaults != nil {
upstreamDefaults.MergeInto(resolvedCfg)
}
// The MeshGateway value from the proxy registration overrides the one from upstream_defaults
// because it is specific to the proxy instance.
//
// The goal is to flatten the mesh gateway mode in this order:
// 0. Value from centralized upstream_defaults
// 1. Value from local proxy registration
// 2. Value from centralized upstream_config
// 3. Value from local upstream definition. This last step is done in the client's service manager.
if !args.MeshGateway.IsZero() {
resolvedCfg["mesh_gateway"] = args.MeshGateway
}
if upstreamConfigs[upstream] != nil {
upstreamConfigs[upstream].MergeInto(resolvedCfg)
}
if len(resolvedCfg) > 0 {
usConfigs[upstream] = resolvedCfg
}
}
// don't allocate the slices just to not fill them
if len(usConfigs) == 0 {
return &thisReply, nil
}
if legacyUpstreams {
// For legacy upstreams we return a map that is only keyed on the string ID, since they precede namespaces
thisReply.UpstreamConfigs = make(map[string]map[string]interface{})
for us, conf := range usConfigs {
thisReply.UpstreamConfigs[us.ID] = conf
}
} else {
thisReply.UpstreamIDConfigs = make(structs.OpaqueUpstreamConfigs, 0, len(usConfigs))
for us, conf := range usConfigs {
thisReply.UpstreamIDConfigs = append(thisReply.UpstreamIDConfigs,
structs.OpaqueUpstreamConfig{Upstream: us, Config: conf})
}
}
return &thisReply, nil
}
// MergeServiceConfig merges the service into defaults to produce the final effective
// config for the specified service.
func MergeServiceConfig(defaults *structs.ServiceConfigResponse, service *structs.NodeService) (*structs.NodeService, error) {
if defaults == nil {
return service, nil
}
// We don't want to change s.registration in place since it is our source of
// truth about what was actually registered before defaults applied. So copy
// it first.
nsRaw, err := copystructure.Copy(service)
if err != nil {
return nil, err
}
// Merge proxy defaults
ns := nsRaw.(*structs.NodeService)
if err := mergo.Merge(&ns.Proxy.Config, defaults.ProxyConfig); err != nil {
return nil, err
}
if err := mergo.Merge(&ns.Proxy.Expose, defaults.Expose); err != nil {
return nil, err
}
if ns.Proxy.MeshGateway.Mode == structs.MeshGatewayModeDefault {
ns.Proxy.MeshGateway.Mode = defaults.MeshGateway.Mode
}
if ns.Proxy.Mode == structs.ProxyModeDefault {
ns.Proxy.Mode = defaults.Mode
}
if ns.Proxy.TransparentProxy.OutboundListenerPort == 0 {
ns.Proxy.TransparentProxy.OutboundListenerPort = defaults.TransparentProxy.OutboundListenerPort
}
if !ns.Proxy.TransparentProxy.DialedDirectly {
ns.Proxy.TransparentProxy.DialedDirectly = defaults.TransparentProxy.DialedDirectly
}
// remoteUpstreams contains synthetic Upstreams generated from central config (service-defaults.UpstreamConfigs).
remoteUpstreams := make(map[structs.ServiceID]structs.Upstream)
for _, us := range defaults.UpstreamIDConfigs {
parsed, err := structs.ParseUpstreamConfigNoDefaults(us.Config)
if err != nil {
return nil, fmt.Errorf("failed to parse upstream config map for %s: %v", us.Upstream.String(), err)
}
remoteUpstreams[us.Upstream] = structs.Upstream{
DestinationNamespace: us.Upstream.NamespaceOrDefault(),
DestinationPartition: us.Upstream.PartitionOrDefault(),
DestinationName: us.Upstream.ID,
Config: us.Config,
MeshGateway: parsed.MeshGateway,
CentrallyConfigured: true,
}
}
// localUpstreams stores the upstreams seen from the local registration so that we can merge in the synthetic entries.
// In transparent proxy mode ns.Proxy.Upstreams will likely be empty because users do not need to define upstreams explicitly.
// So to store upstream-specific flags from central config, we add entries to ns.Proxy.Upstream with those values.
localUpstreams := make(map[structs.ServiceID]struct{})
// Merge upstream defaults into the local registration
for i := range ns.Proxy.Upstreams {
// Get a pointer not a value copy of the upstream struct
us := &ns.Proxy.Upstreams[i]
if us.DestinationType != "" && us.DestinationType != structs.UpstreamDestTypeService {
continue
}
localUpstreams[us.DestinationID()] = struct{}{}
remoteCfg, ok := remoteUpstreams[us.DestinationID()]
if !ok {
// No config defaults to merge
continue
}
// The local upstream config mode has the highest precedence, so only overwrite when it's set to the default
if us.MeshGateway.Mode == structs.MeshGatewayModeDefault {
us.MeshGateway.Mode = remoteCfg.MeshGateway.Mode
}
// Merge in everything else that is read from the map
if err := mergo.Merge(&us.Config, remoteCfg.Config); err != nil {
return nil, err
}
// Delete the mesh gateway key from opaque config since this is the value that was resolved from
// the servers and NOT the final merged value for this upstream.
// Note that we use the "mesh_gateway" key and not other variants like "MeshGateway" because
// UpstreamConfig.MergeInto and ResolveServiceConfig only use "mesh_gateway".
delete(us.Config, "mesh_gateway")
}
// Ensure upstreams present in central config are represented in the local configuration.
// This does not apply outside of transparent mode because in that situation every possible upstream already exists
// inside of ns.Proxy.Upstreams.
if ns.Proxy.Mode == structs.ProxyModeTransparent {
for id, remote := range remoteUpstreams {
if _, ok := localUpstreams[id]; ok {
// Remote upstream is already present locally
continue
}
ns.Proxy.Upstreams = append(ns.Proxy.Upstreams, remote)
}
}
return ns, err
}

View File

@ -0,0 +1,458 @@
package consul
import (
"testing"
"github.com/hashicorp/consul/agent/structs"
"github.com/mitchellh/copystructure"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func Test_MergeServiceConfig_TransparentProxy(t *testing.T) {
type args struct {
defaults *structs.ServiceConfigResponse
service *structs.NodeService
}
tests := []struct {
name string
args args
want *structs.NodeService
}{
{
name: "inherit transparent proxy settings",
args: args{
defaults: &structs.ServiceConfigResponse{
Mode: structs.ProxyModeTransparent,
TransparentProxy: structs.TransparentProxyConfig{
OutboundListenerPort: 10101,
DialedDirectly: true,
},
},
service: &structs.NodeService{
ID: "foo-proxy",
Service: "foo-proxy",
Proxy: structs.ConnectProxyConfig{
DestinationServiceName: "foo",
DestinationServiceID: "foo",
Mode: structs.ProxyModeDefault,
TransparentProxy: structs.TransparentProxyConfig{},
},
},
},
want: &structs.NodeService{
ID: "foo-proxy",
Service: "foo-proxy",
Proxy: structs.ConnectProxyConfig{
DestinationServiceName: "foo",
DestinationServiceID: "foo",
Mode: structs.ProxyModeTransparent,
TransparentProxy: structs.TransparentProxyConfig{
OutboundListenerPort: 10101,
DialedDirectly: true,
},
},
},
},
{
name: "override transparent proxy settings",
args: args{
defaults: &structs.ServiceConfigResponse{
Mode: structs.ProxyModeTransparent,
TransparentProxy: structs.TransparentProxyConfig{
OutboundListenerPort: 10101,
DialedDirectly: false,
},
},
service: &structs.NodeService{
ID: "foo-proxy",
Service: "foo-proxy",
Proxy: structs.ConnectProxyConfig{
DestinationServiceName: "foo",
DestinationServiceID: "foo",
Mode: structs.ProxyModeDirect,
TransparentProxy: structs.TransparentProxyConfig{
OutboundListenerPort: 808,
DialedDirectly: true,
},
},
},
},
want: &structs.NodeService{
ID: "foo-proxy",
Service: "foo-proxy",
Proxy: structs.ConnectProxyConfig{
DestinationServiceName: "foo",
DestinationServiceID: "foo",
Mode: structs.ProxyModeDirect,
TransparentProxy: structs.TransparentProxyConfig{
OutboundListenerPort: 808,
DialedDirectly: true,
},
},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
defaultsCopy, err := copystructure.Copy(tt.args.defaults)
require.NoError(t, err)
got, err := MergeServiceConfig(tt.args.defaults, tt.args.service)
require.NoError(t, err)
assert.Equal(t, tt.want, got)
// The input defaults must not be modified by the merge.
// See PR #10647
assert.Equal(t, tt.args.defaults, defaultsCopy)
})
}
}
func Test_MergeServiceConfig_UpstreamOverrides(t *testing.T) {
type args struct {
defaults *structs.ServiceConfigResponse
service *structs.NodeService
}
tests := []struct {
name string
args args
want *structs.NodeService
}{
{
name: "new config fields",
args: args{
defaults: &structs.ServiceConfigResponse{
UpstreamIDConfigs: structs.OpaqueUpstreamConfigs{
{
Upstream: structs.ServiceID{
ID: "zap",
EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition(),
},
Config: map[string]interface{}{
"passive_health_check": map[string]interface{}{
"Interval": int64(10),
"MaxFailures": int64(2),
},
"mesh_gateway": map[string]interface{}{
"Mode": "local",
},
"protocol": "grpc",
},
},
},
},
service: &structs.NodeService{
ID: "foo-proxy",
Service: "foo-proxy",
Proxy: structs.ConnectProxyConfig{
DestinationServiceName: "foo",
DestinationServiceID: "foo",
Upstreams: structs.Upstreams{
structs.Upstream{
DestinationNamespace: "default",
DestinationPartition: "default",
DestinationName: "zap",
},
},
},
},
},
want: &structs.NodeService{
ID: "foo-proxy",
Service: "foo-proxy",
Proxy: structs.ConnectProxyConfig{
DestinationServiceName: "foo",
DestinationServiceID: "foo",
Upstreams: structs.Upstreams{
structs.Upstream{
DestinationNamespace: "default",
DestinationPartition: "default",
DestinationName: "zap",
Config: map[string]interface{}{
"passive_health_check": map[string]interface{}{
"Interval": int64(10),
"MaxFailures": int64(2),
},
"protocol": "grpc",
},
MeshGateway: structs.MeshGatewayConfig{
Mode: structs.MeshGatewayModeLocal,
},
},
},
},
},
},
{
name: "remote upstream config expands local upstream list in transparent mode",
args: args{
defaults: &structs.ServiceConfigResponse{
UpstreamIDConfigs: structs.OpaqueUpstreamConfigs{
{
Upstream: structs.ServiceID{
ID: "zap",
EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition(),
},
Config: map[string]interface{}{
"protocol": "grpc",
},
},
},
},
service: &structs.NodeService{
ID: "foo-proxy",
Service: "foo-proxy",
Proxy: structs.ConnectProxyConfig{
DestinationServiceName: "foo",
DestinationServiceID: "foo",
Mode: structs.ProxyModeTransparent,
TransparentProxy: structs.TransparentProxyConfig{
OutboundListenerPort: 10101,
DialedDirectly: true,
},
Upstreams: structs.Upstreams{
structs.Upstream{
DestinationNamespace: "default",
DestinationPartition: "default",
DestinationName: "zip",
LocalBindPort: 8080,
Config: map[string]interface{}{
"protocol": "http",
},
},
},
},
},
},
want: &structs.NodeService{
ID: "foo-proxy",
Service: "foo-proxy",
Proxy: structs.ConnectProxyConfig{
DestinationServiceName: "foo",
DestinationServiceID: "foo",
Mode: structs.ProxyModeTransparent,
TransparentProxy: structs.TransparentProxyConfig{
OutboundListenerPort: 10101,
DialedDirectly: true,
},
Upstreams: structs.Upstreams{
structs.Upstream{
DestinationNamespace: "default",
DestinationPartition: "default",
DestinationName: "zip",
LocalBindPort: 8080,
Config: map[string]interface{}{
"protocol": "http",
},
},
structs.Upstream{
DestinationNamespace: "default",
DestinationPartition: "default",
DestinationName: "zap",
Config: map[string]interface{}{
"protocol": "grpc",
},
CentrallyConfigured: true,
},
},
},
},
},
{
name: "remote upstream config not added to local upstream list outside of transparent mode",
args: args{
defaults: &structs.ServiceConfigResponse{
UpstreamIDConfigs: structs.OpaqueUpstreamConfigs{
{
Upstream: structs.ServiceID{
ID: "zap",
EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition(),
},
Config: map[string]interface{}{
"protocol": "grpc",
},
},
},
},
service: &structs.NodeService{
ID: "foo-proxy",
Service: "foo-proxy",
Proxy: structs.ConnectProxyConfig{
DestinationServiceName: "foo",
DestinationServiceID: "foo",
Mode: structs.ProxyModeDirect,
Upstreams: structs.Upstreams{
structs.Upstream{
DestinationNamespace: "default",
DestinationPartition: "default",
DestinationName: "zip",
LocalBindPort: 8080,
Config: map[string]interface{}{
"protocol": "http",
},
},
},
},
},
},
want: &structs.NodeService{
ID: "foo-proxy",
Service: "foo-proxy",
Proxy: structs.ConnectProxyConfig{
DestinationServiceName: "foo",
DestinationServiceID: "foo",
Mode: structs.ProxyModeDirect,
Upstreams: structs.Upstreams{
structs.Upstream{
DestinationNamespace: "default",
DestinationPartition: "default",
DestinationName: "zip",
LocalBindPort: 8080,
Config: map[string]interface{}{
"protocol": "http",
},
},
},
},
},
},
{
name: "upstream mode from remote defaults overrides local default",
args: args{
defaults: &structs.ServiceConfigResponse{
UpstreamIDConfigs: structs.OpaqueUpstreamConfigs{
{
Upstream: structs.ServiceID{
ID: "zap",
EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition(),
},
Config: map[string]interface{}{
"mesh_gateway": map[string]interface{}{
"Mode": "local",
},
},
},
},
},
service: &structs.NodeService{
ID: "foo-proxy",
Service: "foo-proxy",
Proxy: structs.ConnectProxyConfig{
DestinationServiceName: "foo",
DestinationServiceID: "foo",
MeshGateway: structs.MeshGatewayConfig{
Mode: structs.MeshGatewayModeRemote,
},
Upstreams: structs.Upstreams{
structs.Upstream{
DestinationNamespace: "default",
DestinationPartition: "default",
DestinationName: "zap",
},
},
},
},
},
want: &structs.NodeService{
ID: "foo-proxy",
Service: "foo-proxy",
Proxy: structs.ConnectProxyConfig{
DestinationServiceName: "foo",
DestinationServiceID: "foo",
MeshGateway: structs.MeshGatewayConfig{
Mode: structs.MeshGatewayModeRemote,
},
Upstreams: structs.Upstreams{
structs.Upstream{
DestinationNamespace: "default",
DestinationPartition: "default",
DestinationName: "zap",
Config: map[string]interface{}{},
MeshGateway: structs.MeshGatewayConfig{
Mode: structs.MeshGatewayModeLocal,
},
},
},
},
},
},
{
name: "mode in local upstream config overrides all",
args: args{
defaults: &structs.ServiceConfigResponse{
UpstreamIDConfigs: structs.OpaqueUpstreamConfigs{
{
Upstream: structs.ServiceID{
ID: "zap",
EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition(),
},
Config: map[string]interface{}{
"mesh_gateway": map[string]interface{}{
"Mode": "local",
},
},
},
},
},
service: &structs.NodeService{
ID: "foo-proxy",
Service: "foo-proxy",
Proxy: structs.ConnectProxyConfig{
DestinationServiceName: "foo",
DestinationServiceID: "foo",
MeshGateway: structs.MeshGatewayConfig{
Mode: structs.MeshGatewayModeRemote,
},
Upstreams: structs.Upstreams{
structs.Upstream{
DestinationNamespace: "default",
DestinationPartition: "default",
DestinationName: "zap",
MeshGateway: structs.MeshGatewayConfig{
Mode: structs.MeshGatewayModeNone,
},
},
},
},
},
},
want: &structs.NodeService{
ID: "foo-proxy",
Service: "foo-proxy",
Proxy: structs.ConnectProxyConfig{
DestinationServiceName: "foo",
DestinationServiceID: "foo",
MeshGateway: structs.MeshGatewayConfig{
Mode: structs.MeshGatewayModeRemote,
},
Upstreams: structs.Upstreams{
structs.Upstream{
DestinationNamespace: "default",
DestinationPartition: "default",
DestinationName: "zap",
Config: map[string]interface{}{},
MeshGateway: structs.MeshGatewayConfig{
Mode: structs.MeshGatewayModeNone,
},
},
},
},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
defaultsCopy, err := copystructure.Copy(tt.args.defaults)
require.NoError(t, err)
got, err := MergeServiceConfig(tt.args.defaults, tt.args.service)
require.NoError(t, err)
assert.Equal(t, tt.want, got)
// The input defaults must not be modified by the merge.
// See PR #10647
assert.Equal(t, tt.args.defaults, defaultsCopy)
})
}
}

View File

@ -10,7 +10,7 @@ func init() {
registerEndpoint(func(s *Server) interface{} { return &ConnectCA{srv: s, logger: s.loggers.Named(logging.Connect)} }) registerEndpoint(func(s *Server) interface{} { return &ConnectCA{srv: s, logger: s.loggers.Named(logging.Connect)} })
registerEndpoint(func(s *Server) interface{} { return &FederationState{s} }) registerEndpoint(func(s *Server) interface{} { return &FederationState{s} })
registerEndpoint(func(s *Server) interface{} { return &DiscoveryChain{s} }) registerEndpoint(func(s *Server) interface{} { return &DiscoveryChain{s} })
registerEndpoint(func(s *Server) interface{} { return &Health{s} }) registerEndpoint(func(s *Server) interface{} { return &Health{s, s.loggers.Named(logging.Health)} })
registerEndpoint(func(s *Server) interface{} { return &Intention{s, s.loggers.Named(logging.Intentions)} }) registerEndpoint(func(s *Server) interface{} { return &Intention{s, s.loggers.Named(logging.Intentions)} })
registerEndpoint(func(s *Server) interface{} { return &Internal{s, s.loggers.Named(logging.Internal)} }) registerEndpoint(func(s *Server) interface{} { return &Internal{s, s.loggers.Named(logging.Internal)} })
registerEndpoint(func(s *Server) interface{} { return &KVS{s, s.loggers.Named(logging.KV)} }) registerEndpoint(func(s *Server) interface{} { return &KVS{s, s.loggers.Named(logging.KV)} })

View File

@ -203,6 +203,10 @@ func (s *HTTPHandlers) healthServiceNodes(resp http.ResponseWriter, req *http.Re
args.TagFilter = true args.TagFilter = true
} }
if _, ok := params["merge-central-config"]; ok {
args.MergeCentralConfig = true
}
// Determine the prefix // Determine the prefix
var prefix string var prefix string
switch healthType { switch healthType {

View File

@ -1882,6 +1882,167 @@ func TestFilterNonPassing(t *testing.T) {
} }
} }
func TestListHealthyServiceNodes_MergeCentralConfig(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")
}
t.Parallel()
a := NewTestAgent(t, "")
defer a.Shutdown()
testrpc.WaitForLeader(t, a.RPC, "dc1")
// Register the service
registerServiceReq := structs.TestRegisterRequestProxy(t)
registerServiceReq.Check = &structs.HealthCheck{
Node: registerServiceReq.Node,
Name: "check1",
}
var out struct{}
assert.Nil(t, a.RPC("Catalog.Register", registerServiceReq, &out))
// Register proxy-defaults
proxyGlobalEntry := registerProxyDefaults(t, a)
// Register service-defaults
serviceDefaultsConfigEntry := registerServiceDefaults(t, a, registerServiceReq.Service.Proxy.DestinationServiceName)
type testCase struct {
testCaseName string
serviceName string
connect bool
}
run := func(t *testing.T, tc testCase) {
url := fmt.Sprintf("/v1/health/service/%s?merge-central-config", tc.serviceName)
if tc.connect {
url = fmt.Sprintf("/v1/health/connect/%s?merge-central-config", tc.serviceName)
}
req, _ := http.NewRequest("GET", url, nil)
resp := httptest.NewRecorder()
var obj interface{}
var err error
if tc.connect {
obj, err = a.srv.HealthConnectServiceNodes(resp, req)
} else {
obj, err = a.srv.HealthServiceNodes(resp, req)
}
assert.Nil(t, err)
assertIndex(t, resp)
checkServiceNodes := obj.(structs.CheckServiceNodes)
// validate response
assert.Len(t, checkServiceNodes, 1)
v := checkServiceNodes[0]
validateMergeCentralConfigResponse(t, v.Service.ToServiceNode(registerServiceReq.Node), registerServiceReq, proxyGlobalEntry, serviceDefaultsConfigEntry)
}
testCases := []testCase{
{
testCaseName: "List healthy service instances with merge-central-config",
serviceName: registerServiceReq.Service.Service,
},
{
testCaseName: "List healthy connect capable service instances with merge-central-config",
serviceName: registerServiceReq.Service.Proxy.DestinationServiceName,
connect: true,
},
}
for _, tc := range testCases {
t.Run(tc.testCaseName, func(t *testing.T) {
run(t, tc)
})
}
}
func TestHealthServiceNodes_MergeCentralConfigBlocking(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")
}
t.Parallel()
a := NewTestAgent(t, "")
defer a.Shutdown()
testrpc.WaitForLeader(t, a.RPC, "dc1")
// Register the service
registerServiceReq := structs.TestRegisterRequestProxy(t)
registerServiceReq.Check = &structs.HealthCheck{
Node: registerServiceReq.Node,
Name: "check1",
}
var out struct{}
assert.Nil(t, a.RPC("Catalog.Register", registerServiceReq, &out))
// Register proxy-defaults
proxyGlobalEntry := registerProxyDefaults(t, a)
// Run the query
rpcReq := structs.ServiceSpecificRequest{
Datacenter: "dc1",
ServiceName: registerServiceReq.Service.Service,
MergeCentralConfig: true,
}
var rpcResp structs.IndexedCheckServiceNodes
assert.Nil(t, a.RPC("Health.ServiceNodes", &rpcReq, &rpcResp))
assert.Len(t, rpcResp.Nodes, 1)
nodeService := rpcResp.Nodes[0].Service
assert.Equal(t, registerServiceReq.Service.Service, nodeService.Service)
// validate proxy global defaults are resolved in the merged service config
assert.Equal(t, proxyGlobalEntry.Config, nodeService.Proxy.Config)
assert.Equal(t, proxyGlobalEntry.Mode, nodeService.Proxy.Mode)
// Async cause a change - register service defaults
waitIndex := rpcResp.Index
start := time.Now()
var serviceDefaultsConfigEntry structs.ServiceConfigEntry
go func() {
time.Sleep(100 * time.Millisecond)
// Register service-defaults
serviceDefaultsConfigEntry = registerServiceDefaults(t, a, registerServiceReq.Service.Proxy.DestinationServiceName)
}()
const waitDuration = 3 * time.Second
RUN_BLOCKING_QUERY:
url := fmt.Sprintf("/v1/health/service/%s?merge-central-config&wait=%s&index=%d",
registerServiceReq.Service.Service, waitDuration.String(), waitIndex)
req, _ := http.NewRequest("GET", url, nil)
resp := httptest.NewRecorder()
obj, err := a.srv.HealthServiceNodes(resp, req)
assert.Nil(t, err)
assertIndex(t, resp)
elapsed := time.Since(start)
idx := getIndex(t, resp)
if idx < waitIndex {
t.Fatalf("bad index returned: %v", idx)
} else if idx == waitIndex {
if elapsed > waitDuration {
// This should prevent the loop from running longer than the waitDuration
t.Fatalf("too slow: %v", elapsed)
}
goto RUN_BLOCKING_QUERY
}
// Should block at least 100ms before getting the changed results
if elapsed < 100*time.Millisecond {
t.Fatalf("too fast: %v", elapsed)
}
checkServiceNodes := obj.(structs.CheckServiceNodes)
// validate response
assert.Len(t, checkServiceNodes, 1)
v := checkServiceNodes[0].Service.ToServiceNode(registerServiceReq.Node)
validateMergeCentralConfigResponse(t, v, registerServiceReq, proxyGlobalEntry, serviceDefaultsConfigEntry)
}
func peerQuerySuffix(peerName string) string { func peerQuerySuffix(peerName string) string {
if peerName == "" { if peerName == "" {
return "" return ""

View File

@ -38,7 +38,9 @@ func (c *Client) ServiceNodes(
ctx context.Context, ctx context.Context,
req structs.ServiceSpecificRequest, req structs.ServiceSpecificRequest,
) (structs.IndexedCheckServiceNodes, cache.ResultMeta, error) { ) (structs.IndexedCheckServiceNodes, cache.ResultMeta, error) {
if c.useStreaming(req) && (req.QueryOptions.UseCache || req.QueryOptions.MinQueryIndex > 0) { // Note: if MergeCentralConfig is requested, default to using the RPC backend for now
// as the streaming backend and materializer does not have support for merging yet.
if c.useStreaming(req) && (req.QueryOptions.UseCache || req.QueryOptions.MinQueryIndex > 0) && !req.MergeCentralConfig {
c.QueryOptionDefaults(&req.QueryOptions) c.QueryOptionDefaults(&req.QueryOptions)
result, err := c.ViewStore.Get(ctx, c.newServiceRequest(req)) result, err := c.ViewStore.Get(ctx, c.newServiceRequest(req))

View File

@ -82,6 +82,16 @@ func TestClient_ServiceNodes_BackendRouting(t *testing.T) {
}, },
expected: useCache, expected: useCache,
}, },
{
name: "rpc if merge-central-config",
req: structs.ServiceSpecificRequest{
Datacenter: "dc1",
ServiceName: "web1",
MergeCentralConfig: true,
QueryOptions: structs.QueryOptions{MinQueryIndex: 22},
},
expected: useRPC,
},
} }
for _, tc := range testCases { for _, tc := range testCases {

View File

@ -4,12 +4,11 @@ import (
"fmt" "fmt"
"sync" "sync"
"github.com/imdario/mergo"
"github.com/mitchellh/copystructure"
"golang.org/x/net/context" "golang.org/x/net/context"
"github.com/hashicorp/consul/agent/cache" "github.com/hashicorp/consul/agent/cache"
cachetype "github.com/hashicorp/consul/agent/cache-types" cachetype "github.com/hashicorp/consul/agent/cache-types"
"github.com/hashicorp/consul/agent/consul"
"github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/agent/structs"
) )
@ -146,7 +145,7 @@ func (w *serviceConfigWatch) register(ctx context.Context) error {
// Merge the local registration with the central defaults and update this service // Merge the local registration with the central defaults and update this service
// in the local state. // in the local state.
merged, err := mergeServiceConfig(serviceDefaults, w.registration.Service) merged, err := consul.MergeServiceConfig(serviceDefaults, w.registration.Service)
if err != nil { if err != nil {
return err return err
} }
@ -276,7 +275,7 @@ func (w *serviceConfigWatch) handleUpdate(ctx context.Context, event cache.Updat
// Merge the local registration with the central defaults and update this service // Merge the local registration with the central defaults and update this service
// in the local state. // in the local state.
merged, err := mergeServiceConfig(serviceDefaults, w.registration.Service) merged, err := consul.MergeServiceConfig(serviceDefaults, w.registration.Service)
if err != nil { if err != nil {
return err return err
} }
@ -348,114 +347,3 @@ func makeConfigRequest(bd BaseDeps, addReq AddServiceRequest) *structs.ServiceCo
} }
return req return req
} }
// mergeServiceConfig from service into defaults to produce the final effective
// config for the watched service.
func mergeServiceConfig(defaults *structs.ServiceConfigResponse, service *structs.NodeService) (*structs.NodeService, error) {
if defaults == nil {
return service, nil
}
// We don't want to change s.registration in place since it is our source of
// truth about what was actually registered before defaults applied. So copy
// it first.
nsRaw, err := copystructure.Copy(service)
if err != nil {
return nil, err
}
// Merge proxy defaults
ns := nsRaw.(*structs.NodeService)
if err := mergo.Merge(&ns.Proxy.Config, defaults.ProxyConfig); err != nil {
return nil, err
}
if err := mergo.Merge(&ns.Proxy.Expose, defaults.Expose); err != nil {
return nil, err
}
if ns.Proxy.MeshGateway.Mode == structs.MeshGatewayModeDefault {
ns.Proxy.MeshGateway.Mode = defaults.MeshGateway.Mode
}
if ns.Proxy.Mode == structs.ProxyModeDefault {
ns.Proxy.Mode = defaults.Mode
}
if ns.Proxy.TransparentProxy.OutboundListenerPort == 0 {
ns.Proxy.TransparentProxy.OutboundListenerPort = defaults.TransparentProxy.OutboundListenerPort
}
if !ns.Proxy.TransparentProxy.DialedDirectly {
ns.Proxy.TransparentProxy.DialedDirectly = defaults.TransparentProxy.DialedDirectly
}
// remoteUpstreams contains synthetic Upstreams generated from central config (service-defaults.UpstreamConfigs).
remoteUpstreams := make(map[structs.ServiceID]structs.Upstream)
for _, us := range defaults.UpstreamIDConfigs {
parsed, err := structs.ParseUpstreamConfigNoDefaults(us.Config)
if err != nil {
return nil, fmt.Errorf("failed to parse upstream config map for %s: %v", us.Upstream.String(), err)
}
remoteUpstreams[us.Upstream] = structs.Upstream{
DestinationNamespace: us.Upstream.NamespaceOrDefault(),
DestinationPartition: us.Upstream.PartitionOrDefault(),
DestinationName: us.Upstream.ID,
Config: us.Config,
MeshGateway: parsed.MeshGateway,
CentrallyConfigured: true,
}
}
// localUpstreams stores the upstreams seen from the local registration so that we can merge in the synthetic entries.
// In transparent proxy mode ns.Proxy.Upstreams will likely be empty because users do not need to define upstreams explicitly.
// So to store upstream-specific flags from central config, we add entries to ns.Proxy.Upstream with those values.
localUpstreams := make(map[structs.ServiceID]struct{})
// Merge upstream defaults into the local registration
for i := range ns.Proxy.Upstreams {
// Get a pointer not a value copy of the upstream struct
us := &ns.Proxy.Upstreams[i]
if us.DestinationType != "" && us.DestinationType != structs.UpstreamDestTypeService {
continue
}
localUpstreams[us.DestinationID()] = struct{}{}
remoteCfg, ok := remoteUpstreams[us.DestinationID()]
if !ok {
// No config defaults to merge
continue
}
// The local upstream config mode has the highest precedence, so only overwrite when it's set to the default
if us.MeshGateway.Mode == structs.MeshGatewayModeDefault {
us.MeshGateway.Mode = remoteCfg.MeshGateway.Mode
}
// Merge in everything else that is read from the map
if err := mergo.Merge(&us.Config, remoteCfg.Config); err != nil {
return nil, err
}
// Delete the mesh gateway key from opaque config since this is the value that was resolved from
// the servers and NOT the final merged value for this upstream.
// Note that we use the "mesh_gateway" key and not other variants like "MeshGateway" because
// UpstreamConfig.MergeInto and ResolveServiceConfig only use "mesh_gateway".
delete(us.Config, "mesh_gateway")
}
// Ensure upstreams present in central config are represented in the local configuration.
// This does not apply outside of transparent mode because in that situation every possible upstream already exists
// inside of ns.Proxy.Upstreams.
if ns.Proxy.Mode == structs.ProxyModeTransparent {
for id, remote := range remoteUpstreams {
if _, ok := localUpstreams[id]; ok {
// Remote upstream is already present locally
continue
}
ns.Proxy.Upstreams = append(ns.Proxy.Upstreams, remote)
}
}
return ns, err
}

View File

@ -8,9 +8,6 @@ import (
"path/filepath" "path/filepath"
"testing" "testing"
"github.com/mitchellh/copystructure"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/agent/structs"
@ -860,451 +857,3 @@ func convertToMap(v interface{}) (map[string]interface{}, error) {
return raw, nil return raw, nil
} }
func Test_mergeServiceConfig_UpstreamOverrides(t *testing.T) {
type args struct {
defaults *structs.ServiceConfigResponse
service *structs.NodeService
}
tests := []struct {
name string
args args
want *structs.NodeService
}{
{
name: "new config fields",
args: args{
defaults: &structs.ServiceConfigResponse{
UpstreamIDConfigs: structs.OpaqueUpstreamConfigs{
{
Upstream: structs.ServiceID{
ID: "zap",
EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition(),
},
Config: map[string]interface{}{
"passive_health_check": map[string]interface{}{
"Interval": int64(10),
"MaxFailures": int64(2),
},
"mesh_gateway": map[string]interface{}{
"Mode": "local",
},
"protocol": "grpc",
},
},
},
},
service: &structs.NodeService{
ID: "foo-proxy",
Service: "foo-proxy",
Proxy: structs.ConnectProxyConfig{
DestinationServiceName: "foo",
DestinationServiceID: "foo",
Upstreams: structs.Upstreams{
structs.Upstream{
DestinationNamespace: "default",
DestinationPartition: "default",
DestinationName: "zap",
},
},
},
},
},
want: &structs.NodeService{
ID: "foo-proxy",
Service: "foo-proxy",
Proxy: structs.ConnectProxyConfig{
DestinationServiceName: "foo",
DestinationServiceID: "foo",
Upstreams: structs.Upstreams{
structs.Upstream{
DestinationNamespace: "default",
DestinationPartition: "default",
DestinationName: "zap",
Config: map[string]interface{}{
"passive_health_check": map[string]interface{}{
"Interval": int64(10),
"MaxFailures": int64(2),
},
"protocol": "grpc",
},
MeshGateway: structs.MeshGatewayConfig{
Mode: structs.MeshGatewayModeLocal,
},
},
},
},
},
},
{
name: "remote upstream config expands local upstream list in transparent mode",
args: args{
defaults: &structs.ServiceConfigResponse{
UpstreamIDConfigs: structs.OpaqueUpstreamConfigs{
{
Upstream: structs.ServiceID{
ID: "zap",
EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition(),
},
Config: map[string]interface{}{
"protocol": "grpc",
},
},
},
},
service: &structs.NodeService{
ID: "foo-proxy",
Service: "foo-proxy",
Proxy: structs.ConnectProxyConfig{
DestinationServiceName: "foo",
DestinationServiceID: "foo",
Mode: structs.ProxyModeTransparent,
TransparentProxy: structs.TransparentProxyConfig{
OutboundListenerPort: 10101,
DialedDirectly: true,
},
Upstreams: structs.Upstreams{
structs.Upstream{
DestinationNamespace: "default",
DestinationPartition: "default",
DestinationName: "zip",
LocalBindPort: 8080,
Config: map[string]interface{}{
"protocol": "http",
},
},
},
},
},
},
want: &structs.NodeService{
ID: "foo-proxy",
Service: "foo-proxy",
Proxy: structs.ConnectProxyConfig{
DestinationServiceName: "foo",
DestinationServiceID: "foo",
Mode: structs.ProxyModeTransparent,
TransparentProxy: structs.TransparentProxyConfig{
OutboundListenerPort: 10101,
DialedDirectly: true,
},
Upstreams: structs.Upstreams{
structs.Upstream{
DestinationNamespace: "default",
DestinationPartition: "default",
DestinationName: "zip",
LocalBindPort: 8080,
Config: map[string]interface{}{
"protocol": "http",
},
},
structs.Upstream{
DestinationNamespace: "default",
DestinationPartition: "default",
DestinationName: "zap",
Config: map[string]interface{}{
"protocol": "grpc",
},
CentrallyConfigured: true,
},
},
},
},
},
{
name: "remote upstream config not added to local upstream list outside of transparent mode",
args: args{
defaults: &structs.ServiceConfigResponse{
UpstreamIDConfigs: structs.OpaqueUpstreamConfigs{
{
Upstream: structs.ServiceID{
ID: "zap",
EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition(),
},
Config: map[string]interface{}{
"protocol": "grpc",
},
},
},
},
service: &structs.NodeService{
ID: "foo-proxy",
Service: "foo-proxy",
Proxy: structs.ConnectProxyConfig{
DestinationServiceName: "foo",
DestinationServiceID: "foo",
Mode: structs.ProxyModeDirect,
Upstreams: structs.Upstreams{
structs.Upstream{
DestinationNamespace: "default",
DestinationPartition: "default",
DestinationName: "zip",
LocalBindPort: 8080,
Config: map[string]interface{}{
"protocol": "http",
},
},
},
},
},
},
want: &structs.NodeService{
ID: "foo-proxy",
Service: "foo-proxy",
Proxy: structs.ConnectProxyConfig{
DestinationServiceName: "foo",
DestinationServiceID: "foo",
Mode: structs.ProxyModeDirect,
Upstreams: structs.Upstreams{
structs.Upstream{
DestinationNamespace: "default",
DestinationPartition: "default",
DestinationName: "zip",
LocalBindPort: 8080,
Config: map[string]interface{}{
"protocol": "http",
},
},
},
},
},
},
{
name: "upstream mode from remote defaults overrides local default",
args: args{
defaults: &structs.ServiceConfigResponse{
UpstreamIDConfigs: structs.OpaqueUpstreamConfigs{
{
Upstream: structs.ServiceID{
ID: "zap",
EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition(),
},
Config: map[string]interface{}{
"mesh_gateway": map[string]interface{}{
"Mode": "local",
},
},
},
},
},
service: &structs.NodeService{
ID: "foo-proxy",
Service: "foo-proxy",
Proxy: structs.ConnectProxyConfig{
DestinationServiceName: "foo",
DestinationServiceID: "foo",
MeshGateway: structs.MeshGatewayConfig{
Mode: structs.MeshGatewayModeRemote,
},
Upstreams: structs.Upstreams{
structs.Upstream{
DestinationNamespace: "default",
DestinationPartition: "default",
DestinationName: "zap",
},
},
},
},
},
want: &structs.NodeService{
ID: "foo-proxy",
Service: "foo-proxy",
Proxy: structs.ConnectProxyConfig{
DestinationServiceName: "foo",
DestinationServiceID: "foo",
MeshGateway: structs.MeshGatewayConfig{
Mode: structs.MeshGatewayModeRemote,
},
Upstreams: structs.Upstreams{
structs.Upstream{
DestinationNamespace: "default",
DestinationPartition: "default",
DestinationName: "zap",
Config: map[string]interface{}{},
MeshGateway: structs.MeshGatewayConfig{
Mode: structs.MeshGatewayModeLocal,
},
},
},
},
},
},
{
name: "mode in local upstream config overrides all",
args: args{
defaults: &structs.ServiceConfigResponse{
UpstreamIDConfigs: structs.OpaqueUpstreamConfigs{
{
Upstream: structs.ServiceID{
ID: "zap",
EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition(),
},
Config: map[string]interface{}{
"mesh_gateway": map[string]interface{}{
"Mode": "local",
},
},
},
},
},
service: &structs.NodeService{
ID: "foo-proxy",
Service: "foo-proxy",
Proxy: structs.ConnectProxyConfig{
DestinationServiceName: "foo",
DestinationServiceID: "foo",
MeshGateway: structs.MeshGatewayConfig{
Mode: structs.MeshGatewayModeRemote,
},
Upstreams: structs.Upstreams{
structs.Upstream{
DestinationNamespace: "default",
DestinationPartition: "default",
DestinationName: "zap",
MeshGateway: structs.MeshGatewayConfig{
Mode: structs.MeshGatewayModeNone,
},
},
},
},
},
},
want: &structs.NodeService{
ID: "foo-proxy",
Service: "foo-proxy",
Proxy: structs.ConnectProxyConfig{
DestinationServiceName: "foo",
DestinationServiceID: "foo",
MeshGateway: structs.MeshGatewayConfig{
Mode: structs.MeshGatewayModeRemote,
},
Upstreams: structs.Upstreams{
structs.Upstream{
DestinationNamespace: "default",
DestinationPartition: "default",
DestinationName: "zap",
Config: map[string]interface{}{},
MeshGateway: structs.MeshGatewayConfig{
Mode: structs.MeshGatewayModeNone,
},
},
},
},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
defaultsCopy, err := copystructure.Copy(tt.args.defaults)
require.NoError(t, err)
got, err := mergeServiceConfig(tt.args.defaults, tt.args.service)
require.NoError(t, err)
assert.Equal(t, tt.want, got)
// The input defaults must not be modified by the merge.
// See PR #10647
assert.Equal(t, tt.args.defaults, defaultsCopy)
})
}
}
func Test_mergeServiceConfig_TransparentProxy(t *testing.T) {
type args struct {
defaults *structs.ServiceConfigResponse
service *structs.NodeService
}
tests := []struct {
name string
args args
want *structs.NodeService
}{
{
name: "inherit transparent proxy settings",
args: args{
defaults: &structs.ServiceConfigResponse{
Mode: structs.ProxyModeTransparent,
TransparentProxy: structs.TransparentProxyConfig{
OutboundListenerPort: 10101,
DialedDirectly: true,
},
},
service: &structs.NodeService{
ID: "foo-proxy",
Service: "foo-proxy",
Proxy: structs.ConnectProxyConfig{
DestinationServiceName: "foo",
DestinationServiceID: "foo",
Mode: structs.ProxyModeDefault,
TransparentProxy: structs.TransparentProxyConfig{},
},
},
},
want: &structs.NodeService{
ID: "foo-proxy",
Service: "foo-proxy",
Proxy: structs.ConnectProxyConfig{
DestinationServiceName: "foo",
DestinationServiceID: "foo",
Mode: structs.ProxyModeTransparent,
TransparentProxy: structs.TransparentProxyConfig{
OutboundListenerPort: 10101,
DialedDirectly: true,
},
},
},
},
{
name: "override transparent proxy settings",
args: args{
defaults: &structs.ServiceConfigResponse{
Mode: structs.ProxyModeTransparent,
TransparentProxy: structs.TransparentProxyConfig{
OutboundListenerPort: 10101,
DialedDirectly: false,
},
},
service: &structs.NodeService{
ID: "foo-proxy",
Service: "foo-proxy",
Proxy: structs.ConnectProxyConfig{
DestinationServiceName: "foo",
DestinationServiceID: "foo",
Mode: structs.ProxyModeDirect,
TransparentProxy: structs.TransparentProxyConfig{
OutboundListenerPort: 808,
DialedDirectly: true,
},
},
},
},
want: &structs.NodeService{
ID: "foo-proxy",
Service: "foo-proxy",
Proxy: structs.ConnectProxyConfig{
DestinationServiceName: "foo",
DestinationServiceID: "foo",
Mode: structs.ProxyModeDirect,
TransparentProxy: structs.TransparentProxyConfig{
OutboundListenerPort: 808,
DialedDirectly: true,
},
},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
defaultsCopy, err := copystructure.Copy(tt.args.defaults)
require.NoError(t, err)
got, err := mergeServiceConfig(tt.args.defaults, tt.args.service)
require.NoError(t, err)
assert.Equal(t, tt.want, got)
// The input defaults must not be modified by the merge.
// See PR #10647
assert.Equal(t, tt.args.defaults, defaultsCopy)
})
}
}

View File

@ -706,6 +706,12 @@ type ServiceSpecificRequest struct {
// Ingress if true will only search for Ingress gateways for the given service. // Ingress if true will only search for Ingress gateways for the given service.
Ingress bool Ingress bool
// MergeCentralConfig when set to true returns a service definition merged with
// the proxy-defaults/global and service-defaults/:service config entries.
// This can be used to ensure a full service definition is returned in the response
// especially when the service might not be written into the catalog that way.
MergeCentralConfig bool
acl.EnterpriseMeta `hcl:",squash" mapstructure:",squash"` acl.EnterpriseMeta `hcl:",squash" mapstructure:",squash"`
QueryOptions QueryOptions
} }
@ -752,6 +758,7 @@ func (r *ServiceSpecificRequest) CacheInfo() cache.RequestInfo {
r.PeerName, r.PeerName,
r.Ingress, r.Ingress,
r.ServiceKind, r.ServiceKind,
r.MergeCentralConfig,
}, nil) }, nil)
if err == nil { if err == nil {
// If there is an error, we don't set the key. A blank key forces // If there is an error, we don't set the key. A blank key forces
@ -1330,7 +1337,8 @@ func (t *ServiceConnect) UnmarshalJSON(data []byte) (err error) {
// IsSidecarProxy returns true if the NodeService is a sidecar proxy. // IsSidecarProxy returns true if the NodeService is a sidecar proxy.
func (s *NodeService) IsSidecarProxy() bool { func (s *NodeService) IsSidecarProxy() bool {
return s.Kind == ServiceKindConnectProxy && s.Proxy.DestinationServiceID != "" return s.Kind == ServiceKindConnectProxy &&
(s.Proxy.DestinationServiceID != "" || s.Proxy.DestinationServiceName != "")
} }
func (s *NodeService) IsGateway() bool { func (s *NodeService) IsGateway() bool {

View File

@ -190,6 +190,12 @@ type QueryOptions struct {
// Filter requests filtering data prior to it being returned. The string // Filter requests filtering data prior to it being returned. The string
// is a go-bexpr compatible expression. // is a go-bexpr compatible expression.
Filter string Filter string
// MergeCentralConfig returns a service definition merged with the
// proxy-defaults/global and service-defaults/:service config entries.
// This can be used to ensure a full service definition is returned in the response
// especially when the service might not be written into the catalog that way.
MergeCentralConfig bool
} }
func (o *QueryOptions) Context() context.Context { func (o *QueryOptions) Context() context.Context {
@ -858,6 +864,9 @@ func (r *request) setQueryOptions(q *QueryOptions) {
r.header.Set("Cache-Control", strings.Join(cc, ", ")) r.header.Set("Cache-Control", strings.Join(cc, ", "))
} }
} }
if q.MergeCentralConfig {
r.params.Set("merge-central-config", "")
}
r.ctx = q.ctx r.ctx = q.ctx
} }

View File

@ -61,4 +61,5 @@ const (
Watch string = "watch" Watch string = "watch"
XDS string = "xds" XDS string = "xds"
Vault string = "vault" Vault string = "vault"
Health string = "health"
) )