xds: don't attempt to load-balance sessions for local proxies (#15789)

Previously, we'd begin a session with the xDS concurrency limiter
regardless of whether the proxy was registered in the catalog or in
the server's local agent state.

This caused problems for users who run `consul connect envoy` directly
against a server rather than a client agent, as the server's locally
registered proxies wouldn't be included in the limiter's capacity.

Now, the `ConfigSource` is responsible for beginning the session and we
only do so for services in the catalog.

Fixes: https://github.com/hashicorp/consul/issues/15753
This commit is contained in:
Dan Upton 2023-01-18 18:33:21 +00:00 committed by GitHub
parent e92392781e
commit 618deae657
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
63 changed files with 644 additions and 394 deletions

3
.changelog/15789.txt Normal file
View File

@ -0,0 +1,3 @@
```release-note:bug
xds: fix bug where sessions for locally-managed services could fail with "this server has too many xDS streams open"
```

View File

@ -8,7 +8,7 @@ SHELL = bash
# or the string @DEV to imply use what is currently installed locally.
###
GOLANGCI_LINT_VERSION='v1.50.1'
MOCKERY_VERSION='v2.12.2'
MOCKERY_VERSION='v2.15.0'
BUF_VERSION='v1.4.0'
PROTOC_GEN_GO_GRPC_VERSION="v1.2.0"
MOG_VERSION='v0.3.0'

View File

@ -855,6 +855,7 @@ func (a *Agent) listenAndServeGRPC() error {
Manager: a.proxyConfig,
GetStore: func() catalogproxycfg.Store { return server.FSM().State() },
Logger: a.proxyConfig.Logger.Named("server-catalog"),
SessionLimiter: a.baseDeps.XDSStreamLimiter,
})
go func() {
<-a.shutdownCh
@ -870,7 +871,6 @@ func (a *Agent) listenAndServeGRPC() error {
return a.delegate.ResolveTokenAndDefaultMeta(id, nil, nil)
},
a,
a.baseDeps.XDSStreamLimiter,
)
a.xdsServer.Register(a.externalGRPCServer)

View File

@ -3,10 +3,11 @@ package cachetype
import (
"testing"
"github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/agent/structs"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/agent/structs"
)
func TestCatalogDatacenters(t *testing.T) {
@ -18,34 +19,34 @@ func TestCatalogDatacenters(t *testing.T) {
var resp *[]string
var resp2 *[]string
var resp3 *[]string
rpc.On("RPC", "Catalog.ListDatacenters", mock.Anything, mock.Anything).Once().Return(nil).
rpc.On("RPC", mock.Anything, "Catalog.ListDatacenters", mock.Anything, mock.Anything).Once().Return(nil).
Run(func(args mock.Arguments) {
req := args.Get(1).(*structs.DatacentersRequest)
req := args.Get(2).(*structs.DatacentersRequest)
require.True(t, req.AllowStale)
reply := args.Get(2).(*[]string)
reply := args.Get(3).(*[]string)
*reply = []string{
"primary", "secondary", "tertiary",
}
resp = reply
})
rpc.On("RPC", "Catalog.ListDatacenters", mock.Anything, mock.Anything).Once().Return(nil).
rpc.On("RPC", mock.Anything, "Catalog.ListDatacenters", mock.Anything, mock.Anything).Once().Return(nil).
Run(func(args mock.Arguments) {
req := args.Get(1).(*structs.DatacentersRequest)
req := args.Get(2).(*structs.DatacentersRequest)
require.True(t, req.AllowStale)
reply := args.Get(2).(*[]string)
reply := args.Get(3).(*[]string)
*reply = []string{
"primary", "tertiary", "secondary",
}
resp2 = reply
})
rpc.On("RPC", "Catalog.ListDatacenters", mock.Anything, mock.Anything).Once().Return(nil).
rpc.On("RPC", mock.Anything, "Catalog.ListDatacenters", mock.Anything, mock.Anything).Once().Return(nil).
Run(func(args mock.Arguments) {
req := args.Get(1).(*structs.DatacentersRequest)
req := args.Get(2).(*structs.DatacentersRequest)
require.True(t, req.AllowStale)
reply := args.Get(2).(*[]string)
reply := args.Get(3).(*[]string)
*reply = []string{
"primary", "secondary",
}

View File

@ -19,14 +19,14 @@ func TestCatalogListServices(t *testing.T) {
// Expect the proper RPC call. This also sets the expected value
// since that is return-by-pointer in the arguments.
var resp *structs.IndexedServices
rpc.On("RPC", "Catalog.ListServices", mock.Anything, mock.Anything).Return(nil).
rpc.On("RPC", mock.Anything, "Catalog.ListServices", mock.Anything, mock.Anything).Return(nil).
Run(func(args mock.Arguments) {
req := args.Get(1).(*structs.DCSpecificRequest)
req := args.Get(2).(*structs.DCSpecificRequest)
require.Equal(t, uint64(24), req.QueryOptions.MinQueryIndex)
require.Equal(t, 1*time.Second, req.QueryOptions.MaxQueryTime)
require.True(t, req.AllowStale)
reply := args.Get(2).(*structs.IndexedServices)
reply := args.Get(3).(*structs.IndexedServices)
reply.Services = map[string][]string{
"foo": {"prod", "linux"},
"bar": {"qa", "windows"},
@ -75,14 +75,14 @@ func TestCatalogListServices_IntegrationWithCache_NotModifiedResponse(t *testing
"foo": {"prod", "linux"},
"bar": {"qa", "windows"},
}
rpc.On("RPC", "Catalog.ListServices", mock.Anything, mock.Anything).
rpc.On("RPC", mock.Anything, "Catalog.ListServices", mock.Anything, mock.Anything).
Return(nil).
Run(func(args mock.Arguments) {
req := args.Get(1).(*structs.DCSpecificRequest)
req := args.Get(2).(*structs.DCSpecificRequest)
require.True(t, req.AllowStale)
require.True(t, req.AllowNotModifiedResponse)
reply := args.Get(2).(*structs.IndexedServices)
reply := args.Get(3).(*structs.IndexedServices)
reply.QueryMeta.Index = 44
reply.NotModified = true
})

View File

@ -4,10 +4,11 @@ import (
"testing"
"time"
"github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/agent/structs"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/agent/structs"
)
func TestCatalogServiceList(t *testing.T) {
@ -17,14 +18,14 @@ func TestCatalogServiceList(t *testing.T) {
// Expect the proper RPC call. This also sets the expected value
// since that is return-by-pointer in the arguments.
var resp *structs.IndexedServiceList
rpc.On("RPC", "Catalog.ServiceList", mock.Anything, mock.Anything).Return(nil).
rpc.On("RPC", mock.Anything, "Catalog.ServiceList", mock.Anything, mock.Anything).Return(nil).
Run(func(args mock.Arguments) {
req := args.Get(1).(*structs.DCSpecificRequest)
req := args.Get(2).(*structs.DCSpecificRequest)
require.Equal(t, uint64(24), req.QueryOptions.MinQueryIndex)
require.Equal(t, 1*time.Second, req.QueryOptions.MaxQueryTime)
require.True(t, req.AllowStale)
reply := args.Get(2).(*structs.IndexedServiceList)
reply := args.Get(3).(*structs.IndexedServiceList)
reply.Services = structs.ServiceList{
structs.ServiceName{
Name: "foo",

View File

@ -4,10 +4,11 @@ import (
"testing"
"time"
"github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/agent/structs"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/agent/structs"
)
func TestCatalogServices(t *testing.T) {
@ -18,15 +19,15 @@ func TestCatalogServices(t *testing.T) {
// Expect the proper RPC call. This also sets the expected value
// since that is return-by-pointer in the arguments.
var resp *structs.IndexedServiceNodes
rpc.On("RPC", "Catalog.ServiceNodes", mock.Anything, mock.Anything).Return(nil).
rpc.On("RPC", mock.Anything, "Catalog.ServiceNodes", mock.Anything, mock.Anything).Return(nil).
Run(func(args mock.Arguments) {
req := args.Get(1).(*structs.ServiceSpecificRequest)
req := args.Get(2).(*structs.ServiceSpecificRequest)
require.Equal(t, uint64(24), req.QueryOptions.MinQueryIndex)
require.Equal(t, 1*time.Second, req.QueryOptions.MaxQueryTime)
require.Equal(t, "web", req.ServiceName)
require.True(t, req.AllowStale)
reply := args.Get(2).(*structs.IndexedServiceNodes)
reply := args.Get(3).(*structs.IndexedServiceNodes)
reply.ServiceNodes = []*structs.ServiceNode{
{ServiceTags: req.ServiceTags},
}

View File

@ -4,10 +4,11 @@ import (
"testing"
"time"
"github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/agent/structs"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/agent/structs"
)
func TestConfigEntries(t *testing.T) {
@ -17,16 +18,16 @@ func TestConfigEntries(t *testing.T) {
// Expect the proper RPC call. This also sets the expected value
// since that is return-by-pointer in the arguments.
var resp *structs.IndexedConfigEntries
rpc.On("RPC", "ConfigEntry.List", mock.Anything, mock.Anything).Return(nil).
rpc.On("RPC", mock.Anything, "ConfigEntry.List", mock.Anything, mock.Anything).Return(nil).
Run(func(args mock.Arguments) {
req := args.Get(1).(*structs.ConfigEntryQuery)
req := args.Get(2).(*structs.ConfigEntryQuery)
require.Equal(t, uint64(24), req.QueryOptions.MinQueryIndex)
require.Equal(t, 1*time.Second, req.QueryOptions.MaxQueryTime)
require.True(t, req.AllowStale)
require.Equal(t, structs.ServiceResolver, req.Kind)
require.Equal(t, "", req.Name)
reply := args.Get(2).(*structs.IndexedConfigEntries)
reply := args.Get(3).(*structs.IndexedConfigEntries)
reply.Kind = structs.ServiceResolver
reply.Entries = []structs.ConfigEntry{
&structs.ServiceResolverConfigEntry{Kind: structs.ServiceResolver, Name: "foo"},
@ -60,9 +61,9 @@ func TestConfigEntry(t *testing.T) {
// Expect the proper RPC call. This also sets the expected value
// since that is return-by-pointer in the arguments.
var resp *structs.ConfigEntryResponse
rpc.On("RPC", "ConfigEntry.Get", mock.Anything, mock.Anything).Return(nil).
rpc.On("RPC", mock.Anything, "ConfigEntry.Get", mock.Anything, mock.Anything).Return(nil).
Run(func(args mock.Arguments) {
req := args.Get(1).(*structs.ConfigEntryQuery)
req := args.Get(2).(*structs.ConfigEntryQuery)
require.Equal(t, uint64(24), req.QueryOptions.MinQueryIndex)
require.Equal(t, 1*time.Second, req.QueryOptions.MaxQueryTime)
require.True(t, req.AllowStale)
@ -73,7 +74,7 @@ func TestConfigEntry(t *testing.T) {
Name: "foo",
Kind: structs.ServiceResolver,
}
reply := args.Get(2).(*structs.ConfigEntryResponse)
reply := args.Get(3).(*structs.ConfigEntryResponse)
reply.Entry = entry
reply.QueryMeta.Index = 48
resp = reply

View File

@ -181,7 +181,7 @@ func TestConnectCALeaf_changingRoots(t *testing.T) {
var resp *structs.IssuedCert
var idx uint64
rpc.On("RPC", "ConnectCA.Sign", mock.Anything, mock.Anything).Return(nil).
rpc.On("RPC", mock.Anything, "ConnectCA.Sign", mock.Anything, mock.Anything).Return(nil).
Run(func(args mock.Arguments) {
ca := caRoot
cIdx := atomic.AddUint64(&idx, 1)
@ -189,7 +189,7 @@ func TestConnectCALeaf_changingRoots(t *testing.T) {
// Second time round use the new CA
ca = caRoot2
}
reply := args.Get(2).(*structs.IssuedCert)
reply := args.Get(3).(*structs.IssuedCert)
leaf, _ := connect.TestLeaf(t, "web", ca)
reply.CertPEM = leaf
reply.ValidAfter = time.Now().Add(-1 * time.Hour)
@ -292,9 +292,9 @@ func TestConnectCALeaf_changingRootsJitterBetweenCalls(t *testing.T) {
// Instrument ConnectCA.Sign to return signed cert
var resp *structs.IssuedCert
var idx uint64
rpc.On("RPC", "ConnectCA.Sign", mock.Anything, mock.Anything).Return(nil).
rpc.On("RPC", mock.Anything, "ConnectCA.Sign", mock.Anything, mock.Anything).Return(nil).
Run(func(args mock.Arguments) {
reply := args.Get(2).(*structs.IssuedCert)
reply := args.Get(3).(*structs.IssuedCert)
leaf, _ := connect.TestLeaf(t, "web", caRoot)
reply.CertPEM = leaf
reply.ValidAfter = time.Now().Add(-1 * time.Hour)
@ -433,9 +433,9 @@ func TestConnectCALeaf_changingRootsBetweenBlockingCalls(t *testing.T) {
// Instrument ConnectCA.Sign to return signed cert
var resp *structs.IssuedCert
var idx uint64
rpc.On("RPC", "ConnectCA.Sign", mock.Anything, mock.Anything).Return(nil).
rpc.On("RPC", mock.Anything, "ConnectCA.Sign", mock.Anything, mock.Anything).Return(nil).
Run(func(args mock.Arguments) {
reply := args.Get(2).(*structs.IssuedCert)
reply := args.Get(3).(*structs.IssuedCert)
leaf, _ := connect.TestLeaf(t, "web", caRoot)
reply.CertPEM = leaf
reply.ValidAfter = time.Now().Add(-1 * time.Hour)
@ -548,7 +548,7 @@ func TestConnectCALeaf_CSRRateLimiting(t *testing.T) {
var idx, rateLimitedRPCs uint64
genCert := func(args mock.Arguments) {
reply := args.Get(2).(*structs.IssuedCert)
reply := args.Get(3).(*structs.IssuedCert)
leaf, _ := connect.TestLeaf(t, "web", caRoot)
reply.CertPEM = leaf
reply.ValidAfter = time.Now().Add(-1 * time.Hour)
@ -565,16 +565,16 @@ func TestConnectCALeaf_CSRRateLimiting(t *testing.T) {
// First call return rate limit error. This is important as it checks
// behavior when cache is empty and we have to return a nil Value but need to
// save state to do the right thing for retry.
rpc.On("RPC", "ConnectCA.Sign", mock.Anything, mock.Anything).
rpc.On("RPC", mock.Anything, "ConnectCA.Sign", mock.Anything, mock.Anything).
Return(consul.ErrRateLimited).Once().Run(incRateLimit)
// Then succeed on second call
rpc.On("RPC", "ConnectCA.Sign", mock.Anything, mock.Anything).
rpc.On("RPC", mock.Anything, "ConnectCA.Sign", mock.Anything, mock.Anything).
Return(nil).Run(genCert).Once()
// Then be rate limited again on several further calls
rpc.On("RPC", "ConnectCA.Sign", mock.Anything, mock.Anything).
rpc.On("RPC", mock.Anything, "ConnectCA.Sign", mock.Anything, mock.Anything).
Return(consul.ErrRateLimited).Twice().Run(incRateLimit)
// Then fine after that
rpc.On("RPC", "ConnectCA.Sign", mock.Anything, mock.Anything).
rpc.On("RPC", mock.Anything, "ConnectCA.Sign", mock.Anything, mock.Anything).
Return(nil).Run(genCert)
opts := cache.FetchOptions{MinIndex: 0, Timeout: 10 * time.Minute}
@ -727,9 +727,9 @@ func TestConnectCALeaf_watchRootsDedupingMultipleCallers(t *testing.T) {
// Instrument ConnectCA.Sign to return signed cert
var idx uint64
rpc.On("RPC", "ConnectCA.Sign", mock.Anything, mock.Anything).Return(nil).
rpc.On("RPC", mock.Anything, "ConnectCA.Sign", mock.Anything, mock.Anything).Return(nil).
Run(func(args mock.Arguments) {
reply := args.Get(2).(*structs.IssuedCert)
reply := args.Get(3).(*structs.IssuedCert)
// Note we will sign certs for same service name each time because
// otherwise we have to re-invent whole CSR endpoint here to be able to
// control things - parse PEM sign with right key etc. It doesn't matter -
@ -924,9 +924,9 @@ func TestConnectCALeaf_expiringLeaf(t *testing.T) {
// Instrument ConnectCA.Sign to
var resp *structs.IssuedCert
var idx uint64
rpc.On("RPC", "ConnectCA.Sign", mock.Anything, mock.Anything).Return(nil).
rpc.On("RPC", mock.Anything, "ConnectCA.Sign", mock.Anything, mock.Anything).Return(nil).
Run(func(args mock.Arguments) {
reply := args.Get(2).(*structs.IssuedCert)
reply := args.Get(3).(*structs.IssuedCert)
reply.CreateIndex = atomic.AddUint64(&idx, 1)
reply.ModifyIndex = reply.CreateIndex
@ -1017,13 +1017,13 @@ func TestConnectCALeaf_DNSSANForService(t *testing.T) {
// Instrument ConnectCA.Sign to
var caReq *structs.CASignRequest
rpc.On("RPC", "ConnectCA.Sign", mock.Anything, mock.Anything).Return(nil).
rpc.On("RPC", mock.Anything, "ConnectCA.Sign", mock.Anything, mock.Anything).Return(nil).
Run(func(args mock.Arguments) {
reply := args.Get(2).(*structs.IssuedCert)
reply := args.Get(3).(*structs.IssuedCert)
leaf, _ := connect.TestLeaf(t, "web", caRoot)
reply.CertPEM = leaf
caReq = args.Get(1).(*structs.CASignRequest)
caReq = args.Get(2).(*structs.CASignRequest)
})
opts := cache.FetchOptions{MinIndex: 0, Timeout: 10 * time.Second}

View File

@ -4,10 +4,11 @@ import (
"testing"
"time"
"github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/agent/structs"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/agent/structs"
)
func TestConnectCARoot(t *testing.T) {
@ -18,13 +19,13 @@ func TestConnectCARoot(t *testing.T) {
// Expect the proper RPC call. This also sets the expected value
// since that is return-by-pointer in the arguments.
var resp *structs.IndexedCARoots
rpc.On("RPC", "ConnectCA.Roots", mock.Anything, mock.Anything).Return(nil).
rpc.On("RPC", mock.Anything, "ConnectCA.Roots", mock.Anything, mock.Anything).Return(nil).
Run(func(args mock.Arguments) {
req := args.Get(1).(*structs.DCSpecificRequest)
req := args.Get(2).(*structs.DCSpecificRequest)
require.Equal(t, uint64(24), req.QueryOptions.MinQueryIndex)
require.Equal(t, 1*time.Second, req.QueryOptions.MaxQueryTime)
reply := args.Get(2).(*structs.IndexedCARoots)
reply := args.Get(3).(*structs.IndexedCARoots)
reply.QueryMeta.Index = 48
resp = reply
})

View File

@ -4,11 +4,12 @@ import (
"testing"
"time"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/agent/consul/discoverychain"
"github.com/hashicorp/consul/agent/structs"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
)
func TestCompiledDiscoveryChain(t *testing.T) {
@ -21,14 +22,14 @@ func TestCompiledDiscoveryChain(t *testing.T) {
// Expect the proper RPC call. This also sets the expected value
// since that is return-by-pointer in the arguments.
var resp *structs.DiscoveryChainResponse
rpc.On("RPC", "DiscoveryChain.Get", mock.Anything, mock.Anything).Return(nil).
rpc.On("RPC", mock.Anything, "DiscoveryChain.Get", mock.Anything, mock.Anything).Return(nil).
Run(func(args mock.Arguments) {
req := args.Get(1).(*structs.DiscoveryChainRequest)
req := args.Get(2).(*structs.DiscoveryChainRequest)
require.Equal(t, uint64(24), req.QueryOptions.MinQueryIndex)
require.Equal(t, 1*time.Second, req.QueryOptions.MaxQueryTime)
require.True(t, req.AllowStale)
reply := args.Get(2).(*structs.DiscoveryChainResponse)
reply := args.Get(3).(*structs.DiscoveryChainResponse)
reply.Chain = chain
reply.QueryMeta.Index = 48
resp = reply

View File

@ -18,14 +18,14 @@ func TestExportedPeeredServices(t *testing.T) {
// Expect the proper RPC call. This also sets the expected value
// since that is return-by-pointer in the arguments.
var resp *structs.IndexedExportedServiceList
rpc.On("RPC", "Internal.ExportedPeeredServices", mock.Anything, mock.Anything).Return(nil).
rpc.On("RPC", mock.Anything, "Internal.ExportedPeeredServices", mock.Anything, mock.Anything).Return(nil).
Run(func(args mock.Arguments) {
req := args.Get(1).(*structs.DCSpecificRequest)
req := args.Get(2).(*structs.DCSpecificRequest)
require.Equal(t, uint64(24), req.QueryOptions.MinQueryIndex)
require.Equal(t, 1*time.Second, req.QueryOptions.MaxQueryTime)
require.True(t, req.AllowStale)
reply := args.Get(2).(*structs.IndexedExportedServiceList)
reply := args.Get(3).(*structs.IndexedExportedServiceList)
reply.Services = map[string]structs.ServiceList{
"my-peer": {
structs.ServiceName{

View File

@ -4,11 +4,12 @@ import (
"testing"
"time"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/api"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
)
func TestFederationStateListMeshGateways(t *testing.T) {
@ -18,14 +19,14 @@ func TestFederationStateListMeshGateways(t *testing.T) {
// Expect the proper RPC call. This also sets the expected value
// since that is return-by-pointer in the arguments.
var resp *structs.DatacenterIndexedCheckServiceNodes
rpc.On("RPC", "FederationState.ListMeshGateways", mock.Anything, mock.Anything).Return(nil).
rpc.On("RPC", mock.Anything, "FederationState.ListMeshGateways", mock.Anything, mock.Anything).Return(nil).
Run(func(args mock.Arguments) {
req := args.Get(1).(*structs.DCSpecificRequest)
req := args.Get(2).(*structs.DCSpecificRequest)
require.Equal(t, uint64(24), req.QueryOptions.MinQueryIndex)
require.Equal(t, 1*time.Second, req.QueryOptions.MaxQueryTime)
require.True(t, req.AllowStale)
reply := args.Get(2).(*structs.DatacenterIndexedCheckServiceNodes)
reply := args.Get(3).(*structs.DatacenterIndexedCheckServiceNodes)
reply.DatacenterNodes = map[string]structs.CheckServiceNodes{
"dc9": []structs.CheckServiceNode{
{

View File

@ -4,10 +4,11 @@ import (
"testing"
"time"
"github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/agent/structs"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/agent/structs"
)
func TestGatewayServices(t *testing.T) {
@ -17,9 +18,9 @@ func TestGatewayServices(t *testing.T) {
// Expect the proper RPC call. This also sets the expected value
// since that is return-by-pointer in the arguments.
var resp *structs.IndexedGatewayServices
rpc.On("RPC", "Catalog.GatewayServices", mock.Anything, mock.Anything).Return(nil).
rpc.On("RPC", mock.Anything, "Catalog.GatewayServices", mock.Anything, mock.Anything).Return(nil).
Run(func(args mock.Arguments) {
req := args.Get(1).(*structs.ServiceSpecificRequest)
req := args.Get(2).(*structs.ServiceSpecificRequest)
require.Equal(t, uint64(24), req.QueryOptions.MinQueryIndex)
require.Equal(t, 1*time.Second, req.QueryOptions.MaxQueryTime)
require.True(t, req.AllowStale)
@ -37,7 +38,7 @@ func TestGatewayServices(t *testing.T) {
SNI: "my-domain",
},
}
reply := args.Get(2).(*structs.IndexedGatewayServices)
reply := args.Get(3).(*structs.IndexedGatewayServices)
reply.Services = services
reply.QueryMeta.Index = 48
resp = reply

View File

@ -4,10 +4,11 @@ import (
"testing"
"time"
"github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/agent/structs"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/agent/structs"
)
func TestHealthServices(t *testing.T) {
@ -18,15 +19,15 @@ func TestHealthServices(t *testing.T) {
// Expect the proper RPC call. This also sets the expected value
// since that is return-by-pointer in the arguments.
var resp *structs.IndexedCheckServiceNodes
rpc.On("RPC", "Health.ServiceNodes", mock.Anything, mock.Anything).Return(nil).
rpc.On("RPC", mock.Anything, "Health.ServiceNodes", mock.Anything, mock.Anything).Return(nil).
Run(func(args mock.Arguments) {
req := args.Get(1).(*structs.ServiceSpecificRequest)
req := args.Get(2).(*structs.ServiceSpecificRequest)
require.Equal(t, uint64(24), req.QueryOptions.MinQueryIndex)
require.Equal(t, 1*time.Second, req.QueryOptions.MaxQueryTime)
require.Equal(t, "web", req.ServiceName)
require.True(t, req.AllowStale)
reply := args.Get(2).(*structs.IndexedCheckServiceNodes)
reply := args.Get(3).(*structs.IndexedCheckServiceNodes)
reply.Nodes = []structs.CheckServiceNode{
{Service: &structs.NodeService{Tags: req.ServiceTags}},
}

View File

@ -4,10 +4,11 @@ import (
"testing"
"time"
"github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/agent/structs"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/agent/structs"
)
func TestIntentionMatch(t *testing.T) {
@ -18,13 +19,13 @@ func TestIntentionMatch(t *testing.T) {
// Expect the proper RPC call. This also sets the expected value
// since that is return-by-pointer in the arguments.
var resp *structs.IndexedIntentionMatches
rpc.On("RPC", "Intention.Match", mock.Anything, mock.Anything).Return(nil).
rpc.On("RPC", mock.Anything, "Intention.Match", mock.Anything, mock.Anything).Return(nil).
Run(func(args mock.Arguments) {
req := args.Get(1).(*structs.IntentionQueryRequest)
req := args.Get(2).(*structs.IntentionQueryRequest)
require.Equal(t, uint64(24), req.MinQueryIndex)
require.Equal(t, 1*time.Second, req.MaxQueryTime)
reply := args.Get(2).(*structs.IndexedIntentionMatches)
reply := args.Get(3).(*structs.IndexedIntentionMatches)
reply.Index = 48
resp = reply
})

View File

@ -4,10 +4,11 @@ import (
"testing"
"time"
"github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/agent/structs"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/agent/structs"
)
func TestIntentionUpstreamsDestination(t *testing.T) {
@ -17,9 +18,9 @@ func TestIntentionUpstreamsDestination(t *testing.T) {
// Expect the proper RPC call. This also sets the expected value
// since that is return-by-pointer in the arguments.
var resp *structs.IndexedServiceList
rpc.On("RPC", "Internal.IntentionUpstreamsDestination", mock.Anything, mock.Anything).Return(nil).
rpc.On("RPC", mock.Anything, "Internal.IntentionUpstreamsDestination", mock.Anything, mock.Anything).Return(nil).
Run(func(args mock.Arguments) {
req := args.Get(1).(*structs.ServiceSpecificRequest)
req := args.Get(2).(*structs.ServiceSpecificRequest)
require.Equal(t, uint64(24), req.QueryOptions.MinQueryIndex)
require.Equal(t, 1*time.Second, req.QueryOptions.MaxQueryTime)
require.True(t, req.AllowStale)
@ -28,7 +29,7 @@ func TestIntentionUpstreamsDestination(t *testing.T) {
services := structs.ServiceList{
{Name: "foo"},
}
reply := args.Get(2).(*structs.IndexedServiceList)
reply := args.Get(3).(*structs.IndexedServiceList)
reply.Services = services
reply.QueryMeta.Index = 48
resp = reply

View File

@ -4,10 +4,11 @@ import (
"testing"
"time"
"github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/agent/structs"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/agent/structs"
)
func TestIntentionUpstreams(t *testing.T) {
@ -17,9 +18,9 @@ func TestIntentionUpstreams(t *testing.T) {
// Expect the proper RPC call. This also sets the expected value
// since that is return-by-pointer in the arguments.
var resp *structs.IndexedServiceList
rpc.On("RPC", "Internal.IntentionUpstreams", mock.Anything, mock.Anything).Return(nil).
rpc.On("RPC", mock.Anything, "Internal.IntentionUpstreams", mock.Anything, mock.Anything).Return(nil).
Run(func(args mock.Arguments) {
req := args.Get(1).(*structs.ServiceSpecificRequest)
req := args.Get(2).(*structs.ServiceSpecificRequest)
require.Equal(t, uint64(24), req.QueryOptions.MinQueryIndex)
require.Equal(t, 1*time.Second, req.QueryOptions.MaxQueryTime)
require.True(t, req.AllowStale)
@ -28,7 +29,7 @@ func TestIntentionUpstreams(t *testing.T) {
services := structs.ServiceList{
{Name: "foo"},
}
reply := args.Get(2).(*structs.IndexedServiceList)
reply := args.Get(3).(*structs.IndexedServiceList)
reply.Services = services
reply.QueryMeta.Index = 48
resp = reply

View File

@ -1,4 +1,4 @@
// Code generated by mockery v2.14.0. DO NOT EDIT.
// Code generated by mockery v2.15.0. DO NOT EDIT.
package cachetype

View File

@ -1,10 +1,9 @@
// Code generated by mockery v2.12.2. DO NOT EDIT.
// Code generated by mockery v2.15.0. DO NOT EDIT.
package cachetype
import (
"context"
testing "testing"
context "context"
mock "github.com/stretchr/testify/mock"
)
@ -16,11 +15,11 @@ type MockRPC struct {
// RPC provides a mock function with given fields: ctx, method, args, reply
func (_m *MockRPC) RPC(ctx context.Context, method string, args interface{}, reply interface{}) error {
ret := _m.Called(method, args, reply)
ret := _m.Called(ctx, method, args, reply)
var r0 error
if rf, ok := ret.Get(0).(func(string, interface{}, interface{}) error); ok {
r0 = rf(method, args, reply)
if rf, ok := ret.Get(0).(func(context.Context, string, interface{}, interface{}) error); ok {
r0 = rf(ctx, method, args, reply)
} else {
r0 = ret.Error(0)
}
@ -28,8 +27,13 @@ func (_m *MockRPC) RPC(ctx context.Context, method string, args interface{}, rep
return r0
}
// NewMockRPC creates a new instance of MockRPC. It also registers the testing.TB interface on the mock and a cleanup function to assert the mocks expectations.
func NewMockRPC(t testing.TB) *MockRPC {
type mockConstructorTestingTNewMockRPC interface {
mock.TestingT
Cleanup(func())
}
// NewMockRPC creates a new instance of MockRPC. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
func NewMockRPC(t mockConstructorTestingTNewMockRPC) *MockRPC {
mock := &MockRPC{}
mock.Mock.Test(t)

View File

@ -1,4 +1,4 @@
// Code generated by mockery v2.12.2. DO NOT EDIT.
// Code generated by mockery v2.15.0. DO NOT EDIT.
package cachetype
@ -10,8 +10,6 @@ import (
mock "github.com/stretchr/testify/mock"
pbpeering "github.com/hashicorp/consul/proto/pbpeering"
testing "testing"
)
// MockTrustBundleLister is an autogenerated mock type for the TrustBundleLister type
@ -49,8 +47,13 @@ func (_m *MockTrustBundleLister) TrustBundleListByService(ctx context.Context, i
return r0, r1
}
// NewMockTrustBundleLister creates a new instance of MockTrustBundleLister. It also registers the testing.TB interface on the mock and a cleanup function to assert the mocks expectations.
func NewMockTrustBundleLister(t testing.TB) *MockTrustBundleLister {
type mockConstructorTestingTNewMockTrustBundleLister interface {
mock.TestingT
Cleanup(func())
}
// NewMockTrustBundleLister creates a new instance of MockTrustBundleLister. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
func NewMockTrustBundleLister(t mockConstructorTestingTNewMockTrustBundleLister) *MockTrustBundleLister {
mock := &MockTrustBundleLister{}
mock.Mock.Test(t)

View File

@ -1,4 +1,4 @@
// Code generated by mockery v2.12.2. DO NOT EDIT.
// Code generated by mockery v2.15.0. DO NOT EDIT.
package cachetype
@ -10,8 +10,6 @@ import (
mock "github.com/stretchr/testify/mock"
pbpeering "github.com/hashicorp/consul/proto/pbpeering"
testing "testing"
)
// MockTrustBundleReader is an autogenerated mock type for the TrustBundleReader type
@ -49,8 +47,13 @@ func (_m *MockTrustBundleReader) TrustBundleRead(ctx context.Context, in *pbpeer
return r0, r1
}
// NewMockTrustBundleReader creates a new instance of MockTrustBundleReader. It also registers the testing.TB interface on the mock and a cleanup function to assert the mocks expectations.
func NewMockTrustBundleReader(t testing.TB) *MockTrustBundleReader {
type mockConstructorTestingTNewMockTrustBundleReader interface {
mock.TestingT
Cleanup(func())
}
// NewMockTrustBundleReader creates a new instance of MockTrustBundleReader. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
func NewMockTrustBundleReader(t mockConstructorTestingTNewMockTrustBundleReader) *MockTrustBundleReader {
mock := &MockTrustBundleReader{}
mock.Mock.Test(t)

View File

@ -4,10 +4,11 @@ import (
"testing"
"time"
"github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/agent/structs"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/agent/structs"
)
func TestNodeServices(t *testing.T) {
@ -18,15 +19,15 @@ func TestNodeServices(t *testing.T) {
// Expect the proper RPC call. This also sets the expected value
// since that is return-by-pointer in the arguments.
var resp *structs.IndexedNodeServices
rpc.On("RPC", "Catalog.NodeServices", mock.Anything, mock.Anything).Return(nil).
rpc.On("RPC", mock.Anything, "Catalog.NodeServices", mock.Anything, mock.Anything).Return(nil).
Run(func(args mock.Arguments) {
req := args.Get(1).(*structs.NodeSpecificRequest)
req := args.Get(2).(*structs.NodeSpecificRequest)
require.Equal(t, uint64(24), req.QueryOptions.MinQueryIndex)
require.Equal(t, 1*time.Second, req.QueryOptions.MaxQueryTime)
require.Equal(t, "node-01", req.Node)
require.True(t, req.AllowStale)
reply := args.Get(2).(*structs.IndexedNodeServices)
reply := args.Get(3).(*structs.IndexedNodeServices)
reply.NodeServices = &structs.NodeServices{
Node: &structs.Node{
ID: "abcdef",

View File

@ -20,14 +20,14 @@ func TestPeeredUpstreams(t *testing.T) {
// Expect the proper RPC call. This also sets the expected value
// since that is return-by-pointer in the arguments.
var resp *structs.IndexedPeeredServiceList
rpc.On("RPC", "Internal.PeeredUpstreams", mock.Anything, mock.Anything).Return(nil).
rpc.On("RPC", mock.Anything, "Internal.PeeredUpstreams", mock.Anything, mock.Anything).Return(nil).
Run(func(args mock.Arguments) {
req := args.Get(1).(*structs.PartitionSpecificRequest)
req := args.Get(2).(*structs.PartitionSpecificRequest)
require.Equal(t, uint64(24), req.MinQueryIndex)
require.Equal(t, 1*time.Second, req.QueryOptions.MaxQueryTime)
require.True(t, req.AllowStale)
reply := args.Get(2).(*structs.IndexedPeeredServiceList)
reply := args.Get(3).(*structs.IndexedPeeredServiceList)
reply.Index = 48
resp = reply
})

View File

@ -3,10 +3,11 @@ package cachetype
import (
"testing"
"github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/agent/structs"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/agent/structs"
)
func TestPreparedQuery(t *testing.T) {
@ -17,14 +18,14 @@ func TestPreparedQuery(t *testing.T) {
// Expect the proper RPC call. This also sets the expected value
// since that is return-by-pointer in the arguments.
var resp *structs.PreparedQueryExecuteResponse
rpc.On("RPC", "PreparedQuery.Execute", mock.Anything, mock.Anything).Return(nil).
rpc.On("RPC", mock.Anything, "PreparedQuery.Execute", mock.Anything, mock.Anything).Return(nil).
Run(func(args mock.Arguments) {
req := args.Get(1).(*structs.PreparedQueryExecuteRequest)
req := args.Get(2).(*structs.PreparedQueryExecuteRequest)
require.Equal(t, "geo-db", req.QueryIDOrName)
require.Equal(t, 10, req.Limit)
require.True(t, req.AllowStale)
reply := args.Get(2).(*structs.PreparedQueryExecuteResponse)
reply := args.Get(3).(*structs.PreparedQueryExecuteResponse)
reply.QueryMeta.Index = 48
resp = reply
})

View File

@ -4,10 +4,11 @@ import (
"testing"
"time"
"github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/agent/structs"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/agent/structs"
)
func TestResolvedServiceConfig(t *testing.T) {
@ -18,15 +19,15 @@ func TestResolvedServiceConfig(t *testing.T) {
// Expect the proper RPC call. This also sets the expected value
// since that is return-by-pointer in the arguments.
var resp *structs.ServiceConfigResponse
rpc.On("RPC", "ConfigEntry.ResolveServiceConfig", mock.Anything, mock.Anything).Return(nil).
rpc.On("RPC", mock.Anything, "ConfigEntry.ResolveServiceConfig", mock.Anything, mock.Anything).Return(nil).
Run(func(args mock.Arguments) {
req := args.Get(1).(*structs.ServiceConfigRequest)
req := args.Get(2).(*structs.ServiceConfigRequest)
require.Equal(t, uint64(24), req.QueryOptions.MinQueryIndex)
require.Equal(t, 1*time.Second, req.QueryOptions.MaxQueryTime)
require.Equal(t, "foo", req.Name)
require.True(t, req.AllowStale)
reply := args.Get(2).(*structs.ServiceConfigResponse)
reply := args.Get(3).(*structs.ServiceConfigResponse)
reply.ProxyConfig = map[string]interface{}{
"protocol": "http",
}

View File

@ -4,10 +4,11 @@ import (
"testing"
"time"
"github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/agent/structs"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/agent/structs"
)
func TestInternalServiceDump(t *testing.T) {
@ -17,14 +18,14 @@ func TestInternalServiceDump(t *testing.T) {
// Expect the proper RPC call. This also sets the expected value
// since that is return-by-pointer in the arguments.
var resp *structs.IndexedNodesWithGateways
rpc.On("RPC", "Internal.ServiceDump", mock.Anything, mock.Anything).Return(nil).
rpc.On("RPC", mock.Anything, "Internal.ServiceDump", mock.Anything, mock.Anything).Return(nil).
Run(func(args mock.Arguments) {
req := args.Get(1).(*structs.ServiceDumpRequest)
req := args.Get(2).(*structs.ServiceDumpRequest)
require.Equal(t, uint64(24), req.QueryOptions.MinQueryIndex)
require.Equal(t, 1*time.Second, req.QueryOptions.MaxQueryTime)
require.True(t, req.AllowStale)
reply := args.Get(2).(*structs.IndexedNodesWithGateways)
reply := args.Get(3).(*structs.IndexedNodesWithGateways)
reply.Nodes = []structs.CheckServiceNode{
{Service: &structs.NodeService{Kind: req.ServiceKind, Service: "foo"}},
}

View File

@ -4,10 +4,11 @@ import (
"testing"
"time"
"github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/agent/structs"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/agent/structs"
)
func TestServiceGateways(t *testing.T) {
@ -17,9 +18,9 @@ func TestServiceGateways(t *testing.T) {
// Expect the proper RPC call. This also sets the expected value
// since that is return-by-pointer in the arguments.
var resp *structs.IndexedCheckServiceNodes
rpc.On("RPC", "Internal.ServiceGateways", mock.Anything, mock.Anything).Return(nil).
rpc.On("RPC", mock.Anything, "Internal.ServiceGateways", mock.Anything, mock.Anything).Return(nil).
Run(func(args mock.Arguments) {
req := args.Get(1).(*structs.ServiceSpecificRequest)
req := args.Get(2).(*structs.ServiceSpecificRequest)
require.Equal(t, uint64(24), req.QueryOptions.MinQueryIndex)
require.Equal(t, 1*time.Second, req.QueryOptions.MaxQueryTime)
require.True(t, req.AllowStale)
@ -33,7 +34,7 @@ func TestServiceGateways(t *testing.T) {
},
}
reply := args.Get(2).(*structs.IndexedCheckServiceNodes)
reply := args.Get(3).(*structs.IndexedCheckServiceNodes)
reply.Nodes = nodes
reply.QueryMeta.Index = 48
resp = reply

View File

@ -1,12 +1,8 @@
// Code generated by mockery v2.12.2. DO NOT EDIT.
// Code generated by mockery v2.15.0. DO NOT EDIT.
package cache
import (
testing "testing"
mock "github.com/stretchr/testify/mock"
)
import mock "github.com/stretchr/testify/mock"
// MockRequest is an autogenerated mock type for the Request type
type MockRequest struct {
@ -27,8 +23,13 @@ func (_m *MockRequest) CacheInfo() RequestInfo {
return r0
}
// NewMockRequest creates a new instance of MockRequest. It also registers the testing.TB interface on the mock and a cleanup function to assert the mocks expectations.
func NewMockRequest(t testing.TB) *MockRequest {
type mockConstructorTestingTNewMockRequest interface {
mock.TestingT
Cleanup(func())
}
// NewMockRequest creates a new instance of MockRequest. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
func NewMockRequest(t mockConstructorTestingTNewMockRequest) *MockRequest {
mock := &MockRequest{}
mock.Mock.Test(t)

View File

@ -1,12 +1,8 @@
// Code generated by mockery v2.12.2. DO NOT EDIT.
// Code generated by mockery v2.15.0. DO NOT EDIT.
package cache
import (
testing "testing"
mock "github.com/stretchr/testify/mock"
)
import mock "github.com/stretchr/testify/mock"
// MockType is an autogenerated mock type for the Type type
type MockType struct {
@ -48,8 +44,13 @@ func (_m *MockType) RegisterOptions() RegisterOptions {
return r0
}
// NewMockType creates a new instance of MockType. It also registers the testing.TB interface on the mock and a cleanup function to assert the mocks expectations.
func NewMockType(t testing.TB) *MockType {
type mockConstructorTestingTNewMockType interface {
mock.TestingT
Cleanup(func())
}
// NewMockType creates a new instance of MockType. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
func NewMockType(t mockConstructorTestingTNewMockType) *MockType {
mock := &MockType{}
mock.Mock.Test(t)

View File

@ -1,13 +1,11 @@
// Code generated by mockery v2.11.0. DO NOT EDIT.
// Code generated by mockery v2.15.0. DO NOT EDIT.
package ca
import (
testing "testing"
x509 "crypto/x509"
mock "github.com/stretchr/testify/mock"
x509 "crypto/x509"
)
// MockProvider is an autogenerated mock type for the Provider type
@ -155,13 +153,13 @@ func (_m *MockProvider) GenerateRoot() (RootResult, error) {
return r0, r1
}
// SetIntermediate provides a mock function with given fields: intermediatePEM, rootPEM
func (_m *MockProvider) SetIntermediate(intermediatePEM string, rootPEM string, keyId string) error {
ret := _m.Called(intermediatePEM, rootPEM, keyId)
// SetIntermediate provides a mock function with given fields: intermediatePEM, rootPEM, opaque
func (_m *MockProvider) SetIntermediate(intermediatePEM string, rootPEM string, opaque string) error {
ret := _m.Called(intermediatePEM, rootPEM, opaque)
var r0 error
if rf, ok := ret.Get(0).(func(string, string, string) error); ok {
r0 = rf(intermediatePEM, rootPEM, keyId)
r0 = rf(intermediatePEM, rootPEM, opaque)
} else {
r0 = ret.Error(0)
}
@ -255,9 +253,15 @@ func (_m *MockProvider) SupportsCrossSigning() (bool, error) {
return r0, r1
}
// NewMockProvider creates a new instance of MockProvider. It also registers a cleanup function to assert the mocks expectations.
func NewMockProvider(t testing.TB) *MockProvider {
type mockConstructorTestingTNewMockProvider interface {
mock.TestingT
Cleanup(func())
}
// NewMockProvider creates a new instance of MockProvider. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
func NewMockProvider(t mockConstructorTestingTNewMockProvider) *MockProvider {
mock := &MockProvider{}
mock.Mock.Test(t)
t.Cleanup(func() { mock.AssertExpectations(t) })

View File

@ -1,12 +1,8 @@
// Code generated by mockery v2.12.0. DO NOT EDIT.
// Code generated by mockery v2.15.0. DO NOT EDIT.
package auth
import (
testing "testing"
mock "github.com/stretchr/testify/mock"
)
import mock "github.com/stretchr/testify/mock"
// MockACLCache is an autogenerated mock type for the ACLCache type
type MockACLCache struct {
@ -18,8 +14,13 @@ func (_m *MockACLCache) RemoveIdentityWithSecretToken(secretToken string) {
_m.Called(secretToken)
}
// NewMockACLCache creates a new instance of MockACLCache. It also registers the testing.TB interface on the mock and a cleanup function to assert the mocks expectations.
func NewMockACLCache(t testing.TB) *MockACLCache {
type mockConstructorTestingTNewMockACLCache interface {
mock.TestingT
Cleanup(func())
}
// NewMockACLCache creates a new instance of MockACLCache. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
func NewMockACLCache(t mockConstructorTestingTNewMockACLCache) *MockACLCache {
mock := &MockACLCache{}
mock.Mock.Test(t)

View File

@ -1,10 +1,8 @@
// Code generated by mockery v2.12.2. DO NOT EDIT.
// Code generated by mockery v2.15.0. DO NOT EDIT.
package autopilotevents
import (
testing "testing"
stream "github.com/hashicorp/consul/agent/consul/stream"
mock "github.com/stretchr/testify/mock"
)
@ -19,8 +17,13 @@ func (_m *MockPublisher) Publish(_a0 []stream.Event) {
_m.Called(_a0)
}
// NewMockPublisher creates a new instance of MockPublisher. It also registers the testing.TB interface on the mock and a cleanup function to assert the mocks expectations.
func NewMockPublisher(t testing.TB) *MockPublisher {
type mockConstructorTestingTNewMockPublisher interface {
mock.TestingT
Cleanup(func())
}
// NewMockPublisher creates a new instance of MockPublisher. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
func NewMockPublisher(t mockConstructorTestingTNewMockPublisher) *MockPublisher {
mock := &MockPublisher{}
mock.Mock.Test(t)

View File

@ -1,4 +1,4 @@
// Code generated by mockery v2.12.2. DO NOT EDIT.
// Code generated by mockery v2.15.0. DO NOT EDIT.
package autopilotevents
@ -10,8 +10,6 @@ import (
structs "github.com/hashicorp/consul/agent/structs"
testing "testing"
types "github.com/hashicorp/consul/types"
)
@ -80,8 +78,13 @@ func (_m *MockStateStore) NodeService(ws memdb.WatchSet, nodeName string, servic
return r0, r1, r2
}
// NewMockStateStore creates a new instance of MockStateStore. It also registers the testing.TB interface on the mock and a cleanup function to assert the mocks expectations.
func NewMockStateStore(t testing.TB) *MockStateStore {
type mockConstructorTestingTNewMockStateStore interface {
mock.TestingT
Cleanup(func())
}
// NewMockStateStore creates a new instance of MockStateStore. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
func NewMockStateStore(t mockConstructorTestingTNewMockStateStore) *MockStateStore {
mock := &MockStateStore{}
mock.Mock.Test(t)

View File

@ -1,13 +1,11 @@
// Code generated by mockery v2.12.2. DO NOT EDIT.
// Code generated by mockery v2.15.0. DO NOT EDIT.
package autopilotevents
import (
testing "testing"
time "time"
mock "github.com/stretchr/testify/mock"
time "time"
)
// mockTimeProvider is an autogenerated mock type for the timeProvider type
@ -29,8 +27,13 @@ func (_m *mockTimeProvider) Now() time.Time {
return r0
}
// newMockTimeProvider creates a new instance of mockTimeProvider. It also registers the testing.TB interface on the mock and a cleanup function to assert the mocks expectations.
func newMockTimeProvider(t testing.TB) *mockTimeProvider {
type mockConstructorTestingTnewMockTimeProvider interface {
mock.TestingT
Cleanup(func())
}
// newMockTimeProvider creates a new instance of mockTimeProvider. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
func newMockTimeProvider(t mockConstructorTestingTnewMockTimeProvider) *mockTimeProvider {
mock := &mockTimeProvider{}
mock.Mock.Test(t)

View File

@ -105,7 +105,7 @@ type Operation struct {
Type OperationType
}
//go:generate mockery --name RequestLimitsHandler --inpackage --filename mock_RequestLimitsHandler_test.go
//go:generate mockery --name RequestLimitsHandler --inpackage
type RequestLimitsHandler interface {
Run(ctx context.Context)
Allow(op Operation) error

View File

@ -1,12 +1,8 @@
// Code generated by mockery v2.12.2. DO NOT EDIT.
// Code generated by mockery v2.15.0. DO NOT EDIT.
package watch
import (
testing "testing"
mock "github.com/stretchr/testify/mock"
)
import mock "github.com/stretchr/testify/mock"
// MockStateStore is an autogenerated mock type for the StateStore type
type MockStateStore struct {
@ -29,8 +25,13 @@ func (_m *MockStateStore) AbandonCh() <-chan struct{} {
return r0
}
// NewMockStateStore creates a new instance of MockStateStore. It also registers the testing.TB interface on the mock and a cleanup function to assert the mocks expectations.
func NewMockStateStore(t testing.TB) *MockStateStore {
type mockConstructorTestingTNewMockStateStore interface {
mock.TestingT
Cleanup(func())
}
// NewMockStateStore creates a new instance of MockStateStore. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
func NewMockStateStore(t mockConstructorTestingTNewMockStateStore) *MockStateStore {
mock := &MockStateStore{}
mock.Mock.Test(t)

View File

@ -213,6 +213,10 @@ func (l *SessionLimiter) deleteSessionWithID(id uint64) {
l.deleteSessionLocked(idx, id)
}
// SessionTerminatedChan is a channel that will be closed to notify session-
// holders that a session has been terminated.
type SessionTerminatedChan <-chan struct{}
// Session allows its holder to perform an operation (e.g. serve a gRPC stream)
// concurrenly with other session-holders. Sessions may be terminated abruptly
// by the SessionLimiter, so it is the responsibility of the holder to receive
@ -228,7 +232,7 @@ type Session interface {
//
// The session-holder MUST receive on it and exit (e.g. close the gRPC stream)
// when it is closed.
Terminated() <-chan struct{}
Terminated() SessionTerminatedChan
}
type session struct {
@ -240,6 +244,6 @@ type session struct {
func (s *session) End() { s.l.deleteSessionWithID(s.id) }
func (s *session) Terminated() <-chan struct{} { return s.termCh }
func (s *session) Terminated() SessionTerminatedChan { return s.termCh }
func (s *session) terminate() { close(s.termCh) }

View File

@ -1,4 +1,4 @@
// Code generated by mockery v2.12.0. DO NOT EDIT.
// Code generated by mockery v2.15.0. DO NOT EDIT.
package acl
@ -7,8 +7,6 @@ import (
mock "github.com/stretchr/testify/mock"
structs "github.com/hashicorp/consul/agent/structs"
testing "testing"
)
// MockLogin is an autogenerated mock type for the Login type
@ -39,8 +37,13 @@ func (_m *MockLogin) TokenForVerifiedIdentity(identity *authmethod.Identity, aut
return r0, r1
}
// NewMockLogin creates a new instance of MockLogin. It also registers the testing.TB interface on the mock and a cleanup function to assert the mocks expectations.
func NewMockLogin(t testing.TB) *MockLogin {
type mockConstructorTestingTNewMockLogin interface {
mock.TestingT
Cleanup(func())
}
// NewMockLogin creates a new instance of MockLogin. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
func NewMockLogin(t mockConstructorTestingTNewMockLogin) *MockLogin {
mock := &MockLogin{}
mock.Mock.Test(t)

View File

@ -1,12 +1,8 @@
// Code generated by mockery v2.12.0. DO NOT EDIT.
// Code generated by mockery v2.15.0. DO NOT EDIT.
package acl
import (
testing "testing"
mock "github.com/stretchr/testify/mock"
)
import mock "github.com/stretchr/testify/mock"
// MockTokenWriter is an autogenerated mock type for the TokenWriter type
type MockTokenWriter struct {
@ -27,8 +23,13 @@ func (_m *MockTokenWriter) Delete(secretID string, fromLogout bool) error {
return r0
}
// NewMockTokenWriter creates a new instance of MockTokenWriter. It also registers the testing.TB interface on the mock and a cleanup function to assert the mocks expectations.
func NewMockTokenWriter(t testing.TB) *MockTokenWriter {
type mockConstructorTestingTNewMockTokenWriter interface {
mock.TestingT
Cleanup(func())
}
// NewMockTokenWriter creates a new instance of MockTokenWriter. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
func NewMockTokenWriter(t mockConstructorTestingTNewMockTokenWriter) *MockTokenWriter {
mock := &MockTokenWriter{}
mock.Mock.Test(t)

View File

@ -1,4 +1,4 @@
// Code generated by mockery v2.12.0. DO NOT EDIT.
// Code generated by mockery v2.15.0. DO NOT EDIT.
package acl
@ -8,8 +8,6 @@ import (
authmethod "github.com/hashicorp/consul/agent/consul/authmethod"
mock "github.com/stretchr/testify/mock"
testing "testing"
)
// MockValidator is an autogenerated mock type for the Validator type
@ -40,8 +38,13 @@ func (_m *MockValidator) ValidateLogin(ctx context.Context, loginToken string) (
return r0, r1
}
// NewMockValidator creates a new instance of MockValidator. It also registers the testing.TB interface on the mock and a cleanup function to assert the mocks expectations.
func NewMockValidator(t testing.TB) *MockValidator {
type mockConstructorTestingTNewMockValidator interface {
mock.TestingT
Cleanup(func())
}
// NewMockValidator creates a new instance of MockValidator. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
func NewMockValidator(t mockConstructorTestingTNewMockValidator) *MockValidator {
mock := &MockValidator{}
mock.Mock.Test(t)

View File

@ -1,4 +1,4 @@
// Code generated by mockery v2.12.0. DO NOT EDIT.
// Code generated by mockery v2.15.0. DO NOT EDIT.
package connectca
@ -7,8 +7,6 @@ import (
mock "github.com/stretchr/testify/mock"
resolver "github.com/hashicorp/consul/acl/resolver"
testing "testing"
)
// MockACLResolver is an autogenerated mock type for the ACLResolver type
@ -37,8 +35,13 @@ func (_m *MockACLResolver) ResolveTokenAndDefaultMeta(token string, entMeta *acl
return r0, r1
}
// NewMockACLResolver creates a new instance of MockACLResolver. It also registers the testing.TB interface on the mock and a cleanup function to assert the mocks expectations.
func NewMockACLResolver(t testing.TB) *MockACLResolver {
type mockConstructorTestingTNewMockACLResolver interface {
mock.TestingT
Cleanup(func())
}
// NewMockACLResolver creates a new instance of MockACLResolver. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
func NewMockACLResolver(t mockConstructorTestingTNewMockACLResolver) *MockACLResolver {
mock := &MockACLResolver{}
mock.Mock.Test(t)

View File

@ -1,4 +1,4 @@
// Code generated by mockery v2.12.0. DO NOT EDIT.
// Code generated by mockery v2.15.0. DO NOT EDIT.
package connectca
@ -8,8 +8,6 @@ import (
structs "github.com/hashicorp/consul/agent/structs"
testing "testing"
x509 "crypto/x509"
)
@ -41,8 +39,13 @@ func (_m *MockCAManager) AuthorizeAndSignCertificate(csr *x509.CertificateReques
return r0, r1
}
// NewMockCAManager creates a new instance of MockCAManager. It also registers the testing.TB interface on the mock and a cleanup function to assert the mocks expectations.
func NewMockCAManager(t testing.TB) *MockCAManager {
type mockConstructorTestingTNewMockCAManager interface {
mock.TestingT
Cleanup(func())
}
// NewMockCAManager creates a new instance of MockCAManager. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
func NewMockCAManager(t mockConstructorTestingTNewMockCAManager) *MockCAManager {
mock := &MockCAManager{}
mock.Mock.Test(t)

View File

@ -1,4 +1,4 @@
// Code generated by mockery v2.12.0. DO NOT EDIT.
// Code generated by mockery v2.15.0. DO NOT EDIT.
package dataplane
@ -7,8 +7,6 @@ import (
mock "github.com/stretchr/testify/mock"
resolver "github.com/hashicorp/consul/acl/resolver"
testing "testing"
)
// MockACLResolver is an autogenerated mock type for the ACLResolver type
@ -37,8 +35,13 @@ func (_m *MockACLResolver) ResolveTokenAndDefaultMeta(_a0 string, _a1 *acl.Enter
return r0, r1
}
// NewMockACLResolver creates a new instance of MockACLResolver. It also registers the testing.TB interface on the mock and a cleanup function to assert the mocks expectations.
func NewMockACLResolver(t testing.TB) *MockACLResolver {
type mockConstructorTestingTNewMockACLResolver interface {
mock.TestingT
Cleanup(func())
}
// NewMockACLResolver creates a new instance of MockACLResolver. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
func NewMockACLResolver(t mockConstructorTestingTNewMockACLResolver) *MockACLResolver {
mock := &MockACLResolver{}
mock.Mock.Test(t)

View File

@ -1,4 +1,4 @@
// Code generated by mockery v2.12.2. DO NOT EDIT.
// Code generated by mockery v2.15.0. DO NOT EDIT.
package peerstream
@ -7,8 +7,6 @@ import (
mock "github.com/stretchr/testify/mock"
resolver "github.com/hashicorp/consul/acl/resolver"
testing "testing"
)
// MockACLResolver is an autogenerated mock type for the ACLResolver type
@ -37,8 +35,13 @@ func (_m *MockACLResolver) ResolveTokenAndDefaultMeta(_a0 string, _a1 *acl.Enter
return r0, r1
}
// NewMockACLResolver creates a new instance of MockACLResolver. It also registers the testing.TB interface on the mock and a cleanup function to assert the mocks expectations.
func NewMockACLResolver(t testing.TB) *MockACLResolver {
type mockConstructorTestingTNewMockACLResolver interface {
mock.TestingT
Cleanup(func())
}
// NewMockACLResolver creates a new instance of MockACLResolver. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
func NewMockACLResolver(t mockConstructorTestingTNewMockACLResolver) *MockACLResolver {
mock := &MockACLResolver{}
mock.Mock.Test(t)

View File

@ -1,4 +1,4 @@
// Code generated by mockery v2.12.0. DO NOT EDIT.
// Code generated by mockery v2.15.0. DO NOT EDIT.
package serverdiscovery
@ -7,8 +7,6 @@ import (
mock "github.com/stretchr/testify/mock"
resolver "github.com/hashicorp/consul/acl/resolver"
testing "testing"
)
// MockACLResolver is an autogenerated mock type for the ACLResolver type
@ -37,8 +35,13 @@ func (_m *MockACLResolver) ResolveTokenAndDefaultMeta(_a0 string, _a1 *acl.Enter
return r0, r1
}
// NewMockACLResolver creates a new instance of MockACLResolver. It also registers the testing.TB interface on the mock and a cleanup function to assert the mocks expectations.
func NewMockACLResolver(t testing.TB) *MockACLResolver {
type mockConstructorTestingTNewMockACLResolver interface {
mock.TestingT
Cleanup(func())
}
// NewMockACLResolver creates a new instance of MockACLResolver. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
func NewMockACLResolver(t mockConstructorTestingTNewMockACLResolver) *MockACLResolver {
mock := &MockACLResolver{}
mock.Mock.Test(t)

View File

@ -1,4 +1,4 @@
// Code generated by mockery v2.13.1. DO NOT EDIT.
// Code generated by mockery v2.15.0. DO NOT EDIT.
package hcp

View File

@ -1,4 +1,4 @@
// Code generated by mockery v2.14.0. DO NOT EDIT.
// Code generated by mockery v2.15.0. DO NOT EDIT.
package scada

View File

@ -10,6 +10,7 @@ import (
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/configentry"
"github.com/hashicorp/consul/agent/grpc-external/limiter"
"github.com/hashicorp/consul/agent/local"
"github.com/hashicorp/consul/agent/proxycfg"
"github.com/hashicorp/consul/agent/structs"
@ -44,13 +45,24 @@ func NewConfigSource(cfg Config) *ConfigSource {
// Watch wraps the underlying proxycfg.Manager and dynamically registers
// services from the catalog with it when requested by the xDS server.
func (m *ConfigSource) Watch(serviceID structs.ServiceID, nodeName string, token string) (<-chan *proxycfg.ConfigSnapshot, proxycfg.CancelFunc, error) {
func (m *ConfigSource) Watch(serviceID structs.ServiceID, nodeName string, token string) (<-chan *proxycfg.ConfigSnapshot, limiter.SessionTerminatedChan, proxycfg.CancelFunc, error) {
// If the service is registered to the local agent, use the LocalConfigSource
// rather than trying to configure it from the catalog.
if nodeName == m.NodeName && m.LocalState.ServiceExists(serviceID) {
return m.LocalConfigSource.Watch(serviceID, nodeName, token)
}
// Begin a session with the xDS session concurrency limiter.
//
// We do this here rather than in the xDS server because we don't want to apply
// the limit to services from the LocalConfigSource.
//
// See: https://github.com/hashicorp/consul/issues/15753
session, err := m.SessionLimiter.BeginSession()
if err != nil {
return nil, nil, nil, err
}
proxyID := proxycfg.ProxyID{
ServiceID: serviceID,
NodeName: nodeName,
@ -66,6 +78,7 @@ func (m *ConfigSource) Watch(serviceID structs.ServiceID, nodeName string, token
cancelOnce.Do(func() {
cancelWatch()
m.cleanup(proxyID)
session.End()
})
}
@ -82,11 +95,12 @@ func (m *ConfigSource) Watch(serviceID structs.ServiceID, nodeName string, token
if err := m.startSync(w.closeCh, proxyID); err != nil {
delete(m.watches, proxyID)
cancelWatch()
return nil, nil, err
session.End()
return nil, nil, nil, err
}
}
return snapCh, cancel, nil
return snapCh, session.Terminated(), cancel, nil
}
func (m *ConfigSource) Shutdown() {
@ -252,6 +266,9 @@ type Config struct {
// Logger will be used to write log messages.
Logger hclog.Logger
// SessionLimiter is used to enforce xDS concurrency limits.
SessionLimiter SessionLimiter
}
//go:generate mockery --name ConfigManager --inpackage
@ -269,5 +286,10 @@ type Store interface {
//go:generate mockery --name Watcher --inpackage
type Watcher interface {
Watch(proxyID structs.ServiceID, nodeName string, token string) (<-chan *proxycfg.ConfigSnapshot, proxycfg.CancelFunc, error)
Watch(proxyID structs.ServiceID, nodeName string, token string) (<-chan *proxycfg.ConfigSnapshot, limiter.SessionTerminatedChan, proxycfg.CancelFunc, error)
}
//go:generate mockery --name SessionLimiter --inpackage
type SessionLimiter interface {
BeginSession() (limiter.Session, error)
}

View File

@ -11,6 +11,7 @@ import (
"github.com/hashicorp/consul/agent/consul/state"
"github.com/hashicorp/consul/agent/consul/stream"
"github.com/hashicorp/consul/agent/grpc-external/limiter"
"github.com/hashicorp/consul/agent/local"
"github.com/hashicorp/consul/agent/proxycfg"
"github.com/hashicorp/consul/agent/structs"
@ -47,16 +48,33 @@ func TestConfigSource_Success(t *testing.T) {
// behavior without sleeping which leads to slow/racy tests.
cfgMgr := testConfigManager(t, serviceID, nodeName, token)
lim := NewMockSessionLimiter(t)
session1 := newMockSession(t)
session1TermCh := make(limiter.SessionTerminatedChan)
session1.On("Terminated").Return(session1TermCh)
session1.On("End").Return()
session2 := newMockSession(t)
session2TermCh := make(limiter.SessionTerminatedChan)
session2.On("Terminated").Return(session2TermCh)
session2.On("End").Return()
lim.On("BeginSession").Return(session1, nil).Once()
lim.On("BeginSession").Return(session2, nil).Once()
mgr := NewConfigSource(Config{
Manager: cfgMgr,
LocalState: testLocalState(t),
Logger: hclog.NewNullLogger(),
GetStore: func() Store { return store },
Manager: cfgMgr,
LocalState: testLocalState(t),
Logger: hclog.NewNullLogger(),
GetStore: func() Store { return store },
SessionLimiter: lim,
})
t.Cleanup(mgr.Shutdown)
snapCh, cancelWatch1, err := mgr.Watch(serviceID, nodeName, token)
snapCh, termCh, cancelWatch1, err := mgr.Watch(serviceID, nodeName, token)
require.NoError(t, err)
require.Equal(t, session1TermCh, termCh)
// Expect Register to have been called with the proxy's inital port.
select {
@ -111,8 +129,9 @@ func TestConfigSource_Success(t *testing.T) {
}
// Start another watch.
_, cancelWatch2, err := mgr.Watch(serviceID, nodeName, token)
_, termCh2, cancelWatch2, err := mgr.Watch(serviceID, nodeName, token)
require.NoError(t, err)
require.Equal(t, session2TermCh, termCh2)
// Expect the service to have not been re-registered by the second watch.
select {
@ -137,6 +156,9 @@ func TestConfigSource_Success(t *testing.T) {
case <-time.After(100 * time.Millisecond):
t.Fatal("timeout waiting for service to be de-registered")
}
session1.AssertCalled(t, "End")
session2.AssertCalled(t, "End")
}
func TestConfigSource_LocallyManagedService(t *testing.T) {
@ -149,7 +171,7 @@ func TestConfigSource_LocallyManagedService(t *testing.T) {
localWatcher := NewMockWatcher(t)
localWatcher.On("Watch", serviceID, nodeName, token).
Return(make(<-chan *proxycfg.ConfigSnapshot), proxycfg.CancelFunc(func() {}), nil)
Return(make(<-chan *proxycfg.ConfigSnapshot), nil, proxycfg.CancelFunc(func() {}), nil)
mgr := NewConfigSource(Config{
NodeName: nodeName,
@ -157,10 +179,11 @@ func TestConfigSource_LocallyManagedService(t *testing.T) {
LocalConfigSource: localWatcher,
Logger: hclog.NewNullLogger(),
GetStore: func() Store { panic("state store shouldn't have been used") },
SessionLimiter: nullSessionLimiter{},
})
t.Cleanup(mgr.Shutdown)
_, _, err := mgr.Watch(serviceID, nodeName, token)
_, _, _, err := mgr.Watch(serviceID, nodeName, token)
require.NoError(t, err)
}
@ -192,17 +215,26 @@ func TestConfigSource_ErrorRegisteringService(t *testing.T) {
cfgMgr.On("Register", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).
Return(errors.New("KABOOM"))
session := newMockSession(t)
session.On("End").Return()
lim := NewMockSessionLimiter(t)
lim.On("BeginSession").Return(session, nil)
mgr := NewConfigSource(Config{
Manager: cfgMgr,
LocalState: testLocalState(t),
Logger: hclog.NewNullLogger(),
GetStore: func() Store { return store },
Manager: cfgMgr,
LocalState: testLocalState(t),
Logger: hclog.NewNullLogger(),
GetStore: func() Store { return store },
SessionLimiter: lim,
})
t.Cleanup(mgr.Shutdown)
_, _, err := mgr.Watch(serviceID, nodeName, token)
_, _, _, err := mgr.Watch(serviceID, nodeName, token)
require.Error(t, err)
require.True(t, canceledWatch, "watch should've been canceled")
session.AssertCalled(t, "End")
}
func TestConfigSource_NotProxyService(t *testing.T) {
@ -231,19 +263,38 @@ func TestConfigSource_NotProxyService(t *testing.T) {
Return(make(<-chan *proxycfg.ConfigSnapshot), cancel)
mgr := NewConfigSource(Config{
Manager: cfgMgr,
LocalState: testLocalState(t),
Logger: hclog.NewNullLogger(),
GetStore: func() Store { return store },
Manager: cfgMgr,
LocalState: testLocalState(t),
Logger: hclog.NewNullLogger(),
GetStore: func() Store { return store },
SessionLimiter: nullSessionLimiter{},
})
t.Cleanup(mgr.Shutdown)
_, _, err := mgr.Watch(serviceID, nodeName, token)
_, _, _, err := mgr.Watch(serviceID, nodeName, token)
require.Error(t, err)
require.Contains(t, err.Error(), "must be a sidecar proxy or gateway")
require.True(t, canceledWatch, "watch should've been canceled")
}
func TestConfigSource_SessionLimiterError(t *testing.T) {
lim := NewMockSessionLimiter(t)
lim.On("BeginSession").Return(nil, limiter.ErrCapacityReached)
src := NewConfigSource(Config{
LocalState: testLocalState(t),
SessionLimiter: lim,
})
t.Cleanup(src.Shutdown)
_, _, _, err := src.Watch(
structs.NewServiceID("web-sidecar-proxy-1", nil),
"node-name",
"token",
)
require.Equal(t, limiter.ErrCapacityReached, err)
}
func testConfigManager(t *testing.T, serviceID structs.ServiceID, nodeName string, token string) ConfigManager {
t.Helper()
@ -294,3 +345,34 @@ func testLocalState(t *testing.T) *local.State {
l.TriggerSyncChanges = func() {}
return l
}
type nullSessionLimiter struct{}
func (nullSessionLimiter) BeginSession() (limiter.Session, error) {
return nullSession{}, nil
}
type nullSession struct{}
func (nullSession) End() {}
func (nullSession) Terminated() limiter.SessionTerminatedChan { return nil }
type mockSession struct {
mock.Mock
}
func newMockSession(t *testing.T) *mockSession {
m := &mockSession{}
m.Mock.Test(t)
t.Cleanup(func() { m.AssertExpectations(t) })
return m
}
func (m *mockSession) End() { m.Called() }
func (m *mockSession) Terminated() limiter.SessionTerminatedChan {
return m.Called().Get(0).(limiter.SessionTerminatedChan)
}

View File

@ -1,4 +1,4 @@
// Code generated by mockery v2.12.0. DO NOT EDIT.
// Code generated by mockery v2.15.0. DO NOT EDIT.
package catalog
@ -7,8 +7,6 @@ import (
mock "github.com/stretchr/testify/mock"
structs "github.com/hashicorp/consul/agent/structs"
testing "testing"
)
// MockConfigManager is an autogenerated mock type for the ConfigManager type
@ -60,8 +58,13 @@ func (_m *MockConfigManager) Watch(req proxycfg.ProxyID) (<-chan *proxycfg.Confi
return r0, r1
}
// NewMockConfigManager creates a new instance of MockConfigManager. It also registers the testing.TB interface on the mock and a cleanup function to assert the mocks expectations.
func NewMockConfigManager(t testing.TB) *MockConfigManager {
type mockConstructorTestingTNewMockConfigManager interface {
mock.TestingT
Cleanup(func())
}
// NewMockConfigManager creates a new instance of MockConfigManager. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
func NewMockConfigManager(t mockConstructorTestingTNewMockConfigManager) *MockConfigManager {
mock := &MockConfigManager{}
mock.Mock.Test(t)

View File

@ -0,0 +1,51 @@
// Code generated by mockery v2.15.0. DO NOT EDIT.
package catalog
import (
limiter "github.com/hashicorp/consul/agent/grpc-external/limiter"
mock "github.com/stretchr/testify/mock"
)
// MockSessionLimiter is an autogenerated mock type for the SessionLimiter type
type MockSessionLimiter struct {
mock.Mock
}
// BeginSession provides a mock function with given fields:
func (_m *MockSessionLimiter) BeginSession() (limiter.Session, error) {
ret := _m.Called()
var r0 limiter.Session
if rf, ok := ret.Get(0).(func() limiter.Session); ok {
r0 = rf()
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(limiter.Session)
}
}
var r1 error
if rf, ok := ret.Get(1).(func() error); ok {
r1 = rf()
} else {
r1 = ret.Error(1)
}
return r0, r1
}
type mockConstructorTestingTNewMockSessionLimiter interface {
mock.TestingT
Cleanup(func())
}
// NewMockSessionLimiter creates a new instance of MockSessionLimiter. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
func NewMockSessionLimiter(t mockConstructorTestingTNewMockSessionLimiter) *MockSessionLimiter {
mock := &MockSessionLimiter{}
mock.Mock.Test(t)
t.Cleanup(func() { mock.AssertExpectations(t) })
return mock
}

View File

@ -1,14 +1,14 @@
// Code generated by mockery v2.12.0. DO NOT EDIT.
// Code generated by mockery v2.15.0. DO NOT EDIT.
package catalog
import (
proxycfg "github.com/hashicorp/consul/agent/proxycfg"
limiter "github.com/hashicorp/consul/agent/grpc-external/limiter"
mock "github.com/stretchr/testify/mock"
structs "github.com/hashicorp/consul/agent/structs"
proxycfg "github.com/hashicorp/consul/agent/proxycfg"
testing "testing"
structs "github.com/hashicorp/consul/agent/structs"
)
// MockWatcher is an autogenerated mock type for the Watcher type
@ -17,7 +17,7 @@ type MockWatcher struct {
}
// Watch provides a mock function with given fields: proxyID, nodeName, token
func (_m *MockWatcher) Watch(proxyID structs.ServiceID, nodeName string, token string) (<-chan *proxycfg.ConfigSnapshot, proxycfg.CancelFunc, error) {
func (_m *MockWatcher) Watch(proxyID structs.ServiceID, nodeName string, token string) (<-chan *proxycfg.ConfigSnapshot, limiter.SessionTerminatedChan, proxycfg.CancelFunc, error) {
ret := _m.Called(proxyID, nodeName, token)
var r0 <-chan *proxycfg.ConfigSnapshot
@ -29,27 +29,41 @@ func (_m *MockWatcher) Watch(proxyID structs.ServiceID, nodeName string, token s
}
}
var r1 proxycfg.CancelFunc
if rf, ok := ret.Get(1).(func(structs.ServiceID, string, string) proxycfg.CancelFunc); ok {
var r1 limiter.SessionTerminatedChan
if rf, ok := ret.Get(1).(func(structs.ServiceID, string, string) limiter.SessionTerminatedChan); ok {
r1 = rf(proxyID, nodeName, token)
} else {
if ret.Get(1) != nil {
r1 = ret.Get(1).(proxycfg.CancelFunc)
r1 = ret.Get(1).(limiter.SessionTerminatedChan)
}
}
var r2 error
if rf, ok := ret.Get(2).(func(structs.ServiceID, string, string) error); ok {
var r2 proxycfg.CancelFunc
if rf, ok := ret.Get(2).(func(structs.ServiceID, string, string) proxycfg.CancelFunc); ok {
r2 = rf(proxyID, nodeName, token)
} else {
r2 = ret.Error(2)
if ret.Get(2) != nil {
r2 = ret.Get(2).(proxycfg.CancelFunc)
}
}
return r0, r1, r2
var r3 error
if rf, ok := ret.Get(3).(func(structs.ServiceID, string, string) error); ok {
r3 = rf(proxyID, nodeName, token)
} else {
r3 = ret.Error(3)
}
return r0, r1, r2, r3
}
// NewMockWatcher creates a new instance of MockWatcher. It also registers the testing.TB interface on the mock and a cleanup function to assert the mocks expectations.
func NewMockWatcher(t testing.TB) *MockWatcher {
type mockConstructorTestingTNewMockWatcher interface {
mock.TestingT
Cleanup(func())
}
// NewMockWatcher creates a new instance of MockWatcher. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
func NewMockWatcher(t mockConstructorTestingTNewMockWatcher) *MockWatcher {
mock := &MockWatcher{}
mock.Mock.Test(t)

View File

@ -1,6 +1,7 @@
package local
import (
"github.com/hashicorp/consul/agent/grpc-external/limiter"
"github.com/hashicorp/consul/agent/proxycfg"
structs "github.com/hashicorp/consul/agent/structs"
)
@ -16,7 +17,7 @@ func NewConfigSource(cfgMgr ConfigManager) *ConfigSource {
return &ConfigSource{cfgMgr}
}
func (m *ConfigSource) Watch(serviceID structs.ServiceID, nodeName string, _ string) (<-chan *proxycfg.ConfigSnapshot, proxycfg.CancelFunc, error) {
func (m *ConfigSource) Watch(serviceID structs.ServiceID, nodeName string, _ string) (<-chan *proxycfg.ConfigSnapshot, limiter.SessionTerminatedChan, proxycfg.CancelFunc, error) {
watchCh, cancelWatch := m.manager.Watch(proxycfg.ProxyID{
ServiceID: serviceID,
NodeName: nodeName,
@ -27,5 +28,5 @@ func (m *ConfigSource) Watch(serviceID structs.ServiceID, nodeName string, _ str
// is checked before the watch is created).
Token: "",
})
return watchCh, cancelWatch, nil
return watchCh, nil, cancelWatch, nil
}

View File

@ -1,4 +1,4 @@
// Code generated by mockery v2.12.0. DO NOT EDIT.
// Code generated by mockery v2.15.0. DO NOT EDIT.
package local
@ -7,8 +7,6 @@ import (
mock "github.com/stretchr/testify/mock"
structs "github.com/hashicorp/consul/agent/structs"
testing "testing"
)
// MockConfigManager is an autogenerated mock type for the ConfigManager type
@ -76,8 +74,13 @@ func (_m *MockConfigManager) Watch(id proxycfg.ProxyID) (<-chan *proxycfg.Config
return r0, r1
}
// NewMockConfigManager creates a new instance of MockConfigManager. It also registers the testing.TB interface on the mock and a cleanup function to assert the mocks expectations.
func NewMockConfigManager(t testing.TB) *MockConfigManager {
type mockConstructorTestingTNewMockConfigManager interface {
mock.TestingT
Cleanup(func())
}
// NewMockConfigManager creates a new instance of MockConfigManager. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
func NewMockConfigManager(t mockConstructorTestingTNewMockConfigManager) *MockConfigManager {
mock := &MockConfigManager{}
mock.Mock.Test(t)

View File

@ -1,4 +1,4 @@
// Code generated by mockery v2.12.0. DO NOT EDIT.
// Code generated by mockery v2.15.0. DO NOT EDIT.
package submatview
@ -7,8 +7,6 @@ import (
mock "github.com/stretchr/testify/mock"
resolver "github.com/hashicorp/consul/acl/resolver"
testing "testing"
)
// MockACLResolver is an autogenerated mock type for the ACLResolver type
@ -37,8 +35,13 @@ func (_m *MockACLResolver) ResolveTokenAndDefaultMeta(token string, entMeta *acl
return r0, r1
}
// NewMockACLResolver creates a new instance of MockACLResolver. It also registers the testing.TB interface on the mock and a cleanup function to assert the mocks expectations.
func NewMockACLResolver(t testing.TB) *MockACLResolver {
type mockConstructorTestingTNewMockACLResolver interface {
mock.TestingT
Cleanup(func())
}
// NewMockACLResolver creates a new instance of MockACLResolver. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
func NewMockACLResolver(t mockConstructorTestingTNewMockACLResolver) *MockACLResolver {
mock := &MockACLResolver{}
mock.Mock.Test(t)

View File

@ -3,6 +3,7 @@ package xds
import (
"crypto/sha256"
"encoding/hex"
"errors"
"fmt"
"sync"
"sync/atomic"
@ -23,6 +24,7 @@ import (
"google.golang.org/protobuf/types/known/anypb"
external "github.com/hashicorp/consul/agent/grpc-external"
"github.com/hashicorp/consul/agent/grpc-external/limiter"
"github.com/hashicorp/consul/agent/proxycfg"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/agent/xds/xdscommon"
@ -88,17 +90,12 @@ func (s *Server) processDelta(stream ADSDeltaStream, reqCh <-chan *envoy_discove
return err
}
session, err := s.SessionLimiter.BeginSession()
if err != nil {
return errOverwhelmed
}
defer session.End()
// Loop state
var (
cfgSnap *proxycfg.ConfigSnapshot
node *envoy_config_core_v3.Node
stateCh <-chan *proxycfg.ConfigSnapshot
drainCh limiter.SessionTerminatedChan
watchCancel func()
proxyID structs.ServiceID
nonce uint64 // xDS requires a unique nonce to correlate response/request pairs
@ -176,7 +173,7 @@ func (s *Server) processDelta(stream ADSDeltaStream, reqCh <-chan *envoy_discove
for {
select {
case <-session.Terminated():
case <-drainCh:
generator.Logger.Debug("draining stream to rebalance load")
metrics.IncrCounter([]string{"xds", "server", "streamDrained"}, 1)
return errOverwhelmed
@ -293,8 +290,11 @@ func (s *Server) processDelta(stream ADSDeltaStream, reqCh <-chan *envoy_discove
return status.Errorf(codes.Internal, "failed to watch proxy service: %s", err)
}
stateCh, watchCancel, err = s.CfgSrc.Watch(proxyID, nodeName, options.Token)
if err != nil {
stateCh, drainCh, watchCancel, err = s.CfgSrc.Watch(proxyID, nodeName, options.Token)
switch {
case errors.Is(err, limiter.ErrCapacityReached):
return errOverwhelmed
case err != nil:
return status.Errorf(codes.Internal, "failed to watch proxy service: %s", err)
}
// Note that in this case we _intend_ the defer to only be triggered when

View File

@ -32,7 +32,7 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_TCP(t *testing.T) {
// Allow all
return acl.RootAuthorizer("manage"), nil
}
scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0, nil)
scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0)
mgr, errCh, envoy := scenario.mgr, scenario.errCh, scenario.envoy
sid := structs.NewServiceID("web-sidecar-proxy", nil)
@ -231,7 +231,7 @@ func TestServer_DeltaAggregatedResources_v3_NackLoop(t *testing.T) {
// Allow all
return acl.RootAuthorizer("manage"), nil
}
scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0, nil)
scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0)
mgr, errCh, envoy := scenario.mgr, scenario.errCh, scenario.envoy
sid := structs.NewServiceID("web-sidecar-proxy", nil)
@ -363,7 +363,7 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_HTTP2(t *testing.T) {
// Allow all
return acl.RootAuthorizer("manage"), nil
}
scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0, nil)
scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0)
mgr, errCh, envoy := scenario.mgr, scenario.errCh, scenario.envoy
sid := structs.NewServiceID("web-sidecar-proxy", nil)
@ -515,7 +515,7 @@ func TestServer_DeltaAggregatedResources_v3_SlowEndpointPopulation(t *testing.T)
// Allow all
return acl.RootAuthorizer("manage"), nil
}
scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0, nil)
scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0)
server, mgr, errCh, envoy := scenario.server, scenario.mgr, scenario.errCh, scenario.envoy
// This mutateFn causes any endpoint with a name containing "geo-cache" to be
@ -660,7 +660,7 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_TCP_clusterChangesImpa
// Allow all
return acl.RootAuthorizer("manage"), nil
}
scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0, nil)
scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0)
mgr, errCh, envoy := scenario.mgr, scenario.errCh, scenario.envoy
sid := structs.NewServiceID("web-sidecar-proxy", nil)
@ -797,7 +797,7 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_HTTP2_RDS_listenerChan
// Allow all
return acl.RootAuthorizer("manage"), nil
}
scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0, nil)
scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0)
mgr, errCh, envoy := scenario.mgr, scenario.errCh, scenario.envoy
sid := structs.NewServiceID("web-sidecar-proxy", nil)
@ -1055,7 +1055,7 @@ func TestServer_DeltaAggregatedResources_v3_ACLEnforcement(t *testing.T) {
return acl.NewPolicyAuthorizerWithDefaults(acl.RootAuthorizer("deny"), []*acl.Policy{policy}, nil)
}
scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", tt.token, 0, nil)
scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", tt.token, 0)
mgr, errCh, envoy := scenario.mgr, scenario.errCh, scenario.envoy
sid := structs.NewServiceID("web-sidecar-proxy", nil)
@ -1154,7 +1154,6 @@ func TestServer_DeltaAggregatedResources_v3_ACLTokenDeleted_StreamTerminatedDuri
}
scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", token,
100*time.Millisecond, // Make this short.
nil,
)
mgr, errCh, envoy := scenario.mgr, scenario.errCh, scenario.envoy
@ -1253,7 +1252,6 @@ func TestServer_DeltaAggregatedResources_v3_ACLTokenDeleted_StreamTerminatedInBa
}
scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", token,
100*time.Millisecond, // Make this short.
nil,
)
mgr, errCh, envoy := scenario.mgr, scenario.errCh, scenario.envoy
@ -1334,7 +1332,7 @@ func TestServer_DeltaAggregatedResources_v3_IngressEmptyResponse(t *testing.T) {
// Allow all
return acl.RootAuthorizer("manage"), nil
}
scenario := newTestServerDeltaScenario(t, aclResolve, "ingress-gateway", "", 0, nil)
scenario := newTestServerDeltaScenario(t, aclResolve, "ingress-gateway", "", 0)
mgr, errCh, envoy := scenario.mgr, scenario.errCh, scenario.envoy
sid := structs.NewServiceID("ingress-gateway", nil)
@ -1389,12 +1387,13 @@ func TestServer_DeltaAggregatedResources_v3_IngressEmptyResponse(t *testing.T) {
func TestServer_DeltaAggregatedResources_v3_CapacityReached(t *testing.T) {
aclResolve := func(id string) (acl.Authorizer, error) { return acl.ManageAll(), nil }
scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0, capacityReachedLimiter{})
scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0)
mgr, errCh, envoy := scenario.mgr, scenario.errCh, scenario.envoy
sid := structs.NewServiceID("web-sidecar-proxy", nil)
mgr.RegisterProxy(t, sid)
mgr.DrainStreams(sid)
snap := newTestSnapshot(t, nil, "")
@ -1420,10 +1419,8 @@ func (capacityReachedLimiter) BeginSession() (limiter.Session, error) {
}
func TestServer_DeltaAggregatedResources_v3_StreamDrained(t *testing.T) {
limiter := &testLimiter{}
aclResolve := func(id string) (acl.Authorizer, error) { return acl.ManageAll(), nil }
scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0, limiter)
scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0)
mgr, errCh, envoy := scenario.mgr, scenario.errCh, scenario.envoy
sid := structs.NewServiceID("web-sidecar-proxy", nil)
@ -1452,7 +1449,7 @@ func TestServer_DeltaAggregatedResources_v3_StreamDrained(t *testing.T) {
})
testutil.RunStep(t, "terminate limiter session", func(t *testing.T) {
limiter.TerminateSession()
mgr.DrainStreams(sid)
select {
case err := <-errCh:
@ -1488,25 +1485,6 @@ func TestServer_DeltaAggregatedResources_v3_StreamDrained(t *testing.T) {
})
}
type testLimiter struct {
termCh chan struct{}
}
func (t *testLimiter) BeginSession() (limiter.Session, error) {
t.termCh = make(chan struct{})
return &testSession{termCh: t.termCh}, nil
}
func (t *testLimiter) TerminateSession() { close(t.termCh) }
type testSession struct {
termCh chan struct{}
}
func (t *testSession) Terminated() <-chan struct{} { return t.termCh }
func (*testSession) End() {}
func assertDeltaChanBlocked(t *testing.T, ch chan *envoy_discovery_v3.DeltaDiscoveryResponse) {
t.Helper()
select {

View File

@ -96,13 +96,7 @@ type ConfigFetcher interface {
// ProxyConfigSource is the interface xds.Server requires to consume proxy
// config updates.
type ProxyConfigSource interface {
Watch(id structs.ServiceID, nodeName string, token string) (<-chan *proxycfg.ConfigSnapshot, proxycfg.CancelFunc, error)
}
// SessionLimiter is the interface exposed by limiter.SessionLimiter. We depend
// on an interface rather than the concrete type so we can mock it in tests.
type SessionLimiter interface {
BeginSession() (limiter.Session, error)
Watch(id structs.ServiceID, nodeName string, token string) (<-chan *proxycfg.ConfigSnapshot, limiter.SessionTerminatedChan, proxycfg.CancelFunc, error)
}
// Server represents a gRPC server that can handle xDS requests from Envoy. All
@ -111,12 +105,11 @@ type SessionLimiter interface {
// A full description of the XDS protocol can be found at
// https://www.envoyproxy.io/docs/envoy/latest/api-docs/xds_protocol
type Server struct {
NodeName string
Logger hclog.Logger
CfgSrc ProxyConfigSource
ResolveToken ACLResolverFunc
CfgFetcher ConfigFetcher
SessionLimiter SessionLimiter
NodeName string
Logger hclog.Logger
CfgSrc ProxyConfigSource
ResolveToken ACLResolverFunc
CfgFetcher ConfigFetcher
// AuthCheckFrequency is how often we should re-check the credentials used
// during a long-lived gRPC Stream after it has been initially established.
@ -168,7 +161,6 @@ func NewServer(
cfgMgr ProxyConfigSource,
resolveToken ACLResolverFunc,
cfgFetcher ConfigFetcher,
limiter SessionLimiter,
) *Server {
return &Server{
NodeName: nodeName,
@ -176,7 +168,6 @@ func NewServer(
CfgSrc: cfgMgr,
ResolveToken: resolveToken,
CfgFetcher: cfgFetcher,
SessionLimiter: limiter,
AuthCheckFrequency: DefaultAuthCheckFrequency,
activeStreams: &activeStreamCounters{},
}

View File

@ -66,14 +66,16 @@ func newTestSnapshot(
// testing. It also implements ConnectAuthz to allow control over authorization.
type testManager struct {
sync.Mutex
chans map[structs.ServiceID]chan *proxycfg.ConfigSnapshot
cancels chan structs.ServiceID
stateChans map[structs.ServiceID]chan *proxycfg.ConfigSnapshot
drainChans map[structs.ServiceID]chan struct{}
cancels chan structs.ServiceID
}
func newTestManager(t *testing.T) *testManager {
return &testManager{
chans: map[structs.ServiceID]chan *proxycfg.ConfigSnapshot{},
cancels: make(chan structs.ServiceID, 10),
stateChans: map[structs.ServiceID]chan *proxycfg.ConfigSnapshot{},
drainChans: map[structs.ServiceID]chan struct{}{},
cancels: make(chan structs.ServiceID, 10),
}
}
@ -81,7 +83,8 @@ func newTestManager(t *testing.T) *testManager {
func (m *testManager) RegisterProxy(t *testing.T, proxyID structs.ServiceID) {
m.Lock()
defer m.Unlock()
m.chans[proxyID] = make(chan *proxycfg.ConfigSnapshot, 1)
m.stateChans[proxyID] = make(chan *proxycfg.ConfigSnapshot, 1)
m.drainChans[proxyID] = make(chan struct{})
}
// Deliver simulates a proxy registration
@ -90,18 +93,42 @@ func (m *testManager) DeliverConfig(t *testing.T, proxyID structs.ServiceID, cfg
m.Lock()
defer m.Unlock()
select {
case m.chans[proxyID] <- cfg:
case m.stateChans[proxyID] <- cfg:
case <-time.After(10 * time.Millisecond):
t.Fatalf("took too long to deliver config")
}
}
// Watch implements ConfigManager
func (m *testManager) Watch(proxyID structs.ServiceID, _ string, _ string) (<-chan *proxycfg.ConfigSnapshot, proxycfg.CancelFunc, error) {
// DrainStreams drains any open streams for the given proxyID. If there aren't
// any open streams, it'll create a marker so that future attempts to watch the
// given proxyID will return limiter.ErrCapacityReached.
func (m *testManager) DrainStreams(proxyID structs.ServiceID) {
m.Lock()
defer m.Unlock()
ch, ok := m.drainChans[proxyID]
if !ok {
ch = make(chan struct{})
m.drainChans[proxyID] = ch
}
close(ch)
}
// Watch implements ConfigManager
func (m *testManager) Watch(proxyID structs.ServiceID, _ string, _ string) (<-chan *proxycfg.ConfigSnapshot, limiter.SessionTerminatedChan, proxycfg.CancelFunc, error) {
m.Lock()
defer m.Unlock()
// If the drain chan has already been closed, return limiter.ErrCapacityReached.
drainCh := m.drainChans[proxyID]
select {
case <-drainCh:
return nil, nil, nil, limiter.ErrCapacityReached
default:
}
// ch might be nil but then it will just block forever
return m.chans[proxyID], func() {
return m.stateChans[proxyID], drainCh, func() {
m.cancels <- proxyID
}, nil
}
@ -135,7 +162,6 @@ func newTestServerDeltaScenario(
proxyID string,
token string,
authCheckFrequency time.Duration,
sessionLimiter SessionLimiter,
) *testServerScenario {
mgr := newTestManager(t)
envoy := NewTestEnvoy(t, proxyID, token)
@ -154,17 +180,12 @@ func newTestServerDeltaScenario(
metrics.NewGlobal(cfg, sink)
})
if sessionLimiter == nil {
sessionLimiter = limiter.NewSessionLimiter()
}
s := NewServer(
"node-123",
testutil.Logger(t),
mgr,
resolveToken,
nil, /*cfgFetcher ConfigFetcher*/
sessionLimiter,
)
if authCheckFrequency > 0 {
s.AuthCheckFrequency = authCheckFrequency

View File

@ -1,4 +1,4 @@
// Code generated by mockery v2.12.2. DO NOT EDIT.
// Code generated by mockery v2.15.0. DO NOT EDIT.
package pbdns
@ -8,8 +8,6 @@ import (
grpc "google.golang.org/grpc"
mock "github.com/stretchr/testify/mock"
testing "testing"
)
// MockDNSServiceClient is an autogenerated mock type for the DNSServiceClient type
@ -47,8 +45,13 @@ func (_m *MockDNSServiceClient) Query(ctx context.Context, in *QueryRequest, opt
return r0, r1
}
// NewMockDNSServiceClient creates a new instance of MockDNSServiceClient. It also registers the testing.TB interface on the mock and a cleanup function to assert the mocks expectations.
func NewMockDNSServiceClient(t testing.TB) *MockDNSServiceClient {
type mockConstructorTestingTNewMockDNSServiceClient interface {
mock.TestingT
Cleanup(func())
}
// NewMockDNSServiceClient creates a new instance of MockDNSServiceClient. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
func NewMockDNSServiceClient(t mockConstructorTestingTNewMockDNSServiceClient) *MockDNSServiceClient {
mock := &MockDNSServiceClient{}
mock.Mock.Test(t)

View File

@ -1,10 +1,9 @@
// Code generated by mockery v2.12.2. DO NOT EDIT.
// Code generated by mockery v2.15.0. DO NOT EDIT.
package pbdns
import (
context "context"
testing "testing"
mock "github.com/stretchr/testify/mock"
)
@ -37,8 +36,13 @@ func (_m *MockDNSServiceServer) Query(_a0 context.Context, _a1 *QueryRequest) (*
return r0, r1
}
// NewMockDNSServiceServer creates a new instance of MockDNSServiceServer. It also registers the testing.TB interface on the mock and a cleanup function to assert the mocks expectations.
func NewMockDNSServiceServer(t testing.TB) *MockDNSServiceServer {
type mockConstructorTestingTNewMockDNSServiceServer interface {
mock.TestingT
Cleanup(func())
}
// NewMockDNSServiceServer creates a new instance of MockDNSServiceServer. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
func NewMockDNSServiceServer(t mockConstructorTestingTNewMockDNSServiceServer) *MockDNSServiceServer {
mock := &MockDNSServiceServer{}
mock.Mock.Test(t)

View File

@ -1,12 +1,8 @@
// Code generated by mockery v2.12.2. DO NOT EDIT.
// Code generated by mockery v2.15.0. DO NOT EDIT.
package pbdns
import (
testing "testing"
mock "github.com/stretchr/testify/mock"
)
import mock "github.com/stretchr/testify/mock"
// MockUnsafeDNSServiceServer is an autogenerated mock type for the UnsafeDNSServiceServer type
type MockUnsafeDNSServiceServer struct {
@ -18,8 +14,13 @@ func (_m *MockUnsafeDNSServiceServer) mustEmbedUnimplementedDNSServiceServer() {
_m.Called()
}
// NewMockUnsafeDNSServiceServer creates a new instance of MockUnsafeDNSServiceServer. It also registers the testing.TB interface on the mock and a cleanup function to assert the mocks expectations.
func NewMockUnsafeDNSServiceServer(t testing.TB) *MockUnsafeDNSServiceServer {
type mockConstructorTestingTNewMockUnsafeDNSServiceServer interface {
mock.TestingT
Cleanup(func())
}
// NewMockUnsafeDNSServiceServer creates a new instance of MockUnsafeDNSServiceServer. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
func NewMockUnsafeDNSServiceServer(t mockConstructorTestingTNewMockUnsafeDNSServiceServer) *MockUnsafeDNSServiceServer {
mock := &MockUnsafeDNSServiceServer{}
mock.Mock.Test(t)