diff --git a/.changelog/15789.txt b/.changelog/15789.txt new file mode 100644 index 000000000..682b4bd1e --- /dev/null +++ b/.changelog/15789.txt @@ -0,0 +1,3 @@ +```release-note:bug +xds: fix bug where sessions for locally-managed services could fail with "this server has too many xDS streams open" +``` diff --git a/GNUmakefile b/GNUmakefile index 116581cce..deec78f43 100644 --- a/GNUmakefile +++ b/GNUmakefile @@ -8,7 +8,7 @@ SHELL = bash # or the string @DEV to imply use what is currently installed locally. ### GOLANGCI_LINT_VERSION='v1.50.1' -MOCKERY_VERSION='v2.12.2' +MOCKERY_VERSION='v2.15.0' BUF_VERSION='v1.4.0' PROTOC_GEN_GO_GRPC_VERSION="v1.2.0" MOG_VERSION='v0.3.0' diff --git a/agent/agent.go b/agent/agent.go index 984fd4e5d..3832bd93e 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -855,6 +855,7 @@ func (a *Agent) listenAndServeGRPC() error { Manager: a.proxyConfig, GetStore: func() catalogproxycfg.Store { return server.FSM().State() }, Logger: a.proxyConfig.Logger.Named("server-catalog"), + SessionLimiter: a.baseDeps.XDSStreamLimiter, }) go func() { <-a.shutdownCh @@ -870,7 +871,6 @@ func (a *Agent) listenAndServeGRPC() error { return a.delegate.ResolveTokenAndDefaultMeta(id, nil, nil) }, a, - a.baseDeps.XDSStreamLimiter, ) a.xdsServer.Register(a.externalGRPCServer) diff --git a/agent/cache-types/catalog_datacenters_test.go b/agent/cache-types/catalog_datacenters_test.go index dd78b0ac9..23b923afc 100644 --- a/agent/cache-types/catalog_datacenters_test.go +++ b/agent/cache-types/catalog_datacenters_test.go @@ -3,10 +3,11 @@ package cachetype import ( "testing" - "github.com/hashicorp/consul/agent/cache" - "github.com/hashicorp/consul/agent/structs" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" + + "github.com/hashicorp/consul/agent/cache" + "github.com/hashicorp/consul/agent/structs" ) func TestCatalogDatacenters(t *testing.T) { @@ -18,34 +19,34 @@ func TestCatalogDatacenters(t *testing.T) { var resp *[]string var resp2 *[]string var resp3 *[]string - rpc.On("RPC", "Catalog.ListDatacenters", mock.Anything, mock.Anything).Once().Return(nil). + rpc.On("RPC", mock.Anything, "Catalog.ListDatacenters", mock.Anything, mock.Anything).Once().Return(nil). Run(func(args mock.Arguments) { - req := args.Get(1).(*structs.DatacentersRequest) + req := args.Get(2).(*structs.DatacentersRequest) require.True(t, req.AllowStale) - reply := args.Get(2).(*[]string) + reply := args.Get(3).(*[]string) *reply = []string{ "primary", "secondary", "tertiary", } resp = reply }) - rpc.On("RPC", "Catalog.ListDatacenters", mock.Anything, mock.Anything).Once().Return(nil). + rpc.On("RPC", mock.Anything, "Catalog.ListDatacenters", mock.Anything, mock.Anything).Once().Return(nil). Run(func(args mock.Arguments) { - req := args.Get(1).(*structs.DatacentersRequest) + req := args.Get(2).(*structs.DatacentersRequest) require.True(t, req.AllowStale) - reply := args.Get(2).(*[]string) + reply := args.Get(3).(*[]string) *reply = []string{ "primary", "tertiary", "secondary", } resp2 = reply }) - rpc.On("RPC", "Catalog.ListDatacenters", mock.Anything, mock.Anything).Once().Return(nil). + rpc.On("RPC", mock.Anything, "Catalog.ListDatacenters", mock.Anything, mock.Anything).Once().Return(nil). Run(func(args mock.Arguments) { - req := args.Get(1).(*structs.DatacentersRequest) + req := args.Get(2).(*structs.DatacentersRequest) require.True(t, req.AllowStale) - reply := args.Get(2).(*[]string) + reply := args.Get(3).(*[]string) *reply = []string{ "primary", "secondary", } diff --git a/agent/cache-types/catalog_list_services_test.go b/agent/cache-types/catalog_list_services_test.go index 60aa4ed81..90ea7be81 100644 --- a/agent/cache-types/catalog_list_services_test.go +++ b/agent/cache-types/catalog_list_services_test.go @@ -19,14 +19,14 @@ func TestCatalogListServices(t *testing.T) { // Expect the proper RPC call. This also sets the expected value // since that is return-by-pointer in the arguments. var resp *structs.IndexedServices - rpc.On("RPC", "Catalog.ListServices", mock.Anything, mock.Anything).Return(nil). + rpc.On("RPC", mock.Anything, "Catalog.ListServices", mock.Anything, mock.Anything).Return(nil). Run(func(args mock.Arguments) { - req := args.Get(1).(*structs.DCSpecificRequest) + req := args.Get(2).(*structs.DCSpecificRequest) require.Equal(t, uint64(24), req.QueryOptions.MinQueryIndex) require.Equal(t, 1*time.Second, req.QueryOptions.MaxQueryTime) require.True(t, req.AllowStale) - reply := args.Get(2).(*structs.IndexedServices) + reply := args.Get(3).(*structs.IndexedServices) reply.Services = map[string][]string{ "foo": {"prod", "linux"}, "bar": {"qa", "windows"}, @@ -75,14 +75,14 @@ func TestCatalogListServices_IntegrationWithCache_NotModifiedResponse(t *testing "foo": {"prod", "linux"}, "bar": {"qa", "windows"}, } - rpc.On("RPC", "Catalog.ListServices", mock.Anything, mock.Anything). + rpc.On("RPC", mock.Anything, "Catalog.ListServices", mock.Anything, mock.Anything). Return(nil). Run(func(args mock.Arguments) { - req := args.Get(1).(*structs.DCSpecificRequest) + req := args.Get(2).(*structs.DCSpecificRequest) require.True(t, req.AllowStale) require.True(t, req.AllowNotModifiedResponse) - reply := args.Get(2).(*structs.IndexedServices) + reply := args.Get(3).(*structs.IndexedServices) reply.QueryMeta.Index = 44 reply.NotModified = true }) diff --git a/agent/cache-types/catalog_service_list_test.go b/agent/cache-types/catalog_service_list_test.go index 0bd0b602c..9eef36bbc 100644 --- a/agent/cache-types/catalog_service_list_test.go +++ b/agent/cache-types/catalog_service_list_test.go @@ -4,10 +4,11 @@ import ( "testing" "time" - "github.com/hashicorp/consul/agent/cache" - "github.com/hashicorp/consul/agent/structs" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" + + "github.com/hashicorp/consul/agent/cache" + "github.com/hashicorp/consul/agent/structs" ) func TestCatalogServiceList(t *testing.T) { @@ -17,14 +18,14 @@ func TestCatalogServiceList(t *testing.T) { // Expect the proper RPC call. This also sets the expected value // since that is return-by-pointer in the arguments. var resp *structs.IndexedServiceList - rpc.On("RPC", "Catalog.ServiceList", mock.Anything, mock.Anything).Return(nil). + rpc.On("RPC", mock.Anything, "Catalog.ServiceList", mock.Anything, mock.Anything).Return(nil). Run(func(args mock.Arguments) { - req := args.Get(1).(*structs.DCSpecificRequest) + req := args.Get(2).(*structs.DCSpecificRequest) require.Equal(t, uint64(24), req.QueryOptions.MinQueryIndex) require.Equal(t, 1*time.Second, req.QueryOptions.MaxQueryTime) require.True(t, req.AllowStale) - reply := args.Get(2).(*structs.IndexedServiceList) + reply := args.Get(3).(*structs.IndexedServiceList) reply.Services = structs.ServiceList{ structs.ServiceName{ Name: "foo", diff --git a/agent/cache-types/catalog_services_test.go b/agent/cache-types/catalog_services_test.go index c7adf2820..c28aa2a01 100644 --- a/agent/cache-types/catalog_services_test.go +++ b/agent/cache-types/catalog_services_test.go @@ -4,10 +4,11 @@ import ( "testing" "time" - "github.com/hashicorp/consul/agent/cache" - "github.com/hashicorp/consul/agent/structs" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" + + "github.com/hashicorp/consul/agent/cache" + "github.com/hashicorp/consul/agent/structs" ) func TestCatalogServices(t *testing.T) { @@ -18,15 +19,15 @@ func TestCatalogServices(t *testing.T) { // Expect the proper RPC call. This also sets the expected value // since that is return-by-pointer in the arguments. var resp *structs.IndexedServiceNodes - rpc.On("RPC", "Catalog.ServiceNodes", mock.Anything, mock.Anything).Return(nil). + rpc.On("RPC", mock.Anything, "Catalog.ServiceNodes", mock.Anything, mock.Anything).Return(nil). Run(func(args mock.Arguments) { - req := args.Get(1).(*structs.ServiceSpecificRequest) + req := args.Get(2).(*structs.ServiceSpecificRequest) require.Equal(t, uint64(24), req.QueryOptions.MinQueryIndex) require.Equal(t, 1*time.Second, req.QueryOptions.MaxQueryTime) require.Equal(t, "web", req.ServiceName) require.True(t, req.AllowStale) - reply := args.Get(2).(*structs.IndexedServiceNodes) + reply := args.Get(3).(*structs.IndexedServiceNodes) reply.ServiceNodes = []*structs.ServiceNode{ {ServiceTags: req.ServiceTags}, } diff --git a/agent/cache-types/config_entry_test.go b/agent/cache-types/config_entry_test.go index 19522b796..50954f35c 100644 --- a/agent/cache-types/config_entry_test.go +++ b/agent/cache-types/config_entry_test.go @@ -4,10 +4,11 @@ import ( "testing" "time" - "github.com/hashicorp/consul/agent/cache" - "github.com/hashicorp/consul/agent/structs" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" + + "github.com/hashicorp/consul/agent/cache" + "github.com/hashicorp/consul/agent/structs" ) func TestConfigEntries(t *testing.T) { @@ -17,16 +18,16 @@ func TestConfigEntries(t *testing.T) { // Expect the proper RPC call. This also sets the expected value // since that is return-by-pointer in the arguments. var resp *structs.IndexedConfigEntries - rpc.On("RPC", "ConfigEntry.List", mock.Anything, mock.Anything).Return(nil). + rpc.On("RPC", mock.Anything, "ConfigEntry.List", mock.Anything, mock.Anything).Return(nil). Run(func(args mock.Arguments) { - req := args.Get(1).(*structs.ConfigEntryQuery) + req := args.Get(2).(*structs.ConfigEntryQuery) require.Equal(t, uint64(24), req.QueryOptions.MinQueryIndex) require.Equal(t, 1*time.Second, req.QueryOptions.MaxQueryTime) require.True(t, req.AllowStale) require.Equal(t, structs.ServiceResolver, req.Kind) require.Equal(t, "", req.Name) - reply := args.Get(2).(*structs.IndexedConfigEntries) + reply := args.Get(3).(*structs.IndexedConfigEntries) reply.Kind = structs.ServiceResolver reply.Entries = []structs.ConfigEntry{ &structs.ServiceResolverConfigEntry{Kind: structs.ServiceResolver, Name: "foo"}, @@ -60,9 +61,9 @@ func TestConfigEntry(t *testing.T) { // Expect the proper RPC call. This also sets the expected value // since that is return-by-pointer in the arguments. var resp *structs.ConfigEntryResponse - rpc.On("RPC", "ConfigEntry.Get", mock.Anything, mock.Anything).Return(nil). + rpc.On("RPC", mock.Anything, "ConfigEntry.Get", mock.Anything, mock.Anything).Return(nil). Run(func(args mock.Arguments) { - req := args.Get(1).(*structs.ConfigEntryQuery) + req := args.Get(2).(*structs.ConfigEntryQuery) require.Equal(t, uint64(24), req.QueryOptions.MinQueryIndex) require.Equal(t, 1*time.Second, req.QueryOptions.MaxQueryTime) require.True(t, req.AllowStale) @@ -73,7 +74,7 @@ func TestConfigEntry(t *testing.T) { Name: "foo", Kind: structs.ServiceResolver, } - reply := args.Get(2).(*structs.ConfigEntryResponse) + reply := args.Get(3).(*structs.ConfigEntryResponse) reply.Entry = entry reply.QueryMeta.Index = 48 resp = reply diff --git a/agent/cache-types/connect_ca_leaf_test.go b/agent/cache-types/connect_ca_leaf_test.go index 1f0724065..48e5a1792 100644 --- a/agent/cache-types/connect_ca_leaf_test.go +++ b/agent/cache-types/connect_ca_leaf_test.go @@ -181,7 +181,7 @@ func TestConnectCALeaf_changingRoots(t *testing.T) { var resp *structs.IssuedCert var idx uint64 - rpc.On("RPC", "ConnectCA.Sign", mock.Anything, mock.Anything).Return(nil). + rpc.On("RPC", mock.Anything, "ConnectCA.Sign", mock.Anything, mock.Anything).Return(nil). Run(func(args mock.Arguments) { ca := caRoot cIdx := atomic.AddUint64(&idx, 1) @@ -189,7 +189,7 @@ func TestConnectCALeaf_changingRoots(t *testing.T) { // Second time round use the new CA ca = caRoot2 } - reply := args.Get(2).(*structs.IssuedCert) + reply := args.Get(3).(*structs.IssuedCert) leaf, _ := connect.TestLeaf(t, "web", ca) reply.CertPEM = leaf reply.ValidAfter = time.Now().Add(-1 * time.Hour) @@ -292,9 +292,9 @@ func TestConnectCALeaf_changingRootsJitterBetweenCalls(t *testing.T) { // Instrument ConnectCA.Sign to return signed cert var resp *structs.IssuedCert var idx uint64 - rpc.On("RPC", "ConnectCA.Sign", mock.Anything, mock.Anything).Return(nil). + rpc.On("RPC", mock.Anything, "ConnectCA.Sign", mock.Anything, mock.Anything).Return(nil). Run(func(args mock.Arguments) { - reply := args.Get(2).(*structs.IssuedCert) + reply := args.Get(3).(*structs.IssuedCert) leaf, _ := connect.TestLeaf(t, "web", caRoot) reply.CertPEM = leaf reply.ValidAfter = time.Now().Add(-1 * time.Hour) @@ -433,9 +433,9 @@ func TestConnectCALeaf_changingRootsBetweenBlockingCalls(t *testing.T) { // Instrument ConnectCA.Sign to return signed cert var resp *structs.IssuedCert var idx uint64 - rpc.On("RPC", "ConnectCA.Sign", mock.Anything, mock.Anything).Return(nil). + rpc.On("RPC", mock.Anything, "ConnectCA.Sign", mock.Anything, mock.Anything).Return(nil). Run(func(args mock.Arguments) { - reply := args.Get(2).(*structs.IssuedCert) + reply := args.Get(3).(*structs.IssuedCert) leaf, _ := connect.TestLeaf(t, "web", caRoot) reply.CertPEM = leaf reply.ValidAfter = time.Now().Add(-1 * time.Hour) @@ -548,7 +548,7 @@ func TestConnectCALeaf_CSRRateLimiting(t *testing.T) { var idx, rateLimitedRPCs uint64 genCert := func(args mock.Arguments) { - reply := args.Get(2).(*structs.IssuedCert) + reply := args.Get(3).(*structs.IssuedCert) leaf, _ := connect.TestLeaf(t, "web", caRoot) reply.CertPEM = leaf reply.ValidAfter = time.Now().Add(-1 * time.Hour) @@ -565,16 +565,16 @@ func TestConnectCALeaf_CSRRateLimiting(t *testing.T) { // First call return rate limit error. This is important as it checks // behavior when cache is empty and we have to return a nil Value but need to // save state to do the right thing for retry. - rpc.On("RPC", "ConnectCA.Sign", mock.Anything, mock.Anything). + rpc.On("RPC", mock.Anything, "ConnectCA.Sign", mock.Anything, mock.Anything). Return(consul.ErrRateLimited).Once().Run(incRateLimit) // Then succeed on second call - rpc.On("RPC", "ConnectCA.Sign", mock.Anything, mock.Anything). + rpc.On("RPC", mock.Anything, "ConnectCA.Sign", mock.Anything, mock.Anything). Return(nil).Run(genCert).Once() // Then be rate limited again on several further calls - rpc.On("RPC", "ConnectCA.Sign", mock.Anything, mock.Anything). + rpc.On("RPC", mock.Anything, "ConnectCA.Sign", mock.Anything, mock.Anything). Return(consul.ErrRateLimited).Twice().Run(incRateLimit) // Then fine after that - rpc.On("RPC", "ConnectCA.Sign", mock.Anything, mock.Anything). + rpc.On("RPC", mock.Anything, "ConnectCA.Sign", mock.Anything, mock.Anything). Return(nil).Run(genCert) opts := cache.FetchOptions{MinIndex: 0, Timeout: 10 * time.Minute} @@ -727,9 +727,9 @@ func TestConnectCALeaf_watchRootsDedupingMultipleCallers(t *testing.T) { // Instrument ConnectCA.Sign to return signed cert var idx uint64 - rpc.On("RPC", "ConnectCA.Sign", mock.Anything, mock.Anything).Return(nil). + rpc.On("RPC", mock.Anything, "ConnectCA.Sign", mock.Anything, mock.Anything).Return(nil). Run(func(args mock.Arguments) { - reply := args.Get(2).(*structs.IssuedCert) + reply := args.Get(3).(*structs.IssuedCert) // Note we will sign certs for same service name each time because // otherwise we have to re-invent whole CSR endpoint here to be able to // control things - parse PEM sign with right key etc. It doesn't matter - @@ -924,9 +924,9 @@ func TestConnectCALeaf_expiringLeaf(t *testing.T) { // Instrument ConnectCA.Sign to var resp *structs.IssuedCert var idx uint64 - rpc.On("RPC", "ConnectCA.Sign", mock.Anything, mock.Anything).Return(nil). + rpc.On("RPC", mock.Anything, "ConnectCA.Sign", mock.Anything, mock.Anything).Return(nil). Run(func(args mock.Arguments) { - reply := args.Get(2).(*structs.IssuedCert) + reply := args.Get(3).(*structs.IssuedCert) reply.CreateIndex = atomic.AddUint64(&idx, 1) reply.ModifyIndex = reply.CreateIndex @@ -1017,13 +1017,13 @@ func TestConnectCALeaf_DNSSANForService(t *testing.T) { // Instrument ConnectCA.Sign to var caReq *structs.CASignRequest - rpc.On("RPC", "ConnectCA.Sign", mock.Anything, mock.Anything).Return(nil). + rpc.On("RPC", mock.Anything, "ConnectCA.Sign", mock.Anything, mock.Anything).Return(nil). Run(func(args mock.Arguments) { - reply := args.Get(2).(*structs.IssuedCert) + reply := args.Get(3).(*structs.IssuedCert) leaf, _ := connect.TestLeaf(t, "web", caRoot) reply.CertPEM = leaf - caReq = args.Get(1).(*structs.CASignRequest) + caReq = args.Get(2).(*structs.CASignRequest) }) opts := cache.FetchOptions{MinIndex: 0, Timeout: 10 * time.Second} diff --git a/agent/cache-types/connect_ca_root_test.go b/agent/cache-types/connect_ca_root_test.go index ea1d50050..7fd23264c 100644 --- a/agent/cache-types/connect_ca_root_test.go +++ b/agent/cache-types/connect_ca_root_test.go @@ -4,10 +4,11 @@ import ( "testing" "time" - "github.com/hashicorp/consul/agent/cache" - "github.com/hashicorp/consul/agent/structs" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" + + "github.com/hashicorp/consul/agent/cache" + "github.com/hashicorp/consul/agent/structs" ) func TestConnectCARoot(t *testing.T) { @@ -18,13 +19,13 @@ func TestConnectCARoot(t *testing.T) { // Expect the proper RPC call. This also sets the expected value // since that is return-by-pointer in the arguments. var resp *structs.IndexedCARoots - rpc.On("RPC", "ConnectCA.Roots", mock.Anything, mock.Anything).Return(nil). + rpc.On("RPC", mock.Anything, "ConnectCA.Roots", mock.Anything, mock.Anything).Return(nil). Run(func(args mock.Arguments) { - req := args.Get(1).(*structs.DCSpecificRequest) + req := args.Get(2).(*structs.DCSpecificRequest) require.Equal(t, uint64(24), req.QueryOptions.MinQueryIndex) require.Equal(t, 1*time.Second, req.QueryOptions.MaxQueryTime) - reply := args.Get(2).(*structs.IndexedCARoots) + reply := args.Get(3).(*structs.IndexedCARoots) reply.QueryMeta.Index = 48 resp = reply }) diff --git a/agent/cache-types/discovery_chain_test.go b/agent/cache-types/discovery_chain_test.go index 4242d4d4a..62fe634e8 100644 --- a/agent/cache-types/discovery_chain_test.go +++ b/agent/cache-types/discovery_chain_test.go @@ -4,11 +4,12 @@ import ( "testing" "time" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + "github.com/hashicorp/consul/agent/cache" "github.com/hashicorp/consul/agent/consul/discoverychain" "github.com/hashicorp/consul/agent/structs" - "github.com/stretchr/testify/mock" - "github.com/stretchr/testify/require" ) func TestCompiledDiscoveryChain(t *testing.T) { @@ -21,14 +22,14 @@ func TestCompiledDiscoveryChain(t *testing.T) { // Expect the proper RPC call. This also sets the expected value // since that is return-by-pointer in the arguments. var resp *structs.DiscoveryChainResponse - rpc.On("RPC", "DiscoveryChain.Get", mock.Anything, mock.Anything).Return(nil). + rpc.On("RPC", mock.Anything, "DiscoveryChain.Get", mock.Anything, mock.Anything).Return(nil). Run(func(args mock.Arguments) { - req := args.Get(1).(*structs.DiscoveryChainRequest) + req := args.Get(2).(*structs.DiscoveryChainRequest) require.Equal(t, uint64(24), req.QueryOptions.MinQueryIndex) require.Equal(t, 1*time.Second, req.QueryOptions.MaxQueryTime) require.True(t, req.AllowStale) - reply := args.Get(2).(*structs.DiscoveryChainResponse) + reply := args.Get(3).(*structs.DiscoveryChainResponse) reply.Chain = chain reply.QueryMeta.Index = 48 resp = reply diff --git a/agent/cache-types/exported_peered_services_test.go b/agent/cache-types/exported_peered_services_test.go index 74b0cd7b3..a946c3296 100644 --- a/agent/cache-types/exported_peered_services_test.go +++ b/agent/cache-types/exported_peered_services_test.go @@ -18,14 +18,14 @@ func TestExportedPeeredServices(t *testing.T) { // Expect the proper RPC call. This also sets the expected value // since that is return-by-pointer in the arguments. var resp *structs.IndexedExportedServiceList - rpc.On("RPC", "Internal.ExportedPeeredServices", mock.Anything, mock.Anything).Return(nil). + rpc.On("RPC", mock.Anything, "Internal.ExportedPeeredServices", mock.Anything, mock.Anything).Return(nil). Run(func(args mock.Arguments) { - req := args.Get(1).(*structs.DCSpecificRequest) + req := args.Get(2).(*structs.DCSpecificRequest) require.Equal(t, uint64(24), req.QueryOptions.MinQueryIndex) require.Equal(t, 1*time.Second, req.QueryOptions.MaxQueryTime) require.True(t, req.AllowStale) - reply := args.Get(2).(*structs.IndexedExportedServiceList) + reply := args.Get(3).(*structs.IndexedExportedServiceList) reply.Services = map[string]structs.ServiceList{ "my-peer": { structs.ServiceName{ diff --git a/agent/cache-types/federation_state_list_gateways_test.go b/agent/cache-types/federation_state_list_gateways_test.go index 56477b8b5..3be25f69a 100644 --- a/agent/cache-types/federation_state_list_gateways_test.go +++ b/agent/cache-types/federation_state_list_gateways_test.go @@ -4,11 +4,12 @@ import ( "testing" "time" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + "github.com/hashicorp/consul/agent/cache" "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/api" - "github.com/stretchr/testify/mock" - "github.com/stretchr/testify/require" ) func TestFederationStateListMeshGateways(t *testing.T) { @@ -18,14 +19,14 @@ func TestFederationStateListMeshGateways(t *testing.T) { // Expect the proper RPC call. This also sets the expected value // since that is return-by-pointer in the arguments. var resp *structs.DatacenterIndexedCheckServiceNodes - rpc.On("RPC", "FederationState.ListMeshGateways", mock.Anything, mock.Anything).Return(nil). + rpc.On("RPC", mock.Anything, "FederationState.ListMeshGateways", mock.Anything, mock.Anything).Return(nil). Run(func(args mock.Arguments) { - req := args.Get(1).(*structs.DCSpecificRequest) + req := args.Get(2).(*structs.DCSpecificRequest) require.Equal(t, uint64(24), req.QueryOptions.MinQueryIndex) require.Equal(t, 1*time.Second, req.QueryOptions.MaxQueryTime) require.True(t, req.AllowStale) - reply := args.Get(2).(*structs.DatacenterIndexedCheckServiceNodes) + reply := args.Get(3).(*structs.DatacenterIndexedCheckServiceNodes) reply.DatacenterNodes = map[string]structs.CheckServiceNodes{ "dc9": []structs.CheckServiceNode{ { diff --git a/agent/cache-types/gateway_services_test.go b/agent/cache-types/gateway_services_test.go index 98faddec0..e3c399cd2 100644 --- a/agent/cache-types/gateway_services_test.go +++ b/agent/cache-types/gateway_services_test.go @@ -4,10 +4,11 @@ import ( "testing" "time" - "github.com/hashicorp/consul/agent/cache" - "github.com/hashicorp/consul/agent/structs" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" + + "github.com/hashicorp/consul/agent/cache" + "github.com/hashicorp/consul/agent/structs" ) func TestGatewayServices(t *testing.T) { @@ -17,9 +18,9 @@ func TestGatewayServices(t *testing.T) { // Expect the proper RPC call. This also sets the expected value // since that is return-by-pointer in the arguments. var resp *structs.IndexedGatewayServices - rpc.On("RPC", "Catalog.GatewayServices", mock.Anything, mock.Anything).Return(nil). + rpc.On("RPC", mock.Anything, "Catalog.GatewayServices", mock.Anything, mock.Anything).Return(nil). Run(func(args mock.Arguments) { - req := args.Get(1).(*structs.ServiceSpecificRequest) + req := args.Get(2).(*structs.ServiceSpecificRequest) require.Equal(t, uint64(24), req.QueryOptions.MinQueryIndex) require.Equal(t, 1*time.Second, req.QueryOptions.MaxQueryTime) require.True(t, req.AllowStale) @@ -37,7 +38,7 @@ func TestGatewayServices(t *testing.T) { SNI: "my-domain", }, } - reply := args.Get(2).(*structs.IndexedGatewayServices) + reply := args.Get(3).(*structs.IndexedGatewayServices) reply.Services = services reply.QueryMeta.Index = 48 resp = reply diff --git a/agent/cache-types/health_services_test.go b/agent/cache-types/health_services_test.go index 6aa900b3f..d1524ca2c 100644 --- a/agent/cache-types/health_services_test.go +++ b/agent/cache-types/health_services_test.go @@ -4,10 +4,11 @@ import ( "testing" "time" - "github.com/hashicorp/consul/agent/cache" - "github.com/hashicorp/consul/agent/structs" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" + + "github.com/hashicorp/consul/agent/cache" + "github.com/hashicorp/consul/agent/structs" ) func TestHealthServices(t *testing.T) { @@ -18,15 +19,15 @@ func TestHealthServices(t *testing.T) { // Expect the proper RPC call. This also sets the expected value // since that is return-by-pointer in the arguments. var resp *structs.IndexedCheckServiceNodes - rpc.On("RPC", "Health.ServiceNodes", mock.Anything, mock.Anything).Return(nil). + rpc.On("RPC", mock.Anything, "Health.ServiceNodes", mock.Anything, mock.Anything).Return(nil). Run(func(args mock.Arguments) { - req := args.Get(1).(*structs.ServiceSpecificRequest) + req := args.Get(2).(*structs.ServiceSpecificRequest) require.Equal(t, uint64(24), req.QueryOptions.MinQueryIndex) require.Equal(t, 1*time.Second, req.QueryOptions.MaxQueryTime) require.Equal(t, "web", req.ServiceName) require.True(t, req.AllowStale) - reply := args.Get(2).(*structs.IndexedCheckServiceNodes) + reply := args.Get(3).(*structs.IndexedCheckServiceNodes) reply.Nodes = []structs.CheckServiceNode{ {Service: &structs.NodeService{Tags: req.ServiceTags}}, } diff --git a/agent/cache-types/intention_match_test.go b/agent/cache-types/intention_match_test.go index 869792cff..ec313013f 100644 --- a/agent/cache-types/intention_match_test.go +++ b/agent/cache-types/intention_match_test.go @@ -4,10 +4,11 @@ import ( "testing" "time" - "github.com/hashicorp/consul/agent/cache" - "github.com/hashicorp/consul/agent/structs" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" + + "github.com/hashicorp/consul/agent/cache" + "github.com/hashicorp/consul/agent/structs" ) func TestIntentionMatch(t *testing.T) { @@ -18,13 +19,13 @@ func TestIntentionMatch(t *testing.T) { // Expect the proper RPC call. This also sets the expected value // since that is return-by-pointer in the arguments. var resp *structs.IndexedIntentionMatches - rpc.On("RPC", "Intention.Match", mock.Anything, mock.Anything).Return(nil). + rpc.On("RPC", mock.Anything, "Intention.Match", mock.Anything, mock.Anything).Return(nil). Run(func(args mock.Arguments) { - req := args.Get(1).(*structs.IntentionQueryRequest) + req := args.Get(2).(*structs.IntentionQueryRequest) require.Equal(t, uint64(24), req.MinQueryIndex) require.Equal(t, 1*time.Second, req.MaxQueryTime) - reply := args.Get(2).(*structs.IndexedIntentionMatches) + reply := args.Get(3).(*structs.IndexedIntentionMatches) reply.Index = 48 resp = reply }) diff --git a/agent/cache-types/intention_upstreams_destination_test.go b/agent/cache-types/intention_upstreams_destination_test.go index 7aa2d02ef..660cdb6b4 100644 --- a/agent/cache-types/intention_upstreams_destination_test.go +++ b/agent/cache-types/intention_upstreams_destination_test.go @@ -4,10 +4,11 @@ import ( "testing" "time" - "github.com/hashicorp/consul/agent/cache" - "github.com/hashicorp/consul/agent/structs" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" + + "github.com/hashicorp/consul/agent/cache" + "github.com/hashicorp/consul/agent/structs" ) func TestIntentionUpstreamsDestination(t *testing.T) { @@ -17,9 +18,9 @@ func TestIntentionUpstreamsDestination(t *testing.T) { // Expect the proper RPC call. This also sets the expected value // since that is return-by-pointer in the arguments. var resp *structs.IndexedServiceList - rpc.On("RPC", "Internal.IntentionUpstreamsDestination", mock.Anything, mock.Anything).Return(nil). + rpc.On("RPC", mock.Anything, "Internal.IntentionUpstreamsDestination", mock.Anything, mock.Anything).Return(nil). Run(func(args mock.Arguments) { - req := args.Get(1).(*structs.ServiceSpecificRequest) + req := args.Get(2).(*structs.ServiceSpecificRequest) require.Equal(t, uint64(24), req.QueryOptions.MinQueryIndex) require.Equal(t, 1*time.Second, req.QueryOptions.MaxQueryTime) require.True(t, req.AllowStale) @@ -28,7 +29,7 @@ func TestIntentionUpstreamsDestination(t *testing.T) { services := structs.ServiceList{ {Name: "foo"}, } - reply := args.Get(2).(*structs.IndexedServiceList) + reply := args.Get(3).(*structs.IndexedServiceList) reply.Services = services reply.QueryMeta.Index = 48 resp = reply diff --git a/agent/cache-types/intention_upstreams_test.go b/agent/cache-types/intention_upstreams_test.go index eb2169388..a6f2e3aa0 100644 --- a/agent/cache-types/intention_upstreams_test.go +++ b/agent/cache-types/intention_upstreams_test.go @@ -4,10 +4,11 @@ import ( "testing" "time" - "github.com/hashicorp/consul/agent/cache" - "github.com/hashicorp/consul/agent/structs" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" + + "github.com/hashicorp/consul/agent/cache" + "github.com/hashicorp/consul/agent/structs" ) func TestIntentionUpstreams(t *testing.T) { @@ -17,9 +18,9 @@ func TestIntentionUpstreams(t *testing.T) { // Expect the proper RPC call. This also sets the expected value // since that is return-by-pointer in the arguments. var resp *structs.IndexedServiceList - rpc.On("RPC", "Internal.IntentionUpstreams", mock.Anything, mock.Anything).Return(nil). + rpc.On("RPC", mock.Anything, "Internal.IntentionUpstreams", mock.Anything, mock.Anything).Return(nil). Run(func(args mock.Arguments) { - req := args.Get(1).(*structs.ServiceSpecificRequest) + req := args.Get(2).(*structs.ServiceSpecificRequest) require.Equal(t, uint64(24), req.QueryOptions.MinQueryIndex) require.Equal(t, 1*time.Second, req.QueryOptions.MaxQueryTime) require.True(t, req.AllowStale) @@ -28,7 +29,7 @@ func TestIntentionUpstreams(t *testing.T) { services := structs.ServiceList{ {Name: "foo"}, } - reply := args.Get(2).(*structs.IndexedServiceList) + reply := args.Get(3).(*structs.IndexedServiceList) reply.Services = services reply.QueryMeta.Index = 48 resp = reply diff --git a/agent/cache-types/mock_PeeringLister_test.go b/agent/cache-types/mock_PeeringLister_test.go index f3cb48c24..c9141e28a 100644 --- a/agent/cache-types/mock_PeeringLister_test.go +++ b/agent/cache-types/mock_PeeringLister_test.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.14.0. DO NOT EDIT. +// Code generated by mockery v2.15.0. DO NOT EDIT. package cachetype diff --git a/agent/cache-types/mock_RPC.go b/agent/cache-types/mock_RPC.go index a4289afa3..ea96d28e2 100644 --- a/agent/cache-types/mock_RPC.go +++ b/agent/cache-types/mock_RPC.go @@ -1,10 +1,9 @@ -// Code generated by mockery v2.12.2. DO NOT EDIT. +// Code generated by mockery v2.15.0. DO NOT EDIT. package cachetype import ( - "context" - testing "testing" + context "context" mock "github.com/stretchr/testify/mock" ) @@ -16,11 +15,11 @@ type MockRPC struct { // RPC provides a mock function with given fields: ctx, method, args, reply func (_m *MockRPC) RPC(ctx context.Context, method string, args interface{}, reply interface{}) error { - ret := _m.Called(method, args, reply) + ret := _m.Called(ctx, method, args, reply) var r0 error - if rf, ok := ret.Get(0).(func(string, interface{}, interface{}) error); ok { - r0 = rf(method, args, reply) + if rf, ok := ret.Get(0).(func(context.Context, string, interface{}, interface{}) error); ok { + r0 = rf(ctx, method, args, reply) } else { r0 = ret.Error(0) } @@ -28,8 +27,13 @@ func (_m *MockRPC) RPC(ctx context.Context, method string, args interface{}, rep return r0 } -// NewMockRPC creates a new instance of MockRPC. It also registers the testing.TB interface on the mock and a cleanup function to assert the mocks expectations. -func NewMockRPC(t testing.TB) *MockRPC { +type mockConstructorTestingTNewMockRPC interface { + mock.TestingT + Cleanup(func()) +} + +// NewMockRPC creates a new instance of MockRPC. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +func NewMockRPC(t mockConstructorTestingTNewMockRPC) *MockRPC { mock := &MockRPC{} mock.Mock.Test(t) diff --git a/agent/cache-types/mock_TrustBundleLister_test.go b/agent/cache-types/mock_TrustBundleLister_test.go index aeb4ac88f..4fde13129 100644 --- a/agent/cache-types/mock_TrustBundleLister_test.go +++ b/agent/cache-types/mock_TrustBundleLister_test.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.12.2. DO NOT EDIT. +// Code generated by mockery v2.15.0. DO NOT EDIT. package cachetype @@ -10,8 +10,6 @@ import ( mock "github.com/stretchr/testify/mock" pbpeering "github.com/hashicorp/consul/proto/pbpeering" - - testing "testing" ) // MockTrustBundleLister is an autogenerated mock type for the TrustBundleLister type @@ -49,8 +47,13 @@ func (_m *MockTrustBundleLister) TrustBundleListByService(ctx context.Context, i return r0, r1 } -// NewMockTrustBundleLister creates a new instance of MockTrustBundleLister. It also registers the testing.TB interface on the mock and a cleanup function to assert the mocks expectations. -func NewMockTrustBundleLister(t testing.TB) *MockTrustBundleLister { +type mockConstructorTestingTNewMockTrustBundleLister interface { + mock.TestingT + Cleanup(func()) +} + +// NewMockTrustBundleLister creates a new instance of MockTrustBundleLister. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +func NewMockTrustBundleLister(t mockConstructorTestingTNewMockTrustBundleLister) *MockTrustBundleLister { mock := &MockTrustBundleLister{} mock.Mock.Test(t) diff --git a/agent/cache-types/mock_TrustBundleReader_test.go b/agent/cache-types/mock_TrustBundleReader_test.go index 7ea636b3d..119f246fb 100644 --- a/agent/cache-types/mock_TrustBundleReader_test.go +++ b/agent/cache-types/mock_TrustBundleReader_test.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.12.2. DO NOT EDIT. +// Code generated by mockery v2.15.0. DO NOT EDIT. package cachetype @@ -10,8 +10,6 @@ import ( mock "github.com/stretchr/testify/mock" pbpeering "github.com/hashicorp/consul/proto/pbpeering" - - testing "testing" ) // MockTrustBundleReader is an autogenerated mock type for the TrustBundleReader type @@ -49,8 +47,13 @@ func (_m *MockTrustBundleReader) TrustBundleRead(ctx context.Context, in *pbpeer return r0, r1 } -// NewMockTrustBundleReader creates a new instance of MockTrustBundleReader. It also registers the testing.TB interface on the mock and a cleanup function to assert the mocks expectations. -func NewMockTrustBundleReader(t testing.TB) *MockTrustBundleReader { +type mockConstructorTestingTNewMockTrustBundleReader interface { + mock.TestingT + Cleanup(func()) +} + +// NewMockTrustBundleReader creates a new instance of MockTrustBundleReader. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +func NewMockTrustBundleReader(t mockConstructorTestingTNewMockTrustBundleReader) *MockTrustBundleReader { mock := &MockTrustBundleReader{} mock.Mock.Test(t) diff --git a/agent/cache-types/node_services_test.go b/agent/cache-types/node_services_test.go index fa600d012..049ac3c24 100644 --- a/agent/cache-types/node_services_test.go +++ b/agent/cache-types/node_services_test.go @@ -4,10 +4,11 @@ import ( "testing" "time" - "github.com/hashicorp/consul/agent/cache" - "github.com/hashicorp/consul/agent/structs" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" + + "github.com/hashicorp/consul/agent/cache" + "github.com/hashicorp/consul/agent/structs" ) func TestNodeServices(t *testing.T) { @@ -18,15 +19,15 @@ func TestNodeServices(t *testing.T) { // Expect the proper RPC call. This also sets the expected value // since that is return-by-pointer in the arguments. var resp *structs.IndexedNodeServices - rpc.On("RPC", "Catalog.NodeServices", mock.Anything, mock.Anything).Return(nil). + rpc.On("RPC", mock.Anything, "Catalog.NodeServices", mock.Anything, mock.Anything).Return(nil). Run(func(args mock.Arguments) { - req := args.Get(1).(*structs.NodeSpecificRequest) + req := args.Get(2).(*structs.NodeSpecificRequest) require.Equal(t, uint64(24), req.QueryOptions.MinQueryIndex) require.Equal(t, 1*time.Second, req.QueryOptions.MaxQueryTime) require.Equal(t, "node-01", req.Node) require.True(t, req.AllowStale) - reply := args.Get(2).(*structs.IndexedNodeServices) + reply := args.Get(3).(*structs.IndexedNodeServices) reply.NodeServices = &structs.NodeServices{ Node: &structs.Node{ ID: "abcdef", diff --git a/agent/cache-types/peered_upstreams_test.go b/agent/cache-types/peered_upstreams_test.go index 31312f4fa..7f8481ffc 100644 --- a/agent/cache-types/peered_upstreams_test.go +++ b/agent/cache-types/peered_upstreams_test.go @@ -20,14 +20,14 @@ func TestPeeredUpstreams(t *testing.T) { // Expect the proper RPC call. This also sets the expected value // since that is return-by-pointer in the arguments. var resp *structs.IndexedPeeredServiceList - rpc.On("RPC", "Internal.PeeredUpstreams", mock.Anything, mock.Anything).Return(nil). + rpc.On("RPC", mock.Anything, "Internal.PeeredUpstreams", mock.Anything, mock.Anything).Return(nil). Run(func(args mock.Arguments) { - req := args.Get(1).(*structs.PartitionSpecificRequest) + req := args.Get(2).(*structs.PartitionSpecificRequest) require.Equal(t, uint64(24), req.MinQueryIndex) require.Equal(t, 1*time.Second, req.QueryOptions.MaxQueryTime) require.True(t, req.AllowStale) - reply := args.Get(2).(*structs.IndexedPeeredServiceList) + reply := args.Get(3).(*structs.IndexedPeeredServiceList) reply.Index = 48 resp = reply }) diff --git a/agent/cache-types/prepared_query_test.go b/agent/cache-types/prepared_query_test.go index 870bdbd52..c45386bd7 100644 --- a/agent/cache-types/prepared_query_test.go +++ b/agent/cache-types/prepared_query_test.go @@ -3,10 +3,11 @@ package cachetype import ( "testing" - "github.com/hashicorp/consul/agent/cache" - "github.com/hashicorp/consul/agent/structs" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" + + "github.com/hashicorp/consul/agent/cache" + "github.com/hashicorp/consul/agent/structs" ) func TestPreparedQuery(t *testing.T) { @@ -17,14 +18,14 @@ func TestPreparedQuery(t *testing.T) { // Expect the proper RPC call. This also sets the expected value // since that is return-by-pointer in the arguments. var resp *structs.PreparedQueryExecuteResponse - rpc.On("RPC", "PreparedQuery.Execute", mock.Anything, mock.Anything).Return(nil). + rpc.On("RPC", mock.Anything, "PreparedQuery.Execute", mock.Anything, mock.Anything).Return(nil). Run(func(args mock.Arguments) { - req := args.Get(1).(*structs.PreparedQueryExecuteRequest) + req := args.Get(2).(*structs.PreparedQueryExecuteRequest) require.Equal(t, "geo-db", req.QueryIDOrName) require.Equal(t, 10, req.Limit) require.True(t, req.AllowStale) - reply := args.Get(2).(*structs.PreparedQueryExecuteResponse) + reply := args.Get(3).(*structs.PreparedQueryExecuteResponse) reply.QueryMeta.Index = 48 resp = reply }) diff --git a/agent/cache-types/resolved_service_config_test.go b/agent/cache-types/resolved_service_config_test.go index 0f99aa9c0..e6750a819 100644 --- a/agent/cache-types/resolved_service_config_test.go +++ b/agent/cache-types/resolved_service_config_test.go @@ -4,10 +4,11 @@ import ( "testing" "time" - "github.com/hashicorp/consul/agent/cache" - "github.com/hashicorp/consul/agent/structs" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" + + "github.com/hashicorp/consul/agent/cache" + "github.com/hashicorp/consul/agent/structs" ) func TestResolvedServiceConfig(t *testing.T) { @@ -18,15 +19,15 @@ func TestResolvedServiceConfig(t *testing.T) { // Expect the proper RPC call. This also sets the expected value // since that is return-by-pointer in the arguments. var resp *structs.ServiceConfigResponse - rpc.On("RPC", "ConfigEntry.ResolveServiceConfig", mock.Anything, mock.Anything).Return(nil). + rpc.On("RPC", mock.Anything, "ConfigEntry.ResolveServiceConfig", mock.Anything, mock.Anything).Return(nil). Run(func(args mock.Arguments) { - req := args.Get(1).(*structs.ServiceConfigRequest) + req := args.Get(2).(*structs.ServiceConfigRequest) require.Equal(t, uint64(24), req.QueryOptions.MinQueryIndex) require.Equal(t, 1*time.Second, req.QueryOptions.MaxQueryTime) require.Equal(t, "foo", req.Name) require.True(t, req.AllowStale) - reply := args.Get(2).(*structs.ServiceConfigResponse) + reply := args.Get(3).(*structs.ServiceConfigResponse) reply.ProxyConfig = map[string]interface{}{ "protocol": "http", } diff --git a/agent/cache-types/service_dump_test.go b/agent/cache-types/service_dump_test.go index 0f49c965f..b261a5046 100644 --- a/agent/cache-types/service_dump_test.go +++ b/agent/cache-types/service_dump_test.go @@ -4,10 +4,11 @@ import ( "testing" "time" - "github.com/hashicorp/consul/agent/cache" - "github.com/hashicorp/consul/agent/structs" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" + + "github.com/hashicorp/consul/agent/cache" + "github.com/hashicorp/consul/agent/structs" ) func TestInternalServiceDump(t *testing.T) { @@ -17,14 +18,14 @@ func TestInternalServiceDump(t *testing.T) { // Expect the proper RPC call. This also sets the expected value // since that is return-by-pointer in the arguments. var resp *structs.IndexedNodesWithGateways - rpc.On("RPC", "Internal.ServiceDump", mock.Anything, mock.Anything).Return(nil). + rpc.On("RPC", mock.Anything, "Internal.ServiceDump", mock.Anything, mock.Anything).Return(nil). Run(func(args mock.Arguments) { - req := args.Get(1).(*structs.ServiceDumpRequest) + req := args.Get(2).(*structs.ServiceDumpRequest) require.Equal(t, uint64(24), req.QueryOptions.MinQueryIndex) require.Equal(t, 1*time.Second, req.QueryOptions.MaxQueryTime) require.True(t, req.AllowStale) - reply := args.Get(2).(*structs.IndexedNodesWithGateways) + reply := args.Get(3).(*structs.IndexedNodesWithGateways) reply.Nodes = []structs.CheckServiceNode{ {Service: &structs.NodeService{Kind: req.ServiceKind, Service: "foo"}}, } diff --git a/agent/cache-types/service_gateways_test.go b/agent/cache-types/service_gateways_test.go index 39c6b474d..27333d68d 100644 --- a/agent/cache-types/service_gateways_test.go +++ b/agent/cache-types/service_gateways_test.go @@ -4,10 +4,11 @@ import ( "testing" "time" - "github.com/hashicorp/consul/agent/cache" - "github.com/hashicorp/consul/agent/structs" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" + + "github.com/hashicorp/consul/agent/cache" + "github.com/hashicorp/consul/agent/structs" ) func TestServiceGateways(t *testing.T) { @@ -17,9 +18,9 @@ func TestServiceGateways(t *testing.T) { // Expect the proper RPC call. This also sets the expected value // since that is return-by-pointer in the arguments. var resp *structs.IndexedCheckServiceNodes - rpc.On("RPC", "Internal.ServiceGateways", mock.Anything, mock.Anything).Return(nil). + rpc.On("RPC", mock.Anything, "Internal.ServiceGateways", mock.Anything, mock.Anything).Return(nil). Run(func(args mock.Arguments) { - req := args.Get(1).(*structs.ServiceSpecificRequest) + req := args.Get(2).(*structs.ServiceSpecificRequest) require.Equal(t, uint64(24), req.QueryOptions.MinQueryIndex) require.Equal(t, 1*time.Second, req.QueryOptions.MaxQueryTime) require.True(t, req.AllowStale) @@ -33,7 +34,7 @@ func TestServiceGateways(t *testing.T) { }, } - reply := args.Get(2).(*structs.IndexedCheckServiceNodes) + reply := args.Get(3).(*structs.IndexedCheckServiceNodes) reply.Nodes = nodes reply.QueryMeta.Index = 48 resp = reply diff --git a/agent/cache/mock_Request.go b/agent/cache/mock_Request.go index dd585c57e..30fda5646 100644 --- a/agent/cache/mock_Request.go +++ b/agent/cache/mock_Request.go @@ -1,12 +1,8 @@ -// Code generated by mockery v2.12.2. DO NOT EDIT. +// Code generated by mockery v2.15.0. DO NOT EDIT. package cache -import ( - testing "testing" - - mock "github.com/stretchr/testify/mock" -) +import mock "github.com/stretchr/testify/mock" // MockRequest is an autogenerated mock type for the Request type type MockRequest struct { @@ -27,8 +23,13 @@ func (_m *MockRequest) CacheInfo() RequestInfo { return r0 } -// NewMockRequest creates a new instance of MockRequest. It also registers the testing.TB interface on the mock and a cleanup function to assert the mocks expectations. -func NewMockRequest(t testing.TB) *MockRequest { +type mockConstructorTestingTNewMockRequest interface { + mock.TestingT + Cleanup(func()) +} + +// NewMockRequest creates a new instance of MockRequest. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +func NewMockRequest(t mockConstructorTestingTNewMockRequest) *MockRequest { mock := &MockRequest{} mock.Mock.Test(t) diff --git a/agent/cache/mock_Type.go b/agent/cache/mock_Type.go index 76642bb5c..9f1a32422 100644 --- a/agent/cache/mock_Type.go +++ b/agent/cache/mock_Type.go @@ -1,12 +1,8 @@ -// Code generated by mockery v2.12.2. DO NOT EDIT. +// Code generated by mockery v2.15.0. DO NOT EDIT. package cache -import ( - testing "testing" - - mock "github.com/stretchr/testify/mock" -) +import mock "github.com/stretchr/testify/mock" // MockType is an autogenerated mock type for the Type type type MockType struct { @@ -48,8 +44,13 @@ func (_m *MockType) RegisterOptions() RegisterOptions { return r0 } -// NewMockType creates a new instance of MockType. It also registers the testing.TB interface on the mock and a cleanup function to assert the mocks expectations. -func NewMockType(t testing.TB) *MockType { +type mockConstructorTestingTNewMockType interface { + mock.TestingT + Cleanup(func()) +} + +// NewMockType creates a new instance of MockType. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +func NewMockType(t mockConstructorTestingTNewMockType) *MockType { mock := &MockType{} mock.Mock.Test(t) diff --git a/agent/connect/ca/mock_Provider.go b/agent/connect/ca/mock_Provider.go index 5c9bfcfcd..50136cbc7 100644 --- a/agent/connect/ca/mock_Provider.go +++ b/agent/connect/ca/mock_Provider.go @@ -1,13 +1,11 @@ -// Code generated by mockery v2.11.0. DO NOT EDIT. +// Code generated by mockery v2.15.0. DO NOT EDIT. package ca import ( - testing "testing" + x509 "crypto/x509" mock "github.com/stretchr/testify/mock" - - x509 "crypto/x509" ) // MockProvider is an autogenerated mock type for the Provider type @@ -155,13 +153,13 @@ func (_m *MockProvider) GenerateRoot() (RootResult, error) { return r0, r1 } -// SetIntermediate provides a mock function with given fields: intermediatePEM, rootPEM -func (_m *MockProvider) SetIntermediate(intermediatePEM string, rootPEM string, keyId string) error { - ret := _m.Called(intermediatePEM, rootPEM, keyId) +// SetIntermediate provides a mock function with given fields: intermediatePEM, rootPEM, opaque +func (_m *MockProvider) SetIntermediate(intermediatePEM string, rootPEM string, opaque string) error { + ret := _m.Called(intermediatePEM, rootPEM, opaque) var r0 error if rf, ok := ret.Get(0).(func(string, string, string) error); ok { - r0 = rf(intermediatePEM, rootPEM, keyId) + r0 = rf(intermediatePEM, rootPEM, opaque) } else { r0 = ret.Error(0) } @@ -255,9 +253,15 @@ func (_m *MockProvider) SupportsCrossSigning() (bool, error) { return r0, r1 } -// NewMockProvider creates a new instance of MockProvider. It also registers a cleanup function to assert the mocks expectations. -func NewMockProvider(t testing.TB) *MockProvider { +type mockConstructorTestingTNewMockProvider interface { + mock.TestingT + Cleanup(func()) +} + +// NewMockProvider creates a new instance of MockProvider. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +func NewMockProvider(t mockConstructorTestingTNewMockProvider) *MockProvider { mock := &MockProvider{} + mock.Mock.Test(t) t.Cleanup(func() { mock.AssertExpectations(t) }) diff --git a/agent/consul/auth/mock_ACLCache.go b/agent/consul/auth/mock_ACLCache.go index e8e5c6828..2df0b0e38 100644 --- a/agent/consul/auth/mock_ACLCache.go +++ b/agent/consul/auth/mock_ACLCache.go @@ -1,12 +1,8 @@ -// Code generated by mockery v2.12.0. DO NOT EDIT. +// Code generated by mockery v2.15.0. DO NOT EDIT. package auth -import ( - testing "testing" - - mock "github.com/stretchr/testify/mock" -) +import mock "github.com/stretchr/testify/mock" // MockACLCache is an autogenerated mock type for the ACLCache type type MockACLCache struct { @@ -18,8 +14,13 @@ func (_m *MockACLCache) RemoveIdentityWithSecretToken(secretToken string) { _m.Called(secretToken) } -// NewMockACLCache creates a new instance of MockACLCache. It also registers the testing.TB interface on the mock and a cleanup function to assert the mocks expectations. -func NewMockACLCache(t testing.TB) *MockACLCache { +type mockConstructorTestingTNewMockACLCache interface { + mock.TestingT + Cleanup(func()) +} + +// NewMockACLCache creates a new instance of MockACLCache. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +func NewMockACLCache(t mockConstructorTestingTNewMockACLCache) *MockACLCache { mock := &MockACLCache{} mock.Mock.Test(t) diff --git a/agent/consul/autopilotevents/mock_Publisher_test.go b/agent/consul/autopilotevents/mock_Publisher_test.go index c0a736be3..aeaf39f5b 100644 --- a/agent/consul/autopilotevents/mock_Publisher_test.go +++ b/agent/consul/autopilotevents/mock_Publisher_test.go @@ -1,10 +1,8 @@ -// Code generated by mockery v2.12.2. DO NOT EDIT. +// Code generated by mockery v2.15.0. DO NOT EDIT. package autopilotevents import ( - testing "testing" - stream "github.com/hashicorp/consul/agent/consul/stream" mock "github.com/stretchr/testify/mock" ) @@ -19,8 +17,13 @@ func (_m *MockPublisher) Publish(_a0 []stream.Event) { _m.Called(_a0) } -// NewMockPublisher creates a new instance of MockPublisher. It also registers the testing.TB interface on the mock and a cleanup function to assert the mocks expectations. -func NewMockPublisher(t testing.TB) *MockPublisher { +type mockConstructorTestingTNewMockPublisher interface { + mock.TestingT + Cleanup(func()) +} + +// NewMockPublisher creates a new instance of MockPublisher. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +func NewMockPublisher(t mockConstructorTestingTNewMockPublisher) *MockPublisher { mock := &MockPublisher{} mock.Mock.Test(t) diff --git a/agent/consul/autopilotevents/mock_StateStore_test.go b/agent/consul/autopilotevents/mock_StateStore_test.go index dd048e58e..9391f21da 100644 --- a/agent/consul/autopilotevents/mock_StateStore_test.go +++ b/agent/consul/autopilotevents/mock_StateStore_test.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.12.2. DO NOT EDIT. +// Code generated by mockery v2.15.0. DO NOT EDIT. package autopilotevents @@ -10,8 +10,6 @@ import ( structs "github.com/hashicorp/consul/agent/structs" - testing "testing" - types "github.com/hashicorp/consul/types" ) @@ -80,8 +78,13 @@ func (_m *MockStateStore) NodeService(ws memdb.WatchSet, nodeName string, servic return r0, r1, r2 } -// NewMockStateStore creates a new instance of MockStateStore. It also registers the testing.TB interface on the mock and a cleanup function to assert the mocks expectations. -func NewMockStateStore(t testing.TB) *MockStateStore { +type mockConstructorTestingTNewMockStateStore interface { + mock.TestingT + Cleanup(func()) +} + +// NewMockStateStore creates a new instance of MockStateStore. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +func NewMockStateStore(t mockConstructorTestingTNewMockStateStore) *MockStateStore { mock := &MockStateStore{} mock.Mock.Test(t) diff --git a/agent/consul/autopilotevents/mock_timeProvider_test.go b/agent/consul/autopilotevents/mock_timeProvider_test.go index 4640fadde..ccc312df3 100644 --- a/agent/consul/autopilotevents/mock_timeProvider_test.go +++ b/agent/consul/autopilotevents/mock_timeProvider_test.go @@ -1,13 +1,11 @@ -// Code generated by mockery v2.12.2. DO NOT EDIT. +// Code generated by mockery v2.15.0. DO NOT EDIT. package autopilotevents import ( - testing "testing" + time "time" mock "github.com/stretchr/testify/mock" - - time "time" ) // mockTimeProvider is an autogenerated mock type for the timeProvider type @@ -29,8 +27,13 @@ func (_m *mockTimeProvider) Now() time.Time { return r0 } -// newMockTimeProvider creates a new instance of mockTimeProvider. It also registers the testing.TB interface on the mock and a cleanup function to assert the mocks expectations. -func newMockTimeProvider(t testing.TB) *mockTimeProvider { +type mockConstructorTestingTnewMockTimeProvider interface { + mock.TestingT + Cleanup(func()) +} + +// newMockTimeProvider creates a new instance of mockTimeProvider. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +func newMockTimeProvider(t mockConstructorTestingTnewMockTimeProvider) *mockTimeProvider { mock := &mockTimeProvider{} mock.Mock.Test(t) diff --git a/agent/consul/rate/handler.go b/agent/consul/rate/handler.go index c28a4562c..ddd83b120 100644 --- a/agent/consul/rate/handler.go +++ b/agent/consul/rate/handler.go @@ -105,7 +105,7 @@ type Operation struct { Type OperationType } -//go:generate mockery --name RequestLimitsHandler --inpackage --filename mock_RequestLimitsHandler_test.go +//go:generate mockery --name RequestLimitsHandler --inpackage type RequestLimitsHandler interface { Run(ctx context.Context) Allow(op Operation) error diff --git a/agent/consul/watch/mock_StateStore_test.go b/agent/consul/watch/mock_StateStore_test.go index 08d58e2f0..37cc38366 100644 --- a/agent/consul/watch/mock_StateStore_test.go +++ b/agent/consul/watch/mock_StateStore_test.go @@ -1,12 +1,8 @@ -// Code generated by mockery v2.12.2. DO NOT EDIT. +// Code generated by mockery v2.15.0. DO NOT EDIT. package watch -import ( - testing "testing" - - mock "github.com/stretchr/testify/mock" -) +import mock "github.com/stretchr/testify/mock" // MockStateStore is an autogenerated mock type for the StateStore type type MockStateStore struct { @@ -29,8 +25,13 @@ func (_m *MockStateStore) AbandonCh() <-chan struct{} { return r0 } -// NewMockStateStore creates a new instance of MockStateStore. It also registers the testing.TB interface on the mock and a cleanup function to assert the mocks expectations. -func NewMockStateStore(t testing.TB) *MockStateStore { +type mockConstructorTestingTNewMockStateStore interface { + mock.TestingT + Cleanup(func()) +} + +// NewMockStateStore creates a new instance of MockStateStore. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +func NewMockStateStore(t mockConstructorTestingTNewMockStateStore) *MockStateStore { mock := &MockStateStore{} mock.Mock.Test(t) diff --git a/agent/grpc-external/limiter/limiter.go b/agent/grpc-external/limiter/limiter.go index 115ce0623..9c0fcf795 100644 --- a/agent/grpc-external/limiter/limiter.go +++ b/agent/grpc-external/limiter/limiter.go @@ -213,6 +213,10 @@ func (l *SessionLimiter) deleteSessionWithID(id uint64) { l.deleteSessionLocked(idx, id) } +// SessionTerminatedChan is a channel that will be closed to notify session- +// holders that a session has been terminated. +type SessionTerminatedChan <-chan struct{} + // Session allows its holder to perform an operation (e.g. serve a gRPC stream) // concurrenly with other session-holders. Sessions may be terminated abruptly // by the SessionLimiter, so it is the responsibility of the holder to receive @@ -228,7 +232,7 @@ type Session interface { // // The session-holder MUST receive on it and exit (e.g. close the gRPC stream) // when it is closed. - Terminated() <-chan struct{} + Terminated() SessionTerminatedChan } type session struct { @@ -240,6 +244,6 @@ type session struct { func (s *session) End() { s.l.deleteSessionWithID(s.id) } -func (s *session) Terminated() <-chan struct{} { return s.termCh } +func (s *session) Terminated() SessionTerminatedChan { return s.termCh } func (s *session) terminate() { close(s.termCh) } diff --git a/agent/grpc-external/services/acl/mock_Login.go b/agent/grpc-external/services/acl/mock_Login.go index 3c33169a8..d20d676d4 100644 --- a/agent/grpc-external/services/acl/mock_Login.go +++ b/agent/grpc-external/services/acl/mock_Login.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.12.0. DO NOT EDIT. +// Code generated by mockery v2.15.0. DO NOT EDIT. package acl @@ -7,8 +7,6 @@ import ( mock "github.com/stretchr/testify/mock" structs "github.com/hashicorp/consul/agent/structs" - - testing "testing" ) // MockLogin is an autogenerated mock type for the Login type @@ -39,8 +37,13 @@ func (_m *MockLogin) TokenForVerifiedIdentity(identity *authmethod.Identity, aut return r0, r1 } -// NewMockLogin creates a new instance of MockLogin. It also registers the testing.TB interface on the mock and a cleanup function to assert the mocks expectations. -func NewMockLogin(t testing.TB) *MockLogin { +type mockConstructorTestingTNewMockLogin interface { + mock.TestingT + Cleanup(func()) +} + +// NewMockLogin creates a new instance of MockLogin. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +func NewMockLogin(t mockConstructorTestingTNewMockLogin) *MockLogin { mock := &MockLogin{} mock.Mock.Test(t) diff --git a/agent/grpc-external/services/acl/mock_TokenWriter.go b/agent/grpc-external/services/acl/mock_TokenWriter.go index 19408afc8..a0a31b948 100644 --- a/agent/grpc-external/services/acl/mock_TokenWriter.go +++ b/agent/grpc-external/services/acl/mock_TokenWriter.go @@ -1,12 +1,8 @@ -// Code generated by mockery v2.12.0. DO NOT EDIT. +// Code generated by mockery v2.15.0. DO NOT EDIT. package acl -import ( - testing "testing" - - mock "github.com/stretchr/testify/mock" -) +import mock "github.com/stretchr/testify/mock" // MockTokenWriter is an autogenerated mock type for the TokenWriter type type MockTokenWriter struct { @@ -27,8 +23,13 @@ func (_m *MockTokenWriter) Delete(secretID string, fromLogout bool) error { return r0 } -// NewMockTokenWriter creates a new instance of MockTokenWriter. It also registers the testing.TB interface on the mock and a cleanup function to assert the mocks expectations. -func NewMockTokenWriter(t testing.TB) *MockTokenWriter { +type mockConstructorTestingTNewMockTokenWriter interface { + mock.TestingT + Cleanup(func()) +} + +// NewMockTokenWriter creates a new instance of MockTokenWriter. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +func NewMockTokenWriter(t mockConstructorTestingTNewMockTokenWriter) *MockTokenWriter { mock := &MockTokenWriter{} mock.Mock.Test(t) diff --git a/agent/grpc-external/services/acl/mock_Validator.go b/agent/grpc-external/services/acl/mock_Validator.go index 3c27ec38b..3436762d2 100644 --- a/agent/grpc-external/services/acl/mock_Validator.go +++ b/agent/grpc-external/services/acl/mock_Validator.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.12.0. DO NOT EDIT. +// Code generated by mockery v2.15.0. DO NOT EDIT. package acl @@ -8,8 +8,6 @@ import ( authmethod "github.com/hashicorp/consul/agent/consul/authmethod" mock "github.com/stretchr/testify/mock" - - testing "testing" ) // MockValidator is an autogenerated mock type for the Validator type @@ -40,8 +38,13 @@ func (_m *MockValidator) ValidateLogin(ctx context.Context, loginToken string) ( return r0, r1 } -// NewMockValidator creates a new instance of MockValidator. It also registers the testing.TB interface on the mock and a cleanup function to assert the mocks expectations. -func NewMockValidator(t testing.TB) *MockValidator { +type mockConstructorTestingTNewMockValidator interface { + mock.TestingT + Cleanup(func()) +} + +// NewMockValidator creates a new instance of MockValidator. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +func NewMockValidator(t mockConstructorTestingTNewMockValidator) *MockValidator { mock := &MockValidator{} mock.Mock.Test(t) diff --git a/agent/grpc-external/services/connectca/mock_ACLResolver.go b/agent/grpc-external/services/connectca/mock_ACLResolver.go index 24fb26a22..9c2f2cac2 100644 --- a/agent/grpc-external/services/connectca/mock_ACLResolver.go +++ b/agent/grpc-external/services/connectca/mock_ACLResolver.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.12.0. DO NOT EDIT. +// Code generated by mockery v2.15.0. DO NOT EDIT. package connectca @@ -7,8 +7,6 @@ import ( mock "github.com/stretchr/testify/mock" resolver "github.com/hashicorp/consul/acl/resolver" - - testing "testing" ) // MockACLResolver is an autogenerated mock type for the ACLResolver type @@ -37,8 +35,13 @@ func (_m *MockACLResolver) ResolveTokenAndDefaultMeta(token string, entMeta *acl return r0, r1 } -// NewMockACLResolver creates a new instance of MockACLResolver. It also registers the testing.TB interface on the mock and a cleanup function to assert the mocks expectations. -func NewMockACLResolver(t testing.TB) *MockACLResolver { +type mockConstructorTestingTNewMockACLResolver interface { + mock.TestingT + Cleanup(func()) +} + +// NewMockACLResolver creates a new instance of MockACLResolver. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +func NewMockACLResolver(t mockConstructorTestingTNewMockACLResolver) *MockACLResolver { mock := &MockACLResolver{} mock.Mock.Test(t) diff --git a/agent/grpc-external/services/connectca/mock_CAManager.go b/agent/grpc-external/services/connectca/mock_CAManager.go index 2667692c3..296186bbe 100644 --- a/agent/grpc-external/services/connectca/mock_CAManager.go +++ b/agent/grpc-external/services/connectca/mock_CAManager.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.12.0. DO NOT EDIT. +// Code generated by mockery v2.15.0. DO NOT EDIT. package connectca @@ -8,8 +8,6 @@ import ( structs "github.com/hashicorp/consul/agent/structs" - testing "testing" - x509 "crypto/x509" ) @@ -41,8 +39,13 @@ func (_m *MockCAManager) AuthorizeAndSignCertificate(csr *x509.CertificateReques return r0, r1 } -// NewMockCAManager creates a new instance of MockCAManager. It also registers the testing.TB interface on the mock and a cleanup function to assert the mocks expectations. -func NewMockCAManager(t testing.TB) *MockCAManager { +type mockConstructorTestingTNewMockCAManager interface { + mock.TestingT + Cleanup(func()) +} + +// NewMockCAManager creates a new instance of MockCAManager. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +func NewMockCAManager(t mockConstructorTestingTNewMockCAManager) *MockCAManager { mock := &MockCAManager{} mock.Mock.Test(t) diff --git a/agent/grpc-external/services/dataplane/mock_ACLResolver.go b/agent/grpc-external/services/dataplane/mock_ACLResolver.go index 0408d3a50..781811624 100644 --- a/agent/grpc-external/services/dataplane/mock_ACLResolver.go +++ b/agent/grpc-external/services/dataplane/mock_ACLResolver.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.12.0. DO NOT EDIT. +// Code generated by mockery v2.15.0. DO NOT EDIT. package dataplane @@ -7,8 +7,6 @@ import ( mock "github.com/stretchr/testify/mock" resolver "github.com/hashicorp/consul/acl/resolver" - - testing "testing" ) // MockACLResolver is an autogenerated mock type for the ACLResolver type @@ -37,8 +35,13 @@ func (_m *MockACLResolver) ResolveTokenAndDefaultMeta(_a0 string, _a1 *acl.Enter return r0, r1 } -// NewMockACLResolver creates a new instance of MockACLResolver. It also registers the testing.TB interface on the mock and a cleanup function to assert the mocks expectations. -func NewMockACLResolver(t testing.TB) *MockACLResolver { +type mockConstructorTestingTNewMockACLResolver interface { + mock.TestingT + Cleanup(func()) +} + +// NewMockACLResolver creates a new instance of MockACLResolver. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +func NewMockACLResolver(t mockConstructorTestingTNewMockACLResolver) *MockACLResolver { mock := &MockACLResolver{} mock.Mock.Test(t) diff --git a/agent/grpc-external/services/peerstream/mock_ACLResolver.go b/agent/grpc-external/services/peerstream/mock_ACLResolver.go index d0e672088..e4027a5da 100644 --- a/agent/grpc-external/services/peerstream/mock_ACLResolver.go +++ b/agent/grpc-external/services/peerstream/mock_ACLResolver.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.12.2. DO NOT EDIT. +// Code generated by mockery v2.15.0. DO NOT EDIT. package peerstream @@ -7,8 +7,6 @@ import ( mock "github.com/stretchr/testify/mock" resolver "github.com/hashicorp/consul/acl/resolver" - - testing "testing" ) // MockACLResolver is an autogenerated mock type for the ACLResolver type @@ -37,8 +35,13 @@ func (_m *MockACLResolver) ResolveTokenAndDefaultMeta(_a0 string, _a1 *acl.Enter return r0, r1 } -// NewMockACLResolver creates a new instance of MockACLResolver. It also registers the testing.TB interface on the mock and a cleanup function to assert the mocks expectations. -func NewMockACLResolver(t testing.TB) *MockACLResolver { +type mockConstructorTestingTNewMockACLResolver interface { + mock.TestingT + Cleanup(func()) +} + +// NewMockACLResolver creates a new instance of MockACLResolver. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +func NewMockACLResolver(t mockConstructorTestingTNewMockACLResolver) *MockACLResolver { mock := &MockACLResolver{} mock.Mock.Test(t) diff --git a/agent/grpc-external/services/serverdiscovery/mock_ACLResolver.go b/agent/grpc-external/services/serverdiscovery/mock_ACLResolver.go index 850ec8bb9..4192be1ab 100644 --- a/agent/grpc-external/services/serverdiscovery/mock_ACLResolver.go +++ b/agent/grpc-external/services/serverdiscovery/mock_ACLResolver.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.12.0. DO NOT EDIT. +// Code generated by mockery v2.15.0. DO NOT EDIT. package serverdiscovery @@ -7,8 +7,6 @@ import ( mock "github.com/stretchr/testify/mock" resolver "github.com/hashicorp/consul/acl/resolver" - - testing "testing" ) // MockACLResolver is an autogenerated mock type for the ACLResolver type @@ -37,8 +35,13 @@ func (_m *MockACLResolver) ResolveTokenAndDefaultMeta(_a0 string, _a1 *acl.Enter return r0, r1 } -// NewMockACLResolver creates a new instance of MockACLResolver. It also registers the testing.TB interface on the mock and a cleanup function to assert the mocks expectations. -func NewMockACLResolver(t testing.TB) *MockACLResolver { +type mockConstructorTestingTNewMockACLResolver interface { + mock.TestingT + Cleanup(func()) +} + +// NewMockACLResolver creates a new instance of MockACLResolver. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +func NewMockACLResolver(t mockConstructorTestingTNewMockACLResolver) *MockACLResolver { mock := &MockACLResolver{} mock.Mock.Test(t) diff --git a/agent/hcp/mock_Client.go b/agent/hcp/mock_Client.go index 4466bf238..29bd27cbf 100644 --- a/agent/hcp/mock_Client.go +++ b/agent/hcp/mock_Client.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.13.1. DO NOT EDIT. +// Code generated by mockery v2.15.0. DO NOT EDIT. package hcp diff --git a/agent/hcp/scada/mock_Provider.go b/agent/hcp/scada/mock_Provider.go index 6e12d63f6..251178095 100644 --- a/agent/hcp/scada/mock_Provider.go +++ b/agent/hcp/scada/mock_Provider.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.14.0. DO NOT EDIT. +// Code generated by mockery v2.15.0. DO NOT EDIT. package scada diff --git a/agent/proxycfg-sources/catalog/config_source.go b/agent/proxycfg-sources/catalog/config_source.go index 0f86a3a37..3ba1fc83b 100644 --- a/agent/proxycfg-sources/catalog/config_source.go +++ b/agent/proxycfg-sources/catalog/config_source.go @@ -10,6 +10,7 @@ import ( "github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/agent/configentry" + "github.com/hashicorp/consul/agent/grpc-external/limiter" "github.com/hashicorp/consul/agent/local" "github.com/hashicorp/consul/agent/proxycfg" "github.com/hashicorp/consul/agent/structs" @@ -44,13 +45,24 @@ func NewConfigSource(cfg Config) *ConfigSource { // Watch wraps the underlying proxycfg.Manager and dynamically registers // services from the catalog with it when requested by the xDS server. -func (m *ConfigSource) Watch(serviceID structs.ServiceID, nodeName string, token string) (<-chan *proxycfg.ConfigSnapshot, proxycfg.CancelFunc, error) { +func (m *ConfigSource) Watch(serviceID structs.ServiceID, nodeName string, token string) (<-chan *proxycfg.ConfigSnapshot, limiter.SessionTerminatedChan, proxycfg.CancelFunc, error) { // If the service is registered to the local agent, use the LocalConfigSource // rather than trying to configure it from the catalog. if nodeName == m.NodeName && m.LocalState.ServiceExists(serviceID) { return m.LocalConfigSource.Watch(serviceID, nodeName, token) } + // Begin a session with the xDS session concurrency limiter. + // + // We do this here rather than in the xDS server because we don't want to apply + // the limit to services from the LocalConfigSource. + // + // See: https://github.com/hashicorp/consul/issues/15753 + session, err := m.SessionLimiter.BeginSession() + if err != nil { + return nil, nil, nil, err + } + proxyID := proxycfg.ProxyID{ ServiceID: serviceID, NodeName: nodeName, @@ -66,6 +78,7 @@ func (m *ConfigSource) Watch(serviceID structs.ServiceID, nodeName string, token cancelOnce.Do(func() { cancelWatch() m.cleanup(proxyID) + session.End() }) } @@ -82,11 +95,12 @@ func (m *ConfigSource) Watch(serviceID structs.ServiceID, nodeName string, token if err := m.startSync(w.closeCh, proxyID); err != nil { delete(m.watches, proxyID) cancelWatch() - return nil, nil, err + session.End() + return nil, nil, nil, err } } - return snapCh, cancel, nil + return snapCh, session.Terminated(), cancel, nil } func (m *ConfigSource) Shutdown() { @@ -252,6 +266,9 @@ type Config struct { // Logger will be used to write log messages. Logger hclog.Logger + + // SessionLimiter is used to enforce xDS concurrency limits. + SessionLimiter SessionLimiter } //go:generate mockery --name ConfigManager --inpackage @@ -269,5 +286,10 @@ type Store interface { //go:generate mockery --name Watcher --inpackage type Watcher interface { - Watch(proxyID structs.ServiceID, nodeName string, token string) (<-chan *proxycfg.ConfigSnapshot, proxycfg.CancelFunc, error) + Watch(proxyID structs.ServiceID, nodeName string, token string) (<-chan *proxycfg.ConfigSnapshot, limiter.SessionTerminatedChan, proxycfg.CancelFunc, error) +} + +//go:generate mockery --name SessionLimiter --inpackage +type SessionLimiter interface { + BeginSession() (limiter.Session, error) } diff --git a/agent/proxycfg-sources/catalog/config_source_test.go b/agent/proxycfg-sources/catalog/config_source_test.go index cf34103c9..c6c0e9d8b 100644 --- a/agent/proxycfg-sources/catalog/config_source_test.go +++ b/agent/proxycfg-sources/catalog/config_source_test.go @@ -11,6 +11,7 @@ import ( "github.com/hashicorp/consul/agent/consul/state" "github.com/hashicorp/consul/agent/consul/stream" + "github.com/hashicorp/consul/agent/grpc-external/limiter" "github.com/hashicorp/consul/agent/local" "github.com/hashicorp/consul/agent/proxycfg" "github.com/hashicorp/consul/agent/structs" @@ -47,16 +48,33 @@ func TestConfigSource_Success(t *testing.T) { // behavior without sleeping which leads to slow/racy tests. cfgMgr := testConfigManager(t, serviceID, nodeName, token) + lim := NewMockSessionLimiter(t) + + session1 := newMockSession(t) + session1TermCh := make(limiter.SessionTerminatedChan) + session1.On("Terminated").Return(session1TermCh) + session1.On("End").Return() + + session2 := newMockSession(t) + session2TermCh := make(limiter.SessionTerminatedChan) + session2.On("Terminated").Return(session2TermCh) + session2.On("End").Return() + + lim.On("BeginSession").Return(session1, nil).Once() + lim.On("BeginSession").Return(session2, nil).Once() + mgr := NewConfigSource(Config{ - Manager: cfgMgr, - LocalState: testLocalState(t), - Logger: hclog.NewNullLogger(), - GetStore: func() Store { return store }, + Manager: cfgMgr, + LocalState: testLocalState(t), + Logger: hclog.NewNullLogger(), + GetStore: func() Store { return store }, + SessionLimiter: lim, }) t.Cleanup(mgr.Shutdown) - snapCh, cancelWatch1, err := mgr.Watch(serviceID, nodeName, token) + snapCh, termCh, cancelWatch1, err := mgr.Watch(serviceID, nodeName, token) require.NoError(t, err) + require.Equal(t, session1TermCh, termCh) // Expect Register to have been called with the proxy's inital port. select { @@ -111,8 +129,9 @@ func TestConfigSource_Success(t *testing.T) { } // Start another watch. - _, cancelWatch2, err := mgr.Watch(serviceID, nodeName, token) + _, termCh2, cancelWatch2, err := mgr.Watch(serviceID, nodeName, token) require.NoError(t, err) + require.Equal(t, session2TermCh, termCh2) // Expect the service to have not been re-registered by the second watch. select { @@ -137,6 +156,9 @@ func TestConfigSource_Success(t *testing.T) { case <-time.After(100 * time.Millisecond): t.Fatal("timeout waiting for service to be de-registered") } + + session1.AssertCalled(t, "End") + session2.AssertCalled(t, "End") } func TestConfigSource_LocallyManagedService(t *testing.T) { @@ -149,7 +171,7 @@ func TestConfigSource_LocallyManagedService(t *testing.T) { localWatcher := NewMockWatcher(t) localWatcher.On("Watch", serviceID, nodeName, token). - Return(make(<-chan *proxycfg.ConfigSnapshot), proxycfg.CancelFunc(func() {}), nil) + Return(make(<-chan *proxycfg.ConfigSnapshot), nil, proxycfg.CancelFunc(func() {}), nil) mgr := NewConfigSource(Config{ NodeName: nodeName, @@ -157,10 +179,11 @@ func TestConfigSource_LocallyManagedService(t *testing.T) { LocalConfigSource: localWatcher, Logger: hclog.NewNullLogger(), GetStore: func() Store { panic("state store shouldn't have been used") }, + SessionLimiter: nullSessionLimiter{}, }) t.Cleanup(mgr.Shutdown) - _, _, err := mgr.Watch(serviceID, nodeName, token) + _, _, _, err := mgr.Watch(serviceID, nodeName, token) require.NoError(t, err) } @@ -192,17 +215,26 @@ func TestConfigSource_ErrorRegisteringService(t *testing.T) { cfgMgr.On("Register", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything). Return(errors.New("KABOOM")) + session := newMockSession(t) + session.On("End").Return() + + lim := NewMockSessionLimiter(t) + lim.On("BeginSession").Return(session, nil) + mgr := NewConfigSource(Config{ - Manager: cfgMgr, - LocalState: testLocalState(t), - Logger: hclog.NewNullLogger(), - GetStore: func() Store { return store }, + Manager: cfgMgr, + LocalState: testLocalState(t), + Logger: hclog.NewNullLogger(), + GetStore: func() Store { return store }, + SessionLimiter: lim, }) t.Cleanup(mgr.Shutdown) - _, _, err := mgr.Watch(serviceID, nodeName, token) + _, _, _, err := mgr.Watch(serviceID, nodeName, token) require.Error(t, err) require.True(t, canceledWatch, "watch should've been canceled") + + session.AssertCalled(t, "End") } func TestConfigSource_NotProxyService(t *testing.T) { @@ -231,19 +263,38 @@ func TestConfigSource_NotProxyService(t *testing.T) { Return(make(<-chan *proxycfg.ConfigSnapshot), cancel) mgr := NewConfigSource(Config{ - Manager: cfgMgr, - LocalState: testLocalState(t), - Logger: hclog.NewNullLogger(), - GetStore: func() Store { return store }, + Manager: cfgMgr, + LocalState: testLocalState(t), + Logger: hclog.NewNullLogger(), + GetStore: func() Store { return store }, + SessionLimiter: nullSessionLimiter{}, }) t.Cleanup(mgr.Shutdown) - _, _, err := mgr.Watch(serviceID, nodeName, token) + _, _, _, err := mgr.Watch(serviceID, nodeName, token) require.Error(t, err) require.Contains(t, err.Error(), "must be a sidecar proxy or gateway") require.True(t, canceledWatch, "watch should've been canceled") } +func TestConfigSource_SessionLimiterError(t *testing.T) { + lim := NewMockSessionLimiter(t) + lim.On("BeginSession").Return(nil, limiter.ErrCapacityReached) + + src := NewConfigSource(Config{ + LocalState: testLocalState(t), + SessionLimiter: lim, + }) + t.Cleanup(src.Shutdown) + + _, _, _, err := src.Watch( + structs.NewServiceID("web-sidecar-proxy-1", nil), + "node-name", + "token", + ) + require.Equal(t, limiter.ErrCapacityReached, err) +} + func testConfigManager(t *testing.T, serviceID structs.ServiceID, nodeName string, token string) ConfigManager { t.Helper() @@ -294,3 +345,34 @@ func testLocalState(t *testing.T) *local.State { l.TriggerSyncChanges = func() {} return l } + +type nullSessionLimiter struct{} + +func (nullSessionLimiter) BeginSession() (limiter.Session, error) { + return nullSession{}, nil +} + +type nullSession struct{} + +func (nullSession) End() {} + +func (nullSession) Terminated() limiter.SessionTerminatedChan { return nil } + +type mockSession struct { + mock.Mock +} + +func newMockSession(t *testing.T) *mockSession { + m := &mockSession{} + m.Mock.Test(t) + + t.Cleanup(func() { m.AssertExpectations(t) }) + + return m +} + +func (m *mockSession) End() { m.Called() } + +func (m *mockSession) Terminated() limiter.SessionTerminatedChan { + return m.Called().Get(0).(limiter.SessionTerminatedChan) +} diff --git a/agent/proxycfg-sources/catalog/mock_ConfigManager.go b/agent/proxycfg-sources/catalog/mock_ConfigManager.go index 047b61c87..3ae51c5f6 100644 --- a/agent/proxycfg-sources/catalog/mock_ConfigManager.go +++ b/agent/proxycfg-sources/catalog/mock_ConfigManager.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.12.0. DO NOT EDIT. +// Code generated by mockery v2.15.0. DO NOT EDIT. package catalog @@ -7,8 +7,6 @@ import ( mock "github.com/stretchr/testify/mock" structs "github.com/hashicorp/consul/agent/structs" - - testing "testing" ) // MockConfigManager is an autogenerated mock type for the ConfigManager type @@ -60,8 +58,13 @@ func (_m *MockConfigManager) Watch(req proxycfg.ProxyID) (<-chan *proxycfg.Confi return r0, r1 } -// NewMockConfigManager creates a new instance of MockConfigManager. It also registers the testing.TB interface on the mock and a cleanup function to assert the mocks expectations. -func NewMockConfigManager(t testing.TB) *MockConfigManager { +type mockConstructorTestingTNewMockConfigManager interface { + mock.TestingT + Cleanup(func()) +} + +// NewMockConfigManager creates a new instance of MockConfigManager. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +func NewMockConfigManager(t mockConstructorTestingTNewMockConfigManager) *MockConfigManager { mock := &MockConfigManager{} mock.Mock.Test(t) diff --git a/agent/proxycfg-sources/catalog/mock_SessionLimiter.go b/agent/proxycfg-sources/catalog/mock_SessionLimiter.go new file mode 100644 index 000000000..3b7147cb0 --- /dev/null +++ b/agent/proxycfg-sources/catalog/mock_SessionLimiter.go @@ -0,0 +1,51 @@ +// Code generated by mockery v2.15.0. DO NOT EDIT. + +package catalog + +import ( + limiter "github.com/hashicorp/consul/agent/grpc-external/limiter" + mock "github.com/stretchr/testify/mock" +) + +// MockSessionLimiter is an autogenerated mock type for the SessionLimiter type +type MockSessionLimiter struct { + mock.Mock +} + +// BeginSession provides a mock function with given fields: +func (_m *MockSessionLimiter) BeginSession() (limiter.Session, error) { + ret := _m.Called() + + var r0 limiter.Session + if rf, ok := ret.Get(0).(func() limiter.Session); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(limiter.Session) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func() error); ok { + r1 = rf() + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +type mockConstructorTestingTNewMockSessionLimiter interface { + mock.TestingT + Cleanup(func()) +} + +// NewMockSessionLimiter creates a new instance of MockSessionLimiter. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +func NewMockSessionLimiter(t mockConstructorTestingTNewMockSessionLimiter) *MockSessionLimiter { + mock := &MockSessionLimiter{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/agent/proxycfg-sources/catalog/mock_Watcher.go b/agent/proxycfg-sources/catalog/mock_Watcher.go index 193b7d844..d5ca046a4 100644 --- a/agent/proxycfg-sources/catalog/mock_Watcher.go +++ b/agent/proxycfg-sources/catalog/mock_Watcher.go @@ -1,14 +1,14 @@ -// Code generated by mockery v2.12.0. DO NOT EDIT. +// Code generated by mockery v2.15.0. DO NOT EDIT. package catalog import ( - proxycfg "github.com/hashicorp/consul/agent/proxycfg" + limiter "github.com/hashicorp/consul/agent/grpc-external/limiter" mock "github.com/stretchr/testify/mock" - structs "github.com/hashicorp/consul/agent/structs" + proxycfg "github.com/hashicorp/consul/agent/proxycfg" - testing "testing" + structs "github.com/hashicorp/consul/agent/structs" ) // MockWatcher is an autogenerated mock type for the Watcher type @@ -17,7 +17,7 @@ type MockWatcher struct { } // Watch provides a mock function with given fields: proxyID, nodeName, token -func (_m *MockWatcher) Watch(proxyID structs.ServiceID, nodeName string, token string) (<-chan *proxycfg.ConfigSnapshot, proxycfg.CancelFunc, error) { +func (_m *MockWatcher) Watch(proxyID structs.ServiceID, nodeName string, token string) (<-chan *proxycfg.ConfigSnapshot, limiter.SessionTerminatedChan, proxycfg.CancelFunc, error) { ret := _m.Called(proxyID, nodeName, token) var r0 <-chan *proxycfg.ConfigSnapshot @@ -29,27 +29,41 @@ func (_m *MockWatcher) Watch(proxyID structs.ServiceID, nodeName string, token s } } - var r1 proxycfg.CancelFunc - if rf, ok := ret.Get(1).(func(structs.ServiceID, string, string) proxycfg.CancelFunc); ok { + var r1 limiter.SessionTerminatedChan + if rf, ok := ret.Get(1).(func(structs.ServiceID, string, string) limiter.SessionTerminatedChan); ok { r1 = rf(proxyID, nodeName, token) } else { if ret.Get(1) != nil { - r1 = ret.Get(1).(proxycfg.CancelFunc) + r1 = ret.Get(1).(limiter.SessionTerminatedChan) } } - var r2 error - if rf, ok := ret.Get(2).(func(structs.ServiceID, string, string) error); ok { + var r2 proxycfg.CancelFunc + if rf, ok := ret.Get(2).(func(structs.ServiceID, string, string) proxycfg.CancelFunc); ok { r2 = rf(proxyID, nodeName, token) } else { - r2 = ret.Error(2) + if ret.Get(2) != nil { + r2 = ret.Get(2).(proxycfg.CancelFunc) + } } - return r0, r1, r2 + var r3 error + if rf, ok := ret.Get(3).(func(structs.ServiceID, string, string) error); ok { + r3 = rf(proxyID, nodeName, token) + } else { + r3 = ret.Error(3) + } + + return r0, r1, r2, r3 } -// NewMockWatcher creates a new instance of MockWatcher. It also registers the testing.TB interface on the mock and a cleanup function to assert the mocks expectations. -func NewMockWatcher(t testing.TB) *MockWatcher { +type mockConstructorTestingTNewMockWatcher interface { + mock.TestingT + Cleanup(func()) +} + +// NewMockWatcher creates a new instance of MockWatcher. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +func NewMockWatcher(t mockConstructorTestingTNewMockWatcher) *MockWatcher { mock := &MockWatcher{} mock.Mock.Test(t) diff --git a/agent/proxycfg-sources/local/config_source.go b/agent/proxycfg-sources/local/config_source.go index b23316d53..903ad7a6b 100644 --- a/agent/proxycfg-sources/local/config_source.go +++ b/agent/proxycfg-sources/local/config_source.go @@ -1,6 +1,7 @@ package local import ( + "github.com/hashicorp/consul/agent/grpc-external/limiter" "github.com/hashicorp/consul/agent/proxycfg" structs "github.com/hashicorp/consul/agent/structs" ) @@ -16,7 +17,7 @@ func NewConfigSource(cfgMgr ConfigManager) *ConfigSource { return &ConfigSource{cfgMgr} } -func (m *ConfigSource) Watch(serviceID structs.ServiceID, nodeName string, _ string) (<-chan *proxycfg.ConfigSnapshot, proxycfg.CancelFunc, error) { +func (m *ConfigSource) Watch(serviceID structs.ServiceID, nodeName string, _ string) (<-chan *proxycfg.ConfigSnapshot, limiter.SessionTerminatedChan, proxycfg.CancelFunc, error) { watchCh, cancelWatch := m.manager.Watch(proxycfg.ProxyID{ ServiceID: serviceID, NodeName: nodeName, @@ -27,5 +28,5 @@ func (m *ConfigSource) Watch(serviceID structs.ServiceID, nodeName string, _ str // is checked before the watch is created). Token: "", }) - return watchCh, cancelWatch, nil + return watchCh, nil, cancelWatch, nil } diff --git a/agent/proxycfg-sources/local/mock_ConfigManager.go b/agent/proxycfg-sources/local/mock_ConfigManager.go index 0f77ce065..8f2c8fc6c 100644 --- a/agent/proxycfg-sources/local/mock_ConfigManager.go +++ b/agent/proxycfg-sources/local/mock_ConfigManager.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.12.0. DO NOT EDIT. +// Code generated by mockery v2.15.0. DO NOT EDIT. package local @@ -7,8 +7,6 @@ import ( mock "github.com/stretchr/testify/mock" structs "github.com/hashicorp/consul/agent/structs" - - testing "testing" ) // MockConfigManager is an autogenerated mock type for the ConfigManager type @@ -76,8 +74,13 @@ func (_m *MockConfigManager) Watch(id proxycfg.ProxyID) (<-chan *proxycfg.Config return r0, r1 } -// NewMockConfigManager creates a new instance of MockConfigManager. It also registers the testing.TB interface on the mock and a cleanup function to assert the mocks expectations. -func NewMockConfigManager(t testing.TB) *MockConfigManager { +type mockConstructorTestingTNewMockConfigManager interface { + mock.TestingT + Cleanup(func()) +} + +// NewMockConfigManager creates a new instance of MockConfigManager. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +func NewMockConfigManager(t mockConstructorTestingTNewMockConfigManager) *MockConfigManager { mock := &MockConfigManager{} mock.Mock.Test(t) diff --git a/agent/submatview/mock_ACLResolver.go b/agent/submatview/mock_ACLResolver.go index 70ac4ac33..7e1d810c6 100644 --- a/agent/submatview/mock_ACLResolver.go +++ b/agent/submatview/mock_ACLResolver.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.12.0. DO NOT EDIT. +// Code generated by mockery v2.15.0. DO NOT EDIT. package submatview @@ -7,8 +7,6 @@ import ( mock "github.com/stretchr/testify/mock" resolver "github.com/hashicorp/consul/acl/resolver" - - testing "testing" ) // MockACLResolver is an autogenerated mock type for the ACLResolver type @@ -37,8 +35,13 @@ func (_m *MockACLResolver) ResolveTokenAndDefaultMeta(token string, entMeta *acl return r0, r1 } -// NewMockACLResolver creates a new instance of MockACLResolver. It also registers the testing.TB interface on the mock and a cleanup function to assert the mocks expectations. -func NewMockACLResolver(t testing.TB) *MockACLResolver { +type mockConstructorTestingTNewMockACLResolver interface { + mock.TestingT + Cleanup(func()) +} + +// NewMockACLResolver creates a new instance of MockACLResolver. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +func NewMockACLResolver(t mockConstructorTestingTNewMockACLResolver) *MockACLResolver { mock := &MockACLResolver{} mock.Mock.Test(t) diff --git a/agent/xds/delta.go b/agent/xds/delta.go index 1394298d3..acb31c85a 100644 --- a/agent/xds/delta.go +++ b/agent/xds/delta.go @@ -3,6 +3,7 @@ package xds import ( "crypto/sha256" "encoding/hex" + "errors" "fmt" "sync" "sync/atomic" @@ -23,6 +24,7 @@ import ( "google.golang.org/protobuf/types/known/anypb" external "github.com/hashicorp/consul/agent/grpc-external" + "github.com/hashicorp/consul/agent/grpc-external/limiter" "github.com/hashicorp/consul/agent/proxycfg" "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/agent/xds/xdscommon" @@ -88,17 +90,12 @@ func (s *Server) processDelta(stream ADSDeltaStream, reqCh <-chan *envoy_discove return err } - session, err := s.SessionLimiter.BeginSession() - if err != nil { - return errOverwhelmed - } - defer session.End() - // Loop state var ( cfgSnap *proxycfg.ConfigSnapshot node *envoy_config_core_v3.Node stateCh <-chan *proxycfg.ConfigSnapshot + drainCh limiter.SessionTerminatedChan watchCancel func() proxyID structs.ServiceID nonce uint64 // xDS requires a unique nonce to correlate response/request pairs @@ -176,7 +173,7 @@ func (s *Server) processDelta(stream ADSDeltaStream, reqCh <-chan *envoy_discove for { select { - case <-session.Terminated(): + case <-drainCh: generator.Logger.Debug("draining stream to rebalance load") metrics.IncrCounter([]string{"xds", "server", "streamDrained"}, 1) return errOverwhelmed @@ -293,8 +290,11 @@ func (s *Server) processDelta(stream ADSDeltaStream, reqCh <-chan *envoy_discove return status.Errorf(codes.Internal, "failed to watch proxy service: %s", err) } - stateCh, watchCancel, err = s.CfgSrc.Watch(proxyID, nodeName, options.Token) - if err != nil { + stateCh, drainCh, watchCancel, err = s.CfgSrc.Watch(proxyID, nodeName, options.Token) + switch { + case errors.Is(err, limiter.ErrCapacityReached): + return errOverwhelmed + case err != nil: return status.Errorf(codes.Internal, "failed to watch proxy service: %s", err) } // Note that in this case we _intend_ the defer to only be triggered when diff --git a/agent/xds/delta_test.go b/agent/xds/delta_test.go index 55959e495..50ba84af9 100644 --- a/agent/xds/delta_test.go +++ b/agent/xds/delta_test.go @@ -32,7 +32,7 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_TCP(t *testing.T) { // Allow all return acl.RootAuthorizer("manage"), nil } - scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0, nil) + scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0) mgr, errCh, envoy := scenario.mgr, scenario.errCh, scenario.envoy sid := structs.NewServiceID("web-sidecar-proxy", nil) @@ -231,7 +231,7 @@ func TestServer_DeltaAggregatedResources_v3_NackLoop(t *testing.T) { // Allow all return acl.RootAuthorizer("manage"), nil } - scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0, nil) + scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0) mgr, errCh, envoy := scenario.mgr, scenario.errCh, scenario.envoy sid := structs.NewServiceID("web-sidecar-proxy", nil) @@ -363,7 +363,7 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_HTTP2(t *testing.T) { // Allow all return acl.RootAuthorizer("manage"), nil } - scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0, nil) + scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0) mgr, errCh, envoy := scenario.mgr, scenario.errCh, scenario.envoy sid := structs.NewServiceID("web-sidecar-proxy", nil) @@ -515,7 +515,7 @@ func TestServer_DeltaAggregatedResources_v3_SlowEndpointPopulation(t *testing.T) // Allow all return acl.RootAuthorizer("manage"), nil } - scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0, nil) + scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0) server, mgr, errCh, envoy := scenario.server, scenario.mgr, scenario.errCh, scenario.envoy // This mutateFn causes any endpoint with a name containing "geo-cache" to be @@ -660,7 +660,7 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_TCP_clusterChangesImpa // Allow all return acl.RootAuthorizer("manage"), nil } - scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0, nil) + scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0) mgr, errCh, envoy := scenario.mgr, scenario.errCh, scenario.envoy sid := structs.NewServiceID("web-sidecar-proxy", nil) @@ -797,7 +797,7 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_HTTP2_RDS_listenerChan // Allow all return acl.RootAuthorizer("manage"), nil } - scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0, nil) + scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0) mgr, errCh, envoy := scenario.mgr, scenario.errCh, scenario.envoy sid := structs.NewServiceID("web-sidecar-proxy", nil) @@ -1055,7 +1055,7 @@ func TestServer_DeltaAggregatedResources_v3_ACLEnforcement(t *testing.T) { return acl.NewPolicyAuthorizerWithDefaults(acl.RootAuthorizer("deny"), []*acl.Policy{policy}, nil) } - scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", tt.token, 0, nil) + scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", tt.token, 0) mgr, errCh, envoy := scenario.mgr, scenario.errCh, scenario.envoy sid := structs.NewServiceID("web-sidecar-proxy", nil) @@ -1154,7 +1154,6 @@ func TestServer_DeltaAggregatedResources_v3_ACLTokenDeleted_StreamTerminatedDuri } scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", token, 100*time.Millisecond, // Make this short. - nil, ) mgr, errCh, envoy := scenario.mgr, scenario.errCh, scenario.envoy @@ -1253,7 +1252,6 @@ func TestServer_DeltaAggregatedResources_v3_ACLTokenDeleted_StreamTerminatedInBa } scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", token, 100*time.Millisecond, // Make this short. - nil, ) mgr, errCh, envoy := scenario.mgr, scenario.errCh, scenario.envoy @@ -1334,7 +1332,7 @@ func TestServer_DeltaAggregatedResources_v3_IngressEmptyResponse(t *testing.T) { // Allow all return acl.RootAuthorizer("manage"), nil } - scenario := newTestServerDeltaScenario(t, aclResolve, "ingress-gateway", "", 0, nil) + scenario := newTestServerDeltaScenario(t, aclResolve, "ingress-gateway", "", 0) mgr, errCh, envoy := scenario.mgr, scenario.errCh, scenario.envoy sid := structs.NewServiceID("ingress-gateway", nil) @@ -1389,12 +1387,13 @@ func TestServer_DeltaAggregatedResources_v3_IngressEmptyResponse(t *testing.T) { func TestServer_DeltaAggregatedResources_v3_CapacityReached(t *testing.T) { aclResolve := func(id string) (acl.Authorizer, error) { return acl.ManageAll(), nil } - scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0, capacityReachedLimiter{}) + scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0) mgr, errCh, envoy := scenario.mgr, scenario.errCh, scenario.envoy sid := structs.NewServiceID("web-sidecar-proxy", nil) mgr.RegisterProxy(t, sid) + mgr.DrainStreams(sid) snap := newTestSnapshot(t, nil, "") @@ -1420,10 +1419,8 @@ func (capacityReachedLimiter) BeginSession() (limiter.Session, error) { } func TestServer_DeltaAggregatedResources_v3_StreamDrained(t *testing.T) { - limiter := &testLimiter{} - aclResolve := func(id string) (acl.Authorizer, error) { return acl.ManageAll(), nil } - scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0, limiter) + scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0) mgr, errCh, envoy := scenario.mgr, scenario.errCh, scenario.envoy sid := structs.NewServiceID("web-sidecar-proxy", nil) @@ -1452,7 +1449,7 @@ func TestServer_DeltaAggregatedResources_v3_StreamDrained(t *testing.T) { }) testutil.RunStep(t, "terminate limiter session", func(t *testing.T) { - limiter.TerminateSession() + mgr.DrainStreams(sid) select { case err := <-errCh: @@ -1488,25 +1485,6 @@ func TestServer_DeltaAggregatedResources_v3_StreamDrained(t *testing.T) { }) } -type testLimiter struct { - termCh chan struct{} -} - -func (t *testLimiter) BeginSession() (limiter.Session, error) { - t.termCh = make(chan struct{}) - return &testSession{termCh: t.termCh}, nil -} - -func (t *testLimiter) TerminateSession() { close(t.termCh) } - -type testSession struct { - termCh chan struct{} -} - -func (t *testSession) Terminated() <-chan struct{} { return t.termCh } - -func (*testSession) End() {} - func assertDeltaChanBlocked(t *testing.T, ch chan *envoy_discovery_v3.DeltaDiscoveryResponse) { t.Helper() select { diff --git a/agent/xds/server.go b/agent/xds/server.go index c990be3b9..dc37dbb11 100644 --- a/agent/xds/server.go +++ b/agent/xds/server.go @@ -96,13 +96,7 @@ type ConfigFetcher interface { // ProxyConfigSource is the interface xds.Server requires to consume proxy // config updates. type ProxyConfigSource interface { - Watch(id structs.ServiceID, nodeName string, token string) (<-chan *proxycfg.ConfigSnapshot, proxycfg.CancelFunc, error) -} - -// SessionLimiter is the interface exposed by limiter.SessionLimiter. We depend -// on an interface rather than the concrete type so we can mock it in tests. -type SessionLimiter interface { - BeginSession() (limiter.Session, error) + Watch(id structs.ServiceID, nodeName string, token string) (<-chan *proxycfg.ConfigSnapshot, limiter.SessionTerminatedChan, proxycfg.CancelFunc, error) } // Server represents a gRPC server that can handle xDS requests from Envoy. All @@ -111,12 +105,11 @@ type SessionLimiter interface { // A full description of the XDS protocol can be found at // https://www.envoyproxy.io/docs/envoy/latest/api-docs/xds_protocol type Server struct { - NodeName string - Logger hclog.Logger - CfgSrc ProxyConfigSource - ResolveToken ACLResolverFunc - CfgFetcher ConfigFetcher - SessionLimiter SessionLimiter + NodeName string + Logger hclog.Logger + CfgSrc ProxyConfigSource + ResolveToken ACLResolverFunc + CfgFetcher ConfigFetcher // AuthCheckFrequency is how often we should re-check the credentials used // during a long-lived gRPC Stream after it has been initially established. @@ -168,7 +161,6 @@ func NewServer( cfgMgr ProxyConfigSource, resolveToken ACLResolverFunc, cfgFetcher ConfigFetcher, - limiter SessionLimiter, ) *Server { return &Server{ NodeName: nodeName, @@ -176,7 +168,6 @@ func NewServer( CfgSrc: cfgMgr, ResolveToken: resolveToken, CfgFetcher: cfgFetcher, - SessionLimiter: limiter, AuthCheckFrequency: DefaultAuthCheckFrequency, activeStreams: &activeStreamCounters{}, } diff --git a/agent/xds/xds_protocol_helpers_test.go b/agent/xds/xds_protocol_helpers_test.go index 3bd432c4e..b1094ae8d 100644 --- a/agent/xds/xds_protocol_helpers_test.go +++ b/agent/xds/xds_protocol_helpers_test.go @@ -66,14 +66,16 @@ func newTestSnapshot( // testing. It also implements ConnectAuthz to allow control over authorization. type testManager struct { sync.Mutex - chans map[structs.ServiceID]chan *proxycfg.ConfigSnapshot - cancels chan structs.ServiceID + stateChans map[structs.ServiceID]chan *proxycfg.ConfigSnapshot + drainChans map[structs.ServiceID]chan struct{} + cancels chan structs.ServiceID } func newTestManager(t *testing.T) *testManager { return &testManager{ - chans: map[structs.ServiceID]chan *proxycfg.ConfigSnapshot{}, - cancels: make(chan structs.ServiceID, 10), + stateChans: map[structs.ServiceID]chan *proxycfg.ConfigSnapshot{}, + drainChans: map[structs.ServiceID]chan struct{}{}, + cancels: make(chan structs.ServiceID, 10), } } @@ -81,7 +83,8 @@ func newTestManager(t *testing.T) *testManager { func (m *testManager) RegisterProxy(t *testing.T, proxyID structs.ServiceID) { m.Lock() defer m.Unlock() - m.chans[proxyID] = make(chan *proxycfg.ConfigSnapshot, 1) + m.stateChans[proxyID] = make(chan *proxycfg.ConfigSnapshot, 1) + m.drainChans[proxyID] = make(chan struct{}) } // Deliver simulates a proxy registration @@ -90,18 +93,42 @@ func (m *testManager) DeliverConfig(t *testing.T, proxyID structs.ServiceID, cfg m.Lock() defer m.Unlock() select { - case m.chans[proxyID] <- cfg: + case m.stateChans[proxyID] <- cfg: case <-time.After(10 * time.Millisecond): t.Fatalf("took too long to deliver config") } } -// Watch implements ConfigManager -func (m *testManager) Watch(proxyID structs.ServiceID, _ string, _ string) (<-chan *proxycfg.ConfigSnapshot, proxycfg.CancelFunc, error) { +// DrainStreams drains any open streams for the given proxyID. If there aren't +// any open streams, it'll create a marker so that future attempts to watch the +// given proxyID will return limiter.ErrCapacityReached. +func (m *testManager) DrainStreams(proxyID structs.ServiceID) { m.Lock() defer m.Unlock() + + ch, ok := m.drainChans[proxyID] + if !ok { + ch = make(chan struct{}) + m.drainChans[proxyID] = ch + } + close(ch) +} + +// Watch implements ConfigManager +func (m *testManager) Watch(proxyID structs.ServiceID, _ string, _ string) (<-chan *proxycfg.ConfigSnapshot, limiter.SessionTerminatedChan, proxycfg.CancelFunc, error) { + m.Lock() + defer m.Unlock() + + // If the drain chan has already been closed, return limiter.ErrCapacityReached. + drainCh := m.drainChans[proxyID] + select { + case <-drainCh: + return nil, nil, nil, limiter.ErrCapacityReached + default: + } + // ch might be nil but then it will just block forever - return m.chans[proxyID], func() { + return m.stateChans[proxyID], drainCh, func() { m.cancels <- proxyID }, nil } @@ -135,7 +162,6 @@ func newTestServerDeltaScenario( proxyID string, token string, authCheckFrequency time.Duration, - sessionLimiter SessionLimiter, ) *testServerScenario { mgr := newTestManager(t) envoy := NewTestEnvoy(t, proxyID, token) @@ -154,17 +180,12 @@ func newTestServerDeltaScenario( metrics.NewGlobal(cfg, sink) }) - if sessionLimiter == nil { - sessionLimiter = limiter.NewSessionLimiter() - } - s := NewServer( "node-123", testutil.Logger(t), mgr, resolveToken, nil, /*cfgFetcher ConfigFetcher*/ - sessionLimiter, ) if authCheckFrequency > 0 { s.AuthCheckFrequency = authCheckFrequency diff --git a/proto-public/pbdns/mock_DNSServiceClient.go b/proto-public/pbdns/mock_DNSServiceClient.go index a11f1e963..24906ab85 100644 --- a/proto-public/pbdns/mock_DNSServiceClient.go +++ b/proto-public/pbdns/mock_DNSServiceClient.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.12.2. DO NOT EDIT. +// Code generated by mockery v2.15.0. DO NOT EDIT. package pbdns @@ -8,8 +8,6 @@ import ( grpc "google.golang.org/grpc" mock "github.com/stretchr/testify/mock" - - testing "testing" ) // MockDNSServiceClient is an autogenerated mock type for the DNSServiceClient type @@ -47,8 +45,13 @@ func (_m *MockDNSServiceClient) Query(ctx context.Context, in *QueryRequest, opt return r0, r1 } -// NewMockDNSServiceClient creates a new instance of MockDNSServiceClient. It also registers the testing.TB interface on the mock and a cleanup function to assert the mocks expectations. -func NewMockDNSServiceClient(t testing.TB) *MockDNSServiceClient { +type mockConstructorTestingTNewMockDNSServiceClient interface { + mock.TestingT + Cleanup(func()) +} + +// NewMockDNSServiceClient creates a new instance of MockDNSServiceClient. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +func NewMockDNSServiceClient(t mockConstructorTestingTNewMockDNSServiceClient) *MockDNSServiceClient { mock := &MockDNSServiceClient{} mock.Mock.Test(t) diff --git a/proto-public/pbdns/mock_DNSServiceServer.go b/proto-public/pbdns/mock_DNSServiceServer.go index 97b98dddb..e9bd338da 100644 --- a/proto-public/pbdns/mock_DNSServiceServer.go +++ b/proto-public/pbdns/mock_DNSServiceServer.go @@ -1,10 +1,9 @@ -// Code generated by mockery v2.12.2. DO NOT EDIT. +// Code generated by mockery v2.15.0. DO NOT EDIT. package pbdns import ( context "context" - testing "testing" mock "github.com/stretchr/testify/mock" ) @@ -37,8 +36,13 @@ func (_m *MockDNSServiceServer) Query(_a0 context.Context, _a1 *QueryRequest) (* return r0, r1 } -// NewMockDNSServiceServer creates a new instance of MockDNSServiceServer. It also registers the testing.TB interface on the mock and a cleanup function to assert the mocks expectations. -func NewMockDNSServiceServer(t testing.TB) *MockDNSServiceServer { +type mockConstructorTestingTNewMockDNSServiceServer interface { + mock.TestingT + Cleanup(func()) +} + +// NewMockDNSServiceServer creates a new instance of MockDNSServiceServer. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +func NewMockDNSServiceServer(t mockConstructorTestingTNewMockDNSServiceServer) *MockDNSServiceServer { mock := &MockDNSServiceServer{} mock.Mock.Test(t) diff --git a/proto-public/pbdns/mock_UnsafeDNSServiceServer.go b/proto-public/pbdns/mock_UnsafeDNSServiceServer.go index a56e55bcb..0a6c47c2c 100644 --- a/proto-public/pbdns/mock_UnsafeDNSServiceServer.go +++ b/proto-public/pbdns/mock_UnsafeDNSServiceServer.go @@ -1,12 +1,8 @@ -// Code generated by mockery v2.12.2. DO NOT EDIT. +// Code generated by mockery v2.15.0. DO NOT EDIT. package pbdns -import ( - testing "testing" - - mock "github.com/stretchr/testify/mock" -) +import mock "github.com/stretchr/testify/mock" // MockUnsafeDNSServiceServer is an autogenerated mock type for the UnsafeDNSServiceServer type type MockUnsafeDNSServiceServer struct { @@ -18,8 +14,13 @@ func (_m *MockUnsafeDNSServiceServer) mustEmbedUnimplementedDNSServiceServer() { _m.Called() } -// NewMockUnsafeDNSServiceServer creates a new instance of MockUnsafeDNSServiceServer. It also registers the testing.TB interface on the mock and a cleanup function to assert the mocks expectations. -func NewMockUnsafeDNSServiceServer(t testing.TB) *MockUnsafeDNSServiceServer { +type mockConstructorTestingTNewMockUnsafeDNSServiceServer interface { + mock.TestingT + Cleanup(func()) +} + +// NewMockUnsafeDNSServiceServer creates a new instance of MockUnsafeDNSServiceServer. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +func NewMockUnsafeDNSServiceServer(t mockConstructorTestingTNewMockUnsafeDNSServiceServer) *MockUnsafeDNSServiceServer { mock := &MockUnsafeDNSServiceServer{} mock.Mock.Test(t)