diff --git a/.changelog/13450.txt b/.changelog/13450.txt new file mode 100644 index 000000000..3346d6936 --- /dev/null +++ b/.changelog/13450.txt @@ -0,0 +1,3 @@ +```release-note:enhancement +api: `merge-central-config` query parameter support added to `/catalog/node-services/:node-name` API, to view a fully resolved service definition (especially when not written into the catalog that way). +``` \ No newline at end of file diff --git a/agent/catalog_endpoint.go b/agent/catalog_endpoint.go index fbee9cbfc..9623da6af 100644 --- a/agent/catalog_endpoint.go +++ b/agent/catalog_endpoint.go @@ -499,6 +499,10 @@ func (s *HTTPHandlers) CatalogNodeServiceList(resp http.ResponseWriter, req *htt return nil, HTTPError{StatusCode: http.StatusBadRequest, Reason: "Missing node name"} } + if _, ok := req.URL.Query()["merge-central-config"]; ok { + args.MergeCentralConfig = true + } + // Make the RPC request var out structs.IndexedNodeServiceList defer setMeta(resp, &out.QueryMeta) diff --git a/agent/catalog_endpoint_test.go b/agent/catalog_endpoint_test.go index eb1b509e0..d437d6e4d 100644 --- a/agent/catalog_endpoint_test.go +++ b/agent/catalog_endpoint_test.go @@ -1529,6 +1529,111 @@ func TestCatalogNodeServiceList(t *testing.T) { require.Equal(t, args.Service.Proxy, proxySvc.Proxy) } +func TestCatalogNodeServiceList_MergeCentralConfig(t *testing.T) { + if testing.Short() { + t.Skip("too slow for testing.Short") + } + + t.Parallel() + + a := NewTestAgent(t, "") + defer a.Shutdown() + testrpc.WaitForLeader(t, a.RPC, "dc1") + + // Register the service + registerServiceReq := registerService(t, a) + // Register proxy-defaults + proxyGlobalEntry := registerProxyDefaults(t, a) + // Register service-defaults + serviceDefaultsConfigEntry := registerServiceDefaults(t, a, registerServiceReq.Service.Proxy.DestinationServiceName) + + url := fmt.Sprintf("/v1/catalog/node-services/%s?merge-central-config", registerServiceReq.Node) + req, _ := http.NewRequest("GET", url, nil) + resp := httptest.NewRecorder() + obj, err := a.srv.CatalogNodeServiceList(resp, req) + require.NoError(t, err) + assertIndex(t, resp) + + nodeServices := obj.(*structs.NodeServiceList) + // validate response + require.Len(t, nodeServices.Services, 1) + validateMergeCentralConfigResponse(t, nodeServices.Services[0].ToServiceNode(nodeServices.Node.Node), registerServiceReq, proxyGlobalEntry, serviceDefaultsConfigEntry) +} + +func TestCatalogNodeServiceList_MergeCentralConfigBlocking(t *testing.T) { + if testing.Short() { + t.Skip("too slow for testing.Short") + } + + t.Parallel() + + a := NewTestAgent(t, "") + defer a.Shutdown() + testrpc.WaitForLeader(t, a.RPC, "dc1") + + // Register the service + registerServiceReq := registerService(t, a) + // Register proxy-defaults + proxyGlobalEntry := registerProxyDefaults(t, a) + + // Run the query + rpcReq := structs.NodeSpecificRequest{ + Datacenter: "dc1", + Node: registerServiceReq.Node, + MergeCentralConfig: true, + } + var rpcResp structs.IndexedNodeServiceList + require.NoError(t, a.RPC("Catalog.NodeServiceList", &rpcReq, &rpcResp)) + require.Len(t, rpcResp.NodeServices.Services, 1) + nodeService := rpcResp.NodeServices.Services[0] + require.Equal(t, registerServiceReq.Service.Service, nodeService.Service) + // validate proxy global defaults are resolved in the merged service config + require.Equal(t, proxyGlobalEntry.Config, nodeService.Proxy.Config) + require.Equal(t, proxyGlobalEntry.Mode, nodeService.Proxy.Mode) + + // Async cause a change - register service defaults + waitIndex := rpcResp.Index + start := time.Now() + var serviceDefaultsConfigEntry structs.ServiceConfigEntry + go func() { + time.Sleep(100 * time.Millisecond) + // Register service-defaults + serviceDefaultsConfigEntry = registerServiceDefaults(t, a, registerServiceReq.Service.Proxy.DestinationServiceName) + }() + + const waitDuration = 3 * time.Second +RUN_BLOCKING_QUERY: + + url := fmt.Sprintf("/v1/catalog/node-services/%s?merge-central-config&wait=%s&index=%d", + registerServiceReq.Node, waitDuration.String(), waitIndex) + req, _ := http.NewRequest("GET", url, nil) + resp := httptest.NewRecorder() + obj, err := a.srv.CatalogNodeServiceList(resp, req) + require.NoError(t, err) + assertIndex(t, resp) + + elapsed := time.Since(start) + idx := getIndex(t, resp) + if idx < waitIndex { + t.Fatalf("bad index returned: %v", idx) + } else if idx == waitIndex { + if elapsed > waitDuration { + // This should prevent the loop from running longer than the waitDuration + t.Fatalf("too slow: %v", elapsed) + } + goto RUN_BLOCKING_QUERY + } + // Should block at least 100ms before getting the changed results + if elapsed < 100*time.Millisecond { + t.Fatalf("too fast: %v", elapsed) + } + + nodeServices := obj.(*structs.NodeServiceList) + // validate response + require.Len(t, nodeServices.Services, 1) + validateMergeCentralConfigResponse(t, nodeServices.Services[0].ToServiceNode(nodeServices.Node.Node), registerServiceReq, proxyGlobalEntry, serviceDefaultsConfigEntry) +} + func TestCatalogNodeServices_Filter(t *testing.T) { if testing.Short() { t.Skip("too slow for testing.Short") diff --git a/agent/consul/catalog_endpoint.go b/agent/consul/catalog_endpoint.go index 5805d5555..fa940cc7f 100644 --- a/agent/consul/catalog_endpoint.go +++ b/agent/consul/catalog_endpoint.go @@ -869,6 +869,11 @@ func (c *Catalog) NodeServiceList(args *structs.NodeSpecificRequest, reply *stru return err } + var ( + priorMergeHash uint64 + ranMergeOnce bool + ) + return c.srv.blockingQuery( &args.QueryOptions, &reply.QueryMeta, @@ -878,10 +883,55 @@ func (c *Catalog) NodeServiceList(args *structs.NodeSpecificRequest, reply *stru return err } + mergedServices := services + var cfgIndex uint64 + if services != nil && args.MergeCentralConfig { + var mergedNodeServices []*structs.NodeService + for _, ns := range services.Services { + mergedns := ns + if ns.IsSidecarProxy() || ns.IsGateway() { + serviceSpecificReq := structs.ServiceSpecificRequest{ + Datacenter: args.Datacenter, + QueryOptions: args.QueryOptions, + } + cfgIndex, mergedns, err = mergeNodeServiceWithCentralConfig(ws, state, &serviceSpecificReq, ns, c.logger) + if err != nil { + return err + } + if cfgIndex > index { + index = cfgIndex + } + } + mergedNodeServices = append(mergedNodeServices, mergedns) + } + if len(mergedNodeServices) > 0 { + mergedServices.Services = mergedNodeServices + } + + // Generate a hash of the mergedServices driving this response. + // Use it to determine if the response is identical to a prior wakeup. + newMergeHash, err := hashstructure_v2.Hash(mergedServices, hashstructure_v2.FormatV2, nil) + if err != nil { + return fmt.Errorf("error hashing reply for spurious wakeup suppression: %w", err) + } + if ranMergeOnce && priorMergeHash == newMergeHash { + // the below assignment is not required as the if condition already validates equality, + // but makes it more clear that prior value is being reset to the new hash on each run. + priorMergeHash = newMergeHash + reply.Index = index + // NOTE: the prior response is still alive inside of *reply, which is desirable + return errNotChanged + } else { + priorMergeHash = newMergeHash + ranMergeOnce = true + } + + } + reply.Index = index - if services != nil { - reply.NodeServices = *services + if mergedServices != nil { + reply.NodeServices = *mergedServices raw, err := filter.Execute(reply.NodeServices.Services) if err != nil { diff --git a/agent/structs/structs.go b/agent/structs/structs.go index 0bccf8a41..918597b8d 100644 --- a/agent/structs/structs.go +++ b/agent/structs/structs.go @@ -776,9 +776,15 @@ func (r *ServiceSpecificRequest) CacheMinIndex() uint64 { // NodeSpecificRequest is used to request the information about a single node type NodeSpecificRequest struct { - Datacenter string - Node string - PeerName string + Datacenter string + Node string + PeerName string + // MergeCentralConfig when set to true returns a service definition merged with + // the proxy-defaults/global and service-defaults/:service config entries. + // This can be used to ensure a full service definition is returned in the response + // especially when the service might not be written into the catalog that way. + MergeCentralConfig bool + acl.EnterpriseMeta `hcl:",squash" mapstructure:",squash"` QueryOptions } @@ -801,6 +807,7 @@ func (r *NodeSpecificRequest) CacheInfo() cache.RequestInfo { r.Node, r.Filter, r.EnterpriseMeta, + r.MergeCentralConfig, }, nil) if err == nil { // If there is an error, we don't set the key. A blank key forces diff --git a/command/connect/envoy/envoy.go b/command/connect/envoy/envoy.go index 8b86aac1d..a78ae54d5 100644 --- a/command/connect/envoy/envoy.go +++ b/command/connect/envoy/envoy.go @@ -531,13 +531,18 @@ func (c *cmd) generateConfig() ([]byte, error) { datacenter = svc.Datacenter } else { filter := fmt.Sprintf("ID == %q", c.proxyID) - svcList, _, err := c.client.Catalog().NodeServiceList(c.nodeName, &api.QueryOptions{Filter: filter}) + svcList, _, err := c.client.Catalog().NodeServiceList(c.nodeName, + &api.QueryOptions{Filter: filter, MergeCentralConfig: true}) if err != nil { return nil, fmt.Errorf("failed to fetch proxy config from catalog for node %q: %w", c.nodeName, err) } - if len(svcList.Services) != 1 { - return nil, fmt.Errorf("expected to find only one proxy service with ID: %q", c.proxyID) + if len(svcList.Services) == 0 { + return nil, fmt.Errorf("Proxy service with ID %q not found", c.proxyID) } + if len(svcList.Services) > 1 { + return nil, fmt.Errorf("Expected to find only one proxy service with ID %q, but more were found", c.proxyID) + } + svcProxyConfig = svcList.Services[0].Proxy serviceName = svcList.Services[0].Service ns = svcList.Services[0].Namespace