From 940b7a98d1e8074a9841c015e9c3c802085ee9e5 Mon Sep 17 00:00:00 2001 From: freddygv Date: Mon, 15 Mar 2021 16:38:01 -0600 Subject: [PATCH] Finish cleanup from ServiceConfigRequest changes --- agent/agent.go | 4 - agent/agent_endpoint.go | 2 - .../resolved_service_config_test.go | 4 - agent/consul/config_endpoint.go | 2 +- agent/consul/config_endpoint_test.go | 263 ------------------ agent/consul/state/catalog.go | 33 --- 6 files changed, 1 insertion(+), 307 deletions(-) diff --git a/agent/agent.go b/agent/agent.go index 43ec86877..201dc7aee 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -1948,7 +1948,6 @@ type addServiceLockedRequest struct { // agent using Agent.AddService. type AddServiceRequest struct { Service *structs.NodeService - nodeName string chkTypes []*structs.CheckType persist bool token string @@ -3108,7 +3107,6 @@ func (a *Agent) loadServices(conf *config.RuntimeConfig, snap map[structs.CheckI err = a.addServiceLocked(addServiceLockedRequest{ AddServiceRequest: AddServiceRequest{ Service: ns, - nodeName: a.config.NodeName, chkTypes: chkTypes, persist: false, // don't rewrite the file with the same data we just read token: service.Token, @@ -3129,7 +3127,6 @@ func (a *Agent) loadServices(conf *config.RuntimeConfig, snap map[structs.CheckI err = a.addServiceLocked(addServiceLockedRequest{ AddServiceRequest: AddServiceRequest{ Service: sidecar, - nodeName: a.config.NodeName, chkTypes: sidecarChecks, persist: false, // don't rewrite the file with the same data we just read token: sidecarToken, @@ -3228,7 +3225,6 @@ func (a *Agent) loadServices(conf *config.RuntimeConfig, snap map[structs.CheckI err = a.addServiceLocked(addServiceLockedRequest{ AddServiceRequest: AddServiceRequest{ Service: p.Service, - nodeName: a.config.NodeName, chkTypes: nil, persist: false, // don't rewrite the file with the same data we just read token: p.Token, diff --git a/agent/agent_endpoint.go b/agent/agent_endpoint.go index 19789c543..10fe70c6d 100644 --- a/agent/agent_endpoint.go +++ b/agent/agent_endpoint.go @@ -994,7 +994,6 @@ func (s *HTTPHandlers) AgentRegisterService(resp http.ResponseWriter, req *http. addReq := AddServiceRequest{ Service: ns, - nodeName: s.agent.config.NodeName, chkTypes: chkTypes, persist: true, token: token, @@ -1008,7 +1007,6 @@ func (s *HTTPHandlers) AgentRegisterService(resp http.ResponseWriter, req *http. if sidecar != nil { addReq := AddServiceRequest{ Service: sidecar, - nodeName: s.agent.config.NodeName, chkTypes: sidecarChecks, persist: true, token: sidecarToken, diff --git a/agent/cache-types/resolved_service_config_test.go b/agent/cache-types/resolved_service_config_test.go index fd01f1bbb..34c2ddea0 100644 --- a/agent/cache-types/resolved_service_config_test.go +++ b/agent/cache-types/resolved_service_config_test.go @@ -25,8 +25,6 @@ func TestResolvedServiceConfig(t *testing.T) { require.Equal(uint64(24), req.QueryOptions.MinQueryIndex) require.Equal(1*time.Second, req.QueryOptions.MaxQueryTime) require.Equal("foo", req.Name) - require.Equal("foo-1", req.ID) - require.Equal("foo-node", req.NodeName) require.True(req.AllowStale) reply := args.Get(2).(*structs.ServiceConfigResponse) @@ -50,8 +48,6 @@ func TestResolvedServiceConfig(t *testing.T) { }, &structs.ServiceConfigRequest{ Datacenter: "dc1", Name: "foo", - ID: "foo-1", - NodeName: "foo-node", }) require.NoError(err) require.Equal(cache.FetchResult{ diff --git a/agent/consul/config_endpoint.go b/agent/consul/config_endpoint.go index 9033b9f9f..9869a93e5 100644 --- a/agent/consul/config_endpoint.go +++ b/agent/consul/config_endpoint.go @@ -333,7 +333,7 @@ func (c *ConfigEntry) ResolveServiceConfig(args *structs.ServiceConfigRequest, r // Pass the WatchSet to both the service and proxy config lookups. If either is updated during the // blocking query, this function will be rerun and these state store lookups will both be current. - // We use the default enterprise meta to look up the global proxy defaults because their are not namespaced. + // We use the default enterprise meta to look up the global proxy defaults because they are not namespaced. _, proxyEntry, err := state.ConfigEntry(ws, structs.ProxyDefaults, structs.ProxyConfigGlobal, structs.DefaultEnterpriseMeta()) if err != nil { return err diff --git a/agent/consul/config_endpoint_test.go b/agent/consul/config_endpoint_test.go index 04d7f7824..d65705957 100644 --- a/agent/consul/config_endpoint_test.go +++ b/agent/consul/config_endpoint_test.go @@ -1145,269 +1145,6 @@ func TestConfigEntry_ResolveServiceConfig_Upstreams(t *testing.T) { } } -func TestConfigEntry_ResolveServiceConfig_Upstreams_RegistrationBlocking(t *testing.T) { - if testing.Short() { - t.Skip("too slow for testing.Short") - } - t.Parallel() - - dir1, s1 := testServer(t) - defer os.RemoveAll(dir1) - defer s1.Shutdown() - - codec := rpcClient(t, s1) - defer codec.Close() - - testrpc.WaitForTestAgent(t, s1.RPC, "dc1") - - nodeName := "foo-node" - - // Create a dummy proxy/service config in the state store to look up. - state := s1.fsm.State() - require.NoError(t, state.EnsureConfigEntry(1, &structs.ProxyConfigEntry{ - Kind: structs.ProxyDefaults, - Name: structs.ProxyConfigGlobal, - Config: map[string]interface{}{ - "foo": 1, - }, - })) - require.NoError(t, state.EnsureConfigEntry(2, &structs.ServiceConfigEntry{ - Kind: structs.ServiceDefaults, - Name: "foo", - Protocol: "http", - })) - require.NoError(t, state.EnsureConfigEntry(3, &structs.ServiceConfigEntry{ - Kind: structs.ServiceDefaults, - Name: "bar", - Protocol: "grpc", - })) - require.NoError(t, state.EnsureNode(4, &structs.Node{ - ID: "9c6e733c-c39d-4555-8d41-0f174a31c489", - Node: nodeName, - })) - - args := structs.ServiceConfigRequest{ - Name: "foo", - Datacenter: s1.config.Datacenter, - Upstreams: []string{"bar", "baz"}, - } - var out structs.ServiceConfigResponse - require.NoError(t, msgpackrpc.CallWithCodec(codec, "ConfigEntry.ResolveServiceConfig", &args, &out)) - - var index uint64 - expected := structs.ServiceConfigResponse{ - ProxyConfig: map[string]interface{}{ - "foo": int64(1), - "protocol": "http", - }, - // This mesh gateway configuration is pulled from foo-proxy's registration - UpstreamConfigs: map[string]map[string]interface{}{ - "bar": { - "protocol": "grpc", - }, - }, - // Don't know what this is deterministically - QueryMeta: out.QueryMeta, - } - require.Equal(t, expected, out) - index = out.Index - - // Now setup a blocking query for 'foo' while we add the proxy registration for foo-proxy. - // Adding the foo proxy registration should cause the blocking query to fire because it is - // watched when the ID and NodeName are provided. - { - // Async cause a change - start := time.Now() - go func() { - time.Sleep(100 * time.Millisecond) - require.NoError(t, state.EnsureService(index+1, nodeName, &structs.NodeService{ - ID: "foo-proxy", - Service: "foo-proxy", - Proxy: structs.ConnectProxyConfig{ - MeshGateway: structs.MeshGatewayConfig{ - Mode: structs.MeshGatewayModeLocal, - }, - }, - })) - }() - - // Re-run the query - var out structs.ServiceConfigResponse - require.NoError(t, msgpackrpc.CallWithCodec(codec, "ConfigEntry.ResolveServiceConfig", - &structs.ServiceConfigRequest{ - Name: "foo", - Datacenter: "dc1", - Upstreams: []string{"bar", "baz"}, - QueryOptions: structs.QueryOptions{ - MinQueryIndex: index, - MaxQueryTime: time.Second, - }, - }, - &out, - )) - - // Should block at least 100ms - require.True(t, time.Since(start) >= 100*time.Millisecond, "too fast") - - // Check the indexes - require.Equal(t, out.Index, index+1) - - // The mesh gateway config from the proxy registration should no longer be present - expected := structs.ServiceConfigResponse{ - ProxyConfig: map[string]interface{}{ - "foo": int64(1), - "protocol": "http", - }, - UpstreamConfigs: map[string]map[string]interface{}{ - "bar": { - "protocol": "grpc", - "mesh_gateway": map[string]interface{}{"Mode": string(structs.MeshGatewayModeLocal)}, - }, - "baz": { - "mesh_gateway": map[string]interface{}{"Mode": string(structs.MeshGatewayModeLocal)}, - }, - }, - // Don't know what this is deterministically - QueryMeta: out.QueryMeta, - } - require.Equal(t, expected, out) - } -} - -func TestConfigEntry_ResolveServiceConfig_Upstreams_DegistrationBlocking(t *testing.T) { - if testing.Short() { - t.Skip("too slow for testing.Short") - } - t.Parallel() - - dir1, s1 := testServer(t) - defer os.RemoveAll(dir1) - defer s1.Shutdown() - - codec := rpcClient(t, s1) - defer codec.Close() - - testrpc.WaitForTestAgent(t, s1.RPC, "dc1") - - nodeName := "foo-node" - - // Create a dummy proxy/service config in the state store to look up. - state := s1.fsm.State() - require.NoError(t, state.EnsureConfigEntry(1, &structs.ProxyConfigEntry{ - Kind: structs.ProxyDefaults, - Name: structs.ProxyConfigGlobal, - Config: map[string]interface{}{ - "foo": 1, - }, - })) - require.NoError(t, state.EnsureConfigEntry(2, &structs.ServiceConfigEntry{ - Kind: structs.ServiceDefaults, - Name: "foo", - Protocol: "http", - })) - require.NoError(t, state.EnsureConfigEntry(3, &structs.ServiceConfigEntry{ - Kind: structs.ServiceDefaults, - Name: "bar", - Protocol: "grpc", - })) - require.NoError(t, state.EnsureNode(4, &structs.Node{ - ID: "9c6e733c-c39d-4555-8d41-0f174a31c489", - Node: nodeName, - })) - - registration := structs.NodeService{ - ID: "foo-proxy", - Service: "foo-proxy", - Proxy: structs.ConnectProxyConfig{ - MeshGateway: structs.MeshGatewayConfig{ - Mode: structs.MeshGatewayModeLocal, - }, - }, - } - require.NoError(t, state.EnsureService(5, nodeName, ®istration)) - - args := structs.ServiceConfigRequest{ - Name: "foo", - Datacenter: s1.config.Datacenter, - MeshGateway: registration.Proxy.MeshGateway, - Upstreams: []string{"bar", "baz"}, - } - var out structs.ServiceConfigResponse - require.NoError(t, msgpackrpc.CallWithCodec(codec, "ConfigEntry.ResolveServiceConfig", &args, &out)) - - var index uint64 - expected := structs.ServiceConfigResponse{ - ProxyConfig: map[string]interface{}{ - "foo": int64(1), - "protocol": "http", - }, - // This mesh gateway configuration is pulled from foo-proxy's registration - UpstreamConfigs: map[string]map[string]interface{}{ - "bar": { - "protocol": "grpc", - "mesh_gateway": map[string]interface{}{"Mode": string(structs.MeshGatewayModeLocal)}, - }, - "baz": { - "mesh_gateway": map[string]interface{}{"Mode": string(structs.MeshGatewayModeLocal)}, - }, - }, - // Don't know what this is deterministically - QueryMeta: out.QueryMeta, - } - require.Equal(t, expected, out) - index = out.Index - - // Now setup a blocking query for 'foo' while we erase the proxy registration for foo-proxy. - // Deleting the foo proxy registration should cause the blocking query to fire because it is - // watched when the ID and NodeName are provided. - { - // Async cause a change - start := time.Now() - go func() { - time.Sleep(100 * time.Millisecond) - require.NoError(t, state.DeleteService(index+1, nodeName, "foo-proxy", nil)) - }() - - // Re-run the query - var out structs.ServiceConfigResponse - require.NoError(t, msgpackrpc.CallWithCodec(codec, "ConfigEntry.ResolveServiceConfig", - &structs.ServiceConfigRequest{ - Name: "foo", - Datacenter: "dc1", - MeshGateway: registration.Proxy.MeshGateway, - Upstreams: []string{"bar", "baz"}, - QueryOptions: structs.QueryOptions{ - MinQueryIndex: index, - MaxQueryTime: time.Second, - }, - }, - &out, - )) - - // Should block at least 100ms - require.True(t, time.Since(start) >= 100*time.Millisecond, "too fast") - - // Check the indexes - require.Equal(t, out.Index, index+1) - - // The mesh gateway config from the proxy registration should no longer be present - expected := structs.ServiceConfigResponse{ - ProxyConfig: map[string]interface{}{ - "foo": int64(1), - "protocol": "http", - }, - UpstreamConfigs: map[string]map[string]interface{}{ - "bar": { - "protocol": "grpc", - }, - }, - // Don't know what this is deterministically - QueryMeta: out.QueryMeta, - } - require.Equal(t, expected, out) - } -} - func TestConfigEntry_ResolveServiceConfig_Blocking(t *testing.T) { if testing.Short() { t.Skip("too slow for testing.Short") diff --git a/agent/consul/state/catalog.go b/agent/consul/state/catalog.go index e525196c9..71a31d272 100644 --- a/agent/consul/state/catalog.go +++ b/agent/consul/state/catalog.go @@ -1136,24 +1136,6 @@ func (s *Store) NodeService(nodeName string, serviceID string, entMeta *structs. return idx, service, nil } -// NodeServiceWatch is used to retrieve a specific service associated with the given -// node, and add it to the watch set. -func (s *Store) NodeServiceWatch(ws memdb.WatchSet, nodeName string, serviceID string, entMeta *structs.EnterpriseMeta) (uint64, *structs.NodeService, error) { - tx := s.db.Txn(false) - defer tx.Abort() - - // Get the table index. - idx := catalogServicesMaxIndex(tx, entMeta) - - // Query the service - service, err := getNodeServiceWatchTxn(tx, ws, nodeName, serviceID, entMeta) - if err != nil { - return 0, nil, fmt.Errorf("failed querying service for node %q: %s", nodeName, err) - } - - return idx, service, nil -} - func getNodeServiceTxn(tx ReadTxn, nodeName, serviceID string, entMeta *structs.EnterpriseMeta) (*structs.NodeService, error) { // Query the service _, service, err := firstWatchCompoundWithTxn(tx, "services", "id", entMeta, nodeName, serviceID) @@ -1168,21 +1150,6 @@ func getNodeServiceTxn(tx ReadTxn, nodeName, serviceID string, entMeta *structs. return nil, nil } -func getNodeServiceWatchTxn(tx ReadTxn, ws memdb.WatchSet, nodeName, serviceID string, entMeta *structs.EnterpriseMeta) (*structs.NodeService, error) { - // Query the service - watchCh, service, err := firstWatchCompoundWithTxn(tx, "services", "id", entMeta, nodeName, serviceID) - if err != nil { - return nil, fmt.Errorf("failed querying service for node %q: %s", nodeName, err) - } - ws.Add(watchCh) - - if service != nil { - return service.(*structs.ServiceNode).ToNodeService(), nil - } - - return nil, nil -} - func (s *Store) nodeServices(ws memdb.WatchSet, nodeNameOrID string, entMeta *structs.EnterpriseMeta, allowWildcard bool) (bool, uint64, *structs.Node, memdb.ResultIterator, error) { tx := s.db.Txn(false) defer tx.Abort()