From e5f1d8dce47a6d4114650970f842662148281dfd Mon Sep 17 00:00:00 2001 From: Riddhi Shah Date: Wed, 25 May 2022 13:20:17 -0700 Subject: [PATCH] Add support for merge-central-config query param (#13001) Adds a new query param merge-central-config for use with the below endpoints: /catalog/service/:service /catalog/connect/:service /health/service/:service /health/connect/:service If set on the request, the response will include a fully resolved service definition which is merged with the proxy-defaults/global and service-defaults/:service config entries (on-demand style). This is useful to view the full service definition for a mesh service (connect-proxy kind or gateway kind) which might not be merged before being written into the catalog (example: in case of services in the agentless model). --- .changelog/13001.txt | 3 + agent/catalog_endpoint.go | 4 + agent/catalog_endpoint_test.go | 230 +++++++++++ agent/consul/catalog_endpoint.go | 53 ++- agent/consul/config_endpoint.go | 214 +--------- agent/consul/health_endpoint.go | 48 ++- agent/consul/merge_service_config.go | 408 +++++++++++++++++++ agent/consul/merge_service_config_test.go | 458 ++++++++++++++++++++++ agent/consul/server_register.go | 2 +- agent/health_endpoint.go | 4 + agent/health_endpoint_test.go | 161 ++++++++ agent/rpcclient/health/health.go | 4 +- agent/rpcclient/health/health_test.go | 10 + agent/service_manager.go | 118 +----- agent/service_manager_test.go | 451 --------------------- agent/structs/structs.go | 10 +- api/api.go | 9 + logging/names.go | 1 + 18 files changed, 1403 insertions(+), 785 deletions(-) create mode 100644 .changelog/13001.txt create mode 100644 agent/consul/merge_service_config.go create mode 100644 agent/consul/merge_service_config_test.go diff --git a/.changelog/13001.txt b/.changelog/13001.txt new file mode 100644 index 000000000..0d47397a1 --- /dev/null +++ b/.changelog/13001.txt @@ -0,0 +1,3 @@ +```release-note:enhancement +api: `merge-central-config` query parameter support added to some catalog and health endpoints to view a fully resolved service definition (especially when not written into the catalog that way). +``` diff --git a/agent/catalog_endpoint.go b/agent/catalog_endpoint.go index b873b8b5e..3617a3d3a 100644 --- a/agent/catalog_endpoint.go +++ b/agent/catalog_endpoint.go @@ -356,6 +356,10 @@ func (s *HTTPHandlers) catalogServiceNodes(resp http.ResponseWriter, req *http.R args.TagFilter = true } + if _, ok := params["merge-central-config"]; ok { + args.MergeCentralConfig = true + } + // Pull out the service name var err error args.ServiceName, err = getPathSuffixUnescaped(req.URL.Path, pathPrefix) diff --git a/agent/catalog_endpoint_test.go b/agent/catalog_endpoint_test.go index f1e08c076..68e3eddaf 100644 --- a/agent/catalog_endpoint_test.go +++ b/agent/catalog_endpoint_test.go @@ -1052,6 +1052,236 @@ func TestCatalogServiceNodes_ConnectProxy(t *testing.T) { assert.Equal(t, args.Service.Proxy, nodes[0].ServiceProxy) } +func registerProxyDefaults(t *testing.T, a *TestAgent) (proxyGlobalEntry structs.ProxyConfigEntry) { + t.Helper() + // Register proxy-defaults + proxyGlobalEntry = structs.ProxyConfigEntry{ + Kind: structs.ProxyDefaults, + Name: structs.ProxyConfigGlobal, + Mode: structs.ProxyModeDirect, + Config: map[string]interface{}{ + "local_connect_timeout_ms": uint64(1000), + "handshake_timeout_ms": uint64(1000), + }, + } + proxyDefaultsConfigEntryReq := &structs.ConfigEntryRequest{ + Op: structs.ConfigEntryUpsert, + Datacenter: "dc1", + Entry: &proxyGlobalEntry, + } + var proxyDefaultsConfigEntryResp bool + assert.Nil(t, a.RPC("ConfigEntry.Apply", &proxyDefaultsConfigEntryReq, &proxyDefaultsConfigEntryResp)) + return +} + +func registerServiceDefaults(t *testing.T, a *TestAgent, serviceName string) (serviceDefaultsConfigEntry structs.ServiceConfigEntry) { + t.Helper() + limits := 512 + serviceDefaultsConfigEntry = structs.ServiceConfigEntry{ + Kind: structs.ServiceDefaults, + Name: serviceName, + Mode: structs.ProxyModeTransparent, + UpstreamConfig: &structs.UpstreamConfiguration{ + Defaults: &structs.UpstreamConfig{ + MeshGateway: structs.MeshGatewayConfig{ + Mode: structs.MeshGatewayModeLocal, + }, + Limits: &structs.UpstreamLimits{ + MaxConnections: &limits, + }, + }, + }, + } + serviceDefaultsConfigEntryReq := &structs.ConfigEntryRequest{ + Op: structs.ConfigEntryUpsert, + Datacenter: "dc1", + Entry: &serviceDefaultsConfigEntry, + } + var serviceDefaultsConfigEntryResp bool + assert.Nil(t, a.RPC("ConfigEntry.Apply", &serviceDefaultsConfigEntryReq, &serviceDefaultsConfigEntryResp)) + return +} + +func validateMergeCentralConfigResponse(t *testing.T, v *structs.ServiceNode, + registerServiceReq *structs.RegisterRequest, + proxyGlobalEntry structs.ProxyConfigEntry, + serviceDefaultsConfigEntry structs.ServiceConfigEntry) { + + t.Helper() + assert.Equal(t, registerServiceReq.Service.Service, v.ServiceName) + // validate proxy global defaults are resolved in the merged service config + assert.Equal(t, proxyGlobalEntry.Config, v.ServiceProxy.Config) + // validate service defaults override proxy-defaults/global + assert.NotEqual(t, proxyGlobalEntry.Mode, v.ServiceProxy.Mode) + assert.Equal(t, serviceDefaultsConfigEntry.Mode, v.ServiceProxy.Mode) + // validate service defaults are resolved in the merged service config + // expected number of upstreams = (number of upstreams defined in the register request proxy config + + // 1 default from service defaults) + assert.Equal(t, len(registerServiceReq.Service.Proxy.Upstreams)+1, len(v.ServiceProxy.Upstreams)) + for _, up := range v.ServiceProxy.Upstreams { + if up.DestinationType != "" { + continue + } + assert.Contains(t, up.Config, "limits") + upstreamLimits := up.Config["limits"].(*structs.UpstreamLimits) + assert.Equal(t, serviceDefaultsConfigEntry.UpstreamConfig.Defaults.Limits.MaxConnections, upstreamLimits.MaxConnections) + assert.Equal(t, serviceDefaultsConfigEntry.UpstreamConfig.Defaults.MeshGateway.Mode, up.MeshGateway.Mode) + } +} + +func TestListServiceNodes_MergeCentralConfig(t *testing.T) { + if testing.Short() { + t.Skip("too slow for testing.Short") + } + + t.Parallel() + + a := NewTestAgent(t, "") + defer a.Shutdown() + testrpc.WaitForLeader(t, a.RPC, "dc1") + + // Register the service + registerServiceReq := structs.TestRegisterRequestProxy(t) + var out struct{} + assert.Nil(t, a.RPC("Catalog.Register", registerServiceReq, &out)) + + // Register proxy-defaults + proxyGlobalEntry := registerProxyDefaults(t, a) + + // Register service-defaults + serviceDefaultsConfigEntry := registerServiceDefaults(t, a, registerServiceReq.Service.Proxy.DestinationServiceName) + + type testCase struct { + testCaseName string + serviceName string + connect bool + } + + run := func(t *testing.T, tc testCase) { + url := fmt.Sprintf("/v1/catalog/service/%s?merge-central-config", tc.serviceName) + if tc.connect { + url = fmt.Sprintf("/v1/catalog/connect/%s?merge-central-config", tc.serviceName) + } + req, _ := http.NewRequest("GET", url, nil) + resp := httptest.NewRecorder() + var obj interface{} + var err error + if tc.connect { + obj, err = a.srv.CatalogConnectServiceNodes(resp, req) + } else { + obj, err = a.srv.CatalogServiceNodes(resp, req) + } + + assert.Nil(t, err) + assertIndex(t, resp) + + serviceNodes := obj.(structs.ServiceNodes) + + // validate response + assert.Len(t, serviceNodes, 1) + v := serviceNodes[0] + + validateMergeCentralConfigResponse(t, v, registerServiceReq, proxyGlobalEntry, serviceDefaultsConfigEntry) + } + testCases := []testCase{ + { + testCaseName: "List service instances with merge-central-config", + serviceName: registerServiceReq.Service.Service, + }, + { + testCaseName: "List connect capable service instances with merge-central-config", + serviceName: registerServiceReq.Service.Proxy.DestinationServiceName, + connect: true, + }, + } + for _, tc := range testCases { + t.Run(tc.testCaseName, func(t *testing.T) { + run(t, tc) + }) + } +} + +func TestCatalogServiceNodes_MergeCentralConfigBlocking(t *testing.T) { + if testing.Short() { + t.Skip("too slow for testing.Short") + } + + t.Parallel() + a := NewTestAgent(t, "") + defer a.Shutdown() + + testrpc.WaitForLeader(t, a.RPC, "dc1") + + // Register the service + registerServiceReq := structs.TestRegisterRequestProxy(t) + var out struct{} + assert.Nil(t, a.RPC("Catalog.Register", registerServiceReq, &out)) + + // Register proxy-defaults + proxyGlobalEntry := registerProxyDefaults(t, a) + + // Run the query + rpcReq := structs.ServiceSpecificRequest{ + Datacenter: "dc1", + ServiceName: registerServiceReq.Service.Service, + MergeCentralConfig: true, + } + var rpcResp structs.IndexedServiceNodes + assert.Nil(t, a.RPC("Catalog.ServiceNodes", &rpcReq, &rpcResp)) + + assert.Len(t, rpcResp.ServiceNodes, 1) + serviceNode := rpcResp.ServiceNodes[0] + assert.Equal(t, registerServiceReq.Service.Service, serviceNode.ServiceName) + // validate proxy global defaults are resolved in the merged service config + assert.Equal(t, proxyGlobalEntry.Config, serviceNode.ServiceProxy.Config) + assert.Equal(t, proxyGlobalEntry.Mode, serviceNode.ServiceProxy.Mode) + + // Async cause a change - register service defaults + waitIndex := rpcResp.Index + start := time.Now() + var serviceDefaultsConfigEntry structs.ServiceConfigEntry + go func() { + time.Sleep(100 * time.Millisecond) + // Register service-defaults + serviceDefaultsConfigEntry = registerServiceDefaults(t, a, registerServiceReq.Service.Proxy.DestinationServiceName) + }() + + const waitDuration = 3 * time.Second +RUN_BLOCKING_QUERY: + url := fmt.Sprintf("/v1/catalog/service/%s?merge-central-config&wait=%s&index=%d", + registerServiceReq.Service.Service, waitDuration.String(), waitIndex) + req, _ := http.NewRequest("GET", url, nil) + resp := httptest.NewRecorder() + obj, err := a.srv.CatalogServiceNodes(resp, req) + + assert.Nil(t, err) + assertIndex(t, resp) + + elapsed := time.Since(start) + idx := getIndex(t, resp) + if idx < waitIndex { + t.Fatalf("bad index returned: %v", idx) + } else if idx == waitIndex { + if elapsed > waitDuration { + // This should prevent the loop from running longer than the waitDuration + t.Fatalf("too slow: %v", elapsed) + } + goto RUN_BLOCKING_QUERY + } + // Should block at least 100ms before getting the changed results + if elapsed < 100*time.Millisecond { + t.Fatalf("too fast: %v", elapsed) + } + + serviceNodes := obj.(structs.ServiceNodes) + + // validate response + assert.Len(t, serviceNodes, 1) + v := serviceNodes[0] + + validateMergeCentralConfigResponse(t, v, registerServiceReq, proxyGlobalEntry, serviceDefaultsConfigEntry) +} + // Test that the Connect-compatible endpoints can be queried for a // service via /v1/catalog/connect/:service. func TestCatalogConnectServiceNodes_good(t *testing.T) { diff --git a/agent/consul/catalog_endpoint.go b/agent/consul/catalog_endpoint.go index 742ddd1b3..84826f3a8 100644 --- a/agent/consul/catalog_endpoint.go +++ b/agent/consul/catalog_endpoint.go @@ -12,6 +12,7 @@ import ( "github.com/hashicorp/go-hclog" "github.com/hashicorp/go-memdb" "github.com/hashicorp/go-uuid" + hashstructure_v2 "github.com/mitchellh/hashstructure/v2" "github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/agent/consul/state" @@ -663,6 +664,11 @@ func (c *Catalog) ServiceNodes(args *structs.ServiceSpecificRequest, reply *stru return err } + var ( + priorMergeHash uint64 + ranMergeOnce bool + ) + err = c.srv.blockingQuery( &args.QueryOptions, &reply.QueryMeta, @@ -672,10 +678,53 @@ func (c *Catalog) ServiceNodes(args *structs.ServiceSpecificRequest, reply *stru return err } - reply.Index, reply.ServiceNodes = index, services + mergedServices := services + + if args.MergeCentralConfig { + var mergedServiceNodes structs.ServiceNodes + for _, sn := range services { + mergedsn := sn + ns := sn.ToNodeService() + if ns.IsSidecarProxy() || ns.IsGateway() { + cfgIndex, mergedns, err := mergeNodeServiceWithCentralConfig(ws, state, args, ns, c.logger) + if err != nil { + return err + } + if cfgIndex > index { + index = cfgIndex + } + mergedsn = mergedns.ToServiceNode(sn.Node) + } + mergedServiceNodes = append(mergedServiceNodes, mergedsn) + } + if len(mergedServiceNodes) > 0 { + mergedServices = mergedServiceNodes + } + + // Generate a hash of the mergedServices driving this response. + // Use it to determine if the response is identical to a prior wakeup. + newMergeHash, err := hashstructure_v2.Hash(mergedServices, hashstructure_v2.FormatV2, nil) + if err != nil { + return fmt.Errorf("error hashing reply for spurious wakeup suppression: %w", err) + } + if ranMergeOnce && priorMergeHash == newMergeHash { + // the below assignment is not required as the if condition already validates equality, + // but makes it more clear that prior value is being reset to the new hash on each run. + priorMergeHash = newMergeHash + reply.Index = index + // NOTE: the prior response is still alive inside of *reply, which is desirable + return errNotChanged + } else { + priorMergeHash = newMergeHash + ranMergeOnce = true + } + + } + + reply.Index, reply.ServiceNodes = index, mergedServices if len(args.NodeMetaFilters) > 0 { var filtered structs.ServiceNodes - for _, service := range services { + for _, service := range mergedServices { if structs.SatisfiesMetaFilters(service.NodeMeta, args.NodeMetaFilters) { filtered = append(filtered, service) } diff --git a/agent/consul/config_endpoint.go b/agent/consul/config_endpoint.go index 2e72f992e..0926fe9f5 100644 --- a/agent/consul/config_endpoint.go +++ b/agent/consul/config_endpoint.go @@ -9,11 +9,9 @@ import ( "github.com/armon/go-metrics/prometheus" "github.com/hashicorp/go-hclog" memdb "github.com/hashicorp/go-memdb" - "github.com/mitchellh/copystructure" hashstructure_v2 "github.com/mitchellh/hashstructure/v2" "github.com/hashicorp/consul/acl" - "github.com/hashicorp/consul/agent/configentry" "github.com/hashicorp/consul/agent/consul/state" "github.com/hashicorp/consul/agent/structs" ) @@ -482,7 +480,6 @@ func (c *ConfigEntry) ResolveServiceConfig(args *structs.ServiceConfigRequest, r } // Fetch all relevant config entries. - index, entries, err := state.ReadResolvedServiceConfigEntries( ws, args.Name, @@ -513,11 +510,12 @@ func (c *ConfigEntry) ResolveServiceConfig(args *structs.ServiceConfigRequest, r ranOnce = true } - thisReply, err := c.computeResolvedServiceConfig( + thisReply, err := computeResolvedServiceConfig( args, upstreamIDs, legacyUpstreams, entries, + c.logger, ) if err != nil { return err @@ -534,214 +532,6 @@ func (c *ConfigEntry) ResolveServiceConfig(args *structs.ServiceConfigRequest, r }) } -func (c *ConfigEntry) computeResolvedServiceConfig( - args *structs.ServiceConfigRequest, - upstreamIDs []structs.ServiceID, - legacyUpstreams bool, - entries *configentry.ResolvedServiceConfigSet, -) (*structs.ServiceConfigResponse, error) { - var thisReply structs.ServiceConfigResponse - - thisReply.MeshGateway.Mode = structs.MeshGatewayModeDefault - - // TODO(freddy) Refactor this into smaller set of state store functions - // Pass the WatchSet to both the service and proxy config lookups. If either is updated during the - // blocking query, this function will be rerun and these state store lookups will both be current. - // We use the default enterprise meta to look up the global proxy defaults because they are not namespaced. - var proxyConfGlobalProtocol string - proxyConf := entries.GetProxyDefaults(args.PartitionOrDefault()) - if proxyConf != nil { - // Apply the proxy defaults to the sidecar's proxy config - mapCopy, err := copystructure.Copy(proxyConf.Config) - if err != nil { - return nil, fmt.Errorf("failed to copy global proxy-defaults: %v", err) - } - thisReply.ProxyConfig = mapCopy.(map[string]interface{}) - thisReply.Mode = proxyConf.Mode - thisReply.TransparentProxy = proxyConf.TransparentProxy - thisReply.MeshGateway = proxyConf.MeshGateway - thisReply.Expose = proxyConf.Expose - - // Extract the global protocol from proxyConf for upstream configs. - rawProtocol := proxyConf.Config["protocol"] - if rawProtocol != nil { - var ok bool - proxyConfGlobalProtocol, ok = rawProtocol.(string) - if !ok { - return nil, fmt.Errorf("invalid protocol type %T", rawProtocol) - } - } - } - - serviceConf := entries.GetServiceDefaults( - structs.NewServiceID(args.Name, &args.EnterpriseMeta), - ) - if serviceConf != nil { - if serviceConf.Expose.Checks { - thisReply.Expose.Checks = true - } - if len(serviceConf.Expose.Paths) >= 1 { - thisReply.Expose.Paths = serviceConf.Expose.Paths - } - if serviceConf.MeshGateway.Mode != structs.MeshGatewayModeDefault { - thisReply.MeshGateway.Mode = serviceConf.MeshGateway.Mode - } - if serviceConf.Protocol != "" { - if thisReply.ProxyConfig == nil { - thisReply.ProxyConfig = make(map[string]interface{}) - } - thisReply.ProxyConfig["protocol"] = serviceConf.Protocol - } - if serviceConf.TransparentProxy.OutboundListenerPort != 0 { - thisReply.TransparentProxy.OutboundListenerPort = serviceConf.TransparentProxy.OutboundListenerPort - } - if serviceConf.TransparentProxy.DialedDirectly { - thisReply.TransparentProxy.DialedDirectly = serviceConf.TransparentProxy.DialedDirectly - } - if serviceConf.Mode != structs.ProxyModeDefault { - thisReply.Mode = serviceConf.Mode - } - - thisReply.Meta = serviceConf.Meta - } - - // First collect all upstreams into a set of seen upstreams. - // Upstreams can come from: - // - Explicitly from proxy registrations, and therefore as an argument to this RPC endpoint - // - Implicitly from centralized upstream config in service-defaults - seenUpstreams := map[structs.ServiceID]struct{}{} - - var ( - noUpstreamArgs = len(upstreamIDs) == 0 && len(args.Upstreams) == 0 - - // Check the args and the resolved value. If it was exclusively set via a config entry, then args.Mode - // will never be transparent because the service config request does not use the resolved value. - tproxy = args.Mode == structs.ProxyModeTransparent || thisReply.Mode == structs.ProxyModeTransparent - ) - - // The upstreams passed as arguments to this endpoint are the upstreams explicitly defined in a proxy registration. - // If no upstreams were passed, then we should only return the resolved config if the proxy is in transparent mode. - // Otherwise we would return a resolved upstream config to a proxy with no configured upstreams. - if noUpstreamArgs && !tproxy { - return &thisReply, nil - } - - // First store all upstreams that were provided in the request - for _, sid := range upstreamIDs { - if _, ok := seenUpstreams[sid]; !ok { - seenUpstreams[sid] = struct{}{} - } - } - - // Then store upstreams inferred from service-defaults and mapify the overrides. - var ( - upstreamConfigs = make(map[structs.ServiceID]*structs.UpstreamConfig) - upstreamDefaults *structs.UpstreamConfig - // usConfigs stores the opaque config map for each upstream and is keyed on the upstream's ID. - usConfigs = make(map[structs.ServiceID]map[string]interface{}) - ) - if serviceConf != nil && serviceConf.UpstreamConfig != nil { - for i, override := range serviceConf.UpstreamConfig.Overrides { - if override.Name == "" { - c.logger.Warn( - "Skipping UpstreamConfig.Overrides entry without a required name field", - "entryIndex", i, - "kind", serviceConf.GetKind(), - "name", serviceConf.GetName(), - "namespace", serviceConf.GetEnterpriseMeta().NamespaceOrEmpty(), - ) - continue // skip this impossible condition - } - seenUpstreams[override.ServiceID()] = struct{}{} - upstreamConfigs[override.ServiceID()] = override - } - if serviceConf.UpstreamConfig.Defaults != nil { - upstreamDefaults = serviceConf.UpstreamConfig.Defaults - - // Store the upstream defaults under a wildcard key so that they can be applied to - // upstreams that are inferred from intentions and do not have explicit upstream configuration. - cfgMap := make(map[string]interface{}) - upstreamDefaults.MergeInto(cfgMap) - - wildcard := structs.NewServiceID(structs.WildcardSpecifier, args.WithWildcardNamespace()) - usConfigs[wildcard] = cfgMap - } - } - - for upstream := range seenUpstreams { - resolvedCfg := make(map[string]interface{}) - - // The protocol of an upstream is resolved in this order: - // 1. Default protocol from proxy-defaults (how all services should be addressed) - // 2. Protocol for upstream service defined in its service-defaults (how the upstream wants to be addressed) - // 3. Protocol defined for the upstream in the service-defaults.(upstream_config.defaults|upstream_config.overrides) of the downstream - // (how the downstream wants to address it) - protocol := proxyConfGlobalProtocol - - upstreamSvcDefaults := entries.GetServiceDefaults( - structs.NewServiceID(upstream.ID, &upstream.EnterpriseMeta), - ) - if upstreamSvcDefaults != nil { - if upstreamSvcDefaults.Protocol != "" { - protocol = upstreamSvcDefaults.Protocol - } - } - - if protocol != "" { - resolvedCfg["protocol"] = protocol - } - - // Merge centralized defaults for all upstreams before configuration for specific upstreams - if upstreamDefaults != nil { - upstreamDefaults.MergeInto(resolvedCfg) - } - - // The MeshGateway value from the proxy registration overrides the one from upstream_defaults - // because it is specific to the proxy instance. - // - // The goal is to flatten the mesh gateway mode in this order: - // 0. Value from centralized upstream_defaults - // 1. Value from local proxy registration - // 2. Value from centralized upstream_config - // 3. Value from local upstream definition. This last step is done in the client's service manager. - if !args.MeshGateway.IsZero() { - resolvedCfg["mesh_gateway"] = args.MeshGateway - } - - if upstreamConfigs[upstream] != nil { - upstreamConfigs[upstream].MergeInto(resolvedCfg) - } - - if len(resolvedCfg) > 0 { - usConfigs[upstream] = resolvedCfg - } - } - - // don't allocate the slices just to not fill them - if len(usConfigs) == 0 { - return &thisReply, nil - } - - if legacyUpstreams { - // For legacy upstreams we return a map that is only keyed on the string ID, since they precede namespaces - thisReply.UpstreamConfigs = make(map[string]map[string]interface{}) - - for us, conf := range usConfigs { - thisReply.UpstreamConfigs[us.ID] = conf - } - - } else { - thisReply.UpstreamIDConfigs = make(structs.OpaqueUpstreamConfigs, 0, len(usConfigs)) - - for us, conf := range usConfigs { - thisReply.UpstreamIDConfigs = append(thisReply.UpstreamIDConfigs, - structs.OpaqueUpstreamConfig{Upstream: us, Config: conf}) - } - } - - return &thisReply, nil -} - func gateWriteToSecondary(targetDC, localDC, primaryDC, kind string) error { // ExportedServices entries are gated from interactions from secondary DCs // because non-default partitions cannot be created in secondaries diff --git a/agent/consul/health_endpoint.go b/agent/consul/health_endpoint.go index 5f38ba189..ee8f32888 100644 --- a/agent/consul/health_endpoint.go +++ b/agent/consul/health_endpoint.go @@ -6,7 +6,9 @@ import ( "github.com/armon/go-metrics" bexpr "github.com/hashicorp/go-bexpr" + "github.com/hashicorp/go-hclog" "github.com/hashicorp/go-memdb" + hashstructure_v2 "github.com/mitchellh/hashstructure/v2" "github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/agent/consul/state" @@ -15,7 +17,8 @@ import ( // Health endpoint is used to query the health information type Health struct { - srv *Server + srv *Server + logger hclog.Logger } // ChecksInState is used to get all the checks in a given state @@ -232,6 +235,11 @@ func (h *Health) ServiceNodes(args *structs.ServiceSpecificRequest, reply *struc return err } + var ( + priorMergeHash uint64 + ranMergeOnce bool + ) + err = h.srv.blockingQuery( &args.QueryOptions, &reply.QueryMeta, @@ -243,7 +251,43 @@ func (h *Health) ServiceNodes(args *structs.ServiceSpecificRequest, reply *struc return err } - thisReply.Index, thisReply.Nodes = index, nodes + resolvedNodes := nodes + if args.MergeCentralConfig { + for _, node := range resolvedNodes { + ns := node.Service + if ns.IsSidecarProxy() || ns.IsGateway() { + cfgIndex, mergedns, err := mergeNodeServiceWithCentralConfig(ws, state, args, ns, h.logger) + if err != nil { + return err + } + if cfgIndex > index { + index = cfgIndex + } + *node.Service = *mergedns + } + } + + // Generate a hash of the resolvedNodes driving this response. + // Use it to determine if the response is identical to a prior wakeup. + newMergeHash, err := hashstructure_v2.Hash(resolvedNodes, hashstructure_v2.FormatV2, nil) + if err != nil { + return fmt.Errorf("error hashing reply for spurious wakeup suppression: %w", err) + } + if ranMergeOnce && priorMergeHash == newMergeHash { + // the below assignment is not required as the if condition already validates equality, + // but makes it more clear that prior value is being reset to the new hash on each run. + priorMergeHash = newMergeHash + reply.Index = index + // NOTE: the prior response is still alive inside of *reply, which is desirable + return errNotChanged + } else { + priorMergeHash = newMergeHash + ranMergeOnce = true + } + + } + + thisReply.Index, thisReply.Nodes = index, resolvedNodes if len(args.NodeMetaFilters) > 0 { thisReply.Nodes = nodeMetaFilter(args.NodeMetaFilters, thisReply.Nodes) diff --git a/agent/consul/merge_service_config.go b/agent/consul/merge_service_config.go new file mode 100644 index 000000000..802655390 --- /dev/null +++ b/agent/consul/merge_service_config.go @@ -0,0 +1,408 @@ +package consul + +import ( + "fmt" + + "github.com/hashicorp/consul/agent/configentry" + "github.com/hashicorp/consul/agent/consul/state" + "github.com/hashicorp/consul/agent/structs" + "github.com/hashicorp/go-hclog" + memdb "github.com/hashicorp/go-memdb" + "github.com/imdario/mergo" + "github.com/mitchellh/copystructure" +) + +// mergeNodeServiceWithCentralConfig merges a service instance (NodeService) with the +// proxy-defaults/global and service-defaults/:service config entries. +// This common helper is used by the blocking query function of different RPC endpoints +// that need to return a fully resolved service defintion. +func mergeNodeServiceWithCentralConfig( + ws memdb.WatchSet, + state *state.Store, + args *structs.ServiceSpecificRequest, + ns *structs.NodeService, + logger hclog.Logger) (uint64, *structs.NodeService, error) { + + serviceName := ns.Service + var upstreams []structs.ServiceID + if ns.IsSidecarProxy() { + // This is a sidecar proxy, ignore the proxy service's config since we are + // managed by the target service config. + serviceName = ns.Proxy.DestinationServiceName + + // Also if we have any upstreams defined, add them to the defaults lookup request + // so we can learn about their configs. + for _, us := range ns.Proxy.Upstreams { + if us.DestinationType == "" || us.DestinationType == structs.UpstreamDestTypeService { + sid := us.DestinationID() + sid.EnterpriseMeta.Merge(&ns.EnterpriseMeta) + upstreams = append(upstreams, sid) + } + } + } + + configReq := &structs.ServiceConfigRequest{ + Name: serviceName, + Datacenter: args.Datacenter, + QueryOptions: args.QueryOptions, + MeshGateway: ns.Proxy.MeshGateway, + Mode: ns.Proxy.Mode, + UpstreamIDs: upstreams, + EnterpriseMeta: ns.EnterpriseMeta, + } + + // prefer using this vs directly calling the ConfigEntry.ResolveServiceConfig RPC + // so as to pass down the same watch set to also watch on changes to + // proxy-defaults/global and service-defaults. + cfgIndex, configEntries, err := state.ReadResolvedServiceConfigEntries( + ws, + configReq.Name, + &configReq.EnterpriseMeta, + upstreams, + configReq.Mode, + ) + if err != nil { + return 0, nil, fmt.Errorf("Failure looking up service config entries for %s: %v", + ns.ID, err) + } + + defaults, err := computeResolvedServiceConfig( + configReq, + upstreams, + false, + configEntries, + logger, + ) + if err != nil { + return 0, nil, fmt.Errorf("Failure computing service defaults for %s: %v", + ns.ID, err) + } + + mergedns, err := MergeServiceConfig(defaults, ns) + if err != nil { + return 0, nil, fmt.Errorf("Failure merging service definition with config entry defaults for %s: %v", + ns.ID, err) + } + + return cfgIndex, mergedns, nil +} + +func computeResolvedServiceConfig( + args *structs.ServiceConfigRequest, + upstreamIDs []structs.ServiceID, + legacyUpstreams bool, + entries *configentry.ResolvedServiceConfigSet, + logger hclog.Logger, +) (*structs.ServiceConfigResponse, error) { + var thisReply structs.ServiceConfigResponse + + thisReply.MeshGateway.Mode = structs.MeshGatewayModeDefault + + // TODO(freddy) Refactor this into smaller set of state store functions + // Pass the WatchSet to both the service and proxy config lookups. If either is updated during the + // blocking query, this function will be rerun and these state store lookups will both be current. + // We use the default enterprise meta to look up the global proxy defaults because they are not namespaced. + var proxyConfGlobalProtocol string + proxyConf := entries.GetProxyDefaults(args.PartitionOrDefault()) + if proxyConf != nil { + // Apply the proxy defaults to the sidecar's proxy config + mapCopy, err := copystructure.Copy(proxyConf.Config) + if err != nil { + return nil, fmt.Errorf("failed to copy global proxy-defaults: %v", err) + } + thisReply.ProxyConfig = mapCopy.(map[string]interface{}) + thisReply.Mode = proxyConf.Mode + thisReply.TransparentProxy = proxyConf.TransparentProxy + thisReply.MeshGateway = proxyConf.MeshGateway + thisReply.Expose = proxyConf.Expose + + // Extract the global protocol from proxyConf for upstream configs. + rawProtocol := proxyConf.Config["protocol"] + if rawProtocol != nil { + var ok bool + proxyConfGlobalProtocol, ok = rawProtocol.(string) + if !ok { + return nil, fmt.Errorf("invalid protocol type %T", rawProtocol) + } + } + } + + serviceConf := entries.GetServiceDefaults( + structs.NewServiceID(args.Name, &args.EnterpriseMeta), + ) + if serviceConf != nil { + if serviceConf.Expose.Checks { + thisReply.Expose.Checks = true + } + if len(serviceConf.Expose.Paths) >= 1 { + thisReply.Expose.Paths = serviceConf.Expose.Paths + } + if serviceConf.MeshGateway.Mode != structs.MeshGatewayModeDefault { + thisReply.MeshGateway.Mode = serviceConf.MeshGateway.Mode + } + if serviceConf.Protocol != "" { + if thisReply.ProxyConfig == nil { + thisReply.ProxyConfig = make(map[string]interface{}) + } + thisReply.ProxyConfig["protocol"] = serviceConf.Protocol + } + if serviceConf.TransparentProxy.OutboundListenerPort != 0 { + thisReply.TransparentProxy.OutboundListenerPort = serviceConf.TransparentProxy.OutboundListenerPort + } + if serviceConf.TransparentProxy.DialedDirectly { + thisReply.TransparentProxy.DialedDirectly = serviceConf.TransparentProxy.DialedDirectly + } + if serviceConf.Mode != structs.ProxyModeDefault { + thisReply.Mode = serviceConf.Mode + } + + thisReply.Meta = serviceConf.Meta + } + + // First collect all upstreams into a set of seen upstreams. + // Upstreams can come from: + // - Explicitly from proxy registrations, and therefore as an argument to this RPC endpoint + // - Implicitly from centralized upstream config in service-defaults + seenUpstreams := map[structs.ServiceID]struct{}{} + + var ( + noUpstreamArgs = len(upstreamIDs) == 0 && len(args.Upstreams) == 0 + + // Check the args and the resolved value. If it was exclusively set via a config entry, then args.Mode + // will never be transparent because the service config request does not use the resolved value. + tproxy = args.Mode == structs.ProxyModeTransparent || thisReply.Mode == structs.ProxyModeTransparent + ) + + // The upstreams passed as arguments to this endpoint are the upstreams explicitly defined in a proxy registration. + // If no upstreams were passed, then we should only return the resolved config if the proxy is in transparent mode. + // Otherwise we would return a resolved upstream config to a proxy with no configured upstreams. + if noUpstreamArgs && !tproxy { + return &thisReply, nil + } + + // First store all upstreams that were provided in the request + for _, sid := range upstreamIDs { + if _, ok := seenUpstreams[sid]; !ok { + seenUpstreams[sid] = struct{}{} + } + } + + // Then store upstreams inferred from service-defaults and mapify the overrides. + var ( + upstreamConfigs = make(map[structs.ServiceID]*structs.UpstreamConfig) + upstreamDefaults *structs.UpstreamConfig + // usConfigs stores the opaque config map for each upstream and is keyed on the upstream's ID. + usConfigs = make(map[structs.ServiceID]map[string]interface{}) + ) + if serviceConf != nil && serviceConf.UpstreamConfig != nil { + for i, override := range serviceConf.UpstreamConfig.Overrides { + if override.Name == "" { + logger.Warn( + "Skipping UpstreamConfig.Overrides entry without a required name field", + "entryIndex", i, + "kind", serviceConf.GetKind(), + "name", serviceConf.GetName(), + "namespace", serviceConf.GetEnterpriseMeta().NamespaceOrEmpty(), + ) + continue // skip this impossible condition + } + seenUpstreams[override.ServiceID()] = struct{}{} + upstreamConfigs[override.ServiceID()] = override + } + if serviceConf.UpstreamConfig.Defaults != nil { + upstreamDefaults = serviceConf.UpstreamConfig.Defaults + + // Store the upstream defaults under a wildcard key so that they can be applied to + // upstreams that are inferred from intentions and do not have explicit upstream configuration. + cfgMap := make(map[string]interface{}) + upstreamDefaults.MergeInto(cfgMap) + + wildcard := structs.NewServiceID(structs.WildcardSpecifier, args.WithWildcardNamespace()) + usConfigs[wildcard] = cfgMap + } + } + + for upstream := range seenUpstreams { + resolvedCfg := make(map[string]interface{}) + + // The protocol of an upstream is resolved in this order: + // 1. Default protocol from proxy-defaults (how all services should be addressed) + // 2. Protocol for upstream service defined in its service-defaults (how the upstream wants to be addressed) + // 3. Protocol defined for the upstream in the service-defaults.(upstream_config.defaults|upstream_config.overrides) of the downstream + // (how the downstream wants to address it) + protocol := proxyConfGlobalProtocol + + upstreamSvcDefaults := entries.GetServiceDefaults( + structs.NewServiceID(upstream.ID, &upstream.EnterpriseMeta), + ) + if upstreamSvcDefaults != nil { + if upstreamSvcDefaults.Protocol != "" { + protocol = upstreamSvcDefaults.Protocol + } + } + + if protocol != "" { + resolvedCfg["protocol"] = protocol + } + + // Merge centralized defaults for all upstreams before configuration for specific upstreams + if upstreamDefaults != nil { + upstreamDefaults.MergeInto(resolvedCfg) + } + + // The MeshGateway value from the proxy registration overrides the one from upstream_defaults + // because it is specific to the proxy instance. + // + // The goal is to flatten the mesh gateway mode in this order: + // 0. Value from centralized upstream_defaults + // 1. Value from local proxy registration + // 2. Value from centralized upstream_config + // 3. Value from local upstream definition. This last step is done in the client's service manager. + if !args.MeshGateway.IsZero() { + resolvedCfg["mesh_gateway"] = args.MeshGateway + } + + if upstreamConfigs[upstream] != nil { + upstreamConfigs[upstream].MergeInto(resolvedCfg) + } + + if len(resolvedCfg) > 0 { + usConfigs[upstream] = resolvedCfg + } + } + + // don't allocate the slices just to not fill them + if len(usConfigs) == 0 { + return &thisReply, nil + } + + if legacyUpstreams { + // For legacy upstreams we return a map that is only keyed on the string ID, since they precede namespaces + thisReply.UpstreamConfigs = make(map[string]map[string]interface{}) + + for us, conf := range usConfigs { + thisReply.UpstreamConfigs[us.ID] = conf + } + + } else { + thisReply.UpstreamIDConfigs = make(structs.OpaqueUpstreamConfigs, 0, len(usConfigs)) + + for us, conf := range usConfigs { + thisReply.UpstreamIDConfigs = append(thisReply.UpstreamIDConfigs, + structs.OpaqueUpstreamConfig{Upstream: us, Config: conf}) + } + } + + return &thisReply, nil +} + +// MergeServiceConfig merges the service into defaults to produce the final effective +// config for the specified service. +func MergeServiceConfig(defaults *structs.ServiceConfigResponse, service *structs.NodeService) (*structs.NodeService, error) { + if defaults == nil { + return service, nil + } + + // We don't want to change s.registration in place since it is our source of + // truth about what was actually registered before defaults applied. So copy + // it first. + nsRaw, err := copystructure.Copy(service) + if err != nil { + return nil, err + } + + // Merge proxy defaults + ns := nsRaw.(*structs.NodeService) + + if err := mergo.Merge(&ns.Proxy.Config, defaults.ProxyConfig); err != nil { + return nil, err + } + if err := mergo.Merge(&ns.Proxy.Expose, defaults.Expose); err != nil { + return nil, err + } + + if ns.Proxy.MeshGateway.Mode == structs.MeshGatewayModeDefault { + ns.Proxy.MeshGateway.Mode = defaults.MeshGateway.Mode + } + if ns.Proxy.Mode == structs.ProxyModeDefault { + ns.Proxy.Mode = defaults.Mode + } + if ns.Proxy.TransparentProxy.OutboundListenerPort == 0 { + ns.Proxy.TransparentProxy.OutboundListenerPort = defaults.TransparentProxy.OutboundListenerPort + } + if !ns.Proxy.TransparentProxy.DialedDirectly { + ns.Proxy.TransparentProxy.DialedDirectly = defaults.TransparentProxy.DialedDirectly + } + + // remoteUpstreams contains synthetic Upstreams generated from central config (service-defaults.UpstreamConfigs). + remoteUpstreams := make(map[structs.ServiceID]structs.Upstream) + + for _, us := range defaults.UpstreamIDConfigs { + parsed, err := structs.ParseUpstreamConfigNoDefaults(us.Config) + if err != nil { + return nil, fmt.Errorf("failed to parse upstream config map for %s: %v", us.Upstream.String(), err) + } + + remoteUpstreams[us.Upstream] = structs.Upstream{ + DestinationNamespace: us.Upstream.NamespaceOrDefault(), + DestinationPartition: us.Upstream.PartitionOrDefault(), + DestinationName: us.Upstream.ID, + Config: us.Config, + MeshGateway: parsed.MeshGateway, + CentrallyConfigured: true, + } + } + + // localUpstreams stores the upstreams seen from the local registration so that we can merge in the synthetic entries. + // In transparent proxy mode ns.Proxy.Upstreams will likely be empty because users do not need to define upstreams explicitly. + // So to store upstream-specific flags from central config, we add entries to ns.Proxy.Upstream with those values. + localUpstreams := make(map[structs.ServiceID]struct{}) + + // Merge upstream defaults into the local registration + for i := range ns.Proxy.Upstreams { + // Get a pointer not a value copy of the upstream struct + us := &ns.Proxy.Upstreams[i] + if us.DestinationType != "" && us.DestinationType != structs.UpstreamDestTypeService { + continue + } + localUpstreams[us.DestinationID()] = struct{}{} + + remoteCfg, ok := remoteUpstreams[us.DestinationID()] + if !ok { + // No config defaults to merge + continue + } + + // The local upstream config mode has the highest precedence, so only overwrite when it's set to the default + if us.MeshGateway.Mode == structs.MeshGatewayModeDefault { + us.MeshGateway.Mode = remoteCfg.MeshGateway.Mode + } + + // Merge in everything else that is read from the map + if err := mergo.Merge(&us.Config, remoteCfg.Config); err != nil { + return nil, err + } + + // Delete the mesh gateway key from opaque config since this is the value that was resolved from + // the servers and NOT the final merged value for this upstream. + // Note that we use the "mesh_gateway" key and not other variants like "MeshGateway" because + // UpstreamConfig.MergeInto and ResolveServiceConfig only use "mesh_gateway". + delete(us.Config, "mesh_gateway") + } + + // Ensure upstreams present in central config are represented in the local configuration. + // This does not apply outside of transparent mode because in that situation every possible upstream already exists + // inside of ns.Proxy.Upstreams. + if ns.Proxy.Mode == structs.ProxyModeTransparent { + for id, remote := range remoteUpstreams { + if _, ok := localUpstreams[id]; ok { + // Remote upstream is already present locally + continue + } + + ns.Proxy.Upstreams = append(ns.Proxy.Upstreams, remote) + } + } + + return ns, err +} diff --git a/agent/consul/merge_service_config_test.go b/agent/consul/merge_service_config_test.go new file mode 100644 index 000000000..5a866dce2 --- /dev/null +++ b/agent/consul/merge_service_config_test.go @@ -0,0 +1,458 @@ +package consul + +import ( + "testing" + + "github.com/hashicorp/consul/agent/structs" + "github.com/mitchellh/copystructure" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func Test_MergeServiceConfig_TransparentProxy(t *testing.T) { + type args struct { + defaults *structs.ServiceConfigResponse + service *structs.NodeService + } + tests := []struct { + name string + args args + want *structs.NodeService + }{ + { + name: "inherit transparent proxy settings", + args: args{ + defaults: &structs.ServiceConfigResponse{ + Mode: structs.ProxyModeTransparent, + TransparentProxy: structs.TransparentProxyConfig{ + OutboundListenerPort: 10101, + DialedDirectly: true, + }, + }, + service: &structs.NodeService{ + ID: "foo-proxy", + Service: "foo-proxy", + Proxy: structs.ConnectProxyConfig{ + DestinationServiceName: "foo", + DestinationServiceID: "foo", + Mode: structs.ProxyModeDefault, + TransparentProxy: structs.TransparentProxyConfig{}, + }, + }, + }, + want: &structs.NodeService{ + ID: "foo-proxy", + Service: "foo-proxy", + Proxy: structs.ConnectProxyConfig{ + DestinationServiceName: "foo", + DestinationServiceID: "foo", + Mode: structs.ProxyModeTransparent, + TransparentProxy: structs.TransparentProxyConfig{ + OutboundListenerPort: 10101, + DialedDirectly: true, + }, + }, + }, + }, + { + name: "override transparent proxy settings", + args: args{ + defaults: &structs.ServiceConfigResponse{ + Mode: structs.ProxyModeTransparent, + TransparentProxy: structs.TransparentProxyConfig{ + OutboundListenerPort: 10101, + DialedDirectly: false, + }, + }, + service: &structs.NodeService{ + ID: "foo-proxy", + Service: "foo-proxy", + Proxy: structs.ConnectProxyConfig{ + DestinationServiceName: "foo", + DestinationServiceID: "foo", + Mode: structs.ProxyModeDirect, + TransparentProxy: structs.TransparentProxyConfig{ + OutboundListenerPort: 808, + DialedDirectly: true, + }, + }, + }, + }, + want: &structs.NodeService{ + ID: "foo-proxy", + Service: "foo-proxy", + Proxy: structs.ConnectProxyConfig{ + DestinationServiceName: "foo", + DestinationServiceID: "foo", + Mode: structs.ProxyModeDirect, + TransparentProxy: structs.TransparentProxyConfig{ + OutboundListenerPort: 808, + DialedDirectly: true, + }, + }, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + defaultsCopy, err := copystructure.Copy(tt.args.defaults) + require.NoError(t, err) + + got, err := MergeServiceConfig(tt.args.defaults, tt.args.service) + require.NoError(t, err) + assert.Equal(t, tt.want, got) + + // The input defaults must not be modified by the merge. + // See PR #10647 + assert.Equal(t, tt.args.defaults, defaultsCopy) + }) + } +} + +func Test_MergeServiceConfig_UpstreamOverrides(t *testing.T) { + type args struct { + defaults *structs.ServiceConfigResponse + service *structs.NodeService + } + tests := []struct { + name string + args args + want *structs.NodeService + }{ + { + name: "new config fields", + args: args{ + defaults: &structs.ServiceConfigResponse{ + UpstreamIDConfigs: structs.OpaqueUpstreamConfigs{ + { + Upstream: structs.ServiceID{ + ID: "zap", + EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition(), + }, + Config: map[string]interface{}{ + "passive_health_check": map[string]interface{}{ + "Interval": int64(10), + "MaxFailures": int64(2), + }, + "mesh_gateway": map[string]interface{}{ + "Mode": "local", + }, + "protocol": "grpc", + }, + }, + }, + }, + service: &structs.NodeService{ + ID: "foo-proxy", + Service: "foo-proxy", + Proxy: structs.ConnectProxyConfig{ + DestinationServiceName: "foo", + DestinationServiceID: "foo", + Upstreams: structs.Upstreams{ + structs.Upstream{ + DestinationNamespace: "default", + DestinationPartition: "default", + DestinationName: "zap", + }, + }, + }, + }, + }, + want: &structs.NodeService{ + ID: "foo-proxy", + Service: "foo-proxy", + Proxy: structs.ConnectProxyConfig{ + DestinationServiceName: "foo", + DestinationServiceID: "foo", + Upstreams: structs.Upstreams{ + structs.Upstream{ + DestinationNamespace: "default", + DestinationPartition: "default", + DestinationName: "zap", + Config: map[string]interface{}{ + "passive_health_check": map[string]interface{}{ + "Interval": int64(10), + "MaxFailures": int64(2), + }, + "protocol": "grpc", + }, + MeshGateway: structs.MeshGatewayConfig{ + Mode: structs.MeshGatewayModeLocal, + }, + }, + }, + }, + }, + }, + { + name: "remote upstream config expands local upstream list in transparent mode", + args: args{ + defaults: &structs.ServiceConfigResponse{ + UpstreamIDConfigs: structs.OpaqueUpstreamConfigs{ + { + Upstream: structs.ServiceID{ + ID: "zap", + EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition(), + }, + Config: map[string]interface{}{ + "protocol": "grpc", + }, + }, + }, + }, + service: &structs.NodeService{ + ID: "foo-proxy", + Service: "foo-proxy", + Proxy: structs.ConnectProxyConfig{ + DestinationServiceName: "foo", + DestinationServiceID: "foo", + Mode: structs.ProxyModeTransparent, + TransparentProxy: structs.TransparentProxyConfig{ + OutboundListenerPort: 10101, + DialedDirectly: true, + }, + Upstreams: structs.Upstreams{ + structs.Upstream{ + DestinationNamespace: "default", + DestinationPartition: "default", + DestinationName: "zip", + LocalBindPort: 8080, + Config: map[string]interface{}{ + "protocol": "http", + }, + }, + }, + }, + }, + }, + want: &structs.NodeService{ + ID: "foo-proxy", + Service: "foo-proxy", + Proxy: structs.ConnectProxyConfig{ + DestinationServiceName: "foo", + DestinationServiceID: "foo", + Mode: structs.ProxyModeTransparent, + TransparentProxy: structs.TransparentProxyConfig{ + OutboundListenerPort: 10101, + DialedDirectly: true, + }, + Upstreams: structs.Upstreams{ + structs.Upstream{ + DestinationNamespace: "default", + DestinationPartition: "default", + DestinationName: "zip", + LocalBindPort: 8080, + Config: map[string]interface{}{ + "protocol": "http", + }, + }, + structs.Upstream{ + DestinationNamespace: "default", + DestinationPartition: "default", + DestinationName: "zap", + Config: map[string]interface{}{ + "protocol": "grpc", + }, + CentrallyConfigured: true, + }, + }, + }, + }, + }, + { + name: "remote upstream config not added to local upstream list outside of transparent mode", + args: args{ + defaults: &structs.ServiceConfigResponse{ + UpstreamIDConfigs: structs.OpaqueUpstreamConfigs{ + { + Upstream: structs.ServiceID{ + ID: "zap", + EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition(), + }, + Config: map[string]interface{}{ + "protocol": "grpc", + }, + }, + }, + }, + service: &structs.NodeService{ + ID: "foo-proxy", + Service: "foo-proxy", + Proxy: structs.ConnectProxyConfig{ + DestinationServiceName: "foo", + DestinationServiceID: "foo", + Mode: structs.ProxyModeDirect, + Upstreams: structs.Upstreams{ + structs.Upstream{ + DestinationNamespace: "default", + DestinationPartition: "default", + DestinationName: "zip", + LocalBindPort: 8080, + Config: map[string]interface{}{ + "protocol": "http", + }, + }, + }, + }, + }, + }, + want: &structs.NodeService{ + ID: "foo-proxy", + Service: "foo-proxy", + Proxy: structs.ConnectProxyConfig{ + DestinationServiceName: "foo", + DestinationServiceID: "foo", + Mode: structs.ProxyModeDirect, + Upstreams: structs.Upstreams{ + structs.Upstream{ + DestinationNamespace: "default", + DestinationPartition: "default", + DestinationName: "zip", + LocalBindPort: 8080, + Config: map[string]interface{}{ + "protocol": "http", + }, + }, + }, + }, + }, + }, + { + name: "upstream mode from remote defaults overrides local default", + args: args{ + defaults: &structs.ServiceConfigResponse{ + UpstreamIDConfigs: structs.OpaqueUpstreamConfigs{ + { + Upstream: structs.ServiceID{ + ID: "zap", + EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition(), + }, + Config: map[string]interface{}{ + "mesh_gateway": map[string]interface{}{ + "Mode": "local", + }, + }, + }, + }, + }, + service: &structs.NodeService{ + ID: "foo-proxy", + Service: "foo-proxy", + Proxy: structs.ConnectProxyConfig{ + DestinationServiceName: "foo", + DestinationServiceID: "foo", + MeshGateway: structs.MeshGatewayConfig{ + Mode: structs.MeshGatewayModeRemote, + }, + Upstreams: structs.Upstreams{ + structs.Upstream{ + DestinationNamespace: "default", + DestinationPartition: "default", + DestinationName: "zap", + }, + }, + }, + }, + }, + want: &structs.NodeService{ + ID: "foo-proxy", + Service: "foo-proxy", + Proxy: structs.ConnectProxyConfig{ + DestinationServiceName: "foo", + DestinationServiceID: "foo", + MeshGateway: structs.MeshGatewayConfig{ + Mode: structs.MeshGatewayModeRemote, + }, + Upstreams: structs.Upstreams{ + structs.Upstream{ + DestinationNamespace: "default", + DestinationPartition: "default", + DestinationName: "zap", + Config: map[string]interface{}{}, + MeshGateway: structs.MeshGatewayConfig{ + Mode: structs.MeshGatewayModeLocal, + }, + }, + }, + }, + }, + }, + { + name: "mode in local upstream config overrides all", + args: args{ + defaults: &structs.ServiceConfigResponse{ + UpstreamIDConfigs: structs.OpaqueUpstreamConfigs{ + { + Upstream: structs.ServiceID{ + ID: "zap", + EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition(), + }, + Config: map[string]interface{}{ + "mesh_gateway": map[string]interface{}{ + "Mode": "local", + }, + }, + }, + }, + }, + service: &structs.NodeService{ + ID: "foo-proxy", + Service: "foo-proxy", + Proxy: structs.ConnectProxyConfig{ + DestinationServiceName: "foo", + DestinationServiceID: "foo", + MeshGateway: structs.MeshGatewayConfig{ + Mode: structs.MeshGatewayModeRemote, + }, + Upstreams: structs.Upstreams{ + structs.Upstream{ + DestinationNamespace: "default", + DestinationPartition: "default", + DestinationName: "zap", + MeshGateway: structs.MeshGatewayConfig{ + Mode: structs.MeshGatewayModeNone, + }, + }, + }, + }, + }, + }, + want: &structs.NodeService{ + ID: "foo-proxy", + Service: "foo-proxy", + Proxy: structs.ConnectProxyConfig{ + DestinationServiceName: "foo", + DestinationServiceID: "foo", + MeshGateway: structs.MeshGatewayConfig{ + Mode: structs.MeshGatewayModeRemote, + }, + Upstreams: structs.Upstreams{ + structs.Upstream{ + DestinationNamespace: "default", + DestinationPartition: "default", + DestinationName: "zap", + Config: map[string]interface{}{}, + MeshGateway: structs.MeshGatewayConfig{ + Mode: structs.MeshGatewayModeNone, + }, + }, + }, + }, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + defaultsCopy, err := copystructure.Copy(tt.args.defaults) + require.NoError(t, err) + + got, err := MergeServiceConfig(tt.args.defaults, tt.args.service) + require.NoError(t, err) + assert.Equal(t, tt.want, got) + + // The input defaults must not be modified by the merge. + // See PR #10647 + assert.Equal(t, tt.args.defaults, defaultsCopy) + }) + } +} diff --git a/agent/consul/server_register.go b/agent/consul/server_register.go index 4f91a73e1..7268a3903 100644 --- a/agent/consul/server_register.go +++ b/agent/consul/server_register.go @@ -10,7 +10,7 @@ func init() { registerEndpoint(func(s *Server) interface{} { return &ConnectCA{srv: s, logger: s.loggers.Named(logging.Connect)} }) registerEndpoint(func(s *Server) interface{} { return &FederationState{s} }) registerEndpoint(func(s *Server) interface{} { return &DiscoveryChain{s} }) - registerEndpoint(func(s *Server) interface{} { return &Health{s} }) + registerEndpoint(func(s *Server) interface{} { return &Health{s, s.loggers.Named(logging.Health)} }) registerEndpoint(func(s *Server) interface{} { return &Intention{s, s.loggers.Named(logging.Intentions)} }) registerEndpoint(func(s *Server) interface{} { return &Internal{s, s.loggers.Named(logging.Internal)} }) registerEndpoint(func(s *Server) interface{} { return &KVS{s, s.loggers.Named(logging.KV)} }) diff --git a/agent/health_endpoint.go b/agent/health_endpoint.go index 656fc2a04..c8b8cf15c 100644 --- a/agent/health_endpoint.go +++ b/agent/health_endpoint.go @@ -203,6 +203,10 @@ func (s *HTTPHandlers) healthServiceNodes(resp http.ResponseWriter, req *http.Re args.TagFilter = true } + if _, ok := params["merge-central-config"]; ok { + args.MergeCentralConfig = true + } + // Determine the prefix var prefix string switch healthType { diff --git a/agent/health_endpoint_test.go b/agent/health_endpoint_test.go index 627d83ce2..da09efedb 100644 --- a/agent/health_endpoint_test.go +++ b/agent/health_endpoint_test.go @@ -1882,6 +1882,167 @@ func TestFilterNonPassing(t *testing.T) { } } +func TestListHealthyServiceNodes_MergeCentralConfig(t *testing.T) { + if testing.Short() { + t.Skip("too slow for testing.Short") + } + + t.Parallel() + + a := NewTestAgent(t, "") + defer a.Shutdown() + testrpc.WaitForLeader(t, a.RPC, "dc1") + + // Register the service + registerServiceReq := structs.TestRegisterRequestProxy(t) + registerServiceReq.Check = &structs.HealthCheck{ + Node: registerServiceReq.Node, + Name: "check1", + } + var out struct{} + assert.Nil(t, a.RPC("Catalog.Register", registerServiceReq, &out)) + + // Register proxy-defaults + proxyGlobalEntry := registerProxyDefaults(t, a) + + // Register service-defaults + serviceDefaultsConfigEntry := registerServiceDefaults(t, a, registerServiceReq.Service.Proxy.DestinationServiceName) + + type testCase struct { + testCaseName string + serviceName string + connect bool + } + + run := func(t *testing.T, tc testCase) { + url := fmt.Sprintf("/v1/health/service/%s?merge-central-config", tc.serviceName) + if tc.connect { + url = fmt.Sprintf("/v1/health/connect/%s?merge-central-config", tc.serviceName) + } + req, _ := http.NewRequest("GET", url, nil) + resp := httptest.NewRecorder() + var obj interface{} + var err error + if tc.connect { + obj, err = a.srv.HealthConnectServiceNodes(resp, req) + } else { + obj, err = a.srv.HealthServiceNodes(resp, req) + } + + assert.Nil(t, err) + assertIndex(t, resp) + + checkServiceNodes := obj.(structs.CheckServiceNodes) + + // validate response + assert.Len(t, checkServiceNodes, 1) + v := checkServiceNodes[0] + + validateMergeCentralConfigResponse(t, v.Service.ToServiceNode(registerServiceReq.Node), registerServiceReq, proxyGlobalEntry, serviceDefaultsConfigEntry) + } + testCases := []testCase{ + { + testCaseName: "List healthy service instances with merge-central-config", + serviceName: registerServiceReq.Service.Service, + }, + { + testCaseName: "List healthy connect capable service instances with merge-central-config", + serviceName: registerServiceReq.Service.Proxy.DestinationServiceName, + connect: true, + }, + } + for _, tc := range testCases { + t.Run(tc.testCaseName, func(t *testing.T) { + run(t, tc) + }) + } +} + +func TestHealthServiceNodes_MergeCentralConfigBlocking(t *testing.T) { + if testing.Short() { + t.Skip("too slow for testing.Short") + } + + t.Parallel() + a := NewTestAgent(t, "") + defer a.Shutdown() + + testrpc.WaitForLeader(t, a.RPC, "dc1") + + // Register the service + registerServiceReq := structs.TestRegisterRequestProxy(t) + registerServiceReq.Check = &structs.HealthCheck{ + Node: registerServiceReq.Node, + Name: "check1", + } + var out struct{} + assert.Nil(t, a.RPC("Catalog.Register", registerServiceReq, &out)) + + // Register proxy-defaults + proxyGlobalEntry := registerProxyDefaults(t, a) + + // Run the query + rpcReq := structs.ServiceSpecificRequest{ + Datacenter: "dc1", + ServiceName: registerServiceReq.Service.Service, + MergeCentralConfig: true, + } + var rpcResp structs.IndexedCheckServiceNodes + assert.Nil(t, a.RPC("Health.ServiceNodes", &rpcReq, &rpcResp)) + + assert.Len(t, rpcResp.Nodes, 1) + nodeService := rpcResp.Nodes[0].Service + assert.Equal(t, registerServiceReq.Service.Service, nodeService.Service) + // validate proxy global defaults are resolved in the merged service config + assert.Equal(t, proxyGlobalEntry.Config, nodeService.Proxy.Config) + assert.Equal(t, proxyGlobalEntry.Mode, nodeService.Proxy.Mode) + + // Async cause a change - register service defaults + waitIndex := rpcResp.Index + start := time.Now() + var serviceDefaultsConfigEntry structs.ServiceConfigEntry + go func() { + time.Sleep(100 * time.Millisecond) + // Register service-defaults + serviceDefaultsConfigEntry = registerServiceDefaults(t, a, registerServiceReq.Service.Proxy.DestinationServiceName) + }() + + const waitDuration = 3 * time.Second +RUN_BLOCKING_QUERY: + url := fmt.Sprintf("/v1/health/service/%s?merge-central-config&wait=%s&index=%d", + registerServiceReq.Service.Service, waitDuration.String(), waitIndex) + req, _ := http.NewRequest("GET", url, nil) + resp := httptest.NewRecorder() + obj, err := a.srv.HealthServiceNodes(resp, req) + + assert.Nil(t, err) + assertIndex(t, resp) + + elapsed := time.Since(start) + idx := getIndex(t, resp) + if idx < waitIndex { + t.Fatalf("bad index returned: %v", idx) + } else if idx == waitIndex { + if elapsed > waitDuration { + // This should prevent the loop from running longer than the waitDuration + t.Fatalf("too slow: %v", elapsed) + } + goto RUN_BLOCKING_QUERY + } + // Should block at least 100ms before getting the changed results + if elapsed < 100*time.Millisecond { + t.Fatalf("too fast: %v", elapsed) + } + + checkServiceNodes := obj.(structs.CheckServiceNodes) + + // validate response + assert.Len(t, checkServiceNodes, 1) + v := checkServiceNodes[0].Service.ToServiceNode(registerServiceReq.Node) + + validateMergeCentralConfigResponse(t, v, registerServiceReq, proxyGlobalEntry, serviceDefaultsConfigEntry) +} + func peerQuerySuffix(peerName string) string { if peerName == "" { return "" diff --git a/agent/rpcclient/health/health.go b/agent/rpcclient/health/health.go index 97ed1ae31..dd4be64ce 100644 --- a/agent/rpcclient/health/health.go +++ b/agent/rpcclient/health/health.go @@ -38,7 +38,9 @@ func (c *Client) ServiceNodes( ctx context.Context, req structs.ServiceSpecificRequest, ) (structs.IndexedCheckServiceNodes, cache.ResultMeta, error) { - if c.useStreaming(req) && (req.QueryOptions.UseCache || req.QueryOptions.MinQueryIndex > 0) { + // Note: if MergeCentralConfig is requested, default to using the RPC backend for now + // as the streaming backend and materializer does not have support for merging yet. + if c.useStreaming(req) && (req.QueryOptions.UseCache || req.QueryOptions.MinQueryIndex > 0) && !req.MergeCentralConfig { c.QueryOptionDefaults(&req.QueryOptions) result, err := c.ViewStore.Get(ctx, c.newServiceRequest(req)) diff --git a/agent/rpcclient/health/health_test.go b/agent/rpcclient/health/health_test.go index 0e6042cc9..00bc224b7 100644 --- a/agent/rpcclient/health/health_test.go +++ b/agent/rpcclient/health/health_test.go @@ -82,6 +82,16 @@ func TestClient_ServiceNodes_BackendRouting(t *testing.T) { }, expected: useCache, }, + { + name: "rpc if merge-central-config", + req: structs.ServiceSpecificRequest{ + Datacenter: "dc1", + ServiceName: "web1", + MergeCentralConfig: true, + QueryOptions: structs.QueryOptions{MinQueryIndex: 22}, + }, + expected: useRPC, + }, } for _, tc := range testCases { diff --git a/agent/service_manager.go b/agent/service_manager.go index 29c538e88..f9f449874 100644 --- a/agent/service_manager.go +++ b/agent/service_manager.go @@ -4,12 +4,11 @@ import ( "fmt" "sync" - "github.com/imdario/mergo" - "github.com/mitchellh/copystructure" "golang.org/x/net/context" "github.com/hashicorp/consul/agent/cache" cachetype "github.com/hashicorp/consul/agent/cache-types" + "github.com/hashicorp/consul/agent/consul" "github.com/hashicorp/consul/agent/structs" ) @@ -146,7 +145,7 @@ func (w *serviceConfigWatch) register(ctx context.Context) error { // Merge the local registration with the central defaults and update this service // in the local state. - merged, err := mergeServiceConfig(serviceDefaults, w.registration.Service) + merged, err := consul.MergeServiceConfig(serviceDefaults, w.registration.Service) if err != nil { return err } @@ -276,7 +275,7 @@ func (w *serviceConfigWatch) handleUpdate(ctx context.Context, event cache.Updat // Merge the local registration with the central defaults and update this service // in the local state. - merged, err := mergeServiceConfig(serviceDefaults, w.registration.Service) + merged, err := consul.MergeServiceConfig(serviceDefaults, w.registration.Service) if err != nil { return err } @@ -348,114 +347,3 @@ func makeConfigRequest(bd BaseDeps, addReq AddServiceRequest) *structs.ServiceCo } return req } - -// mergeServiceConfig from service into defaults to produce the final effective -// config for the watched service. -func mergeServiceConfig(defaults *structs.ServiceConfigResponse, service *structs.NodeService) (*structs.NodeService, error) { - if defaults == nil { - return service, nil - } - - // We don't want to change s.registration in place since it is our source of - // truth about what was actually registered before defaults applied. So copy - // it first. - nsRaw, err := copystructure.Copy(service) - if err != nil { - return nil, err - } - - // Merge proxy defaults - ns := nsRaw.(*structs.NodeService) - - if err := mergo.Merge(&ns.Proxy.Config, defaults.ProxyConfig); err != nil { - return nil, err - } - if err := mergo.Merge(&ns.Proxy.Expose, defaults.Expose); err != nil { - return nil, err - } - - if ns.Proxy.MeshGateway.Mode == structs.MeshGatewayModeDefault { - ns.Proxy.MeshGateway.Mode = defaults.MeshGateway.Mode - } - if ns.Proxy.Mode == structs.ProxyModeDefault { - ns.Proxy.Mode = defaults.Mode - } - if ns.Proxy.TransparentProxy.OutboundListenerPort == 0 { - ns.Proxy.TransparentProxy.OutboundListenerPort = defaults.TransparentProxy.OutboundListenerPort - } - if !ns.Proxy.TransparentProxy.DialedDirectly { - ns.Proxy.TransparentProxy.DialedDirectly = defaults.TransparentProxy.DialedDirectly - } - - // remoteUpstreams contains synthetic Upstreams generated from central config (service-defaults.UpstreamConfigs). - remoteUpstreams := make(map[structs.ServiceID]structs.Upstream) - - for _, us := range defaults.UpstreamIDConfigs { - parsed, err := structs.ParseUpstreamConfigNoDefaults(us.Config) - if err != nil { - return nil, fmt.Errorf("failed to parse upstream config map for %s: %v", us.Upstream.String(), err) - } - - remoteUpstreams[us.Upstream] = structs.Upstream{ - DestinationNamespace: us.Upstream.NamespaceOrDefault(), - DestinationPartition: us.Upstream.PartitionOrDefault(), - DestinationName: us.Upstream.ID, - Config: us.Config, - MeshGateway: parsed.MeshGateway, - CentrallyConfigured: true, - } - } - - // localUpstreams stores the upstreams seen from the local registration so that we can merge in the synthetic entries. - // In transparent proxy mode ns.Proxy.Upstreams will likely be empty because users do not need to define upstreams explicitly. - // So to store upstream-specific flags from central config, we add entries to ns.Proxy.Upstream with those values. - localUpstreams := make(map[structs.ServiceID]struct{}) - - // Merge upstream defaults into the local registration - for i := range ns.Proxy.Upstreams { - // Get a pointer not a value copy of the upstream struct - us := &ns.Proxy.Upstreams[i] - if us.DestinationType != "" && us.DestinationType != structs.UpstreamDestTypeService { - continue - } - localUpstreams[us.DestinationID()] = struct{}{} - - remoteCfg, ok := remoteUpstreams[us.DestinationID()] - if !ok { - // No config defaults to merge - continue - } - - // The local upstream config mode has the highest precedence, so only overwrite when it's set to the default - if us.MeshGateway.Mode == structs.MeshGatewayModeDefault { - us.MeshGateway.Mode = remoteCfg.MeshGateway.Mode - } - - // Merge in everything else that is read from the map - if err := mergo.Merge(&us.Config, remoteCfg.Config); err != nil { - return nil, err - } - - // Delete the mesh gateway key from opaque config since this is the value that was resolved from - // the servers and NOT the final merged value for this upstream. - // Note that we use the "mesh_gateway" key and not other variants like "MeshGateway" because - // UpstreamConfig.MergeInto and ResolveServiceConfig only use "mesh_gateway". - delete(us.Config, "mesh_gateway") - } - - // Ensure upstreams present in central config are represented in the local configuration. - // This does not apply outside of transparent mode because in that situation every possible upstream already exists - // inside of ns.Proxy.Upstreams. - if ns.Proxy.Mode == structs.ProxyModeTransparent { - for id, remote := range remoteUpstreams { - if _, ok := localUpstreams[id]; ok { - // Remote upstream is already present locally - continue - } - - ns.Proxy.Upstreams = append(ns.Proxy.Upstreams, remote) - } - } - - return ns, err -} diff --git a/agent/service_manager_test.go b/agent/service_manager_test.go index b21597773..cbbd9e5e9 100644 --- a/agent/service_manager_test.go +++ b/agent/service_manager_test.go @@ -8,9 +8,6 @@ import ( "path/filepath" "testing" - "github.com/mitchellh/copystructure" - - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/hashicorp/consul/agent/structs" @@ -860,451 +857,3 @@ func convertToMap(v interface{}) (map[string]interface{}, error) { return raw, nil } - -func Test_mergeServiceConfig_UpstreamOverrides(t *testing.T) { - type args struct { - defaults *structs.ServiceConfigResponse - service *structs.NodeService - } - tests := []struct { - name string - args args - want *structs.NodeService - }{ - { - name: "new config fields", - args: args{ - defaults: &structs.ServiceConfigResponse{ - UpstreamIDConfigs: structs.OpaqueUpstreamConfigs{ - { - Upstream: structs.ServiceID{ - ID: "zap", - EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition(), - }, - Config: map[string]interface{}{ - "passive_health_check": map[string]interface{}{ - "Interval": int64(10), - "MaxFailures": int64(2), - }, - "mesh_gateway": map[string]interface{}{ - "Mode": "local", - }, - "protocol": "grpc", - }, - }, - }, - }, - service: &structs.NodeService{ - ID: "foo-proxy", - Service: "foo-proxy", - Proxy: structs.ConnectProxyConfig{ - DestinationServiceName: "foo", - DestinationServiceID: "foo", - Upstreams: structs.Upstreams{ - structs.Upstream{ - DestinationNamespace: "default", - DestinationPartition: "default", - DestinationName: "zap", - }, - }, - }, - }, - }, - want: &structs.NodeService{ - ID: "foo-proxy", - Service: "foo-proxy", - Proxy: structs.ConnectProxyConfig{ - DestinationServiceName: "foo", - DestinationServiceID: "foo", - Upstreams: structs.Upstreams{ - structs.Upstream{ - DestinationNamespace: "default", - DestinationPartition: "default", - DestinationName: "zap", - Config: map[string]interface{}{ - "passive_health_check": map[string]interface{}{ - "Interval": int64(10), - "MaxFailures": int64(2), - }, - "protocol": "grpc", - }, - MeshGateway: structs.MeshGatewayConfig{ - Mode: structs.MeshGatewayModeLocal, - }, - }, - }, - }, - }, - }, - { - name: "remote upstream config expands local upstream list in transparent mode", - args: args{ - defaults: &structs.ServiceConfigResponse{ - UpstreamIDConfigs: structs.OpaqueUpstreamConfigs{ - { - Upstream: structs.ServiceID{ - ID: "zap", - EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition(), - }, - Config: map[string]interface{}{ - "protocol": "grpc", - }, - }, - }, - }, - service: &structs.NodeService{ - ID: "foo-proxy", - Service: "foo-proxy", - Proxy: structs.ConnectProxyConfig{ - DestinationServiceName: "foo", - DestinationServiceID: "foo", - Mode: structs.ProxyModeTransparent, - TransparentProxy: structs.TransparentProxyConfig{ - OutboundListenerPort: 10101, - DialedDirectly: true, - }, - Upstreams: structs.Upstreams{ - structs.Upstream{ - DestinationNamespace: "default", - DestinationPartition: "default", - DestinationName: "zip", - LocalBindPort: 8080, - Config: map[string]interface{}{ - "protocol": "http", - }, - }, - }, - }, - }, - }, - want: &structs.NodeService{ - ID: "foo-proxy", - Service: "foo-proxy", - Proxy: structs.ConnectProxyConfig{ - DestinationServiceName: "foo", - DestinationServiceID: "foo", - Mode: structs.ProxyModeTransparent, - TransparentProxy: structs.TransparentProxyConfig{ - OutboundListenerPort: 10101, - DialedDirectly: true, - }, - Upstreams: structs.Upstreams{ - structs.Upstream{ - DestinationNamespace: "default", - DestinationPartition: "default", - DestinationName: "zip", - LocalBindPort: 8080, - Config: map[string]interface{}{ - "protocol": "http", - }, - }, - structs.Upstream{ - DestinationNamespace: "default", - DestinationPartition: "default", - DestinationName: "zap", - Config: map[string]interface{}{ - "protocol": "grpc", - }, - CentrallyConfigured: true, - }, - }, - }, - }, - }, - { - name: "remote upstream config not added to local upstream list outside of transparent mode", - args: args{ - defaults: &structs.ServiceConfigResponse{ - UpstreamIDConfigs: structs.OpaqueUpstreamConfigs{ - { - Upstream: structs.ServiceID{ - ID: "zap", - EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition(), - }, - Config: map[string]interface{}{ - "protocol": "grpc", - }, - }, - }, - }, - service: &structs.NodeService{ - ID: "foo-proxy", - Service: "foo-proxy", - Proxy: structs.ConnectProxyConfig{ - DestinationServiceName: "foo", - DestinationServiceID: "foo", - Mode: structs.ProxyModeDirect, - Upstreams: structs.Upstreams{ - structs.Upstream{ - DestinationNamespace: "default", - DestinationPartition: "default", - DestinationName: "zip", - LocalBindPort: 8080, - Config: map[string]interface{}{ - "protocol": "http", - }, - }, - }, - }, - }, - }, - want: &structs.NodeService{ - ID: "foo-proxy", - Service: "foo-proxy", - Proxy: structs.ConnectProxyConfig{ - DestinationServiceName: "foo", - DestinationServiceID: "foo", - Mode: structs.ProxyModeDirect, - Upstreams: structs.Upstreams{ - structs.Upstream{ - DestinationNamespace: "default", - DestinationPartition: "default", - DestinationName: "zip", - LocalBindPort: 8080, - Config: map[string]interface{}{ - "protocol": "http", - }, - }, - }, - }, - }, - }, - { - name: "upstream mode from remote defaults overrides local default", - args: args{ - defaults: &structs.ServiceConfigResponse{ - UpstreamIDConfigs: structs.OpaqueUpstreamConfigs{ - { - Upstream: structs.ServiceID{ - ID: "zap", - EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition(), - }, - Config: map[string]interface{}{ - "mesh_gateway": map[string]interface{}{ - "Mode": "local", - }, - }, - }, - }, - }, - service: &structs.NodeService{ - ID: "foo-proxy", - Service: "foo-proxy", - Proxy: structs.ConnectProxyConfig{ - DestinationServiceName: "foo", - DestinationServiceID: "foo", - MeshGateway: structs.MeshGatewayConfig{ - Mode: structs.MeshGatewayModeRemote, - }, - Upstreams: structs.Upstreams{ - structs.Upstream{ - DestinationNamespace: "default", - DestinationPartition: "default", - DestinationName: "zap", - }, - }, - }, - }, - }, - want: &structs.NodeService{ - ID: "foo-proxy", - Service: "foo-proxy", - Proxy: structs.ConnectProxyConfig{ - DestinationServiceName: "foo", - DestinationServiceID: "foo", - MeshGateway: structs.MeshGatewayConfig{ - Mode: structs.MeshGatewayModeRemote, - }, - Upstreams: structs.Upstreams{ - structs.Upstream{ - DestinationNamespace: "default", - DestinationPartition: "default", - DestinationName: "zap", - Config: map[string]interface{}{}, - MeshGateway: structs.MeshGatewayConfig{ - Mode: structs.MeshGatewayModeLocal, - }, - }, - }, - }, - }, - }, - { - name: "mode in local upstream config overrides all", - args: args{ - defaults: &structs.ServiceConfigResponse{ - UpstreamIDConfigs: structs.OpaqueUpstreamConfigs{ - { - Upstream: structs.ServiceID{ - ID: "zap", - EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition(), - }, - Config: map[string]interface{}{ - "mesh_gateway": map[string]interface{}{ - "Mode": "local", - }, - }, - }, - }, - }, - service: &structs.NodeService{ - ID: "foo-proxy", - Service: "foo-proxy", - Proxy: structs.ConnectProxyConfig{ - DestinationServiceName: "foo", - DestinationServiceID: "foo", - MeshGateway: structs.MeshGatewayConfig{ - Mode: structs.MeshGatewayModeRemote, - }, - Upstreams: structs.Upstreams{ - structs.Upstream{ - DestinationNamespace: "default", - DestinationPartition: "default", - DestinationName: "zap", - MeshGateway: structs.MeshGatewayConfig{ - Mode: structs.MeshGatewayModeNone, - }, - }, - }, - }, - }, - }, - want: &structs.NodeService{ - ID: "foo-proxy", - Service: "foo-proxy", - Proxy: structs.ConnectProxyConfig{ - DestinationServiceName: "foo", - DestinationServiceID: "foo", - MeshGateway: structs.MeshGatewayConfig{ - Mode: structs.MeshGatewayModeRemote, - }, - Upstreams: structs.Upstreams{ - structs.Upstream{ - DestinationNamespace: "default", - DestinationPartition: "default", - DestinationName: "zap", - Config: map[string]interface{}{}, - MeshGateway: structs.MeshGatewayConfig{ - Mode: structs.MeshGatewayModeNone, - }, - }, - }, - }, - }, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - defaultsCopy, err := copystructure.Copy(tt.args.defaults) - require.NoError(t, err) - - got, err := mergeServiceConfig(tt.args.defaults, tt.args.service) - require.NoError(t, err) - assert.Equal(t, tt.want, got) - - // The input defaults must not be modified by the merge. - // See PR #10647 - assert.Equal(t, tt.args.defaults, defaultsCopy) - }) - } -} - -func Test_mergeServiceConfig_TransparentProxy(t *testing.T) { - type args struct { - defaults *structs.ServiceConfigResponse - service *structs.NodeService - } - tests := []struct { - name string - args args - want *structs.NodeService - }{ - { - name: "inherit transparent proxy settings", - args: args{ - defaults: &structs.ServiceConfigResponse{ - Mode: structs.ProxyModeTransparent, - TransparentProxy: structs.TransparentProxyConfig{ - OutboundListenerPort: 10101, - DialedDirectly: true, - }, - }, - service: &structs.NodeService{ - ID: "foo-proxy", - Service: "foo-proxy", - Proxy: structs.ConnectProxyConfig{ - DestinationServiceName: "foo", - DestinationServiceID: "foo", - Mode: structs.ProxyModeDefault, - TransparentProxy: structs.TransparentProxyConfig{}, - }, - }, - }, - want: &structs.NodeService{ - ID: "foo-proxy", - Service: "foo-proxy", - Proxy: structs.ConnectProxyConfig{ - DestinationServiceName: "foo", - DestinationServiceID: "foo", - Mode: structs.ProxyModeTransparent, - TransparentProxy: structs.TransparentProxyConfig{ - OutboundListenerPort: 10101, - DialedDirectly: true, - }, - }, - }, - }, - { - name: "override transparent proxy settings", - args: args{ - defaults: &structs.ServiceConfigResponse{ - Mode: structs.ProxyModeTransparent, - TransparentProxy: structs.TransparentProxyConfig{ - OutboundListenerPort: 10101, - DialedDirectly: false, - }, - }, - service: &structs.NodeService{ - ID: "foo-proxy", - Service: "foo-proxy", - Proxy: structs.ConnectProxyConfig{ - DestinationServiceName: "foo", - DestinationServiceID: "foo", - Mode: structs.ProxyModeDirect, - TransparentProxy: structs.TransparentProxyConfig{ - OutboundListenerPort: 808, - DialedDirectly: true, - }, - }, - }, - }, - want: &structs.NodeService{ - ID: "foo-proxy", - Service: "foo-proxy", - Proxy: structs.ConnectProxyConfig{ - DestinationServiceName: "foo", - DestinationServiceID: "foo", - Mode: structs.ProxyModeDirect, - TransparentProxy: structs.TransparentProxyConfig{ - OutboundListenerPort: 808, - DialedDirectly: true, - }, - }, - }, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - defaultsCopy, err := copystructure.Copy(tt.args.defaults) - require.NoError(t, err) - - got, err := mergeServiceConfig(tt.args.defaults, tt.args.service) - require.NoError(t, err) - assert.Equal(t, tt.want, got) - - // The input defaults must not be modified by the merge. - // See PR #10647 - assert.Equal(t, tt.args.defaults, defaultsCopy) - }) - } -} diff --git a/agent/structs/structs.go b/agent/structs/structs.go index 6b2ab7bf1..0e5a2d91e 100644 --- a/agent/structs/structs.go +++ b/agent/structs/structs.go @@ -706,6 +706,12 @@ type ServiceSpecificRequest struct { // Ingress if true will only search for Ingress gateways for the given service. Ingress bool + // MergeCentralConfig when set to true returns a service definition merged with + // the proxy-defaults/global and service-defaults/:service config entries. + // This can be used to ensure a full service definition is returned in the response + // especially when the service might not be written into the catalog that way. + MergeCentralConfig bool + acl.EnterpriseMeta `hcl:",squash" mapstructure:",squash"` QueryOptions } @@ -752,6 +758,7 @@ func (r *ServiceSpecificRequest) CacheInfo() cache.RequestInfo { r.PeerName, r.Ingress, r.ServiceKind, + r.MergeCentralConfig, }, nil) if err == nil { // If there is an error, we don't set the key. A blank key forces @@ -1330,7 +1337,8 @@ func (t *ServiceConnect) UnmarshalJSON(data []byte) (err error) { // IsSidecarProxy returns true if the NodeService is a sidecar proxy. func (s *NodeService) IsSidecarProxy() bool { - return s.Kind == ServiceKindConnectProxy && s.Proxy.DestinationServiceID != "" + return s.Kind == ServiceKindConnectProxy && + (s.Proxy.DestinationServiceID != "" || s.Proxy.DestinationServiceName != "") } func (s *NodeService) IsGateway() bool { diff --git a/api/api.go b/api/api.go index ff3be4607..c92546b50 100644 --- a/api/api.go +++ b/api/api.go @@ -190,6 +190,12 @@ type QueryOptions struct { // Filter requests filtering data prior to it being returned. The string // is a go-bexpr compatible expression. Filter string + + // MergeCentralConfig returns a service definition merged with the + // proxy-defaults/global and service-defaults/:service config entries. + // This can be used to ensure a full service definition is returned in the response + // especially when the service might not be written into the catalog that way. + MergeCentralConfig bool } func (o *QueryOptions) Context() context.Context { @@ -858,6 +864,9 @@ func (r *request) setQueryOptions(q *QueryOptions) { r.header.Set("Cache-Control", strings.Join(cc, ", ")) } } + if q.MergeCentralConfig { + r.params.Set("merge-central-config", "") + } r.ctx = q.ctx } diff --git a/logging/names.go b/logging/names.go index 015e4a0fc..5fd904c7f 100644 --- a/logging/names.go +++ b/logging/names.go @@ -61,4 +61,5 @@ const ( Watch string = "watch" XDS string = "xds" Vault string = "vault" + Health string = "health" )