agent/cache: Make all cache options RegisterOptions

Previously the SupportsBlocking option was specified by a method on the
type, and all the other options were specified from RegisterOptions.

This change moves RegisterOptions to a method on the type, and moves
SupportsBlocking into the options struct.

Currently there are only 2 cache-types. So all cache-types can implement
this method by embedding a struct with those predefined values. In the
future if a cache type needs to be registered more than once with different
options it can remove the embedded type and implement the method in a way
that allows for paramaterization.
This commit is contained in:
Daniel Nephin 2020-04-14 18:29:30 -04:00
parent 6a5eba63ab
commit 1251c01b73
29 changed files with 213 additions and 331 deletions

View file

@ -4182,151 +4182,45 @@ func (a *Agent) registerCache() {
// the a.delegate directly, otherwise tests that rely on overriding RPC
// routing via a.registerEndpoint will not work.
a.cache.RegisterType(cachetype.ConnectCARootName, &cachetype.ConnectCARoot{
RPC: a,
}, &cache.RegisterOptions{
// Maintain a blocking query, retry dropped connections quickly
Refresh: true,
RefreshTimer: 0 * time.Second,
RefreshTimeout: 10 * time.Minute,
})
a.cache.RegisterType(cachetype.ConnectCARootName, &cachetype.ConnectCARoot{RPC: a})
a.cache.RegisterType(cachetype.ConnectCALeafName, &cachetype.ConnectCALeaf{
RPC: a,
Cache: a.cache,
Datacenter: a.config.Datacenter,
TestOverrideCAChangeInitialDelay: a.config.ConnectTestCALeafRootChangeSpread,
}, &cache.RegisterOptions{
// Maintain a blocking query, retry dropped connections quickly
Refresh: true,
RefreshTimer: 0 * time.Second,
RefreshTimeout: 10 * time.Minute,
})
a.cache.RegisterType(cachetype.IntentionMatchName, &cachetype.IntentionMatch{
RPC: a,
}, &cache.RegisterOptions{
// Maintain a blocking query, retry dropped connections quickly
Refresh: true,
RefreshTimer: 0 * time.Second,
RefreshTimeout: 10 * time.Minute,
})
a.cache.RegisterType(cachetype.IntentionMatchName, &cachetype.IntentionMatch{RPC: a})
a.cache.RegisterType(cachetype.CatalogServicesName, &cachetype.CatalogServices{
RPC: a,
}, &cache.RegisterOptions{
// Maintain a blocking query, retry dropped connections quickly
Refresh: true,
RefreshTimer: 0 * time.Second,
RefreshTimeout: 10 * time.Minute,
})
a.cache.RegisterType(cachetype.CatalogServicesName, &cachetype.CatalogServices{RPC: a})
a.cache.RegisterType(cachetype.HealthServicesName, &cachetype.HealthServices{
RPC: a,
}, &cache.RegisterOptions{
// Maintain a blocking query, retry dropped connections quickly
Refresh: true,
RefreshTimer: 0 * time.Second,
RefreshTimeout: 10 * time.Minute,
})
a.cache.RegisterType(cachetype.HealthServicesName, &cachetype.HealthServices{RPC: a})
a.cache.RegisterType(cachetype.PreparedQueryName, &cachetype.PreparedQuery{
RPC: a,
}, &cache.RegisterOptions{
// Prepared queries don't support blocking
Refresh: false,
})
a.cache.RegisterType(cachetype.PreparedQueryName, &cachetype.PreparedQuery{RPC: a})
a.cache.RegisterType(cachetype.NodeServicesName, &cachetype.NodeServices{
RPC: a,
}, &cache.RegisterOptions{
// Maintain a blocking query, retry dropped connections quickly
Refresh: true,
RefreshTimer: 0 * time.Second,
RefreshTimeout: 10 * time.Minute,
})
a.cache.RegisterType(cachetype.NodeServicesName, &cachetype.NodeServices{RPC: a})
a.cache.RegisterType(cachetype.ResolvedServiceConfigName, &cachetype.ResolvedServiceConfig{
RPC: a,
}, &cache.RegisterOptions{
// Maintain a blocking query, retry dropped connections quickly
Refresh: true,
RefreshTimer: 0 * time.Second,
RefreshTimeout: 10 * time.Minute,
})
a.cache.RegisterType(cachetype.ResolvedServiceConfigName, &cachetype.ResolvedServiceConfig{RPC: a})
a.cache.RegisterType(cachetype.CatalogListServicesName, &cachetype.CatalogListServices{
RPC: a,
}, &cache.RegisterOptions{
Refresh: true,
RefreshTimer: 0 * time.Second,
RefreshTimeout: 10 * time.Minute,
})
a.cache.RegisterType(cachetype.CatalogListServicesName, &cachetype.CatalogListServices{RPC: a})
a.cache.RegisterType(cachetype.CatalogServiceListName, &cachetype.CatalogServiceList{
RPC: a,
}, &cache.RegisterOptions{
Refresh: true,
RefreshTimer: 0 * time.Second,
RefreshTimeout: 10 * time.Minute,
})
a.cache.RegisterType(cachetype.CatalogServiceListName, &cachetype.CatalogServiceList{RPC: a})
a.cache.RegisterType(cachetype.CatalogDatacentersName, &cachetype.CatalogDatacenters{
RPC: a,
}, &cache.RegisterOptions{
Refresh: false,
})
a.cache.RegisterType(cachetype.CatalogDatacentersName, &cachetype.CatalogDatacenters{RPC: a})
a.cache.RegisterType(cachetype.InternalServiceDumpName, &cachetype.InternalServiceDump{
RPC: a,
}, &cache.RegisterOptions{
Refresh: true,
RefreshTimer: 0 * time.Second,
RefreshTimeout: 10 * time.Minute,
})
a.cache.RegisterType(cachetype.InternalServiceDumpName, &cachetype.InternalServiceDump{RPC: a})
a.cache.RegisterType(cachetype.CompiledDiscoveryChainName, &cachetype.CompiledDiscoveryChain{
RPC: a,
}, &cache.RegisterOptions{
// Maintain a blocking query, retry dropped connections quickly
Refresh: true,
RefreshTimer: 0 * time.Second,
RefreshTimeout: 10 * time.Minute,
})
a.cache.RegisterType(cachetype.CompiledDiscoveryChainName, &cachetype.CompiledDiscoveryChain{RPC: a})
a.cache.RegisterType(cachetype.GatewayServicesName, &cachetype.GatewayServices{
RPC: a,
}, &cache.RegisterOptions{
// Maintain a blocking query, retry dropped connections quickly
Refresh: true,
RefreshTimer: 0 * time.Second,
RefreshTimeout: 10 * time.Minute,
})
a.cache.RegisterType(cachetype.GatewayServicesName, &cachetype.GatewayServices{RPC: a})
a.cache.RegisterType(cachetype.ConfigEntriesName, &cachetype.ConfigEntries{
RPC: a,
}, &cache.RegisterOptions{
// Maintain a blocking query, retry dropped connections quickly
Refresh: true,
RefreshTimer: 0 * time.Second,
RefreshTimeout: 10 * time.Minute,
})
a.cache.RegisterType(cachetype.ConfigEntriesName, &cachetype.ConfigEntries{RPC: a})
a.cache.RegisterType(cachetype.ServiceHTTPChecksName, &cachetype.ServiceHTTPChecks{
Agent: a,
}, &cache.RegisterOptions{
Refresh: true,
RefreshTimer: 0 * time.Second,
RefreshTimeout: 10 * time.Minute,
})
a.cache.RegisterType(cachetype.ServiceHTTPChecksName, &cachetype.ServiceHTTPChecks{Agent: a})
a.cache.RegisterType(cachetype.FederationStateListMeshGatewaysName, &cachetype.FederationStateListMeshGateways{
RPC: a,
}, &cache.RegisterOptions{
Refresh: true,
RefreshTimer: 0 * time.Second,
RefreshTimeout: 10 * time.Minute,
})
a.cache.RegisterType(cachetype.FederationStateListMeshGatewaysName,
&cachetype.FederationStateListMeshGateways{RPC: a})
}
// LocalState returns the agent's local state

View file

@ -12,6 +12,7 @@ const CatalogDatacentersName = "catalog-datacenters"
// Datacenters supports fetching discovering all the known datacenters
type CatalogDatacenters struct {
RegisterOptionsNoRefresh
RPC RPC
}
@ -75,7 +76,3 @@ func (c *CatalogDatacenters) Fetch(opts cache.FetchOptions, req cache.Request) (
return result, nil
}
func (c *CatalogDatacenters) SupportsBlocking() bool {
return false
}

View file

@ -12,6 +12,7 @@ const CatalogListServicesName = "catalog-list-services"
// CatalogListServices supports fetching discovering service names via the catalog.
type CatalogListServices struct {
RegisterOptionsBlockingRefresh
RPC RPC
}
@ -49,7 +50,3 @@ func (c *CatalogListServices) Fetch(opts cache.FetchOptions, req cache.Request)
result.Index = reply.QueryMeta.Index
return result, nil
}
func (c *CatalogListServices) SupportsBlocking() bool {
return true
}

View file

@ -12,6 +12,7 @@ const CatalogServiceListName = "catalog-services-list"
// CatalogServiceList supports fetching service names via the catalog.
type CatalogServiceList struct {
RegisterOptionsBlockingRefresh
RPC RPC
}
@ -49,7 +50,3 @@ func (c *CatalogServiceList) Fetch(opts cache.FetchOptions, req cache.Request) (
result.Index = reply.QueryMeta.Index
return result, nil
}
func (c *CatalogServiceList) SupportsBlocking() bool {
return true
}

View file

@ -13,6 +13,7 @@ const CatalogServicesName = "catalog-services"
// CatalogServices supports fetching discovering service instances via the
// catalog.
type CatalogServices struct {
RegisterOptionsBlockingRefresh
RPC RPC
}
@ -50,7 +51,3 @@ func (c *CatalogServices) Fetch(opts cache.FetchOptions, req cache.Request) (cac
result.Index = reply.QueryMeta.Index
return result, nil
}
func (c *CatalogServices) SupportsBlocking() bool {
return true
}

View file

@ -12,6 +12,7 @@ const ConfigEntriesName = "config-entries"
// ConfigEntries supports fetching discovering configuration entries
type ConfigEntries struct {
RegisterOptionsBlockingRefresh
RPC RPC
}
@ -49,7 +50,3 @@ func (c *ConfigEntries) Fetch(opts cache.FetchOptions, req cache.Request) (cache
result.Index = reply.QueryMeta.Index
return result, nil
}
func (c *ConfigEntries) SupportsBlocking() bool {
return true
}

View file

@ -50,6 +50,7 @@ const caChangeJitterWindow = 30 * time.Second
// ConnectCALeaf supports fetching and generating Connect leaf
// certificates.
type ConnectCALeaf struct {
RegisterOptionsBlockingRefresh
caIndex uint64 // Current index for CA roots
// rootWatchMu protects access to the rootWatchSubscribers map and
@ -629,10 +630,6 @@ func (c *ConnectCALeaf) generateNewLeaf(req *ConnectCALeafRequest,
return result, nil
}
func (c *ConnectCALeaf) SupportsBlocking() bool {
return true
}
// ConnectCALeafRequest is the cache.Request implementation for the
// ConnectCALeaf cache type. This is implemented here and not in structs
// since this is only used for cache-related requests and not forwarded

View file

@ -966,6 +966,21 @@ func TestConnectCALeaf_expiringLeaf(t *testing.T) {
}
}
// testConnectCaRoot wraps ConnectCARoot to disable refresh so that the gated
// channel controls the request directly. Otherwise, we get background refreshes and
// it screws up the ordering of the channel reads of the testGatedRootsRPC
// implementation.
type testConnectCaRoot struct {
ConnectCARoot
}
func (r testConnectCaRoot) RegisterOptions() cache.RegisterOptions {
return cache.RegisterOptions{
Refresh: false,
SupportsBlocking: true,
}
}
// testCALeafType returns a *ConnectCALeaf that is pre-configured to
// use the given RPC implementation for "ConnectCA.Sign" operations.
func testCALeafType(t *testing.T, rpc RPC) (*ConnectCALeaf, chan structs.IndexedCARoots) {
@ -977,14 +992,9 @@ func testCALeafType(t *testing.T, rpc RPC) (*ConnectCALeaf, chan structs.Indexed
// Create a cache
c := cache.TestCache(t)
c.RegisterType(ConnectCARootName, &ConnectCARoot{RPC: rootsRPC}, &cache.RegisterOptions{
// Disable refresh so that the gated channel controls the
// request directly. Otherwise, we get background refreshes and
// it screws up the ordering of the channel reads of the
// testGatedRootsRPC implementation.
Refresh: false,
c.RegisterType(ConnectCARootName, &testConnectCaRoot{
ConnectCARoot: ConnectCARoot{RPC: rootsRPC},
})
// Create the leaf type
return &ConnectCALeaf{
RPC: rpc,

View file

@ -14,6 +14,7 @@ const ConnectCARootName = "connect-ca-root"
// straightforward cache type since it only has to block on the given
// index and return the data.
type ConnectCARoot struct {
RegisterOptionsBlockingRefresh
RPC RPC
}
@ -45,7 +46,3 @@ func (c *ConnectCARoot) Fetch(opts cache.FetchOptions, req cache.Request) (cache
result.Index = reply.QueryMeta.Index
return result, nil
}
func (c *ConnectCARoot) SupportsBlocking() bool {
return true
}

View file

@ -13,6 +13,7 @@ const CompiledDiscoveryChainName = "compiled-discovery-chain"
// CompiledDiscoveryChain supports fetching the complete discovery chain for a
// service and caching its compilation.
type CompiledDiscoveryChain struct {
RegisterOptionsBlockingRefresh
RPC RPC
}
@ -50,7 +51,3 @@ func (c *CompiledDiscoveryChain) Fetch(opts cache.FetchOptions, req cache.Reques
result.Index = reply.QueryMeta.Index
return result, nil
}
func (c *CompiledDiscoveryChain) SupportsBlocking() bool {
return true
}

View file

@ -12,6 +12,7 @@ const FederationStateListMeshGatewaysName = "federation-state-list-mesh-gateways
// FederationState supports fetching federation states.
type FederationStateListMeshGateways struct {
RegisterOptionsBlockingRefresh
RPC RPC
}
@ -49,7 +50,3 @@ func (c *FederationStateListMeshGateways) Fetch(opts cache.FetchOptions, req cac
result.Index = reply.QueryMeta.Index
return result, nil
}
func (c *FederationStateListMeshGateways) SupportsBlocking() bool {
return true
}

View file

@ -12,6 +12,7 @@ const GatewayServicesName = "gateway-services"
// GatewayUpstreams supports fetching upstreams for a given gateway name.
type GatewayServices struct {
RegisterOptionsBlockingRefresh
RPC RPC
}
@ -49,7 +50,3 @@ func (g *GatewayServices) Fetch(opts cache.FetchOptions, req cache.Request) (cac
result.Index = reply.QueryMeta.Index
return result, nil
}
func (g *GatewayServices) SupportsBlocking() bool {
return true
}

View file

@ -13,6 +13,7 @@ const HealthServicesName = "health-services"
// HealthServices supports fetching discovering service instances via the
// catalog.
type HealthServices struct {
RegisterOptionsBlockingRefresh
RPC RPC
}
@ -50,7 +51,3 @@ func (c *HealthServices) Fetch(opts cache.FetchOptions, req cache.Request) (cach
result.Index = reply.QueryMeta.Index
return result, nil
}
func (c *HealthServices) SupportsBlocking() bool {
return true
}

View file

@ -12,6 +12,7 @@ const IntentionMatchName = "intention-match"
// IntentionMatch supports fetching the intentions via match queries.
type IntentionMatch struct {
RegisterOptionsBlockingRefresh
RPC RPC
}
@ -43,7 +44,3 @@ func (c *IntentionMatch) Fetch(opts cache.FetchOptions, req cache.Request) (cach
result.Index = reply.Index
return result, nil
}
func (c *IntentionMatch) SupportsBlocking() bool {
return true
}

View file

@ -13,6 +13,7 @@ const NodeServicesName = "node-services"
// NodeServices supports fetching discovering service instances via the
// catalog.
type NodeServices struct {
RegisterOptionsBlockingRefresh
RPC RPC
}
@ -50,7 +51,3 @@ func (c *NodeServices) Fetch(opts cache.FetchOptions, req cache.Request) (cache.
result.Index = reply.QueryMeta.Index
return result, nil
}
func (c *NodeServices) SupportsBlocking() bool {
return true
}

View file

@ -0,0 +1,36 @@
package cachetype
import (
"time"
"github.com/hashicorp/consul/agent/cache"
)
// RegisterOptionsBlockingRefresh can be embedded into a struct to implement
// part of the agent/cache.Type interface.
// When embedded into a struct it identifies the cache type as one which
// supports blocking, and uses refresh to keep the cache fresh.
type RegisterOptionsBlockingRefresh struct{}
func (r RegisterOptionsBlockingRefresh) RegisterOptions() cache.RegisterOptions {
return cache.RegisterOptions{
// Maintain a blocking query, retry dropped connections quickly
Refresh: true,
SupportsBlocking: true,
RefreshTimer: 0 * time.Second,
RefreshTimeout: 10 * time.Minute,
}
}
// RegisterOptionsNoRefresh can be embedded into a struct to implement
// part of the agent/cache.Type interface.
// When embedded into a struct it identifies the cache type as one which
// does not support blocking, and should not be refreshed.
type RegisterOptionsNoRefresh struct{}
func (r RegisterOptionsNoRefresh) RegisterOptions() cache.RegisterOptions {
return cache.RegisterOptions{
Refresh: false,
SupportsBlocking: false,
}
}

View file

@ -13,10 +13,11 @@ const PreparedQueryName = "prepared-query"
// PreparedQuery supports fetching discovering service instances via prepared
// queries.
type PreparedQuery struct {
RegisterOptionsNoRefresh
RPC RPC
}
func (c *PreparedQuery) Fetch(opts cache.FetchOptions, req cache.Request) (cache.FetchResult, error) {
func (c *PreparedQuery) Fetch(_ cache.FetchOptions, req cache.Request) (cache.FetchResult, error) {
var result cache.FetchResult
// The request should be a PreparedQueryExecuteRequest.
@ -47,8 +48,3 @@ func (c *PreparedQuery) Fetch(opts cache.FetchOptions, req cache.Request) (cache
return result, nil
}
func (c *PreparedQuery) SupportsBlocking() bool {
// Prepared queries don't support blocking.
return false
}

View file

@ -13,6 +13,7 @@ const ResolvedServiceConfigName = "resolved-service-config"
// ResolvedServiceConfig supports fetching the config for a service resolved from
// the global proxy defaults and the centrally registered service config.
type ResolvedServiceConfig struct {
RegisterOptionsBlockingRefresh
RPC RPC
}
@ -50,7 +51,3 @@ func (c *ResolvedServiceConfig) Fetch(opts cache.FetchOptions, req cache.Request
result.Index = reply.QueryMeta.Index
return result, nil
}
func (c *ResolvedServiceConfig) SupportsBlocking() bool {
return true
}

View file

@ -24,6 +24,7 @@ type Agent interface {
// ServiceHTTPBasedChecks supports fetching discovering checks in the local state
type ServiceHTTPChecks struct {
RegisterOptionsBlockingRefresh
Agent Agent
}
@ -91,10 +92,6 @@ func (c *ServiceHTTPChecks) Fetch(opts cache.FetchOptions, req cache.Request) (c
return result, nil
}
func (c *ServiceHTTPChecks) SupportsBlocking() bool {
return true
}
// ServiceHTTPChecksRequest is the cache.Request implementation for the
// ServiceHTTPBasedChecks cache type. This is implemented here and not in structs
// since this is only used for cache-related requests and not forwarded

View file

@ -12,6 +12,7 @@ const InternalServiceDumpName = "service-dump"
// InternalServiceDump supports fetching discovering service names via the catalog.
type InternalServiceDump struct {
RegisterOptionsBlockingRefresh
RPC RPC
}
@ -49,7 +50,3 @@ func (c *InternalServiceDump) Fetch(opts cache.FetchOptions, req cache.Request)
result.Index = reply.QueryMeta.Index
return result, nil
}
func (c *InternalServiceDump) SupportsBlocking() bool {
return true
}

18
agent/cache/cache.go vendored
View file

@ -160,6 +160,12 @@ type RegisterOptions struct {
// is to only request data on explicit Get.
Refresh bool
// SupportsBlocking should be set to true if the type supports blocking queries.
// Types that do not support blocking queries will not be able to use
// background refresh nor will the cache attempt blocking fetches if the
// client requests them with MinIndex.
SupportsBlocking bool
// RefreshTimer is the time between attempting to refresh data.
// If this is zero, then data is refreshed immediately when a fetch
// is returned.
@ -185,17 +191,15 @@ type RegisterOptions struct {
//
// This makes the type available for Get but does not automatically perform
// any prefetching. In order to populate the cache, Get must be called.
func (c *Cache) RegisterType(n string, typ Type, opts *RegisterOptions) {
if opts == nil {
opts = &RegisterOptions{}
}
func (c *Cache) RegisterType(n string, typ Type) {
opts := typ.RegisterOptions()
if opts.LastGetTTL == 0 {
opts.LastGetTTL = 72 * time.Hour // reasonable default is days
}
c.typesLock.Lock()
defer c.typesLock.Unlock()
c.types[n] = typeEntry{Name: n, Type: typ, Opts: opts}
c.types[n] = typeEntry{Name: n, Type: typ, Opts: &opts}
}
// Get loads the data for the given type and request. If data satisfying the
@ -241,7 +245,7 @@ func (c *Cache) getEntryLocked(
// Check index is not specified or lower than value, or the type doesn't
// support blocking.
if tEntry.Type.SupportsBlocking() && minIndex > 0 && minIndex >= entry.Index {
if tEntry.Opts.SupportsBlocking && minIndex > 0 && minIndex >= entry.Index {
// MinIndex was given and matches or is higher than current value so we
// ignore the cache and fallthrough to blocking on a new value below.
return true, false, entry
@ -465,7 +469,7 @@ func (c *Cache) fetch(tEntry typeEntry, key string, r Request, allowNew bool, at
}
fOpts := FetchOptions{}
if tEntry.Type.SupportsBlocking() {
if tEntry.Opts.SupportsBlocking {
fOpts.MinIndex = entry.Index
fOpts.Timeout = tEntry.Opts.RefreshTimeout
}

View file

@ -23,7 +23,7 @@ func TestCacheGet_noIndex(t *testing.T) {
typ := TestType(t)
defer typ.AssertExpectations(t)
c := TestCache(t)
c.RegisterType("t", typ, nil)
c.RegisterType("t", typ)
// Configure the type
typ.Static(FetchResult{Value: 42}, nil).Times(1)
@ -56,7 +56,7 @@ func TestCacheGet_initError(t *testing.T) {
typ := TestType(t)
defer typ.AssertExpectations(t)
c := TestCache(t)
c.RegisterType("t", typ, nil)
c.RegisterType("t", typ)
// Configure the type
fetcherr := fmt.Errorf("error")
@ -91,7 +91,7 @@ func TestCacheGet_cachedErrorsDontStick(t *testing.T) {
typ := TestType(t)
defer typ.AssertExpectations(t)
c := TestCache(t)
c.RegisterType("t", typ, nil)
c.RegisterType("t", typ)
// Configure the type
fetcherr := fmt.Errorf("initial error")
@ -152,7 +152,7 @@ func TestCacheGet_blankCacheKey(t *testing.T) {
typ := TestType(t)
defer typ.AssertExpectations(t)
c := TestCache(t)
c.RegisterType("t", typ, nil)
c.RegisterType("t", typ)
// Configure the type
typ.Static(FetchResult{Value: 42}, nil).Times(2)
@ -183,7 +183,7 @@ func TestCacheGet_blockingInitSameKey(t *testing.T) {
typ := TestType(t)
defer typ.AssertExpectations(t)
c := TestCache(t)
c.RegisterType("t", typ, nil)
c.RegisterType("t", typ)
// Configure the type
triggerCh := make(chan time.Time)
@ -220,7 +220,7 @@ func TestCacheGet_blockingInitDiffKeys(t *testing.T) {
typ := TestType(t)
defer typ.AssertExpectations(t)
c := TestCache(t)
c.RegisterType("t", typ, nil)
c.RegisterType("t", typ)
// Keep track of the keys
var keysLock sync.Mutex
@ -270,7 +270,7 @@ func TestCacheGet_blockingIndex(t *testing.T) {
typ := TestType(t)
defer typ.AssertExpectations(t)
c := TestCache(t)
c.RegisterType("t", typ, nil)
c.RegisterType("t", typ)
// Configure the type
triggerCh := make(chan time.Time)
@ -304,7 +304,7 @@ func TestCacheGet_blockingIndexTimeout(t *testing.T) {
typ := TestType(t)
defer typ.AssertExpectations(t)
c := TestCache(t)
c.RegisterType("t", typ, nil)
c.RegisterType("t", typ)
// Configure the type
triggerCh := make(chan time.Time)
@ -340,7 +340,7 @@ func TestCacheGet_blockingIndexError(t *testing.T) {
typ := TestType(t)
defer typ.AssertExpectations(t)
c := TestCache(t)
c.RegisterType("t", typ, nil)
c.RegisterType("t", typ)
// Configure the type
var retries uint32
@ -377,7 +377,7 @@ func TestCacheGet_emptyFetchResult(t *testing.T) {
typ := TestType(t)
defer typ.AssertExpectations(t)
c := TestCache(t)
c.RegisterType("t", typ, nil)
c.RegisterType("t", typ)
stateCh := make(chan int, 1)
@ -440,14 +440,15 @@ func TestCacheGet_emptyFetchResult(t *testing.T) {
func TestCacheGet_periodicRefresh(t *testing.T) {
t.Parallel()
typ := TestType(t)
defer typ.AssertExpectations(t)
c := TestCache(t)
c.RegisterType("t", typ, &RegisterOptions{
typ := &MockType{}
typ.On("RegisterOptions").Return(RegisterOptions{
Refresh: true,
RefreshTimer: 100 * time.Millisecond,
RefreshTimeout: 5 * time.Minute,
})
defer typ.AssertExpectations(t)
c := TestCache(t)
c.RegisterType("t", typ)
// This is a bit weird, but we do this to ensure that the final
// call to the Fetch (if it happens, depends on timing) just blocks.
@ -479,14 +480,15 @@ func TestCacheGet_periodicRefresh(t *testing.T) {
func TestCacheGet_periodicRefreshMultiple(t *testing.T) {
t.Parallel()
typ := TestType(t)
defer typ.AssertExpectations(t)
c := TestCache(t)
c.RegisterType("t", typ, &RegisterOptions{
typ := &MockType{}
typ.On("RegisterOptions").Return(RegisterOptions{
Refresh: true,
RefreshTimer: 0 * time.Millisecond,
RefreshTimeout: 5 * time.Minute,
})
defer typ.AssertExpectations(t)
c := TestCache(t)
c.RegisterType("t", typ)
// This is a bit weird, but we do this to ensure that the final
// call to the Fetch (if it happens, depends on timing) just blocks.
@ -527,14 +529,15 @@ func TestCacheGet_periodicRefreshMultiple(t *testing.T) {
func TestCacheGet_periodicRefreshErrorBackoff(t *testing.T) {
t.Parallel()
typ := TestType(t)
defer typ.AssertExpectations(t)
c := TestCache(t)
c.RegisterType("t", typ, &RegisterOptions{
typ := &MockType{}
typ.On("RegisterOptions").Return(RegisterOptions{
Refresh: true,
RefreshTimer: 0,
RefreshTimeout: 5 * time.Minute,
})
defer typ.AssertExpectations(t)
c := TestCache(t)
c.RegisterType("t", typ)
// Configure the type
var retries uint32
@ -568,14 +571,15 @@ func TestCacheGet_periodicRefreshErrorBackoff(t *testing.T) {
func TestCacheGet_periodicRefreshBadRPCZeroIndexErrorBackoff(t *testing.T) {
t.Parallel()
typ := TestType(t)
defer typ.AssertExpectations(t)
c := TestCache(t)
c.RegisterType("t", typ, &RegisterOptions{
typ := &MockType{}
typ.On("RegisterOptions").Return(RegisterOptions{
Refresh: true,
RefreshTimer: 0,
RefreshTimeout: 5 * time.Minute,
})
defer typ.AssertExpectations(t)
c := TestCache(t)
c.RegisterType("t", typ)
// Configure the type
var retries uint32
@ -611,14 +615,16 @@ func TestCacheGet_periodicRefreshBadRPCZeroIndexErrorBackoff(t *testing.T) {
func TestCacheGet_noIndexSetsOne(t *testing.T) {
t.Parallel()
typ := TestType(t)
typ := &MockType{}
typ.On("RegisterOptions").Return(RegisterOptions{
SupportsBlocking: true,
Refresh: true,
RefreshTimer: 0,
RefreshTimeout: 5 * time.Minute,
})
defer typ.AssertExpectations(t)
c := TestCache(t)
c.RegisterType("t", typ, &RegisterOptions{
Refresh: true,
RefreshTimer: 0,
RefreshTimeout: 5 * time.Minute,
})
c.RegisterType("t", typ)
// Simulate "well behaved" RPC with no data yet but returning 1
{
@ -671,15 +677,17 @@ func TestCacheGet_fetchTimeout(t *testing.T) {
require := require.New(t)
typ := TestType(t)
typ := &MockType{}
timeout := 10 * time.Minute
typ.On("RegisterOptions").Return(RegisterOptions{
RefreshTimeout: timeout,
SupportsBlocking: true,
})
defer typ.AssertExpectations(t)
c := TestCache(t)
// Register the type with a timeout
timeout := 10 * time.Minute
c.RegisterType("t", typ, &RegisterOptions{
RefreshTimeout: timeout,
})
c.RegisterType("t", typ)
// Configure the type
var actual time.Duration
@ -705,14 +713,15 @@ func TestCacheGet_expire(t *testing.T) {
require := require.New(t)
typ := TestType(t)
typ := &MockType{}
typ.On("RegisterOptions").Return(RegisterOptions{
LastGetTTL: 400 * time.Millisecond,
})
defer typ.AssertExpectations(t)
c := TestCache(t)
// Register the type with a timeout
c.RegisterType("t", typ, &RegisterOptions{
LastGetTTL: 400 * time.Millisecond,
})
c.RegisterType("t", typ)
// Configure the type
typ.Static(FetchResult{Value: 42}, nil).Times(2)
@ -760,14 +769,15 @@ func TestCacheGet_expireResetGet(t *testing.T) {
require := require.New(t)
typ := TestType(t)
typ := &MockType{}
typ.On("RegisterOptions").Return(RegisterOptions{
LastGetTTL: 150 * time.Millisecond,
})
defer typ.AssertExpectations(t)
c := TestCache(t)
// Register the type with a timeout
c.RegisterType("t", typ, &RegisterOptions{
LastGetTTL: 150 * time.Millisecond,
})
c.RegisterType("t", typ)
// Configure the type
typ.Static(FetchResult{Value: 42}, nil).Times(2)
@ -821,8 +831,8 @@ func TestCacheGet_duplicateKeyDifferentType(t *testing.T) {
defer typ2.AssertExpectations(t)
c := TestCache(t)
c.RegisterType("t", typ, nil)
c.RegisterType("t2", typ2, nil)
c.RegisterType("t", typ)
c.RegisterType("t2", typ2)
// Configure the types
typ.Static(FetchResult{Value: 100}, nil)
@ -863,7 +873,7 @@ func TestCacheGet_partitionDC(t *testing.T) {
t.Parallel()
c := TestCache(t)
c.RegisterType("t", &testPartitionType{}, nil)
c.RegisterType("t", &testPartitionType{})
// Perform multiple gets
getCh1 := TestCacheGetCh(t, c, "t", TestRequest(t, RequestInfo{
@ -882,7 +892,7 @@ func TestCacheGet_partitionToken(t *testing.T) {
t.Parallel()
c := TestCache(t)
c.RegisterType("t", &testPartitionType{}, nil)
c.RegisterType("t", &testPartitionType{})
// Perform multiple gets
getCh1 := TestCacheGetCh(t, c, "t", TestRequest(t, RequestInfo{
@ -907,8 +917,10 @@ func (t *testPartitionType) Fetch(opts FetchOptions, r Request) (FetchResult, er
}, nil
}
func (t *testPartitionType) SupportsBlocking() bool {
return true
func (t *testPartitionType) RegisterOptions() RegisterOptions {
return RegisterOptions{
SupportsBlocking: true,
}
}
// Test that background refreshing reports correct Age in failure and happy
@ -918,14 +930,15 @@ func TestCacheGet_refreshAge(t *testing.T) {
require := require.New(t)
typ := TestType(t)
defer typ.AssertExpectations(t)
c := TestCache(t)
c.RegisterType("t", typ, &RegisterOptions{
typ := &MockType{}
typ.On("RegisterOptions").Return(RegisterOptions{
Refresh: true,
RefreshTimer: 0,
RefreshTimeout: 5 * time.Minute,
})
defer typ.AssertExpectations(t)
c := TestCache(t)
c.RegisterType("t", typ)
// Configure the type
var index, shouldFail uint64
@ -1035,13 +1048,14 @@ func TestCacheGet_nonRefreshAge(t *testing.T) {
require := require.New(t)
typ := TestType(t)
defer typ.AssertExpectations(t)
c := TestCache(t)
c.RegisterType("t", typ, &RegisterOptions{
typ := &MockType{}
typ.On("RegisterOptions").Return(RegisterOptions{
Refresh: false,
LastGetTTL: 100 * time.Millisecond,
})
defer typ.AssertExpectations(t)
c := TestCache(t)
c.RegisterType("t", typ)
// Configure the type
var index uint64
@ -1121,7 +1135,7 @@ func TestCacheGet_nonBlockingType(t *testing.T) {
typ := TestTypeNonBlocking(t)
c := TestCache(t)
c.RegisterType("t", typ, nil)
c.RegisterType("t", typ)
// Configure the type
typ.Static(FetchResult{Value: 42, Index: 1}, nil).Once()

View file

@ -1,4 +1,5 @@
// Code generated by mockery v1.0.0
// Code generated by mockery v1.0.0. DO NOT EDIT.
package cache
import mock "github.com/stretchr/testify/mock"

View file

@ -1,4 +1,5 @@
// Code generated by mockery v1.0.0. DO NOT EDIT.
package cache
import mock "github.com/stretchr/testify/mock"
@ -29,15 +30,15 @@ func (_m *MockType) Fetch(_a0 FetchOptions, _a1 Request) (FetchResult, error) {
return r0, r1
}
// SupportsBlocking provides a mock function with given fields:
func (_m *MockType) SupportsBlocking() bool {
// RegisterOptions provides a mock function with given fields:
func (_m *MockType) RegisterOptions() RegisterOptions {
ret := _m.Called()
var r0 bool
if rf, ok := ret.Get(0).(func() bool); ok {
var r0 RegisterOptions
if rf, ok := ret.Get(0).(func() RegisterOptions); ok {
r0 = rf()
} else {
r0 = ret.Get(0).(bool)
r0 = ret.Get(0).(RegisterOptions)
}
return r0

View file

@ -96,20 +96,21 @@ func TestRequest(t testing.T, info RequestInfo) *MockRequest {
return req
}
// TestType returns a MockType that can be used to setup expectations
// on data fetching.
// TestType returns a MockType that sets default RegisterOptions.
func TestType(t testing.T) *MockType {
return testTypeInternal(t, true)
typ := &MockType{}
typ.On("RegisterOptions").Return(RegisterOptions{
SupportsBlocking: true,
})
return typ
}
// TestTypeNonBlocking returns a MockType that returns false to SupportsBlocking.
func TestTypeNonBlocking(t testing.T) *MockType {
return testTypeInternal(t, false)
}
func testTypeInternal(t testing.T, enableBlocking bool) *MockType {
typ := &MockType{}
typ.On("SupportsBlocking").Return(enableBlocking).Maybe()
typ.On("RegisterOptions").Return(RegisterOptions{
SupportsBlocking: false,
})
return typ
}

8
agent/cache/type.go vendored
View file

@ -30,11 +30,9 @@ type Type interface {
// metadata even when there is no result.
Fetch(FetchOptions, Request) (FetchResult, error)
// SupportsBlocking should return true if the type supports blocking queries.
// Types that do not support blocking queries will not be able to use
// background refresh nor will the cache attempt blocking fetches if the
// client requests them with MinIndex.
SupportsBlocking() bool
// RegisterOptions are used when the type is registered to configure the
// behaviour of cache entries for this type.
RegisterOptions() RegisterOptions
}
// FetchOptions are various settable options when a Fetch is called.

View file

@ -64,7 +64,7 @@ func (c *Cache) Notify(
return fmt.Errorf("unknown type in cache: %s", t)
}
if tEntry.Type.SupportsBlocking() {
if tEntry.Opts.SupportsBlocking {
go c.notifyBlockingQuery(ctx, tEntry, r, correlationID, ch)
return nil
}

View file

@ -17,11 +17,10 @@ func TestCacheNotify(t *testing.T) {
t.Parallel()
typ := TestType(t)
typ.On("RegisterOptions").Return(RegisterOptions{})
defer typ.AssertExpectations(t)
c := TestCache(t)
c.RegisterType("t", typ, &RegisterOptions{
Refresh: false,
})
c.RegisterType("t", typ)
// Setup triggers to control when "updates" should be delivered
trigger := make([]chan time.Time, 5)
@ -167,9 +166,7 @@ func TestCacheNotifyPolling(t *testing.T) {
typ := TestTypeNonBlocking(t)
defer typ.AssertExpectations(t)
c := TestCache(t)
c.RegisterType("t", typ, &RegisterOptions{
Refresh: false,
})
c.RegisterType("t", typ)
// Configure the type
typ.Static(FetchResult{Value: 1, Index: 1}, nil).Once().Run(func(args mock.Arguments) {
@ -280,11 +277,10 @@ func TestCacheWatch_ErrorBackoff(t *testing.T) {
t.Parallel()
typ := TestType(t)
typ.On("RegisterOptions").Return(RegisterOptions{})
defer typ.AssertExpectations(t)
c := TestCache(t)
c.RegisterType("t", typ, &RegisterOptions{
Refresh: false,
})
c.RegisterType("t", typ)
// Configure the type
var retries uint32
@ -345,9 +341,7 @@ func TestCacheWatch_ErrorBackoffNonBlocking(t *testing.T) {
typ := TestTypeNonBlocking(t)
defer typ.AssertExpectations(t)
c := TestCache(t)
c.RegisterType("t", typ, &RegisterOptions{
Refresh: false,
})
c.RegisterType("t", typ)
// Configure the type
var retries uint32

View file

@ -50,35 +50,13 @@ func NewTestCacheTypes(t testing.T) *TestCacheTypes {
// proxycfg will watch suitable for testing a proxycfg.State or Manager.
func TestCacheWithTypes(t testing.T, types *TestCacheTypes) *cache.Cache {
c := cache.TestCache(t)
c.RegisterType(cachetype.ConnectCARootName, types.roots, &cache.RegisterOptions{
Refresh: true,
RefreshTimer: 0,
RefreshTimeout: 10 * time.Minute,
})
c.RegisterType(cachetype.ConnectCALeafName, types.leaf, &cache.RegisterOptions{
Refresh: true,
RefreshTimer: 0,
RefreshTimeout: 10 * time.Minute,
})
c.RegisterType(cachetype.IntentionMatchName, types.intentions, &cache.RegisterOptions{
Refresh: true,
RefreshTimer: 0,
RefreshTimeout: 10 * time.Minute,
})
c.RegisterType(cachetype.HealthServicesName, types.health, &cache.RegisterOptions{
Refresh: true,
RefreshTimer: 0,
RefreshTimeout: 10 * time.Minute,
})
c.RegisterType(cachetype.PreparedQueryName, types.query, &cache.RegisterOptions{
Refresh: false,
})
c.RegisterType(cachetype.CompiledDiscoveryChainName, types.compiledChain, &cache.RegisterOptions{
Refresh: true,
RefreshTimer: 0,
RefreshTimeout: 10 * time.Minute,
})
c.RegisterType(cachetype.ServiceHTTPChecksName, types.serviceHTTPChecks, &cache.RegisterOptions{})
c.RegisterType(cachetype.ConnectCARootName, types.roots)
c.RegisterType(cachetype.ConnectCALeafName, types.leaf)
c.RegisterType(cachetype.IntentionMatchName, types.intentions)
c.RegisterType(cachetype.HealthServicesName, types.health)
c.RegisterType(cachetype.PreparedQueryName, types.query)
c.RegisterType(cachetype.CompiledDiscoveryChainName, types.compiledChain)
c.RegisterType(cachetype.ServiceHTTPChecksName, types.serviceHTTPChecks)
return c
}
@ -1221,7 +1199,10 @@ func (ct *ControllableCacheType) Fetch(opts cache.FetchOptions, req cache.Reques
}, nil
}
// SupportsBlocking implements cache.Type
func (ct *ControllableCacheType) SupportsBlocking() bool {
return ct.blocking
func (ct *ControllableCacheType) RegisterOptions() cache.RegisterOptions {
return cache.RegisterOptions{
Refresh: ct.blocking,
SupportsBlocking: ct.blocking,
RefreshTimeout: 10 * time.Minute,
}
}