[OSS] Support merge-central-config option in node services list API (#13450)
Adds the merge-central-config query param option to the /catalog/node-services/:node-name API, to get a service definition in the response that is merged with central defaults (proxy-defaults/service-defaults). Updated the consul connect envoy command to use this option when retrieving the proxy service details so as to render the bootstrap configuration correctly.
This commit is contained in:
parent
eb9c341f5e
commit
414bb7e34e
|
@ -0,0 +1,3 @@
|
||||||
|
```release-note:enhancement
|
||||||
|
api: `merge-central-config` query parameter support added to `/catalog/node-services/:node-name` API, to view a fully resolved service definition (especially when not written into the catalog that way).
|
||||||
|
```
|
|
@ -499,6 +499,10 @@ func (s *HTTPHandlers) CatalogNodeServiceList(resp http.ResponseWriter, req *htt
|
||||||
return nil, HTTPError{StatusCode: http.StatusBadRequest, Reason: "Missing node name"}
|
return nil, HTTPError{StatusCode: http.StatusBadRequest, Reason: "Missing node name"}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if _, ok := req.URL.Query()["merge-central-config"]; ok {
|
||||||
|
args.MergeCentralConfig = true
|
||||||
|
}
|
||||||
|
|
||||||
// Make the RPC request
|
// Make the RPC request
|
||||||
var out structs.IndexedNodeServiceList
|
var out structs.IndexedNodeServiceList
|
||||||
defer setMeta(resp, &out.QueryMeta)
|
defer setMeta(resp, &out.QueryMeta)
|
||||||
|
|
|
@ -1529,6 +1529,111 @@ func TestCatalogNodeServiceList(t *testing.T) {
|
||||||
require.Equal(t, args.Service.Proxy, proxySvc.Proxy)
|
require.Equal(t, args.Service.Proxy, proxySvc.Proxy)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestCatalogNodeServiceList_MergeCentralConfig(t *testing.T) {
|
||||||
|
if testing.Short() {
|
||||||
|
t.Skip("too slow for testing.Short")
|
||||||
|
}
|
||||||
|
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
a := NewTestAgent(t, "")
|
||||||
|
defer a.Shutdown()
|
||||||
|
testrpc.WaitForLeader(t, a.RPC, "dc1")
|
||||||
|
|
||||||
|
// Register the service
|
||||||
|
registerServiceReq := registerService(t, a)
|
||||||
|
// Register proxy-defaults
|
||||||
|
proxyGlobalEntry := registerProxyDefaults(t, a)
|
||||||
|
// Register service-defaults
|
||||||
|
serviceDefaultsConfigEntry := registerServiceDefaults(t, a, registerServiceReq.Service.Proxy.DestinationServiceName)
|
||||||
|
|
||||||
|
url := fmt.Sprintf("/v1/catalog/node-services/%s?merge-central-config", registerServiceReq.Node)
|
||||||
|
req, _ := http.NewRequest("GET", url, nil)
|
||||||
|
resp := httptest.NewRecorder()
|
||||||
|
obj, err := a.srv.CatalogNodeServiceList(resp, req)
|
||||||
|
require.NoError(t, err)
|
||||||
|
assertIndex(t, resp)
|
||||||
|
|
||||||
|
nodeServices := obj.(*structs.NodeServiceList)
|
||||||
|
// validate response
|
||||||
|
require.Len(t, nodeServices.Services, 1)
|
||||||
|
validateMergeCentralConfigResponse(t, nodeServices.Services[0].ToServiceNode(nodeServices.Node.Node), registerServiceReq, proxyGlobalEntry, serviceDefaultsConfigEntry)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestCatalogNodeServiceList_MergeCentralConfigBlocking(t *testing.T) {
|
||||||
|
if testing.Short() {
|
||||||
|
t.Skip("too slow for testing.Short")
|
||||||
|
}
|
||||||
|
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
a := NewTestAgent(t, "")
|
||||||
|
defer a.Shutdown()
|
||||||
|
testrpc.WaitForLeader(t, a.RPC, "dc1")
|
||||||
|
|
||||||
|
// Register the service
|
||||||
|
registerServiceReq := registerService(t, a)
|
||||||
|
// Register proxy-defaults
|
||||||
|
proxyGlobalEntry := registerProxyDefaults(t, a)
|
||||||
|
|
||||||
|
// Run the query
|
||||||
|
rpcReq := structs.NodeSpecificRequest{
|
||||||
|
Datacenter: "dc1",
|
||||||
|
Node: registerServiceReq.Node,
|
||||||
|
MergeCentralConfig: true,
|
||||||
|
}
|
||||||
|
var rpcResp structs.IndexedNodeServiceList
|
||||||
|
require.NoError(t, a.RPC("Catalog.NodeServiceList", &rpcReq, &rpcResp))
|
||||||
|
require.Len(t, rpcResp.NodeServices.Services, 1)
|
||||||
|
nodeService := rpcResp.NodeServices.Services[0]
|
||||||
|
require.Equal(t, registerServiceReq.Service.Service, nodeService.Service)
|
||||||
|
// validate proxy global defaults are resolved in the merged service config
|
||||||
|
require.Equal(t, proxyGlobalEntry.Config, nodeService.Proxy.Config)
|
||||||
|
require.Equal(t, proxyGlobalEntry.Mode, nodeService.Proxy.Mode)
|
||||||
|
|
||||||
|
// Async cause a change - register service defaults
|
||||||
|
waitIndex := rpcResp.Index
|
||||||
|
start := time.Now()
|
||||||
|
var serviceDefaultsConfigEntry structs.ServiceConfigEntry
|
||||||
|
go func() {
|
||||||
|
time.Sleep(100 * time.Millisecond)
|
||||||
|
// Register service-defaults
|
||||||
|
serviceDefaultsConfigEntry = registerServiceDefaults(t, a, registerServiceReq.Service.Proxy.DestinationServiceName)
|
||||||
|
}()
|
||||||
|
|
||||||
|
const waitDuration = 3 * time.Second
|
||||||
|
RUN_BLOCKING_QUERY:
|
||||||
|
|
||||||
|
url := fmt.Sprintf("/v1/catalog/node-services/%s?merge-central-config&wait=%s&index=%d",
|
||||||
|
registerServiceReq.Node, waitDuration.String(), waitIndex)
|
||||||
|
req, _ := http.NewRequest("GET", url, nil)
|
||||||
|
resp := httptest.NewRecorder()
|
||||||
|
obj, err := a.srv.CatalogNodeServiceList(resp, req)
|
||||||
|
require.NoError(t, err)
|
||||||
|
assertIndex(t, resp)
|
||||||
|
|
||||||
|
elapsed := time.Since(start)
|
||||||
|
idx := getIndex(t, resp)
|
||||||
|
if idx < waitIndex {
|
||||||
|
t.Fatalf("bad index returned: %v", idx)
|
||||||
|
} else if idx == waitIndex {
|
||||||
|
if elapsed > waitDuration {
|
||||||
|
// This should prevent the loop from running longer than the waitDuration
|
||||||
|
t.Fatalf("too slow: %v", elapsed)
|
||||||
|
}
|
||||||
|
goto RUN_BLOCKING_QUERY
|
||||||
|
}
|
||||||
|
// Should block at least 100ms before getting the changed results
|
||||||
|
if elapsed < 100*time.Millisecond {
|
||||||
|
t.Fatalf("too fast: %v", elapsed)
|
||||||
|
}
|
||||||
|
|
||||||
|
nodeServices := obj.(*structs.NodeServiceList)
|
||||||
|
// validate response
|
||||||
|
require.Len(t, nodeServices.Services, 1)
|
||||||
|
validateMergeCentralConfigResponse(t, nodeServices.Services[0].ToServiceNode(nodeServices.Node.Node), registerServiceReq, proxyGlobalEntry, serviceDefaultsConfigEntry)
|
||||||
|
}
|
||||||
|
|
||||||
func TestCatalogNodeServices_Filter(t *testing.T) {
|
func TestCatalogNodeServices_Filter(t *testing.T) {
|
||||||
if testing.Short() {
|
if testing.Short() {
|
||||||
t.Skip("too slow for testing.Short")
|
t.Skip("too slow for testing.Short")
|
||||||
|
|
|
@ -869,6 +869,11 @@ func (c *Catalog) NodeServiceList(args *structs.NodeSpecificRequest, reply *stru
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var (
|
||||||
|
priorMergeHash uint64
|
||||||
|
ranMergeOnce bool
|
||||||
|
)
|
||||||
|
|
||||||
return c.srv.blockingQuery(
|
return c.srv.blockingQuery(
|
||||||
&args.QueryOptions,
|
&args.QueryOptions,
|
||||||
&reply.QueryMeta,
|
&reply.QueryMeta,
|
||||||
|
@ -878,10 +883,55 @@ func (c *Catalog) NodeServiceList(args *structs.NodeSpecificRequest, reply *stru
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
mergedServices := services
|
||||||
|
var cfgIndex uint64
|
||||||
|
if services != nil && args.MergeCentralConfig {
|
||||||
|
var mergedNodeServices []*structs.NodeService
|
||||||
|
for _, ns := range services.Services {
|
||||||
|
mergedns := ns
|
||||||
|
if ns.IsSidecarProxy() || ns.IsGateway() {
|
||||||
|
serviceSpecificReq := structs.ServiceSpecificRequest{
|
||||||
|
Datacenter: args.Datacenter,
|
||||||
|
QueryOptions: args.QueryOptions,
|
||||||
|
}
|
||||||
|
cfgIndex, mergedns, err = mergeNodeServiceWithCentralConfig(ws, state, &serviceSpecificReq, ns, c.logger)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if cfgIndex > index {
|
||||||
|
index = cfgIndex
|
||||||
|
}
|
||||||
|
}
|
||||||
|
mergedNodeServices = append(mergedNodeServices, mergedns)
|
||||||
|
}
|
||||||
|
if len(mergedNodeServices) > 0 {
|
||||||
|
mergedServices.Services = mergedNodeServices
|
||||||
|
}
|
||||||
|
|
||||||
|
// Generate a hash of the mergedServices driving this response.
|
||||||
|
// Use it to determine if the response is identical to a prior wakeup.
|
||||||
|
newMergeHash, err := hashstructure_v2.Hash(mergedServices, hashstructure_v2.FormatV2, nil)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("error hashing reply for spurious wakeup suppression: %w", err)
|
||||||
|
}
|
||||||
|
if ranMergeOnce && priorMergeHash == newMergeHash {
|
||||||
|
// the below assignment is not required as the if condition already validates equality,
|
||||||
|
// but makes it more clear that prior value is being reset to the new hash on each run.
|
||||||
|
priorMergeHash = newMergeHash
|
||||||
|
reply.Index = index
|
||||||
|
// NOTE: the prior response is still alive inside of *reply, which is desirable
|
||||||
|
return errNotChanged
|
||||||
|
} else {
|
||||||
|
priorMergeHash = newMergeHash
|
||||||
|
ranMergeOnce = true
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
reply.Index = index
|
reply.Index = index
|
||||||
|
|
||||||
if services != nil {
|
if mergedServices != nil {
|
||||||
reply.NodeServices = *services
|
reply.NodeServices = *mergedServices
|
||||||
|
|
||||||
raw, err := filter.Execute(reply.NodeServices.Services)
|
raw, err := filter.Execute(reply.NodeServices.Services)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -776,9 +776,15 @@ func (r *ServiceSpecificRequest) CacheMinIndex() uint64 {
|
||||||
|
|
||||||
// NodeSpecificRequest is used to request the information about a single node
|
// NodeSpecificRequest is used to request the information about a single node
|
||||||
type NodeSpecificRequest struct {
|
type NodeSpecificRequest struct {
|
||||||
Datacenter string
|
Datacenter string
|
||||||
Node string
|
Node string
|
||||||
PeerName string
|
PeerName string
|
||||||
|
// MergeCentralConfig when set to true returns a service definition merged with
|
||||||
|
// the proxy-defaults/global and service-defaults/:service config entries.
|
||||||
|
// This can be used to ensure a full service definition is returned in the response
|
||||||
|
// especially when the service might not be written into the catalog that way.
|
||||||
|
MergeCentralConfig bool
|
||||||
|
|
||||||
acl.EnterpriseMeta `hcl:",squash" mapstructure:",squash"`
|
acl.EnterpriseMeta `hcl:",squash" mapstructure:",squash"`
|
||||||
QueryOptions
|
QueryOptions
|
||||||
}
|
}
|
||||||
|
@ -801,6 +807,7 @@ func (r *NodeSpecificRequest) CacheInfo() cache.RequestInfo {
|
||||||
r.Node,
|
r.Node,
|
||||||
r.Filter,
|
r.Filter,
|
||||||
r.EnterpriseMeta,
|
r.EnterpriseMeta,
|
||||||
|
r.MergeCentralConfig,
|
||||||
}, nil)
|
}, nil)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
// If there is an error, we don't set the key. A blank key forces
|
// If there is an error, we don't set the key. A blank key forces
|
||||||
|
|
|
@ -531,13 +531,18 @@ func (c *cmd) generateConfig() ([]byte, error) {
|
||||||
datacenter = svc.Datacenter
|
datacenter = svc.Datacenter
|
||||||
} else {
|
} else {
|
||||||
filter := fmt.Sprintf("ID == %q", c.proxyID)
|
filter := fmt.Sprintf("ID == %q", c.proxyID)
|
||||||
svcList, _, err := c.client.Catalog().NodeServiceList(c.nodeName, &api.QueryOptions{Filter: filter})
|
svcList, _, err := c.client.Catalog().NodeServiceList(c.nodeName,
|
||||||
|
&api.QueryOptions{Filter: filter, MergeCentralConfig: true})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("failed to fetch proxy config from catalog for node %q: %w", c.nodeName, err)
|
return nil, fmt.Errorf("failed to fetch proxy config from catalog for node %q: %w", c.nodeName, err)
|
||||||
}
|
}
|
||||||
if len(svcList.Services) != 1 {
|
if len(svcList.Services) == 0 {
|
||||||
return nil, fmt.Errorf("expected to find only one proxy service with ID: %q", c.proxyID)
|
return nil, fmt.Errorf("Proxy service with ID %q not found", c.proxyID)
|
||||||
}
|
}
|
||||||
|
if len(svcList.Services) > 1 {
|
||||||
|
return nil, fmt.Errorf("Expected to find only one proxy service with ID %q, but more were found", c.proxyID)
|
||||||
|
}
|
||||||
|
|
||||||
svcProxyConfig = svcList.Services[0].Proxy
|
svcProxyConfig = svcList.Services[0].Proxy
|
||||||
serviceName = svcList.Services[0].Service
|
serviceName = svcList.Services[0].Service
|
||||||
ns = svcList.Services[0].Namespace
|
ns = svcList.Services[0].Namespace
|
||||||
|
|
Loading…
Reference in New Issue